大数据
流式处理
源码解析
Spark

Spark源码分析14 - 存储体系07:存储层(4)磁盘存储与序列化

简介:在Spark的存储体系中,磁盘管理由DiskBlockManager磁盘管理器实现,它负责为逻辑的数据块与数据在磁盘的写入位置建立映射关系。

1. 磁盘存储

1.1. 磁盘数据块管理器

在Spark的存储体系中,磁盘管理由DiskBlockManager磁盘管理器实现,它负责为逻辑的数据块与数据在磁盘的写入位置建立映射关系。DiskBlockManager通过二维的目录结构对数据块的存储进行组织。有了这些了解我们先来看一下DiskBlockManager的定义和重要字段:

org.apache.spark.storage.DiskBlockManager
  • /**
  • * Creates and maintains the logical mapping between logical blocks and physical on-disk
  • * locations. One block is mapped to one file with a name given by its BlockId.
  • *
  • * Block files are hashed among the directories listed in spark.local.dir (or in
  • * SPARK_LOCAL_DIRS, if it's set).
  • *
  • * 磁盘块管理器。对磁盘上的文件及目录的读写操作进行管理。
  • * 负责为逻辑的Block与数据写入磁盘的位置之间建立逻辑的映射关系。
  • *
  • * @param conf SparkConf对象
  • * @param deleteFilesOnStop 停止DiskBlockManager的时候是否删除本地目录的布尔类型标记。
  • * 当不指定外部的ShuffleClient(即spark.shuffle.service.enabled属性为false)
  • * 或者当前实例是Driver时,此属性为true。
  • */
  • private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
  • // 磁盘存储DiskStore的本地子目录的数量。可以通过spark.diskStore.subDirectories属性配置,默认为64。
  • private[spark] val subDirsPerLocalDir = conf.getInt("spark.diskStore.subDirectories", 64)
  • /* Create one local directory for each path mentioned in spark.local.dir; then, inside this
  • * directory, create multiple subdirectories that we will hash files into, in order to avoid
  • * having really large inodes at the top level.
  • *
  • * 本地目录的数组。
  • * 默认获取spark.local.dir属性或者系统属性java.io.tmpdir指定的目录,目录可能有多个。
  • * 并在每个路径下创建以blockmgr-为前缀,UUID为后缀的随机字符串的子目录,
  • * 例如,blockmgr-f4cf9ae6-9213-4178-98a7-11b4a1fe12c7。
  • **/
  • private[spark] val localDirs: Array[File] = createLocalDirs(conf)
  • if (localDirs.isEmpty) {
  • logError("Failed to create any local dir.")
  • System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
  • }
  • // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
  • // of subDirs(i) is protected by the lock of subDirs(i)
  • /**
  • * DiskStore的本地子目录的二维数组:
  • * - 一维大小为spark.local.dir属性或者系统属性java.io.tmpdir指定的目录的个数。
  • * - 二维大小为subDirsPerLocalDir,即spark.diskStore.subDirectories指定的大小,默认为64。
  • * - 元素为File对象。
  • */
  • private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
  • ...
  • }

其中localDirs用于记录一级目录的存放位置,是由DiskBlockManager的createLocalDirs(...)方法创建,源码如下:

org.apache.spark.storage.DiskBlockManager#createLocalDirs
  • /**
  • * Create local directories for storing block data. These directories are
  • * located inside configured local directories and won't
  • * be deleted on JVM exit when using the external shuffle service.
  • */
  • private def createLocalDirs(conf: SparkConf): Array[File] = {
  • // 获取一级目录的路径,并进行flatMap
  • Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
  • try {
  • // 在每个一级目录下都创建名为"blockmgr-UUID字符串"的子目录
  • val localDir = Utils.createDirectory(rootDir, "blockmgr")
  • logInfo(s"Created local directory at $localDir")
  • // 返回创建的目录
  • Some(localDir)
  • } catch {
  • case e: IOException =>
  • logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
  • None
  • }
  • }
  • }

createLocalDirs(...)中使用的Utils.getConfiguredLocalDirs(...)方法通过SparkConf的多项配置决定最终读取的一级目录存放位置,顺序如下:

  1. 如果在YARN容器中运行,就以YARN容器LOCAL_DIRS配置的目录作为一级目录存放路径。
  2. 如果不在YARN容器中运行,但配置了SPARK_EXECUTOR_DIRS,则以SPARK_EXECUTOR_DIRS配置的目录作为一级目录存放路径。
  3. 如果不在YARN容器中运行,没有配置SPARK_EXECUTOR_DIRS,但配置了SPARK_LOCAL_DIRS,则以SPARK_EXECUTOR_DIRS配置的目录作为一级目录存放路径。
  4. 如果不在YARN容器中运行,没有配置SPARK_EXECUTOR_DIRS及SPARK_LOCAL_DIRS,但配置了MESOS_DIRECTORY且未开启外部Shuffle服务,则以MESOS_DIRECTORY配置的目录作为一级目录存放路径。
  5. 如果不在YARN容器中运行,没有配置SPARK_EXECUTOR_DIRS及SPARK_LOCAL_DIRS,但配置了MESOS_DIRECTORY且开启了外部Shuffle服务,则以spark.local.dir或系统属性java.io.tmpdir配置的目录作为一级目录存放路径。

一级目录的存放位置配置项中可以配置多个路径,createLocalDirs(...)方法会在每个路径下建立名为“blockmgr-UUID字符串”的目录作为一级目录。

DiskBlockManager的subDirsPerLocalDir字段规定了二级目录的最大数量,由spark.diskStore.subDirectories配置决定,默认为64个。

subDirs字段是一个二维数组,一维大小为一级目录个数,二维大小为subDirsPerLocalDir字段的值,即二级目录的最大数量,存储的元素是File对象。

DiskBlockManager针对本地文件提供了一些外界可以使用的方法,下面分别进行介绍。

1.1.1. 获取文件

DiskBlockManager提供了获取文件的数个方法,获取操作是面向文件名或BlockId的,获取的结果都是File对象。文件都是存放在二级目录中的,如果DiskBlockManager中没有对应的二级目录则会进行创建。

  1. 通过指定文件名获取文件

getFile(filename: String): File方法通过指定字符串形式的文件名获取指定的文件,源码如下:

org.apache.spark.storage.DiskBlockManager#getFile
  • /** Looks up a file by hashing it into one of our local subdirectories. */
  • // This method should be kept in sync with
  • // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
  • // 根据指定的文件名获取文件。
  • def getFile(filename: String): File = {
  • // Figure out which local directory it hashes to, and which subdirectory in that
  • // 获取文件名的非负哈希值
  • val hash = Utils.nonNegativeHash(filename)
  • // 按照Hash取余获取一级目录
  • val dirId = hash % localDirs.length
  • // 按照Hash取余获取二级目录
  • val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  • // Create the subdirectory if it doesn't already exist
  • // 尝试获取对应的二级目录
  • val subDir = subDirs(dirId).synchronized {
  • val old = subDirs(dirId)(subDirId)
  • if (old != null) { // 目录不为空
  • old
  • } else { // 目录为空,需要创建新的目录
  • // 创建目录
  • val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
  • if (!newDir.exists() && !newDir.mkdir()) {
  • throw new IOException(s"Failed to create local dir in $newDir.")
  • }
  • // 记录到subDirs数组中
  • subDirs(dirId)(subDirId) = newDir
  • newDir
  • }
  • }
  • // 构造File对象返回
  • new File(subDir, filename)
  • }

