spark spark 定时任务调度度,有哪些方式

spark源码分析--spark的任务调度(补充一张图) -
- ITeye技术网站
原创,转载请注明出处&& /blog/2007317 ,作者邮箱:vc_,新浪微博:爱看历史的码农--白硕
对于 /blog/1994026,画了一张图,辅助源码阅读,图片较大
baishuo491
浏览: 53036 次
来自: 北京
图片看不清楚,要是能下载就好了
很好,学习了
请问PPT上传了吗,发到我邮箱一下,@qq.c ...
想请问楼主怎么调试源码呢?用idea的本地运行功能吗?Spark 作业调度 - 技术翻译 - 开源中国社区
Spark 作业调度
【已翻译100%】
英文原文:
推荐于 3年前 (共 9 段, 翻译完成于 10-18)
参与翻译&(3人)&: ,
Spark有几个在计算中调度资源的工具。首先需要记得,正如中描述的那样,每个Spark应用中(SparkContext实例)都运行着一组独立的执行进程。Spark运行在的集群管理器提供了的工具。第二,在每个Spark应用中,由不同线程提交的多个“jobs”(Spark actions)可以同时运行。在处理网络请求的应用中这很常见,比如服务器就以这种方式运行。Spark有一个在每个SparkContext中调度资源。
&翻译得不错哦!
应用程序之间的调度
每个运行在集群上的Spark应用程序都能得到一个独立的JVM虚拟机,而JVM仅仅用于应用程序运行任务和存储数据。如果多用户需要共享你的集群,可以通过集群管理器配置不同的选项来分配资源。
在集群管理器中最简单有效的方式就是静态区分资源。使用此方法,每个应用程序在整个的生命周期中都可以得到一个最大数量的资源。这种方式被用于Spark的和模式中,同样也用于模式。根据集群的类型,可以通过下面的配置来分配资源。
Standalone mode:默认情况下,应用程序启用Spark独立部署模式,这种模式按照FIFO(先进先出)的顺序执行,每个应用程序都会尝试使用所有可用的节点。你可以通过spark设置来限制应用程序的节点数。例如:你可以启动一个有10核并长时间运行的服务器,并允许每个用户通过shells使用20核心。最后,除了控制核心外,每个应用程序的spark执行存储器可以控制自己的内存使用。
Mesos:在独立部署模式中,要在Mesos上使用静态分区,需要设置spark.mesos.coarse 系统属性为true,另外,可选项设置spark.cores.max可以限制每个应用程序的共享资源数。你也可以配置spark.executor.memory来控制执行器的内存。
YARN:num-workers选项用于在Spark YARN端分配集群上workers数量,尽管worker-memory和worker-cores可以控制每个worker的资源分配。
&翻译得不错哦!
Mesos上的第二个可用选项是动态共享CPU内核。在这种模式下,每个Spark应用程序仍然分配有一个固定和独立的内存(通过spark.executor.memory来设置),当这个应用程序没有在机器上执行任务的时候,其他的应用程序就可能在这些内核上运行任务。当你期望大量但不是过度活跃应用程序的时候,这种模式是非常有用的,例如独立用户中的shell会话。然而,它却伴随着一个不可预知的潜在危险,这是因为当它需要执行任务的时候,在节点上需要耗费一段时间重新获得CPU核心资源。使用这种模式,不需要设置spark.mesos.coarse为true,只需要简单的使用amesos://URL。
请注意,所有的模式目前提供跨应用程序内存共享。如果你喜欢通过这种方式共享数据,我们推荐运行单一服务器的应用程序能够提供多个请求,可以通过查询相同的RDDs得到。例如, JDBC服务器以这种方式进行SQL查询。在将来的版本中,内存中的存储系统,如&将会提供另外的一种方式来共享RDDs。
&翻译得不错哦!
应用中的调度
在给定的Spark应用(已实例化SparkContext)中,如果在不同线程中,多个并行的工作可以同时运行。我们所说的“工作”,其实是一个Spark动作(如保存,收集等)或是任何想需要评估动作的任务。Spark的任务调度员是多线程安全的,它也支持这个用例来使应用服务多个请求(多用户查询).
默认的,Spark调度员是按照FIFO(先进先出)的形式来运行工作的。每一个工作被分为多个“阶段”(如,map和reduce语句),对于所有可用的资源中第一个工作优先级最高,这个工作阶段中的任务会被启动,之后是第二个,依次类推。如果集群不需要队列头中的工作,后面的工作将被立刻启动,如果队列头的工作很大,后面的工作可能大大地推迟。
&翻译得不错哦!
启动Spark0.8,它可以在两个作业之间配置共享调度。Spark负责在作业之间轮换分配调度,所以,所有的作业都能得到一个大致公平的共享的集群资源。这就意味着即使有一个很长的作业在运行,花费时间较少的作业在提交之后,仍然能够立即获得资源,并且能够有很好的响应,而不是需要等待那个很长的作业运行完之后才能运行。这种模式最适合多用户设置。
要启用公平作业调度,在创建一个SparkContext之前,需要简单的配置spark.scheduler.mode为FAIR:
System.setProperty("spark.scheduler.mode", "FAIR")
&翻译得不错哦!
公平的调度池
公平调度可以支持在池中将工作分组,而且为不同的池可以设置不同的调度选项(如,权重)。这样可以很有用的为更多重要的工作创建一个“高优先级”池,举例,将每一个用户的工作一起分组,不管有多少并发工作也让每个用户平等的分享&,以这种方式代替了平分给定的工作。这种方式是模仿。
如果没有设置,新提交的工作将进入默认池中,但是工作池可以在线程中用spark.scheduler.pool来给SparkContent添加“本地属性”并提交。如下:
// 假设context是你SparkContext中的变量
context.setLocalProperty("spark.scheduler.pool", "pool1") 在设置了本地属性之后,所有的在这个线程(在这个线程中调用 RDD.save,count,collect等)的工作提交将会用这个池来命名。这样同一个用户可以让每个线程容易的执行多个工作。如果你想要清除线程相关的池,简单调用如下:
context.setLocalProperty("spark.scheduler.pool", null)
&翻译得不错哦!
调度池中的默认行为
默认的,每个池都会平等的分享集群(在默认的池中每一个工作也是平等分享的),但在每一个池中,工作是按照FIFO(先进先出)顺序。比如,如果你给每一个用户创建一个池,这就意味着每一个用户都平等的分享一个集群,这样每一个查询都是按顺序查询的。
&翻译得不错哦!
配置调度池
通过配置文件可以修改调度池的属性。每个调度池都支持3个属性。
schedulingMode:该属性的值可以是FIFO或者FAIR,用来控制作业在调度池中排队运行(默认情况下)或者公平分享调度池资源。
weight:控制调度池在集群之间的分配。默认情况下,所有调度池的weight值都是为1。例如:如果你指定了一个调度池的值为2,那么这个调度池就比其它调度池多获得2倍的资源。设置一个更高的weight值,例如1000,就可以实现线程池之间的优先权——实际上,weight值为1000的调度池无论什么时候作业被激活,它都总是能够最先运行。
minShare:除了一个整体的权重,如果管理员喜欢,可以给每个调度池指定一个最小的shares值(也就是CPU的核数目)。公平调度器通过权重重新分配资源之前总是试图满足所有活动调度池的最小share。在没有给定一个高优先级的其他集群中,minShare属性是另外的一种方式来确保调度池能够迅速的获得一定数量的资源(例如10核CPU),默认情况下,每个调度池的minShare值都为0。
&翻译得不错哦!
可以通过XML文件来设置pool属性,和配置公平调度的xml模板文件一样,只需要设置spark.scheduler.allocation.file的属性:
System.setProperty("spark.scheduler.allocation.file", "/path/to/file")
对于每个池,XML文件的格式是一个简单的&pool&元素,可以在这个元素中设置各种不同元素。例如:
&?xml version="1.0"?&
&allocations&
&pool name="production"&
&schedulingMode&FAIR&/schedulingMode&
&weight&1&/weight&
&minShare&2&/minShare&
&pool name="test"&
&schedulingMode&FIFO&/schedulingMode&
&weight&2&/weight&
&minShare&3&/minShare&
&/allocations& 这个完整的例子也可以适用到对公平调度的xml模板文件配置。请注意,任何没有在xml文件中配置的池,都会有一个默认配置值(scheduling mode
值是FIFO,weight值为1,minShare值为0)。
&翻译得不错哦!
我们的翻译工作遵照 ,如果我们的工作有侵犯到您的权益,请及时联系我们
暂无网友评论Spark资源调度流程浅析 - 为程序员服务
Spark资源调度流程浅析
spark的调度分为资源的调度和任务的调度。前者目前支持standalone、yarn、mesos等,后者对标hadoop的MapReduce。本文介绍资源调度的流程,能力所限仅cover到最简单的standalone,无法对比各种调度框架的优劣。
Spark中与资源调度相关的角色包括driver、master、worker和executor,其中executor更多是与任务调度有关。其交互关系如下图所示,主要交互流如下:
driver-&master:提交Application,并告知需要多少资源(cores)
master-&driver:告知Application提交成功(我们只考虑最理想的情况)
driver-&workers:发送appDescription,启动executor子进程
executor-&driver:注册executor
driver-&executor:提交Tasks,由后者执行任务
上图的虚线代表Actor交互,spark基于Actor模式的Akka包进行进程、线程、网络间的交互,以避免同步、死锁等问题,也使代码相对简单。可以注意到,worker主进程是不跟driver交互的,只有executor子进程才跟driver交互。
Driver & Master
在SparkContext初始化的时候,在SparkContext class的一个大try catch中,就会完成Application注册,在standalone mode下,主要做了以下事情:
启动心跳维持监听actor,因为executors需要同driver维持心跳
启动Job进度监听actor,在UI和LOG里需要能够看到进度
启动mapOutputTracker,因为作为reducer的executors会向driver询问map output的地址
初始化task pool,确定FIFO or FAIR的调度方式
根据masters uri的格式,确定schedulerBackend和taskScheduler的取值,其中schedulerBackend与executor端的executorBackend交互
由AppClient/ClientActor代理,同masters交互,确定可用master,由后者代理获取并启动workers上的可用executors资源
SparkContext.createTaskScheduler方法会确定backend和scheduler的取值,并调用_taskScheduler.initialize(),初始化pool并确定schedulableBuilder:FIFO、FAIR。
schedulerBackend
taskScheduler
LocalBackend
TaskSchedulerImpl
local\[([0-9]+|\*)\]
LocalBackend
TaskSchedulerImpl
local\[([0-9]+|\*)\s*,\s*([0-9]+)\]
LocalBackend
TaskSchedulerImpl
spark://(.*)
SparkDeploySchedulerBackend
TaskSchedulerImpl
local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]
SparkDeploySchedulerBackend
TaskSchedulerImpl
"yarn-standalone" | "yarn-cluster"
org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend
org.apache.spark.scheduler.cluster.YarnClusterScheduler
"yarn-client"
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend
org.apache.spark.scheduler.cluster.YarnScheduler
mesosUrl @ MESOS_REGEX(_)
CoarseMesosSchedulerBackend or MesosSchedulerBackend
TaskSchedulerImpl
simr://(.*)
SimrSchedulerBackend
TaskSchedulerImpl
确定两者取值之后,会立即触发与master的交互:
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
以standalone模式为例,间接调用 _schedulerBackend.start(),并阻塞等待初始化AppClient完成(即接收到master返回的RegisteredApplication消息)
AppClient就是一个wrapper,初始化了:
actor = actorSystem.actorOf(Props(new ClientActor))
ClientActor代理与master的交互,在preStart()里向每一个master uri发送RegisterApplication消息,并注册了接收处理master发送消息的多个处理方法。
Worker & Driver
deploy/worker/Worker.scala 是worker主进程,以actor方式接收master发送的消息,例如LaunchExecutor。在该消息处理方法中,通过一个子线程启动真正的executor子进程。
deploy/worker/ExecutorRunner.scala -& fetchAndRunExecutor()是启动子进程的地方。
executor/CoarseGrainedExecutorBackend.scala -& onStart() 里会向driver发送RegisterExecutor消息,由driver的schedulerBackend接收并处理。在该消息里提交了executor的相关信息,driver会将executor信息存储在executorDataMap对象里,并触发makeOffers方法,分配pending的tasks。
Driver端的消息处理方法如下:
case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =&
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
if (executorDataMap.contains(executorId)) {
context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
logInfo("Registered executor: " + executorRef + " with ID " + executorId)
context.reply(RegisteredExecutor)
addressToExecutorId(executorRef.address) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val (host, _) = Utils.parseHostPort(hostPort)
val data = new ExecutorData(executorRef, executorRef.address, host, cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (numPendingExecutors & 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
listenerBus.post(
// 通过bus,向所有监听SparkListenerExecutorAdded的线程发送通知,这里好像没人关注。
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
// Make fake resource offers on all executors ,
其中makerOffers方法会分配tasks,是把资源调度和Task调度融合的地方,调用该方法的地方还有:
case StatusUpdate
case ReviveOffers
submitTasks以及任务重试等时机时,会调用
case RegisterExecutor
即一旦有executor注册,就看看有没有需要分配的任务
该方法调用scheduler.resourceOffers ,每次尽量调度更多的tasks,主要考虑的是:
本地化 recomputeLocality,PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
负载均衡 round robin
任务的重跑
任务 + executorId的黑名单
各个状态的任务列表
其中调用的scheduler.resourceOffers关键代码如下,针对每个taskSet,尽量多的调度task,在分配时,尽量分配locality的task:
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet &- sortedTaskS maxLocality &- taskSet.myLocalityLevels) {
launchedTask = resourceOfferSingleTaskSet(
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
} while (launchedTask)
在scheduler.resourceOffers子方法中,也会告知driver,有task状态的改变,可用通过listenerBus告知监听者:
sched.dagScheduler.taskStarted(task, info)
eventProcessLoop.post(BeginEvent(task, taskInfo))
在makeOffers的最后,调用launchTasks,这时才是真正发起任务。workder端的CoarseGrainedExecutorBackend.scala接收到消息后,会继续执行。
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
Worker主进程启动executor子进程后(独立JVM),后者通过actor方式向driver注册自己
driver接收到RegisterExecutor消息,会检查有没有pending的tasks,并且task的localityLevel与executorId匹配
如果有合适的tasks(可能多个),则会再通过actor方式发送LaunchTask消息给executor,由后者真正执行任务
driver、master、worker里有哪些相关的子线程、子进程?
driver里至少有工作主进程、通过ClientActor与master通信的子线程们、通过SchedulerBackend与executors通信的子线程们、关注各种进度的listenerBus相关子线程、通过MapOutputTrackerMasterEndpoint管理mapOutput位置的子线程们、心跳维持的子线程,并且应该还有block、http等
standalone的master采用actor子线程维护可用workers列表,包括资源使用情况;并且接收driver发起的app注册请求等。如果是使用zk进行master选主,还得维持与zk之间的连接。另外,还有UI界面。
worker主进程需要维持与master之间的心跳,汇报资源使用情况。executors子进程需要同driver间维持心跳。主进程通过子线程启动executor子进程。另外还有block、shuffle output(不确定是否直接复用了block线程or进程)等。
模块、进程、线程间如何通信的?数据结构体如何?
scala程序,网络、进程、线程间主要通过Actor方式进行交互,资源调度主要走这条流
python通过py4j调用java代理的scala程序
shuffle output等通过netty交互数据
broadcast通过p2p交互数据
master怎么知道有哪些可用worker的?
worker启动时,向master发送RegisterWorker消息,由Master类通过Actor方式处理
任务调度的算法是什么?有没有优缺点?trait SchedulerBuilder , FairSchedulingAlgorithm/FIFOSchedulingAlgorithm
资源调度的依据,对可用executor 随机化,locality的原则,每个worker有cores限制,当前每个task只能占用1个core
jar、files、library是这时发送给executor的吗?driver还是master发送的呢?
driver提交给master的是appDescription,其中包含的command只是指定了executorBackend类,未涉及app具体的代码
driver在executor注册后,向其发送的launchTask消息才包含真正的task信息(files、jars名称),由TaskRunner中反序列化并拉取文件
standalone、yarn、mesos切换对driver、master、worker都有什么影响呢?使用到哪些类?
目前只能看出有不同的backend类,实现功能,没有比较优劣。
同一worker上的资源隔离做到哪一步?JVM可以控制memory,CPU、IO、DISK呢?
我们当前的配置,一台worker上只能有一个executor,后者是在driver-&master-&Worker时生成的
每个executor在调度时,可能会被分配多个tasks,只要availableCpus(i) &= CPUS_PER_TASK
executorBackend在接收到launchTask请求时,会生成TaskRunner对象,并由threadpool调度执行,即如果有多个tasks到达,就会多线程并发执行了
这些属于同一个executor的tasks共享一个JVM,所以共享executor.memory等限制
JVM、python进程复用 是如何做到的?
standalone cluster manager type时,如何与zk配合?
ZooKeeperPersistenceEngine和ZooKeeperLeaderElectionAgent 在Master类里被初始化。后者在new时直接start了,基于apache curator.apache.org实现,其isLeader、notLeader方法会被curator的方法调用。
最后,放一张简化后的时序图:
原文地址:, 感谢原作者分享。
您可能感兴趣的代码SparkRDD使用详解1--RDD原理
在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Dataset,RDD)。RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作
在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Dataset,RDD)。RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。RDD必须是可序列化的。RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。这对于迭代运算比较常见的机器学习算法, 交互式数据挖掘来说,效率提升比较大。
(1)RDD的特点
1)创建:只能通过转换 ( transformation ,如map/filter/groupBy/join 等,区别于动作 action) 从两种数据源中创建 RDD 1 )稳定存储中的数据; 2 )其他 RDD。
2)只读:状态不可变,不能修改。
3)分区:支持使 RDD 中的元素根据那个 key 来分区 ( partitioning ) ,保存到多个结点上。还原时只会重新计算丢失分区的数据,而不会影响整个。
4)路径:在 RDD 中叫世族或血统 ( lineage ) ,即 RDD 有充足的信息关于它是如何从其他 RDD 产生而来的。
5)持久化:支持将会被重用的 RDD 缓存 ( 如 in-memory 或溢出到磁盘 )。
6)延迟计算: Spark 也会延迟计算 RDD ,使其能够将转换管道化 (pipeline transformation)。
7)操作:丰富的转换(transformation)和动作 ( action ) , count/reduce/collect/save 等。
执行了多少次transformation操作,RDD都不会真正执行运算(记录lineage),只有当action操作被执行时,运算才会触发。
(2)RDD的好处
1)RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可重新计算出来,而不需要做特定的Checkpoint。
2)RDD的不变性,可以实现类Hadoop MapReduce的推测式执行。
3)RDD的数据分区特性,可以通过数据的本地性来提高性能,这不Hadoop MapReduce是一样的。
4)RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的下降但不会差于现在的MapReduce。
5)批量操作:任务能够根据数据本地性 (data locality) 被分配,从而提高性能。
(3)RDD的内部属性
通过RDD的内部属性,用户可以获取相应的元数据信息。通过这些信息可以支持更复杂的算法或优化。
1)分区列表:通过分区列表可以找到一个RDD中包含的所有分区及其所在地址。
2)计算每个分片的函数:通过函数可以对每个数据块进行RDD需要进行的用户自定义函数运算。
3)对父RDD的依赖列表:为了能够回溯到父RDD,为容错等提供支持。
4)对key-value pair数据类型RDD的分区器,控制分区策略和分区数。通过分区函数可以确定数据记录在各个分区和节点上的分配,减少分布不平衡。
5)每个数据分区的地址列表(如HDFS上的数据块的地址)。
如果数据有副本,则通过地址列表可以获知单个数据块的所有副本地址,为负载均衡和容错提供支持。
(4)RDD的存储与分区
1)用户可以选择不同的存储级别存储RDD以便重用。
2)当前RDD默认是存储于内存,但当内存不足时,RDD会spill到disk。
3)RDD在需要进行分区把数据分布于集群中时会根据每条记录Key进行分区(如Hash 分区),以此保证两个数据集在Join时能高效。
RDD根据useDisk、useMemory、useOffHeap、deserialized、replication参数的组合定义了以下存储级别:
(5)RDD的容错机制
RDD的容错机制实现分布式数据集容错方法有两种:数据检查点和记录更新RDD采用记录更新的方式:记录所有更新点的成本很高。所以,RDD只支持粗颗粒变换,即只记录单个块上执行的单个操作,然后创建某个RDD的变换序列(血统)存储下来;变换序列指,每个RDD都包含了他是如何由其他RDD变换过来的以及如何重建某一块数据的信息。因此RDD的容错机制又称“血统”容错。 要实现这种“血统”容错机制,最大的难题就是如何表达父RDD和子RDD之间的依赖关系。实际上依赖关系可以分两种,窄依赖和宽依赖:窄依赖:子RDD中的每个数据块只依赖于父RDD中对应的有限个固定的数据块;宽依赖:子RDD中的一个数据块可以依赖于父RDD中的所有数据块。例如:map变换,子RDD中的数据块只依赖于父RDD中对应的一个数据块;groupByKey变换,子RDD中的数据块会依赖于多有父RDD中的数据块,因为一个key可能错在于父RDD的任何一个数据块中 将依赖关系分类的两个特性:第一,窄依赖可以在某个计算节点上直接通过计算父RDD的某块数据计算得到子RDD对应的某块数据;宽依赖则要等到父RDD所有数据都计算完成之后,并且父RDD的计算结果进行hash并传到对应节点上之后才能计算子RDD。第二,数据丢失时,对于窄依赖只需要重新计算丢失的那一块数据来恢复;对于宽依赖则要将祖先RDD中的所有数据块全部重新计算来恢复。所以在长“血统”链特别是有宽依赖的时候,需要在适当的时机设置数据检查点。也是这两个特性要求对于不同依赖关系要采取不同的任务调度机制和容错恢复机制。
(6)Spark计算工作流
图1-5中描述了Spark的输入、运行转换、输出。在运行转换中通过算子对RDD进行转换。算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。
·输入:在Spark程序运行中,数据从外部数据空间(例如,HDFS、Scala集合或数据)输入到Spark,数据就进入了Spark运行时数据空间,会转化为Spark中的数据块,通过BlockManager进行管理。
·运行:在Spark数据输入形成RDD后,便可以通过变换算子fliter等,对数据操作并将RDD转化为新的RDD,通过行动(Action)算子,触发Spark提交作业。如果数据需要复用,可以通过Cache算子,将数据缓存到内存。
·输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS)或Scala数据或集合中(collect输出到Scala集合,count返回Scala Int型数据)。
Spark的核心数据模型是RDD,但RDD是个抽象类,具体由各子类实现,如MappedRDD、ShuffledRDD等子类。Spark将常用的大数据操作都转化成为RDD的子类。
来看一段代码:textFile算子从HDFS读取日志文件,返回“file”(RDD);filter算子筛出带“ERROR”的行,赋给 “errors”(新RDD);cache算子把它缓存下来以备未来使用;count算子返回“errors”的行数。RDD看起来与Scala集合类型 没有太大差别,但它们的数据和运行模型大相迥异。
上图给出了RDD数据模型,并将上例中用到的四个算子映射到四种算子类型。Spark程序工作在两个空间中:Spark RDD空间和Scala原生数据空间。在原生数据空间里,数据表现为标量(scalar,即Scala基本类型,用橘色小方块表示)、集合类型(蓝色虚线 框)和持久存储(红色圆柱)。
下图描述了运行过程中通过算子对RDD进行转换, 算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。
图1 两个空间的切换,四类不同的RDD算子
输入算子(橘色箭头)将Scala集合类型或存储中的数据吸入RDD空间,转为RDD(蓝色实线框)。输入算子的输入大致有两类:一类针对 Scala集合类型,如parallelize;另一类针对存储数据,如上例中的textFile。输入算子的输出就是Spark空间的RDD。
因为函数语义,RDD经过变换(transformation)算子(蓝色箭头)生成新的RDD。变换算子的输入和输出都是RDD。RDD会被划分 成很多的分区 (partition)分布到集群的多个节点中,图1用蓝色小方块代表分区。注意,分区是个逻辑概念,变换前后的新旧分区在物理上可能是同一块内存或存 储。这是很重要的优化,以防止函数式不变性导致的内存需求无限扩张。有些RDD是计算的中间结果,其分区并不一定有相应的内存或存储与之对应,如果需要 (如以备未来使用),可以调用缓存算子(例子中的cache算子,灰色箭头表示)将分区物化(materialize)存下来(灰色方块)。
一部分变换算子视RDD的元素为简单元素,分为如下几类:
输入输出一对一(element-wise)的算子,且结果RDD的分区结构不变,主要是map、flatMap(map后展平为一维RDD);
输入输出一对一,但结果RDD的分区结构发生了变化,如union(两个RDD合为一个)、coalesce(分区减少);
从输入中选择部分元素的算子,如filter、distinct(去除冗余元素)、subtract(本RDD有、它RDD无的元素留下来)和sample(采样)。
另一部分变换算子针对Key-Value集合,又分为:
对单个RDD做element-wise运算,如mapValues(保持源RDD的分区方式,这与map不同);
对单个RDD重排,如sort、partitionBy(实现一致性的分区划分,这个对数据本地性优化很重要,后面会讲);
对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey;
对两个RDD基于key进行join和重组,如join、cogroup。
后三类操作都涉及重排,称为shuffle类操作。
从RDD到RDD的变换算子序列,一直在RDD空间发生。这里很重要的设计是lazy evaluation:计算并不实际发生,只是不断地记录到元数据。元数据的结构是DAG(有向无环图),其中每一个“顶点”是RDD(包括生产该RDD 的算子),从父RDD到子RDD有“边”,表示RDD间的依赖性。Spark给元数据DAG取了个很酷的名字,Lineage(世系)。这个 Lineage也是前面容错设计中所说的日志更新。
Lineage一直增长,直到遇上行动(action)算子(图1中的绿色箭头),这时 就要evaluate了,把刚才累积的所有算子一次性执行。行动算子的输入是RDD(以及该RDD在Lineage上依赖的所有RDD),输出是执行后生 成的原生数据,可能是Scala标量、集合类型的数据或存储。当一个算子的输出是上述类型时,该算子必然是行动算子,其效果则是从RDD空间返回原生数据空间。
RDD运行逻辑
如图所示,在Spark应用中,整个执行流程在逻辑上运算之间会形成有向无环图。Action算子触发之后会将所有累积的算子形成一个有向无环图,然后由调度器调度该图上的任务进行运算。Spark的调度方式与MapReduce有所不同。Spark根据RDD之间不同的依赖关系切分形成不同的阶段(Stage),一个阶段包含一系列函数进行流水线执行。图中的A、B、C、D、E、F、G,分别代表不同的RDD,RDD内的一个方框代表一个数据块。数据从HDFS输入Spark,形成RDD A和RDD C,RDD C上执行map操作,转换为RDD D,RDD B和RDD F进行join操作转换为G,而在B到G的过程中又会进行Shuffle。最后RDD G通过函数saveAsSequenceFile输出保存到HDFS中。
RDD依赖关系
RDD的依赖关系如下图所示:
窄依赖 (narrowdependencies) 和宽依赖 (widedependencies) 。窄依赖是指 父 RDD 的每个分区都只被子 RDD 的一个分区所使用,例如map、filter。相应的,那么宽依赖就是指父 RDD 的分区被多个子 RDD 的分区所依赖,例如groupByKey、reduceByKey等操作。如果父RDD的一个Partition被一个子RDD的Partition所使用就是窄依赖,否则的话就是宽依赖。
这种划分有两个用处。首先,窄依赖支持在一个结点上管道化执行。例如基于一对一的关系,可以在 filter 之后执行 map 。其次,窄依赖支持更高效的故障还原。因为对于窄依赖,只有丢失的父 RDD 的分区需要重新计算。而对于宽依赖,一个结点的故障可能导致来自所有父 RDD 的分区丢失,因此就需要完全重新执行。因此对于宽依赖,Spark 会在持有各个父分区的结点上,将中间数据持久化来简化故障还原,就像 MapReduce 会持久化 map 的输出一样。
特别说明:对于join操作有两种情况,如果join操作的使用每个partition仅仅和已知的Partition进行join,此时的join操作就是窄依赖;其他情况的join操作就是宽依赖;因为是确定的Partition数量的依赖关系,所以就是窄依赖,得出一个推论,窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖(也就是说对父RDD的依赖的Partition的数量不会随着RDD数据规模的改变而改变)
如何划分Stage如下图所示:
Stage划分的依据就是宽依赖,什么时候产生宽依赖呢?例如reduceByKey,groupByKey等Action。
1.从后往前推理,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到Stage中;
2.每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition数量决定的;
3.最后一个Stage里面的任务的类型是ResultTask,前面所有其他Stage里面的任务类型都是ShuffleMapTask;
4.代表当前Stage的算子一定是该Stage的最后一个计算步骤;
补充:Hadoop中的MapReduce操作中的Mapper和Reducer在Spark中基本等量算子是:map、reduceByKey;在一个Stage内部,首先是算子合并,也就是所谓的函数式编程的执行的时候最终进行函数的展开从而把一个Stage内部的多个算子合并成为一个大算子(其内部包含了当前Stage中所有算子对数据的计算逻辑);其次是由于Transformation操作的Lazy特性!!在具体算子交给集群的Executor计算之前,首先会通过Spark Framework(DAGScheduler)进行算子的优化。
RDD如何操作
(1)RDD的创建方式
1)从Hadoop文件系统(或与Hadoop兼容的其他持久化存储系统,如Hive、Cassandra、HBase)输入(例如HDFS)创建。
2)从父RDD转换得到新RDD。
3)通过parallelize或makeRDD将单机数据创建为分布式RDD。
(2)RDD的两种操作算子
对于RDD可以有两种操作算子:转换(Transformation)与行动(Action)。
1)转换(Transformation):Transformation操作是延迟计算的,也就是说从一个RDD转换生成另一个RDD的转换操作不是马上执行,需要等到有Action操作的时候才会真正触发运算。
2)行动(Action):Action算子会触发Spark提交作业(Job),并将数据输出Spark系统。
1.Transformation具体内容:
2.Action具体内容:
相比MapReduce,Spark提供了更加优化和复杂的执行流。读者还可以深入了解Spark的运行机制与Spark算子,这样能更加直观地了解API的使用。Spark提供了更加丰富的函数式算子,这样就为Spark上层的开发奠定了坚实的基础。后续文章将详细介绍Spark算子源代码及示例。}

我要回帖

更多关于 spark的任务调度 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信