1.DataFrame的创建方式

  1.1 通过加载外部文件创建

//通过sqlContext读取json文件创建DataFrame
DataFrame dataFrame=sqlContext.read().json("src/main/resources/datafromcreate.txt");
//通过两种方式加载json文件
//sqlContext.read().json("src/main/resources/datafromcreate.txt");
sqlContext.read().format("json").load("src/main/resources/datafromcreate.txt");

  1.2 通过RDD和元数据进行转换

    1.2.1 通过使用动态构建的元数据的方式创建DataFrame

//创建sqlContext
SQLContext sqlContext=new SQLContext(context);
//使用程序构建DataFrame的元数据
StructType structType=new StructType(new StructField[]{
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true)
}); //创建studentsRdd
JavaRDD<Row> studentsRdd=context.textFile("src/main/resources/students.txt").map(new Function<String, Row>() { private static final long serialVersionUID = 1L; public Row call(String line) throws Exception {
String[] words=line.split(" ");
return RowFactory.create(Integer.parseInt(words[]),words[],Integer.parseInt(words[]));
}
}); //使用动态构建的元数据创建DataFrame
DataFrame studentDataFrame= sqlContext.createDataFrame(studentsRdd, structType);

    1.2.2 通过反射的方式,使用javabean的属性作为DataFrame的元数据进行创建DataFrame

//封装为Student JavaRDD
JavaRDD<Student> students=context.textFile("src/main/resources/students.txt").map(new Function<String, Student>() { private static final long serialVersionUID = 1L; public Student call(String line) throws Exception {
String[] words=line.split(" ");
return new Student(Integer.parseInt(words[]), words[], Integer.parseInt(words[]));
}
}); //使用反射技术,将javaRdd转换为DataFrame,使用javabean的属性定义DataFrame的元数据
DataFrame studentDataFrame= sqlContext.createDataFrame(students, Student.class);

    1.2.3 使用hiveContext.table方法将hive表中的数据装换为DataFrame

DataFrame goodStudentDF=hiveContext.table("sqark.good_student_info");

  1.3 加载分区表的parquet文件,自动推断分区字段

//加载parquet文件为DataFrame
DataFrame usersDF=sqlContext.read().parquet("src/main/resources/parquet/users.parquet"); /**
* root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
*/
usersDF.printSchema(); /**
* 加载区别表中的数据是会自动推断分区列,
  users.parquet只有两个字段name,age;
  female和coutry为分区字段
*/
usersDF=sqlContext.read().parquet("src/main/resources/parquet/female=male/coutry=US/users.parquet"); /**
* root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- female: string (nullable = true)
|-- coutry: string (nullable = true)
*/
usersDF.printSchema();

  1.4 合并分区

    开启合并元数据的两种方式:
      1) sqlContext.read().option("mergeSchema", "true")
       2) SparkConf().set("spark.sql.parquet.mergeSchema", "true")

/**
* megerschema/idandage.txt 中的内容只有id和age两个属性
* megerschema/idandname.txt 中的内容只有id和name两个属性
* 合并以后的元素为id,name,age三个属性
*/
DataFrame personDF=sqlContext.read().option("mergeSchema", "true").format("json").load("src/main/resources/megerschema");
personDF.printSchema();

2.将DataFrame进行保存到外部文件系统

//将DataFrame,默认以parquet类型进行保存,可以使用format修改保存的文件格式
personDF.write().save("src/main/resources/output/persons");
//将DataFrame使用json格式保存
personDF.write().format("json").save("src/main/resources/output/persons");

最新文章

  1. float 对整形的取余运算
  2. (转)CSS中的绝对定位与相对定位定位
  3. Time series database
  4. java线程之——synchronized的注意细节
  5. 【转】ubuntu自动挂载硬盘方法
  6. 通过IP控制登录系统
  7. C#中用PadLeft、PadRight 补足位数
  8. jquery优化引发的思考
  9. ceph 参数说明&lt;转&gt;
  10. layer弹窗插件实战用法小结1—— layer.alert()
  11. MySQL 内连接与外连接
  12. 【转载】npm查看全局安装过的包
  13. Java进程和线程关系及区别
  14. 常见机试题分析Java版
  15. nodejs web API 相关杂项
  16. 【Mysql】事务的四种特性和隔离级别
  17. CentOS7 安装PHP7的redis扩展:
  18. HTTP Security Header Not Detected未检测到HTTP安全标头
  19. JAVA项目常用异常处理情况
  20. tensorflow 指定使用gpu处理,tensorflow占用多个GPU但只有一个在跑

热门文章

  1. for语句基础求和练习
  2. zoj 3777 Problem Arrangement(壮压+背包)
  3. mybatis默认的数据源连接池(PooledDataSource和UnPooledDataSource)
  4. 后台得到jsp提交name属性相同的内容
  5. sql查询静态数据
  6. 将循环结果通过管道 传递给其他命令处理(done |)
  7. dotNET面试(三)
  8. numpy 中的broadcast 机制
  9. memcache、mongodb、redis的对比区别
  10. case when使用的总结