从该方法的源码来看,定位一级目录和二级目录的方式是通过文件名的非负哈希值进行一系列取余操作,最终会从二维数组subDirs中根据两级目录的索引来获取对应的存放目录,二级目录可能不存在,此时会创建该二级目录,最终会返回拼接了指定文件名的全路径File对象。

  1. 通过指定的BlockId获取数据块文件

getFile(blockId: BlockId): File方法用于通过指定的BlockId获取数据块文件,它内部直接调用了getFile(filename: String): File方法,传入的文件名是blockId.name

org.apache.spark.storage.DiskBlockManager#getFile
  • // 此方法根据BlockId获取文件
  • def getFile(blockId: BlockId): File = getFile(blockId.name)
  1. 获取所有文件

getAllFiles(...)方法用于获取所有文件,源码如下:

org.apache.spark.storage.DiskBlockManager#getAllFiles
  • /** List all the files currently stored on disk by the disk manager.
  • * 用于获取本地localDirs目录中的所有文件
  • **/
  • def getAllFiles(): Seq[File] = {
  • // Get all the files inside the array of array of directories
  • // 遍历subDirs中所有的目录
  • subDirs.flatMap { dir => // 一层目录遍历
  • dir.synchronized { // 加锁
  • // Copy the content of dir because it may be modified in other threads
  • // 加锁后克隆一份,避免线程安全问题
  • dir.clone()
  • }
  • }.filter(_ != null).flatMap { dir => // 二层目录遍历
  • val files = dir.listFiles()
  • if (files != null) files else Seq.empty
  • }
  • }

该方法会遍历subDirs数组,分别进行每一层的获取,最终将返回File实例序列。

  1. 获取所有数据块的文件

getAllBlocks()方法用于获取所有数据块的文件,它会返回一个BlockId的序列:

org.apache.spark.storage.DiskBlockManager#getAllBlocks
  • /** List all the blocks currently stored on disk by the disk manager.
  • * 获取本地localDirs目录中所有Block的BlockId
  • **/
  • def getAllBlocks(): Seq[BlockId] = {
  • // 使用getAllFiles()获取所有文件,构造为BlockId对象数组返回
  • getAllFiles().map(f => BlockId(f.getName))
  • }

可见,它内部直接调用getAllFiles()方法将获取的File对象的文件名包装为BlockId后组成序列返回。

1.1.2. 判断是否包含指定数据块的文件

containsBlock(...)方法用于判断是否包含指定的数据块对应的文件,它内部先尝试获取文件,如果获取不到则说明不包含,源码如下:

  • /** Check if disk block manager has a block.
  • * 用于检查本地localDirs目录中是否包含BlockId对应的文件
  • **/
  • def containsBlock(blockId: BlockId): Boolean = {
  • // 调用geiFile()方法并判断是否存在
  • getFile(blockId.name).exists()
  • }

1.1.3. 创建临时数据块

DiskBlockManager提供了创建临时数据块文件的方法,分别创建TempLocalBlockId和TempShuffleBlockId对应的临时数据块文件。

  1. 为指定数据块创建临时文件

createTempLocalBlock(...)方法用于创建临时数据块文件,最终返回TempLocalBlockId和File实例组成的二元元组:

org.apache.spark.storage.DiskBlockManager#createTempLocalBlock
  • /** Produces a unique block id and File suitable for storing local intermediate results.
  • * 用于为中间结果创建唯一的BlockId和文件,此文件将用于保存本地Block的数据。
  • **/
  • def createTempLocalBlock(): (TempLocalBlockId, File) = {
  • // "temp_local_"前缀加上UUID字符串
  • var blockId = new TempLocalBlockId(UUID.randomUUID())
  • while (getFile(blockId).exists()) { // 判断是否存在,如果存在就重试
  • blockId = new TempLocalBlockId(UUID.randomUUID())
  • }
  • // 返回二元Tuple
  • (blockId, getFile(blockId))
  • }

createTempLocalBlock()方法内部是通过getFile(blockId: BlockId): File方法来尝试获取的,该方法在存放文件的二级目录不存在时会直接创建。

  1. 为指定的Shuffle中间结果输出的数据块创建临时文件

createTempShuffleBlock()方法与createTempLocalBlock()实现基本一致,但它返回的是TempShuffleBlockId和File实例组成的二元元组:

  • /** Produces a unique block id and File suitable for storing shuffled intermediate results.
  • * 创建唯一的BlockId和文件,用来存储Shuffle中间结果(即Map任务的输出)。
  • **/
  • def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
  • // "temp_shuffle_"前缀加上UUID字符串
  • var blockId = new TempShuffleBlockId(UUID.randomUUID())
  • while (getFile(blockId).exists()) { // 判断是否存在,如果存在就重试
  • blockId = new TempShuffleBlockId(UUID.randomUUID())
  • }
  • // 返回二元Tuple
  • (blockId, getFile(blockId))
  • }

1.1.4. 关闭DiskBlockManager

DiskBlockManager在初始化时为自己添加了JVM关闭钩子:

org.apache.spark.storage.DiskBlockManager
  • // JVM关闭钩子
  • private val shutdownHook = addShutdownHook()
  • private def addShutdownHook(): AnyRef = {
  • logDebug("Adding shutdown hook") // force eager creation of logger
  • // 虚拟机关闭钩子
  • ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
  • logInfo("Shutdown hook called")
  • // 在虚拟机关闭时也关闭DiskBlockManager
  • DiskBlockManager.this.doStop()
  • }
  • }

可见,doStop()方法用于关闭操作的主要方法,它会递归清理所有的目录和文件:

org.apache.spark.storage.DiskBlockManager#doStop
  • private def doStop(): Unit = {
  • if (deleteFilesOnStop) { // 如果标记了在停止时需要删除文件
  • // 遍历一级目录
  • localDirs.foreach { localDir =>
  • if (localDir.isDirectory() && localDir.exists()) {
  • try {
  • if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) {
  • // 递归删除一级目录及其中的内容
  • Utils.deleteRecursively(localDir)
  • }
  • } catch {
  • case e: Exception =>
  • logError(s"Exception while deleting local spark dir: $localDir", e)
  • }
  • }
  • }
  • }
  • }

DiskBlockManager定义了stop()方法用于主动关闭,它会先移除虚拟机钩子之后再调用doStop()方法:

org.apache.spark.storage.DiskBlockManager#stop
  • /** Cleanup local dirs and stop shuffle sender.
  • * 正常停止DiskBlockManager
  • **/
  • private[spark] def stop() {
  • // Remove the shutdown hook. It causes memory leaks if we leave it around.
  • try {
  • ShutdownHookManager.removeShutdownHook(shutdownHook)
  • } catch {
  • case e: Exception =>
  • logError(s"Exception while removing shutdown hook.", e)
  • }
  • // 调用doStop()
  • doStop()
  • }

1.2. 磁盘存储具体实现

