[聚合文章] Kafka技术内幕拾遗

消息系统 2017-10-19 10 阅读

Kafka技术内幕拾遗

  • :white_check_mark: 客户端元数据(Metadata)
  • 即席查询(Interactive Query)
  • EOS事务(Transaction)

客户端的元数据对象

客户端的连接对象( NetworkClient )在轮询时会判断是否需要更新元数据。客户端调用元数据更新器的 maybeUpdate() 方法,并不一定每次都需要更新元数据。只有当元数据的超时时间( metadataTimeout )等于 0 时,客户端才会发送元数据请求。

1. 客户端轮询与元数据更新器

客户端调用选择器的轮询方法,最长的阻塞时间会在“轮询时间( pollTimeout )、元数据的更新时间( metadataTimeout )、请求的超时时间( requestTimeoutMs )”三者中选取最小值。如果元数据的更新时间等于0,表示客户端会立即发送元数据请求,不会阻塞。下面解释这几个时间变量的数据来源,以及它们在发送请求过程中所代表的含义。

  • 生产者的 requestTimeoutMs 变量,对应的配置项是 request.timeout.ms ,默认值 30 秒。该配置表示生产者等待收到响应结果的最长时间。如果生产者在这个时间超时后没有收到响应结果,就会认为生产请求失败,它可以重新发送生产请求。
  • 生产者的 retryBackoffMs 变量,对应的配置项是 retry.backoff.ms ,默认值 100 毫秒。该配置表示客户端发送请求失败时,为了避免在短时间内客户端重复地发送请求导致重试次数用光,客户端必须要等待一小会儿才允许发送新的请求。这个配置项可用于元数据请求、生产请求和拉取请求,但只有在发送失败时才会用到。该配置会传给元数据对象(元数据请求)、记录收集器(生产请求)。
  • 生产者的 lingerMs 变量,对应的配置项是 linger.ms ,默认值为 0 毫秒。该配置表示生产者在发送请求之前是否会延迟等待一段时间收集更多的消息。如果等于0,表示生产者会立即发送请求。
// 客户端的网络连接对象在每次轮询之前,都会判断是否需要更新元数据
public class NetworkClient implements KafkaClient {
  private final MetadataUpdater metadataUpdater; // 元数据的更新器

  // 生产者会由发送线程调用该方法,消费者会由ConsumerNetworkClient调用该方法
  public List<ClientResponse> poll(long pollTimeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    selector.poll(Utils.min(pollTimeout,metadataTimeout,requestTimeoutMs));
  }
  class DefaultMetadataUpdater implements MetadataUpdater {
    Metadata metadata;
    void maybeUpdate(long now, Node node) {
      // 这里简化了其他一些判断条件,实际的超时时间计算方式比较复杂
      long metadataTimeout = metadata.timeToNextUpdate(now);
      if(metadataTimeout == 0) // 准备发送“获取元数据”的请求
        doSend(new MetadataRequest(metadata.topics()), now);
      return metadataTimeout; 
    }
    // 处理“获取元数据请求”的响应
    void handleResponse(RequestHeader header, Struct body, long now) {
      Cluster cluster = new MetadataResponse(body).cluster();
      this.metadata.update(cluster, now); // 更新元数据的具体逻辑
    }
  }
}

客户端每次轮询收到元数据请求的响应结果后,会解析成 Cluster 对象,然后更新元数据对象。

2. 元数据对象

元数据对象有多个用于控制元数据更新策略的变量,相关的时间配置项主要有下面几个。

  • metadata.fetch.timeout.ms (生产者的 maxBlockTimeMs 变量,默认值为 60 秒):生产者第一次发送消息,如果主题没有分区,它等待元数据更新的最长阻塞时间(第7.3.2节第三小节)。
  • metadata.max.age.ms (元数据的 metadataExpireMs 变量,默认值为五分钟):即使不需要更新元数据,客户端也需要间隔一段时间更新一次元数据。
  • retry.backoff.ms (元数据的 refreshBackoffMs 变量,默认值为 100 毫秒):客户端多次发送元数据请求,需要等待一小段时间再发送元数据请求。

