Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...
Reduce端在处理聚合操作时为了避免数据量过大产生OOM,没有使用Map端聚合时用到AppendOnlyMap,而采用了ExternalAppendOnlyMap这种带有溢写操作的自定义Map来处理。
ShuffleReader只有一个实现类:BlockStoreShuffleReader,该类也是Spark中唯一实现的用于拉取Map任务输出数据读取器。
ShuffleWriter是Map任务用于输出结果数据的写出器,主要有三个实现:UnsafeShuffleWriter、BypassMergeSortShuffleWriter和SortShuffleWriter。Shuffle操作在Map任务端,会将数据写出到磁盘进行持久化,然后由Reduce任务来拉取;而Map任务在写出数据时,可能会对数据进行排序或Map端聚合。
ExternalSorter除了会将Map任务的输出数据存储到JVM的堆中,如果指定了聚合函数,则还会对数据进行聚合。ExternalSorter会使用分区器将数组分组到对应的分区中,然后使用自定义比较器对每个分区中的数据以键进行可选的排序,并将每个分区的数据输出到单个文件的不同字节范围中,减少生成的文件数量,便于Reduce端的Shuffle获取。
ShuffleExternalSorter作为外部排序器,UnsafeShuffleWriter依赖于它实现堆外内存中序列化数据的排序操作,它继承自MemoryConsumer抽象类,也即是说它其实是一个内存消费者。ShuffleExternalSorter依赖于TaskMemoryManager提供的MemoryBlock存储记录数据,并且使用ShuffleInMemorySorter对存储的记录的根据分区进行排序,排序后的得到的即是索引数据了。
本文主要讲解Shuffle过程涉及的ShuffleManager和IndexShuffleBlockResolver组件。
ShuffleMapTask和ResultTask都需要调用所在Stage中最末RDD的iterator()方法进行迭代计算,不同点无非是二者对迭代计算结果的处理方式不同,ShuffleMapTask将通过ShuffleWriter将计算结果写出到磁盘,为可能存在的Shuffle阶段做准备,而ResultTask会使用具体的Action算子函数处理计算结果。
本文主要介绍Task、TaskContxt和TaskMemoryManager。
在讲解计算引擎之前,以Standalone部署模式为运行环境,让我们来理一理Application运行流程。
TaskScheduler意为任务调度器,它是属于Spark调度系统中对Task进行调度的资源调度器。Spark自己提供了Standalone模式的集群管理器,也允许开发者使用YARN、Mesos等外部集群管理器。集群管理器负责将资源分配给Spark应用,而TaskScheduler的职责则是将集群管理器分配给Spark应用的资源以更小的单位分配给每个Task。集群管理器与TaskScheduler构成了Spark中的两层资源调度体系。
StandaloneAppClient将作为Application的RPC端点,与Spark自有的Standalone集群管理器进行对话;它会通过向Standalone集群管理器注册Application信息以请求启动Executor资源,以便向Application提供运行环境。
SchedulerBackend是TaskScheduler的调度后端接口。TaskScheduler给Task分配资源实际是通过SchedulerBackend来完成的,SchedulerBackend给Task分配完资源后将与分配给Task的Executor通信,并要求后者运行Task。
TaskScheduler是以树的方式来管理任务队列,树中的叶子节点为TaskSetManager,非叶子节点为Pool; 它们都是Schedulable的实现类。
在DAGScheduler中涉及了一个组件,即OutputCommitCoordinator,意为输出提交协调器。它是用于控制Stage的每个TaskAttempt提交到HDFS的权限。
DAGScheduler实现了面向DAG的高层次调度,即将DAG中的各个RDD划分到不同的Stage。DAGScheduler可以通过计算将DAG中的一系列RDD划分到不同的Stage,然后构建这些Stage之间的父子关系,最后将每个Stage按照Partition切分为多个Task,并以Task集合(即TaskSet)的形式提交给底层的TaskScheduler。
RDD(Resilient Distributed Dataset)名为分布式数据集,是Spark中最基本的数据抽象,它为用户屏蔽了底层对数据的复杂抽象和处理,提供了一组方便的数据转换与求值方法。
有了前面的对通信层、存储层的强行铺垫,以及对各种重要组件的详细分析,大家一定对Spark的存储体系的各项功能已经都有了一定的了解,但是对存储体系的运转方式还是没有一个整体上的认识,在本文中将回到最初的BlockManager,以BlockManager的各项实现来串起前面讲解的各类组件,相信大家阅读完本文,都会对前面几篇文章中存在的疑惑有一个清晰的领悟。
在Spark的存储体系中,磁盘管理由DiskBlockManager磁盘管理器实现,它负责为逻辑的数据块与数据在磁盘的写入位置建立映射关系。
MemoryManager是用于对节点上内存的分配和回收的内存管理器,每个实现存储体系的节点上都会存在MemoryManager;在Spark中,MemoryManager的实现有三种,除去用于测试的TestMemoryManager,其余的两种分别是静态内存管理器StaticMemoryManager和统一内存管理器UnifiedMemoryManager。
Spark中数据的存储按照位置来分,可以分为磁盘和内存,存储体系也分别根据这两种存储位置做出了不同的实现;同时,存储层还有一项非常重要的工作,就是对这两种存储进行管理。
对于Spark分布式计算的特性而言,Map任务和Reduce任务很有可能会发生Shuffle过程,此时Map任务会将产生的数据保存到存储体系中,然后由Reduce任务进行拉取,当遇到数据非本地化的情况则需要进行跨节点数据传输。BlockManager通过BlockTransferService向外提供数据拉取服务,同时在没有配置外部Shuffle客户端的情况下,BlockTransferService还会充当Shuffle客户端用于拉取数据。
Spark存储体系由各个Driver和Executor实例中的BlockManager所构成,实现了分布式管理,而从Driver和Executor单个节点来看,Spark存储体系属于节点的SparkEnv的内部组成部分;存储体系主要分为两层:通信层和存储层
一般来说,笔者通常将事件总线归为Spark的通信架构层。事件总线是以监听器模式实现的,主要组成结构分为监听器、事件源和事件分发器(也即是事件总线)。Spark中定义了大量的事件总线,方便监听者监听自己所感兴趣的事件。
Spark针对各类场景,实现了不同的RpcHandler和StreamManager,在Spark Core模块的org.apache.spark.rpc包下,包含了RPC通信框架的高层实现,在本文中我们将以具体流程来对它们进行解析。
主要讲解通信架构中传输层相关的组件,包括TransportServer、TransportClient、Bootstrap等
主要讲解与GroupCoordinator相关的组件,包括GroupMetadata、MemberMetadata等
主要简介KafkaController的辅助组件,包括ControllerContext、ControllerChannelManager和ControllerBrokerRequestBatch
主要讲解OffsetIndex、Message、ByteBufferMessageSet、FileMessageSet及日志读写操作
讲解OffsetFetchRequest、ListOffsetRequest、HeartbeatRequest、OffsetCommitRequest和FetchRequest的处理
讲解GroupCoordinatorRequest、JoinGroupRequest和SyncGroupRequest请求的发送
本文主要讲解KafkaConsumer的创建、心跳操作Heartbeat和HeartbeatTask、Offset自动提交任务AutoCommitTask及主体订阅相关。
Kafka提供了两种消息发送的形式:同步发送和异步发送,其实它们本质上都是异步发送。消息发送涉及多个过程,底层依赖于NIO的同步非阻塞处理。
KafkaProducer对元数据的管理和操作,是先行于数据发送环节的。元数据的完整、正确与否决定了是数据发送先决条件。KafkaProducer对元数据的更新操作与消息数据的发送操作虽然都需要经过网络I/O,但二者的实现略有差异
Kafka是分布式的、分区的且具有副本机制的消息服务器。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。Kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外Kafka集群有多个Kafka实例组成,每个实例(server)成为broker。无论是Kafka集群,还是Producer和Consumer都依赖于Zookeeper来保证系统可用性集群保存一些meta信息。
Executors是java.util.concurrent包提供一个用于创建各类线程池的工厂类,其中有大量的静态方法可以方便我们创建ThreadPoolExecutor和ScheduledThreadPoolExecutor线程池,本文将一一进行介绍。
JDK提供了CompletionService接口用于解决该问题。在它的实现类ExecutorCompletionService中,通过维护一个队列保存结束任务的Future,如果有任务结束,任务的Future会保存到队列中,从该队列中一定能拿到任务的返回结果;如果没有已经完成的任务,队列为空,取结果的线程才会进入阻塞等待。
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExecutor的基础上实现的任务调度线程池,内部使用延时工作队列DelayedWorkQueue实现对任务的延时调度。DelayedWorkQueue内部使用一个初始容量为16的数组来保存任务,容量不够时会按照现有容量的1.5倍进行扩容,最大容量可达Integer.MAX_VALUE。
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExecutor的基础上实现的任务调度线程池,内部使用延时工作队列DelayedWorkQueue实现对任务的延时调度。DelayedWorkQueue内部使用一个初始容量为16的数组来保存任务,容量不够时会按照现有容量的1.5倍进行扩容,最大容量可达Integer.MAX_VALUE。
ThreadPoolExecutor是线程池类,可以通俗的将它理解为存放一定数量线程的一个线程集合。线程池允许若个线程同时运行,同时运行的线程数量就是线程池的容量;当添加的到线程池中的线程超过它的容量时,会有一部分线程阻塞等待。线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。
ThreadPoolExecutor是线程池类,可以通俗的将它理解为存放一定数量线程的一个线程集合。线程池允许若个线程同时运行,同时运行的线程数量就是线程池的容量;当添加的到线程池中的线程超过它的容量时,会有一部分线程阻塞等待。线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。
对于Runnable接口我们其实是很熟悉的,在前面对线程的使用中经常接触它。Runnable接口是针对单纯的无返回值任务,但在Java的多线程机制中,还提供了有返回值任务相应的接口,接下将详细讨论这些线程执行的任务。
ConcurrentLinkedQueue是线程安全的队列,它适用于高并发的场景。它是一个基于链接节点的无界线程安全队列,按照FIFO(先进先出)原则对元素进行排序。队列元素中不可以放置null元素(内部实现的特殊节点除外)。
LinkedBlockingDeque是双向链表实现的双向并发阻塞队列,该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(添加或删除);并且该阻塞队列是支持线程安全。此外,LinkedBlockingDeque还是可选容量的(防止过度膨胀),即可以指定队列的容量。
LinkedBlockingQueue是一个单向链表实现的阻塞队列。该队列按FIFO(先进先出)排序元素,新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。
使用良好的Commit Message结构可以使我们提交时的备注信息更清晰,我们可以将Commit Message定义为三个部分:Header、Body和Footer,其中Header必须有,Body和Footer可以按情况省略。
CopyOnWriteArraySet是线程安全的无序的集合,可以将它理解成线程安全的HashSet。CopyOnWriteArraySet和HashSet虽然都继承于共同的父类AbstractSet,但HashSet是通过散列表(HashMap)实现的,而CopyOnWriteArraySet则是通过动态数组(CopyOnWriteArrayList)实现的,并不是散列表。
当我们在执行git commit操作时,默认是使用git commit -m "备注信息"的方式来填写备注信息,但是这种方式只能填写单行备注信息,对于日常工作中我们可能需要详细对每个提交版本的进行多行的备注信息,此时我们可以配置Git默认的备注信息编辑器,比如我们可以使用Visual Studio Code这编辑器,当然也可以使用Sublime这种轻量级编辑器。
在Java 集合系列专栏中,介绍了Java集合的架构,主体内容包括Collection集合和Map类,Collection集合又可以划分为List(队列)和Set(集合)。
Semaphore是一个计数信号量,它的本质是一个共享锁。信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。线程可以通过release()来释放它所持有的信号量许可。
CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (Common Barrier Point)。因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。
CountDownLatch是一个同步辅助类,被称作”栅栏“,它能够实现在某些线程中执行的操作完成之前,让一个或多个线程一直等待。
AQS是一个抽象类,继承自AbstractOwnableSynchronizer类,并实现了Serializable接口。虽然AQS是一个抽象类,但其内部并没有抽象方法,这是典型的模板设计模式的应用。AQS作为一个基础组件为继承它的实现类提供基础设施,如构建等待队列、控制同步状态等;其内部除了提供并发操作的核心方法以及等待队列操作外,还提供了一些模板方法让子类自己实现,AQS只关注内部公共方法实现,并不关心外部不同模式的实现。
AQS是一个抽象类,继承自AbstractOwnableSynchronizer类,并实现了Serializable接口。虽然AQS是一个抽象类,但其内部并没有抽象方法,这是典型的模板设计模式的应用。AQS作为一个基础组件为继承它的实现类提供基础设施,如构建等待队列、控制同步状态等;其内部除了提供并发操作的核心方法以及等待队列操作外,还提供了一些模板方法让子类自己实现,AQS只关注内部公共方法实现,并不关心外部不同模式的实现。
AQS是一个抽象类,继承自AbstractOwnableSynchronizer类,并实现了Serializable接口。虽然AQS是一个抽象类,但其内部并没有抽象方法,这是典型的模板设计模式的应用。AQS作为一个基础组件为继承它的实现类提供基础设施,如构建等待队列、控制同步状态等;其内部除了提供并发操作的核心方法以及等待队列操作外,还提供了一些模板方法让子类自己实现,AQS只关注内部公共方法实现,并不关心外部不同模式的实现。
与RDD类似,SparkStreaming也可以让开发人员手动控制,将数据流中的数据持久化到内存中。对DStream调用persist()方法,就可以让SparkStreaming自动将该数据流中的所有产生的RDD,都持久化到内存中。如果要对一个DStream多次执行操作,那么,对DStream持久化是非常有用的。因为多次操作,可以共享使用内存中的一份缓存数据。
Spark Streaming,其实就是一种Spark提供的,对于大数据,进行实时计算的一种框架。它的底层,其实,也是基于我们之前讲解的SparkCore的。基本的计算模型,还是基于内存的大数据实时计算模型。而且,它的底层的组件或者叫做概念,其实还是最核心的RDD。
Spark 1.0版本开始,推出了Spark SQL。其实最早使用的,都是Hadoop自己的Hive查询引擎;但是后来Spark提供了Shark;再后来Shark被淘汰,推出了Spark SQL。Shark的性能比Hive就要高出一个数量级,而Spark SQL的性能又比Shark高出一个数量级。
由于Spark的计算本质是基于内存的,所以Spark性能程序的性能可能因为集群中的任何因素出现瓶颈:CPU、网络带宽、或者是内存。如果内存能够容纳得下所有的数据,那么网络传输和通信就会导致性能出现瓶颈。
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...
Java多线程 47 - CompletionService详解
JDK提供了CompletionService接口用于解决该问题。在它的实现类ExecutorCompletionSer...