DiskStore是真正用于存储文件操作的类,它依赖于DiskBlockManager的目录及文件管理功能,在此基础上添加读写功能的实现。DiskStore的定义如下:

org.apache.spark.storage.DiskStore
  • /**
  • * Stores BlockManager blocks on disk.
  • *
  • * 磁盘存储。依赖于DiskBlockManager,负责对Block的磁盘存储。
  • *
  • * @param conf SparkConf对象
  • * @param diskManager 磁盘Block管理器DiskBlockManager对象
  • */
  • private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
  • ...
  • }

可见,DiskStore持有了一个DiskBlockManager的引用。

DiskStore根据spark.storage.memoryMapThreshold参数来初始化minMemoryMapBytes阈值,它用于决定读取文件时是直接读取还是使用FileChannel进行读取,默认为2MB:

org.apache.spark.storage.DiskStore#minMemoryMapBytes
  • /**
  • * 读取磁盘中的Block时,是直接读取还是使用FileChannel的内存镜像映射方法读取的阈值。
  • * 由spark.storage.memoryMapThreshold配置,默认为2M
  • */
  • private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")

接下来我们看一下DiskStore提供的读写方法。

1.2.1. 判断是否包含指定数据块的文件

DiskStore的contains(blockId: BlockId)方法根据指定的BlockId判断对应的文件是否存在,它会使用DiskBlockManager获取文件的File对象引用,然后使用其exists()方法进行判断:

org.apache.spark.storage.DiskStore#contains
  • // 用于判断本地磁盘存储路径下是否包含给定BlockId所对应的Block文件
  • def contains(blockId: BlockId): Boolean = {
  • // 使用DiskBlockManager的getFile()获取对应的文件
  • val file = diskManager.getFile(blockId.name)
  • // 判断文件是否存在
  • file.exists()
  • }

1.2.2. 获取指定数据块文件大小

getSize(...)方法用于获取指定BlockId对应的文件的大小:

org.apache.spark.storage.DiskStore#getSize
  • // 用于获取给定BlockId所对应Block的大小
  • def getSize(blockId: BlockId): Long = {
  • // 使用DiskBlockManager的getFile()方法获取到对应的文件
  • diskManager.getFile(blockId.name).length
  • }

1.2.3. 读取指定数据块数据

DiskStore提供了getBytes(...)方法用于获取指定数据块的数据,它返回一个ChunkedByteBuffer对象:

org.apache.spark.storage.DiskStore#getBytes
  • // 用于读取给定BlockId所对应的Block,并封装为ChunkedByteBuffer返回
  • def getBytes(blockId: BlockId): ChunkedByteBuffer = {
  • // 获取文件
  • val file = diskManager.getFile(blockId.name)
  • // 获取文件的FileChannel
  • val channel = new RandomAccessFile(file, "r").getChannel
  • Utils.tryWithSafeFinally {
  • // For small files, directly read rather than memory map
  • // 小于阈值,写入堆缓冲
  • if (file.length < minMemoryMapBytes) {
  • val buf = ByteBuffer.allocate(file.length.toInt)
  • // 定位到文件开头
  • channel.position(0)
  • // 循环将读取数据写入到buf中
  • while (buf.remaining() != 0) {
  • if (channel.read(buf) == -1) {
  • throw new IOException("Reached EOF before filling buffer\n" +
  • s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
  • }
  • }
  • // 切换读模式
  • buf.flip()
  • // 返回包装了ByteBuffer的ChunkedByteBuffer对象
  • new ChunkedByteBuffer(buf)
  • } else {
  • // 否则返回MappedByteBuffer内存映射的ChunkedByteBuffer包装对象
  • new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
  • }
  • } {
  • channel.close()
  • }
  • }

实现比较简单,当读取的文件大小没有达到minMemoryMapBytes阈值时将直接将数据读取到ByteBuffer中,最终将该ByteBuffer包装为ChunkedByteBuffer对象返回;否则将直接返回MappedByteBuffer内存映射的ChunkedByteBuffer包装对象。

1.2.4. 向指定数据块写入数据

putBytes(...)方法用于将指定的ChunkedByteBuffer中数据写入到指定BlockId对应的文件中:

org.apache.spark.storage.DiskStore#putBytes
  • // 用于将BlockId所对应的Block写入磁盘
  • def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
  • // 调用put()方法,传入描述了写出操作的回调函数
  • put(blockId) { fileOutputStream =>
  • // 获取文件流的FileChannel
  • val channel = fileOutputStream.getChannel
  • Utils.tryWithSafeFinally {
  • // 使用FileChannel写出到磁盘
  • bytes.writeFully(channel)
  • } {
  • channel.close()
  • }
  • }
  • }

该方法内部会调用put(...)(...)方法进行写入,源码如下:

org.apache.spark.storage.DiskStore#put
  • /**
  • * Invokes the provided callback function to write the specific block.
  • *
  • * 用于将BlockId所对应的Block写入磁盘
  • *
  • * @throws IllegalStateException if the block already exists in the disk store.
  • */
  • def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
  • // 判断是否包含了该BlockId对应的文件
  • if (contains(blockId)) {
  • // 如果包含就抛出异常
  • throw new IllegalStateException(s"Block $blockId is already present in the disk store")
  • }
  • logDebug(s"Attempting to put block $blockId")
  • // 开始时间
  • val startTime = System.currentTimeMillis
  • // 获取文件File对象
  • val file = diskManager.getFile(blockId)
  • // 构建输出流
  • val fileOutputStream = new FileOutputStream(file)
  • var threwException: Boolean = true
  • try {
  • // 进行写出
  • writeFunc(fileOutputStream)
  • threwException = false
  • } finally {
  • try {
  • Closeables.close(fileOutputStream, threwException)
  • } finally {
  • if (threwException) {
  • remove(blockId)
  • }
  • }
  • }
  • // 结束时间
  • val finishTime = System.currentTimeMillis
  • // 记录耗时
  • logDebug("Block %s stored as %s file on disk in %d ms".format(
  • file.getName,
  • Utils.bytesToString(file.length()),
  • finishTime - startTime))
  • }

写入操作依赖于DiskBlockManager定位文件得到File引用,然后使用FileOutputStream输出流进行写入。

1.2.5. 移除指定数据块

remove(...)方法用于移除指定BlockId对应的数据块文件,实现比较简单:

org.apache.spark.storage.DiskStore#remove
  • // 用于删除给定BlockId所对应的Block文件
  • def remove(blockId: BlockId): Boolean = {
  • // 使用DiskBlockManager的getFile()获取对应的文件
  • val file = diskManager.getFile(blockId.name)
  • // 判断文件是否存在
  • if (file.exists()) {
  • // 存在,则删除文件
  • val ret = file.delete()
  • if (!ret) {
  • logWarning(s"Error deleting ${file.getPath()}")
  • }
  • ret
  • } else {
  • // 不存在,直接返回false
  • false
  • }
  • }

2. 序列化

Spark提供了序列化模块为对象进行网络传输、存储数据读写等过程提供序列化支持。Spark的序列化模块是可插拔的,它内置提供了JDK API默认的序列化器JavaSerializerInstance、Kryo序列化器KryoSerializerInstance及UnsafeRow序列化器UnsafeRowSerializerInstance,其中UnsafeRowSerializerInstance位于Spark SQL模块,这里不过多做介绍。我们先来看看Spark序列化模块的结构类图:

