博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark入门(六)--Spark的combineByKey、sortBykey
阅读量:7092 次
发布时间:2019-06-28

本文共 9019 字,大约阅读时间需要 30 分钟。

spark的combineByKey

combineByKey的特点

combineByKey的强大之处,在于提供了三个函数操作来操作一个函数。第一个函数,是对元数据处理,从而获得一个键值对。第二个函数,是对键值键值对进行一对一的操作,即一个键值对对应一个输出,且这里是根据key进行整合。第三个函数是对key相同的键值对进行操作,有点像reduceByKey,但真正实现又有着很大的不同。

在中,我们用reduce进行求平均值。用combineByKey我们则可以求比平均值更为丰富的事情。现在有一个数据集,每一行数据包括一个a-z字母和一个整数,其中字母和整数之间以空格分隔。现在要求得每个字母的平均数。这个场景有点像多个学生,每个学生多门成绩,求得学生的平均分。但这里将问题简化,其中数据集放在grades中。数据集以及下面的代码都可以在上下载。

combineByKey求多个平均值

scala实现

import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.{SparkConf, SparkContext}object SparkCombineByKey {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")    val sc = new SparkContext(conf)    sc.textFile("./grades").map(line=>{      val splits = line.split(" ")      (splits(0),splits(1).toInt)    }).combineByKey(      value => (value,1),      (x:(Int,Int),y)=>(x._1+y,x._2+1),      (x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)    ).map(x=>(x._1,x._2._1/x._2._2)).foreach(println)  }}复制代码

scala运行结果

(d,338451)(e,335306)(a,336184)(i,346279)(b,333069)(h,334343)(f,341380)(j,320145)(g,334042)(c,325022)复制代码

java实现:

import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;import org.apache.spark.sql.sources.In;import scala.Tuple2;public class SparkCombineByKeyJava {    public static void main(String[] args){        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKeyJava");        JavaSparkContext sc = new JavaSparkContext(conf);        combineByKeyJava(sc);        combineByKeyJava8(sc);    }    public static void combineByKeyJava(JavaSparkContext sc){        JavaPairRDD
splitData = sc.textFile("./grades").mapToPair(new PairFunction
() { @Override public Tuple2
call(String s) throws Exception { String[] splits = s.split(" "); return new Tuple2<>(splits[0],Integer.parseInt(splits[1])); } }); splitData.combineByKey(new Function
>() { @Override public Tuple2
call(Integer integer) throws Exception { return new Tuple2<>(integer, 1); } }, new Function2
, Integer, Tuple2
>() { @Override public Tuple2
call(Tuple2
integerIntegerTuple2, Integer integer) throws Exception { return new Tuple2<>(integerIntegerTuple2._1 + integer, integerIntegerTuple2._2 + 1); } }, new Function2
, Tuple2
, Tuple2
>() { @Override public Tuple2
call(Tuple2
integerIntegerTuple2, Tuple2
integerIntegerTuple22) throws Exception { return new Tuple2<>(integerIntegerTuple2._1+integerIntegerTuple22._1,integerIntegerTuple2._2+integerIntegerTuple22._2); } }).map(new Function
>, Tuple2
>() { @Override public Tuple2
call(Tuple2
> stringTuple2Tuple2) throws Exception { return new Tuple2<>(stringTuple2Tuple2._1,stringTuple2Tuple2._2._1*1.0/stringTuple2Tuple2._2._2); } }).foreach(new VoidFunction
>() { @Override public void call(Tuple2
stringDoubleTuple2) throws Exception { System.out.println(stringDoubleTuple2._1+" "+stringDoubleTuple2._2); } }); } public static void combineByKeyJava8(JavaSparkContext sc){ JavaPairRDD
splitData = sc.textFile("./grades").mapToPair(line -> { String[] splits = line.split(" "); return new Tuple2<>(splits[0],Integer.parseInt(splits[1])); }); splitData.combineByKey( x->new Tuple2<>(x,1), (x,y)->new Tuple2<>(x._1+y,x._2+1), (x,y)->new Tuple2<>(x._1+y._1,x._2+y._2) ).map(x->new Tuple2(x._1,x._2._1*1.0/x._2._2)).foreach(x->System.out.println(x._1+" "+x._2)); }}复制代码

java运行结果

d 338451.6e 335306.7480769231a 336184.95321637427i 346279.497029703b 333069.8589473684h 334343.75f 341380.94444444444j 320145.7618069815g 334042.37605042016c 325022.4183673469复制代码

分析

在开始python之前,我们先观察java和scala两个程序。我们发现java7的代码非常冗余,而java8和scala则相比起来非常干净利落。当然,我们难说好坏,但是这也表现出当代语言开始从繁就简的一个转变。到了python这一特点就体现的更加淋漓尽致。

但我们不光说语言,我们分析这个求平均的实现方式,由于java中对数值做了一个处理,因此有保留小数,而scala则没有,但至少可以判断两者的结果是一致的。当然,这不是重点,重点是,这个combinByKey非常复杂,有三个函数。我们很难观察到每个过程做了什么。因此我们在这里,对scala程序进行进一步的输出,从而观察combineByKey到底做了什么。

scala修改

import org.apache.spark.{SparkConf, SparkContext}object SparkCombineByKey {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")    val sc = new SparkContext(conf)    sc.textFile("./grades").map(line=>{      val splits = line.split(" ")      (splits(0),splits(1).toInt)    }).combineByKey(      value => {        println("这是第一个函数")        println("将所有的值遍历,并放在元组中,标记1")        println(value)        (value,1)      },      (x:(Int,Int),y)=>{        println("这是第二个函数")        println("将x中的第一个值进行累加求和,第二个值加一,求得元素总个数")        println("x:"+x.toString())        println("y:"+y)        (x._1+y,x._2+1)      },      (x:(Int,Int),y:(Int,Int))=>{        (x._1+y._1,x._2+y._2)      }    ).map(x=>(x._1,x._2._1/x._2._2)).foreach(println)  }}复制代码

得到结果

这是第一个函数将所有的值遍历,并放在元组中,标记1222783这是第一个函数将所有的值遍历,并放在元组中,标记148364这是第一个函数将所有的值遍历,并放在元组中,标记1204950这是第一个函数将所有的值遍历,并放在元组中,标记1261777.........这是第二个函数将x中的第一个值进行累加求和,第二个值加一,求得元素总个数x:(554875,2)y:357748这是第二个函数将x中的第一个值进行累加求和,第二个值加一,求得元素总个数x:(912623,3)y:202407这是第一个函数将所有的值遍历,并放在元组中,标记148608这是第二个函数将x中的第一个值进行累加求和,第二个值加一,求得元素总个数x:(1115030,4)y:69003这是第一个函数将所有的值遍历,并放在元组中,标记1476893.........(d,338451)(e,335306)(a,336184)(i,346279)(b,333069)(h,334343)(f,341380)(j,320145)(g,334042)(c,325022)复制代码

这里我们发现了,函数的顺序并不先全部执行完第一个函数,再执行第二个函数。而是分区并行,即第一个分区执行完第一个函数,并不等待其他分区执行完第一个函数,而是紧接着执行第二个函数,最后在第三个函数进行处理。在本地单机下,该并行特点并不能充分发挥,但在集群环境中,各个分区在不同节点计算,然后处理完结果汇总处理。这样,当数据量十分庞大时,集群节点数越多,该优势就表现地越明显。

此外还有一个非常值得关注的特点,当我们把foreach(println)这句话去掉时

foreach(println)复制代码

我们运行程序,发现程序没有任何输出。这是由于spark的懒加载特点,spark只用在对数据执行具体操作时,如输出、保存等才会执行计算。这看起来有点不合理,但实际上这样做在很多场景下能大幅度提升效率,但如果没有处理好,可能会导致spark每次执行操作都会从头开始计算该过程。因此当一个操作结果需要被频繁或者多次调用的时候,我们应该将结果存下来。

python实现

from pyspark import SparkConf,SparkContextconf = SparkConf().setMaster("local").setAppName("SparkCombineByKey")sc = SparkContext(conf=conf)sc.textFile("./grades")\    .map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\    .combineByKey(    lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])).map(lambda x:(x[0],x[1][0]/x[1][1])).foreach(print)复制代码

