import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{SaveMode, DataFrame}
import scala.collection.mutable.ArrayBuffer
import main.asiainfo.coc.tools.Configure
import org.apache.spark.sql.hive.HiveContext
import java.sql.DriverManager
import java.sql.Connection

1 连接前台数据源 查询前台MYSQL中的数据

val DIM_COC_INDEX_INFO_DDL = s"""
CREATE TEMPORARY TABLE DIM_COC_INDEX_INFO
USING org.apache.spark.sql.jdbc
OPTIONS (
url '${mySQLUrl}',
dbtable 'DIM_COC_INDEX_INFO'
)""".stripMargin sqlContext.sql(DIM_COC_INDEX_INFO_DDL)
val DIM_COC_INDEX_INFO = sql("SELECT * FROM DIM_COC_INDEX_INFO").cache()

  

2   在A表中筛选出 B表中获取的TARGET_TABLE_CODE 然后再按照DATA_SRC_CODE排序,查询出源表的集合

val sources = DIM_COC_INDEX_INFO.filter("TARGET_TABLE_CODE ='"+TARGET_TABLE_CODE+"'")
.select("DATA_SRC_CODE").groupBy("DATA_SRC_CODE").agg(DIM_COC_INDEX_INFO("DATA_SRC_CODE")).collect

3 将表进行关联

resultIndexTableDF = resultIndexTableDF.join(SOURCE_TABLE,ALL_USERS.col(ALL_USER_JOIN_COLUMN_NAME) === SOURCE_TABLE.col(SOURCE_TABLE_JOIN_COLUMN_NAME),"left_outer")
resultIndexTableDF.dtypes.foreach(println)

4 根据条件筛选

val labels = CI_MDA_SYS_TABLE.join(CI_MDA_SYS_TABLE_COLUMN,CI_MDA_SYS_TABLE("TABLE_ID") === CI_MDA_SYS_TABLE_COLUMN("TABLE_ID"),"inner")
.join(CI_LABEL_EXT_INFO,CI_MDA_SYS_TABLE_COLUMN("COLUMN_ID") === CI_LABEL_EXT_INFO("COLUMN_ID"),"inner")
.join(CI_LABEL_INFO,CI_LABEL_EXT_INFO("LABEL_ID") === CI_LABEL_INFO("LABEL_ID"),"inner")
.join(CI_APPROVE_STATUS,CI_LABEL_INFO("LABEL_ID") === CI_APPROVE_STATUS("RESOURCE_ID"),"inner")
.filter(CI_APPROVE_STATUS("CURR_APPROVE_STATUS_ID") === CI_APPROVE_STATUS_SUCCESS_CODE
and (CI_LABEL_INFO("DATA_STATUS_ID") === 1 || CI_LABEL_INFO("DATA_STATUS_ID") === 2)
and (CI_LABEL_EXT_INFO("COUNT_RULES_CODE") isNotNull //TODO trim.length>0
)
and CI_MDA_SYS_TABLE("UPDATE_CYCLE") === TABLE_DATA_CYCLE
).cache()

5 根据某字段对表进行排序

    val labelTargetTables = labels.groupBy("CI_MDA_SYS_TABLE.TABLE_ID","CI_MDA_SYS_TABLE.TABLE_NAME").agg(labels("CI_MDA_SYS_TABLE.TABLE_ID"),labels("CI_MDA_SYS_TABLE.TABLE_NAME")).collect

6 创建parquet格式的表 可使用schema.生成到指定的schema.

        sqlContext.sql("create table "+labelTargetTableName+" stored as parquet as select * from default."+labelTargetTableNameJson)

7 保存数据格式,可以指定生成的格式

 resultLabelTable.saveAsTable(tableName = labelTargetTableName, source="parquet", mode=SaveMode.Overwrite)

8 根据筛选查询出相应数据,由于cache方法并不属于action操作,接下来的操作需要这一步所执行的数据信息,所以这里使用collect方法,再执行遍历方法

      val r0000Labels = labelInThisTargetTable.filter("COUNT_RULES_CODE = 'R_00000'").select("CI_LABEL_INFO.LABEL_ID","COLUMN_NAME").collect
for(r0000Label <- r0000Labels){
........
}

  

  

  

最新文章

  1. HDU 1712 ACboy needs your help(分组背包)
  2. sql条件为空查询全部,不为空按条件查询以及多条件筛选查询。
  3. C++编写DLL的方法
  4. ARCGIS Server 发布服务时出现的问题解决
  5. HDU 1231 最大连续子序列(水题)
  6. SpringMVC java.lang.IllegalStateException: Neither BindingResult nor plain target object for bean name
  7. socket、webService、RMI ?
  8. 35.3wCF编程
  9. POJ 2456 Aggressive cows
  10. cocos2d-x3.0rc 版 设置模拟器窗体大小
  11. JAVA中的super和this关键字的使用
  12. C#高级编程学习一-----------------第五章泛型
  13. js中判定this的规则
  14. python入门(3)python的解释器
  15. 判定程序员等级,HashMap就够了
  16. Python爬虫实战四之抓取淘宝MM照片
  17. BZOJ3578:GTY的人类基因组计划2(集合hash,STL)
  18. node基础:文件系统-文件读取
  19. HDU 1700 Points on Cycle (几何 向量旋转)
  20. JVM内存限制和调整

热门文章

  1. scala言语基础学习十二
  2. Attention and Augmented Recurrent Neural Networks
  3. 论文笔记之:Attention For Fine-Grained Categorization
  4. java_linear list
  5. Quarzt.NET 任务调度框架
  6. Knockout.js, Asp.Net MVC and Bootstrap 前端设计
  7. mvn 命令
  8. ORACLE 常用数值函数
  9. SPOJ #692. Fruit Farm
  10. 【Spring学习笔记-MVC-18.1】Spring MVC实现RESTful风格-同一资源,多种展现:xml-json-html