1.Spark序列化继承体系.png

对于上述结构类图,这里先做一些简单的介绍:

  1. SerializerManager是序列化管理器,它负责根据数据块的类型选择合适的序列化器Serializer,并提供加密、压缩等可插拔的功能。
  2. Serializer是序列化器的高层抽象类,它有JavaSerializer、KryoSerializer及UnsafeRowSerializer等子类,序列化器主要用于构建特定类型的序列化器实例并保证并发情况下的线程安全。
  3. SerializerInstance是序列化器实例的高层抽象类,它有JavaSerializerInstance、KryoSerializerInstance及UnsafeRowSerializerInstance等子类,通过序列化器的newInstance()方法获得,序列化器实例才是进行序列化和反序列化工作的主要组件,它提供了对应的序列化和反序列化方法。
  4. SerializationStream和DeserializationStream分别是序列化和反序列化后数据的流式体现,分别提供了对序列化流数据的写出和对象数据的读取等功能。SerializerInstance依赖于它们进行序列化流数据的写出及对象数据的读取。

下面我们将对其中几个重要的类做详细讲解。

2.1. SerializerManager

上图中的SerializerManager意味序列化管理器,在前面已经出现过很多次了,一直没有单独讲解。BlockManager在构造时,需要传入的参数之一就是SerializerManager,它为Spark使用网络进行对象传输、向存储体系进行数据读写等过程提供序列化操作。SparkEnv的create(...)方法在BlockManager构造之前就已经对其进行了创建:

org.apache.spark.SparkEnv#create
  • // 创建序列化管理器
  • val serializer = instantiateClassFromConf[Serializer](
  • "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  • logDebug(s"Using serializer: ${serializer.getClass}")
  • // 默认为org.apache.spark.serializer.JavaSerializer,通过spark.serializer配置
  • val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

SerializerManager的构造方法如下:

org.apache.spark.serializer.SerializerManager
  • /**
  • * Component which configures serialization, compression and encryption for various Spark
  • * components, including automatic selection of which [[Serializer]] to use for shuffles.
  • *
  • * @param defaultSerializer 默认的序列化器
  • * @param encryptionKey 加密使用的密钥
  • */
  • private[spark] class SerializerManager(
  • defaultSerializer: Serializer,
  • conf: SparkConf,
  • encryptionKey: Option[Array[Byte]]) {
  • def this(defaultSerializer: Serializer, conf: SparkConf) = this(defaultSerializer, conf, None)
  • ...
  • }

SerializerManager的构造方法接收一个默认的序列化器,同时还会接收可选的encryptionKey参数用于加密操作,在重载的构造方法中,该参数默认传None。

2.1.1. 序列化器

从前面的分析可知,SerializerManager默认持有了spark.serializer参数提供的序列化器,该参数默认指定的是JavaSerializer序列化器;不仅如此,在SerializerManager内部还持有了KryoSerializer序列化器实例:

org.apache.spark.serializer.SerializerManager
  • // 采用Google提供的Kryo序列化库实现
  • private[this] val kryoSerializer = new KryoSerializer(conf)

SerializerManager的两个getSerializer(...)重载方法可以通过指定参数获取对应的序列化器:

org.apache.spark.serializer.SerializerManager#getSerializer
  • // SPARK-18617: As feature in SPARK-13990 can not be applied to Spark Streaming now. The worst
  • // result is streaming job based on `Receiver` mode can not run on Spark 2.x properly. It may be
  • // a rational choice to close `kryo auto pick` feature for streaming in the first step.
  • /**
  • * 获取序列化器。
  • * 如果autoPick为true(即BlockId不为StreamBlockId时),
  • * 并且调用canUseKryo的结果为true时选择kryoSerializer,
  • * 否则选择defaultSerializer。
  • */
  • def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
  • if (autoPick && canUseKryo(ct)) {
  • kryoSerializer
  • } else {
  • defaultSerializer
  • }
  • }
  • /**
  • * Pick the best serializer for shuffling an RDD of key-value pairs.
  • * 获取序列化器。
  • * 如果对于keyClassTag和valueClassTag,
  • * 调用canUseKryo的结果都为true时选择kryoSerializer,
  • * 否则选择defaultSerializer。
  • */
  • def getSerializer(keyClassTag: ClassTag[_], valueClassTag: ClassTag[_]): Serializer = {
  • if (canUseKryo(keyClassTag) && canUseKryo(valueClassTag)) {
  • kryoSerializer
  • } else {
  • defaultSerializer
  • }
  • }

其中使用的canUseKryo(...)方法源码如下:

org.apache.spark.serializer.SerializerManager#canUseKryo
  • // 判断对于指定的类型标记ct,是否能使用kryoSerializer进行序列化。
  • def canUseKryo(ct: ClassTag[_]): Boolean = {
  • // 当类型标记ct属于primitiveAndPrimitiveArrayClassTags或者stringClassTag时,canUseKryo方法才返回真。
  • primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
  • }

primitiveAndPrimitiveArrayClassTags集合中记录可被Kryo序列化的数据类型:

org.apache.spark.serializer.SerializerManager#primitiveAndPrimitiveArrayClassTags
  • // 原生类型及原生类型数组的类型标记的集合
  • private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
  • val primitiveClassTags = Set[ClassTag[_]](
  • ClassTag.Boolean,
  • ClassTag.Byte,
  • ClassTag.Char,
  • ClassTag.Double,
  • ClassTag.Float,
  • ClassTag.Int,
  • ClassTag.Long,
  • ClassTag.Null,
  • ClassTag.Short
  • )
  • val arrayClassTags = primitiveClassTags.map(_.wrap)
  • primitiveClassTags ++ arrayClassTags
  • }

关于JavaSerializer和KryoSerializer会在后面详细介绍。

2.1.2. 加密

当构造SerializerManager实例时提供了加密使用的密钥参数encryptionKey时,SerializerManager会对数据块的输入输出流进行加密,通过判断encryptionKey是否被定义,可以知道当前SerializerManager是否支持加密:

org.apache.spark.serializer.SerializerManager#encryptionEnabled
  • /**
  • * 当前SerializerManager是否支持加密。
  • * 要支持加密,必须在构造SerializerManager的时候就传入encryptionKey。
  • * 可以通过
  • * spark.io.encryption.enabled(允许加密)、
  • * spark.io.encryption.keySizeBits(密钥长度,有128、192、256三种长度)、
  • * spark.io.encryption.keygen.algorithm(加密算法,默认为HmacSHA1)
  • * 等属性进行具体的配置。
  • */
  • def encryptionEnabled: Boolean = encryptionKey.isDefined

SerializerManager的wrapStream(...)相关方法会对数据块的输入输出流进行包装,其中分别进行了压缩包装和加密包装:

org.apache.spark.serializer.SerializerManager#wrapStream
  • /**
  • * Wrap an input stream for encryption and compression
  • * 对Block的输入流进行压缩与加密。
  • */
  • def wrapStream(blockId: BlockId, s: InputStream): InputStream = {
  • wrapForCompression(blockId, wrapForEncryption(s))
  • }
  • /**
  • * Wrap an output stream for encryption and compression
  • * 对Block的输出流进行压缩与加密。
  • */
  • def wrapStream(blockId: BlockId, s: OutputStream): OutputStream = {
  • wrapForCompression(blockId, wrapForEncryption(s))
  • }

