spark scala splitplit单机 多少个split

新手园地& & & 硬件问题Linux系统管理Linux网络问题Linux环境编程Linux桌面系统国产LinuxBSD& & & BSD文档中心AIX& & & 新手入门& & & AIX文档中心& & & 资源下载& & & Power高级应用& & & IBM存储AS400Solaris& & & Solaris文档中心HP-UX& & & HP文档中心SCO UNIX& & & SCO文档中心互操作专区IRIXTru64 UNIXMac OS X门户网站运维集群和高可用服务器应用监控和防护虚拟化技术架构设计行业应用和管理服务器及硬件技术& & & 服务器资源下载云计算& & & 云计算文档中心& & & 云计算业界& & & 云计算资源下载存储备份& & & 存储文档中心& & & 存储业界& & & 存储资源下载& & & Symantec技术交流区安全技术网络技术& & & 网络技术文档中心C/C++& & & GUI编程& & & Functional编程内核源码& & & 内核问题移动开发& & & 移动开发技术资料ShellPerlJava& & & Java文档中心PHP& & & php文档中心Python& & & Python文档中心RubyCPU与编译器嵌入式开发驱动开发Web开发VoIP开发技术MySQL& & & MySQL文档中心SybaseOraclePostgreSQLDB2Informix数据仓库与数据挖掘NoSQL技术IT业界新闻与评论IT职业生涯& & & 猎头招聘IT图书与评论& & & CU技术图书大系& & & Linux书友会二手交易下载共享Linux文档专区IT培训与认证& & & 培训交流& & & 认证培训清茶斋投资理财运动地带快乐数码摄影& & & 摄影器材& & & 摄影比赛专区IT爱车族旅游天下站务交流版主会议室博客SNS站务交流区CU活动专区& & & Power活动专区& & & 拍卖交流区频道交流区
家境小康, 积分 1267, 距离下一级还需 733 积分
论坛徽章:29
获奖详情:
话题背景:
生活离不开水,同样离不开数据,我们被数据包围,在数据中生活。当数据越来越多时,就成了大数据。想要理解大数据,就需要理解大数据相关的查询、处理、机器学习、图计算和统计分析等,Spark 作为新一代轻量级大数据快速处理平台,集成了大数据相关的各种能力,是理解大数据的首选。现在,让我们以向大师致敬的方式开始学习之旅,向Doug Cutting 和Matei Zaharia 两位大师致敬!什么是Spark?Spark作为Apache 顶级的开源项目,是一个快速、通用的大规模数据处理引擎,和Hadoop 的MapReduce 计算框架类似,但是相对于MapReduce,Spark 凭借其可伸缩、基于内存计算等特点,以及可以直接读写Hadoop 上任何格式数据的优势,进行批处理时更加高效,并有更低的延迟。相对于“ one stack to rule them all ”的目标,实际上,Spark 已经成为轻量级大数据快速处理的统一平台,各种不同的应用,如实时流处理、机器学习、交互式查询等,都可以通过Spark 建立在不同的存储和运行系统上,下面我们来具体认识一下Spark。
讨论话题:
1、Spark在大数据领域的影响力越来越强,采用Spark解决方案的公司也越来越多,那么问题来了,为什么我们国家搞不出Spark这样的技术?为什么高大上的技术大多在美国?
2、请对比分析MLlib库与Apache Mahout库
3、在Spark开发中使用Scala好?还是使用Java好?你的理由是?
说明:可任选1题或多题回答,回答越有深度越容易中奖。
活动结束后将选取4名讨论精彩的童鞋,每人赠送一本《Spark核心技术与高级应用》图书作为奖励。
zcover.jpg (5.12 KB, 下载次数: 129)
22:11 上传
作者: 于俊& & 向海& & 代其锋& & 马海平& &
丛书名: 大数据技术丛书
出版社:机械工业出版社
出版日期:2016 年1月
开本:16开
于俊、向海、代其锋、马海平编写的《Spark核 心技术与高级应用》是spark领域少有的专注于核心 原理与深度应用的*作,由科大讯飞和百分点科技的 4位大数据专家撰写。不仅细致介绍了spark的程序开 发、编程模型、作业执行解析等基础知识,还深度讲 解了spark sQL、sparkML、spark stteaming等大量 内部模块和周边模块的原理与使用。除此之外,还从 管理和性能优化的角度对spark进行了深入探索。
&&nbsp|&&nbsp&&nbsp|&&nbsp&&nbsp|&&nbsp&&nbsp|&&nbsp
巨富豪门, 积分 27623, 距离下一级还需 12377 积分
论坛徽章:210
携程英特尔都在用
论坛徽章:69
感觉Scala相比java好一点
稍有积蓄, 积分 296, 距离下一级还需 204 积分
论坛徽章:3
本帖最后由 nail78 于
10:49 编辑
spark内核就是用scala语言开发的,自然而然在spark开发中用scala更好。
使用scala性能开销小,避免了不同语言环境和数据的转换。
Spark 的机器学习、流处理和图分析库全都是用Scala 写的,而新功能对其他语言的绑定支持要慢一些。如果想用Spark 的全部功能(而不用花时间等待它移植到其他语言绑定),恐怕你必须学点儿Scala 基础知识,如果想扩展这些Spark 已有功能来解决你手头上的新问题,就更要深入了解Scala 了。
稍有积蓄, 积分 319, 距离下一级还需 181 积分
论坛徽章:2
哇塞,正好最近在看流式计算唉,最近关注storm比较多,这俩算是一类的产品。
论坛徽章:202
BAT这麽牛,为什麽还要用人家的技术
为什么我们国家搞不出Spark这样的技术?为什么高大上的技术大多在美国?
丰衣足食, 积分 946, 距离下一级还需 54 积分
论坛徽章:7
Spark是准实时性的,Storm是实时性的
大富大贵, 积分 12329, 距离下一级还需 7671 积分
论坛徽章:16
请对比分析MLlib库与Apache Mahout库
1.mahout 支持mapreduce,mlib 支持spark。如果ML 算法是单一的一个MRjob.跟spark相比主要的区别是启动时候的耗时。
2.mahout 是在2014之后支持spark。现在可以支持H2O.
3.如果你想跑一些通用的引擎如R,那么可以使用mahout。如果你想使用特定的算法,可以根据情况来选。
4.Mahout包含一些老的hadoop算法,
富足长乐, 积分 5225, 距离下一级还需 2775 积分
论坛徽章:34
正在计划学习SPARK.关注spark sql.
1.为什么我们国家搞不出Spark这样的技术?为什么高大上的技术大多在美国
阿里最近开放了数加平台。也是高大上的技术。不过,开源这方面,中国起步晚,落后一些也正常。
2.还没用到。
3.从感情上说,scala好。因为是spark原生的开发语言。
&&不过,从项目实际来说,要和其他功能整合,java也是绕不开的。当然,还可以选python.
4.好书要顶
家境小康, 积分 1547, 距离下一级还需 453 积分
论坛徽章:10
1、Spark在大数据领域的影响力越来越强,采用Spark解决方案的公司也越来越多,那么问题来了,为什么我们国家搞不出Spark这样的技术?为什么高大上的技术大多在美国?
技术积累还没到。
2、请对比分析MLlib库与Apache Mahout库
它们主要的区别将来自底层的框架。若Mahout主要是Hadoop MapReduce框架,而MLib则是Spark。更具体的说就是它们工作负载的差异。如果将你的ML算法映射到单一的MR时,它们主要的区别是启动所耗费的时间,Hadoop MR需要耗费几十秒,而Spark仅仅只需要1秒钟。倘若将你的算法映射到大量的任务的时候则会有很大的不同,在这种情况下,倘若对于每次迭代具有相同的差异,请看下面的例子
假设需要100次迭代,每次需要CPU运行5秒:
Spark:总共需要花费100*5+100*1=600秒
Hadoop:总共需要花费100*5+100*30=3500秒
在同一时间的Hadoop MR是更加成熟的框架,其次是Spark。如果你有大量的数据需要处理,那么稳定则是最重要的,这里可以考虑Mahout作为替代的选择。
即:mahout 支持mapreduce,mlib 支持spark。如果ML 算法是单一的一个MRjob.跟spark相比主要的区别是启动时候的耗时。
2).mahout 是在2014之后支持spark。现在可以支持H2O.
3).如果你想跑一些通用的引擎如R,那么可以使用mahout。如果你想使用特定的算法,可以根据情况来选。
4).Mahout包含一些老的hadoop算法,
MLlib机器学习算法的流程
用户可以容易地使用MLbase这个工具来处理自己的数据。大部分的机器学习算法都包含训练以及预测两个部分,训练出模型,然后对未知样本进行预测。Spark中的机器学习包也是如此。
Spark将机器学习算法都分成了两个模块:
& & 训练模块:通过训练样本输出模型参数
& & 预测模块:利用模型参数初始化,预测测试样本,输出与测值。
MLbase提供了函数式编程语言Scala,利用MLlib可以很方便的实现机器学习的常用算法。
比如说,我们要做分类,只需要写如下scala代码:
& & 1 var X = load(&some_data&, 2 to 10)
& & 2 var y = load(&some_data&, 1)
& & 3 var (fn-model, summary) = doClassify(X, y)
代码解释:X是需要分类的数据集,y是从这个数据集里取的一个分类标签,doClassify()分类。
这样的处理有两个主要好处:
& & 每一步数据处理很清楚,可以很容易地可视化出来;
& & 对用户来说,用ML算法处理是透明的,不用关心和考虑用什么分类方法,是SVM还是AdaBoost,SVM用的kernel是线性的还是RBF的,original和scaled的参数调成多少等等。
MLbase的三大组成部分之一:ML Optimizer,会选择它认为最适合的已经在内部实现好了的机器学习算法和相关参数,来处理用户输入的数据,并返回模型或别的帮助分析的结果。总体上的处理流程如下图:
& & 用户输入的Task比如doClassify(X, y)或者做协同过滤doCollabFilter(X, y)、图计算findTopKDegreeNodes(G, k = 1000)之类的事情,先会传给Parser处理,然后交给LLP。LLP是logical learning plan,即是逻辑上的一个学习选择过程,在这个过程里选择该用什么算法,特征提取应该用什么做,参数应该选什么,数据集怎么拆子数据集的策略等事情。
& & LLP决定之后交给Optimizer。Optimizer是MLbase的核心,它会把数据拆分成若干份,对每一份使用不同的算法和参数来运算出结果,看哪一种搭配方式得到的结果最优(注意这次最优结果是初步的),优化器做完这些事之后就交给PLP。
& & PLP是physical learning plan,即物理(实际)执行的计划,让MLbase的master把任务分配给具体slave去最后执行之前选好的算法方案,把结果计算出来返回,同时返回这次计算的学习模型。
& & 这个流程是Task -& Parser -& LLP -& Optimizer -& PLP -& Execute -& Result/Model,先从逻辑上,在已有的算法里选几个适合这个场景的,让优化器都去做一遍,把认为当时最优的方案给实际执行的部分去执行,返回结果。
MLbase不仅仅把结果返回给用户。在LLP、Optimizer,MLbase会存储一些中间结果和特征,然后会继续搜寻和测试结果更好的算法和相关参数,并且会通知用户。LLP内部实现的算法是可以扩充的。
总之,MLbase会自动寻找合适的算法,自动选择和优化,还可以进行扩充。
& &&&MLlib实现KMeans
以MLlib实现KMeans算法,利用MLlib KMeans训练出来的模型,可以对新的数据作出分类预测,具体见代码和输出结果。
Scala代码:
& & 1 package com.hq
& & 3 import org.apache.spark.mllib.clustering.KMeans
& & 4 import org.apache.spark.mllib.linalg.Vectors
& & 5 import org.apache.spark.{SparkContext, SparkConf}
& & 7 object KMeansTest {
& & 8& &def main(args: Array[String]) {
& & 9& &&&if (args.length & 1) {
& & 10& && & System.err.println(&Usage: &file&&)
& & 11& && & System.exit(1)
& & 12& &&&}
& & 14& &&&val conf = new SparkConf()
& & 15& &&&val sc = new SparkContext(conf)
& & 16& &&&val data = sc.textFile(args(0))
& & 17& &&&val parsedData = data.map(s =& Vectors.dense(s.split(' ').map(_.toDouble)))
& & 18& &&&val numClusters = 2
& & 19& &&&val numIterations = 20
& & 20& &&&val clusters = KMeans.train(parsedData,numClusters,numIterations)
& & 22& &&&println(&------Predict the existing line in the analyzed data file: &+args(0))
& & 23& &&&println(&Vector 1.0 2.1 3.8 belongs to clustering &+ clusters.predict(Vectors.dense(&1.0 2.1 3.8&.split(' ').map(_.toDouble))))
& & 24& &&&println(&Vector 5.6 7.6 8.9 belongs to clustering &+ clusters.predict(Vectors.dense(&5.6 7.6 8.9&.split(' ').map(_.toDouble))))
& & 25& &&&println(&Vector 3.2 3.3 6.6 belongs to clustering &+ clusters.predict(Vectors.dense(&3.2 3.3 6.6&.split(' ').map(_.toDouble))))
& & 26& &&&println(&Vector 8.1 9.2 9.3 belongs to clustering &+ clusters.predict(Vectors.dense(&8.1 9.2 9.3&.split(' ').map(_.toDouble))))
& & 27& &&&println(&Vector 6.2 6.5 7.3 belongs to clustering &+ clusters.predict(Vectors.dense(&6.2 6.5 7.3&.split(' ').map(_.toDouble))))
& & 29& &&&println(&-------Predict the non-existent line in the analyzed data file: ----------------&)
& & 30& &&&println(&Vector 1.1 2.2 3.9&&belongs to clustering &+ clusters.predict(Vectors.dense(&1.1 2.2 3.9&.split(' ').map(_.toDouble))))
& & 31& &&&println(&Vector 5.5 7.5 8.8&&belongs to clustering &+ clusters.predict(Vectors.dense(&5.5 7.5 8.8&.split(' ').map(_.toDouble))))
& & 33& &&&println(&-------Evaluate clustering by computing Within Set Sum of Squared Errors:-----&)
& & 34& &&&val wssse = puteCost(parsedData)
& & 35& &&&println(&Within Set Sum of Squared Errors = &+ wssse)
& & 36& &&&sc.stop()
& & 37& &}
3.4 以Spark集群standalone方式运行
①在IDEA打成jar包(如果忘记了,参见Spark:用Scala和Java实现WordCount),上传到用户目录下/home/ebupt/test/kmeans.jar
②准备训练样本数据:hdfs://eb170:8020/user/ebupt/kmeansData,内容如下
[ebupt@eb170 ~]$ hadoop fs -cat ./kmeansData
& & 1.0 2.1 3.8
& & 5.6 7.6 8.9
& & 3.2 3.3 6.6
& & 8.1 9.2 9.3
& & 6.2 6.5 7.3
③spark-submit提交运行
[ebupt@eb174 test]$ spark-submit --master spark://eb174:7077 --name KmeansWithMLib --class com.hq.KMeansTest --executor-memory 2G --total-executor-cores 4 ~/test/kmeans.jar hdfs://eb170:8020/user/ebupt/kmeansData
输出结果摘要:
& & 1 ------Predict the existing line in the analyzed data file: hdfs://eb170:8020/user/ebupt/kmeansData
& & 2 Vector 1.0 2.1 3.8 belongs to clustering 0
& & 3 Vector 5.6 7.6 8.9 belongs to clustering 1
& & 4 Vector 3.2 3.3 6.6 belongs to clustering 0
& & 5 Vector 8.1 9.2 9.3 belongs to clustering 1
& & 6 Vector 6.2 6.5 7.3 belongs to clustering 1
& & 7 -------Predict the non-existent line in the analyzed data file: ----------------
& & 8 Vector 1.1 2.2 3.9&&belongs to clustering 0
& & 9 Vector 5.5 7.5 8.8&&belongs to clustering 1
& & 10 -------Evaluate clustering by computing Within Set Sum of Squared Errors:-----
& & 11 Within Set Sum of Squared Errors = 16.388
4.MLbase总结
介绍了MLbase如何实现机器学习算法,简单介绍了MLBase的设计思想。总的来说,Mlbase的核心是ML Optimizer,把声明式的任务转化成复杂的学习计划,输出最优的模型和计算结果。
与其它机器学习系统Weka、mahout不同:
& & MLbase是分布式的,Weka是单机的。
& & Mlbase是自动化的,Weka和mahout都需要使用者具备机器学习技能,来选择自己想要的算法和参数来做处理。
& & MLbase提供了不同抽象程度的接口,可以扩充ML算法。
3、在Spark开发中使用Scala好?还是使用Java好?你的理由是?
我最熟悉的语言是java,但是我们公司会python的比较多,openstack就是用python
scala也是要用的,也是擅长的。因为我读spark源码来作研发。
结论用大多数用scala,
和openstack接口用python接口
和hadoop接口用java
信誉积分 +5Spark算子:RDD基本转换操作(3)–randomSplit、glom
randomSplit
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
该函数根据weights权重,将一个RDD切分成多个RDD。
该权重参数为一个Double数组
第二个参数为random的种子,基本可忽略。
scala& var rdd = sc.makeRDD(1 to 10,10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at makeRDD at :21
scala& rdd.collect
res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala& var splitRDD = rdd.randomSplit(Array(1.0,2.0,3.0,4.0))
splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[17] at randomSplit at :23,
MapPartitionsRDD[18] at randomSplit at :23,
MapPartitionsRDD[19] at randomSplit at :23,
MapPartitionsRDD[20] at randomSplit at :23)
//这里注意:randomSplit的结果是一个RDD数组
scala& splitRDD.size
res8: Int = 4
//由于randomSplit的第一个参数weights中传入的值有4个,因此,就会切分成4个RDD,
//把原来的rdd按照权重1.0,2.0,3.0,4.0,随机划分到这4个RDD中,权重高的RDD,划分到//的几率就大一些。
//注意,权重的总和加起来为1,否则会不正常
scala& splitRDD(0).collect
res10: Array[Int] = Array(1, 4)
scala& splitRDD(1).collect
res11: Array[Int] = Array(3)
scala& splitRDD(2).collect
res12: Array[Int] = Array(5, 9)
scala& splitRDD(3).collect
res13: Array[Int] = Array(2, 6, 7, 8, 10)
def glom(): RDD[Array[T]]
该函数是将RDD中每一个分区中类型为T的元素转换成Array[T],这样每一个分区就只有一个数组元素。
scala& var rdd = sc.makeRDD(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at makeRDD at :21
scala& rdd.partitions.size
res33: Int = 3 //该RDD有3个分区
scala& rdd.glom().collect
res35: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
//glom将每个分区中的元素放到一个数组中,这样,结果就变成了3个数组问题对人有帮助,内容完整,我也想知道答案
问题没有实际价值,缺少关键内容,没有改进余地
关于scala的,我看的书里spark-shell中
map split 有这样非函数的用法,但实际写到IDEA里貌似就报错了
有没有大神能给解释一下
答案对人有帮助,有参考价值
答案没帮助,是错误的答案,答非所问
split后面加()试试
如果在shell里面可以,在编辑器里面应该也可以
同步到新浪微博
分享到微博?
你好!看起来你挺喜欢这个内容,但是你还没有注册帐号。 当你创建了帐号,我们能准确地追踪你关注的问题,在有新答案或内容的时候收到网页和邮件通知。还能直接向作者咨询更多细节。如果上面的内容有帮助,记得点赞 (????)? 表示感谢。
明天提醒我
关闭理由:
删除理由:
忽略理由:
推广(招聘、广告、SEO 等)方面的内容
与已有问题重复(请编辑该提问指向已有相同问题)
答非所问,不符合答题要求
宜作评论而非答案
带有人身攻击、辱骂、仇恨等违反条款的内容
无法获得确切结果的问题
非开发直接相关的问题
非技术提问的讨论型问题
其他原因(请补充说明)
我要该,理由是:Spark(38)
关键字:Spark算子、Spark RDD基本转换、randomSplit、glom
randomSplit
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
该函数根据weights权重,将一个RDD切分成多个RDD。
该权重参数为一个Double数组
第二个参数为random的种子,基本可忽略。
scala& var rdd = sc.makeRDD(1 to 10,10)rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at makeRDD at :21&scala& rdd.collectres6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
&scala& var splitRDD = rdd.randomSplit(Array(1.0,2.0,3.0,4.0))splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[17] at randomSplit at :23, MapPartitionsRDD[18] at randomSplit at :23, MapPartitionsRDD[19] at randomSplit at :23, MapPartitionsRDD[20] at randomSplit at :23)&//这里注意:randomSplit的结果是一个RDD数组scala& splitRDD.sizeres8: Int = 4//由于randomSplit的第一个参数weights中传入的值有4个,因此,就会切分成4个RDD,//把原来的rdd按照权重1.0,2.0,3.0,4.0,随机划分到这4个RDD中,权重高的RDD,划分到//的几率就大一些。//注意,权重的总和加起来为1,否则会不正常&scala& splitRDD(0).collectres10: Array[Int] = Array(1, 4)&scala& splitRDD(1).collectres11: Array[Int] = Array(3)
&scala& splitRDD(2).collectres12: Array[Int] = Array(5, 9)&scala& splitRDD(3).collectres13: Array[Int] = Array(2, 6, 7, 8, 10)&
def glom(): RDD[Array[T]]
该函数是将RDD中每一个分区中类型为T的元素转换成Array[T],这样每一个分区就只有一个数组元素。
scala& var rdd = sc.makeRDD(1 to 10,3)rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at makeRDD at :21scala& rdd.partitions.sizeres33: Int = 3
//该RDD有3个分区scala& rdd.glom().collectres35: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))//glom将每个分区中的元素放到一个数组中,这样,结果就变成了3个数组
更多关于Spark算子的介绍,可参考spark算子系列文章:
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:211984次
积分:3319
积分:3319
排名:第10651名
原创:123篇
转载:97篇
评论:21条
(1)(3)(7)(5)(4)(1)(5)(27)(2)(1)(19)(2)(3)(1)(6)(11)(10)(24)(1)(5)(14)(5)(2)(9)(2)(8)(15)(7)(15)(1)(6)
(window.slotbydup = window.slotbydup || []).push({
id: '4740881',
container: s,
size: '200,200',
display: 'inlay-fix'}

我要回帖

更多关于 spark sql split 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信