使用Java连接Kafka发布和订阅消息

一、Kafka库

通过Maven导入Kafka Client库:

在项目的POM中导入:

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.9</version>
</dependency>

导入slf4j-simple用于解决org.slf4j.impl.StaticLoggerBinder无法加载的错误,也可以引入其他日志库。

二、发布消息

实例化一个java.util.Properties类,写入服务器地址和序列化反序列化的类配置:

1
2
3
4
Properties p = new Properties();  
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.228:9092");
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

实例化一个KafkaProducerProperties作为参数传入:

1
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<>(p);

实例化一个ProducerRecord,第一个参数为Topic名称,第二个参数为消息体:

1
ProducerRecord<String,String> record = new ProducerRecord<>(topic, msg);

使用KafkaProducer发送这个Record:

1
kafkaProducer.send(record);

全部发送完成后调用close方法关闭这个Producer。

三、订阅消息

实例化Properties类,在上面代码片段基础上加一个消费者的分组名称:

1
2
3
4
5
Properties p = new Properties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.228:9092");
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.GROUP_ID_CONFIG, "MKMTest"); //消费者分组名称

实例化一个KafkaConsumerProperties作为参数传入,并订阅Topic:

1
2
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(p);
kafkaConsumer.subscribe(Collections.singletonList(topic)); //订阅消息

在循环中获取消息,使用kafkaConsumer.poll方法获取消息数组,并通过循环读取每个消息:

1
2
3
4
5
6
7
while (true) {
ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String,String> record : records){
System.out.println(String.format("topic:%s, offset:%d, 消息:%s",
record.topic(), record.offset(), record.value()));
}
}

其中,record的三个方法分别返回:

  • topic:消息来源主题
  • offset:消息在分区中的偏移
  • value:消息的值

使用Java连接Kafka发布和订阅消息
https://maphical.cn/2020/08/connect-kafka-in-java/
作者
MaphicalYng
发布于
2020年8月13日
许可协议