[TOC]
The entry point in KafkaServer is:
```scala
apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
  kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,
  socketServer.requestChannel, apis, config.numIoThreads)
```
First, KafkaApis is instantiated from the relevant parameters, and then KafkaRequestHandlerPool. Let's start with KafkaRequestHandlerPool.
1. KafkaRequestHandlerPool
```scala
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {

  /* a meter to track the average free capacity of the request handlers */
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
  val threads = new Array[Thread](numThreads)
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for(i <- 0 until numThreads) {
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    threads(i).start()
  }
  //...
}
```
Its main job is to start numThreads threads, each of which runs a KafkaRequestHandler.
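The idiom here — create numThreads runnables, wrap each one in a named daemon thread, and start them all — is worth seeing in isolation. Below is a minimal sketch of it; `PoolSketch` and `Worker` are illustrative names, not Kafka APIs:

```scala
// A minimal sketch of the fixed-size daemon-thread-pool idiom used above;
// PoolSketch and Worker are illustrative names, not Kafka APIs.
object PoolSketch {
  class Worker(val id: Int) extends Runnable {
    @volatile private var running = true
    def shutdown(): Unit = running = false
    // stands in for the KafkaRequestHandler.run polling loop
    override def run(): Unit = while (running) Thread.sleep(100)
  }

  def startPool(numThreads: Int): (Array[Worker], Array[Thread]) = {
    val workers = Array.tabulate(numThreads)(new Worker(_))
    val threads = workers.zipWithIndex.map { case (w, i) =>
      val t = new Thread(w, s"request-handler-$i")
      t.setDaemon(true) // daemon threads, as with Utils.daemonThread, don't block JVM exit
      t.start()
      t
    }
    (workers, threads)
  }
}
```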
```scala
/**
 * A thread that responds to Kafka requests
 */
class KafkaRequestHandler(id: Int,
                          brokerId: Int,
                          val aggregateIdleMeter: Meter,
                          val totalHandlerThreads: Int,
                          val requestChannel: RequestChannel,
                          apis: KafkaApis) extends Runnable with Logging {
  this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "

  def run() {
    while(true) {
      try {
        var req : RequestChannel.Request = null
        while (req == null) {
          // We use a single meter for aggregate idle percentage for the thread pool.
          // Since meter is calculated as total_recorded_value / time_window and
          // time_window is independent of the number of threads, each recorded idle
          // time should be discounted by # threads.
          val startSelectTime = SystemTime.nanoseconds
          req = requestChannel.receiveRequest(300)
          val idleTime = SystemTime.nanoseconds - startSelectTime
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
        }

        if(req eq RequestChannel.AllDone) {
          debug("Kafka request handler %d on broker %d received shut down command".format(
            id, brokerId))
          return
        }
        req.requestDequeueTimeMs = SystemTime.milliseconds
        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
        apis.handle(req) // this is the key part: how the request is actually processed
      } catch {
        case e: Throwable => error("Exception when handling request", e)
      }
    }
  }
  // shutdown ...
}
```
In the run method, we can see that the place where requests are actually processed is apis.handle(req). We'll dig into that next.
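Before moving on, it's worth isolating the receive-loop idiom itself: poll with a timeout, meter the idle time, and exit on a sentinel "poison pill". Here is a minimal sketch, assuming a plain BlockingQueue in place of RequestChannel; `QueueLoopSketch`, `AllDone`, and `handlerLoop` are illustrative names:

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// A plain BlockingQueue stands in for RequestChannel; the case object
// AllDone plays the role of RequestChannel.AllDone.
object QueueLoopSketch {
  case object AllDone

  def handlerLoop(queue: LinkedBlockingQueue[AnyRef], numThreads: Int): Unit = {
    var idleNanos = 0L
    while (true) {
      var req: AnyRef = null
      while (req == null) {
        val start = System.nanoTime()
        req = queue.poll(300, TimeUnit.MILLISECONDS) // null on timeout, like receiveRequest(300)
        // discount each thread's idle time by the pool size, as the Kafka comment explains
        idleNanos += (System.nanoTime() - start) / numThreads
      }
      if (req eq AllDone) return // poison pill: the thread exits cleanly
      // ... handle the request here; Kafka calls apis.handle(req)
    }
  }
}
```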
2. KafkaApis.handle
Let's go straight to the code:
```scala
/**
 * Top-level method that handles all requests and multiplexes to the right api
 */
def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
      format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
    ApiKeys.forId(request.requestId) match { // dispatch on requestId to the handler for this request type
      case ApiKeys.PRODUCE => handleProducerRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
      case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case requestId => throw new KafkaException("Unknown api code " + requestId)
    }
  } catch {
    case e: Throwable =>
      if (request.requestObj != null) {
        request.requestObj.handleError(e, requestChannel, request)
        error("Error when handling request %s".format(request.requestObj), e)
      } else {
        val response = request.body.getErrorResponse(request.header.apiVersion, e)
        val respHeader = new ResponseHeader(request.header.correlationId)

        /* If request doesn't have a default error response, we just close the connection.
           For example, when produce request has acks set to 0 */
        if (response == null)
          requestChannel.closeConnection(request.processor, request)
        else
          requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))

        error("Error when handling request %s".format(request.body), e)
      }
  } finally
    request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
```
2.1 The ApiKeys enum
```java
PRODUCE(0, "Produce"),                            // producer messages
FETCH(1, "Fetch"),                                // consumer fetches messages
LIST_OFFSETS(2, "Offsets"),                       // fetch offsets
METADATA(3, "Metadata"),                          // fetch topic metadata
LEADER_AND_ISR(4, "LeaderAndIsr"),
STOP_REPLICA(5, "StopReplica"),                   // stop replication for a replica
UPDATE_METADATA_KEY(6, "UpdateMetadata"),         // update metadata
CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"), // controlled shutdown
OFFSET_COMMIT(8, "OffsetCommit"),                 // commit offsets
OFFSET_FETCH(9, "OffsetFetch"),                   // fetch committed offsets
GROUP_COORDINATOR(10, "GroupCoordinator"),        // find the group coordinator
JOIN_GROUP(11, "JoinGroup"),                      // join a group
HEARTBEAT(12, "Heartbeat"),                       // heartbeat
LEAVE_GROUP(13, "LeaveGroup"),                    // leave a group
SYNC_GROUP(14, "SyncGroup"),                      // sync a group
DESCRIBE_GROUPS(15, "DescribeGroups"),            // describe groups
LIST_GROUPS(16, "ListGroups"),                    // list groups
SASL_HANDSHAKE(17, "SaslHandshake"),              // SASL handshake
API_VERSIONS(18, "ApiVersions");                  // supported API versions
```
This part is straightforward; what really matters is the Request data structure and the handler methods that follow. The id-to-enum dispatch pattern is sketched below; after that, we work through Request step by step.
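As an aside, the dispatch that ApiKeys.forId plus the match in handle implement can be sketched minimally like this; `Api`, `byId`, and `dispatch` are illustrative names, covering just two of the nineteen keys:

```scala
// A minimal sketch of id-to-enum dispatch, mirroring ApiKeys.forId
// followed by the pattern match in KafkaApis.handle.
object DispatchSketch {
  sealed abstract class Api(val id: Short, val name: String)
  case object Produce extends Api(0, "Produce")
  case object Fetch   extends Api(1, "Fetch")

  private val byId: Map[Short, Api] = Seq(Produce, Fetch).map(a => a.id -> a).toMap

  def dispatch(id: Short): String = byId.get(id) match {
    case Some(Produce) => "handleProducerRequest" // Kafka would call the handler here
    case Some(Fetch)   => "handleFetchRequest"
    case _             => throw new IllegalArgumentException(s"Unknown api code $id")
  }
}
```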
3. The Request data structure
Every incoming request ultimately becomes a RequestChannel.Request, so let's look at Request first.
```scala
case class Request(processor: Int, connectionId: String, session: Session,
                   private var buffer: ByteBuffer, startTimeMs: Long,
                   securityProtocol: SecurityProtocol) {
  //...
  val requestId = buffer.getShort()

  private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse] =
    Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
        ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom)

  val requestObj = keyToNameAndDeserializerMap.get(requestId).map(readFrom => readFrom(buffer)).orNull

  val header: RequestHeader =
    if (requestObj == null) {
      buffer.rewind
      try RequestHeader.parse(buffer)
      catch {
        case ex: Throwable =>
          throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: $requestId", ex)
      }
    } else
      null

  val body: AbstractRequest =
    if (requestObj == null)
      try {
        // For unsupported version of ApiVersionsRequest, create a dummy request to enable an error response to be returned later
        if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion))
          new ApiVersionsRequest
        else
          AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
      } catch {
        case ex: Throwable =>
          throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
      }
    else
      null

  buffer = null
  private val requestLogger = Logger.getLogger("kafka.request.logger")

  def requestDesc(details: Boolean): String = {
    if (requestObj != null)
      requestObj.describe(details)
    else
      header.toString + " -- " + body.toString
  }
  //...
}
```
It consists of several parts:

- First, requestId, a short value.
- Then header, the request header, of type RequestHeader.
- Finally body, the request payload, of type AbstractRequest.
3.1 requestId
The requestId identifies the API type; KafkaApis uses it to decide which handler method should process the request.
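As a minimal sketch of what that first read looks like (`RequestIdSketch` and `peekRequestId` are illustrative names; ByteBuffer's default big-endian order matches Kafka's network byte order):

```scala
import java.nio.ByteBuffer

object RequestIdSketch {
  // The api id is a big-endian int16 at the very start of the request buffer.
  def peekRequestId(buffer: ByteBuffer): Short = {
    val id = buffer.getShort() // advances the position by 2, as in Request
    buffer.rewind()            // rewind so a header parser can re-read from the start
    id
  }
}
```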
3.2 header
Let's look at the structure of RequestHeader.
```java
private final short apiKey;
private final short apiVersion;
private final String clientId;
private final int correlationId;
```
There are four fields: apiKey, apiVersion, clientId, and correlationId.
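For illustration, here is a minimal sketch of parsing these fields off the wire. It assumes the classic request-header layout — apiKey (int16), apiVersion (int16), correlationId (int32), then clientId as an int16-length-prefixed UTF-8 string — so on the wire correlationId precedes clientId, even though the class declares clientId first. `HeaderSketch`, `Header`, and `parseHeader` are illustrative names, and a null (length -1) clientId is not handled:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object HeaderSketch {
  case class Header(apiKey: Short, apiVersion: Short, correlationId: Int, clientId: String)

  def parseHeader(buffer: ByteBuffer): Header = {
    val apiKey        = buffer.getShort()
    val apiVersion    = buffer.getShort()
    val correlationId = buffer.getInt()
    val len           = buffer.getShort()  // assumes a non-null clientId
    val bytes         = new Array[Byte](len)
    buffer.get(bytes)
    Header(apiKey, apiVersion, correlationId, new String(bytes, StandardCharsets.UTF_8))
  }
}
```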
3.3 body
The request body corresponds to the AbstractRequest class. Its main job is to parse the concrete request content based on the version number and the apiKey.
```java
public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffer buffer) {
    ApiKeys apiKey = ApiKeys.forId(requestId);
    switch (apiKey) {
        case PRODUCE:
            return ProduceRequest.parse(buffer, versionId);
        case FETCH:
            return FetchRequest.parse(buffer, versionId);
        case LIST_OFFSETS:
            return ListOffsetRequest.parse(buffer, versionId);
        case METADATA:
            return MetadataRequest.parse(buffer, versionId);
        case OFFSET_COMMIT:
            return OffsetCommitRequest.parse(buffer, versionId);
        case OFFSET_FETCH:
            return OffsetFetchRequest.parse(buffer, versionId);
        case GROUP_COORDINATOR:
            return GroupCoordinatorRequest.parse(buffer, versionId);
        case JOIN_GROUP:
            return JoinGroupRequest.parse(buffer, versionId);
        case HEARTBEAT:
            return HeartbeatRequest.parse(buffer, versionId);
        case LEAVE_GROUP:
            return LeaveGroupRequest.parse(buffer, versionId);
        case SYNC_GROUP:
            return SyncGroupRequest.parse(buffer, versionId);
        case STOP_REPLICA:
            return StopReplicaRequest.parse(buffer, versionId);
        case CONTROLLED_SHUTDOWN_KEY:
            return ControlledShutdownRequest.parse(buffer, versionId);
        case UPDATE_METADATA_KEY:
            return UpdateMetadataRequest.parse(buffer, versionId);
        case LEADER_AND_ISR:
            return LeaderAndIsrRequest.parse(buffer, versionId);
        case DESCRIBE_GROUPS:
            return DescribeGroupsRequest.parse(buffer, versionId);
        case LIST_GROUPS:
            return ListGroupsRequest.parse(buffer, versionId);
        case SASL_HANDSHAKE:
            return SaslHandshakeRequest.parse(buffer, versionId);
        case API_VERSIONS:
            return ApiVersionsRequest.parse(buffer, versionId);
        default:
            throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
                    "code should be updated to do so.", apiKey));
    }
}
```
There are many request types here; to see the exact structure of each one, look at the corresponding class.
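To close the loop, here is a hypothetical end-to-end sketch of how header parsing and body parsing compose: parse the header first, then look up a per-api body parser by apiKey. The `bodyParsers` map is illustrative and stands in for the switch in AbstractRequest.getRequest; HeaderSketch is the sketch from section 3.2:

```scala
import java.nio.ByteBuffer

object ParseSketch {
  import HeaderSketch._

  def parseRequest(buffer: ByteBuffer,
                   bodyParsers: Map[Short, ByteBuffer => AnyRef]): (Header, AnyRef) = {
    val header = parseHeader(buffer) // consumes the header bytes first
    val parse = bodyParsers.getOrElse(header.apiKey,
      throw new AssertionError(s"ApiKey ${header.apiKey} is not currently handled"))
    (header, parse(buffer)) // the body parser reads the rest of the buffer
  }
}
```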