可见在wrapStream(...)内部,先使用wrapForEncryption(...)对输入或输出流进行了包装,然后又使用wrapForCompression(...)对包装后的流再进行一次压缩包装,关于压缩功能我们后面再讨论,我们先来看一下wrapForEncryption(...)的实现,对应于输入输出流,它也有两个重载版本:

org.apache.spark.serializer.SerializerManager#wrapForEncryption
  • /**
  • * Wrap an input stream for encryption if shuffle encryption is enabled
  • * 对输入流进行加密。
  • */
  • def wrapForEncryption(s: InputStream): InputStream = {
  • encryptionKey
  • .map { key => CryptoStreamUtils.createCryptoInputStream(s, conf, key) }
  • .getOrElse(s)
  • }
  • /**
  • * Wrap an output stream for encryption if shuffle encryption is enabled
  • * 对输出流进行加密。
  • */
  • def wrapForEncryption(s: OutputStream): OutputStream = {
  • encryptionKey
  • .map { key => CryptoStreamUtils.createCryptoOutputStream(s, conf, key) }
  • .getOrElse(s)
  • }

可见,它会使用CryptoStreamUtils工具类的createCryptoInputStream(...)createCryptoOutputStream(...)方法以及传入的encryptionKey密钥对流进行加密包装,并返回对应的输入输出流。关于加密的细节读者朋友可以关注CryptoStreamUtils工具类的实现,这里就不赘述了。

2.1.3. 压缩

从上面讲解的内容可知,SerializerManager还使用压缩器编解码器提供了压缩功能。SerializerManager通过CompressionCodec的createCodec(...)方法从SparkConf的spark.io.compression.codec配置项创建,如果没有指定则默认为LZ4CompressionCodec:

org.apache.spark.serializer.SerializerManager#compressionCodec
  • /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
  • * the initialization of the compression codec until it is first used. The reason is that a Spark
  • * program could be using a user-defined codec in a third party jar, which is loaded in
  • * Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been
  • * loaded yet.
  • * SerializerManager使用的压缩编解码器。延迟初始化。
  • **/
  • private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)

上面调用的CompressionCodec的createCodec(...)方法源码如下:

org.apache.spark.io.CompressionCodec
  • def createCodec(conf: SparkConf): CompressionCodec = {
  • // 使用getCodecName()方法获取压缩编解码器的名字,然后创建
  • createCodec(conf, getCodecName(conf))
  • }
  • def getCodecName(conf: SparkConf): String = {
  • // 使用spark.io.compression.codec参数获取,没有设置则默认为lz4
  • conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
  • }
  • // 默认的压缩编解码器
  • val DEFAULT_COMPRESSION_CODEC = "lz4"

从CompressionCodec类的shortCompressionCodecNames参数可以得知,Spark内置支持LZ4CompressionCodec、LZFCompressionCodec和SnappyCompressionCodec三类压缩编解码器:

org.apache.spark.io.CompressionCodec#shortCompressionCodecNames
  • // 内置支持的压缩编解码器
  • private val shortCompressionCodecNames = Map(
  • "lz4" -> classOf[LZ4CompressionCodec].getName,
  • "lzf" -> classOf[LZFCompressionCodec].getName,
  • "snappy" -> classOf[SnappyCompressionCodec].getName)

对于不同类型的数据块,如BroadcastBlock、ShuffleBlock、RDDBlock等,SerializerManager可以通过配置参数来决定他们是否可以被压缩:

  • // Whether to compress broadcast variables that are stored
  • // 是否对广播对象进行压缩,可以通过spark.broadcast.compress属性配置,默认为true。
  • private[this] val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
  • // Whether to compress shuffle output that are stored
  • // 是否对Shuffle输出数据压缩,可以通过spark.shuffle.compress属性配置,默认为true。
  • private[this] val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
  • // Whether to compress RDD partitions that are stored serialized
  • // 是否对RDD压缩,可以通过spark.rdd.compress属性配置,默认为false。
  • private[this] val compressRdds = conf.getBoolean("spark.rdd.compress", false)
  • // Whether to compress shuffle output temporarily spilled to disk
  • // 是否对溢出到磁盘的Shuffle数据压缩,可以通过spark.shuffle.spill.compress属性配置,默认为true。
  • private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)

SerializerManager的shouldCompress(...)方法中会根据这些配置来进行判断:

org.apache.spark.serializer.SerializerManager#shouldCompress
  • // 不同类型的数据块是否能够被压缩
  • private def shouldCompress(blockId: BlockId): Boolean = {
  • blockId match {
  • case _: ShuffleBlockId => compressShuffle
  • case _: BroadcastBlockId => compressBroadcast
  • case _: RDDBlockId => compressRdds
  • case _: TempLocalBlockId => compressShuffleSpill
  • case _: TempShuffleBlockId => compressShuffle
  • case _ => false
  • }
  • }

压缩编解码器的使用也是通过包装流的形式实现的,即wrapForCompression(...)相关方法,它会对加密包装后的流再进行压缩包装:

org.apache.spark.serializer.SerializerManager#wrapForCompression
  • /**
  • * Wrap an output stream for compression if block compression is enabled for its block type
  • * 对输出流进行压缩。
  • */
  • private[this] def wrapForCompression(blockId: BlockId, s: OutputStream): OutputStream = {
  • // 对数据块进行判断是否需要被压缩,如果需要则使用compressionCodec包装为压缩输出流
  • if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s
  • }
  • /**
  • * Wrap an input stream for compression if block compression is enabled for its block type
  • * 对输入流进行压缩
  • */
  • private[this] def wrapForCompression(blockId: BlockId, s: InputStream): InputStream = {
  • // 对数据块进行判断是否需要被压缩,如果需要则使用compressionCodec包装为压缩输出流
  • if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s
  • }

wrapForCompression(...)方法的源码可知,压缩流的包装是通过压缩编解码compressionCodec的对应方法实现的,具体的压缩操作会交给指定的压缩编解码器。

2.1.4. 序列化

通过以上的讲解,我们已经了解了SerializerManager提供的压缩和加密等功能,接下来让我们分析一下具体的序列化和反序列化实现。

首先来分析序列化实现。SerializerManager中dataSerializeStream(...)用于对指定数据块进行序列化,源码如下:

org.apache.spark.serializer.SerializerManager#dataSerializeStream
  • /** Serializes into a stream.
  • * 对Block的输出流序列化。
  • **/
  • def dataSerializeStream[T: ClassTag](
  • blockId: BlockId,
  • outputStream: OutputStream,
  • values: Iterator[T]): Unit = {
  • // 包装为缓冲输出流
  • val byteStream = new BufferedOutputStream(outputStream)
  • // 非SteamBlock数据块是使用Kryo序列化器的前提
  • val autoPick = !blockId.isInstanceOf[StreamBlockId]
  • // 获取序列化器实例
  • val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
  • // 使用序列化器实例进行序列化,并将values的数据写出到outputStream
  • ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
  • }

