1. map(func) 

 >>> a = sc.parallelize(('a', 'b', 'c'))
>>> a.map(lambda x: x+'1').collect()
['a1', 'b1', 'c1']

2. filter(func) 

 >>> a = sc.parallelize(range(10))
>>> a.filter(lambda x: x%2==0).collect() # 选出0-9的偶数
[0, 2, 4, 6, 8]

3. flatMap(func) 

 >>> l = ['I am Tom', 'She is Jenny', 'He is Ben']
>>> a = sc.parallelize(l,3)
>>> a.flatMap(lambda line: line.split()).collect() # 将每个字符串中的单词划分出来
['I', 'am', 'Tom', 'She', 'is', 'Jenny', 'He', 'is', 'Ben']

4. mapPartitions(func) 

>>> def squareFunc(a):
...     for i in a:
...         yield i*i
...
>>> a = sc.parallelize(range(10), 3)
>>> a.mapPartitions(squareFunc).collect()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

5. mapPartitionsWithIndex(func) 

>>> def func(index, iterator):  # 返回每个分区的编号和数值
...     yield ('index ' + str(index) + ' is: ' + str(list(iterator)))
...
>>> a = sc.parallelize(range(10),3)
>>> a.mapPartitionsWithIndex(func).collect()
['index 0 is: [0, 1, 2]', 'index 1 is: [3, 4, 5]', 'index 2 is: [6, 7, 8, 9]']
>>> def squareIndex(index, iterator): # 返回每个数值所属分区的编号和数值的平方
...     for i in iterator:
...         yield ("The index is: " + str(index) + ", and the square is: " + str(i*i))
...
>>> a.mapPartitionsWithIndex(squareIndex).collect()
['The index is: 0, and the square is: 0',
'The index is: 0, and the square is: 1',
'The index is: 0, and the square is: 4',
'The index is: 1, and the square is: 9',
'The index is: 1, and the square is: 16',
'The index is: 1, and the square is: 25',
'The index is: 2, and the square is: 36',
'The index is: 2, and the square is: 49',
'The index is: 2, and the square is: 64',
'The index is: 2, and the square is: 81']

6. sample(withReplacement, fraction, seed) 

>>> data = sc.parallelize(range(1,101),2)
>>> sampleData = data.sample(True, 0.2) # 未指定seed,每次运行抽样结果可能不同
>>> sampleData.count()
19
>>> sampleData.collect()
[16, 19, 24, 29, 32, 33, 44, 45, 55, 56, 56, 57, 65, 65, 73, 83, 84, 92, 96]


7. union(otherDataset) 

 >>> data1 = sc.parallelize(range(10))
>>> data2 = sc.parallelize(range(6,15))
>>> data1.union(data2).collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 6, 7, 8, 9, 10, 11, 12, 13, 14]

8. intersection(otherDataset) 

 >>> data1 = sc.parallelize(range(10))
>>> data2 = sc.parallelize(range(6,15))
>>> data1.intersection(data2).collect()
[8, 9, 6, 7]

9. distinct([numTasks]) 

 >>> data1 = sc.parallelize(range(10))
>>> data2 = sc.parallelize(range(6,15))
>>> data1.union(data2).distinct().collect()
[0, 8, 1, 9, 2, 10, 11, 3, 12, 4, 5, 13, 14, 6, 7]

下面的例子使用数据集 school.csv,每行共有 7 列:(Ward_CODE, Ward_NAME, TotalWardPupils, Ward2Sec_Flow_No., Secondary_School_URN, Secondary_School_Name, Pupil_count) 

>>> school = sc.textFile("file:///home/yang/下载/school.csv")
>>> school.count() # 共有16796行数据
16796
>>> import re # 引入python的正则表达式包
>>> rows = school.map(lambda line: re.subn(r',\s+', ': ', line))

