1. 权限控制的接入
在上一篇文章中我们了解到,“身份认证”功能的接入是在KafkaChannel中以Authenticator验证器实现的,但Kafka中“权限控制”的接入则被放置到了网络层。
Kafka服务端的网络层中,Acceptor会处理客户端的通信连接请求,然后讲具体的数据读写操作交给了Processor池处理,Processor最终会将请求通过KafkaApis接口层传递给各个Handler事件处理器。
在Processor的processCompletedReceives()
方法中,会将通过“身份认证”后的客户端的身份封装成Session对象,与读取到的请求信息一起放入RequestChannel中等待Handler线程处理,该方法的源码如下:
而在KafkaServer的启动过程中,会读取server.properties文件中的authorizer.class.name
配置项,反射创建指定的Authorizer对象,调用其configure(...)
方法,同时将其传递给KafkaApis对象,回顾我们之前在server.properties中的配置:
以及KafkaServer的startup()
初始化方法源码:
在KafkaApis所有处理请求的以handle
开头的方法中,都会调用KafkaApis的authorize()
方法进行权限控制,该方法源码如下:
可见KafkaApis的authorize(...)
其实调用了所有Authorizer的authorize(...)
方法进行验证。
2. 权限验证器
在server.properties文件中,我们将authorizer.class.name
配置为了kafka.security.auth.SimpleAclAuthorizer,该类的定义和重要字段如下:
2.1. 权限验证器的初始化
前面讲到过,在KafkaServer初始化时,该类的configure(...)
方法就会被调用,这个方法的源码如下:
可见,在configure(...)
方法中,会初始化超级用户的信息、shouldAllowEveryoneIfNoAclIsFound
字段,调用loadCache()
方法将Zookeeper中的ACLs信息加载到aclCache
字段中,注册ZkNodeChangeNotificationListener监听器监听Zookeeper的/kafka-acl-changes
节点。
我们先关注其中的loadCache()
方法,它会遍历ZooKeeper的/kafka-acl
节点中记录的ACLs信息,并将其加载到aclCache
集合中,源码如下:
可见,aclCache
中会以Resource对象为键,VersionedAcls对象为值管理ACLs信息。
在上一篇文章中提到过,Kafka对权限控制分为资源控制、操作控制和控制语义,这里的Resource类就用于表示资源控制的范围,它封装了资源类型和资源名称,源码如下:
其中,resourceType
参数的类型ResourceType是一个特质,它有Cluster、Topic、Group三个实现类:
VersionedAcls类中封装了Acl对象集合和zkVersion
信息:
Acl类的源码如下:
Acl类中的字段其实与ZooKeeper中记录的ACLs信息一一对应,在Zookeeper的/kafka-acl/[resource_type]/[resource_name]
节点中分别记录者对应的ACLs信息,例如对于主题“test”来说,存储关于它的ACLs信息的节点就是/kafka-acl/Topic/test
,一般该节点的信息如下:
- {
- "version": 1,
- "acls":[
- {
- "principal": "User:jack",
- "permissionType": "Allow",
- "operation": "Read",
- "host": "*"
- },
- {
- "principal": "User:mike",
- "permissionType": "Allow",
- "operation": "Write",
- "host": "*"
- }
- ]
- }
上面的ACLs信息表示用户“jack”和“mike”分别拥有对主题“test”的读和写权限。
2.2. 权限修改的监听
在上面介绍过,SimpleAclAuthorizer的configure(...)
方法会创建ZkNodeChangeNotificationListener监听器,并调用其init()
方法,它会在ZooKeeper的/kafka-acl-changes
节点上注册NodeChangeListener监听器,用来监听其子节点的变化,并注册ZkStateChangeListener监听器监听与Zookeeper的连接状态的变化。
ZkNodeChangeNotificationListener类的源码如下:
NodeChangeListener是ZkNodeChangeNotificationListener的内部对象,实现了IZkChildListener接口,当/kafka-acl-changes
节点的子节点发生变化时,会触发其handleChildChange(...)
方法,而该方法内部会调用ZkNodeChangeNotificationListener的processNotifications(...)
方法;源码如下:
ZkStateChangeListener也是ZkNodeChangeNotificationListener的内部对象,实现了IZkStateListener接口,该监听器会在与Zookeeper建立新的Session时,调用ZkNodeChangeNotificationListener的processAllNotifications(...)
方法;源码如下:
当使用kafka-acls.sh脚本增删ACLs信息时,最终会修改Zookeeper的/kafka-acl
路径下的ACLs数据,还会在/kafka-acl-changes
路径下添加一个持久顺序节点,节点名称的前缀是acl_changes_
字符串,该节点中记录的数据是修改的资源类型和资源名称。之后NodeChangeListener会被触发,它会根据节点名称重新加载相应资源的ACLs信息到aclCache集合中。
其中最终被执行的ZkNodeChangeNotificationListener的processNotifications(...)
和processAllNotifications(...)
方法的源码如下:
可见,notificationHandler
的processNotification(...)
方法会处理ALCs的更新,而purgeObsoleteNotifications(...)
会删除过期节点;最终新的ACLs信息都会被更新到aclCache
集合中;其中notificationHandler
实际上由AclChangedNotificationHandler实例化,它的processNotification(...)
方法如下:
purgeObsoleteNotifications(...)
则出自ZkNodeChangeNotificationListener类:
3. 权限验证
前面提到过,在KafkaApis所有处理请求的以handle
开头的方法中,都会调用KafkaApis的authorize()
方法进行权限控制,最终其实调用了所有Authorizer的authorize(...)
方法进行验证。这里Authorizer的具体实现类是SimpleAclAuthorizer,它的authorize(...)
方法中会将传入的客户端对应的身份信息以及请求操作的资源信息与自己的aclCache
集合中保存的ACLs信息进行匹配,决定是否有权限操作相应资源,源码如下:
其中aclMatch(...)
方法的实现如下:
SimpleAclAuthorizer的authorize(...)
方法会将验证结果返回给上层KafkaApis中用于处理请求的以handle
开头的方法,将对结果的控制交由具体的业务逻辑。
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...