SpringBoot集成Kafka

Posted by Monk on October 21, 2020

SpringBoot集成Kafka

1. 引入依赖包
// Spring for Apache Kafka: supplies the KafkaTemplate and @KafkaListener used in the steps below.
// No version is given on either line; presumably it is resolved by Spring Boot's dependency
// management — confirm against the build script.
implementation 'org.springframework.kafka:spring-kafka'
// Gson: used by the producer to turn the message object into a JSON string.
implementation 'com.google.code.gson:gson'
2. 编写消息实体
/**
 * Payload object that is serialized and published to Kafka in this example.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString} and both constructors are
 * generated by the Lombok annotations below. The all-args constructor takes its parameters in
 * field declaration order, so reordering the fields changes that constructor's signature.
 *
 * <p>NOTE(review): {@code password} is an ordinary field, so it is included in the generated
 * {@code toString()} and in any serialized form of this object — confirm that is acceptable.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class KafkaMessage {

  /** Numeric identifier of the message. */
  private long id;
  /** User name carried by the message. */
  private String username;
  /** Password carried by the message, stored as plain text. */
  private String password;
  /** Timestamp associated with the message (local date-time, no zone information). */
  private LocalDateTime time;
}
3. 编写Producer
/**
 * Publishes {@link KafkaMessage} objects to Kafka as JSON strings.
 *
 * <p>The {@link KafkaTemplate} is injected through the constructor that Lombok generates for the
 * uninitialized {@code final} field; the topic name is injected from configuration.
 */
@Component
@RequiredArgsConstructor
public class KafkaProducer {

  /**
   * Shared JSON serializer. Gson instances are thread-safe, so one instance is reused rather than
   * allocating a new one for every message. Because the field is static, Lombok does not add it
   * to the generated constructor.
   */
  private static final Gson GSON = new Gson();

  /** Destination topic, resolved from the {@code kafka.topic.test} property. */
  @Value("${kafka.topic.test}")
  private String topic;

  /** Template used to hand records to the Kafka client. */
  private final KafkaTemplate<String, String> kafkaTemplate;

  /**
   * Serializes {@code message} to JSON and sends it to the configured topic without a record key.
   *
   * <p>NOTE(review): {@code KafkaMessage} holds a {@code LocalDateTime}. Gson has no built-in
   * adapter for {@code java.time} types and falls back to reflection, which newer JDKs may
   * reject — verify on the target JDK or register a type adapter.
   *
   * @param message the payload to publish
   */
  public void sendMessage(KafkaMessage message) {
    // The value returned by send() is not inspected, so a failed delivery is not reported here.
    kafkaTemplate.send(topic, GSON.toJson(message));
  }
}
4. 编写Consumer
/**
 * Kafka listener that prints every record it receives to standard output.
 */
@Component
public class KafkaConsumer {

  /**
   * Handles one record from the topic named by the {@code kafka.topic.test} property.
   *
   * <p>The topic is resolved from the same property the producer uses, so both sides stay on the
   * same topic when the configuration changes. The consumer group is fixed to {@code myGroup}.
   *
   * @param record the received record; its topic, key, value, partition and timestamp are printed
   */
  @KafkaListener(topics = "${kafka.topic.test}", groupId = "myGroup")
  public void consumerMessage(ConsumerRecord<String, String> record) {
    System.out.println("consumerMessage method invoked!");
    System.out.println("topic: " + record.topic());
    // The key prints as "null" when the record was sent without one.
    System.out.println("key: " + record.key());
    System.out.println("value: " + record.value());
    System.out.println("partition: " + record.partition());
    System.out.println("timestamp: " + record.timestamp());
  }
}
5. 配置YML文件
# Kafka client settings picked up by Spring Boot.
spring:  
  kafka:
    # Address of the Kafka broker to connect to (a broker on the local machine here).
    bootstrap-servers: localhost:9092
    producer:
      # Keys and values are sent as plain strings, matching KafkaTemplate<String, String>.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Keys and values are read back as plain strings, matching ConsumerRecord<String, String>.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Application-specific property (not part of the spring.kafka namespace).
kafka:
  topic:
    # Topic name injected into KafkaProducer via @Value("${kafka.topic.test}");
    # it must be the topic the listener subscribes to.
    test: myJavaTopic
6. 测试结果
/**
 * REST endpoint used to trigger a Kafka publish by hand.
 *
 * <p>All routes are served under {@code /kafka}.
 */
@RestController
@RequestMapping("/kafka")
@RequiredArgsConstructor
public class KafkaMessageController {

  /** Producer that performs the publish; supplied through the Lombok-generated constructor. */
  private final KafkaProducer kafkaProducer;

  /**
   * Builds a {@link KafkaMessage} from the request parameters, stamps it with the current local
   * date-time, passes it to the producer and returns the same object as the response body.
   *
   * <p>NOTE(review): the password arrives as a query parameter of a GET request and is echoed
   * back in the response — confirm this is only meant for local testing.
   *
   * @param id value for the message's {@code id}
   * @param username value for the message's {@code username}
   * @param password value for the message's {@code password}
   * @return the message that was handed to the producer
   */
  @GetMapping("/send")
  public KafkaMessage send(
      @RequestParam("id") long id,
      @RequestParam("username") String username,
      @RequestParam("password") String password) {
    LocalDateTime sentAt = LocalDateTime.now();
    KafkaMessage outgoing = new KafkaMessage(id, username, password, sentAt);
    kafkaProducer.sendMessage(outgoing);
    return outgoing;
  }
}