注意:1. 从本地读取数据时,代码中要通过 “file://” 前缀指定读取本地文件。Spark shell 默认是读取 HDFS 中的文件,需要先上传文件到 HDFS 中,否则会有“org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/school.csv”的错误。 
2. 对数据集进行了预处理,即利用正则匹配替换字符串。由于一些学校的名字本身含有逗号,比如“The City Academy, Hackney”,此时如果直接利用csv的分隔符','进行分割,会错误地把这个名字拆成“The City Academy”和“Hackney”两个字段。我们注意到csv的分隔符逗号后边是没有空格的,而名字里边的逗号后边都会有空格(英语书写习惯)。因此,先利用re.subn语句把“逗号后边跟至少一个空格”的子字符串(正则表达式为 r',\s+')替换为': ',然后再进行后续操作。注意re.subn返回的是(替换后的字符串, 替换次数)组成的元组,所以后续代码中用r[0]取出替换后的字符串。以上即为对这一数据集的预处理过程。
3. 如果school.csv的第一行是表头(列名),需要先将其过滤掉,否则后面的int(r[6])会因为无法把列名转换为整数而报错。

10. groupByKey([numTasks]) 
作用于由键值对(K, V)组成的数据集上,将Key相同的数据放在一起,返回一个由键值对(K, Iterable)组成的数据集。 
注意:1. 如果这一操作是为了后续在每个键上进行聚集(aggregation),比如sum或者average,此时使用reduceByKey或者aggregateByKey的效率更高。2. 默认情况下,输出的并行程度取决于RDD分区的数量,但也可以通过给可选参数numTasks赋值来调整并发任务的数量。

 >>> newRows = rows.map(lambda r: r[0].split(','))
>>> ward_schoolname = newRows.map(lambda r: (r[1], r[5])).groupByKey() # r[1]为ward的名字,r[5]为学校的名字
>>> ward_schoolname.map(lambda x: {x[0]: list(x[1])}).collect() # 列出每个ward区域内所有的学校的名字
[{'Stifford Clays': ['William Edwards School', 'Brentwood County High School', "The Coopers' Company and Coborn School", 'Becket Keys Church of England Free School', ...]}, ...]
# 输出结果为在Stifford Clays这个ward里的学校有William Edwards School,Brentwood County High School,The Coopers' Company and Coborn School等等...

11. reduceByKey(func, [numTasks]) 
作用于键值对(K, V)上,按Key分组,然后对Key相同的键值对的Value执行func操作,合并为一个值。注意func的类型必须满足 (V, V) => V,即接收两个V类型的参数并返回一个V类型的值;同时func应满足结合律和交换律,因为它会先在各个分区内部、再在分区之间被调用。

 >>> pupils = newRows.map(lambda r: (r[1], int(r[6])))  # r[1]为ward的名字,r[6]为每个学校的学生数
>>> ward_pupils = pupils.reduceByKey(lambda x, y: x+y) # 计算各个ward中的学生数
>>> ward_pupils.collect() # 输出各个ward中的学生数
[('Stifford Clays', 1566), ('Shenley', 1625), ('Southbury', 3526),
('Rainham and Wennington', 769), ('Bromley Town', 574), ('Waltham Abbey Honey Lane', 835),
('Telegraph Hill', 1238), ('Chigwell Village', 1506), ('Gooshays', 2097), ('Edgware', 2585),
('Camberwell Green', 1374), ('Glyndon', 4633),...]

12. aggregateByKey(zeroValue, seqOp, combOp, [numTasks]) 
作用于键值对(K, V)组成的RDD上,按key对value进行聚合,分两个阶段:首先在每个分区内部,对key相同的value以zeroValue为初始值依次调用seqOp(先用初始值和第一个value计算,再将结果和下一个value计算,以此类推),得到该分区内每个key的聚合结果;然后用combOp将各个分区中同一个key的聚合结果两两合并,最后将key与最终结果作为新的键值对输出。聚合结果的类型可以与V不同。
例子:上述统计ward内学生人数的操作也可以通过aggregateByKey实现,此时初始值为0,seqOp和combOp都是进行加法操作,代码如下:

 >>> ward_pupils = pupils.aggregateByKey(0, lambda x, y: x+y, lambda x, y: x+y)
