http://spark.apache.org
Everything you need is on the official website. The supported languages are currently Java, Scala, Python, and R. Java and Scala are recommended; Spark 2's support for Python is not as good.
Verify that the Spark cluster works:
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --num-executors 1 --driver-memory 500m --executor-memory 500m --executor-cores 1 lib/spark-examples-1.5.2-hadoop2.5*.jar 10
The expected output contains: Pi is roughly 3.1411
After running a simple computation in Scala, you can inspect it in the web UI at localhost:4040.
There are two ways to run a Spark job: locally and on the cluster.
bin/spark-submit --master local --class org.apache.spark.examples.SparkPi lib/spark-examples-1.5.2-hadoop2.6.0-cdh5.4.4.jar 5
Running locally prints the result directly.
bin/spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi lib/spark-examples-1.5.2-hadoop2.6.0-cdh5.4.4.jar 5
The output of a distributed (yarn-cluster) run, on the other hand, can be found under:
cd $HADOOP_HOME/logs/userlogs/
A Scala example run in the spark-shell:
```scala
val lines = sc.textFile("hdfs://master:9000/input0917/qqFriend.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
```
Output: totalLength: Int = 188
The first way to create an RDD is by reading a file from the local file system or HDFS:
sc.textFile("hdfs://")
The second way is to create an RDD by parallelizing a collection, i.e., by mocking up data ourselves:
```scala
val str = Array("you jump", "I jump")
val list = Array(1, 2, 3, 4, 5, 6)
val listadd = sc.parallelize(list)
```
The return value is an RDD, so RDD methods such as the reduce operator can be called on it:
```scala
listadd.reduce(_ + _)
```
The first way is the one used most often.
Spark supports two kinds of RDD operations: transformations and actions.
A transformation creates a new RDD from an existing one, while an action performs the final computation on an RDD. Transformations only record the operations on the RDD and do not trigger execution; all of the recorded transformations run only once an action follows them. For example, after entering
```scala
val file = sc.textFile("hdfs://hadoop1:9000/hello.txt").flatMap(line => line.split("\t"))
```
nothing is executed, because flatMap (like the other transformations) is lazy. Only after file.collect() do you see Spark actually run, because collect is an action.
For details, refer to the Spark programming guide.
So why distinguish transformations from actions? This laziness lets Spark optimize the underlying application and avoid materializing unnecessary intermediate results.
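To make the laziness concrete, here is a small spark-shell sketch with made-up numbers: map and filter are only recorded, and the whole chain runs as one job, without materializing the intermediate RDD, when collect is finally called.

```scala
val nums = sc.parallelize(1 to 10)
val doubled = nums.map(_ * 2)        // transformation: only recorded, nothing runs yet
val bigOnes = doubled.filter(_ > 10) // still nothing runs
bigOnes.collect()                    // action: the whole chain executes as a single job
```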
Some exercises follow; note the Eclipse shortcuts:
- Put the cursor on a method and press Ctrl+1 to pick the return-value type.
- Select a block of lines and press Ctrl+Shift+/ to comment them out.
- When implementing a method, check the error message; if it suggests adding the missing inner method, just add it.
The difference between map and flatMap: map produces exactly one output element per input element, while flatMap can produce zero or more outputs per input and flattens them into a single RDD, as the sketch below shows.
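A minimal spark-shell sketch of the difference, reusing the "you jump / I jump" strings that appear later in this post:

```scala
val lines = sc.parallelize(Seq("you jump", "I jump"))
lines.map(_.split(" ")).collect()
// Array(Array(you, jump), Array(I, jump)) -- map: exactly one output element per input
lines.flatMap(_.split(" ")).collect()
// Array(you, jump, I, jump)               -- flatMap: the outputs are flattened into one RDD
```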
The difference between map and mapPartitions:
map iterates over each element of the RDD, whereas mapPartitions iterates over one partition at a time. If the mapping frequently creates extra objects, mapPartitions is more efficient than map. But watch the memory: a whole partition must be processed at once, and the available memory may be smaller than a partition.
For example, suppose each of 10,000 records must be written to a MySQL database while iterating. With map you would create 10,000 connections; with mapPartitions you might only create about 10, one connection per partition.
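A hedged Scala sketch of that pattern; the JDBC URL, table, and credentials below are placeholders for illustration, not from the original post, and the MySQL JDBC driver must be on the classpath:

```scala
import java.sql.DriverManager

val rdd = sc.parallelize(1 to 10000)

// One connection per partition instead of one per record.
val written = rdd.mapPartitions { records =>
  val conn = DriverManager.getConnection("jdbc:mysql://dbhost:3306/test", "user", "pass")
  val stmt = conn.prepareStatement("INSERT INTO results(value) VALUES (?)")
  var count = 0
  records.foreach { r =>
    stmt.setInt(1, r)
    stmt.executeUpdate()
    count += 1
  }
  stmt.close()
  conn.close()
  Iterator(count)              // number of rows written by this partition
}
println(written.reduce(_ + _)) // action that actually triggers the writes
```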
Below is the experiment code for the different operators:
Notes: repartition and coalesce re-partition an RDD; keywords: wide dependency, narrow dependency, shuffle. (A short Scala sketch contrasting coalesce and repartition follows the Java examples below.)
So when should repartition be used? For example, after a filter the partitions hold much less data: if the RDD had 100 partitions (and therefore needed 100 tasks) and the filter leaves only about 50 partitions' worth of data, keeping 100 tasks is wasteful, so you can repartition down to 50. Note that repartition performs a shuffle.

```java
package com.ecaoyng.spark;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class Trans {

    /**
     * repartition will do a shuffle
     */
    public static void repartition() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("repartition");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list1 = Arrays.asList(1, 2, 3, 4, 5, 6);
        JavaRDD<Integer> list1RDD = sc.parallelize(list1);
        JavaRDD<Integer> repartition = list1RDD.repartition(2);
        repartition.foreach(new VoidFunction<Integer>() {
            public void call(Integer arg0) throws Exception {
                System.out.println(arg0);
            }
        });
    }

    /**
     * output: Hello1 Hello2 Hello3 Hello4 Hello5 Hello6
     */
    public static void mapPartitions() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("mapParititons");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list1 = Arrays.asList(1, 2, 3, 4, 5, 6);
        JavaRDD<Integer> list1RDD = sc.parallelize(list1, 2);
        // the return value is a String RDD
        JavaRDD<String> mapPartitions = list1RDD.mapPartitions(
                new FlatMapFunction<Iterator<Integer>, String>() {
            /*
             * processes one partition per call
             * partition1: 1 2 3
             * partition2: 4 5 6
             */
            public Iterable<String> call(Iterator<Integer> t) throws Exception {
                ArrayList<String> arrayList = new ArrayList<String>();
                while (t.hasNext()) {
                    Integer i = t.next();
                    arrayList.add("Hello" + i);
                }
                return arrayList;
            }
        });
        mapPartitions.foreach(new VoidFunction<String>() {
            public void call(String arg0) throws Exception {
                System.out.println(arg0);
            }
        });
    }

    /*
     * Cartesian product
     * output: 1 a, 1 b, 1 c, 2 a, 2 b, 2 c, 3 a, 3 b, 3 c
     */
    public static void cartesian() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("cartesian");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list1 = Arrays.asList(1, 2, 3);
        List<String> list2 = Arrays.asList("a", "b", "c");
        JavaRDD<Integer> list1RDD = sc.parallelize(list1);
        JavaRDD<String> list2RDD = sc.parallelize(list2);
        JavaPairRDD<Integer, String> cartesian = list1RDD.cartesian(list2RDD);
        cartesian.foreach(new VoidFunction<Tuple2<Integer, String>>() {
            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println(t._1 + " " + t._2);
            }
        });
    }

    /*
     * deduplicate
     * output: 4 1 6 3 5 2
     */
    public static void distinct() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("distinct");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list1 = Arrays.asList(1, 2, 3, 4, 4, 5, 5, 6, 6);
        JavaRDD<Integer> listRDD = sc.parallelize(list1);
        JavaRDD<Integer> distinct = listRDD.distinct();
        distinct.foreach(new VoidFunction<Integer>() {
            public void call(Integer t) throws Exception {
                System.out.println(t);
            }
        });
    }

    /*
     * intersection
     * output: 4 3
     */
    public static void interSection() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("union");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list1 = Arrays.asList(1, 2, 3, 4);
        List<Integer> list2 = Arrays.asList(3, 4, 5, 6);
        JavaRDD<Integer> list1RDD = sc.parallelize(list1);
        JavaRDD<Integer> list2RDD = sc.parallelize(list2);
        JavaRDD<Integer> intersection = list1RDD.intersection(list2RDD);
        intersection.foreach(new VoidFunction<Integer>() {
            public void call(Integer t) throws Exception {
                System.out.println(t);
            }
        });
    }

    /*
     * union, does not deduplicate
     */
    public static void union() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("union");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list1 = Arrays.asList(1, 2, 3, 4);
        List<Integer> list2 = Arrays.asList(3, 4, 5, 6);
        JavaRDD<Integer> list1RDD = sc.parallelize(list1);
        JavaRDD<Integer> list2RDD = sc.parallelize(list2);
        JavaRDD<Integer> union = list1RDD.union(list2RDD);
        union.foreach(new VoidFunction<Integer>() {
            public void call(Integer t) throws Exception {
                System.out.println(t);
            }
        });
    }

    /*
     * output:
     * ID: 1  Name: [Tom, Tom]  Score: [30, 70]
     * ID: 3  Name: [Chirs]     Score: [60, 90]
     * ID: 2  Name: [Jerry]     Score: [40, 60]
     */
    public static void cogroup() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("cogroup");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<Integer, String>> stu = Arrays.asList(
                new Tuple2<Integer, String>(1, "Tom"),
                new Tuple2<Integer, String>(2, "Jerry"),
                new Tuple2<Integer, String>(3, "Chirs"),
                new Tuple2<Integer, String>(1, "Tom")
        );
        List<Tuple2<Integer, Integer>> score = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 30),
                new Tuple2<Integer, Integer>(2, 40),
                new Tuple2<Integer, Integer>(3, 60),
                new Tuple2<Integer, Integer>(1, 70),
                new Tuple2<Integer, Integer>(2, 60),
                new Tuple2<Integer, Integer>(3, 90)
        );
        JavaPairRDD<Integer, String> stuRDD = sc.parallelizePairs(stu);
        JavaPairRDD<Integer, Integer> scoreRDD = sc.parallelizePairs(score);
        // result: <id, (all names for this id, all scores for this id)>
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> cogroupRDD =
                stuRDD.cogroup(scoreRDD);
        cogroupRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
                System.out.println("ID: " + t._1);
                System.out.println("Name: " + t._2._1);
                System.out.println("Score: " + t._2._2);
            }
        });
    }

    /*
     * output:
     * ID: 1  Name: Tom    Score: 30  ==================>
     * ID: 3  Name: Chirs  Score: 60  ==================>
     * ID: 2  Name: Jerry  Score: 40  ==================>
     */
    public static void join() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("join");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<Integer, String>> stu = Arrays.asList(
                new Tuple2<Integer, String>(1, "Tom"),
                new Tuple2<Integer, String>(2, "Jerry"),
                new Tuple2<Integer, String>(3, "Chirs")
        );
        List<Tuple2<Integer, Integer>> score = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 30),
                new Tuple2<Integer, Integer>(2, 40),
                new Tuple2<Integer, Integer>(3, 60)
        );
        JavaPairRDD<Integer, String> stuRDD = sc.parallelizePairs(stu);
        JavaPairRDD<Integer, Integer> scoreRDD = sc.parallelizePairs(score);
        // Integer: id, Tuple2: (name, score)
        JavaPairRDD<Integer, Tuple2<String, Integer>> joined = stuRDD.join(scoreRDD);
        joined.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
            public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
                System.out.println("ID: " + t._1);
                System.out.println("Name: " + t._2._1);
                System.out.println("Score: " + t._2._2);
                System.out.println("==================>");
            }
        });
    }

    public static void sortByKey() {
    }

    /*
     * calculate the sum for each key
     * output: PBCS:95 TDCS:120
     */
    public static void reduceByKey() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("reduceByKey");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<String, Integer>> asList = Arrays.asList(
                new Tuple2<String, Integer>("TDCS", 30),
                new Tuple2<String, Integer>("TDCS", 40),
                new Tuple2<String, Integer>("TDCS", 50),
                new Tuple2<String, Integer>("PBCS", 40),
                new Tuple2<String, Integer>("PBCS", 55)
        );
        JavaPairRDD<String, Integer> parallelizePairsRDD = sc.parallelizePairs(asList);
        /*
         * input: Integer, Integer
         * output: sum (Integer)
         * equivalent to reduceByKey(_ + _) in Scala
         */
        parallelizePairsRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }).foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> sum) throws Exception {
                System.out.println(sum._1 + ":" + sum._2);
            }
        });
    }

    /*
     * src: key-value pairs; Tuple2 in Scala is similar to a map entry in Java
     * output:
     * PBCS  Terry Shawn      ======================================>
     * TDCS  Tom Jerry Chris  ======================================>
     */
    public static void groupByKey() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("groupByKey");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<String, String>> list = Arrays.asList(
                new Tuple2<String, String>("TDCS", "Tom"),
                new Tuple2<String, String>("TDCS", "Jerry"),
                new Tuple2<String, String>("TDCS", "Chris"),
                new Tuple2<String, String>("PBCS", "Terry"),
                new Tuple2<String, String>("PBCS", "Shawn")
        );
        // because these are key-value pairs, parallelizePairs is used
        JavaPairRDD<String, String> parallelizePairsRDD = sc.parallelizePairs(list);
        JavaPairRDD<String, Iterable<String>> groupByKey = parallelizePairsRDD.groupByKey();
        groupByKey.foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {
            public void call(Tuple2<String, Iterable<String>> t) throws Exception {
                System.out.println(t._1);
                Iterator<String> iterator = t._2.iterator();
                while (iterator.hasNext()) {
                    System.out.println(iterator.next());
                }
                System.out.println("======================================>");
            }
        });
    }

    /*
     * flatMap: {you jump I jump}
     * split each word onto a single line
     */
    public static void flatMap() {
        SparkConf conf = new SparkConf();
        // if setMaster is not called here, the master is taken from the submit environment (i.e. the cluster)
        conf.setMaster("local");
        conf.setAppName("flatmap");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> list = Arrays.asList("you jump", "I jump");
        JavaRDD<String> listRDD = sc.parallelize(list);
        // the second type parameter of FlatMapFunction is the element type of the returned RDD
        JavaRDD<String> flatMap = listRDD.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String t) throws Exception {
                return Arrays.asList(t.split(" "));
            }
        });
        flatMap.foreach(new VoidFunction<String>() {
            public void call(String line) throws Exception {
                System.out.println(line);
            }
        });
    }

    /*
     * map() test: say hello to each item of an array
     */
    public static void map() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("map");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> list = Arrays.asList("Tom", "Jerry", "USA", "China");
        JavaRDD<String> listRDD = sc.parallelize(list);
        // the second type parameter of Function is the return type
        JavaRDD<String> map = listRDD.map(new Function<String, String>() {
            public String call(String str) throws Exception {
                return "hello " + str;
            }
        });
        // action operation
        map.foreach(new VoidFunction<String>() {
            public void call(String str) throws Exception {
                System.out.println(str);
            }
        });
    }

    /*
     * fliter(): keep the even numbers in the dataset
     */
    public static void fliter() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("map");
        JavaSparkContext sc_fliter = new JavaSparkContext(conf);
        List<Integer> fliter_list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        JavaRDD<Integer> fliterRDD = sc_fliter.parallelize(fliter_list);
        JavaRDD<Integer> filter = fliterRDD.filter(new Function<Integer, Boolean>() {
            public Boolean call(Integer num) throws Exception {
                return num % 2 == 0;
            }
        });
        filter.foreach(new VoidFunction<Integer>() {
            public void call(Integer num) throws Exception {
                System.out.println(num);
            }
        });
    }

    public static void main(String[] args) {
        // map();
        // fliter();
        // flatMap();
        // groupByKey();
        // reduceByKey();
        // join();
        // cogroup();
        // union();
        // interSection();
        // distinct();
        // cartesian();
        // mapPartitions();
        repartition();
    }
}
```
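coalesce is mentioned in the notes above but does not appear in the Java examples, so here is a short spark-shell sketch (with made-up sizes) contrasting it with repartition:

```scala
val big      = sc.parallelize(1 to 10000, 100) // 100 partitions -> 100 tasks
val filtered = big.filter(_ % 2 == 0)          // still 100 partitions, each only half full
val shrunk   = filtered.coalesce(50)           // narrow dependency, no shuffle
val balanced = filtered.repartition(50)        // same as coalesce(50, shuffle = true): full shuffle
println(shrunk.partitions.length)              // 50
println(balanced.partitions.length)            // 50
```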
When you cache an RDD, each node keeps the partitions of it that it computes in memory, so later computations on that data run much faster (often around 10x).
You can mark an RDD to be cached by calling its cache() or persist() method. The first time it is computed in an action, it is kept in memory on the nodes. Spark's cache is fault tolerant: if any partition of the RDD is lost, it is automatically recomputed using the transformations that originally created it. Recommendations:
In addition, OFF_HEAP (experimental) stores the RDD in Tachyon.
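A minimal sketch of marking an RDD for persistence, reusing the HDFS path from the earlier example; persist(StorageLevel.MEMORY_ONLY) is equivalent to cache(), and other levels such as MEMORY_AND_DISK or OFF_HEAP can be passed instead:

```scala
import org.apache.spark.storage.StorageLevel

val lines   = sc.textFile("hdfs://master:9000/input0917/qqFriend.txt")
val lengths = lines.map(_.length).persist(StorageLevel.MEMORY_ONLY) // same as cache()
lengths.reduce(_ + _) // first action: computes the RDD and caches its partitions
lengths.count()       // second action: served from the cache, no recomputation
```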
Tachyon is an open-source, JVM-based, in-memory distributed file system that sits between the compute layer and the storage layer; a simple way to think of it is as an in-memory cache in front of the storage layer.
Why does Tachyon exist? Replacing disk with memory dramatically reduces latency, which is why many memory-based computing tools, such as the Spark computing framework, have emerged.
Spark runs inside the JVM, and Spark tasks keep data in the JVM heap. As iterative computation proceeds, the amount of data in the heap grows quickly. Because Spark's compute engine and storage engine live in the same JVM, GC overhead is incurred repeatedly, which increases system latency.
These problems with memory-based distributed compute frameworks motivated in-memory distributed file systems such as Tachyon.
Broadcast variables
For example: previously, when you declared val a, a copy of a was shipped to every task; if a is as large as 1 GB, that wastes a lot of space. With a broadcast variable, the variable is copied to each node (executor) only once instead of once per task.
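A minimal spark-shell sketch of a broadcast variable; the lookup map is invented for illustration:

```scala
val lookup = Map(1 -> "Tom", 2 -> "Jerry", 3 -> "Chirs")
val bc     = sc.broadcast(lookup)            // shipped to each executor once, not once per task

val ids   = sc.parallelize(Seq(1, 2, 3, 1))
val names = ids.map(id => bc.value.getOrElse(id, "unknown"))
names.collect()                              // Array(Tom, Jerry, Chirs, Tom)
```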
Accumulators
Writing a sum the way you are used to (with an ordinary variable) gives the wrong result; accumulators are a new way to declare such a variable.
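A minimal spark-shell sketch using the Spark 1.x accumulator API; the naive var version is shown for contrast:

```scala
val acc = sc.accumulator(0, "sum")             // Spark 1.x accumulator API
sc.parallelize(1 to 100).foreach(x => acc += x)
println(acc.value)                             // 5050, visible on the driver

// The naive version gives the wrong answer on a cluster:
var sum = 0
sc.parallelize(1 to 100).foreach(x => sum += x)
println(sum)                                   // on a cluster this stays 0: each task updated its own copy
```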
You only need to set HADOOP_CONF_DIR (the location of the Hadoop configuration, e.g. export HADOOP_CONF_DIR=/path/to/hadoop/etc/hadoop) in spark-env.sh under Spark's conf directory, so that Spark can find YARN through the Hadoop configuration files.
With standalone mode you had to start the Spark master and workers; with YARN mode there is no master or worker to start, only HDFS and YARN:
Run start-dfs.sh (jps should then show the 3 HDFS daemons), followed by start-yarn.sh.
After that, Spark development is the same as before; you only need to specify the run mode when submitting the job.
For details, refer to the official documentation:
```
# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \  # can be client for client mode
  --executor-memory 20G \
  --num-executors 50 \
  /path/to/examples.jar \
  1000
```
A YARN submission can also use either client or cluster deploy mode.
Cluster mode:
Note that the executors register with and report back to the driver, which in cluster mode runs inside the YARN ApplicationMaster.
Client mode: the driver runs in the process that submitted the job on the client machine, so the job's output and logs appear directly on the client.
Recommendation:
As an example, first run the following code locally; it successfully writes the union of the two lists to a local directory:
```java
package com.ecaoyng.spark;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SaveAsText {

    public static void Save() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local[3]");          // because this is a local run
        conf.setAppName("SaveAsTextFile");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list1 = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> list2 = Arrays.asList(4, 5, 6, 7, 8);
        JavaRDD<Integer> list1RDD = sc.parallelize(list1);
        JavaRDD<Integer> list2RDD = sc.parallelize(list2);
        JavaRDD<Integer> unionRDD = list1RDD.union(list2RDD);
        // note: the union directory is created on the local disk
        unionRDD.repartition(1).saveAsTextFile("/home/zkpk/union");
    }

    public static void main(String[] args) {
        Save();
    }
}
```
Once the local test passes, comment out conf.setMaster (since the job will run on the cluster) and export the jar from Eclipse.
The submit command is:
```
/home/zkpk/spark-1.5.2-bin-2.5.2/bin/spark-submit \
  --class com.ecaoyng.spark.SaveAsText \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 100M \
  --num-executors 1 \
  /home/zkpk/output/SaveUnion.jar
```
Make sure the command contains no comments, or it may fail. If a parameter is rejected, run spark-submit --help to see which options apply to cluster mode. The output ends up on HDFS. You also need to set export HADOOP_CONF_DIR=/home/zkpk/hadoop-2.5.2 beforehand, or configure it in Spark's spark-env.sh.
You can also track the progress of the submitted job in the web UI.
Narrow dependency: each partition of the parent RDD is depended on by at most one partition of the child RDD.
Wide dependency: a partition of the parent RDD is depended on by multiple partitions of the child RDD. In this example the first three RDDs do not involve other nodes, but in a real environment other nodes do take part. The shuffle stage involves heavy disk I/O, serialization and deserialization, and network transfer, so a large share of Spark's performance cost is in the shuffle, which is why shuffle tuning matters.
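A small spark-shell sketch that makes the two kinds of dependency visible; toDebugString prints the lineage together with its shuffle (stage) boundaries:

```scala
val pairs  = sc.parallelize(Seq(("TDCS", 30), ("TDCS", 40), ("PBCS", 55)))
val mapped = pairs.mapValues(_ + 1)      // narrow dependency: no shuffle, stays in the same stage
val summed = mapped.reduceByKey(_ + _)   // wide dependency: introduces a shuffle
println(summed.toDebugString)            // shows a ShuffledRDD and the stage boundary
```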
The shuffle implementation has gone through three stages (Spark 1.2, 1.3, and 1.5 onwards), named respectively:
[The unoptimized HashShuffleManager]
The number of buffers and files each map task creates is tied to the number of reduce tasks (think of the partitioner in MapReduce), so with M map tasks and R reduce tasks roughly M x R small intermediate files are written, far too many; the number of intermediate files produced is shown in the figure.
[The optimized HashShuffleManager]
[The sort-based shuffle manager used today]
The shuffle configuration parameters are listed in the Spark configuration documentation.
Normally the ordinary (sort-based) mechanism is used, but when the number of shuffle read tasks is below 200 (the default value of spark.shuffle.sort.bypassMergeThreshold), the bypass mechanism is enabled.
Because the per-task outputs are merged into a single disk file, the intermediate files are deleted.
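The threshold itself can be tuned; a hedged sketch of raising it when building the SparkConf (the value 400 is only an example):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Shuffles with at most 400 reduce tasks (and no map-side aggregation)
// will then take the bypass path instead of the normal sort path.
val conf = new SparkConf()
  .setAppName("shuffle-tuning")
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")
val sc = new SparkContext(conf)
```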
Bypass mode
To be continued.