map, filter, flatMap Operators

Video tutorials:

1. Youku

2. YouTube

1. map

map passes each element of the source JavaRDD into the call method one at a time; each returned value becomes an element of a new JavaRDD, so there is exactly one output element per input element.

java:

package com.bean.spark.trans;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

/**
 * map example
 *
 * @author RedBean
 */
public class TraMap {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("map");
        System.setProperty("hadoop.home.dir", "D:/tools/spark-2.0.0-bin-hadoop2.6");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> number = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        JavaRDD<Integer> numberRDD = sc.parallelize(number);

        // Multiply every element by 5; map produces one output element per input element.
        JavaRDD<Integer> results = numberRDD.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer s) throws Exception {
                return s * 5;
            }
        });
        System.out.println(results.collect());
    }
}

python:

# -*- coding:utf-8 -*-

from __future__ import print_function
import os

from pyspark import SparkConf
from pyspark import SparkContext

if __name__ == '__main__':
    os.environ['SPARK_HOME'] = 'D:/tools/spark-2.0.0-bin-hadoop2.6'
    conf = SparkConf().setAppName('mapTest').setMaster('local')
    sc = SparkContext(conf=conf)
    data = sc.parallelize([1, 2, 3, 4, 5, 6])

    def myMap(l):
        # map applies myMap to every element and returns one result per element
        return l * 5

    print(data.map(myMap).collect())
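
Since Spark 2.0 runs on Java 8, the anonymous Function class in the Java example above can also be written as a lambda. This is only a minimal sketch, assuming the same SparkConf/JavaSparkContext setup (sc) as in TraMap:

JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
// multiply each element by 5, one output element per input element
JavaRDD<Integer> results = numberRDD.map(s -> s * 5);
System.out.println(results.collect());   // [0, 5, 10, 15, 20, 25, 30, 35, 40, 45]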

2. filter

filter returns a new dataset made up of the source elements for which the supplied function returns true.

java:

package com.bean.spark.trans;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class TraFilter {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("filter");
        System.setProperty("hadoop.home.dir", "D:/tools/spark-2.0.0-bin-hadoop2.6");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> number = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        JavaRDD<Integer> numberRDD = sc.parallelize(number);

        // Keep only the even numbers; filter retains elements for which call() returns true.
        JavaRDD<Integer> results = numberRDD.filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer s) throws Exception {
                return s % 2 == 0;
            }
        });
        System.out.println(results.collect());
    }
}

python:

# -*- coding:utf-8 -*-

from __future__ import print_function
import os

from pyspark import SparkConf
from pyspark import SparkContext

if __name__ == '__main__':
    os.environ['SPARK_HOME'] = 'D:/tools/spark-2.0.0-bin-hadoop2.6'
    conf = SparkConf().setAppName('filterTest').setMaster('local')
    sc = SparkContext(conf=conf)
    data = sc.parallelize([1, 2, 3, 4, 5, 6])

    def filterFun(l):
        # keep only elements greater than 2
        return l > 2

    print(data.filter(filterFun).collect())
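
The same predicate can be expressed as a Java 8 lambda. A minimal sketch, assuming the same JavaSparkContext sc as in TraFilter:

JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
// filter keeps the elements for which the predicate returns true, here the even numbers
JavaRDD<Integer> evens = numberRDD.filter(s -> s % 2 == 0);
System.out.println(evens.collect());   // [0, 2, 4, 6, 8]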

3. flatMap

flatMap applies a user-defined function to each RDD element and flattens the results, so a single input record can be split into multiple output records.

java:

package com.bean.spark.trans;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public class TraFlatMap {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("FlatMap");
        System.setProperty("hadoop.home.dir", "D:/tools/spark-2.0.0-bin-hadoop2.6");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> context = sc.textFile("D:/tools/data/flatMap/flatMap.txt");

        // Split each line into words; flatMap flattens the returned iterators,
        // so every word becomes its own element in the result RDD.
        JavaRDD<String> results = context.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        System.out.println(results.collect());
    }
}

python:

# -*- coding:utf-8 -*-

from __future__ import print_function
import os

from pyspark import SparkConf
from pyspark import SparkContext

if __name__ == '__main__':
    os.environ['SPARK_HOME'] = 'D:/tools/spark-2.0.0-bin-hadoop2.6'
    conf = SparkConf().setAppName('flatMapTest').setMaster('local')
    sc = SparkContext(conf=conf)
    data = sc.parallelize(["Hello World", "Spark Hadoop Storm", "java python c"])

    def flatFun(l):
        # split each string into words; flatMap flattens the resulting lists
        return l.split(" ")

    print(data.flatMap(flatFun).collect())
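
To see why flatMap is needed here, it helps to compare it with map on the same split function: map keeps one output element per input line, while flatMap flattens the returned collections into individual words. A minimal Java sketch, assuming an existing JavaSparkContext sc and the imports used in the examples above (java.util.Arrays, java.util.List):

JavaRDD<String> lines = sc.parallelize(Arrays.asList("Hello World", "Spark Hadoop Storm"));
// map: each line becomes a List of words -> [[Hello, World], [Spark, Hadoop, Storm]]
JavaRDD<List<String>> nested = lines.map(s -> Arrays.asList(s.split(" ")));
// flatMap: the iterators are flattened -> [Hello, World, Spark, Hadoop, Storm]
JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
System.out.println(nested.collect());
System.out.println(words.collect());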
