摘要:腾讯分布式数据仓库基于開源软件Hadoop和Hive进行构建,TDW计算引擎包括两部分:MapReduce和Spark两者内部都包含了一个重要的过程—什么是shufflee。本文对什么是shufflee过程进行解析并对两个计算引擎的什么是shufflee过程进行比较。
腾讯分布式数据仓库(Tencent distributed Data Warehouse, 简称TDW)基于开源软件Hadoop和Hive进行构建并且根据公司数据量大、计算复杂等特定情况进行叻大量优化和改造,目前单集群最大规模达到 5600台每日作业数达到100多万,已经成为公司最大的离线数据处理平台为了满足用户更加多样嘚计算需求,TDW也在向实时化方向发展为用户提供
更加高效、稳定、丰富的服务。
TDW计算引擎包括两部分:一个是偏离线的MapReduce一个是偏实时嘚Spark,两者内部都包含了一个重要的过程——什么是shufflee本 文对什么是shufflee过程进行解析,并对两个计算引擎的什么是shufflee过程进行比较对后续的优囮方向进行思考和探索,期待经过我们不断的努力TDW计算 引擎运行地更好。
什么是shufflee的本义是洗牌、混洗把一组有一定规则的数据尽量转換成一组无规则的数据,越随机越好MapReduce中的什么是shufflee更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据
为 什麼MapReduce计算模型需要什么是shufflee过程?我们都知道MapReduce计算模型一般包括两个重要的阶段:Map是映射负责数据的过滤分 发;Reduce是规约,负责数据的计算归並Reduce的数据来源于Map,Map的输出即是Reduce的输入Reduce需要通过 什么是shufflee来获取数据。
Spill过程包括输出、排序、溢写、合并等步骤如图所示:
每个Map任务不斷地以对的形式把数据输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间在内存中放置尽可能多的数据。
这个数据结构其实就是个字节数组叫Kvbuffer, 名如其义但是这里面不光放置了数据,还放置了一些索引数据给放置索引数据嘚区域起了一个Kvmeta的别名,在Kvbuffer的一块区域上穿了一个 IntBuffer(字节序采用的是平台自身的字节序)的马甲数据区域和索引数据区域在Kvbuffer中是相邻不偅叠的两个区域,用一个分界点来划分两者分界点不是亘古不变的,而是每次
Spill之后都会更新一次初始的分界点是0,数据的存储方向是姠上增长索引数据的存储方向是向下增长,如图所示:
索引是对在kvbuffer中的索引是个四元组,包括:value的起始位置、key的起始位置、partition值、value的长喥 占用四个Int长度,Kvmeta的存放指针Kvindex每次都是向下跳四个“格子”然后再向上一个格子一个格子地填充四元组的数据。比如
Kvbuffer的大小虽然可以通过参数设置但是总共就那么大,和索引不断地增加加着加着,Kvbuffer总有不够用的那天那怎么办?把数据从内存刷到磁盘上再接着往内存写数据把 Kvbuffer中的数据刷到磁盘上的过程就叫Spill,多么明了的叫法内存中的数据满了就自动地spill到具有更大空间的磁盘。
关于Spill 触发的条件吔就是Kvbuffer用到什么程度开始Spill,还是要讲究一下的如果把Kvbuffer用得死死得,一点缝都不剩的时候再开始 Spill那Map任务就需要等Spill完成腾出空间之后才能繼续写数据;如果Kvbuffer只是满到一定程度,比如80%的时候就开始
Spill那在Spill的同时,Map任务还能继续写数据如果Spill够快,Map可能都不需要为空闲空间而发愁两利相衡取其大,一般选择后 者
Spill这个重要的过程是由Spill线程承担,Spill线程从Map任务接到“命令”之后就开始正式干活干的活叫SortAndSpill,原来不僅仅是Spill在Spill之前还有个颇具争议性的Sort。
先把Kvbuffer中的数据按照partition值和key两个关键字升序排序移动的只是索引数据,排序结果是Kvmeta中数据按照partition为单位聚集在一起同一partition内的按照key有序。
Spill线程为这次Spill过程创建一个磁盘文件:从所有的本地目录中轮训查找能存储这么大空间的目录找到之后茬其中创建一个类似于 “spill12.out”的文件。Spill线程根据排过序的Kvmeta挨个partition的把数据吐到这个文件中一个partition对应的数据吐完之后顺序地吐下个partition,直到把所囿的partition遍历
所有的partition 对应的数据都放在这个文件里虽然是顺序存放的,但是怎么直接知道某个partition在这个文件中存放的起始位置呢强大的索引叒出场了。有一个三元 组记录某个partition对应的数据在这个文件中的索引:起始位置、原始数据长度、压缩之后的数据长度一个partition对应一个三元組。然后
把这些索引信息存放在内存中如果内存中放不下了,后续的索引信息就需要写到磁盘文件中了:从所有的本地目录中轮训查找能存储这么大空间的目录找到之后 在其中创建一个类似于“spill12.out.index”的文件,文件中不光存储了索引数据还存储了crc32的校验数据。
(spill12.out.index不一定在磁盤上创建如果内存(默认1M空间)中能放得下就放在内存中,即使在磁盘上创建了和 spill12.out文件也不一定在同一个目录下。)
每一次Spill过程就会最尐生成一个out文件有时还会生成index文件,Spill的次数也烙印在文件名中索引文件和数据文件的对应关系如下图所示:
话 分两端,在Spill线程如火如荼的进行SortAndSpill工作的同时Map任务不会因此而停歇,而是一无既往地进行着数据输出Map还是把数 据写到kvbuffer中,那问题就来了:只顾着闷头按照bufindex指针姠上增长kvmeta只顾着按照Kvindex向下增长,是保持指针起始位置不变继续跑呢还是
另谋它路?如果保持指针起始位置不变很快bufindex和Kvindex就碰头了,碰頭之后再重新开始或者移动内存都比较麻烦不可取。Map取 kvbuffer中剩余空间的中间位置用这个位置设置为新的分界点,bufindex指针移动到这个分界点Kvindex移动到这个分界点的-16位
置,然后两者就可以和谐地按照自己既定的轨迹放置数据了当Spill完成,空间腾出之后不需要做任何改动继续前進。分界点的转换如下图所示:
Map任务总要把输出的数据写到磁盘上即使输出数据量很小在内存中全部能装得下,在最后也会把数据刷到磁盘上
Map任务如果输出数据量很大,可能会进行好几次Spillout文件和Index文件会产生很多,分布在不同的磁盘上最后把这些文件进行合并的merge过程閃亮登场。
Merge 过程怎么知道产生的Spill文件都在哪了呢从所有的本地目录上扫描得到产生的Spill文件,然后把路径存储在一个数组里Merge过程又怎么知道 Spill的索引信息呢?没错也是从所有的本地目录上扫描得到Index文件,然后把索引信息存储在一个列表里到这里,又遇到了一个值得纳闷嘚地方
在之前Spill过程中的时候为什么不直接把这些信息存储在内存中呢,何必又多了这步扫描的操作特别是Spill的索引数据,之前当内存超限之后就把数 据写到磁盘现在又要从磁盘把这些数据读出来,还是需要装到更多的内存中之所以多此一举,是因为这时kvbuffer这个内存大户巳经不再使用可以回
收有内存空间来装这些数据了。(对于内存空间较大的土豪来说用内存来省却这两个io步骤还是值得考虑的。)
然後为merge过程创建一个叫file.out的文件和一个叫file.out.Index的文件用来存储最终的输出和索引
一 个partition一个partition的进行合并输出。对于某个partition来说从索引列表中查询这個partition对应的所有索引信 息,每个对应一个段插入到段列表中也就是这个partition对应一个段列表,记录所有的Spill文件中对应的这个partition那段数据的文 件名、起始位置、长度等等
然后对这个partition对应的所有的segment进行合并,目标是合并成一个segment当这个 partition对应很多个segment时,会分批地进行合并:先从segment列表中紦第一批取出来以key为关键字放置成最小堆,然后从最小
堆中每次取出最小的输出到一个临时文件中这样就把这一批段合并成一个临时嘚段,把它加回到segment列表中;再从segment列表中把第二批取出 来合并输出到一个临时segment把其加入到列表中;这样往复执行,直到剩下的段是一批輸出到最终的文件中。
最终的索引数据仍然输出到Index文件中
Reduce 任务通过HTTP向各个Map任务拖取它所需要的数据。每个节点都会启动一个常驻的HTTP server其Φ一项服务就是响应Reduce拖取Map数据。当有MapOutput的HTTP请求过来的时候HTTP server就读取相应的Map输出文件中对应这个Reduce部分的数据通过网络流输出给Reduce。
Reduce任务拖取某个Map 對应的数据如果在内存中能放得下这次数据的话就直接把数据写到内存中。Reduce要向每个Map去拖取数据在内存中每个Map对应一块数据,当内存 Φ存储的Map数据占用空间达到一定程度的时候开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中
如果在内存 中不能放得下这个Map嘚数据的话,直接把Map数据写到磁盘上在本地目录创建一个文件,从HTTP流中读取数据然后写到磁盘使用的缓存区大小是 64K。拖一个Map数据过来僦会创建一个文件当文件数量达到一定阈值时,开始启动磁盘文件merge把这些文件合并输出到一个文件。
有些Map的数据较小是可以放在内存Φ的有些Map的数据较大需要放在磁盘上,这样最后Reduce任务拖过来的数据有些放在内存中了有些放在磁盘上最后会对这些来一个全局合并。
這里使用的Merge和Map端使用的Merge过程一样Map的输出数据已经是有序的,Merge进行一次合并排序所谓Reduce端的 sort过程就是这个合并的过程。一般Reduce是一边copy一边sort即copy和sort两个阶段是重叠而不是完全分开的。
Spark丰富了任务类型有些任务之间数据流转不需要通过什么是shufflee,但是有些任务之间还是需要通过什麼是shufflee来传递数据比如wide dependency的group by key。
Map创建的bucket其实对应磁盘上的一个文 件Map的结果写到每个bucket中其实就是写到那个磁盘文件中,这个文件也被称为blockFile是Disk Block Manager管理器通过文件名的Hash值对应到本地目录的子目录中创建的。每个Map要在节点上创建R个磁盘文件用于结果输出Map的结果是直接输 出到磁盘文件仩的,100KB的内存缓冲是用来创建Fast
reduce创建bucket对应的输出文件把这些文件组织成什么是shuffleeFileGroup,当这次map执行完之后这个 什么是shuffleeFileGroup可以释放为下次循环利用;当又有map在这个节点上执行时,不需要创建新的bucket文件而是在上次的
比 如一个Job有3个Map和2个reduce:(1) 如果此时集群有3个节点有空槽,每个节点空闲了┅个core则3个Map会调度到这3个节点上执行,每个Map都会创建2个什么是shufflee文件总共创 建6个什么是shufflee文件;(2) 如果此时集群有2个节点有空槽,每个节点空閑了一个core则2个Map先调度到这2个节点上执行,每个Map都会创建2个什么是shufflee文件然后其
中一个节点执行完Map之后又调度执行另一个Map,则这个Map不会创建新的什么是shufflee文件而是把结果输出追加到之前Map创建的什么是shufflee 文件中;总共创建4个什么是shufflee文件;(3) 如果此时集群有2个节点有空槽,一个节点囿2个空core一个节点有1个空core则一个节点调度2个Map一个节点调度1个Map,调度2个Map的
节点上一个Map创建了什么是shufflee文件,后面的Map还是会创建新的什么是shufflee文件因为上一个Map还正在写,它创建的 什么是shuffleeFileGroup还没有释放;总共创建6个什么是shufflee文件
Reduce去拖Map的输出数据,Spark提供了两套不同的拉取数据框架:通過socket连接去取数据;使用netty框架去取数据
并不是所有的数据都是通过网络读取,对于在本节点的Map数据Reduce直接去磁盘上读取而不再通过网络框架。
Reduce 拖过来数据之后以什么方式存储呢Spark Map输出的数据没有经过排序,Spark 什么是shufflee过来的数据也不会进行排序Spark认为什么是shufflee过程中的排序不是必須的,并不是所有类型的Reduce需要的数据都需要排序强
制地进行排序只会增加什么是shufflee的负担。Reduce拖过来的数据会放在一个HashMap中HashMap中存储的也是对,key是Map输出的keyMap输出对应这个key的所有value组成HashMap的value。Spark将 什么是shufflee取过来的每一个对插入或者更新到HashMap中来一个处理一个。HashMap全部放在内存中
什么是shufflee 取過来的数据全部存放在内存中,对于数据量比较小或者已经在Map端做过合并处理的什么是shufflee数据占用内存空间不会太大,但是对于比如group by key这样嘚操作Reduce需要得到key对应的所有value,并将这些value组一个数组放在内存中这样当数据量较大时,就需要较多内存
当 内存不够时,要不就失败偠不就用老办法把内存中的数据移到磁盘上放着。Spark意识到在处理数据规模远远大于内存空间时所带来的不足引入了一个具有 外部排序的方案。什么是shufflee过来的数据先放在内存中当内存中存储的对超过1000并且内存使用超过70%时,判断节点上可用内存如果还足够则把内存缓冲区夶小翻倍,如果可用内存不再够了则把内存中
的对排序然后写到磁盘文件中。最后把内存缓冲区中的数据排序之后和那些磁盘文件组成┅个最小堆每次从最小堆中读取最小的数据,这个和 MapReduce中的merge过程类似
|
在内存中构造了一块数据结构用于map输出的缓冲
|
没有在内存中构造一塊数据结构用于map输出的缓冲,而是直接把输出写到磁盘文件
|
map输出的数据有排序
|
map输出的数据没有排序
|
对磁盘上的多个spill文件最后进行合并成一個输出文件
|
在map端没有merge过程在输出时直接是对应一个reduce的数据写到一个文件中,这些文件同时存在并发写最后不需要合并成一个
|
|
仍然是通過网络框架拖取数据
|
不通过网络框架,对于在本节点上的map输出文件采用本地读取的方式
|
copy过来的数据存放位置
|
先放在内存,内存放不下时寫到磁盘
|
一种方式全部放在内存;
|
最后会对磁盘文件和内存中的数据进行合并排序
|
对于采用另一种方式时也会有合并排序的过程
|
通过上面嘚介绍我们了解到,什么是shufflee 过程的主要存储介质是磁盘尽量的减少IO是什么是shufflee的主要优化方向。我们脑海中都有那个经典的存储金字塔體系什么是shufflee过程为什么把结果都 放在磁盘上,那是因为现在内存再大也大不过磁盘内存就那么大,还这么多张嘴吃当然是分配给最需要的了。如果具有“土豪”内存节点减少什么是shufflee
IO的最有效方式无疑是尽量把数据放在内存中。下面列举一些现在看可以优化的方面期待经过我们不断的努力,TDW计算引擎运行地更好
-
压缩:对数据进行压缩,减少写读数据量;
-
减少不必要的排序:并不是所有类型的Reduce需要嘚数据都是需要排序的排序这个nb的过程如果不需要最好还是不要的好;
-
内存化:什么是shufflee的数据不放在磁盘而是尽量放在内存中,除非逼鈈得已往磁盘上放;当然了如果有性能和内存相当的第三方存储系统那放在第三方存储系统上也是很好的;这个是个大招;
-
网络框架:netty嘚性能据说要占优了;
-
本节点上的数据不走网络框架:对于本节点上的Map输出,Reduce直接去读吧不需要绕道网络框架。
Spark作为MapReduce的进阶架构对于什么是shufflee过程已经是优化了的,特别是对于那些具有争议的步骤已经做了优化但是Spark的什么是shufflee对于我们来说在一些方面还是需要优化的。