依赖
生产者和消费者使用相同的客户端 JAR,修改 pom.xml 文件添加依赖:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency>
Consumer
创建一个 Consumer 对象:
Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.1.106:9092"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("group.id", "tiger"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
配置说明:
bootstrap.serversKafka Broker 列表
key.deserializerKey 的反序列化类,与生产者 key.serializer 保持匹配
value.deserializerValue 的反序列化类,与生产者 value.serializer 保持匹配
group.id消费组标识
相同消费组内的消费者,将会从主题(topic)不同分区(partition)接收消息
利用分组的特点,可以实现传统消息队列的“队列”模型(多个消费者在同一个消费组)和“发布订阅”模型(多个消费者在不同的消费组)
订阅主题
consumer.subscribe(Collections.singletonList("test")); Thread mainThread = Thread.currentThread(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { consumer.wakeup(); try { mainThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } })); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(500); for (ConsumerRecord record: records) { System.out.println("consumer fetch"); System.out.println(record.value()); } } } catch (WakeupException ignore) {} finally { consumer.close(); }
使用 consumer.subscribe
以列表的方式或正则表达式方式,订阅多个主题
使用 consumer.poll
获取记录
ConsumerRecords 是封装了消费者 poll 获取的多条记录,并实现了 java.lang.Iterable
接口,可以 for 循环进行遍历
consumer.wakeup
是一个线程安全的方法,会让消费者在 poll 消息过程中产生一个 WakeupException 异常,从而优雅地中断 while(true) 循环
注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。