一、sparkContext与sparkSession区别

任何Spark程序都是SparkContext开始的,SparkContext的初始化需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数,sparkContext只能在driver机器上面启动;
SparkSession: SparkSession实质上是SQLContext和HiveContext的组合,SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成

val conf: SparkConf = new SparkConf().setAppName("test")
val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

二、repartition与coalesce区别

repartition一般是用来增加分区数(当然也可以减少),coalesce只能用来减少分区数。所以如果不介意保存的文件块大小不一样,可以使用coalesce来减少分区数,保存的时候一个分区就会生成一个文件块

三、Scala常用方法

1. StringBuilder

主要用于字符串的拼接,可作用于生成倒排序列,如:
val userItemScore = sc.parallelize(List((, , 0.8), (, , 0.7), (, , 0.5), (, , 0.9)))
userItemScore.map(x => (x._1, (x._2.toString, x._3.toString))).groupByKey()
.map{x =>
val userid = x._1
val item_score_list = x._2
val tmp_arr = item_score_list.toArray.sortWith(_._2 > _._2)
val watch_len = tmp_arr.length
val strbuf = new StringBuilder() for (i <- until watch_len - ) {
strbuf ++= tmp_arr(i)._1
strbuf.append(":")
strbuf ++= tmp_arr(i)._2
strbuf.append(" ")
}
strbuf ++= tmp_arr(watch_len - )._1
strbuf.append(":")
strbuf ++= tmp_arr(watch_len - )._2 userid + "\t" + strbuf
}.collect()

2. scala.collection.mutable.ArrayBuffer

相当于是一个大小可变数组,把需要的值添加进来,例如:
val tmpArray = new ArrayBuffer[String]()
val tmpArray = new ArrayBuffer[Int]()
val tmpArray = new ArrayBuffer[(String, Int)]()
scala> tmpArray.append(("wangzai", ))
scala> tmpArray
res11: scala.collection.mutable.ArrayBuffer[(String, Int)] = ArrayBuffer((wangzai,), (test,)) tmpArray.indexOf(("test",))为获取当前值的索引,返回类型为整型
tmpArray.slice(tmpArray.indexOf(("test", )), tmpArray.length)为切片,返回类型为ArrayBuffer

四、通过spark-shell来操作数据库中的表

1 启动(通过--jars指定包,后面reids包不需要,只是演示添加多个包的用法)

/xxx/spark/bin/spark-shell \
--master spark://xxx:7077 \
--executor-cores \
--total-executor-cores \
--driver-memory 2g \
--jars /xxx/jars/mysql-connector-java-5.1..jar,/xxx/jars/jedis-2.9..jar

2 在命令行中输入::paste, 然后粘贴以下代码,最后ctrl+D退出之后,即可执行


import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.SparkConf
val conf: SparkConf = new SparkConf()
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val mysqlUrl: String = "jdbc:mysql://ip:port/database?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
val productTable: String = "product_info"
val orderTable: String = "order_info"
val properties: Properties = new Properties()
properties.put("user", user)
properties.put("password", password)


// 获取同事购配置表数据
val productDF: DataFrame = spark.read.jdbc(mysqlUrl, productTable, properties).select("id", "name")
val orderDF: DataFrame = spark.read.jdbc(mysqlUrl, orderTable, properties).select("product_id", "createTime")


val totalDataDF = productDF.join(orderDF, orderDF("product_id") === productDF("id")).drop("id")
//如果product_info对应的id为product_id,即关联id字段名不相同
//val totalDataDF = productDF.join(orderDF, Seq("product_id"))

3 把该DateFrame注册为临时表才能通过spark-sql操作

totalDataDF.createOrReplaceTempView("totalDataDF")

五、spark-sql的基本操作

//默认显示20条数据
scala> df.show()
//打印模式信息
scala> df.printSchema()
//选择多列
scala> df.select(df("name"),df("age")+).show()
// 条件过滤
scala> df.filter(df("age") > ).show()
// 分组聚合
scala> df.groupBy("age").count().show()
// 排序
scala> df.sort(df("age").desc).show()
//多列排序
scala> df.sort(df("age").desc, df("name").asc).show()
//对列进行重命名
scala> df.select(df("name").as("username"),df("age")).show()
//对多个列重命名
scala> df.withColumnRenamed("id", "userId").withColumnRenamed("name", "userName")

最新文章

  1. C和指针 第五章 逻辑位移与算术位移
  2. OpenMP之数值积分(求圆周率Pi)(sections)
  3. SQL中 char、varchar、text 和 nchar、nvarchar、ntext的区别
  4. orientationchange不管用啊
  5. 迁移ADT/ANT构建的Android项目至Gradle,a walk through。
  6. SQLServer分页
  7. .NET开源项目 TOP 25
  8. 开涛spring3(9.1) - Spring的事务 之 9.1 数据库事务概述
  9. linux下权限问题思考
  10. 逆向-攻防世界-maze
  11. 柳叶刀重磅出击!全外显子测序在胎儿结构异常的评估Whole-exome sequencing in the evaluation of fetal structural anomalies: a prospective cohort study
  12. css 选择器/table属性/type 属性
  13. 分金币 [CQOI 2011] [BZOJ 3293]
  14. SLICK基础
  15. [CF920G]List Of Integers
  16. 阿里云oss如何上传一个文件夹
  17. 【LINK】手机Web开发框架
  18. C++宏定义不受命名空间的约束
  19. [转]StarWind模拟iSCSI设备
  20. Java中如何读写cookie (二)

热门文章

  1. java插入代码块
  2. 开发中常用linux命令
  3. HTML5 - websocket的应用 之 简易聊天室
  4. Java编程思想之十二 通过异常处理错误
  5. shell(三)if流程控制
  6. HP Client Security Manager
  7. winform 通用自动更新程序
  8. jvm jdk jre 关系
  9. QFIL软件烧写镜像
  10. Java里 equals 和 == 以及 hashcode