依赖
新建一个 Maven 工程,修改 pom.xml 文件,添加依赖:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency>
Producer
创建一个 Producer 对象:
Properties properties = new Properties(); properties.put("bootstrap.servers", "broker0:9092"); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
配置说明:
bootstrap.serversKafka Broker 列表,生产环境上,至少配置两个 Kafka Broker
key.serializerKey 的序列化类
value.serializerValue 的序列化类
Kafka 客户端提供了 Integer、String 等常用的序列化类,在 org.apache.kafka.common.serialization
包下
Kafka 客户端支持自定义序列化类,实现 org.apache.kafka.common.serialization.Serializer<T>
接口即可
序列化器的作用是将对象序列化为字节数据进行传输
:point_up_2:为必配项,其它配置项参考: 生产者配置列表
同步/异步发送
同步发送:
try { RecordMetadata metadata = producer.send(record).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
异步发送:
producer.send(record, (metadata, e) -> { //TODO });
问题
异常信息:
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for xxx: xxx ms has passed since batch creation plus linger time
原因:Producer 无法和 Kafka Broker 创建连接,连接超时
检查 Kafka Broker conf/server.properties 配置文件的 listerners 和 advertised.listeners 配置
listeners 是 Kafka Broker 服务监听的地址,修改为:PLAINTEXT://0.0.0.0:9092
advertised.listeners 是 Producer 和 Consumer 连接的主机和端口地址,修改为:PLAINTEXT://<主机名>:9092
客户端机器的 hosts 文件添加主机名的 IP 映射,并重启 Kafka Broker
注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。