
Kafka Series 24 - Server-Side Source Code Analysis 15: Authentication

Overview: this article covers the configuration of authentication and authorization in Kafka, and the detailed authentication flow.

1. Overview and Configuration

Starting with version 0.9, Kafka provides "authentication" and "authorization" features. In Kafka, authentication determines whether the server allows a connection from the requesting client; it is disabled by default. Authorization governs permissions such as reading and writing messages.

  • Authentication: the client (a producer or consumer) presents credentials, such as a username/password pair or an SSL certificate, so that the server can confirm the client's real identity.
  • Authorization: based on the client's identity, the server decides whether the client is permitted to perform certain operations on certain resources.

For authentication, clients and servers can connect using SSL, SASL/PLAIN (supported since 0.10), and other mechanisms. Here "client" includes producers, consumers, other brokers, command-line tools, and so on.

  • SSL (Secure Sockets Layer) is a security protocol that sits between the transport layer (e.g., TCP/IP) and application protocols (e.g., HTTP). SSL relies on digital certificates, whose core components are a private key and a public key. SSL consists of a handshake protocol and a record (transport) protocol: the handshake protocol uses asymmetric cryptography, while the record protocol uses symmetric cryptography. The main purpose of the handshake is to negotiate and exchange a symmetric key between client and server. This split exists because asymmetric algorithms are computationally expensive and slow, making them unsuitable for encrypting large volumes of data, whereas symmetric encryption is fast and well suited to bulk data.
  • SASL (Simple Authentication and Security Layer) is an authentication framework that extends the verification capabilities of client/server protocols. It defines how authentication data is exchanged between client and server, but not the content of that data. SASL/PLAIN is the simplest, and also the most dangerous, mechanism, because the username and password are transmitted over the network in plain text and can easily be intercepted. It should generally only be considered on a trusted network, although the credentials can also be encrypted to improve security.
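The danger of SASL/PLAIN is easy to see from its wire format: per RFC 4616, the client sends a single NUL-delimited string containing the authorization id, username, and password. The following sketch (illustrative class and method names, not Kafka code) builds and parses such a token:

```java
import java.nio.charset.StandardCharsets;

// Minimal sketch of the SASL/PLAIN message format (RFC 4616):
// a single client response of the form authzid NUL authcid NUL password,
// which is why the mechanism is only safe over a trusted or encrypted channel.
class PlainTokenDemo {
    // Build the initial client response for the given identity (hypothetical helper).
    static byte[] buildToken(String authzid, String username, String password) {
        String token = authzid + '\u0000' + username + '\u0000' + password;
        return token.getBytes(StandardCharsets.UTF_8);
    }

    // Split a received token back into its three NUL-delimited parts.
    static String[] parseToken(byte[] token) {
        return new String(token, StandardCharsets.UTF_8).split("\u0000", 3);
    }

    public static void main(String[] args) {
        byte[] token = buildToken("", "jack", "jack-secret");
        String[] parts = parseToken(token);
        // The credentials travel in clear text inside the token
        System.out.println(parts[1]);
    }
}
```

Anyone who can capture this byte array on the wire can read the password directly, which is the reason the text above recommends SASL/PLAIN only on a secure network.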

Authorization is provided as a pluggable feature. Kafka ships an Authorizer implementation backed by ZooKeeper that stores all ACLs (Access Control Lists) in ZooKeeper; the ACLs specify which users hold which permissions on which resources.

  • Resources: Kafka's authorization distinguishes three resource types: Cluster, Topic, and Consumer Group.
  • Operations: Kafka defines eight operation granularities: Read, Write, Create, Delete, Alter, Describe, ClusterAction, and All.
  • Semantics: each operation can carry one of two semantics, Allow or Deny, meaning permit and forbid respectively.
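The way these three dimensions combine can be sketched as follows. This is a hypothetical model, not Kafka's actual Authorizer classes; it illustrates the usual semantics that an explicit Deny overrides Allow, and that with no matching ACL, access is denied by default:

```java
import java.util.List;

// Hypothetical sketch of how Allow/Deny ACL semantics combine (illustrative
// names; not Kafka's SimpleAclAuthorizer implementation).
class AclDemo {
    enum Permission { ALLOW, DENY }

    // One ACL entry: which principal may (or may not) perform which operation.
    record Acl(String principal, String operation, Permission permission) {}

    static boolean authorize(List<Acl> acls, String principal, String operation) {
        boolean allowed = false;
        for (Acl acl : acls) {
            if (!acl.principal().equals(principal) || !acl.operation().equals(operation))
                continue;
            if (acl.permission() == Permission.DENY)
                return false;     // an explicit Deny always wins
            allowed = true;       // remember a matching Allow
        }
        return allowed;           // no matching entry means denied by default
    }

    public static void main(String[] args) {
        List<Acl> acls = List.of(
                new Acl("User:jack", "Read", Permission.ALLOW),
                new Acl("User:mike", "Read", Permission.DENY));
        System.out.println(authorize(acls, "User:jack", "Read"));  // true
        System.out.println(authorize(acls, "User:mike", "Read"));  // false
        System.out.println(authorize(acls, "User:jack", "Write")); // false: no rule
    }
}
```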

1.1. Configuring SASL/PLAIN Authentication

To enable and use authentication, different settings are required on the Kafka server side and on the client side.

On the server side, the steps are as follows:

  1. Modify the server.properties configuration file.

Edit ${KAFKA_HOME}/config/server.properties as follows:

${KAFKA_HOME}/config/server.properties

    listeners=SASL_PLAINTEXT://HOST:PORT
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=PLAIN
    sasl.enabled.mechanisms=PLAIN
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    super.users=User:admin

The last line, super.users=User:admin, configures admin as a super user. Multiple super users can be configured by separating them with semicolons (;):

    super.users=User:admin;User:root

  2. Provide the kafka_server_jaas.conf configuration file.

In addition, a configuration file named kafka_server_jaas.conf is required. It can be placed anywhere the Kafka process can read; here we put it directly in the ${KAFKA_HOME}/config/ directory. Its content is:

    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin"
        user_admin="admin"
        user_root="root"
        user_jack="jack";
    };

This configures three users, admin, root, and jack, with passwords admin, root, and jack respectively. Note that in an entry such as user_jack="jack", the "jack" before the equals sign is the username, and the string "jack" after it is the password.
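The user_&lt;name&gt;="&lt;password&gt;" convention can be pictured as a lookup table: when a client authenticates, the server-side PLAIN mechanism looks up the expected password under the key "user_" plus the presented username. A minimal sketch (illustrative names, not Kafka's code):

```java
import java.util.Map;
import java.util.Optional;

// Sketch of the "user_<name>=<password>" convention in the JAAS options map.
// Names and structure are illustrative, not Kafka's actual implementation.
class JaasOptionsDemo {
    // Look up the expected password for a client-presented username.
    static Optional<String> expectedPassword(Map<String, String> options, String username) {
        return Optional.ofNullable(options.get("user_" + username));
    }

    public static void main(String[] args) {
        Map<String, String> options = Map.of(
                "username", "admin",   // identity the broker itself logs in with
                "password", "admin",
                "user_admin", "admin", // credentials accepted from clients
                "user_root", "root",
                "user_jack", "jack");
        System.out.println(expectedPassword(options, "jack").orElse("<unknown user>"));
    }
}
```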

  3. Pass the configuration to the Kafka process.

When starting the Kafka server, pass the kafka_server_jaas.conf file from step 2 to the JVM via the java.security.auth.login.config system property, for example:

    -Djava.security.auth.login.config="${KAFKA_HOME}/config/kafka_server_jaas.conf"

Alternatively, the JVM parameter can be added directly in the ${KAFKA_HOME}/bin/kafka-run-class.sh script:

    KAFKA_SASL_OPTS="-Djava.security.auth.login.config=${KAFKA_HOME}/config/kafka_server_jaas.conf"

    # Launch mode
    if [ "x$DAEMON_MODE" = "xtrue" ]; then
      nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
    else
      exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
    fi

This completes the Kafka server-side configuration.

On the client side, some changes are also needed:

  1. Provide the kafka_client_jaas.conf configuration file.

When writing a producer or consumer client, provide a kafka_client_jaas.conf file in a location the client code can access, with the following content:

    KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="jack"
        password="jack";
    };
  2. Write the Kafka client code.

With kafka_client_jaas.conf in place, the client code must register the file via a system property:

    // Register the JAAS config file path as a system property
    System.setProperty("java.security.auth.login.config", "../kafka_client_jaas.conf");
    Properties props = new Properties();
    // Configure the authentication mechanism
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");

Running the client at this point may throw an exception such as:

    org.apache.kafka.common.errors.TopicAuthorizationException:
    Not authorized to access topics: [test]

This is because, by default, any permission not explicitly granted is denied. The user jack has not been granted read/write access to the test topic, so access is refused. After assigning permissions with the kafka-acls.sh script, the client runs normally. The command is:

    kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
        --add \
        --allow-principal User:jack \
        --operation Read --operation Write \
        --topic test

1.2. Configuring Authorization

Authorization in Kafka is generally configured through the kafka-acls.sh script. As mentioned above, authorization targets three resource types (Cluster, Topic, and Consumer Group) and eight operation granularities (Read, Write, Create, Delete, Alter, Describe, ClusterAction, and All). Several common operations are described below.

1.2.1. Adding ACLs

The kafka-acls.sh script adds permissions for a particular user via --add:

    # Grant user jack read and write access to the test topic
    $ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
        --add \
        --allow-principal User:jack \
        --operation Read \
        --operation Write \
        --topic test

    # On the test topic, deny Read to user mike from IP 198.168.199.1 while allowing everyone else
    $ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
        --add \
        --allow-principal User:* \
        --allow-host * \
        --deny-principal User:mike \
        --deny-host 198.168.199.1 \
        --operation Read --topic test

    # On the test topic, allow read and write requests from mike and jack coming from 198.168.199.1 or 198.168.199.2
    $ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
        --add \
        --allow-principal User:mike \
        --allow-principal User:jack \
        --allow-host 198.168.199.1 \
        --allow-host 198.168.199.2 \
        --operation Read --operation Write \
        --topic test

Note: in some scenarios there are many Allow ACLs but only a few Deny ACLs; --deny-principal can then be used to specify just the entries to forbid, as the second example above shows.

To make permission setup quick and convenient, the kafka-acls.sh script provides two special parameters, --producer and --consumer. --producer grants the user the permissions a producer needs on a topic, essentially the combination of WRITE, DESCRIBE, and CREATE; --consumer grants the permissions a consumer needs, essentially READ plus DESCRIBE. Example commands:

    # Grant user jack everything a producer needs on the test topic
    $ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
        --add \
        --allow-principal User:jack \
        --producer \
        --topic test

    # Grant user mike everything a consumer needs on the test topic
    $ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
        --add \
        --allow-principal User:mike \
        --consumer \
        --topic test

1.2.2. Listing ACLs

The kafka-acls.sh script lists the ACLs of a particular resource via --list:

    # List all ACLs on the test topic
    $ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
        --list \
        --topic test

The output looks like this:

    Current ACLs for resource `Topic:test`:
    User:jack has Allow permission for operations: Describe from hosts: *
    User:jack has Allow permission for operations: Read from hosts: *
    User:mike has Allow permission for operations: Write from hosts: *

1.2.3. Removing ACLs

The kafka-acls.sh script removes previously granted permissions via --remove:

    # On the test topic, remove jack's and mike's read/write access from 198.168.199.1 and 198.168.199.2
    $ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
        --remove \
        --allow-principal User:jack \
        --allow-principal User:mike \
        --allow-host 198.168.199.1 \
        --allow-host 198.168.199.2 \
        --operation Read \
        --operation Write \
        --topic test

2. Authentication

Kafka's authentication is implemented on top of JAAS (Java Authentication and Authorization Service). JAAS inserts an abstraction layer between the application and the underlying security mechanisms, simplifying development on top of the Java Security packages: developers are shielded from the specific security mechanism in use, and when the mechanism changes, the application layer needs no security-related code changes. Application code is written mainly against LoginContext; beneath LoginContext sit dynamically configurable LoginModule components, which encapsulate the code that performs verification with the appropriate security mechanism.

2.1. Client-Side Authentication

2.1.1. Loading the Configuration

Recall the kafka_client_jaas.conf file we configured for the client:

    KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="jack"
        password="jack";
    };

The Kafka client's LoginModule is the org.apache.kafka.common.security.plain.PlainLoginModule component. Its source is as follows:

org.apache.kafka.common.security.plain.PlainLoginModule

    public class PlainLoginModule implements LoginModule {

        private static final String USERNAME_CONFIG = "username";
        private static final String PASSWORD_CONFIG = "password";

        static {
            // Register PlainSaslServerProvider, which records the name of the
            // PlainSaslServerFactory factory class in a Map
            PlainSaslServerProvider.initialize();
        }

        @Override
        public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
            // Read the username
            String username = (String) options.get(USERNAME_CONFIG);
            if (username != null)
                // Add the username to the Subject
                subject.getPublicCredentials().add(username);
            // Read the password
            String password = (String) options.get(PASSWORD_CONFIG);
            if (password != null)
                // Add the password to the Subject
                subject.getPrivateCredentials().add(password);
        }

        @Override
        public boolean login() throws LoginException {
            // Simply return true
            return true;
        }

        @Override
        public boolean logout() throws LoginException {
            // Simply return true
            return true;
        }

        @Override
        public boolean commit() throws LoginException {
            // Simply return true
            return true;
        }

        @Override
        public boolean abort() throws LoginException {
            // Simply return false
            return false;
        }
    }

PlainLoginModule performs no actual authentication; it merely reads the username and password from the configuration file and stores them in the Subject. A Subject represents a principal, i.e. the initiator of resource access. It has three relevant fields, principals, pubCredentials, and privCredentials, representing the principal's identities, its public credentials (public key / username), and its private credentials (private key / password). The class is provided by the JDK's javax.security.auth package:

javax.security.auth.Subject

    public final class Subject implements java.io.Serializable {
        ...
        // The principal's identities
        Set<Principal> principals;
        // Public credentials (public key / username)
        transient Set<Object> pubCredentials;
        // Private credentials (private key / password)
        transient Set<Object> privCredentials;
        ...
    }
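What PlainLoginModule.initialize() does to the Subject can be reproduced directly against the JDK API. A minimal sketch (the credential values here are made up for illustration):

```java
import javax.security.auth.Subject;

// Sketch of how PlainLoginModule populates a Subject: the username goes into
// the public credentials set, the password into the private credentials set.
class SubjectDemo {
    public static void main(String[] args) {
        Subject subject = new Subject();
        subject.getPublicCredentials().add("jack");         // username (public credential)
        subject.getPrivateCredentials().add("jack-secret"); // password (private credential)

        // Later code (e.g. the SASL callback handler) reads them back by type:
        String username = subject.getPublicCredentials(String.class).iterator().next();
        System.out.println(username);
    }
}
```

This is exactly the shape of data that SaslClientCallbackHandler (shown later) pulls back out of the Subject when the SASL client needs a username and password.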

When using JAAS, application code primarily manipulates a LoginContext object. Kafka wraps LoginContext once more behind its Login interface, defined as follows:

org.apache.kafka.common.security.auth.Login
  • /**
  • * Login interface for authentication.
  • */
  • public interface Login {
  • /**
  • * Configures this login instance.
  • * 配置Login对象
  • */
  • void configure(Map<String, ?> configs, String loginContextName);
  • /**
  • * Performs login for each login module specified for the login context of this instance.
  • * 调用LoginModule的login()方法和commit()方法
  • */
  • LoginContext login() throws LoginException;
  • /**
  • * Returns the authenticated subject of this login context.
  • * 返回PlainLoginModule中配置好的Subject对象
  • */
  • Subject subject();
  • /**
  • * Returns the service name to be used for SASL.
  • * 返回服务名称,在DefaultLogin实现中始终返回"kafka"字符串
  • */
  • String serviceName();
  • /**
  • * Closes this instance.
  • */
  • void close();
  • }

In the SASL/PLAIN scenario, the DefaultLogin implementation is used; its concrete behavior is inherited from the AbstractLogin abstract class, whose source is as follows:

org.apache.kafka.common.security.authenticator.AbstractLogin

    public abstract class AbstractLogin implements Login {

        private static final Logger log = LoggerFactory.getLogger(AbstractLogin.class);

        /**
         * The name of the LoginContext to use. In the Kafka client this is "KafkaClient";
         * it must match the "KafkaClient" entry on the first line of kafka_client_jaas.conf.
         */
        private String loginContextName;
        // The LoginContext object corresponding to loginContextName; it may contain multiple LoginModules
        private LoginContext loginContext;

        @Override
        public void configure(Map<String, ?> configs, String loginContextName) {
            this.loginContextName = loginContextName;
        }

        @Override
        public LoginContext login() throws LoginException {
            String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM); // java.security.auth.login.config
            // Check whether java.security.auth.login.config is set; its value is the path to
            // the config file, e.g. ./config/kafka_client_jaas.conf
            if (jaasConfigFile == null) {
                log.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is not set, using default JAAS configuration.");
            }
            AppConfigurationEntry[] configEntries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
            // Check whether the "KafkaClient" LoginContext configuration can be found
            if (configEntries == null) {
                String errorMessage = "Could not find a '" + loginContextName + "' entry in the JAAS configuration. System property '" +
                        JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + (jaasConfigFile == null ? "not set" : jaasConfigFile);
                throw new IllegalArgumentException(errorMessage);
            }
            // Create the LoginContext object
            loginContext = new LoginContext(loginContextName, new LoginCallbackHandler());
            // Invoke LoginContext's login() method to complete the login
            loginContext.login();
            log.info("Successfully logged in.");
            return loginContext;
        }

        @Override
        public Subject subject() {
            // Return the Subject populated with the username and password by PlainLoginModule
            return loginContext.getSubject();
        }

        /**
         * Callback handler for creating login context. Login callback handlers
         * should support the callbacks required for the login modules used by
         * the KafkaServer and KafkaClient contexts. Kafka does not support
         * callback handlers which require additional user input.
         */
        public static class LoginCallbackHandler implements CallbackHandler {

            @Override
            public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
                for (Callback callback : callbacks) {
                    if (callback instanceof NameCallback) {
                        NameCallback nc = (NameCallback) callback;
                        nc.setName(nc.getDefaultName());
                    } else if (callback instanceof PasswordCallback) {
                        String errorMessage = "Could not login: the client is being asked for a password, but the Kafka" +
                                " client code does not currently support obtaining a password from the user.";
                        throw new UnsupportedCallbackException(callback, errorMessage);
                    } else if (callback instanceof RealmCallback) {
                        RealmCallback rc = (RealmCallback) callback;
                        rc.setText(rc.getDefaultText());
                    } else {
                        throw new UnsupportedCallbackException(callback, "Unrecognized SASL Login callback");
                    }
                }
            }
        }
    }

AbstractLogin's loginContextName field names the LoginContext to use; in the Kafka client it is KafkaClient, and it must match the "KafkaClient" entry on the first line of kafka_client_jaas.conf for the LoginContext to be found. The loginContext field is the LoginContext object corresponding to that name, and it may contain multiple LoginModules.

When LoginContext's login() method is called, it invokes the login() and commit() methods of every configured LoginModule in turn (here the configuration contains only PlainLoginModule) to complete the login.

The DefaultLogin object is created inside LoginManager, which decides which Login implementation to create based on whether Kerberos authentication is enabled:

org.apache.kafka.common.security.authenticator.LoginManager

    public class LoginManager {

        private static final EnumMap<LoginType, LoginManager> CACHED_INSTANCES = new EnumMap<>(LoginType.class);

        private final Login login;
        private final LoginType loginType;
        private int refCount;

        private LoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs) throws IOException, LoginException {
            this.loginType = loginType;
            String loginContext = loginType.contextName();
            // Choose the Login implementation depending on whether Kerberos is in use
            login = hasKerberos ? new KerberosLogin() : new DefaultLogin();
            // Configure it
            login.configure(configs, loginContext);
            // Log in
            login.login();
        }
        ...
    }

LoginManager's core logic is to create the appropriate Login subclass according to the configuration and invoke its login() method.

The components covered so far only read the contents of kafka_client_jaas.conf and record them in the relevant instances; no actual authentication has taken place. We can also see that LoginManager is the component that kicks off this whole process, so the next question is when LoginManager gets created.

2.1.2. Hooking Authentication into the Client

Both KafkaProducer and KafkaConsumer use NetworkClient for network connections and communication. NetworkClient is built on Kafka's own Selector component, which wraps the Java NIO Selector and manages multiple KafkaChannel objects. Each KafkaChannel in turn holds a TransportLayer object that performs the actual network I/O; TransportLayer extends the ScatteringByteChannel and GatheringByteChannel interfaces from JDK NIO, providing scatter/gather read and write support.

KafkaChannels are created through Kafka's ChannelBuilder. Recall the source:

    @SuppressWarnings({"unchecked", "deprecation"})
    private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        ...
        // Create a ChannelBuilder matching the configured protocol
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
        // Create the NetworkClient, the core of network I/O
        NetworkClient client = new NetworkClient(
                new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                this.metadata,
                clientId,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                this.requestTimeoutMs, time);
        ...
    }

ChannelBuilder is an interface with three implementations: PlaintextChannelBuilder, SaslChannelBuilder, and SslChannelBuilder. The ClientUtils.createChannelBuilder(...) call above selects the implementation according to the protocol configured in config.values(). Its source:

org.apache.kafka.clients.ClientUtils#createChannelBuilder

    /**
     * @param configs client/server configs
     * @return configured ChannelBuilder based on the configs.
     */
    public static ChannelBuilder createChannelBuilder(Map<String, ?> configs) {
        // Resolve the SecurityProtocol from the "security.protocol" config item, i.e. the
        // props.put("security.protocol", "SASL_PLAINTEXT") set when creating the producer/consumer
        SecurityProtocol securityProtocol = SecurityProtocol.forName((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); // security.protocol
        // Check that the protocol is supported: PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL
        if (!SecurityProtocol.nonTestingValues().contains(securityProtocol))
            throw new ConfigException("Invalid SecurityProtocol " + securityProtocol);
        // Read the "sasl.mechanism" config item, i.e. props.put("sasl.mechanism", "PLAIN")
        String clientSaslMechanism = (String) configs.get(SaslConfigs.SASL_MECHANISM); // sasl.mechanism
        // Delegate to ChannelBuilders.create() to create the matching ChannelBuilder
        return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs, clientSaslMechanism, true);
    }

Internally this calls ChannelBuilders.create(...); the mode argument is Mode.CLIENT, indicating a client, and the loginType argument is LoginType.CLIENT, which corresponds to the string "KafkaClient":

org.apache.kafka.common.network.ChannelBuilders#create

    /**
     * @param securityProtocol the securityProtocol
     * @param mode the mode, it must be non-null if `securityProtocol` is not `PLAINTEXT`;
     *             it is ignored otherwise
     * @param loginType the loginType, it must be non-null if `securityProtocol` is SASL_*; it is ignored otherwise
     * @param configs client/server configs
     * @param clientSaslMechanism SASL mechanism if mode is CLIENT, ignored otherwise
     * @param saslHandshakeRequestEnable flag to enable Sasl handshake requests; disabled only for SASL
     *                                   inter-broker connections with inter-broker protocol version < 0.10
     * @return the configured `ChannelBuilder`
     * @throws IllegalArgumentException if `mode` invariants described above is not maintained
     */
    public static ChannelBuilder create(SecurityProtocol securityProtocol,
                                        Mode mode,
                                        LoginType loginType,
                                        Map<String, ?> configs,
                                        String clientSaslMechanism,
                                        boolean saslHandshakeRequestEnable) {
        ChannelBuilder channelBuilder;
        // Handle each SecurityProtocol separately
        switch (securityProtocol) {
            case SSL:
                requireNonNullMode(mode, securityProtocol);
                channelBuilder = new SslChannelBuilder(mode);
                break;
            case SASL_SSL:
            case SASL_PLAINTEXT:
                requireNonNullMode(mode, securityProtocol);
                if (loginType == null)
                    throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
                if (mode == Mode.CLIENT && clientSaslMechanism == null)
                    throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
                channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable);
                break;
            case PLAINTEXT:
            case TRACE:
                channelBuilder = new PlaintextChannelBuilder();
                break;
            default:
                throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
        }
        // Configure the ChannelBuilder via its configure() method
        channelBuilder.configure(configs);
        return channelBuilder;
    }

For the SASL_PLAINTEXT protocol, this method ultimately creates a SaslChannelBuilder instance and, at the end, directly calls the instance's configure(...) method.

Let us first look at the definition and key fields of the SaslChannelBuilder class:

org.apache.kafka.common.network.SaslChannelBuilder

    public class SaslChannelBuilder implements ChannelBuilder {
        ...
        // The security protocol in use, i.e. props.put("security.protocol", "SASL_PLAINTEXT")
        private final SecurityProtocol securityProtocol;
        // The SASL mechanism in use, i.e. props.put("sasl.mechanism", "PLAIN")
        private final String clientSaslMechanism;
        // Whether this end is a client or a server; enum values CLIENT and SERVER
        private final Mode mode;
        // Enum values CLIENT and SERVER, mapping to "KafkaClient" and "KafkaServer" respectively
        private final LoginType loginType;
        // Whether to send a handshake request
        private final boolean handshakeRequestEnable;
        // The LoginManager object that wraps the LoginContext
        private LoginManager loginManager;
        // Configuration
        private Map<String, ?> configs;
        ...
    }

SaslChannelBuilder's configure(...) method creates the LoginManager object, whose constructor in turn creates the Login object and calls its login() method:

org.apache.kafka.common.network.SaslChannelBuilder#configure

    public void configure(Map<String, ?> configs) throws KafkaException {
        try {
            this.configs = configs;
            boolean hasKerberos;
            if (mode == Mode.SERVER) {
                List<String> enabledMechanisms = (List<String>) this.configs.get(SaslConfigs.SASL_ENABLED_MECHANISMS);
                hasKerberos = enabledMechanisms == null || enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM);
            } else {
                hasKerberos = clientSaslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM);
            }
            if (hasKerberos) {
                String defaultRealm;
                try {
                    defaultRealm = JaasUtils.defaultKerberosRealm();
                } catch (Exception ke) {
                    defaultRealm = "";
                }
                @SuppressWarnings("unchecked")
                List<String> principalToLocalRules = (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES);
                if (principalToLocalRules != null)
                    kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules);
            }
            // Create the LoginManager object, whose constructor calls its Login object's login() method
            this.loginManager = LoginManager.acquireLoginManager(loginType, hasKerberos, configs);
            if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
                // Disable SSL client authentication as we are using SASL authentication
                this.sslFactory = new SslFactory(mode, "none");
                this.sslFactory.configure(configs);
            }
        } catch (Exception e) {
            throw new KafkaException(e);
        }
    }

This makes clear both when authentication is wired into the client and when LoginManager is created.

2.1.3. Building the Authenticator

KafkaProducer hands the SaslChannelBuilder created by ClientUtils to the Selector. When the Selector's connect() method initiates a network connection, it calls SaslChannelBuilder's buildChannel() method to create the KafkaChannel that subsequently communicates with the server:

org.apache.kafka.common.network.Selector#connect

    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        ...
        // Register for OP_CONNECT
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
        // Create the KafkaChannel via ChannelBuilder's buildChannel() method; it will later talk to the server
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
        // Attach the created KafkaChannel to the SelectionKey
        key.attach(channel);
        // Store the created KafkaChannel in the channels map, keyed by connection id
        this.channels.put(id, channel);
        ...
    }

The source of SaslChannelBuilder's buildChannel() method:

    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
        try {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            // Create the PlaintextTransportLayer representing the underlying connection;
            // it wraps the SocketChannel and the SelectionKey
            TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
            Authenticator authenticator;
            // Create the Authenticator, the key object that performs authentication
            if (mode == Mode.SERVER)
                // The server side creates a SaslServerAuthenticator
                authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosShortNamer,
                        socketChannel.socket().getLocalAddress().getHostName(), maxReceiveSize);
            else
                // The client side creates a SaslClientAuthenticator
                authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(),
                        socketChannel.socket().getInetAddress().getHostName(), clientSaslMechanism, handshakeRequestEnable);
            // Both authenticators don't use `PrincipalBuilder`, so we pass `null` for now. Reconsider if this changes.
            // configure() passes the TransportLayer along; the authenticator uses it to
            // communicate with the server and complete authentication
            authenticator.configure(transportLayer, null, this.configs);
            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
        } catch (Exception e) {
            log.info("Failed to create channel due to ", e);
            throw new KafkaException(e);
        }
    }

This process consists of the following steps:

  1. Create the TransportLayer object representing the underlying connection, wrapping the SocketChannel and SelectionKey. This is done by buildTransportLayer(...), whose source is fairly simple. Since SASL_PLAINTEXT is in use here, a PlaintextTransportLayer object is created.
  2. Create the Authenticator object. Since this is the Mode.CLIENT side, a SaslClientAuthenticator is created, and the Subject held by LoginManager is passed in.
  3. Finally, create the KafkaChannel instance, bind the SaslClientAuthenticator to it, and return it.

Let us first look at the definition and key fields of SaslClientAuthenticator:

    public class SaslClientAuthenticator implements Authenticator {
        ...
        public enum SaslState {
            SEND_HANDSHAKE_REQUEST, RECEIVE_HANDSHAKE_RESPONSE, INITIAL, INTERMEDIATE, COMPLETE, FAILED
        }

        // The Subject used for authentication
        private final Subject subject;

        // assigned in `configure`
        // The client-side SASL interface provided by the javax.security packages
        private SaslClient saslClient;
        // The callback handler used to collect authentication information
        private AuthCallbackHandler callbackHandler;
        // The PlaintextTransportLayer representing the underlying connection; it wraps the SocketChannel and SelectionKey
        private TransportLayer transportLayer;

        // buffers used in `authenticate`
        // Input buffer for reading authentication data
        private NetworkReceive netInBuffer;
        // Output buffer for sending authentication data
        private NetworkSend netOutBuffer;

        // Current SASL state
        private SaslState saslState;
        // Next SASL state to be set when outgoing writes associated with the current SASL state complete
        private SaslState pendingSaslState;
        ...
    }

When the SaslClientAuthenticator object is created above, its configure(...) method is also called to set up its state, the callback handler, the SaslClient object, and so on. Its source:

org.apache.kafka.common.security.authenticator.SaslClientAuthenticator#configure

    public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) throws KafkaException {
        try {
            // The PlaintextTransportLayer object
            this.transportLayer = transportLayer;
            // Configuration
            this.configs = configs;
            // Initialize saslState to SEND_HANDSHAKE_REQUEST, with pendingSaslState set to null
            setSaslState(handshakeRequestEnable ? SaslState.SEND_HANDSHAKE_REQUEST : SaslState.INITIAL);

            // determine client principal from subject.
            if (!subject.getPrincipals().isEmpty()) {
                Principal clientPrincipal = subject.getPrincipals().iterator().next();
                this.clientPrincipalName = clientPrincipal.getName();
            } else {
                clientPrincipalName = null;
            }
            // The SaslClientCallbackHandler used to collect credentials
            callbackHandler = new SaslClientCallbackHandler();
            callbackHandler.configure(configs, Mode.CLIENT, subject, mechanism);
            // Create the SaslClient; with SASL/PLAIN this is a PlainClient object
            saslClient = createSaslClient();
        } catch (Exception e) {
            throw new KafkaException("Failed to configure SaslClientAuthenticator", e);
        }
    }

As the source shows, SaslClientAuthenticator's configure(...) method also creates the SaslClientCallbackHandler used to collect credentials, and the SaslClient object. Since SASL/PLAIN is in use, the SaslClient created is a PlainClient; while the PlainClient object is being created, SaslClientCallbackHandler's handle(...) method is invoked to collect the information needed for authentication and store it in the PlainClient:

org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler#handle

    // Called while the PlainClient object is being created, to collect the
    // credentials needed for authentication and store them in the PlainClient
    @Override
    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback) {
                NameCallback nc = (NameCallback) callback;
                // Obtain the username
                if (!isKerberos && subject != null && !subject.getPublicCredentials(String.class).isEmpty()) {
                    nc.setName(subject.getPublicCredentials(String.class).iterator().next());
                } else
                    nc.setName(nc.getDefaultName());
            } else if (callback instanceof PasswordCallback) {
                // Obtain the password
                if (!isKerberos && subject != null && !subject.getPrivateCredentials(String.class).isEmpty()) {
                    char[] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
                    ((PasswordCallback) callback).setPassword(password);
                } else {
                    String errorMessage = "Could not login: the client is being asked for a password, but the Kafka" +
                            " client code does not currently support obtaining a password from the user.";
                    if (isKerberos) {
                        errorMessage += " Make sure -Djava.security.auth.login.config property passed to JVM and" +
                                " the client is configured to use a ticket cache (using" +
                                " the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using" +
                                " FQDN of the Kafka broker you are trying to connect to.";
                    }
                    throw new UnsupportedCallbackException(callback, errorMessage);
                }
            } else if (callback instanceof RealmCallback) {
                RealmCallback rc = (RealmCallback) callback;
                rc.setText(rc.getDefaultText());
            } else if (callback instanceof AuthorizeCallback) {
                AuthorizeCallback ac = (AuthorizeCallback) callback;
                String authId = ac.getAuthenticationID();
                String authzId = ac.getAuthorizationID();
                ac.setAuthorized(authId.equals(authzId));
                // Authentication passed; set the authorized id
                if (ac.isAuthorized())
                    ac.setAuthorizedID(authzId);
            } else {
                throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback");
            }
        }
    }

2.1.4. The Main Authentication Flow

From the flow above we know that a SaslClientAuthenticator instance is bound to each KafkaChannel. When Kafka's Selector polls, it performs authentication on any KafkaChannel whose connection has just been established. The call stack is:

    SaslClientAuthenticator.authenticate()
      ↖ KafkaChannel.prepare()
        ↖ Selector.pollSelectionKeys(...)
          ↖ Selector.poll(...)
            ↖ Selector.send(...)
              ↖ NetworkClient.doSend(...)
                ↖ NetworkClient.send(...)

The key code fragments:

    // org.apache.kafka.common.network.Selector#pollSelectionKeys
    // Handle I/O operations
    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
        ...
        // Get the KafkaChannel bound to the SelectionKey
        KafkaChannel channel = channel(key);
        ...
        /* if channel is not ready finish prepare */
        if (channel.isConnected() && !channel.ready())
            // Perform authentication
            channel.prepare();
        ...
    }

    // org.apache.kafka.common.network.KafkaChannel#prepare
    /**
     * Does handshake of transportLayer and authentication using configured authenticator
     */
    public void prepare() throws IOException {
        if (!transportLayer.ready())
            transportLayer.handshake();
        if (transportLayer.ready() && !authenticator.complete())
            // Perform authentication
            authenticator.authenticate();
    }

So SaslClientAuthenticator's authenticate() method is the entry point for authentication. Its source:

org.apache.kafka.common.security.authenticator.SaslClientAuthenticator#authenticate
  • /**
  • * Sends an empty message to the server to initiate the authentication process. It then evaluates server challenges
  • * via `SaslClient.evaluateChallenge` and returns client responses until authentication succeeds or fails.
  • *
  • * The messages are sent and received as size delimited bytes that consists of a 4 byte network-ordered size N
  • * followed by N bytes representing the opaque payload.
  • */
  • public void authenticate() throws IOException {
  • // if the send buffer still holds unsent data, flush it first
  • if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
  • return;
  • switch (saslState) {
  • case SEND_HANDSHAKE_REQUEST:
  • String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
  • currentRequestHeader = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, clientId, correlationId++);
  • // create and send the SaslHandshakeRequest handshake message
  • SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest(mechanism);
  • send(RequestSend.serialize(currentRequestHeader, handshakeRequest.toStruct()));
  • // switch to the RECEIVE_HANDSHAKE_RESPONSE state
  • setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
  • break;
  • case RECEIVE_HANDSHAKE_RESPONSE:
  • // read the SaslHandshakeResponse
  • byte[] responseBytes = receiveResponseOrToken();
  • if (responseBytes == null) // no complete message read yet; break and wait for the next read
  • break;
  • else {
  • // a complete message was read
  • try {
  • // parse the SaslHandshakeResponse; throw if the server returned a non-zero error code
  • handleKafkaResponse(currentRequestHeader, responseBytes);
  • currentRequestHeader = null;
  • } catch (Exception e) {
  • setSaslState(SaslState.FAILED);
  • throw e;
  • }
  • // switch to the INITIAL state
  • setSaslState(SaslState.INITIAL);
  • // Fall through and start SASL authentication using the configured client mechanism
  • }
  • // no break here, and saslState is now INITIAL, so execution falls through to the next branch
  • case INITIAL:
  • // send an empty byte array to start the authentication exchange
  • sendSaslToken(new byte[0], true);
  • // switch to the INTERMEDIATE state
  • setSaslState(SaslState.INTERMEDIATE);
  • break;
  • case INTERMEDIATE:
  • // read the challenge returned by the server
  • byte[] serverToken = receiveResponseOrToken();
  • if (serverToken != null) { // a complete challenge was read
  • // process the challenge
  • sendSaslToken(serverToken, false);
  • }
  • // authentication succeeded
  • if (saslClient.isComplete()) {
  • // switch to the COMPLETE state
  • setSaslState(SaslState.COMPLETE);
  • transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
  • }
  • break;
  • case COMPLETE:
  • break;
  • case FAILED:
  • throw new IOException("SASL handshake failed");
  • }
  • }
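The javadoc above notes that every token crosses the wire as a 4-byte network-ordered length followed by that many payload bytes. The following is a minimal, self-contained sketch of that framing; the class and method names are illustrative, not Kafka's:

```java
import java.nio.ByteBuffer;

public class SaslFraming {
    // frame a token as: 4-byte big-endian (network-order) length, then the payload bytes
    public static ByteBuffer frame(byte[] token) {
        ByteBuffer buf = ByteBuffer.allocate(4 + token.length);
        buf.putInt(token.length);   // ByteBuffer defaults to big-endian, i.e. network order
        buf.put(token);
        buf.flip();
        return buf;
    }

    // inverse: read the length prefix, then copy out exactly that many payload bytes
    public static byte[] unframe(ByteBuffer buf) {
        int size = buf.getInt();
        byte[] token = new byte[size];
        buf.get(token);
        return token;
    }
}
```

NetworkSend and NetworkReceive implement the same idea on top of non-blocking channels, which is why a partial read simply returns null from receiveResponseOrToken() until the frame is complete.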

The overall source shows that authenticate() branches on the saslState field, so let us first look at the state transitions of saslState. The field is of the enum type SaslState, which has six values: SEND_HANDSHAKE_REQUEST, RECEIVE_HANDSHAKE_RESPONSE, INITIAL, INTERMEDIATE, COMPLETE and FAILED. The transitions between these six states are shown in the diagram below:

1.客户端SaslState转换示意图.png

When the SaslClientAuthenticator object is created, the handshakeRequestEnable parameter determines the initial value of saslState; normally it is initialized to SEND_HANDSHAKE_REQUEST.

Next we examine each branch of the authenticate() method in detail.

  1. In the initial state, saslState is SEND_HANDSHAKE_REQUEST. The client creates a SaslHandshakeRequest recording the SASL mechanism it uses, sends the request to the server, and switches to the RECEIVE_HANDSHAKE_RESPONSE state. The send(...) method used here is responsible for sending the data in the output buffer; its source is as follows:
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator
  • private void send(ByteBuffer buffer) throws IOException {
  • try {
  • // wrap the outgoing data in a NetworkSend object
  • netOutBuffer = new NetworkSend(node, buffer);
  • // try to flush the buffer and send the data
  • flushNetOutBufferAndUpdateInterestOps();
  • } catch (IOException e) {
  • setSaslState(SaslState.FAILED);
  • throw e;
  • }
  • }
  • private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException {
  • // check whether the buffer has been fully sent; if not, send it, re-check, and return the result
  • boolean flushedCompletely = flushNetOutBuffer();
  • if (flushedCompletely) { // buffer fully sent
  • // stop watching the OP_WRITE event
  • transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
  • // apply the pending saslState value stashed for this write
  • if (pendingSaslState != null)
  • setSaslState(pendingSaslState);
  • } else
  • // buffer not fully sent; keep watching the OP_WRITE event
  • transportLayer.addInterestOps(SelectionKey.OP_WRITE);
  • return flushedCompletely;
  • }
  • private boolean flushNetOutBuffer() throws IOException {
  • if (!netOutBuffer.completed()) {
  • // write data to the SocketChannel
  • netOutBuffer.writeTo(transportLayer);
  • }
  • return netOutBuffer.completed();
  • }
  2. In the RECEIVE_HANDSHAKE_RESPONSE state, the client receives the server's SaslHandshakeResponse, which records an error code and the set of SASL mechanisms the server supports. If the server returned a non-zero error code, an exception is thrown and the state switches to FAILED; otherwise it switches to INITIAL. receiveResponseOrToken() reads one complete message from the SocketChannel, while handleKafkaResponse(...) parses the SaslHandshakeResponse; their sources are as follows:
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator
  • // read one complete message from the SocketChannel
  • private byte[] receiveResponseOrToken() throws IOException {
  • // create the input buffer
  • if (netInBuffer == null) netInBuffer = new NetworkReceive(node);
  • // read data from the SocketChannel
  • netInBuffer.readFrom(transportLayer);
  • byte[] serverPacket = null;
  • if (netInBuffer.complete()) { // only copy the data into serverPacket once the read is complete
  • netInBuffer.payload().rewind();
  • serverPacket = new byte[netInBuffer.payload().remaining()];
  • netInBuffer.payload().get(serverPacket, 0, serverPacket.length);
  • // clear the buffer
  • netInBuffer = null; // reset the networkReceive as we read all the data.
  • }
  • return serverPacket;
  • }
  • private void handleKafkaResponse(RequestHeader requestHeader, byte[] responseBytes) {
  • Struct struct;
  • ApiKeys apiKey;
  • try {
  • struct = NetworkClient.parseResponse(ByteBuffer.wrap(responseBytes), requestHeader);
  • apiKey = ApiKeys.forId(requestHeader.apiKey());
  • } catch (SchemaException | IllegalArgumentException e) {
  • LOG.debug("Invalid SASL mechanism response, server may be expecting only GSSAPI tokens");
  • throw new AuthenticationException("Invalid SASL mechanism response", e);
  • }
  • switch (apiKey) {
  • case SASL_HANDSHAKE:
  • handleSaslHandshakeResponse(new SaslHandshakeResponse(struct));
  • break;
  • default:
  • throw new IllegalStateException("Unexpected API key during handshake: " + apiKey);
  • }
  • }
  • private void handleSaslHandshakeResponse(SaslHandshakeResponse response) {
  • // get the error code
  • Errors error = Errors.forCode(response.errorCode());
  • switch (error) {
  • case NONE: // no error
  • break;
  • case UNSUPPORTED_SASL_MECHANISM: // 33: unsupported SASL mechanism
  • throw new UnsupportedSaslMechanismException(String.format("Client SASL mechanism '%s' not enabled in the server, enabled mechanisms are %s",
  • mechanism, response.enabledMechanisms()));
  • case ILLEGAL_SASL_STATE: // 34: illegal SASL state
  • throw new IllegalSaslStateException(String.format("Unexpected handshake request with client mechanism %s, enabled mechanisms are %s",
  • mechanism, response.enabledMechanisms()));
  • default: // other errors
  • throw new AuthenticationException(String.format("Unknown error code %d, client mechanism is %s, enabled mechanisms are %s",
  • response.errorCode(), mechanism, response.enabledMechanisms()));
  • }
  • }
  3. After switching to INITIAL, the client sends an empty byte array to the server as the initial response and switches to the INTERMEDIATE state. Note that the RECEIVE_HANDSHAKE_RESPONSE branch of the switch has no break, so once it finishes, control falls straight into the INITIAL branch.

  4. In the INTERMEDIATE state, the client reads the challenge returned by the server and processes it by calling SaslClient's evaluateChallenge() method to obtain a response. Reading the server's challenge is again handled by receiveResponseOrToken(...), while sending the response is handled by sendSaslToken(...), whose source is as follows:

org.apache.kafka.common.security.authenticator.SaslClientAuthenticator
  • // processes the challenge sent by the server and sends the resulting response back to the server
  • private void sendSaslToken(byte[] serverToken, boolean isInitial) throws IOException {
  • if (!saslClient.isComplete()) {
  • // process the challenge
  • byte[] saslToken = createSaslToken(serverToken, isInitial);
  • if (saslToken != null)
  • send(ByteBuffer.wrap(saslToken));
  • }
  • }
  • private byte[] createSaslToken(final byte[] saslToken, boolean isInitial) throws SaslException {
  • if (saslToken == null)
  • throw new SaslException("Error authenticating with the Kafka Broker: received a `null` saslToken.");
  • try {
  • // handling of the initial response
  • if (isInitial && !saslClient.hasInitialResponse())
  • return saslToken;
  • else
  • return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
  • public byte[] run() throws SaslException {
  • // call SaslClient's evaluateChallenge() to process the challenge
  • return saslClient.evaluateChallenge(saslToken);
  • }
  • });
  • } catch (PrivilegedActionException e) {
  • String error = "An error: (" + e + ") occurred when evaluating SASL token received from the Kafka Broker.";
  • // Try to provide hints to use about what went wrong so they can fix their configuration.
  • // TODO: introspect about e: look for GSS information.
  • final String unknownServerErrorText =
  • "(Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)";
  • if (e.toString().indexOf(unknownServerErrorText) > -1) {
  • error += " This may be caused by Java's being unable to resolve the Kafka Broker's" +
  • " hostname correctly. You may want to try to adding" +
  • " '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment." +
  • " Users must configure FQDN of kafka brokers when authenticating using SASL and" +
  • " `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm`";
  • }
  • error += " Kafka Client will go to AUTH_FAILED state.";
  • //Unwrap the SaslException inside `PrivilegedActionException`
  • throw new SaslException(error, e.getCause());
  • }
  • }

As shown, the createSaslToken(...) method above calls saslClient's evaluateChallenge(...) method, which here is PlainClient's evaluateChallenge(...).

  5. If authentication has already succeeded, the response is empty and nothing further needs to be sent; otherwise the client keeps sending responses. Once authentication passes, the state switches to COMPLETE and the OP_WRITE event on the TransportLayer is no longer watched; otherwise the state stays INTERMEDIATE.
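The PlainClient used on the client side is the JDK's own SASL/PLAIN implementation, obtained through the standard javax.security.sasl API. As a sketch of what the client produces, the snippet below creates a PLAIN SaslClient and builds its initial response, the RFC 4616 `[authzid] NUL authcid NUL passwd` message; the user name and password here are made up for illustration:

```java
import javax.security.auth.callback.*;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import java.nio.charset.StandardCharsets;

public class PlainClientDemo {
    // builds the initial SASL/PLAIN response for the given (illustrative) credentials
    public static byte[] initialResponse(String user, String password) throws Exception {
        CallbackHandler handler = callbacks -> {
            for (Callback cb : callbacks) {
                if (cb instanceof NameCallback)
                    ((NameCallback) cb).setName(user);
                else if (cb instanceof PasswordCallback)
                    ((PasswordCallback) cb).setPassword(password.toCharArray());
            }
        };
        SaslClient client = Sasl.createSaslClient(
                new String[]{"PLAIN"}, null, "kafka", "localhost", null, handler);
        // PLAIN has an initial response, so the first "challenge" passed in is empty
        byte[] response = client.hasInitialResponse()
                ? client.evaluateChallenge(new byte[0])
                : new byte[0];
        client.dispose();
        return response;
    }

    public static void main(String[] args) throws Exception {
        byte[] r = initialResponse("alice", "alice-secret");
        // render the NUL separators visibly
        System.out.println(new String(r, StandardCharsets.UTF_8).replace('\u0000', '|'));
    }
}
```

This mirrors the INITIAL/INTERMEDIATE branches above: the client's very first token already carries the credentials, which is why SASL/PLAIN completes in a single round trip.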

2.2. Server-Side Authentication

On the server side, network connections are managed by the same Selector-based network layer as on the client side, and the ChannelBuilder and KafkaChannel are created in the same way. Kafka's server-side SASL/PLAIN authentication works much like the client side: it is also triggered in KafkaChannel's prepare() method by calling Authenticator's authenticate(), but a different Authenticator implementation is used; the client uses SaslClientAuthenticator while the server uses SaslServerAuthenticator. This is visible in SaslChannelBuilder's buildChannel(...) method:

org.apache.kafka.common.network.SaslChannelBuilder#buildChannel
  • public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
  • try {
  • SocketChannel socketChannel = (SocketChannel) key.channel();
  • // create the TransportLayer (here a PlaintextTransportLayer), which represents the underlying connection and wraps the SocketChannel and SelectionKey
  • TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
  • Authenticator authenticator;
  • // create the Authenticator object, the key to performing authentication
  • if (mode == Mode.SERVER)
  • // the server side creates a SaslServerAuthenticator
  • authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosShortNamer,
  • socketChannel.socket().getLocalAddress().getHostName(), maxReceiveSize);
  • else
  • // the client side creates a SaslClientAuthenticator
  • authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(),
  • socketChannel.socket().getInetAddress().getHostName(), clientSaslMechanism, handshakeRequestEnable);
  • // Both authenticators don't use `PrincipalBuilder`, so we pass `null` for now. Reconsider if this changes.
  • // pass the TransportLayer in via configure(); the authenticator uses it to exchange authentication messages with the peer
  • authenticator.configure(transportLayer, null, this.configs);
  • return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
  • } catch (Exception e) {
  • log.info("Failed to create channel due to ", e);
  • throw new KafkaException(e);
  • }
  • }

Compared with SaslClientAuthenticator, SaslServerAuthenticator adds an important field, saslServer, the server-side interface for SASL authentication; on the server it is Kafka's own implementation, PlainSaslServer:

  • public class SaslServerAuthenticator implements Authenticator {
  • ...
  • public enum SaslState {
  • GSSAPI_OR_HANDSHAKE_REQUEST, HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE, FAILED
  • }
  • // the Subject used for authentication
  • private final Subject subject;
  • // Current SASL state
  • // current state of this SaslServerAuthenticator
  • private SaslState saslState = SaslState.GSSAPI_OR_HANDSHAKE_REQUEST;
  • // Next SASL state to be set when outgoing writes associated with the current SASL state complete
  • private SaslState pendingSaslState = null;
  • // server-side interface for SASL authentication; here Kafka's own implementation PlainSaslServer is used
  • private SaslServer saslServer;
  • // assigned in `configure`
  • // the underlying network connection (a PlaintextTransportLayer), wrapping the SocketChannel and SelectionKey
  • private TransportLayer transportLayer;
  • // the SASL mechanisms enabled on the server
  • private Set<String> enabledMechanisms;
  • // buffers used in `authenticate`
  • // input buffer for reading authentication messages
  • private NetworkReceive netInBuffer;
  • // output buffer for sending authentication messages
  • private NetworkSend netOutBuffer;
  • ...
  • }

SaslServerAuthenticator's configure(...) method mainly initializes the enabledMechanisms field:

org.apache.kafka.common.security.authenticator.SaslServerAuthenticator#configure
  • public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) {
  • // initialize transportLayer
  • this.transportLayer = transportLayer;
  • // initialize the configuration
  • this.configs = configs;
  • // read the configuration and initialize the enabledMechanisms field
  • List<String> enabledMechanisms = (List<String>) this.configs.get(SaslConfigs.SASL_ENABLED_MECHANISMS); // sasl.enabled.mechanisms
  • if (enabledMechanisms == null || enabledMechanisms.isEmpty())
  • throw new IllegalArgumentException("No SASL mechanisms are enabled");
  • this.enabledMechanisms = new HashSet<>(enabledMechanisms);
  • }

Like SaslClientAuthenticator, SaslServerAuthenticator has an authenticate() method that performs the server-side verification. Its source is as follows:

  • /**
  • * Evaluates client responses via `SaslServer.evaluateResponse` and returns the issued challenge to the client until
  • * authentication succeeds or fails.
  • *
  • * The messages are sent and received as size delimited bytes that consists of a 4 byte network-ordered size N
  • * followed by N bytes representing the opaque payload.
  • */
  • public void authenticate() throws IOException {
  • // if the output buffer still holds unsent data, flush it first
  • if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
  • return;
  • // check whether authentication has already completed
  • if (saslServer != null && saslServer.isComplete()) {
  • // if so, set saslState to COMPLETE and return
  • setSaslState(SaslState.COMPLETE);
  • return;
  • }
  • // create the input buffer
  • if (netInBuffer == null) netInBuffer = new NetworkReceive(maxReceiveSize, node);
  • // read data from the SocketChannel
  • netInBuffer.readFrom(transportLayer);
  • if (netInBuffer.complete()) { // check whether a complete message was read
  • netInBuffer.payload().rewind();
  • byte[] clientToken = new byte[netInBuffer.payload().remaining()];
  • // get the message payload
  • netInBuffer.payload().get(clientToken, 0, clientToken.length);
  • // reset the input buffer
  • netInBuffer = null; // reset the networkReceive as we read all the data.
  • try {
  • switch (saslState) {
  • // both the HANDSHAKE_REQUEST and GSSAPI_OR_HANDSHAKE_REQUEST states handle the handshake message
  • case HANDSHAKE_REQUEST:
  • handleKafkaRequest(clientToken);
  • break;
  • case GSSAPI_OR_HANDSHAKE_REQUEST:
  • if (handleKafkaRequest(clientToken))
  • break;
  • // For default GSSAPI, fall through to authenticate using the client token as the first GSSAPI packet.
  • // This is required for interoperability with 0.9.0.x clients which do not send handshake request
  • case AUTHENTICATE:
  • // call PlainSaslServer's evaluateResponse() to process the client's response
  • byte[] response = saslServer.evaluateResponse(clientToken);
  • if (response != null) {
  • // return the challenge to the client
  • netOutBuffer = new NetworkSend(node, ByteBuffer.wrap(response));
  • flushNetOutBufferAndUpdateInterestOps();
  • }
  • // When the authentication exchange is complete and no more tokens are expected from the client,
  • // update SASL state. Current SASL state will be updated when outgoing writes to the client complete.
  • // authentication succeeded; switch to the COMPLETE state
  • if (saslServer.isComplete())
  • setSaslState(SaslState.COMPLETE);
  • break;
  • default:
  • break;
  • }
  • } catch (Exception e) {
  • // on exception, switch to the FAILED state
  • setSaslState(SaslState.FAILED);
  • throw new IOException(e);
  • }
  • }
  • }

SaslServerAuthenticator's authenticate() method likewise switches between several SaslState values, but on the server side there are five states: GSSAPI_OR_HANDSHAKE_REQUEST, HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE and FAILED. Their transitions are shown in the diagram below:

2.服务端SaslState转换示意图.png

The rough flow of authenticate() is: first handle the client's SaslHandshakeRequest, verify that the server supports the SASL mechanism named in the request, return a SaslHandshakeResponse to the client, and switch to the AUTHENTICATE state. It then processes the client's response message; if authentication has not yet succeeded, it generates the corresponding challenge and returns it to the client, and once authentication succeeds it returns no further data and simply switches the state to COMPLETE. Any exception along the way switches the state to FAILED and is rethrown.
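The flow above can be modeled as a tiny state machine. The sketch below is a toy illustration of the server-side transitions only, not Kafka code; the enabled mechanism and the expected credentials are hardcoded assumptions:

```java
import java.util.Set;

// Toy model of the server-side flow described above: handshake first, then credential check.
public class ServerAuthFlow {
    enum SaslState { HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE, FAILED }

    private SaslState state = SaslState.HANDSHAKE_REQUEST;
    private final Set<String> enabledMechanisms = Set.of("PLAIN"); // assumed server config

    public SaslState state() { return state; }

    // handshake step: verify the requested mechanism is enabled, then expect credentials
    public void handleHandshake(String mechanism) {
        if (state != SaslState.HANDSHAKE_REQUEST || !enabledMechanisms.contains(mechanism)) {
            state = SaslState.FAILED;
            return;
        }
        state = SaslState.AUTHENTICATE;
    }

    // authenticate step: check the client's credentials (hardcoded here for illustration)
    public void handleAuthenticate(String username, String password) {
        if (state == SaslState.AUTHENTICATE && "admin".equals(username) && "admin-secret".equals(password))
            state = SaslState.COMPLETE;
        else
            state = SaslState.FAILED;
    }
}
```

The real implementation adds the GSSAPI_OR_HANDSHAKE_REQUEST state for compatibility with 0.9.0.x clients that skip the handshake entirely.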

In both the GSSAPI_OR_HANDSHAKE_REQUEST and HANDSHAKE_REQUEST states, SaslServerAuthenticator's authenticate() method calls handleKafkaRequest(...), which checks the mechanism, switches state, and after a successful handshake creates the SaslServer for the chosen SASL mechanism, i.e. PlainSaslServer. Before authentication passes, handleKafkaRequest(...) also handles ApiVersionsRequest; after authentication passes, KafkaApis handles it instead. The returned ApiVersionsResponse records an error code plus all ApiKeys the broker supports and their version ranges. The method's source is as follows:

org.apache.kafka.common.security.authenticator.SaslServerAuthenticator#handleKafkaRequest
  • /**
  • * Handles the request received in the HANDSHAKE_REQUEST and GSSAPI_OR_HANDSHAKE_REQUEST states: checks the mechanism and switches state;
  • * after a successful handshake it creates the SaslServer for the specified SASL mechanism, i.e. PlainSaslServer
  • */
  • private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException {
  • boolean isKafkaRequest = false;
  • String clientMechanism = null;
  • try {
  • ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
  • RequestHeader requestHeader = RequestHeader.parse(requestBuffer);
  • ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
  • // A valid Kafka request header was received. SASL authentication tokens are now expected only
  • // following a SaslHandshakeRequest since this is not a GSSAPI client token from a Kafka 0.9.0.x client.
  • // switch state
  • setSaslState(SaslState.HANDSHAKE_REQUEST);
  • isKafkaRequest = true;
  • if (!Protocol.apiVersionSupported(requestHeader.apiKey(), requestHeader.apiVersion())) { // check whether apiKey and version are valid
  • if (apiKey == ApiKeys.API_VERSIONS)
  • sendKafkaResponse(requestHeader, ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION));
  • else
  • throw new UnsupportedVersionException("Version " + requestHeader.apiVersion() + " is not supported for apiKey " + apiKey);
  • } else { // apiKey and version are valid
  • // parse the request into the corresponding request object
  • AbstractRequest request = AbstractRequest.getRequest(requestHeader.apiKey(), requestHeader.apiVersion(), requestBuffer);
  • LOG.debug("Handle Kafka request {}", apiKey);
  • // dispatch on apiKey
  • switch (apiKey) {
  • case API_VERSIONS: // return all the ApiKeys the broker supports
  • handleApiVersionsRequest(requestHeader, (ApiVersionsRequest) request);
  • break;
  • case SASL_HANDSHAKE: // check whether the server supports the client's mechanism and return a response
  • clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest) request);
  • break;
  • default:
  • throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
  • }
  • }
  • } catch (SchemaException | IllegalArgumentException e) {
  • if (saslState == SaslState.GSSAPI_OR_HANDSHAKE_REQUEST) {
  • // SchemaException is thrown if the request is not in Kafka format. IIlegalArgumentException is thrown
  • // if the API key is invalid. For compatibility with 0.9.0.x where the first packet is a GSSAPI token
  • // starting with 0x60, revert to GSSAPI for both these exceptions.
  • if (LOG.isDebugEnabled()) {
  • StringBuilder tokenBuilder = new StringBuilder();
  • for (byte b : requestBytes) {
  • tokenBuilder.append(String.format("%02x", b));
  • if (tokenBuilder.length() >= 20)
  • break;
  • }
  • LOG.debug("Received client packet of length {} starting with bytes 0x{}, process as GSSAPI packet", requestBytes.length, tokenBuilder);
  • }
  • if (enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM)) {
  • LOG.debug("First client packet is not a SASL mechanism request, using default mechanism GSSAPI");
  • clientMechanism = SaslConfigs.GSSAPI_MECHANISM;
  • } else
  • throw new UnsupportedSaslMechanismException("Exception handling first SASL packet from client, GSSAPI is not supported by server", e);
  • } else
  • throw e;
  • }
  • if (clientMechanism != null) {
  • // if the SASL mechanism check passed, create the PlainSaslServer
  • createSaslServer(clientMechanism);
  • // switch to the AUTHENTICATE state
  • setSaslState(SaslState.AUTHENTICATE);
  • }
  • return isKafkaRequest;
  • }

Inside handleKafkaRequest(...), both handleApiVersionsRequest(...) and handleHandshakeRequest(...) call sendKafkaResponse(...) to send the response to the client; its implementation is similar to SaslClientAuthenticator's send(...) method.

At the end of handleKafkaRequest(...), if a SaslServer is needed, createSaslServer(...) locates the PlainSaslServerFactory factory class, creates it via reflection, and then creates a PlainSaslServer object and assigns it to the saslServer field:

org.apache.kafka.common.security.authenticator.SaslServerAuthenticator#createSaslServer
  • // create the SaslServer object
  • private void createSaslServer(String mechanism) throws IOException {
  • this.saslMechanism = mechanism;
  • callbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration(), kerberosNamer);
  • callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism);
  • if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM)) { // GSSAPI
  • if (subject.getPrincipals().isEmpty())
  • throw new IllegalArgumentException("subject must have at least one principal");
  • saslServer = createSaslKerberosServer(callbackHandler, configs);
  • } else { // other mechanisms
  • try {
  • saslServer = Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
  • public SaslServer run() throws SaslException {
  • // call Sasl's createSaslServer() method
  • return Sasl.createSaslServer(saslMechanism, "kafka", host, configs, callbackHandler);
  • }
  • });
  • } catch (PrivilegedActionException e) {
  • throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
  • }
  • }
  • }

The source of Sasl's createSaslServer(...) method is as follows:

  • public static SaslServer
  • createSaslServer(String mechanism,
  • String protocol,
  • String serverName,
  • Map<String,?> props,
  • javax.security.auth.callback.CallbackHandler cbh)
  • throws SaslException {
  • SaslServer mech = null;
  • SaslServerFactory fac;
  • String className;
  • if (mechanism == null) {
  • throw new NullPointerException("Mechanism name cannot be null");
  • } else if (mechanism.length() == 0) {
  • return null;
  • }
  • String mechFilter = "SaslServerFactory." + mechanism;
  • // look up the registered providers (here PlainSaslServerProvider)
  • Provider[] provs = Security.getProviders(mechFilter);
  • for (int j = 0; provs != null && j < provs.length; j++) {
  • // look up the factory class name
  • className = provs[j].getProperty(mechFilter);
  • if (className == null) {
  • throw new SaslException("Provider does not support " +
  • mechFilter);
  • }
  • // create the PlainSaslServerFactory object via reflection
  • fac = (SaslServerFactory) loadFactory(provs[j], className);
  • if (fac != null) {
  • // use the PlainSaslServerFactory factory object to create the PlainSaslServer object
  • mech = fac.createSaslServer(
  • mechanism, protocol, serverName, props, cbh);
  • if (mech != null) {
  • return mech;
  • }
  • }
  • }
  • return null;
  • }

In the code above, fac is a factory of type SaslServerFactory, here a PlainSaslServerFactory object. It is made available by PlainSaslServerProvider, which is registered in the static block of the previously introduced PlainLoginModule:

org.apache.kafka.common.security.plain.PlainLoginModule
  • public class PlainLoginModule implements LoginModule {
  • ...
  • static {
  • // register PlainSaslServerProvider, which records the name of the PlainSaslServerFactory factory class as a map entry
  • PlainSaslServerProvider.initialize();
  • }
  • ...
  • }

PlainSaslServerProvider in turn records the name of the PlainSaslServerFactory factory class as a map entry:

org.apache.kafka.common.security.plain.PlainSaslServerProvider
  • public class PlainSaslServerProvider extends Provider {
  • private static final long serialVersionUID = 1L;
  • protected PlainSaslServerProvider() {
  • super("Simple SASL/PLAIN Server Provider", 1.0, "Simple SASL/PLAIN Server Provider for Kafka");
  • // record the PlainSaslServerFactory factory class
  • super.put("SaslServerFactory." + PlainSaslServer.PLAIN_MECHANISM, PlainSaslServerFactory.class.getName());
  • }
  • public static void initialize() {
  • // register the provider
  • Security.addProvider(new PlainSaslServerProvider());
  • }
  • }
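The registration and lookup above go through the standard java.security provider registry: Provider is just a property table, and Sasl.createSaslServer(...) queries it with a `SaslServerFactory.<mechanism>` filter. The sketch below registers a fictional provider the same way PlainSaslServerProvider does and queries it back; the DEMO mechanism and the factory class name are made up for illustration:

```java
import java.security.Provider;
import java.security.Security;

// A fictional provider, registered the same way PlainSaslServerProvider registers itself.
public class DemoSaslProvider extends Provider {
    public DemoSaslProvider() {
        super("Demo SASL Provider", 1.0, "Demo provider for a fictional DEMO mechanism");
        // record the factory class name under the "SaslServerFactory.<MECH>" key
        put("SaslServerFactory.DEMO", "com.example.DemoSaslServerFactory"); // hypothetical class
    }

    // the same kind of query Sasl.createSaslServer(...) issues for a requested mechanism
    public static String lookupFactoryClass(String mechanism) {
        String mechFilter = "SaslServerFactory." + mechanism;
        Provider[] provs = Security.getProviders(mechFilter);
        return (provs == null || provs.length == 0) ? null : provs[0].getProperty(mechFilter);
    }

    public static void main(String[] args) {
        Security.addProvider(new DemoSaslProvider());
        System.out.println(lookupFactoryClass("DEMO"));
    }
}
```

Sasl.createSaslServer(...) then loads the class named by that property reflectively and asks the factory for a SaslServer, which is exactly how Kafka's PlainSaslServer ends up being instantiated.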

PlainSaslServer's evaluateResponse(...) method implements the processing of the client's response message:

org.apache.kafka.common.security.plain.PlainSaslServer#evaluateResponse
  • @Override
  • public byte[] evaluateResponse(byte[] response) throws SaslException {
  • /*
  • * Message format (from https://tools.ietf.org/html/rfc4616):
  • *
  • * message = [authzid] UTF8NUL authcid UTF8NUL passwd
  • * authcid = 1*SAFE ; MUST accept up to 255 octets
  • * authzid = 1*SAFE ; MUST accept up to 255 octets
  • * passwd = 1*SAFE ; MUST accept up to 255 octets
  • * UTF8NUL = %x00 ; UTF-8 encoded NUL character
  • *
  • * SAFE = UTF1 / UTF2 / UTF3 / UTF4
  • * ;; any UTF-8 encoded Unicode character except NUL
  • */
  • String[] tokens;
  • try {
  • // parse the message
  • tokens = new String(response, "UTF-8").split("\u0000");
  • } catch (UnsupportedEncodingException e) {
  • throw new SaslException("UTF-8 encoding not supported", e);
  • }
  • if (tokens.length != 3)
  • throw new SaslException("Invalid SASL/PLAIN response: expected 3 tokens, got " + tokens.length);
  • authorizationID = tokens[0];
  • String username = tokens[1];
  • String password = tokens[2];
  • // check that username and password are non-empty
  • if (username.isEmpty()) {
  • throw new SaslException("Authentication failed: username not specified");
  • }
  • if (password.isEmpty()) {
  • throw new SaslException("Authentication failed: password not specified");
  • }
  • /**
  • * With SASL/PLAIN authentication, authorizationID is empty and is therefore set to username;
  • * during later authorization checks, this field determines the principal's permissions
  • */
  • if (authorizationID.isEmpty())
  • authorizationID = username;
  • try {
  • // read the expected password from the JAAS config file; JAAS_USER_PREFIX is the string "user_"
  • String expectedPassword = JaasUtils.jaasConfig(LoginType.SERVER.contextName(), JAAS_USER_PREFIX + username);
  • // check whether the password is correct
  • if (!password.equals(expectedPassword)) {
  • throw new SaslException("Authentication failed: Invalid username or password");
  • }
  • } catch (IOException e) {
  • throw new SaslException("Authentication failed: Invalid JAAS configuration", e);
  • }
  • complete = true;
  • return new byte[0];
  • }
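Note a subtlety the parsing above relies on: with an empty authzid the message starts with a NUL, and String.split("\u0000") keeps that leading empty token while discarding trailing empty ones, so a well-formed `NUL authcid NUL passwd` message yields exactly three tokens. The following is a standalone sketch of the same parsing and authorizationID fallback, outside Kafka and with no JAAS lookup:

```java
import java.nio.charset.StandardCharsets;

public class PlainResponseParser {
    // parses an RFC 4616 message = [authzid] UTF8NUL authcid UTF8NUL passwd
    // returns {authorizationID, username, password}
    public static String[] parse(byte[] response) {
        String[] tokens = new String(response, StandardCharsets.UTF_8).split("\u0000");
        if (tokens.length != 3)
            throw new IllegalArgumentException("expected 3 tokens, got " + tokens.length);
        // an empty authzid defaults to the authenticated username, as evaluateResponse() does
        String authzid = tokens[0].isEmpty() ? tokens[1] : tokens[0];
        return new String[] {authzid, tokens[1], tokens[2]};
    }
}
```

A message with a missing or extra NUL separator fails the three-token check, which is why evaluateResponse() can reject malformed input before ever touching the credentials.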