Kafka Series 25 - Server-Side Source Code Analysis 16: Authorization

Overview: This article covers the implementation of authorization (access control) in Kafka.

1. How Authorization Is Hooked In

In the previous article we saw that authentication is hooked in at the KafkaChannel level through an Authenticator. Authorization, by contrast, starts in the network layer, which captures the authenticated identity and attaches it to every request.

In the Kafka server's network layer, the Acceptor accepts client connections and hands the actual reads and writes over to the Processor pool. Each Processor places the requests it reads into the RequestChannel, from which the Handler threads pick them up and dispatch them to the KafkaApis layer for processing.

In Processor's processCompletedReceives() method, the identity of a client that has passed authentication is wrapped into a Session object and put into the RequestChannel together with the request data, where it waits for a Handler thread. The source of this method is shown below:

kafka.network.Processor#processCompletedReceives
  • /**
  •  * Iterate over completedReceives and wrap each NetworkReceive, the processor id and the authenticated
  •  * identity into a RequestChannel.Request object, which is put into the RequestChannel.requestQueue
  •  * to await processing by a Handler thread.
  •  * Afterwards the OP_READ event of the corresponding KafkaChannel is unregistered, i.e. no further
  •  * requests are read from this connection until the response has been sent.
  •  */
  • private def processCompletedReceives() {
  •   // Iterate over the KSelector.completedReceives queue
  •   selector.completedReceives.asScala.foreach { receive =>
  •     try {
  •       // Get the corresponding KafkaChannel
  •       val channel = selector.channel(receive.source)
  •       // Read the authorizationID from the SaslServerAuthenticator held by the KafkaChannel and wrap it
  •       // into a Session object, which is later used for authorization
  •       val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
  •         channel.socketAddress)
  •       // Wrap the Session and the request data into a RequestChannel.Request object
  •       val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
  •       // Put the RequestChannel.Request into RequestChannel.requestQueue to await a Handler thread
  •       requestChannel.sendRequest(req)
  •       // Unregister the OP_READ event; no more data will be read from this connection for now
  •       selector.mute(receive.source)
  •     } catch {
  •       case e @ (_: InvalidRequestException | _: SchemaException) =>
  •         // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
  •         error(s"Closing socket for ${receive.source} because of error", e)
  •         close(selector, receive.source)
  •     }
  •   }
  • }

During KafkaServer startup, the authorizer.class.name property from server.properties is read, the specified Authorizer is created via reflection, its configure(...) method is called, and the instance is handed to the KafkaApis object. Recall the configuration we used in server.properties earlier:

${KAFKA_HOME}/config/server.properties
  • listeners=SASL_PLAINTEXT://HOST:PORT
  • security.inter.broker.protocol=SASL_PLAINTEXT
  • sasl.mechanism.inter.broker.protocol=PLAIN
  • sasl.enabled.mechanisms=PLAIN
  • authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  • super.users=User:admin

And the relevant part of KafkaServer's startup() method:

kafka.server.KafkaServer#startup
  • def startup() {
  •   ...
  •   /**
  •    * Get the authorizer and initialize it if one is specified.
  •    * Create and configure the Authorizer object according to the authorizer.class.name
  •    * property in server.properties.
  •    **/
  •   authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
  •     // Instantiate the Authorizer named by authorizer.class.name via reflection
  •     val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
  •     // Call the Authorizer's configure() method
  •     authZ.configure(config.originals())
  •     authZ
  •   }
  •   /* start processing requests */
  •   // The Authorizer is passed on to the KafkaApis object here
  •   apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
  •     kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
  •   requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
  •   brokerState.newState(RunningAsBroker)
  •   ...
  • }

Every request-handling method in KafkaApis whose name starts with handle calls KafkaApis's authorize() method to perform the permission check. Its source is:

kafka.server.KafkaApis#authorize
  • // Permission check, delegating to the Authorizer
  • private def authorize(session: Session, operation: Operation, resource: Resource): Boolean =
  •   authorizer.map(_.authorize(session, operation, resource)).getOrElse(true)

So KafkaApis's authorize(...) simply delegates to the authorize(...) method of the configured Authorizer; if no Authorizer is configured, it returns true and every request is allowed.
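
As a concrete illustration of the calling pattern, the sketch below shows how such a check against the Cluster resource could be written. It is a hypothetical example (the helper name canDoClusterAction is not part of Kafka), but it uses only the public Authorizer API and mirrors the Option handling of KafkaApis.authorize; inter-broker requests such as LeaderAndIsr are indeed checked against the ClusterAction operation on the Cluster resource.

  • import kafka.network.RequestChannel.Session
  • import kafka.security.auth.{Authorizer, ClusterAction, Resource}
  • // Hypothetical helper (not actual Kafka code): gate an inter-broker style request on the
  • // ClusterAction operation. `authorizerOpt` mirrors the Option[Authorizer] field of KafkaApis.
  • def canDoClusterAction(authorizerOpt: Option[Authorizer], session: Session): Boolean =
  •   authorizerOpt.map(_.authorize(session, ClusterAction, Resource.ClusterResource)).getOrElse(true)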

2. The Authorizer

In server.properties we set authorizer.class.name to kafka.security.auth.SimpleAclAuthorizer. The class definition and its important fields are as follows:

kafka.security.auth.SimpleAclAuthorizer
  • // Handles authorization; the ACLs are stored in ZooKeeper
  • class SimpleAclAuthorizer extends Authorizer with Logging {
  •   private val authorizerLogger = Logger.getLogger("kafka.authorizer.logger")
  •   // The super users specified in the configuration file
  •   private var superUsers = Set.empty[KafkaPrincipal]
  •   /**
  •    * Defaults to false, meaning that if a resource has no entry in the ACLs, nobody except the
  •    * super users may access it. The default behaviour can be changed via the
  •    * allow.everyone.if.no.acl.found property.
  •    */
  •   private var shouldAllowEveryoneIfNoAclIsFound = false
  •   private var zkUtils: ZkUtils = null
  •   // Listener registered on ZooKeeper
  •   private var aclChangeListener: ZkNodeChangeNotificationListener = null
  •   // In-memory cache of the ACLs
  •   private val aclCache = new scala.collection.mutable.HashMap[Resource, VersionedAcls]
  •   ...
  • }

2.1. Initializing the Authorizer

As mentioned above, the configure(...) method of this class is called while KafkaServer is starting up. Its source is:

kafka.security.auth.SimpleAclAuthorizer#configure
  • /**
  •  * Guaranteed to be called before any authorize call is made.
  •  * Initialization.
  •  */
  • override def configure(javaConfigs: util.Map[String, _]) {
  •   val configs = javaConfigs.asScala
  •   // Convert the configuration entries into Properties
  •   val props = new java.util.Properties()
  •   configs.foreach { case (key, value) => props.put(key, value.toString) }
  •   // Read the super users
  •   superUsers = configs.get(SimpleAclAuthorizer.SuperUsersProp).collect {
  •     case str: String if str.nonEmpty => str.split(";").map(s => KafkaPrincipal.fromString(s.trim)).toSet
  •   }.getOrElse(Set.empty[KafkaPrincipal])
  •   // Initialize shouldAllowEveryoneIfNoAclIsFound from the allow.everyone.if.no.acl.found property
  •   shouldAllowEveryoneIfNoAclIsFound = configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
  •   // Use `KafkaConfig` in order to get the default ZK config values if not present in `javaConfigs`. Note that this
  •   // means that `KafkaConfig.zkConnect` must always be set by the user (even if `SimpleAclAuthorizer.ZkUrlProp` is also
  •   // set).
  •   val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
  •   // Read the ZooKeeper address, connection timeout, session timeout, etc.
  •   // authorizer.zookeeper.url, zookeeper.connect
  •   val zkUrl = configs.get(SimpleAclAuthorizer.ZkUrlProp).map(_.toString).getOrElse(kafkaConfig.zkConnect)
  •   // authorizer.zookeeper.connection.timeout.ms
  •   val zkConnectionTimeoutMs = configs.get(SimpleAclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs)
  •   // zookeeper.session.timeout.ms
  •   val zkSessionTimeOutMs = configs.get(SimpleAclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
  •   // Create a ZkUtils instance for talking to ZooKeeper
  •   zkUtils = ZkUtils(zkUrl,
  •     zkConnectionTimeoutMs,
  •     zkSessionTimeOutMs,
  •     JaasUtils.isZkSecurityEnabled())
  •   // Make sure the persistent node /kafka-acl exists in ZooKeeper, creating it if necessary
  •   zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath)
  •   // Load the ACLs stored in ZooKeeper into the aclCache field
  •   loadCache()
  •   // Make sure the /kafka-acl-changes node exists in ZooKeeper, creating it if necessary
  •   zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)
  •   // Register a ZooKeeper listener that watches the /kafka-acl-changes node
  •   aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler)
  •   aclChangeListener.init()
  • }

So configure(...) initializes the super-user set and the shouldAllowEveryoneIfNoAclIsFound field, calls loadCache() to load the ACLs stored in ZooKeeper into the aclCache field, and registers a ZkNodeChangeNotificationListener that watches the /kafka-acl-changes node in ZooKeeper.
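
For reference, the same initialization can be exercised outside of a broker, for example in a test. The following sketch is an assumption-laden example rather than Kafka code: it presumes a reachable ZooKeeper at 127.0.0.1:2181 and uses the property keys read by configure(...) above (zookeeper.connect is mandatory because configure(...) builds a KafkaConfig from the supplied map).

  • import kafka.security.auth.SimpleAclAuthorizer
  • // Hypothetical standalone usage (e.g. in a test); assumes ZooKeeper is running on 127.0.0.1:2181.
  • val props = new java.util.HashMap[String, String]()
  • props.put("zookeeper.connect", "127.0.0.1:2181")     // mandatory, see the comment in configure()
  • props.put("super.users", "User:admin")               // optional: super users
  • props.put("allow.everyone.if.no.acl.found", "false") // optional: behaviour when a resource has no ACLs
  • val authorizer = new SimpleAclAuthorizer
  • authorizer.configure(props)                          // connects to ZooKeeper and fills aclCache via loadCache()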

Let's look at loadCache() first. It walks the ACL entries recorded under ZooKeeper's /kafka-acl node and loads them into the aclCache collection. The source:

kafka.security.auth.SimpleAclAuthorizer
  • private def loadCache() {
  •   inWriteLock(lock) {
  •     // Get the children of /kafka-acl; they represent the resource types, e.g. Topic, Cluster, ConsumerGroup
  •     val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath)
  •     // Iterate over the resource types
  •     for (rType <- resourceTypes) {
  •       // Convert to a ResourceType
  •       val resourceType = ResourceType.fromString(rType)
  •       // Build the /kafka-acl/[ResourceType] path
  •       val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name
  •       // Get the children of /kafka-acl/[ResourceType]; they represent the individual resources
  •       val resourceNames = zkUtils.getChildren(resourceTypePath)
  •       // Iterate over the resources
  •       for (resourceName <- resourceNames) {
  •         // Read the node's JSON data and convert it into a VersionedAcls
  •         val versionedAcls = getAclsFromZk(Resource(resourceType, resourceName.toString))
  •         // Store the VersionedAcls object in the aclCache collection
  •         updateCache(new Resource(resourceType, resourceName), versionedAcls)
  •       }
  •     }
  •   }
  • }
  • // Read the ACL data of a resource from ZooKeeper and wrap it into a VersionedAcls object
  • private def getAclsFromZk(resource: Resource): VersionedAcls = {
  •   val (aclJson, stat) = zkUtils.readDataMaybeNull(toResourcePath(resource))
  •   VersionedAcls(aclJson.map(Acl.fromJson).getOrElse(Set()), stat.getVersion)
  • }
  • // Update the ACLs of a resource in aclCache
  • private def updateCache(resource: Resource, versionedAcls: VersionedAcls) {
  •   if (versionedAcls.acls.nonEmpty) {
  •     // Only keep an entry when the ACL set is non-empty
  •     aclCache.put(resource, versionedAcls)
  •   } else {
  •     // Otherwise treat it as a removal
  •     aclCache.remove(resource)
  •   }
  • }

So aclCache manages the ACLs with Resource objects as keys and VersionedAcls objects as values.

As mentioned in the previous article, Kafka's access control is expressed in terms of resources, operations and permission semantics (Allow/Deny). The Resource class describes the resource side: it wraps a resource type and a resource name. Its source:

kafka.security.auth.Resource
  • /**
  •  *
  •  * @param resourceType type of resource.
  •  * @param name name of the resource, for topic this will be topic name , for group it will be group name. For cluster type
  •  *             it will be a constant string kafka-cluster.
  •  */
  • case class Resource(val resourceType: ResourceType, val name: String) {
  •   override def toString: String = {
  •     resourceType.name + Resource.Separator + name
  •   }
  • }

The resourceType parameter is of type ResourceType, a sealed trait with three implementations: Cluster, Topic and Group:

kafka.security.auth
  • sealed trait ResourceType extends BaseEnum { def errorCode: Short }
  • case object Cluster extends ResourceType {
  •   val name = "Cluster"
  •   val errorCode = Errors.CLUSTER_AUTHORIZATION_FAILED.code
  • }
  • case object Topic extends ResourceType {
  •   val name = "Topic"
  •   val errorCode = Errors.TOPIC_AUTHORIZATION_FAILED.code
  • }
  • case object Group extends ResourceType {
  •   val name = "Group"
  •   val errorCode = Errors.GROUP_AUTHORIZATION_FAILED.code
  • }
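
The string form produced by Resource.toString (and parsed back by Resource.fromString) is simply "<type>:<name>"; this is also the format carried by the change notifications discussed in section 2.2. A small, self-contained illustration (the topic name is an example):

  • import kafka.security.auth.{Resource, Topic}
  • // Example only: round-trip between a Resource and its "<type>:<name>" string form
  • val topicResource = Resource(Topic, "test")
  • println(topicResource)                         // prints "Topic:test" (Resource.Separator is ":")
  • val parsed = Resource.fromString("Topic:test")
  • assert(parsed == topicResource)                // Resource is a case class, so equality holds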

The VersionedAcls class wraps a set of Acl objects together with the zkVersion:

kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
  • // A set of Acl objects plus the zkVersion of the ZooKeeper node they were read from
  • private case class VersionedAcls(acls: Set[Acl], zkVersion: Int)

The source of the Acl class:

kafka.security.auth.Acl
  • /**
  •  * An instance of this class will represent an acl that can express following statement.
  •  * <pre>
  •  * Principal P has permissionType PT on Operation O1 from hosts H1.
  •  * </pre>
  •  * @param principal A value of *:* indicates all users.
  •  * @param permissionType Allow or Deny.
  •  * @param host A value of * indicates all hosts.
  •  * @param operation A value of ALL indicates all operations. Possible values are Read, Write, Create,
  •  *                  Delete, Alter, Describe, ClusterAction and All.
  •  */
  • case class Acl(principal: KafkaPrincipal, permissionType: PermissionType, host: String, operation: Operation) {
  •   /**
  •    * TODO: Ideally we would have a symmetric toJson method but our current json library can not jsonify/dejsonify complex objects.
  •    * @return Map representation of the Acl.
  •    */
  •   def toMap(): Map[String, Any] = {
  •     Map(Acl.PrincipalKey -> principal.toString,
  •       Acl.PermissionTypeKey -> permissionType.name,
  •       Acl.OperationKey -> operation.name,
  •       Acl.HostsKey -> host)
  •   }
  •   override def toString: String = {
  •     "%s has %s permission for operations: %s from hosts: %s".format(principal, permissionType.name, operation, host)
  •   }
  • }

The fields of the Acl class correspond one-to-one to the ACL data recorded in ZooKeeper. The ACLs of each resource are stored in the node /kafka-acl/[resource_type]/[resource_name]; for the topic "test", for example, the node is /kafka-acl/Topic/test, and its content typically looks like this:

  • {
  •   "version": 1,
  •   "acls": [
  •     {
  •       "principal": "User:jack",
  •       "permissionType": "Allow",
  •       "operation": "Read",
  •       "host": "*"
  •     },
  •     {
  •       "principal": "User:mike",
  •       "permissionType": "Allow",
  •       "operation": "Write",
  •       "host": "*"
  •     }
  •   ]
  • }

These ACLs say that user "jack" may read and user "mike" may write the topic "test".
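
The same two entries can also be created programmatically through the Authorizer API; kafka-acls.sh ultimately drives the same addAcls(...) call. The sketch below is illustrative only: the user names, the topic name and the `authorizer` instance (a configured SimpleAclAuthorizer, as in the earlier sketch) are assumptions.

  • import kafka.security.auth.{Acl, Allow, Read, Resource, Topic, Write}
  • import org.apache.kafka.common.security.auth.KafkaPrincipal
  • // Illustrative only: allow "jack" to read and "mike" to write topic "test" from any host
  • val acls = Set(
  •   Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "jack"), Allow, Acl.WildCardHost, Read),
  •   Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "mike"), Allow, Acl.WildCardHost, Write)
  • )
  • // Persists the JSON shown above under /kafka-acl/Topic/test and publishes a change
  • // notification (see section 2.2); `authorizer` is a configured SimpleAclAuthorizer.
  • authorizer.addAcls(acls, Resource(Topic, "test"))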

2.2. Listening for ACL Changes

As described above, SimpleAclAuthorizer's configure(...) method creates a ZkNodeChangeNotificationListener and calls its init() method. init() registers a NodeChangeListener on the /kafka-acl-changes node in ZooKeeper to watch changes of its children, and also registers a ZkStateChangeListener to watch changes of the ZooKeeper connection state.

The source of the ZkNodeChangeNotificationListener class:

kafka.common.ZkNodeChangeNotificationListener
  • /**
  •  * A listener that subscribes to seqNodeRoot for any child changes where all children are assumed to be sequence node
  •  * with seqNodePrefix. When a child is added under seqNodeRoot this class gets notified, it looks at lastExecutedChange
  •  * number to avoid duplicate processing and if it finds an unprocessed child, it reads its data and calls supplied
  •  * notificationHandler's processNotification() method with the child's data as argument. As part of processing these changes it also
  •  * purges any children with currentTime - createTime > changeExpirationMs.
  •  *
  •  * The caller/user of this class should ensure that they use zkClient.subscribeStateChanges and call processAllNotifications
  •  * method of this class from ZkStateChangeListener's handleNewSession() method. This is necessary to ensure that if zk session
  •  * is terminated and reestablished any missed notification will be processed immediately.
  •  *
  •  * For authorization this means:
  •  * A NodeChangeListener is registered on ZooKeeper's /kafka-acl-changes node to watch changes of its children.
  •  * When ACLs are added or removed with the kafka-acls command, besides modifying the ACL data under /kafka-acl,
  •  * a persistent sequential node is created under /kafka-acl-changes; its name is prefixed with acl_changes_ and
  •  * its data records the type and name of the modified resource.
  •  * NodeChangeListener is then triggered and reloads the ACLs of that resource into the aclCache collection,
  •  * based on the resource recorded in the node's data.
  •  *
  •  * @param zkUtils
  •  * @param seqNodeRoot the path to watch; /kafka-acl-changes in the authorization case
  •  * @param seqNodePrefix prefix of the persistent sequential nodes; acl_changes_ in the authorization case
  •  * @param notificationHandler the handler invoked when the children of seqNodeRoot change
  •  * @param changeExpirationMs a sequential node older than this is considered expired and may be deleted; defaults to 15 minutes
  •  * @param time
  •  */
  • class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
  •                                        private val seqNodeRoot: String,
  •                                        private val seqNodePrefix: String,
  •                                        private val notificationHandler: NotificationHandler,
  •                                        private val changeExpirationMs: Long = 15 * 60 * 1000,
  •                                        private val time: Time = SystemTime) extends Logging {
  •   // The number of the last processed sequential node
  •   private var lastExecutedChange = -1L
  •   private val isClosed = new AtomicBoolean(false)
  •   /**
  •    * create seqNodeRoot and begin watching for any new children nodes.
  •    */
  •   def init() {
  •     // Make sure the path given by seqNodeRoot exists
  •     zkUtils.makeSurePersistentPathExists(seqNodeRoot)
  •     // Register NodeChangeListener to watch changes of seqNodeRoot's children
  •     zkUtils.zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
  •     // Register ZkStateChangeListener to watch changes of the ZooKeeper connection state
  •     zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener)
  •     // Process the existing children of seqNodeRoot
  •     processAllNotifications()
  •   }
  •   ...
  • }

NodeChangeListener is an inner object of ZkNodeChangeNotificationListener that implements the IZkChildListener interface. When the children of /kafka-acl-changes change, its handleChildChange(...) method is triggered, which in turn calls ZkNodeChangeNotificationListener's processNotifications(...) method. The source:

kafka.common.ZkNodeChangeNotificationListener.NodeChangeListener
  • /**
  •  * A listener that gets invoked when a node is created to notify changes.
  •  */
  • object NodeChangeListener extends IZkChildListener {
  •   override def handleChildChange(path: String, notifications: java.util.List[String]) {
  •     try {
  •       import scala.collection.JavaConverters._
  •       if (notifications != null)
  •         // This method belongs to ZkNodeChangeNotificationListener
  •         processNotifications(notifications.asScala.sorted)
  •     } catch {
  •       case e: Exception => error(s"Error processing notification change for path = $path and notification= $notifications :", e)
  •     }
  •   }
  • }

ZkStateChangeListener is also an inner object of ZkNodeChangeNotificationListener and implements the IZkStateListener interface. When a new session with ZooKeeper is established, it calls ZkNodeChangeNotificationListener's processAllNotifications(...) method. The source:

kafka.common.ZkNodeChangeNotificationListener.ZkStateChangeListener
  • object ZkStateChangeListener extends IZkStateListener {
  •   override def handleNewSession() {
  •     // This method belongs to ZkNodeChangeNotificationListener
  •     processAllNotifications
  •   }
  •   override def handleSessionEstablishmentError(error: Throwable) {
  •     fatal("Could not establish session with zookeeper", error)
  •   }
  •   override def handleStateChanged(state: KeeperState) {
  •     debug(s"New zookeeper state: ${state}")
  •   }
  • }

When ACLs are added or removed with the kafka-acls.sh script, the ACL data under the /kafka-acl path in ZooKeeper is modified, and in addition a persistent sequential node is created under the /kafka-acl-changes path. Its name is prefixed with the string acl_changes_, and its data records the type and name of the modified resource. NodeChangeListener is then triggered and, based on the resource recorded in the node's data, reloads the ACLs of that resource into the aclCache collection.
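
Conceptually, every ACL mutation therefore results in two ZooKeeper writes. The sketch below is not the SimpleAclAuthorizer implementation; it only illustrates the two writes using the paths and data format described above, and it assumes a ZooKeeper ensemble at 127.0.0.1:2181 (the ACL JSON is abbreviated).

  • import kafka.utils.ZkUtils
  • import org.apache.kafka.common.security.JaasUtils
  • // Illustrative sketch of the two writes behind an ACL change (not the actual authorizer code)
  • val zkUtils = ZkUtils("127.0.0.1:2181", 6000, 6000, JaasUtils.isZkSecurityEnabled())
  • // 1. overwrite the ACL JSON of the resource under /kafka-acl/<type>/<name>
  • zkUtils.updatePersistentPath("/kafka-acl/Topic/test", """{"version":1,"acls":[ /* abbreviated */ ]}""")
  • // 2. create a persistent sequential node under /kafka-acl-changes whose data names the resource;
  • //    this is what triggers NodeChangeListener on every broker
  • zkUtils.createSequentialPersistentPath("/kafka-acl-changes/acl_changes_", "Topic:test")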

The processNotifications(...) and processAllNotifications(...) methods of ZkNodeChangeNotificationListener that end up doing the work look like this:

kafka.common.ZkNodeChangeNotificationListener
  • /**
  •  * Process the given list of notifications
  •  */
  • private def processNotifications(notifications: Seq[String]) {
  •   if (notifications.nonEmpty) {
  •     info(s"Processing notification(s) to $seqNodeRoot")
  •     try {
  •       val now = time.milliseconds
  •       // Iterate over the child nodes
  •       for (notification <- notifications) {
  •         // Extract the sequence number of the child node
  •         val changeId = changeNumber(notification)
  •         // Check whether this child node has already been processed
  •         if (changeId > lastExecutedChange) {
  •           val changeZnode = seqNodeRoot + "/" + notification
  •           // Read the child node's stat and the data it records
  •           val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
  •           // Call NotificationHandler's processNotification() method to apply the update
  •           data map (notificationHandler.processNotification(_)) getOrElse (logger.warn(s"read null data from $changeZnode when processing notification $notification"))
  •         }
  •         // Remember the number of the processed child node
  •         lastExecutedChange = changeId
  •       }
  •       // Delete expired nodes
  •       purgeObsoleteNotifications(now, notifications)
  •     } catch {
  •       case e: ZkInterruptedException =>
  •         if (!isClosed.get)
  •           throw e
  •     }
  •   }
  • }
  • /**
  •  * Process all changes
  •  * Both NodeChangeListener and ZkStateChangeListener end up calling processAllNotifications()
  •  * to process the children of seqNodeRoot.
  •  */
  • def processAllNotifications() {
  •   val changes = zkUtils.zkClient.getChildren(seqNodeRoot)
  •   processNotifications(changes.asScala.sorted)
  • }

So notificationHandler's processNotification(...) method applies the ACL update, while purgeObsoleteNotifications(...) deletes expired nodes; in the end the new ACLs are written into the aclCache collection. The notificationHandler used here is the AclChangedNotificationHandler object, whose processNotification(...) method is:

kafka.security.auth.SimpleAclAuthorizer.AclChangedNotificationHandler
  • object AclChangedNotificationHandler extends NotificationHandler {
  •   override def processNotification(notificationMessage: String) {
  •     val resource: Resource = Resource.fromString(notificationMessage)
  •     inWriteLock(lock) {
  •       // Read the ACLs of the resource named in the notification from ZooKeeper
  •       val versionedAcls = getAclsFromZk(resource)
  •       // Write the new VersionedAcls into the aclCache collection
  •       updateCache(resource, versionedAcls)
  •     }
  •   }
  • }

purgeObsoleteNotifications(...) in turn lives in the ZkNodeChangeNotificationListener class:

kafka.common.ZkNodeChangeNotificationListener#purgeObsoleteNotifications
  • /**
  •  * Purges expired notifications.
  •  * @param now
  •  * @param notifications
  •  */
  • private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) {
  •   for (notification <- notifications.sorted) {
  •     // Read the node's stat and the data it records
  •     val notificationNode = seqNodeRoot + "/" + notification
  •     val (data, stat) = zkUtils.readDataMaybeNull(notificationNode)
  •     if (data.isDefined) {
  •       // Check whether the node has expired
  •       if (now - stat.getCtime > changeExpirationMs) {
  •         debug(s"Purging change notification $notificationNode")
  •         // Delete the node
  •         zkUtils.deletePath(notificationNode)
  •       }
  •     }
  •   }
  • }

3. Performing the Authorization Check

As mentioned earlier, every request-handling method in KafkaApis whose name starts with handle calls KafkaApis's authorize() method, which ultimately delegates to the configured Authorizer's authorize(...) method. The concrete Authorizer here is SimpleAclAuthorizer. Its authorize(...) method matches the client identity and the resource being operated on against the ACLs held in its aclCache collection and decides whether the operation on that resource is allowed. The source:

kafka.security.auth.SimpleAclAuthorizer#authorize
  • // Decide, based on the client identity and the resource being accessed, whether the operation is authorized
  • override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
  •   // The user identity, i.e. the principal
  •   val principal = session.principal
  •   // The client's host address
  •   val host = session.clientAddress.getHostAddress
  •   // Get the ACLs of the resource, plus the ACLs defined for the wildcard resource of the same type
  •   val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource))
  •   //check if there is any Deny acl match that would disallow this operation.
  •   // Check whether a Deny ACL matches
  •   val denyMatch = aclMatch(session, operation, resource, principal, host, Deny, acls)
  •   //if principal is allowed to read or write we allow describe by default, the reverse does not apply to Deny.
  •   // For a Describe request, a Read or Write permission also counts; the reverse does not apply to Deny
  •   val ops = if (Describe == operation)
  •     Set[Operation](operation, Read, Write)
  •   else
  •     Set[Operation](operation)
  •   //now check if there is any allow acl that will allow this operation.
  •   // Check whether an Allow ACL matches
  •   val allowMatch = ops.exists(operation => aclMatch(session, operation, resource, principal, host, Allow, acls))
  •   //we allow an operation if a user is a super user or if no acls are found and user has configured to allow all users
  •   //when no acls are found or if no deny acls are found and at least one allow acls matches.
  •   // Allowed if the user is a super user, or if no ACLs exist and shouldAllowEveryoneIfNoAclIsFound is enabled,
  •   // or if no Deny ACL matched and at least one Allow ACL matched
  •   val authorized = isSuperUser(operation, resource, principal, host) ||
  •     isEmptyAclAndAuthorized(operation, resource, principal, host, acls) ||
  •     (!denyMatch && allowMatch)
  •   logAuditMessage(principal, authorized, operation, resource, host)
  •   authorized
  • }
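
Tying this back to the example ACLs from section 2: with the JSON shown earlier stored under /kafka-acl/Topic/test, a Read check for user "jack" succeeds while a Write check fails (assuming jack is not listed in super.users and allow.everyone.if.no.acl.found is off). A hypothetical direct invocation, with an assumed client address:

  • import java.net.InetAddress
  • import kafka.network.RequestChannel.Session
  • import kafka.security.auth.{Read, Resource, Topic, Write}
  • import org.apache.kafka.common.security.auth.KafkaPrincipal
  • // Hypothetical direct check against the example ACLs; `authorizer` is a configured SimpleAclAuthorizer
  • val jack = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "jack"), InetAddress.getByName("10.0.0.1"))
  • val testTopic = Resource(Topic, "test")
  • authorizer.authorize(jack, Read, testTopic)   // true: "User:jack" has an Allow/Read ACL for host "*"
  • authorizer.authorize(jack, Write, testTopic)  // false: no Allow/Write ACL matches and jack is not a super user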

The aclMatch(...) method used above is implemented as follows:

kafka.security.auth.SimpleAclAuthorizer#aclMatch
  • private def aclMatch(session: Session, operations: Operation, resource: Resource, principal: KafkaPrincipal, host: String, permissionType: PermissionType, acls: Set[Acl]): Boolean = {
  •   // Search the given set of ACLs
  •   acls.find ( acl =>
  •     acl.permissionType == permissionType // match the PermissionType
  •       && (acl.principal == principal || acl.principal == Acl.WildCardPrincipal) // match the principal, or the wildcard principal
  •       && (operations == acl.operation || acl.operation == All) // match the operation, or the All wildcard
  •       && (acl.host == host || acl.host == Acl.WildCardHost) // match the host, or the wildcard host
  •   ).map { acl: Acl =>
  •     authorizerLogger.debug(s"operation = $operations on resource = $resource from host = $host is $permissionType based on acl = $acl")
  •     true
  •   }.getOrElse(false)
  • }

SimpleAclAuthorizer's authorize(...) method returns the result to the handle* request-handling methods in KafkaApis, and it is up to each handler's business logic to decide what to do with it.
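
For handlers that touch several topics at once, a common pattern is to split the topics into authorized and unauthorized sets and answer the unauthorized ones with the errorCode of the corresponding ResourceType (TOPIC_AUTHORIZATION_FAILED for topics). The sketch below is a simplified, hypothetical rendering of that pattern, not the actual KafkaApis code:

  • import kafka.network.RequestChannel.Session
  • import kafka.security.auth.{Authorizer, Read, Resource, Topic}
  • // Hypothetical sketch of the per-topic filtering done by handle* methods (simplified)
  • def partitionByReadAccess(authorizerOpt: Option[Authorizer],
  •                           session: Session,
  •                           topics: Set[String]): (Set[String], Set[String]) =
  •   topics.partition { topic =>
  •     authorizerOpt.map(_.authorize(session, Read, Resource(Topic, topic))).getOrElse(true)
  •   }
  • // The first set is processed normally; the second is answered with Errors.TOPIC_AUTHORIZATION_FAILED.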