一、Kafka库 通过Maven导入Kafka Client库:
在项目的POM中导入:
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > org.apache.kafka</groupId > <artifactId > kafka-clients</artifactId > <version > 2.5.0</version > </dependency > <dependency > <groupId > org.slf4j</groupId > <artifactId > slf4j-simple</artifactId > <version > 1.7.9</version > </dependency >
导入slf4j-simple用于解决org.slf4j.impl.StaticLoggerBinder无法加载的错误,也可以引入其他日志库。
二、发布消息 实例化一个java.util.Properties
类,写入服务器地址和序列化反序列化的类配置:
1 2 3 4 Properties p = new Properties (); p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.228:9092" ); p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
实例化一个KafkaProducer
,Properties
作为参数传入:
1 KafkaProducer<String,String> kafkaProducer = new KafkaProducer <>(p);
实例化一个ProducerRecord
,第一个参数为Topic名称,第二个参数为消息体:
1 ProducerRecord<String,String> record = new ProducerRecord <>(topic, msg);
使用KafkaProducer
发送这个Record:
1 kafkaProducer.send(record);
全部发送完成后调用close方法关闭这个Producer。
三、订阅消息 实例化Properties
类,在上面代码片段基础上加一个消费者的分组名称:
1 2 3 4 5 Properties p = new Properties (); p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.228:9092" ); p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); p.put(ConsumerConfig.GROUP_ID_CONFIG, "MKMTest" );
实例化一个KafkaConsumer
,Properties
作为参数传入,并订阅Topic:
1 2 KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer <>(p); kafkaConsumer.subscribe(Collections.singletonList(topic));
在循环中获取消息,使用kafkaConsumer.poll
方法获取消息数组,并通过循环读取每个消息:
1 2 3 4 5 6 7 while (true ) { ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(500 )); for (ConsumerRecord<String,String> record : records){ System.out.println(String.format("topic:%s, offset:%d, 消息:%s" , record.topic(), record.offset(), record.value())); } }
其中,record的三个方法分别返回:
topic:消息来源主题
offset:消息在分区中的偏移
value:消息的值