得到结果

('b', 333069.8589473684)('f', 341380.94444444444)('j', 320145.7618069815)('h', 334343.75)('a', 336184.95321637427)('g', 334042.37605042016)('d', 338451.6)('e', 335306.7480769231)('c', 325022.4183673469)复制代码

spark的sortByKey

sortByKey进行排序

sortByKey非常简单,也非常常用。这里依然采用上述文本,将处理后的结果,进行排序,得到平均值最大的字母。在实际运用中我们这里可以看成求得按照成绩排序,或者按照姓名排序。

scala实现

import org.apache.spark.{SparkConf, SparkContext}object SparkSortByKey {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")    val sc = new SparkContext(conf)    val result = sc.textFile("./grades").map(line=>{      val splits = line.split(" ")      (splits(0),splits(1).toInt)    }).combineByKey(value =>(value,1),(x:(Int,Int),y)=>(x._1+y,x._2+1),(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)    ).map(x=>(x._1,x._2._1/x._2._2))    //按照名字排序,顺序    result.sortByKey(true).foreach(println)    //按照名字排序,倒序    result.sortByKey(false).foreach(println)    val result1 = sc.textFile("./grades").map(line=>{      val splits = line.split(" ")      (splits(0),splits(1).toInt)    }).combineByKey(value =>(value,1),(x:(Int,Int),y)=>(x._1+y,x._2+1),(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)    ).map(x=>(x._2._1/x._2._2,x._1))    //按照成绩排序,顺序    result1.sortByKey(true).foreach(println)    //按照成绩排序,倒序    result1.sortByKey(false).foreach(println)  }}复制代码

python实现

from pyspark import SparkConf,SparkContextconf = SparkConf().setMaster("local").setAppName("SparkCombineByKey")sc = SparkContext(conf=conf)result = sc.textFile("./grades")\    .map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\    .combineByKey(    lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])).map(lambda x:(x[0],x[1][0]/x[1][1]))result.sortByKey(True).foreach(print)result.sortByKey(False).foreach(print)result1 = sc.textFile("./grades")\    .map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\    .combineByKey(    lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])).map(lambda x:(x[1][0]/x[1][1],x[0]))result1.sortByKey(True).foreach(print)result1.sortByKey(False).foreach(print)复制代码