>>> ward_pupils.collect()
[('Stifford Clays', 1566), ('Shenley', 1625), ('Southbury', 3526),
('Rainham and Wennington', 769), ('Bromley Town', 574), ('Waltham Abbey Honey Lane', 835),
('Telegraph Hill', 1238), ('Chigwell Village', 1506), ('Gooshays', 2097), ('Edgware', 2585),
('Camberwell Green', 1374), ('Glyndon', 4633),...]

13. sortByKey([ascending=True], [numTasks]) 

 >>> ward_pupils.sortByKey(False, 4).take(10)
[('Yiewsley', 2560), ('Wormholt and White City', 1455), ('Woodside', 1204),
('Woodhouse', 2930), ('Woodcote', 1214), ('Winchmore Hill', 1116), ('Wilmington', 2243),
('Willesden Green', 1896), ('Whitefoot', 676), ('Whalebone', 2294)]

14. join(otherDataset, [numTasks]) 
类似于SQL中的连接操作,即作用于键值对(K, V)和(K, W)上,返回元组 (K, (V, W)),spark也支持外连接,包括leftOuterJoin,rightOuterJoin和fullOuterJoin。例子:

 >>> class1 = sc.parallelize(('Tom', 'Jenny', 'Bob')).map(lambda a: (a, 'attended'))
>>> class2 = sc.parallelize(('Tom', 'Amy', 'Alice', 'John')).map(lambda a: (a, 'attended'))
>>> class1.join(class2).collect()
[('Tom', ('attended', 'attended'))]
>>> class1.leftOuterJoin(class2).collect()
[('Tom', ('attended', 'attended')), ('Jenny', ('attended', None)), ('Bob', ('attended', None))]
>>> class1.rightOuterJoin(class2).collect()
[('John', (None, 'attended')), ('Tom', ('attended', 'attended')), ('Amy', (None, 'attended')), ('Alice', (None, 'attended'))]
>>> class1.fullOuterJoin(class2).collect()
[('John', (None, 'attended')), ('Tom', ('attended', 'attended')), ('Jenny', ('attended', None)), ('Bob', ('attended', None)), ('Amy', (None, 'attended')), ('Alice', (None, 'attended'))]

15. cogroup(otherDataset, [numTasks]) 
作用于键值对(K, V)和(K, W)上,返回元组 (K, (Iterable<V>, Iterable<W>))。这一操作也叫做groupWith。

 >>> class1 = sc.parallelize(('Tom', 'Jenny', 'Bob')).map(lambda a: (a, 'attended'))
>>> class2 = sc.parallelize(('Tom', 'Amy', 'Alice', 'John')).map(lambda a: (a, 'attended'))
>>> group = class1.cogroup(class2)
>>> group.collect()
[('John', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808afd0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a1d0>)),
('Tom', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808a7f0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a048>)),
('Jenny', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808a9b0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a208>)),
('Bob', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808ae80>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b448d0>)),
('Amy', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44c88>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44588>)),
('Alice', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44748>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44f98>))]
>>> group.map(lambda x: {x[0]: [list(x[1][0]), list(x[1][1])]}).collect()
[{'John': [[], ['attended']]}, {'Tom': [['attended'], ['attended']]}, {'Jenny': [['attended'], []]}, {'Bob': [['attended'], []]}, {'Amy': [[], ['attended']]}, {'Alice': [[], ['attended']]}]

16. cartesian(otherDataset) 
笛卡尔乘积:作用于元素类型分别为T和U的两个数据集上,返回由(T, U)元素对组成的数据集,即两个数据集中元素的所有两两组合。

 >>> a = sc.parallelize(('a', 'b', 'c'))
>>> b = sc.parallelize(('d', 'e', 'f'))
>>> a.cartesian(b).collect()
[('a', 'd'), ('a', 'e'), ('a', 'f'), ('b', 'd'), ('b', 'e'), ('b', 'f'), ('c', 'd'), ('c', 'e'), ('c', 'f')]

17. pipe(command, [envVars]) 

18. coalesce(numPartitions) 

19. repartition(numPartitions) 

20. repartitionAndSortWithinPartitions(partitioner) 


