spark的combineByKey
combineByKey的特点
combineByKey的强大之处,在于提供了三个函数操作来操作一个函数。第一个函数,是对元数据处理,从而获得一个键值对。第二个函数,是对键值键值对进行一对一的操作,即一个键值对对应一个输出,且这里是根据key进行整合。第三个函数是对key相同的键值对进行操作,有点像reduceByKey,但真正实现又有着很大的不同。
在中,我们用reduce进行求平均值。用combineByKey我们则可以求比平均值更为丰富的事情。现在有一个数据集,每一行数据包括一个a-z字母和一个整数,其中字母和整数之间以空格分隔。现在要求得每个字母的平均数。这个场景有点像多个学生,每个学生多门成绩,求得学生的平均分。但这里将问题简化,其中数据集放在grades中。数据集以及下面的代码都可以在上下载。
combineByKey求多个平均值
scala实现
import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.{SparkConf, SparkContext}object SparkCombineByKey { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey") val sc = new SparkContext(conf) sc.textFile("./grades").map(line=>{ val splits = line.split(" ") (splits(0),splits(1).toInt) }).combineByKey( value => (value,1), (x:(Int,Int),y)=>(x._1+y,x._2+1), (x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2) ).map(x=>(x._1,x._2._1/x._2._2)).foreach(println) }}复制代码
scala运行结果
(d,338451)(e,335306)(a,336184)(i,346279)(b,333069)(h,334343)(f,341380)(j,320145)(g,334042)(c,325022)复制代码
java实现:
import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;import org.apache.spark.sql.sources.In;import scala.Tuple2;public class SparkCombineByKeyJava { public static void main(String[] args){ SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKeyJava"); JavaSparkContext sc = new JavaSparkContext(conf); combineByKeyJava(sc); combineByKeyJava8(sc); } public static void combineByKeyJava(JavaSparkContext sc){ JavaPairRDDsplitData = sc.textFile("./grades").mapToPair(new PairFunction () { @Override public Tuple2 call(String s) throws Exception { String[] splits = s.split(" "); return new Tuple2<>(splits[0],Integer.parseInt(splits[1])); } }); splitData.combineByKey(new Function >() { @Override public Tuple2 call(Integer integer) throws Exception { return new Tuple2<>(integer, 1); } }, new Function2 , Integer, Tuple2 >() { @Override public Tuple2 call(Tuple2 integerIntegerTuple2, Integer integer) throws Exception { return new Tuple2<>(integerIntegerTuple2._1 + integer, integerIntegerTuple2._2 + 1); } }, new Function2 , Tuple2 , Tuple2 >() { @Override public Tuple2 call(Tuple2 integerIntegerTuple2, Tuple2 integerIntegerTuple22) throws Exception { return new Tuple2<>(integerIntegerTuple2._1+integerIntegerTuple22._1,integerIntegerTuple2._2+integerIntegerTuple22._2); } }).map(new Function >, Tuple2 >() { @Override public Tuple2 call(Tuple2 > stringTuple2Tuple2) throws Exception { return new Tuple2<>(stringTuple2Tuple2._1,stringTuple2Tuple2._2._1*1.0/stringTuple2Tuple2._2._2); } }).foreach(new VoidFunction >() { @Override public void call(Tuple2 stringDoubleTuple2) throws Exception { System.out.println(stringDoubleTuple2._1+" "+stringDoubleTuple2._2); } }); } public static void combineByKeyJava8(JavaSparkContext sc){ JavaPairRDD splitData = sc.textFile("./grades").mapToPair(line -> { String[] splits = line.split(" "); return new Tuple2<>(splits[0],Integer.parseInt(splits[1])); }); splitData.combineByKey( x->new Tuple2<>(x,1), (x,y)->new Tuple2<>(x._1+y,x._2+1), (x,y)->new Tuple2<>(x._1+y._1,x._2+y._2) ).map(x->new Tuple2(x._1,x._2._1*1.0/x._2._2)).foreach(x->System.out.println(x._1+" "+x._2)); }}复制代码
java运行结果
d 338451.6e 335306.7480769231a 336184.95321637427i 346279.497029703b 333069.8589473684h 334343.75f 341380.94444444444j 320145.7618069815g 334042.37605042016c 325022.4183673469复制代码
分析
在开始python之前,我们先观察java和scala两个程序。我们发现java7的代码非常冗余,而java8和scala则相比起来非常干净利落。当然,我们难说好坏,但是这也表现出当代语言开始从繁就简的一个转变。到了python这一特点就体现的更加淋漓尽致。
但我们不光说语言,我们分析这个求平均的实现方式,由于java中对数值做了一个处理,因此有保留小数,而scala则没有,但至少可以判断两者的结果是一致的。当然,这不是重点,重点是,这个combinByKey非常复杂,有三个函数。我们很难观察到每个过程做了什么。因此我们在这里,对scala程序进行进一步的输出,从而观察combineByKey到底做了什么。
scala修改
import org.apache.spark.{SparkConf, SparkContext}object SparkCombineByKey { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey") val sc = new SparkContext(conf) sc.textFile("./grades").map(line=>{ val splits = line.split(" ") (splits(0),splits(1).toInt) }).combineByKey( value => { println("这是第一个函数") println("将所有的值遍历,并放在元组中,标记1") println(value) (value,1) }, (x:(Int,Int),y)=>{ println("这是第二个函数") println("将x中的第一个值进行累加求和,第二个值加一,求得元素总个数") println("x:"+x.toString()) println("y:"+y) (x._1+y,x._2+1) }, (x:(Int,Int),y:(Int,Int))=>{ (x._1+y._1,x._2+y._2) } ).map(x=>(x._1,x._2._1/x._2._2)).foreach(println) }}复制代码
得到结果
这是第一个函数将所有的值遍历,并放在元组中,标记1222783这是第一个函数将所有的值遍历,并放在元组中,标记148364这是第一个函数将所有的值遍历,并放在元组中,标记1204950这是第一个函数将所有的值遍历,并放在元组中,标记1261777.........这是第二个函数将x中的第一个值进行累加求和,第二个值加一,求得元素总个数x:(554875,2)y:357748这是第二个函数将x中的第一个值进行累加求和,第二个值加一,求得元素总个数x:(912623,3)y:202407这是第一个函数将所有的值遍历,并放在元组中,标记148608这是第二个函数将x中的第一个值进行累加求和,第二个值加一,求得元素总个数x:(1115030,4)y:69003这是第一个函数将所有的值遍历,并放在元组中,标记1476893.........(d,338451)(e,335306)(a,336184)(i,346279)(b,333069)(h,334343)(f,341380)(j,320145)(g,334042)(c,325022)复制代码
这里我们发现了,函数的顺序并不先全部执行完第一个函数,再执行第二个函数。而是分区并行,即第一个分区执行完第一个函数,并不等待其他分区执行完第一个函数,而是紧接着执行第二个函数,最后在第三个函数进行处理。在本地单机下,该并行特点并不能充分发挥,但在集群环境中,各个分区在不同节点计算,然后处理完结果汇总处理。这样,当数据量十分庞大时,集群节点数越多,该优势就表现地越明显。
此外还有一个非常值得关注的特点,当我们把foreach(println)这句话去掉时
foreach(println)复制代码
我们运行程序,发现程序没有任何输出。这是由于spark的懒加载特点,spark只用在对数据执行具体操作时,如输出、保存等才会执行计算。这看起来有点不合理,但实际上这样做在很多场景下能大幅度提升效率,但如果没有处理好,可能会导致spark每次执行操作都会从头开始计算该过程。因此当一个操作结果需要被频繁或者多次调用的时候,我们应该将结果存下来。
python实现
from pyspark import SparkConf,SparkContextconf = SparkConf().setMaster("local").setAppName("SparkCombineByKey")sc = SparkContext(conf=conf)sc.textFile("./grades")\ .map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\ .combineByKey( lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])).map(lambda x:(x[0],x[1][0]/x[1][1])).foreach(print)复制代码
得到结果
('b', 333069.8589473684)('f', 341380.94444444444)('j', 320145.7618069815)('h', 334343.75)('a', 336184.95321637427)('g', 334042.37605042016)('d', 338451.6)('e', 335306.7480769231)('c', 325022.4183673469)复制代码
spark的sortByKey
sortByKey进行排序
sortByKey非常简单,也非常常用。这里依然采用上述文本,将处理后的结果,进行排序,得到平均值最大的字母。在实际运用中我们这里可以看成求得按照成绩排序,或者按照姓名排序。
scala实现
import org.apache.spark.{SparkConf, SparkContext}object SparkSortByKey { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey") val sc = new SparkContext(conf) val result = sc.textFile("./grades").map(line=>{ val splits = line.split(" ") (splits(0),splits(1).toInt) }).combineByKey(value =>(value,1),(x:(Int,Int),y)=>(x._1+y,x._2+1),(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2) ).map(x=>(x._1,x._2._1/x._2._2)) //按照名字排序,顺序 result.sortByKey(true).foreach(println) //按照名字排序,倒序 result.sortByKey(false).foreach(println) val result1 = sc.textFile("./grades").map(line=>{ val splits = line.split(" ") (splits(0),splits(1).toInt) }).combineByKey(value =>(value,1),(x:(Int,Int),y)=>(x._1+y,x._2+1),(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2) ).map(x=>(x._2._1/x._2._2,x._1)) //按照成绩排序,顺序 result1.sortByKey(true).foreach(println) //按照成绩排序,倒序 result1.sortByKey(false).foreach(println) }}复制代码
python实现
from pyspark import SparkConf,SparkContextconf = SparkConf().setMaster("local").setAppName("SparkCombineByKey")sc = SparkContext(conf=conf)result = sc.textFile("./grades")\ .map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\ .combineByKey( lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])).map(lambda x:(x[0],x[1][0]/x[1][1]))result.sortByKey(True).foreach(print)result.sortByKey(False).foreach(print)result1 = sc.textFile("./grades")\ .map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\ .combineByKey( lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])).map(lambda x:(x[1][0]/x[1][1],x[0]))result1.sortByKey(True).foreach(print)result1.sortByKey(False).foreach(print)复制代码
得到结果
(a,336184)(b,333069)(c,325022)(d,338451)(e,335306)(f,341380)(g,334042)(h,334343)(i,346279)(j,320145)(j,320145)(i,346279)(h,334343)(g,334042)(f,341380)(e,335306)(d,338451)(c,325022)(b,333069)(a,336184)(320145,j)(325022,c)(333069,b)(334042,g)(334343,h)(335306,e)(336184,a)(338451,d)(341380,f)(346279,i)(346279,i)(341380,f)(338451,d)(336184,a)(335306,e)(334343,h)(334042,g)(333069,b)(325022,c)(320145,j)复制代码
数据集以及代码都可以在上下载。