一、Spark 初始化
Spark初始化主要是要创建一个SprakContext实例,该实例表示与spark集群的连接。可以通过多种方式创建。
SparkContext
直接使用SparkContext类创建一个spark上下文,主要参数是指定master和appName。
from pyspark import SparkContext
sc = SprakContext(master = 'local[*]',appName='test')
SparkConf
还可以通过调用SparkConf配置类来生成spark上下文。
from pyspark import SparkConf, SprakContext
conf = SparkConf().setMaster('local').setAppName('test')
sc = SparkContext(conf=conf)
二、创建RDD
RDD是spark中的主要数据格式,名称为弹性分布式数据集,可以序列化python对象来得到RDD,或者读取文件。
1、序列化
# parallelize方法序列化python对象为RDD
rdd = sc.parallelize([('a', 7), ('a', 2), ('b', 2)])
rdd1 = sc.parallelize([2,5,1,8])
rdd2 = sc.parallelize([('a', 2), ('d', 1), ('b', 1)])
rdd3 = sc.parallelize(range(100))
rdd4 = sc.parallelize([('a', ['x', 'y', 'z']), ('b', ['p', 'r'])])
2、获取RDD信息
基本信息
1. # 获取rdd的分区
Rdd.getNumpartitions()
2. # 获取rdd的key
Rdd.keys().collects()
编写代码结果如下:
['a', 'a', 'b']
3. # 获取rdd的value
Rdd.values().collects()
编写代码结果如下:
[7, 2, 2]
3、统计信息
统计信息包含了基本的统计计算值,如最大值、最小值、平均数、描述统计等。
4. # rdd3求和
Rdd3.sum()
编写代码结果如下:
4950
5. # rdd3最大值
Rdd3.max()
编写代码结果如下:
99
处理RDD
切片/collect
6. # 获取rdd里的所有元素,返回list
Rdd.collect()
编写代码结果如下:
[('a', 7), ('a', 2), ('b', 2)]
7. # 获取降序排序的前3个元素
Rdd3.top(3)
编写代码结果如下:
[99, 98, 97]
计数/count
8. # 统计rdd里的元素个数
Rdd.count()
编写代码结果如下:
3
9. # 按key统计rdd里的元素个数
Rdd.countByKey()
编写代码结果如下:
defaultdict(<class 'int'>, {'a': 2, 'b': 1})
重采样/sample
10. # 对rdd3进行重采样 ,不放会,比例0.1,seed=81
Rdd3.sample(false,0.1,81).collect()
编写代码:
过滤/filter
11. # 根据key过滤,rdd中包含‘a’,筛选(lambda x:'a' in x).。
rdd.filter(lambda x:’a’ in x).collect()
编写代码结果如下:
[('a', 7), ('a', 2)]
去重/distinct
12. # 对rdd5元素去重
Rdd.distinct().collect()
编写代码结果如下:
['a', 7, 2, 'b']
排序/sortBy
13. # rdd1升序排序(默认)
Rdd1.sortBy(lambda x:x).collect()
编写代码结果如下:
[1, 2, 5, 8]
14. # 对键值对rdd2按照key排序
Rdd2.sortBykey().collect()
编写代码结果如下:
[('a', 2), ('b', 1), ('d', 1)]
映射/map
15. # map方法对rdd每个元素应用函数lambda x: x+(x[0],x[1])
Rdd.map(lambda x:x+(x[0],x[1])).collect()
编写代码结果如下:
[('a', 7, 'a', 7), ('a', 2, 'a', 2), ('b', 2, 'b', 2)]
16. # flatMap方法,对rdd每个元素应用函数lambda x: x+(x[0],x[1]),返回的结果会扁平化
rdd.flatMap(lambda x:x+(x[0],x[1])).collect()
编写代码结果如下:
rdd5.collect()
['a', 7, 'a', 7, 'a', 2, 'a', 2, 'b', 2, 'b', 2]
迭代/foreach
17. def g(x):print(x)
rdd.foreach(x)
# foreach方法对所有元素应用函数,对rdd使用foreach应用g(x)函数
编写代码结果如下:
('a', 7)
('a', 2)
('b', 2)
简化/reduce
18. # reduce方法对rdd进行合并,应用函数(lambda x,y:x+y)
Rdd.reduce(lambda x,y:x+y)
编写代码结果如下:
('a', 7, 'a', 2, 'b', 2)
分组/groupBy
19. # groupBy方法对rdd1的元素分组,依据(lambda x:x%2)进行分组。
Rdd1.groupBy(lambda x:x%2).mapValues(list).collect()
编写代码结果如下:
[(0, [2, 8]), (1, [5, 1])]
集合/intersection,union
20. # 两个rdd的交集,求rdd和rdd2的交集
Rdd.intersection(rdd2).collect()
编写代码结果如下:
[('a', 2)]
21. # 两个rdd,rdd和rdd2的并集(包含重复元素)
Rdd.union(rdd2.collect())
编写代码结果如下:
[('a', 7), ('a', 2), ('b', 2), ('a', 2), ('d', 1), ('b', 1)]
保存RDD
22. # 保存rdd到本地保存格式为txt
Rdd.saveAsTextFile(‘rdd.txt’)
关闭spark
23. # 使用stop方法关闭spark context实例
Sc.stop()
运行
进入spark安装目录下,通过sprak-submit命令运行py文件。
./bin/spark-submit example/src/main/python/pi.py