该方法接收三个参数,其中blockId就是数据块的BlockId标识了,outputStream是序列化后进行数据输出的输出流,values则是真正要进行序列化的数据。dataSerializeStream(...)的实现分为以下几步:

  1. 将传入的输出流包装为BufferedOutputStream缓冲输出流。
  2. 根据数据块的类型通过前面讲解的getSerializer(...)获取序列化器,并使用序列化器的newInstance()方法创建对应的序列化器实例。
  3. 对第1步中得到的缓冲输出流进行加密、压缩包装,并使用序列化器实例的serializeStream(...)方法进行序列化包装,得到序列化输出流。
  4. 使用包装后的序列化输出流的writeAll(...)方法将values中的数据进行序列化写出,最后关闭流。

通过层层包装,最终写出到outputStream中的流将会是经过了加密、压缩和序列化的数据流。

SerializerManager还提供了dataSerialize(...)方法用于将指定数据块的数据序列化为ChunkedByteBuffer缓冲区,它的实现与上面讲解的序列化流程类似,只不过最终的数据会存放到ChunkedByteBuffer中:

org.apache.spark.serializer.SerializerManager
  • /** Serializes into a chunked byte buffer.
  • * 序列化为ChunkedByteBuffer
  • **/
  • def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): ChunkedByteBuffer = {
  • // 调用dataSerializeWithExplicitClassTag()方法
  • dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]])
  • }
  • /** Serializes into a chunked byte buffer.
  • * 使用明确的类型标记,序列化为ChunkedByteBuffer
  • **/
  • def dataSerializeWithExplicitClassTag(
  • blockId: BlockId,
  • values: Iterator[_],
  • classTag: ClassTag[_]): ChunkedByteBuffer = {
  • // 包装为分块的字节缓冲输出流
  • val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
  • // 包装为缓冲输出流
  • val byteStream = new BufferedOutputStream(bbos)
  • // 非SteamBlock数据块是使用Kryo序列化器的前提
  • val autoPick = !blockId.isInstanceOf[StreamBlockId]
  • // 获取序列化器实例
  • val ser = getSerializer(classTag, autoPick).newInstance()
  • // 使用序列化器实例进行序列化,并将values的数据写出到bbos输出流
  • ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
  • // 将bbos输出流转换为ChunkedByteBuffer对象
  • bbos.toChunkedByteBuffer
  • }

2.1.5. 反序列化

SerializerManager的dataDeserializeStream(...)方法用于将指定的数据块输入流反序列化为数据迭代器,它的实现与序列化相反,不过流程是一样的:

org.apache.spark.serializer.SerializerManager#dataDeserializeStream
  • /**
  • * Deserializes an InputStream into an iterator of values and disposes of it when the end of
  • * the iterator is reached.
  • * 将输入流反序列化为值的迭代器Iterator[T]。
  • */
  • def dataDeserializeStream[T](
  • blockId: BlockId,
  • inputStream: InputStream)
  • (classTag: ClassTag[T]): Iterator[T] = {
  • // 包装为缓冲输入流
  • val stream = new BufferedInputStream(inputStream)
  • // 非SteamBlock数据块是使用Kryo序列化器的前提
  • val autoPick = !blockId.isInstanceOf[StreamBlockId]
  • // 获取对应的序列化器实例,并使用该序列化器实例进行反序列化
  • getSerializer(classTag, autoPick)
  • .newInstance()
  • .deserializeStream(wrapStream(blockId, stream))
  • .asIterator.asInstanceOf[Iterator[T]]
  • }

该方法接收三个参数,其中blockId就是数据块的BlockId标识了,inputStream是反序列化时进行数据输入的输入流,Iterator[T]类型的返回值是最终反序列化得到的数据迭代器。dataDeserializeStream(...)的实现分为以下几步:

  1. 将传入的输入流包装为BufferedInputStream缓冲输入流。
  2. 根据数据块的类型通过前面讲解的getSerializer(...)获取序列化器,并使用序列化器的newInstance()方法创建对应的序列化器实例。
  3. 对第1步中得到的缓冲输入流进行加密、压缩包装,并使用序列化器实例的deserializeStream(...)方法进行反序列化包装,得到反序列化输入流。
  4. 使用包装后的反序列化输入流将输入流中的数据进行反序列化,最终得到Iterator[T]类型的返回值。

2.2. Serializer

前面提到过,Serializer是序列化器的高层抽象类,JavaSerializer和KryoSerializer都继承了该抽象类,它的定义如下:

org.apache.spark.serializer.Serializer
  • /**
  • * :: DeveloperApi ::
  • * A serializer. Because some serialization libraries are not thread safe, this class is used to
  • * create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual
  • * serialization and are guaranteed to only be called from one thread at a time.
  • *
  • * Implementations of this trait should implement:
  • *
  • * 1. a zero-arg constructor or a constructor that accepts a [[org.apache.spark.SparkConf]]
  • * as parameter. If both constructors are defined, the latter takes precedence.
  • *
  • * 2. Java serialization interface.
  • *
  • * @note Serializers are not required to be wire-compatible across different versions of Spark.
  • * They are intended to be used to serialize/de-serialize data within a single Spark application.
  • */
  • @DeveloperApi
  • abstract class Serializer {
  • /**
  • * Default ClassLoader to use in deserialization. Implementations of [[Serializer]] should
  • * make sure it is using this when set.
  • */
  • @volatile protected var defaultClassLoader: Option[ClassLoader] = None
  • /**
  • * Sets a class loader for the serializer to use in deserialization.
  • *
  • * 反序列化时使用的类加载器,要保证子类优先使用该类加载器
  • *
  • * @return this Serializer object
  • */
  • def setDefaultClassLoader(classLoader: ClassLoader): Serializer = {
  • defaultClassLoader = Some(classLoader)
  • this
  • }
  • /** Creates a new [[SerializerInstance]].
  • *
  • * 创建一个新的序列化器实例
  • **/
  • def newInstance(): SerializerInstance
  • /**
  • * :: Private ::
  • * Returns true if this serializer supports relocation of its serialized objects and false
  • * otherwise. This should return true if and only if reordering the bytes of serialized objects
  • * in serialization stream output is equivalent to having re-ordered those elements prior to
  • * serializing them. More specifically, the following should hold if a serializer supports
  • * relocation:
  • *
  • * {{{
  • * serOut.open()
  • * position = 0
  • * serOut.write(obj1)
  • * serOut.flush()
  • * position = # of bytes writen to stream so far
  • * obj1Bytes = output[0:position-1]
  • * serOut.write(obj2)
  • * serOut.flush()
  • * position2 = # of bytes written to stream so far
  • * obj2Bytes = output[position:position2-1]
  • * serIn.open([obj2bytes] concatenate [obj1bytes]) should return (obj2, obj1)
  • * }}}
  • *
  • * In general, this property should hold for serializers that are stateless and that do not
  • * write special metadata at the beginning or end of the serialization stream.
  • *
  • * This API is private to Spark; this method should not be overridden in third-party subclasses
  • * or called in user code and is subject to removal in future Spark releases.
  • *
  • * See SPARK-7311 for more details.
  • * 参数如果是true,则表示该序列化器支持重新定位他的序列化对象,否则则不行。
  • * 如果支持,这表示在流中输出的被序列化的对象的字节可以进行排序。这相当于对象排序后再进行序列化。
  • * 该属性现在被用于判断Shuffle使用哪个ShuffleWriter。
  • *
  • */
  • @Private
  • private[spark] def supportsRelocationOfSerializedObjects: Boolean = false
  • }