元数据的更新时间主要与后两项配置有关。 refreshBackoffMs 变量用来计算允许更新的时间( timeToAllowUpdate ), metadataExpireMs 变量用来计算失效的时间( timeToExpire )。默认情况下, retry.backoff.ms 等于 100 毫秒时,允许更新的时间一般小于 0timeToNextUpdate() 方法主要取决于失效的时间,下面列举了几种不同的场景。

  • 需要更新元数据时,失效时间等于 0 ,表示需要立即更新元数据。
  • 当前时间在失效阈值的范围内,即上次更新时间加上失效阈值大于当前时间,失效时间等于上次更新时间加上失效阈值,再减去当前时间,结果会大于0,表示再过指定的失效时间才需要更新元数据。
  • 当前时间超过失效阈值的范围,即当前时间大于上次更新时间加上失效阈值,失效时间也设置为0。

注意:元数据对象的 metadataExpireMsrefreshBackoffMs 都是固定的值, timeToNextUpdate() 方法依赖 needUpdate 和上次的更新时间,来计算下次更新元数据的时间。当调用元数据对象的 requestUpdate() 方法和 update() 方法时,才会分别更新 needUpdate 和上次的更新时间。

public final class Metadata {
  private final long refreshBackoffMs; // 更新失败时,下一次更新的补偿时间
  private final long metadataExpireMs; // 每隔多久,更细一次元数据
  private int version; // 版本号,当更新一次元数据,版本号加一
  private long lastRefreshMs; // 上一次更新的时间,更新失败也会更新这个值
  private long lastSuccessfulRefreshMs; // 上一次成功更新的时间
  private Cluster cluster; // 集群的配置信息
  private boolean needUpdate; // 是否需要更新元数据

  public synchronized int requestUpdate() {
    this.needUpdate = true; // 需要更新元数据
    return this.version; // 返回当前的版本号,这个版本号是旧的
  }
  public synchronized boolean updateRequested(){return this.needUpdate;}

