依赖
生产者和消费者使用相同的客户端 JAR,修改 pom.xml 文件添加依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
Consumer
创建一个 Consumer 对象:
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.1.106:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", "tiger");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
配置说明:
bootstrap.serversKafka Broker 列表
key.deserializerKey 的反序列化类,与生产者 key.serializer 保持匹配
value.deserializerValue 的反序列化类,与生产者 value.serializer 保持匹配
group.id消费组标识
相同消费组内的消费者,将会从主题(topic)不同分区(partition)接收消息
利用分组的特点,可以实现传统消息队列的“队列”模型(多个消费者在同一个消费组)和“发布订阅”模型(多个消费者在不同的消费组)
订阅主题
consumer.subscribe(Collections.singletonList("test"));
Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
for (ConsumerRecord record: records) {
System.out.println("consumer fetch");
System.out.println(record.value());
}
}
}
catch (WakeupException ignore) {}
finally {
consumer.close();
}
使用 consumer.subscribe
以列表的方式或正则表达式方式,订阅多个主题
使用 consumer.poll
获取记录
ConsumerRecords 是封装了消费者 poll 获取的多条记录,并实现了 java.lang.Iterable
接口,可以 for 循环进行遍历
consumer.wakeup
是一个线程安全的方法,会让消费者在 poll 消息过程中产生一个 WakeupException 异常,从而优雅地中断 while(true) 循环
注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。