得到结果

(a,336184)(b,333069)(c,325022)(d,338451)(e,335306)(f,341380)(g,334042)(h,334343)(i,346279)(j,320145)(j,320145)(i,346279)(h,334343)(g,334042)(f,341380)(e,335306)(d,338451)(c,325022)(b,333069)(a,336184)(320145,j)(325022,c)(333069,b)(334042,g)(334343,h)(335306,e)(336184,a)(338451,d)(341380,f)(346279,i)(346279,i)(341380,f)(338451,d)(336184,a)(335306,e)(334343,h)(334042,g)(333069,b)(325022,c)(320145,j)复制代码

数据集以及代码都可以在上下载。

转载地址:http://ppsql.baihongyu.com/

你可能感兴趣的文章
菜鸡互啄队 --- 第三周-需求改进&系统设计
查看>>
面向对象的多态
查看>>
配置Apache虚拟目录
查看>>
02.驱动调试
查看>>
C# 动态调用WebService
查看>>
在DataList控件访问子控件的方法
查看>>
EJB 连接DB util -- EntityManagerUtil
查看>>
oracle 创建用户指定表空间 删除用户删除表空间
查看>>
第十周作业
查看>>
Generate Parentheses(组合,回溯)
查看>>
ubuntu桌面进不去,我跪了
查看>>
jquery ui是什么
查看>>
北风设计模式课程---25、模板方法模式
查看>>
js进阶ajax基本用法(创建对象,连接服务器,发送请求,获取服务器传过来的数据)...
查看>>
Android 自动发送邮件
查看>>
mysql 导出,导入数据
查看>>
使用composer下拉组件失败,出现killed解决办法
查看>>
jquery点击按钮显示和隐藏DIv
查看>>
14.菜单和控制栏
查看>>
贪心算法之背包问题
查看>>