Serializer的注释中要求它的实现类应该提供一个无参构造器或接受SparkConf类型参数的构造器,同时需要Java的Serialization接口以提供对自身的序列化功能。

Serializer已知的可用实现类有JavaSerializer、KryoSerializer和UnsafeRowSerializer,我们会详细分析JavaSerializer的实现。

2.2.1. JavaSerializer

JavaSerializer是Serializer的一种实现,使用它可以得到提供JDK的默认序列化机制的序列化器实例,它的源码比较简单:

org.apache.spark.serializer.JavaSerializer
  • /**
  • * :: DeveloperApi ::
  • * A Spark serializer that uses Java's built-in serialization.
  • *
  • * @note This serializer is not guaranteed to be wire-compatible across different versions of
  • * Spark. It is intended to be used to serialize/de-serialize data within a single
  • * Spark application.
  • *
  • * 使用Java内置的序列化机制构建的Spark序列化器
  • *
  • * 该序列化器不保证在不同版本Spark之间保持线性兼容,
  • * 它主要是在Spark作业中用于对数据序列化或反序列化。
  • */
  • @DeveloperApi
  • class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
  • // 序列化时会缓存对象防止写多余的数据,但这些对象就不会被GC,默认缓存100个对象。
  • private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100)
  • private var extraDebugInfo = conf.getBoolean("spark.serializer.extraDebugInfo", true)
  • protected def this() = this(new SparkConf()) // For deserialization only
  • // 获得一个JavaSerializerInstance实例
  • override def newInstance(): SerializerInstance = {
  • // 构造序列化器,类加载器使用本线程的ContextClassLoader
  • val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
  • // 直接返回了JavaSerializerInstance
  • new JavaSerializerInstance(counterReset, extraDebugInfo, classLoader)
  • }
  • // 控制JavaSerializer自身的序列化
  • override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
  • out.writeInt(counterReset)
  • out.writeBoolean(extraDebugInfo)
  • }
  • // 控制JavaSerializer自身的反序列化
  • override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
  • counterReset = in.readInt()
  • extraDebugInfo = in.readBoolean()
  • }
  • }

注意,JavaSerializer实现了Externalizable接口,而Externalizable则继承了JDK的Serialization接口,writeExternal(...)readExternal(...)二者就是用于JavaSerializer自身进行序列化和反序列化的方法。

JavaSerializer的newInstance()是我们需要关注的重点,它内部通过指定的类加载器(可以通过继承自Serializer类的setDefaultClassLoader(...)方法指定,如果不指定就使用当前线程的类加载器)创建JavaSerializerInstance对象。该对象即是序列化器实例,它是序列化和反序列化操作的具体实现类,我们后面会详细介绍。

2.3. SerializerInstance

SerializerInstance类是序列化器实例的父类,可以用它来创建多个序列化或反序列化流,由于Serializer在同一线程中只会创建唯一的SerializerInstance对象,因此它所创建的序列化或反序列化流都是线程安全的;它内部定义了数个规范的序列化和反序列化方法供子类实现:

  • /**
  • * :: DeveloperApi ::
  • * An instance of a serializer, for use by one thread at a time.
  • *
  • * It is legal to create multiple serialization / deserialization streams from the same
  • * SerializerInstance as long as those streams are all used within the same thread.
  • */
  • @DeveloperApi
  • @NotThreadSafe
  • abstract class SerializerInstance {
  • // 序列化特定类型的对象,并将数据存放在ByteBuffer中返回
  • def serialize[T: ClassTag](t: T): ByteBuffer
  • // 将ByteBuffer中的数据反序列化为特定类型的对象,会使用当前线程的类加载器
  • def deserialize[T: ClassTag](bytes: ByteBuffer): T
  • // 使用指定的类加载器将ByteBuffer中的数据反序列化为特定类型的对象
  • def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T
  • // 将输出流包装为序列化流
  • def serializeStream(s: OutputStream): SerializationStream
  • // 将输入流包装为反序列化流
  • def deserializeStream(s: InputStream): DeserializationStream
  • }

2.3.1. JavaSerializerInstance

JavaSerializerInstance实现了SerializerInstance抽象类中指定的几个方法:

org.apache.spark.serializer.JavaSerializerInstance
  • private[spark] class JavaSerializerInstance(
  • counterReset: Int, extraDebugInfo: Boolean, defaultClassLoader: ClassLoader)
  • extends SerializerInstance {
  • // 序列化特定类型的对象,并将数据存放在ByteBuffer中返回
  • override def serialize[T: ClassTag](t: T): ByteBuffer = {
  • // 创建字节缓冲输出流
  • val bos = new ByteBufferOutputStream()
  • // 使用serializeStream()方法包装输出流
  • val out = serializeStream(bos)
  • // 写出对象到输出流中
  • out.writeObject(t)
  • // 关闭流
  • out.close()
  • // 将输出流转换为ByteBuffer
  • bos.toByteBuffer
  • }
  • // 将ByteBuffer中的数据反序列化为特定类型的对象,会使用当前线程的类加载器
  • override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
  • // 创建字节缓冲输入流
  • val bis = new ByteBufferInputStream(bytes)
  • // 使用deserializeStream()方法以当前线程的类加载器包装输入流
  • val in = deserializeStream(bis)
  • // 从输入流中读取数据并返回
  • in.readObject()
  • }
  • // 使用指定的类加载器将ByteBuffer中的数据反序列化为特定类型的对象
  • override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
  • // 创建字节缓冲输入流
  • val bis = new ByteBufferInputStream(bytes)
  • // 使用deserializeStream()方法以特定的类加载器包装输入流
  • val in = deserializeStream(bis, loader)
  • // 从输入流中读取数据并返回
  • in.readObject()
  • }
  • // 包装输出流为SerializationStream序列化流
  • override def serializeStream(s: OutputStream): SerializationStream = {
  • new JavaSerializationStream(s, counterReset, extraDebugInfo)
  • }
  • // 包装输入流为DeserializationStream反序列化流,会使用当前线程的类加载器
  • override def deserializeStream(s: InputStream): DeserializationStream = {
  • new JavaDeserializationStream(s, defaultClassLoader)
  • }
  • // 使用指定的类加载器包装输入流为DeserializationStream反序列化流
  • def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
  • new JavaDeserializationStream(s, loader)
  • }
  • }

JavaSerializerInstance的实现中,序列化和反序列,都依赖于SerializationStream和DeserializationStream两种流,其中SerializationStream使用的是其子类JavaSerializationStream,DeserializationStream使用的是其子类JavaSerializationStream。

2.4. SerializationStream

SerializationStream是序列化流的抽象类规范,它的定义如下:

