
Kafka Series 10 - Server-Side Source Code Analysis 01: The Network Layer and the API Layer

Summary: an in-depth look at the 1 + M + N threading model of the Kafka server.

1. Overview of the Kafka Server Network Layer

Kafka clients interact with the server over network connections to multiple Brokers. Since the Kafka server must sustain high concurrency at low latency, its network layer is built on the Reactor pattern. Note that the connections this network layer manages come not only from clients but also from other Brokers.

The Reactor pattern is an event-driven design. Compared with a plain single-Selector Java NIO Reactor, Kafka uses multiple Selectors for different stages of the work, spread across the Acceptor, Processor and Handler thread classes. When a client initiates a connection to the server, a server-side Selector detects the OP_ACCEPT event and an Acceptor handles it, then hands the connection to a Processor, which takes care of OP_READ and OP_WRITE events. There are multiple Processor threads, and each Processor owns its own Selector. A Processor passes the actual requests to a pool of Handler threads; the Handlers process each request and hand the resulting response back to the Processor. Processor and Handler threads communicate through a RequestChannel.
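
To make the 1 + M + N structure concrete before diving into the real classes, here is a toy, runnable sketch of the same thread topology, with BlockingQueues standing in for sockets. Every name in it (ReactorModelDemo, daemon, the queue names) is made up for illustration; none of this is Kafka code:

    import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingQueue, TimeUnit}

    object ReactorModelDemo extends App {
      val numProcessors = 3
      // Processor -> Handler: one shared, bounded request queue (cf. RequestChannel.requestQueue)
      val requestQueue = new ArrayBlockingQueue[(Int, String)](500)
      // Handler -> Processor: one response queue per Processor (cf. responseQueues)
      val responseQueues = Array.fill(numProcessors)(new LinkedBlockingQueue[String]())
      // Acceptor -> Processor: each Processor's queue of newly assigned "connections"
      val newConnections = Array.fill(numProcessors)(new LinkedBlockingQueue[String]())

      def daemon(name: String)(body: => Unit): Unit = {
        val t = new Thread(new Runnable { def run(): Unit = body }, name)
        t.setDaemon(true)
        t.start()
      }

      // N = 2 Handler threads: take a request, "process" it, and route the
      // response back to the Processor that read the request
      for (h <- 0 until 2) daemon(s"handler-$h") {
        while (true) {
          val (processorId, request) = requestQueue.take()
          responseQueues(processorId).put(s"echo($request)")
        }
      }

      // M = 3 Processor threads: turn assigned connections into requests and
      // write back any responses produced by the Handlers
      for (p <- 0 until numProcessors) daemon(s"processor-$p") {
        while (true) {
          val conn = newConnections(p).poll(10, TimeUnit.MILLISECONDS)
          if (conn != null) requestQueue.put((p, s"request-from-$conn"))
          val resp = responseQueues(p).poll(10, TimeUnit.MILLISECONDS)
          if (resp != null) println(s"processor-$p writes: $resp")
        }
      }

      // 1 Acceptor (here: the main thread) assigning connections round-robin
      var currentProcessor = 0
      for (i <- 1 to 6) {
        newConnections(currentProcessor).put(s"conn-$i")
        currentProcessor = (currentProcessor + 1) % numProcessors
      }
      Thread.sleep(500) // give the daemon threads time to drain the queues
    }

Only the topology matters here: one accepting thread, M polling threads, N processing threads, and a queue at every boundary. The rest of this article walks through how Kafka actually implements each part.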

2. SocketServer

The SocketServer class lives in the kafka.network package. It orchestrates the interplay of the Acceptor, Processor and Handler components, and it wires the configured network settings (listeners, buffer sizes, connection limits) into concrete objects. Let's first look at its important fields:

    class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {
      // A server may have multiple NICs and Kafka can listen on multiple ports;
      // an EndPoint encapsulates the host, port and security protocol to listen on
      private val endpoints = config.listeners
      // Number of Processor threads per endpoint: num.network.threads, default 3
      private val numProcessorThreads = config.numNetworkThreads
      // Maximum number of requests buffered in RequestChannel.requestQueue: queued.max.requests, default 500
      private val maxQueuedRequests = config.queuedMaxRequests
      // Total number of Processor threads
      private val totalProcessorThreads = numProcessorThreads * endpoints.size
      // Maximum number of connections allowed per IP: max.connections.per.ip, default 2147483647
      private val maxConnectionsPerIp = config.maxConnectionsPerIp
      // Map[String, Int]: per-IP maximum connection counts that override maxConnectionsPerIp
      private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
      this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
      // The queue structure through which Processor and Handler threads exchange data;
      // it contains totalProcessorThreads responseQueue queues
      val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
      // Array holding the Processor threads of all endpoints
      private val processors = new Array[Processor](totalProcessorThreads)
      // Acceptor collection: one Acceptor per EndPoint
      private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
      /**
       * ConnectionQuotas enforces the per-IP connection limits.
       * Internally it keeps a Map from IP address to the number of established connections;
       * when a new connection is created, the count is compared against the limit from
       * maxConnectionsPerIpOverrides (or maxConnectionsPerIp) and an exception is thrown if it is exceeded.
       * Since multiple Acceptor threads access the underlying Map concurrently, access is synchronized.
       */
      private var connectionQuotas: ConnectionQuotas = _
      ...
    }

Among these fields, the endpoints collection, the Acceptor map acceptors, the per-endpoint Processor thread count numProcessorThreads and the total Processor thread count totalProcessorThreads are related as follows:

    // Total number of Processor threads
    private val totalProcessorThreads = numProcessorThreads * endpoints.size
    // Acceptor collection: one Acceptor per EndPoint
    private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()

From these relationships we can see that the network layer initializes one EndPoint object per configured listener, and each EndPoint gets one Acceptor object plus numProcessorThreads Processor threads; the total Processor thread count is therefore totalProcessorThreads.
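
For example, a hypothetical broker with two listeners and the default num.network.threads would end up with:

    val endpointCount = 2                                            // e.g. PLAINTEXT + SSL listeners
    val numProcessorThreads = 3                                      // num.network.threads, default
    val totalProcessorThreads = numProcessorThreads * endpointCount  // = 6
    // i.e. 2 Acceptor threads (one per endpoint), each owning 3 Processor threads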

The requestChannel field, of type RequestChannel, is the main component through which Processors and Handlers exchange data. Internally it maintains two collections, requestQueue and responseQueues, which buffer Requests and Responses; it is covered in detail later.

startup() contains SocketServer's main initialization logic:

    /**
     * Start the socket server
     * Core initialization of the SocketServer
     */
    def startup() {
      // Synchronized block
      this.synchronized {
        // Create the ConnectionQuotas
        connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
        // Socket send buffer size (socket.send.buffer.bytes)
        val sendBufferSize = config.socketSendBufferBytes
        // Socket receive buffer size (socket.receive.buffer.bytes)
        val recvBufferSize = config.socketReceiveBufferBytes
        // The broker's ID (broker.id)
        val brokerId = config.brokerId
        var processorBeginIndex = 0
        // Iterate over the endpoints collection
        endpoints.values.foreach { endpoint =>
          val protocol = endpoint.protocolType
          val processorEndIndex = processorBeginIndex + numProcessorThreads
          // Loop from processorBeginIndex until processorEndIndex
          for (i <- processorBeginIndex until processorEndIndex)
            // Create a Processor and store it in the processors array
            processors(i) = newProcessor(i, connectionQuotas, protocol)
          // Create the Acceptor, which also creates the threads for its processors
          val acceptor = new Acceptor(endpoint,
            sendBufferSize,
            recvBufferSize,
            brokerId,
            processors.slice(processorBeginIndex, processorEndIndex), // the slice of processors belonging to this Acceptor
            connectionQuotas)
          // Record the endpoint -> acceptor mapping
          acceptors.put(endpoint, acceptor)
          // Create and start the Acceptor's thread
          Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
          // The main thread blocks until the Acceptor thread has fully started
          acceptor.awaitStartup()
          // Advance processorBeginIndex
          processorBeginIndex = processorEndIndex
        }
      }
      newGauge("NetworkProcessorAvgIdlePercent",
        new Gauge[Double] {
          def value = allMetricNames.map( metricName =>
            metrics.metrics().get(metricName).value()).sum / totalProcessorThreads
        }
      )
      info("Started " + acceptors.size + " acceptor threads")
    }

The startup() method first creates a ConnectionQuotas object. As mentioned above, this object enforces the per-IP connection limits; its source is as follows:

    /**
     * @param defaultMax     maximum number of connections allowed per IP
     * @param overrideQuotas per-IP maximum connection counts; these override defaultMax
     */
    class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
      private val overrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) }
      private val counts = mutable.Map[InetAddress, Int]()
      // Increment the count for the given address
      def inc(address: InetAddress) {
        counts.synchronized {
          // Look up the current connection count of this address
          val count = counts.getOrElseUpdate(address, 0)
          // Update the counts map
          counts.put(address, count + 1)
          // Determine the allowed maximum from overrideQuotas (falling back to defaultMax)
          val max = overrides.getOrElse(address, defaultMax)
          // Throw if the limit is exceeded
          if (count >= max)
            throw new TooManyConnectionsException(address, max)
        }
      }
      // Decrement the count for the given address
      def dec(address: InetAddress) {
        counts.synchronized {
          // Look up the current connection count of this address
          val count = counts.getOrElse(address,
            throw new IllegalArgumentException(s"Attempted to decrease connection count for address with no connections, address: $address"))
          // If this address has exactly one connection left, remove it from counts
          if (count == 1)
            counts.remove(address)
          else
            // Otherwise decrement its counter
            counts.put(address, count - 1)
        }
      }
      // Get the connection count of the given address
      def get(address: InetAddress): Int = counts.synchronized {
        counts.getOrElse(address, 0)
      }
    }
    class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException("Too many connections from %s (maximum = %d)".format(ip, count))

The source is straightforward and the comments are detailed, so we won't dwell on it.
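
As a quick sanity check of the counting logic, here is a stripped-down, self-contained re-implementation. SimpleQuotas is a hypothetical name; only the inc/dec logic mirrors the class above:

    import java.net.InetAddress
    import scala.collection.mutable

    class SimpleQuotas(defaultMax: Int, overrides: Map[InetAddress, Int]) {
      private val counts = mutable.Map[InetAddress, Int]()
      def inc(addr: InetAddress): Unit = counts.synchronized {
        val count = counts.getOrElseUpdate(addr, 0)   // count BEFORE this connection
        counts.put(addr, count + 1)
        if (count >= overrides.getOrElse(addr, defaultMax))
          throw new IllegalStateException(s"Too many connections from $addr")
      }
      def dec(addr: InetAddress): Unit = counts.synchronized {
        counts.get(addr) match {
          case Some(1) => counts.remove(addr)          // last connection: drop the entry
          case Some(n) => counts.put(addr, n - 1)
          case None    => throw new IllegalArgumentException(s"No connections for $addr")
        }
      }
    }

    object QuotaDemo extends App {
      val local = InetAddress.getByName("127.0.0.1")
      val quotas = new SimpleQuotas(defaultMax = 2, overrides = Map.empty)
      quotas.inc(local); quotas.inc(local)   // 2 connections: fine
      try quotas.inc(local)                  // the 3rd exceeds the limit of 2
      catch { case e: IllegalStateException => println(e.getMessage) }
    }

Note that the check compares the count before the increment against the maximum, which is exactly why the third inc() with a limit of 2 fails.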

startup() then iterates over the endpoints collection, i.e. over each listener's EndPoint. For each EndPoint it creates numProcessorThreads Processor threads and stores them in the processors array, creates an Acceptor over that slice of Processors, and puts the (EndPoint, Acceptor) pair into the acceptors map to maintain the mapping between the two. Finally it starts the Acceptor's thread and calls acceptor.awaitStartup(), which blocks until the Acceptor thread has finished starting up.

The counterpart of startup() is SocketServer's shutdown() method, which closes the Acceptor and Processor threads that were created. The code is trivial:

    def shutdown() = {
      info("Shutting down")
      // Synchronized
      this.synchronized {
        // Call shutdown on every Acceptor
        acceptors.values.foreach(_.shutdown)
        // Call shutdown on every Processor
        processors.foreach(_.shutdown)
      }
      info("Shutdown completed")
    }

3. The AbstractServerThread Class

AbstractServerThread is an abstract class implementing Runnable, and both Acceptor and Processor extend it, so let's look at its design first. Its source is fairly simple:

    private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
      // CountDownLatch with count 1, marking whether this thread's startup has completed
      private val startupLatch = new CountDownLatch(1)
      // CountDownLatch with count 1, marking whether this thread's shutdown has completed
      private val shutdownLatch = new CountDownLatch(1)
      // Whether this thread is alive
      private val alive = new AtomicBoolean(true)
      // Implemented by subclasses
      def wakeup()
      /**
       * Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
       * Blocks until the shutdown has completed
       */
      def shutdown(): Unit = {
        // Flip the running flag
        alive.set(false)
        // Wake up this AbstractServerThread
        wakeup()
        // Block on shutdownLatch.await until the shutdown completes
        shutdownLatch.await()
      }
      /**
       * Wait for the thread to completely start up
       * Blocks on startupLatch.await until the startup completes
       */
      def awaitStartup(): Unit = startupLatch.await
      /**
       * Record that the thread startup is complete
       * Wakes up any threads blocked waiting for the startup
       */
      protected def startupComplete() = {
        // startupLatch.countDown() releases the blocked threads
        startupLatch.countDown()
      }
      /**
       * Record that the thread shutdown is complete
       * shutdownLatch.countDown() releases the blocked threads
       */
      protected def shutdownComplete() = shutdownLatch.countDown()
      /**
       * Is the server still running?
       */
      protected def isRunning = alive.get
      /**
       * Close the connection identified by `connectionId` and decrement the connection count.
       */
      def close(selector: KSelector, connectionId: String) {
        val channel = selector.channel(connectionId)
        if (channel != null) {
          debug(s"Closing selector connection $connectionId")
          val address = channel.socketAddress
          if (address != null)
            // Decrement the count recorded in connectionQuotas
            connectionQuotas.dec(address)
          // Close the SocketChannel identified by connectionId
          selector.close(connectionId)
        }
      }
      /**
       * Close `channel` and decrement the connection count.
       */
      def close(channel: SocketChannel) {
        if (channel != null) {
          debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
          connectionQuotas.dec(channel.socket.getInetAddress)
          swallowError(channel.socket().close())
          swallowError(channel.close())
        }
      }
    }

AbstractServerThread has three fields, startupLatch, shutdownLatch and alive, which cooperate to control and track the thread's startup and shutdown. startupLatch and shutdownLatch are both CountDownLatch objects used to block callers while a startup or shutdown is in progress. Taking startup as an example: a call to awaitStartup() blocks the calling thread on startupLatch.await; once the AbstractServerThread has finished starting, it calls startupComplete(), which invokes startupLatch.countDown() and wakes the blocked thread.
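
The handshake is easier to see in isolation. Below is a minimal runnable sketch of the same latch pattern, with the selector work replaced by a sleep loop; the names mirror the fields above but the class itself is hypothetical:

    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.atomic.AtomicBoolean

    object LatchLifecycleDemo extends App {
      val startupLatch  = new CountDownLatch(1)
      val shutdownLatch = new CountDownLatch(1)
      val alive = new AtomicBoolean(true)

      val worker = new Thread(new Runnable {
        def run(): Unit = {
          startupLatch.countDown()            // startupComplete(): wake awaitStartup()
          while (alive.get) Thread.sleep(10)  // the isRunning-controlled main loop
          shutdownLatch.countDown()           // shutdownComplete(): wake shutdown()
        }
      })
      worker.start()

      startupLatch.await()                    // awaitStartup(): block until started
      println("worker fully started")
      alive.set(false)                        // shutdown(): flip the flag...
      shutdownLatch.await()                   // ...and block until the loop exits
      println("worker fully stopped")
    }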

4. The Acceptor Class

The Acceptor's job is to accept incoming connection requests, establish the socket connections, and hand them off to Processors. It holds a Java NIO Selector, and it creates a ServerSocketChannel from the EndPoint mapped to it:

    /**
     * Thread that accepts and configures new connections. There is one of these per endpoint.
     * Accepts connection requests, establishes the socket connections and assigns them to Processors
     */
    private[kafka] class Acceptor(val endPoint: EndPoint,
                                  val sendBufferSize: Int,
                                  val recvBufferSize: Int,
                                  brokerId: Int,
                                  processors: Array[Processor],
                                  connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
      ...
      // Create the Java NIO Selector
      private val nioSelector = NSelector.open()
      // Create the ServerSocketChannel
      val serverChannel = openServerSocket(endPoint.host, endPoint.port)
      ...
    }

Note that NSelector here is simply the Selector from the Java NIO package, renamed on import:

    import java.nio.channels.{Selector => NSelector}

The Acceptor also starts the Processor threads passed in via the processors parameter:

    // Synchronized
    this.synchronized {
      // Iterate over the processors
      processors.foreach { processor =>
        // Create and start a thread for each Processor
        Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
      }
    }

The main logic of the Acceptor lives in its run() method:

    /**
     * Accept loop that checks for new connection attempts
     */
    def run() {
      // Register for OP_ACCEPT events
      serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
      // Mark this thread as fully started
      startupComplete()
      try {
        var currentProcessor = 0
        // While the thread is running
        while (isRunning) {
          try {
            // select with a 500ms timeout
            val ready = nioSelector.select(500)
            // Some SelectionKeys were selected
            if (ready > 0) {
              // Get all selected SelectionKeys
              val keys = nioSelector.selectedKeys()
              // Iterate over the SelectionKeys
              val iter = keys.iterator()
              while (iter.hasNext && isRunning) {
                try {
                  val key = iter.next
                  // Remove the key from the selected set
                  iter.remove()
                  if (key.isAcceptable)
                    // If the SelectionKey is acceptable, hand it to a Processor thread
                    accept(key, processors(currentProcessor))
                  else
                    // Otherwise throw an IllegalStateException
                    throw new IllegalStateException("Unrecognized key state for acceptor thread.")
                  // round robin to the next processor thread
                  currentProcessor = (currentProcessor + 1) % processors.length
                } catch {
                  case e: Throwable => error("Error while accepting connection", e)
                }
              }
            }
          }
          catch {
            // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
            // to a select operation on a specific channel or a bad request. We don't want the
            // the broker to stop responding to requests from other clients in these scenarios.
            case e: ControlThrowable => throw e // rethrow ControlThrowables
            case e: Throwable => error("Error occurred", e)
          }
        }
      } finally {
        debug("Closing server socket and selector.")
        // Close the ServerSocketChannel and NIO Selector, swallowing any exceptions
        swallowError(serverChannel.close())
        swallowError(nioSelector.close())
        // Mark the shutdown as complete
        shutdownComplete()
      }
    }

Only after the ServerSocketChannel has registered for OP_ACCEPT on the Selector is the Acceptor thread marked as started. It then enters a while loop controlled by isRunning, repeatedly calling select on nioSelector with a 500ms timeout. Whenever it obtains an acceptable SelectionKey, it calls accept(key: SelectionKey, processor: Processor) to handle it; any other key type causes an IllegalStateException, since the Acceptor thread only ever registers for OP_ACCEPT. Note that the Processor handling each new connection is chosen from the Processor array in round-robin fashion.
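
The selection logic itself is plain modular arithmetic. A tiny sketch, with connection ids standing in for accepted SocketChannels:

    object RoundRobinDemo extends App {
      // Stand-ins for the endpoint's Processor array
      val processors = Array("processor-0", "processor-1", "processor-2")
      var currentProcessor = 0
      for (conn <- 1 to 7) {
        println(s"conn-$conn -> ${processors(currentProcessor)}")
        currentProcessor = (currentProcessor + 1) % processors.length
      }
      // conn-1 -> processor-0, conn-2 -> processor-1, conn-3 -> processor-2,
      // then conn-4 wraps back around to processor-0, and so on
    }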

As the analysis above shows, accept(key: SelectionKey, processor: Processor) is where the Acceptor actually handles OP_ACCEPT events:

    /*
     * Accept a new connection
     */
    def accept(key: SelectionKey, processor: Processor) {
      // Get the ServerSocketChannel
      val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
      // Accept the connection
      val socketChannel = serverSocketChannel.accept()
      try {
        // Increment the count recorded in ConnectionQuotas
        connectionQuotas.inc(socketChannel.socket().getInetAddress)
        // Configure non-blocking mode, TCP NoDelay, KeepAlive and the SendBufferSize
        socketChannel.configureBlocking(false)
        socketChannel.socket().setTcpNoDelay(true)
        socketChannel.socket().setKeepAlive(true)
        socketChannel.socket().setSendBufferSize(sendBufferSize)
        debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
          .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
            socketChannel.socket.getSendBufferSize, sendBufferSize,
            socketChannel.socket.getReceiveBufferSize, recvBufferSize))
        // Hand the socketChannel over to the processor
        processor.accept(socketChannel)
      } catch {
        // Exception handling
        case e: TooManyConnectionsException =>
          info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
          // Close the SocketChannel on error
          close(socketChannel)
      }
    }

accept(...) accepts the pending connection to obtain the client's SocketChannel, applies a series of socket options to it, and then passes it to the Processor's accept(socketChannel: SocketChannel) method.

5. The Processor Class

The Processor reads requests and writes back responses, but it does not perform any actual business logic itself; that is delegated to the Handlers. Its important fields:

    private[kafka] class Processor(val id: Int,
                                   time: Time,
                                   maxRequestSize: Int,
                                   requestChannel: RequestChannel, // the queue structure through which Processor and Handler threads exchange data
                                   connectionQuotas: ConnectionQuotas,
                                   connectionsMaxIdleMs: Long,
                                   protocol: SecurityProtocol,
                                   channelConfigs: java.util.Map[String, _],
                                   metrics: Metrics) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
      ...
      // Newly created SocketChannels assigned to this Processor
      private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
      // Responses not yet fully sent; removed once sending succeeds
      private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
      ...
    }

Processor's accept(socketChannel: SocketChannel) method is very simple: it puts the SocketChannel into the newConnections queue, a thread-safe ConcurrentLinkedQueue, and then wakes up the underlying Java NIO Selector:

    /**
     * Queue up a new connection for reading
     */
    def accept(socketChannel: SocketChannel) {
      // Add the SocketChannel to the newConnections queue
      newConnections.add(socketChannel)
      // Wake up the underlying Java NIO Selector
      wakeup()
    }
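
This queue-plus-wakeup() handoff can be demonstrated with a bare java.nio Selector. A sketch, with the spawned thread playing the Processor role; nothing is registered with the selector, so only wakeup() can unblock the select:

    import java.nio.channels.Selector
    import java.util.concurrent.ConcurrentLinkedQueue

    object WakeupDemo extends App {
      val selector = Selector.open()
      val newConnections = new ConcurrentLinkedQueue[String]()

      val processor = new Thread(new Runnable {
        def run(): Unit = {
          selector.select()                 // blocks until wakeup() is called
          while (!newConnections.isEmpty)
            println(s"processor picked up: ${newConnections.poll()}")
        }
      })
      processor.start()

      Thread.sleep(100)              // let the processor enter select()
      newConnections.add("conn-1")   // what Processor.accept(socketChannel) does...
      selector.wakeup()              // ...followed by wakeup()
      processor.join()
      selector.close()
    }

The wakeup() matters because the Processor may be parked inside its selector's poll; without it, the new connection would sit in the queue until the next poll timeout.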

Processor also extends AbstractServerThread, so it too is a Runnable; its threads are started during Acceptor initialization. The main logic is in run():

    override def run() {
      // Mark startup as complete, waking any threads waiting on this Processor
      startupComplete()
      while (isRunning) { // isRunning reads alive.get, so Processor.shutdown() sets alive to false and the loop exits
        try {
          // setup any new connections that have been queued up
          // register OP_READ for each queued SocketChannel
          configureNewConnections()
          // register any new responses for writing
          // process the Responses buffered in this Processor's responseQueue
          processNewResponses()
          // poll() reads requests and sends responses
          poll()
          // process the KSelector.completedReceives queue
          processCompletedReceives()
          // process the KSelector.completedSends queue
          processCompletedSends()
          // process the KSelector.disconnected queue
          processDisconnected()
        } catch {
          // We catch all the throwables here to prevent the processor thread from exiting. We do this because
          // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
          // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
          // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
          case e: ControlThrowable => throw e
          case e: Throwable =>
            error("Processor got uncaught exception.", e)
        }
      }
      debug("Closing selector - processor " + id)
      swallowError(closeAll())
      shutdownComplete()
    }

run() first calls startupComplete() to mark the thread as started, then loops under the control of isRunning, repeatedly calling configureNewConnections(), processNewResponses(), poll(), processCompletedReceives(), processCompletedSends() and processDisconnected(), each handling a different stage of the work. We cover them one by one below.

5.1. The configureNewConnections() Method

The source of configureNewConnections():

    /**
     * Register any new connections that have been queued up
     */
    private def configureNewConnections() {
      // Only proceed while newConnections is non-empty
      while (!newConnections.isEmpty) {
        // Poll one SocketChannel
        val channel = newConnections.poll()
        try {
          debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
          val localHost = channel.socket().getLocalAddress.getHostAddress
          val localPort = channel.socket().getLocalPort
          val remoteHost = channel.socket().getInetAddress.getHostAddress
          val remotePort = channel.socket().getPort
          // Build a ConnectionId from the local host/port and remote host/port
          val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
          // Register for OP_READ; note that this is Kafka's own KSelector
          selector.register(connectionId, channel)
        } catch {
          // We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
          // throwables will be caught in processor and logged as uncaught exceptions.
          case NonFatal(e) =>
            // need to close the channel here to avoid a socket leak.
            close(channel)
            error(s"Processor $id closed connection from ${channel.getRemoteAddress}", e)
        }
      }
    }

configureNewConnections() deals with the SocketChannels that accept(...) added to the newConnections queue. newConnections is a ConcurrentLinkedQueue; each SocketChannel polled from its head is used to build a ConnectionId, and then selector.register(String id, SocketChannel socketChannel) is called to register events. Note that this selector is a KSelector; its register(...) method looks like this:

    /**
     * Register the nioSelector with an existing channel
     * Use this on server-side, when a connection is accepted by a different thread but processed by the Selector
     * Note that we are not checking if the connection id is valid - since the connection already exists
     */
    public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
        // Register for OP_READ and obtain the key
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
        // Build a KafkaChannel from the known information and attach it to the SelectionKey
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
        key.attach(channel);
        this.channels.put(id, channel);
    }

As register(...) shows, the SocketChannel registers for OP_READ on the nioSelector, and at the same time it is wrapped in a KafkaChannel that gets attached to the SelectionKey. When an OP_READ event later fires, the object retrieved from the SelectionKey is that KafkaChannel.

In short, configureNewConnections() handles newly established connections by registering OP_READ for them.
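
The register-and-attach pattern here is standard Java NIO. A minimal runnable sketch, in which a plain String stands in for the KafkaChannel and the connection id is hypothetical:

    import java.net.InetSocketAddress
    import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel}

    object AttachDemo extends App {
      val selector = Selector.open()
      val server = ServerSocketChannel.open()
      server.configureBlocking(false)
      server.bind(new InetSocketAddress("127.0.0.1", 0)) // port 0: any free port

      // Register interest and attach application state to the resulting key
      val key: SelectionKey = server.register(selector, SelectionKey.OP_ACCEPT)
      key.attach("127.0.0.1:9092-127.0.0.1:53014")        // made-up connection id

      // When an event later fires on this key, the attachment comes back as-is:
      println(key.attachment())
      server.close(); selector.close()
    }

Attaching the KafkaChannel to the key is what lets the Processor recover the full connection state from nothing but a SelectionKey inside its event loop.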

5.2. processNewResponses()

processNewResponses() handles the responses produced by the Handlers; the Processor is responsible for sending them back to the clients. Its source:

    private def processNewResponses() {
      // Take this Processor's responseQueue from requestChannel and dequeue a RequestChannel.Response
      var curr = requestChannel.receiveResponse(id)
      while (curr != null) {
        try {
          // Match on the response action type
          curr.responseAction match {
            // No operation
            case RequestChannel.NoOpAction =>
              // There is no response to send to the client, we need to read more pipelined requests
              // that are sitting in the server's socket buffer
              curr.request.updateRequestMetrics
              trace("Socket server received empty response to send, registering for read: " + curr)
              // unmute: effectively re-registers OP_READ
              selector.unmute(curr.request.connectionId)
            case RequestChannel.SendAction =>
              // A response must be sent to the client; it is cached in inflightResponses
              sendResponse(curr)
            case RequestChannel.CloseConnectionAction =>
              curr.request.updateRequestMetrics
              trace("Closing socket connection actively according to the response code.")
              // The connection must be closed
              close(selector, curr.request.connectionId)
          }
        } finally {
          // Keep draining the responseQueue
          curr = requestChannel.receiveResponse(id)
        }
      }
    }

In the code above, requestChannel is a RequestChannel object. As mentioned earlier, it is the main component through which Processors and Handlers exchange data, maintaining two collections, requestQueue and responseQueues, which buffer Requests and Responses:

    class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
      ...
      // Queue through which Processor threads pass requests to Handler threads;
      // there are multiple Processor threads, hence a thread-safe queue
      private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
      // Array of numProcessors elements, each a BlockingQueue
      private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
      // Initialize the array elements
      for(i <- 0 until numProcessors)
        // Each element is a LinkedBlockingQueue
        responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
      ...
    }

requestQueue is a plain thread-safe ArrayBlockingQueue holding the RequestChannel.Request objects that Processors hand to the Handlers, while responseQueues is an array whose elements are of type LinkedBlockingQueue[RequestChannel.Response]: there is one LinkedBlockingQueue per Processor, guaranteeing that every response is delivered back to the Processor it belongs to.
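
Note also that requestQueue is bounded (queued.max.requests), and sendRequest uses the blocking put(), so Processors stall once the Handlers fall behind; this is the server's back-pressure point. A small sketch of that behavior with a capacity-2 queue (offer with a timeout stands in for a put() that would block):

    import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

    object BackPressureDemo extends App {
      val requestQueue = new ArrayBlockingQueue[String](2)
      requestQueue.put("request-1")
      requestQueue.put("request-2")
      // The queue is now full, so a third enqueue has to wait for a Handler to poll:
      val accepted = requestQueue.offer("request-3", 100, TimeUnit.MILLISECONDS)
      println(s"third request accepted? $accepted")   // false: no Handler drained it
    }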

Inside processNewResponses(), the Processor passes its own ID to requestChannel.receiveResponse(processor: Int) to obtain a RequestChannel.Response:

    /** Get a response for the given processor if there is one */
    def receiveResponse(processor: Int): RequestChannel.Response = {
      // Use the Processor's ID to find its response queue and poll one RequestChannel.Response
      val response = responseQueues(processor).poll()
      if (response != null)
        // Record the dequeue time as now
        response.request.responseDequeueTimeMs = SystemTime.milliseconds
      // Return it
      response
    }

With the structure of responseQueues in mind, this is easy to follow. The while loop keeps taking Response objects until the queue is drained, and the loop body acts according to each Response's action type. The three action types are:

  1. RequestChannel.NoOpAction: no operation; more data may need to be read from the client at this point, so the connection is unmuted on the KSelector.
  2. RequestChannel.SendAction: a response must be sent to the client; sendResponse(curr) is called, which caches the response in inflightResponses.
  3. RequestChannel.CloseConnectionAction: the connection must be closed; close(selector: KSelector, connectionId: String) is called.

5.2.1. The unmute and mute Operations

Under the hood, unmute registers OP_READ so that reading can continue:

    // org.apache.kafka.common.network.Selector
    @Override
    public void unmute(String id) {
        KafkaChannel channel = channelOrFail(id);
        unmute(channel);
    }
    private void unmute(KafkaChannel channel) {
        channel.unmute();
    }
    // org.apache.kafka.common.network.KafkaChannel
    public void unmute() {
        transportLayer.addInterestOps(SelectionKey.OP_READ);
    }
    // org.apache.kafka.common.network.PlaintextTransportLayer
    @Override
    public void addInterestOps(int ops) {
        key.interestOps(key.interestOps() | ops);
    }

Its counterpart mute cancels the OP_READ interest, halting further reads:

    // org.apache.kafka.common.network.Selector
    @Override
    public void mute(String id) {
        KafkaChannel channel = channelOrFail(id);
        mute(channel);
    }
    private void mute(KafkaChannel channel) {
        channel.mute();
    }
    // org.apache.kafka.common.network.KafkaChannel
    public void mute() {
        transportLayer.removeInterestOps(SelectionKey.OP_READ);
    }
    // org.apache.kafka.common.network.PlaintextTransportLayer
    @Override
    public void removeInterestOps(int ops) {
        key.interestOps(key.interestOps() & ~ops);
    }

Both operations came up earlier when discussing KafkaConsumer; they are mainly used to keep requests and responses on a connection strictly ordered.
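
Both methods boil down to bit operations on the key's interest set. A sketch of just the arithmetic, using the real SelectionKey constants (OP_READ = 1, OP_WRITE = 4):

    import java.nio.channels.SelectionKey

    object InterestOpsDemo extends App {
      // The interest set is just a bit mask
      var interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE
      interestOps &= ~SelectionKey.OP_READ                 // mute: removeInterestOps(OP_READ)
      println((interestOps & SelectionKey.OP_READ) != 0)   // false: reads are off
      interestOps |= SelectionKey.OP_READ                  // unmute: addInterestOps(OP_READ)
      println((interestOps & SelectionKey.OP_READ) != 0)   // true: reads are on again
    }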

5.2.2. Caching Responses

sendResponse(curr) does not transmit anything itself: it binds the response to the connection's KafkaChannel for the next poll and records it in the inflightResponses cache:

    // kafka.network.Processor
    /**
     * `protected` for test usage
     * Queues the response for sending and records it in inflightResponses
     */
    protected[network] def sendResponse(response: RequestChannel.Response) {
      trace(s"Socket server received response to send, registering for write and sending data: $response")
      // Get the corresponding KafkaChannel
      val channel = selector.channel(response.responseSend.destination)
      // `channel` can be null if the selector closed the connection because it was idle for too long
      // If the KafkaChannel is null the connection is probably gone; log it and update the metrics
      if (channel == null) {
        warn(s"Attempting to send response via channel for which there is no open connection, connection id $id")
        response.request.updateRequestMetrics()
      }
      else {
        // Bind the response to the KafkaChannel's send field via the KSelector
        selector.send(response.responseSend)
        // Record the response in the inflightResponses map
        inflightResponses += (response.request.connectionId -> response)
      }
    }
    // org.apache.kafka.common.network.Selector
    /**
     * Queue the given request for sending in the subsequent {@link #poll(long)} calls
     * @param send The request to send
     */
    public void send(Send send) {
        KafkaChannel channel = channelOrFail(send.destination());
        try {
            /**
             * Caches the send object in the KafkaChannel's send field and adds interest in OP_WRITE.
             * The send is a RequestSend wrapping the actual data, request header and body included.
             * It is merely recorded here; the actual transmission happens in Selector.poll().
             * A KafkaChannel only sends one RequestSend at a time.
             */
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(send.destination());
            close(channel);
        }
    }

The sending process here is similar to what we saw earlier for KafkaConsumer; refer back to that discussion, it is not repeated here.

5.2.3. Closing Connections

close(selector: KSelector, connectionId: String) closes a connection. Processor inherits it from AbstractServerThread, where we already saw it:

    // kafka.network.AbstractServerThread
    /**
     * Close the connection identified by `connectionId` and decrement the connection count.
     */
    def close(selector: KSelector, connectionId: String) {
      val channel = selector.channel(connectionId)
      if (channel != null) {
        debug(s"Closing selector connection $connectionId")
        val address = channel.socketAddress
        if (address != null)
          // Decrement the count recorded in connectionQuotas
          connectionQuotas.dec(address)
        // Close the SocketChannel identified by connectionId
        selector.close(connectionId)
      }
    }

5.3. poll()

Processor's poll() reads requests and sends responses; under the hood it delegates to the KSelector's poll() method:

    // kafka.network.Processor
    private def poll() {
      /**
       * Puts the reads it completed, the sends that succeeded and the connections
       * that dropped into the completedReceives, completedSends and disconnected
       * queues respectively, to be processed by the steps that follow
       */
      try selector.poll(300)
      catch {
        case e @ (_: IllegalStateException | _: IOException) =>
          error(s"Closing processor $id due to illegal state or IO exception")
          swallow(closeAll())
          shutdownComplete()
          throw e
      }
    }

5.4. processCompletedReceives()

processCompletedReceives() handles fully received requests:

    // kafka.network.Processor
    /**
     * Iterates over completedReceives, wraps each NetworkReceive together with the
     * Processor id and authentication information into a RequestChannel.Request,
     * and puts it into RequestChannel.requestQueue for the Handler threads.
     * It then mutes the corresponding KafkaChannel's OP_READ registration, so that
     * no further requests are read from this connection until a response is sent.
     */
    private def processCompletedReceives() {
      // Iterate over the KSelector.completedReceives queue
      selector.completedReceives.asScala.foreach { receive =>
        try {
          // Get the corresponding KafkaChannel
          val channel = selector.channel(receive.source)
          // Build the channel's Session, used for authorization
          val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
            channel.socketAddress)
          // Wrap everything into a Request object
          val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
          // Put the RequestChannel.Request into RequestChannel.requestQueue for the Handler threads
          requestChannel.sendRequest(req)
          // Cancel the OP_READ registration; the connection stops reading data
          selector.mute(receive.source)
        } catch {
          case e @ (_: InvalidRequestException | _: SchemaException) =>
            // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
            error(s"Closing socket for ${receive.source} because of error", e)
            close(selector, receive.source)
        }
      }
    }
    // kafka.network.RequestChannel
    /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
    def sendRequest(request: RequestChannel.Request) {
      requestQueue.put(request)
    }

It iterates over the KSelector's completedReceives queue to obtain the NetworkReceive objects, builds a RequestChannel.Request from each and adds it to requestChannel's requestQueue, and then mutes the connection on the KSelector to cancel OP_READ, so the connection stops reading data sent by the client.

5.5. processCompletedSends()

The source of processCompletedSends():

    // kafka.network.Processor
    private def processCompletedSends() {
      // Iterate over the completedSends queue
      selector.completedSends.asScala.foreach { send =>
        // Remove the corresponding Response from the inflightResponses map
        val resp = inflightResponses.remove(send.destination).getOrElse {
          // Throw if there is none
          throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
        }
        resp.request.updateRequestMetrics()
        // Re-register OP_READ, allowing the connection to read again
        selector.unmute(send.destination)
      }
    }

processCompletedSends() walks the KSelector's completedSends queue, removes each corresponding Response from inflightResponses, and then unmutes the connection on the KSelector, re-registering OP_READ so that data can be read from it again.

5.6. processDisconnected()

The source of processDisconnected():

    // kafka.network.Processor
    private def processDisconnected() {
      // Iterate over the disconnected queue
      selector.disconnected.asScala.foreach { connectionId =>
        val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
          throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
        }.remoteHost
        // Remove all of this connection's Responses from inflightResponses
        inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
        // the channel has been closed by the selector but the quotas still need to be updated
        // Decrement the count in ConnectionQuotas, making room for future connections
        connectionQuotas.dec(InetAddress.getByName(remoteHost))
      }
    }

processDisconnected() walks the KSelector's disconnected queue: it first removes the connection's Responses from inflightResponses, then decrements the connection count in ConnectionQuotas to make room for future connections.

These six methods are the main steps by which a Processor handles requests and responses. To summarize their roles:

  1. configureNewConnections(): handles new connections, registering OP_READ for each SocketChannel.
  2. processNewResponses(): handles the Responses buffered in the responseQueue; these were produced by Handler threads, and the Processor acts according to each Response's action type.
  3. poll(): calls the KSelector's poll() to read requests and send responses.
  4. processCompletedReceives(): handles the KSelector's completedReceives queue, i.e. the fully received requests.
  5. processCompletedSends(): handles the KSelector's completedSends queue, mainly removing fully sent responses from the inflightResponses map.
  6. processDisconnected(): handles the KSelector's disconnected queue, i.e. dropped connections.

Correspondingly, once isRunning becomes false in Processor's run() the thread is shutting down: the while loop exits and swallowError(closeAll()) and shutdownComplete() run to close the connections, where swallowError(action: => Unit) swallows any exception thrown by closeAll(). The three methods:

    // kafka.utils.Logging
    def swallowError(action: => Unit) {
      CoreUtils.swallow(logger.error, action)
    }
    // kafka.utils.CoreUtils
    def swallow(log: (Object, Throwable) => Unit, action: => Unit) {
      try {
        action
      } catch {
        case e: Throwable => log(e.getMessage(), e)
      }
    }
    // kafka.network.Processor
    private def closeAll() {
      // Close every KafkaChannel on the KSelector, then close the KSelector itself
      selector.channels.asScala.foreach { channel =>
        close(selector, channel.id)
      }
      selector.close()
    }
    // kafka.network.AbstractServerThread
    protected def shutdownComplete() = shutdownLatch.countDown()

6. The RequestChannel Class

RequestChannel is the bridge over which Processors and Handlers pass data. It contains one requestQueue and several responseQueues, one per Processor thread. Processor threads put the requests they read into requestQueue, and Handler threads take them from there for processing; the responses a Handler produces go into the responseQueue of the originating Processor, which takes them from there and sends them to the client. The class definition and its important fields:

    class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
      // Listener list; these listeners mainly serve to wake up the corresponding
      // Processor thread whenever a Handler thread puts a response into a responseQueue
      private var responseListeners: List[(Int) => Unit] = Nil
      // Queue through which Processor threads pass requests to Handler threads;
      // there are multiple Processor threads, hence a thread-safe queue
      private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
      // Array of numProcessors elements, each a BlockingQueue
      private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
      // Initialize the array elements
      for(i <- 0 until numProcessors)
        // Each element is a LinkedBlockingQueue
        responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
      ...
    }

Most of these fields came up earlier; the one worth dwelling on is responseListeners, a collection whose elements are functions of type (Int) => Unit, i.e. listeners expressed as functions. SocketServer registers the function id => processors(id).wakeup() as one such listener:

    // kafka.network.SocketServer
    /**
     * Register a listener on the RequestChannel, the bridge between Processors and
     * Handlers: whenever a Handler thread writes into some responseQueue, the
     * corresponding Processor thread is woken up to process it
     */
    requestChannel.addResponseListener(id => processors(id).wakeup())

Every time RequestChannel adds a response to responseQueues, it fires the listeners in responseListeners:

    // kafka.network.RequestChannel
    /**
     * Send a response back to the socket server to be sent over the network
     * Adds a Response (SendAction) to responseQueues
     */
    def sendResponse(response: RequestChannel.Response) {
      // Add the Response
      responseQueues(response.processor).put(response)
      // Notify the listeners
      for(onResponse <- responseListeners)
        onResponse(response.processor)
    }
    /**
     * No operation to take for the request, need to read more over the network
     * Adds a Response (NoOpAction) to responseQueues
     */
    def noOperation(processor: Int, request: RequestChannel.Request) {
      // Add the Response
      responseQueues(processor).put(new RequestChannel.Response(processor, request, null, RequestChannel.NoOpAction))
      // Notify the listeners
      for(onResponse <- responseListeners)
        onResponse(processor)
    }
    /**
     * Close the connection for the request
     * Adds a Response (CloseConnectionAction) to responseQueues
     */
    def closeConnection(processor: Int, request: RequestChannel.Request) {
      responseQueues(processor).put(new RequestChannel.Response(processor, request, null, RequestChannel.CloseConnectionAction))
      // Notify the listeners
      for(onResponse <- responseListeners)
        onResponse(processor)
    }

The three methods sendResponse(response: RequestChannel.Response), noOperation(processor: Int, request: RequestChannel.Request) and closeConnection(processor: Int, request: RequestChannel.Request) correspond to the three response action types discussed above: RequestChannel.SendAction, RequestChannel.NoOpAction and RequestChannel.CloseConnectionAction.

All three iterate over the responseListeners collection and run every listener; the listeners call wakeup() on the matching Processor to wake its thread.
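
The listener mechanism is nothing more than a list of Int => Unit functions. A minimal sketch of the same pattern, with the wakeup() call replaced by a println (ListenerDemo and the method bodies are illustrative only):

    object ListenerDemo extends App {
      var responseListeners: List[Int => Unit] = Nil
      def addResponseListener(l: Int => Unit): Unit = responseListeners ::= l

      // What SocketServer registers, minus the real wakeup():
      addResponseListener(id => println(s"wakeup processor $id"))

      def sendResponse(processorId: Int): Unit = {
        // ... put the response into responseQueues(processorId) ...
        for (onResponse <- responseListeners) onResponse(processorId)
      }
      sendResponse(2)   // prints: wakeup processor 2
    }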

6.1. The RequestChannel.Request Class

RequestChannel.Request is the wrapper in which a Processor passes a request to the Handlers. It is a case class; the important parts of its source:

    // kafka.network.RequestChannel.Request
    // The Request case class
    case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
      // These need to be volatile because the readers are in the network thread and the writers are in the request
      // handler threads or the purgatory threads
      // @volatile guarantees visibility across threads; these fields record operation timestamps
      @volatile var requestDequeueTimeMs = -1L
      @volatile var apiLocalCompleteTimeMs = -1L
      @volatile var responseCompleteTimeMs = -1L
      @volatile var responseDequeueTimeMs = -1L
      @volatile var apiRemoteCompleteTimeMs = -1L
      // Request type ID
      val requestId = buffer.getShort()
      // TODO: this will be removed once we migrated to client-side format
      // for server-side request / response format
      // NOTE: this map only includes the server-side request/response handlers. Newer
      // request types should only use the client-side versions which are parsed with
      // o.a.k.common.requests.AbstractRequest.getRequest()
      private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
        Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
          ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom
        )
      // TODO: this will be removed once we migrated to client-side format
      val requestObj =
        keyToNameAndDeserializerMap.get(requestId).map(readFrom => readFrom(buffer)).orNull
      // if we failed to find a server-side mapping, then try using the
      // client-side request / response format
      // The request header
      val header: RequestHeader =
        if (requestObj == null) {
          buffer.rewind
          // Try to parse the ByteBuffer
          try RequestHeader.parse(buffer)
          catch {
            case ex: Throwable =>
              throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: $requestId", ex)
          }
        } else
          null
      // The request body
      val body: AbstractRequest =
        if (requestObj == null)
          try {
            // For unsupported version of ApiVersionsRequest, create a dummy request to enable an error response to be returned later
            // Check the type in the request header
            if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion))
              new ApiVersionsRequest
            else
              // Try to parse the request body
              AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
          } catch {
            case ex: Throwable =>
              throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
          }
        else
          null
      // Null out the ByteBuffer once parsing is done
      buffer = null
      private val requestLogger = Logger.getLogger("kafka.request.logger")
      def requestDesc(details: Boolean): String = {
        if (requestObj != null)
          requestObj.describe(details)
        else
          header.toString + " -- " + body.toString
      }
      ...
    }

As the source shows, RequestChannel.Request parses the raw request into requestId (the request type ID), header (the request header) and body (the request body) for the Handler threads to use, and exposes several timestamp fields for the monitoring code.
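
The parsing order matters: the api key is consumed from the buffer first, and the buffer must be rewound before re-parsing. A sketch with a hand-built buffer; the layout is simplified, a real request header has more fields:

    import java.nio.ByteBuffer

    object ParseDemo extends App {
      val buffer = ByteBuffer.allocate(16)
      buffer.putShort(1)                  // api key; 1 is ApiKeys.FETCH.id
      buffer.putInt(42)                   // a stand-in for the rest of the header
      buffer.flip()

      val requestId = buffer.getShort()   // consumes the first two bytes
      println(s"requestId = $requestId, remaining = ${buffer.remaining()} bytes")
      buffer.rewind()                     // what Request does before RequestHeader.parse
      println(s"after rewind, position = ${buffer.position()}")
    }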

6.2. The RequestChannel.Response Class

RequestChannel.Response is simpler; it too is a case class and merely wraps the data:

    // kafka.network.RequestChannel.Response
    /**
     * The Response case class
     * @param processor      the Processor this response belongs to
     * @param request        the request object
     * @param responseSend   carries the response data
     * @param responseAction one of SendAction, NoOpAction and CloseConnectionAction
     */
    case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) {
      request.responseCompleteTimeMs = SystemTime.milliseconds
      def this(processor: Int, request: Request, responseSend: Send) =
        this(processor, request, responseSend, if (responseSend == null) NoOpAction else SendAction)
      def this(request: Request, send: Send) =
        this(request.processor, request, send)
    }
    // kafka.network.RequestChannel.ResponseAction
    trait ResponseAction
    // kafka.network.RequestChannel.SendAction
    case object SendAction extends ResponseAction
    // kafka.network.RequestChannel.NoOpAction
    case object NoOpAction extends ResponseAction
    // kafka.network.RequestChannel.CloseConnectionAction
    case object CloseConnectionAction extends ResponseAction

7. The KafkaRequestHandler Class

Processor threads put requests into RequestChannel's requestQueue; Handler threads take the RequestChannel.Request objects from that queue, process them, and pass the responses back to the Processors through the responseQueues array. KafkaRequestHandler is the Handler responsible for taking requests from requestQueue and processing them; it is a Runnable:

    // kafka.server.KafkaRequestHandler
    class KafkaRequestHandler(id: Int,
                              brokerId: Int,
                              val aggregateIdleMeter: Meter,
                              val totalHandlerThreads: Int,
                              val requestChannel: RequestChannel,
                              apis: KafkaApis) extends Runnable with Logging {
      this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
      def run() {
        while(true) {
          try {
            // Loop until a RequestChannel.Request is obtained
            var req : RequestChannel.Request = null
            while (req == null) {
              // We use a single meter for aggregate idle percentage for the thread pool.
              // Since meter is calculated as total_recorded_value / time_window and
              // time_window is independent of the number of threads, each recorded idle
              // time should be discounted by # threads.
              val startSelectTime = SystemTime.nanoseconds
              // Take a request from RequestChannel's requestQueue
              req = requestChannel.receiveRequest(300)
              val idleTime = SystemTime.nanoseconds - startSelectTime
              aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
            }
            // Shutdown command received
            if(req eq RequestChannel.AllDone) {
              debug("Kafka request handler %d on broker %d received shut down command".format(
                id, brokerId))
              return
            }
            req.requestDequeueTimeMs = SystemTime.milliseconds
            trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
            // Process the RequestChannel.Request with KafkaApis
            apis.handle(req)
          } catch {
            case e: Throwable => error("Exception when handling request", e)
          }
        }
      }
      // Shut down the thread: put a RequestChannel.AllDone request into the requestQueue to signal the Handler to stop
      def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
    }
    // kafka.network.RequestChannel
    /** Get the next request or block until specified time has elapsed */
    def receiveRequest(timeout: Long): RequestChannel.Request =
      requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

In KafkaRequestHandler's run() method, the thread keeps taking RequestChannel.Request objects from RequestChannel's requestQueue and hands them to KafkaApis' handle(request: RequestChannel.Request) method for processing; the KafkaApis class is covered below.

8. The KafkaRequestHandlerPool Thread Pool

A single KafkaRequestHandler thread cannot keep up with highly concurrent request traffic, so KafkaServer implements a simple thread pool, KafkaRequestHandlerPool:

    class KafkaRequestHandlerPool(val brokerId: Int,
                                  val requestChannel: RequestChannel,
                                  val apis: KafkaApis,
                                  numThreads: Int) extends Logging with KafkaMetricsGroup {
      /* a meter to track the average free capacity of the request handlers */
      private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
      this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
      val threads = new Array[Thread](numThreads)
      val runnables = new Array[KafkaRequestHandler](numThreads)
      // Create the KafkaRequestHandler objects in a loop
      for(i <- 0 until numThreads) {
        // Store each one in the runnables array
        runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
        // Make the thread a daemon thread
        threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
        // Start the thread
        threads(i).start()
      }
      // Shut down the pool
      def shutdown() {
        info("shutting down")
        // Shut down every handler
        for(handler <- runnables)
          handler.shutdown
        // Wait for all threads in `threads` to terminate
        for(thread <- threads)
          thread.join
        info("shut down completely")
      }
    }

KafkaRequestHandlerPool is straightforward: it simply creates numThreads KafkaRequestHandler threads (controlled by num.io.threads, default 8), marks them as daemon threads, and starts them all.

9. The KafkaApis Class

KafkaApis is the class that actually processes requests. Its definition and the source of its handle(request: RequestChannel.Request) method:

    // kafka.server.KafkaApis
    class KafkaApis(val requestChannel: RequestChannel,
                    val replicaManager: ReplicaManager,
                    val coordinator: GroupCoordinator,
                    val controller: KafkaController,
                    val zkUtils: ZkUtils,
                    val brokerId: Int,
                    val config: KafkaConfig,
                    val metadataCache: MetadataCache,
                    val metrics: Metrics,
                    val authorizer: Option[Authorizer]) extends Logging {
      this.logIdent = "[KafkaApi-%d] ".format(brokerId)
      // Store all the quota managers for each type of request
      val quotaManagers: Map[Short, ClientQuotaManager] = instantiateQuotaManagers(config)
      /**
       * Top-level method that handles all requests and multiplexes to the right api
       */
      def handle(request: RequestChannel.Request) {
        try {
          trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
            format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
          // Resolve the request's ApiKeys from its requestId and match on it
          ApiKeys.forId(request.requestId) match {
            case ApiKeys.PRODUCE => handleProducerRequest(request) // produce
            case ApiKeys.FETCH => handleFetchRequest(request) // fetch
            case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request) // list offsets
            case ApiKeys.METADATA => handleTopicMetadataRequest(request) // fetch metadata
            case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request) // leader and ISR information
            case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request) // stop a replica
            case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request) // update metadata
            case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
            case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request) // commit offsets
            case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request) // fetch offsets
            case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request) // find the GroupCoordinator
            case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request) // JoinGroup request
            case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request) // heartbeat
            case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request) // leave a group
            case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request) // sync a group
            case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request) // describe groups
            case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request) // list groups
            case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request) // SASL handshake
            case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request) // API versions
            case requestId => throw new KafkaException("Unknown api code " + requestId) // unknown
          }
        } catch {
          case e: Throwable =>
            if (request.requestObj != null) {
              // If request.requestObj is non-null, let it handle the error
              request.requestObj.handleError(e, requestChannel, request)
              error("Error when handling request %s".format(request.requestObj), e)
            } else {
              // Otherwise build a Response and return the error through the RequestChannel
              val response = request.body.getErrorResponse(request.header.apiVersion, e)
              val respHeader = new ResponseHeader(request.header.correlationId)
              /* If request doesn't have a default error response, we just close the connection.
                 For example, when produce request has acks set to 0 */
              if (response == null)
                requestChannel.closeConnection(request.processor, request)
              else
                requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))
              error("Error when handling request %s".format(request.body), e)
            }
        } finally
          request.apiLocalCompleteTimeMs = SystemTime.milliseconds
      }
      ...
    }

KafkaApis' handle(request: RequestChannel.Request) resolves the ApiKeys value from the request's requestId and matches on it, dispatching each request type to its own handler method. All of these handler methods are defined on KafkaApis and will be introduced in later posts.
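
The shape of handle() is a single dispatch on the api key. A self-contained sketch of the same shape, with hypothetical handlers and hard-coded ids (0 and 1 are the PRODUCE and FETCH ids in this Kafka version, but verify against your version's ApiKeys):

    object DispatchDemo extends App {
      final case class Request(requestId: Short, payload: String)

      def handleProduce(r: Request): String = s"produced: ${r.payload}"
      def handleFetch(r: Request): String   = s"fetched: ${r.payload}"

      def handle(r: Request): String = r.requestId.toInt match {
        case 0  => handleProduce(r)   // cf. ApiKeys.PRODUCE
        case 1  => handleFetch(r)     // cf. ApiKeys.FETCH
        case id => throw new IllegalArgumentException(s"Unknown api code $id")
      }

      println(handle(Request(0, "msg")))
      println(handle(Request(1, "topic-0")))
    }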

10. Summary

The network layer and API layer of the Kafka server look complex, but the internal design is fairly simple: the network layer handles communication with the Reactor pattern, while all of the concrete operations requested by clients are delegated to the API layer. The key rules:

  1. Each configured listener corresponds to one EndPoint object;
  2. each EndPoint corresponds to one Acceptor thread;
  3. each Acceptor thread owns multiple Processor threads;
  4. all Processor threads share one RequestChannel object;
  5. the Processor threads share a single Request queue, while each Processor thread has its own Response queue;
  6. when a client request reaches the server, an Acceptor thread accepts the connection and hands the resulting SocketChannel to a Processor thread;
  7. the Processor thread puts the request into the RequestChannel's Request queue, from which Handler threads process it;
  8. there are multiple Handler threads, managed by a simple thread pool;
  9. a Handler thread dispatches each RequestChannel.Request to KafkaApis according to its API type;
  10. after KafkaApis has processed a request, the response goes into the originating Processor's Response queue; the Processor acts according to the Response's type and replies to the client.