  public synchronized long timeToNextUpdate(long nowMs) {
    long timeToExpire = needUpdate ? 0 : Math.max(
      this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
    long timeToAllowUpdate=this.lastRefreshMs+this.refreshBackoffMs-nowMs;
    return Math.max(timeToExpire, timeToAllowUpdate);
  }

  public synchronized void awaitUpdate(int lastVersion,long maxWaitMs){
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    while (this.version <= lastVersion) {
      if (remainingWaitMs != 0) wait(remainingWaitMs); // 等待
      long elapsed = System.currentTimeMillis() - begin;
      if (elapsed >= maxWaitMs) throw new TimeoutException("failed")
      remainingWaitMs = maxWaitMs - elapsed;
    }
  }
  public synchronized void update(Cluster cluster, long now) {
    this.needUpdate = false;
    this.version += 1;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    for(Listener listener:listeners) listener.onMetadataUpdate(cluster);
    this.cluster = cluster;
    notifyAll(); // 通知
  }
}

元数据对象的每个方法都加上了 synchronized 关键字,即使有多个客户端线程(用户线程)使用同一个生产者示例,并且访问相同的元数据对象,也是线程安全的。 awaitUpdate() 方法只会被生产者在的 waitOnMetadata() 方法调用。如果元数据的版本号( this.version )小于上一次的版本号( lastVersion ),用户线程会通过 wait() 进入阻塞状态。调用元数据对象的 update() 方法,更新版本号,并通知用户线程退出 awaitUpdate() 方法。

元数据对象除了会更新元数据内容,还有一个保存集群配置的 Cluster 对象。 Cluster 保存了分区信息相关的变量,分区信息包括分区的主副本、 ISRAR 等内容。第二章生产者客户端发送消息时,利用“分区信息”为消息指定分区编号。本章从控制器、 LeaderAndIsr 请求,最后到 Metadata 请求,与第二章的“分区信息”互相呼应,算是画上了一个圆满的句号。

public final class Cluster { // 集群配置
  private final List<Node> nodes;
  private final Set<String> unauthorizedTopics;
  private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
  private final Map<String, List<PartitionInfo>> partitionsByTopic;
  private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
  private final Map<Integer, List<PartitionInfo>> partitionsByNode;
  private final Map<Integer, Node> nodesById;
}
public class PartitionInfo { // 分区信息
  private final String topic;
  private final int partition;
  private final Node leader;
  private final Node[] replicas;
  private final Node[] inSyncReplicas;
}

3. 元数据更新的日志与实例

下面举例了生产者发送两条消息,为了模拟发送第一条消息时,生产者必须要等待元数据更新完成。下面的代码会在第一条消息发送完成后等待一秒钟才发送第二条消息。

// 生产者发送消息的示例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
log.info("start producer client app");
Thread.sleep(1000*10);

log.info("start send #1 message...");
producer.send(new ProducerRecord<String, String>("test", "m1"));
log.info("sending #1 message end..");
Thread.sleep(1000); // 等待一秒才发送第二条消息
log.info("start send #2 message...");
producer.send(new ProducerRecord<String, String>("test", "m2"));
log.info("sending #2 message end..");

为了更清晰地理解元数据、 NetworkClient 一些变量的含义,在必要的地方加上了日志(比如 needUpdatemetadataTimeout 等)。将日志级别调成 TRACE 后,更详细的日志如下。

[18:00:04,596] TRACE Starting the Kafka producer
[18:00:04,939] DEBUG Updated cluster metadata version 1 to Cluster(
  nodes = [localhost:9092 (id: -1 rack: null)], partitions = [])
[18:00:05,077] DEBUG Starting Kafka producer I/O thread.
[18:00:05,079] INFO [NetworkClient] select timeout:30000
[18:00:05,094] DEBUG Kafka producer started
[18:00:05,094] INFO start producer client app (kafka.examples.Producer)

[18:00:15,103] INFO start send #1 message... (kafka.examples.Producer)
[18:00:15,109] TRACE Requesting metadata update for topic test. 【1】
[18:00:15,109] TRACE Waking up Sender thread for metadata update.
[18:00:15,111] INFO [Metadata] awaitUpdate begin...
[18:00:15,117] INFO [Sender] readyNodes:0
[18:00:15,117] INFO [Metadata] needUpdate: true
[18:00:15,118] INFO [MetadataUpdater] metadataTimeout: 0
[18:00:15,118] DEBUG Initialize connection to node1 for send metadata request
[18:00:15,118] DEBUG Initiating connection to node1 at localhost:9092. 【2】
[18:00:15,241] INFO [NetworkClient] metadataTimeout:0
[18:00:15,241] INFO [NetworkClient] select timeout:0
[18:00:15,246] DEBUG Completed connection to node -1

[18:00:15,246] INFO [Sender] readyNodes:0
[18:00:15,246] INFO [Metadata] needUpdate: true
[18:00:15,247] INFO [MetadataUpdater] metadataTimeout: 0
[18:00:15,443] DEBUG Sending metadata request {topics=[test]} to node -1 【3】
[18:00:15,444] INFO [NetworkClient] metadataTimeout:0
[18:00:15,444] INFO [NetworkClient] select timeout:0
[18:00:15,448] INFO [Sender] readyNodes:0
[18:00:15,448] INFO [Metadata] needUpdate: true
[18:00:15,449] INFO [NetworkClient] metadataTimeout:2147483647
[18:00:15,449] INFO [NetworkClient] select timeout:30000

[18:00:15,628] DEBUG Updated cluster metadata version 2 to Cluster( 【4】
  nodes = [192.168.199.101:9092 (id: 0 rack: null)], partitions = [
   Partition(topic=test,partition=1,leader=0,replicas=[0,],isr=[0,], 
   Partition(topic=test,partition=0,leader=0,replicas=[0,],isr=[0,], 
   Partition(topi =test,partition=2,leader=0,replicas=[0,],isr=[0,]])
[18:00:15,628] INFO [Metadata] awaitUpdate end...

[18:00:15,628] INFO [Sender] readyNodes:0
[18:00:15,628] INFO [Metadata] needUpdate: false
[18:00:15,629] INFO [NetworkClient] metadataTimeout:299839
[18:00:15,629] INFO [NetworkClient] select timeout:30000

[18:00:15,636] TRACE Sending record ProducerRecord(topic=test, partition=null,
  key=null, value=m1, timestamp=null) with callback null to topic test_0 【5】
[18:00:15,636] TRACE Allocating a new 16384 byte message buffer for test_0
[18:00:15,700] TRACE Waking up the sender, test_0 is full or a new batch 【6】
[18:00:15,700] INFO sending #1 message end.. (kafka.examples.Producer)

[18:00:15,702] INFO [accumulator] batch: test-0
[18:00:15,702] INFO [accumulator] ready expired: true
[18:00:15,702] INFO [Metadata] needUpdate: false
[18:00:15,703] DEBUG Initiating connection to node 0 at localhost:9092. 【7】
[18:00:15,704] INFO [Sender] readyNodes:0
[18:00:15,705] INFO [NetworkClient] metadataTimeout:299767
[18:00:15,705] INFO [NetworkClient] select timeout:30000
[18:00:15,706] DEBUG Completed connection to node 0

[18:00:15,706] INFO [accumulator] batch: test-0
[18:00:15,707] INFO [accumulator] ready expired: true
[18:00:15,707] INFO [Metadata] needUpdate: false
[18:00:15,707] INFO [accumulator] drained batch: test-0
[18:00:15,718] TRACE Nodes with data ready to send: [localhost:9092]
[18:00:15,719] TRACE Created 1 produce requests: [ClientRequest( 【8】
  expectResponse=true,callback=o.a.k.c.p.internals.Sender$1@6008d3ea, 
  request=RequestSend(header={.}, body={acks=1,timeout=30000,
    topic_data=[{topic=test,data=[{partition=0,
      record_set=HeapByteBuffer[pos=0 lim=36 cap=16384]
  }]}]}), createdTimeMs=1494151215706, sendTimeMs=0)]
[18:00:15,719] INFO [Sender] readyNodes:1
[18:00:15,720] INFO [NetworkClient] poll timeout:0
[18:00:15,720] INFO [NetworkClient] metadataTimeout:299761
[18:00:15,720] INFO [NetworkClient] select timeout:0
[18:00:15,720] INFO [Sender] readyNodes:0
[18:00:15,721] INFO [Metadata] needUpdate: false
[18:00:15,721] INFO [NetworkClient] metadataTimeout:299747
[18:00:15,721] INFO [NetworkClient] select timeout:30000

[18:00:15,737] TRACE Received produce response from node 0 【9】
[18:00:15,740] TRACE Produced messages to test-0 with base offset offset 11.
[18:00:15,741] INFO [Sender] readyNodes:0
[18:00:15,741] INFO [Metadata] needUpdate: false
[18:00:15,741] INFO [NetworkClient] metadataTimeout:299726
[18:00:15,741] INFO [NetworkClient] select timeout:30000

[18:00:16,705] INFO start send #2 message... (kafka.examples.Producer)
[18:00:16,706] TRACE [KafkaProducer] waitedOnMetadataMs: 0
[18:00:16,706] TRACE Sending record ProducerRecord(topic=test, partition=null,
  key=null, value=m2, timestamp=null) with callback null to test_2
[18:00:16,706] TRACE Allocating a new 16384 byte message buffer for test_2
[18:00:16,706] TRACE Waking up the sender, test_2 is full or a new batch
[18:00:16,706] INFO sending #2 message end.. (kafka.examples.Producer)

如图1所示,将上面日志中一些重要的时间点与事件抽取出来,具体步骤如下。

  1. 第一次发送消息,唤醒发送线程,等待元数据更新完成;
  2. 初始化网络连接,为发送元数据请求做准备;
  3. 生产者发送元数据请求;
  4. 收到元数据响应,更新元数据对象,步骤(1)等待元数据更新完成正式结束;
  5. 生产者发送消息的流程接着执行,为消息指定分区,追加消息到记录收集器;
  6. 创建新的批记录(RecordBatch),再次唤醒发送线程;
  7. 从记录收集器中获取准备好的目标代理节点,并初始化网络连接,准备发送生产请求;
  8. 从记录收集器中再次获取准备好的节点,并获取需要发送的数据,创建生产请求;
  9. 发送生产请求,并等待响应结果,一批记录(实际上只有一条记录)的发送流程结束。

图1 生产者发送消息与更新元数据的过程

即席查询(Interactive Query)

EOS事务(Transaction)

注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。