org.apache.spark.serializer.SerializationStream
  • /**
  • * :: DeveloperApi ::
  • * A stream for writing serialized objects.
  • */
  • @DeveloperApi
  • abstract class SerializationStream {
  • /** The most general-purpose method to write an object.
  • * 将对象写出到SerializationStream
  • **/
  • def writeObject[T: ClassTag](t: T): SerializationStream
  • /** Writes the object representing the key of a key-value pair.
  • * 将key写出到SerializationStream
  • **/
  • def writeKey[T: ClassTag](key: T): SerializationStream = writeObject(key)
  • /** Writes the object representing the value of a key-value pair. 、
  • * 将value写出到SerializationStream
  • **/
  • def writeValue[T: ClassTag](value: T): SerializationStream = writeObject(value)
  • // 刷新
  • def flush(): Unit
  • // 关闭
  • def close(): Unit
  • // 将迭代器中的对象写出到SerializationStream
  • def writeAll[T: ClassTag](iter: Iterator[T]): SerializationStream = {
  • // 遍历并调用writeObject()方法写出
  • while (iter.hasNext) {
  • writeObject(iter.next())
  • }
  • this
  • }
  • }

2.4.1. JavaSerializationStream

JavaSerializationStream是SerializationStream的子类,它的源码如下:

org.apache.spark.serializer.JavaSerializationStream
  • private[spark] class JavaSerializationStream(
  • out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
  • extends SerializationStream {
  • // 使用ObjectOutputStream包装传入的输出流
  • private val objOut = new ObjectOutputStream(out)
  • // 序列化计数器,用于判断是否需要重置objOut对象
  • private var counter = 0
  • /**
  • * Calling reset to avoid memory leak:
  • * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
  • * But only call it every 100th time to avoid bloated serialization streams (when
  • * the stream 'resets' object class descriptions have to be re-written)
  • *
  • * 将对象序列化并写出到ObjectOutputStream流
  • */
  • def writeObject[T: ClassTag](t: T): SerializationStream = {
  • try {
  • // 将数据写出到objOut流
  • objOut.writeObject(t)
  • } catch {
  • case e: NotSerializableException if extraDebugInfo =>
  • throw SerializationDebugger.improveException(t, e)
  • }
  • // 没写一个计数器自增1
  • counter += 1
  • // 当计数器值大于counterReset时,说明需要重置ObjectOutputStream流
  • if (counterReset > 0 && counter >= counterReset) {
  • // 进行重置
  • objOut.reset()
  • // 计数器重置
  • counter = 0
  • }
  • this
  • }
  • // 刷写
  • def flush() { objOut.flush() }
  • // 关闭流
  • def close() { objOut.close() }
  • }

JavaSerializationStream的实现比较简单,它会将使用ObjectOutputStream包装构造时传入的输出流OutputStream,在writeObject(...)方法里通过ObjectOutputStream的writeObject(...)方法完成序列化操作,并将序列化后的流写出到OutputStream中。

2.5. DeserializationStream

DeserializationStream是反序列化流的抽象类规范,它的定义如下:

org.apache.spark.serializer.DeserializationStream
  • /**
  • * :: DeveloperApi ::
  • * A stream for reading serialized objects.
  • */
  • @DeveloperApi
  • abstract class DeserializationStream {
  • /** The most general-purpose method to read an object.
  • * 从反序列化流中读取对象
  • **/
  • def readObject[T: ClassTag](): T
  • /** Reads the object representing the key of a key-value pair.
  • * 从反序列化流中读取对象为Key
  • **/
  • def readKey[T: ClassTag](): T = readObject[T]()
  • /** Reads the object representing the value of a key-value pair.
  • * 从反序列化流中读取对象为Value
  • **/
  • def readValue[T: ClassTag](): T = readObject[T]()
  • // 关闭流
  • def close(): Unit
  • /**
  • * Read the elements of this stream through an iterator. This can only be called once, as
  • * reading each element will consume data from the input source.
  • *
  • * 从反序列化流中读取对象并构建为迭代器,迭代器元素为单个对象
  • */
  • def asIterator: Iterator[Any] = new NextIterator[Any] {
  • // 获取下一个元素的方法或通过调用readObject[Any]()方法进行读取
  • override protected def getNext() = {
  • try {
  • readObject[Any]()
  • } catch {
  • case eof: EOFException => // 抛出EOFException说明读取完了
  • // 标记finished为true并返回null
  • finished = true
  • null
  • }
  • }
  • // 关闭操作
  • override protected def close() {
  • DeserializationStream.this.close()
  • }
  • }
  • /**
  • * Read the elements of this stream through an iterator over key-value pairs. This can only be
  • * called once, as reading each element will consume data from the input source.
  • *
  • * 从反序列化流中读取对象并构建为迭代器,迭代器元素是键值对
  • */
  • def asKeyValueIterator: Iterator[(Any, Any)] = new NextIterator[(Any, Any)] {
  • // 获取下一个元素的方法或通过两次调用readObject[Any]()方法进行读取
  • override protected def getNext() = {
  • try {
  • // 同时读取两次,一个为键,一个为值
  • (readKey[Any](), readValue[Any]())
  • } catch {
  • case eof: EOFException => // 抛出EOFException说明读取完了
  • // 标记finished为true并返回null
  • finished = true
  • null
  • }
  • }
  • override protected def close() {
  • DeserializationStream.this.close()
  • }
  • }
  • }

从DeserializationStream的源码可知,它不仅提供了读取对象的操作,还提供了读取迭代器的实现。

2.5.1. JavaDeserializationStream

JavaDeserializationStream是DeserializationStream的子类,它的源码如下:

org.apache.spark.serializer.JavaDeserializationStream
  • private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
  • extends DeserializationStream {
  • // 构造ObjectInputStream包装传入的输入流
  • private val objIn = new ObjectInputStream(in) {
  • // 用于加载反序列化后的对象所属的类
  • override def resolveClass(desc: ObjectStreamClass): Class[_] =
  • try {
  • // scalastyle:off classforname
  • // 可能需要使用自定义的类加载器
  • Class.forName(desc.getName, false, loader)
  • // scalastyle:on classforname
  • } catch {
  • case e: ClassNotFoundException =>
  • JavaDeserializationStream.primitiveMappings.getOrElse(desc.getName, throw e)
  • }
  • }
  • // 读取为一个对象,使用ObjectInputStream读取并转换
  • def readObject[T: ClassTag](): T = objIn.readObject().asInstanceOf[T]
  • // 关闭流
  • def close() { objIn.close() }
  • }
  • private object JavaDeserializationStream {
  • val primitiveMappings = Map[String, Class[_]](
  • "boolean" -> classOf[Boolean],
  • "byte" -> classOf[Byte],
  • "char" -> classOf[Char],
  • "short" -> classOf[Short],
  • "int" -> classOf[Int],
  • "long" -> classOf[Long],
  • "float" -> classOf[Float],
  • "double" -> classOf[Double],
  • "void" -> classOf[Void]
  • )
  • }

JavaDeserializationStream会将传入的输入流包装为ObjectInputStream对象,并且复写了ObjectInputStream的resolveClass(...)方法来判断反序列化后得到对象所属的类是否可找到,如果找不到则会先捕捉抛出的ClassNotFoundException异常,判断所属的类是否是基本类型的包装类型,如果依旧不是,则会向外抛出ClassNotFoundException。

JavaDeserializationStream具体是使用ObjectInputStream对象的readObject()方法来进行反序列化的。