[聚合文章] 学习 Apache Kafka(四):生产者

消息系统 2017-11-18 14 阅读

依赖

新建一个 Maven 工程,修改 pom.xml 文件,添加依赖:

<dependency>  
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>

Producer

创建一个 Producer 对象:

Properties properties = new Properties();  
        properties.put("bootstrap.servers", "broker0:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

配置说明:

bootstrap.serversKafka Broker 列表,生产环境上,至少配置两个 Kafka Broker

key.serializerKey 的序列化类

value.serializerValue 的序列化类

Kafka 客户端提供了 Integer、String 等常用的序列化类,在 org.apache.kafka.common.serialization 包下

Kafka 客户端支持自定义序列化类,实现 org.apache.kafka.common.serialization.Serializer<T> 接口即可

序列化器的作用是将对象序列化为字节数据进行传输

:point_up_2:为必配项,其它配置项参考: 生产者配置列表

同步/异步发送

同步发送:

try {  
    RecordMetadata metadata = producer.send(record).get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

异步发送:

producer.send(record, (metadata, e) -> {  
    //TODO
});

问题

异常信息:

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for xxx: xxx ms has passed since batch creation plus linger time

原因:Producer 无法和 Kafka Broker 创建连接,连接超时

检查 Kafka Broker conf/server.properties 配置文件的 listernersadvertised.listeners 配置

listeners 是 Kafka Broker 服务监听的地址,修改为:PLAINTEXT://0.0.0.0:9092

advertised.listeners 是 Producer 和 Consumer 连接的主机和端口地址,修改为:PLAINTEXT://<主机名>:9092

客户端机器的 hosts 文件添加主机名的 IP 映射,并重启 Kafka Broker

注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。