package com.example.demo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession; public class DemoApplication { public static void main(String[] args) { // /*-----------------------线上调用方式--------------------------*/
// 读入店铺id数据
SparkSession spark = SparkSession.builder().appName("demo_spark").enableHiveSupport().getOrCreate();
Dataset<Row> vender_set = spark.sql("select pop_vender_id from app.app_sjzt_payout_apply_with_order where dt = '2019-08-05' and pop_vender_id is not null");
System.out.println( "数据读取 OK" ); JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
// JavaSparkContext sc = new JavaSparkContext();
SQLContext sqlContext = new SQLContext(sc); // 将数据去重,转换成 List<Row> 格式
vender_set = vender_set.distinct();
vender_set = vender_set.na().fill(0L);
JavaRDD<Row> vender= vender_set.toJavaRDD();
List<Row> vender_list = vender.collect(); // 遍历商家id,调用jsf接口,创建list 保存返回数据
List<String> list_temp = new ArrayList<String>();
for(Row row:vender_list) {
String id = row.getString(0);
String result = service.venderDownAmountList(id); System.out.println( "接口调用返回值 OK" ); // 解析json串 ,按照JSONObject 和 JSONArray 一层一层解析 并过返回滤数据
JSONObject jsonOBJ = JSON.parseObject(result);
JSONArray data = jsonOBJ.getJSONArray("data");
if (data != null) {
JSONObject data_all = data.getJSONObject(0);
double amount = data_all.getDouble("jfDownAmount");
// 将商家id 和 倒挂金额存下来
list_temp.add("{\"vender_id\":"+id+",\"amount\":"+amount+"}");
}
else {
continue;
} System.out.println( "解析 OK" ); }
// list 转为 RDD
JavaRDD<String> venderRDD = sc.parallelize(list_temp); // 注册成表
Dataset<Row> vender_table = sqlContext.read().json(venderRDD);
vender_table.registerTempTable("vender");
System.out.println( "注册表 OK" ); // 写入数据库
spark.sql("insert overwrite table dev.dev_jypt_vender_dropaway_amount select vender.vender_id,vender.amount from vender");
System.out.println( "写入数据表 OK" ); sc.stop();
System.out.println( "Hello World!" ); }
}

  

最新文章

  1. EBS 11i 的工作流列表
  2. Java线程池使用说明
  3. [PCL]FPFH描述子
  4. linux 定时器编程实例(完善中).....
  5. EF中使用SQL语句或存储过程
  6. 银联接口测试——详细(JAVA)
  7. boost-内存管理
  8. Java [Leetcode 83]Remove Duplicates from Sorted List
  9. [转]chrome技术文档列表
  10. 获取鼠标点击相对于Canva位置的2种方法
  11. 初学 Python(十三)——匿名函数
  12. 深入浅出了解OCR识别票据原理
  13. Linxu基础知识:终端、终端模拟器、shell
  14. Python_copy_深浅拷贝
  15. python 减少可调用对象的参数个数
  16. PHP复制文件夹及文件夹内的文件
  17. Python -- xlrd,xlwt,xlutils 读写同一个Excel
  18. BASIC-5_蓝桥杯_查找整数
  19. Ubuntu双网卡不双待攻略
  20. 百度地图js lite api 支持点聚合

热门文章

  1. 【SpringMVC】统一异常处理
  2. layui表单
  3. RestFramework之权限组件
  4. 记录java+testng运行selenium(四)--- 运行代码
  5. Jenkins配置文件
  6. Mac上使用brew安装Nginx服务器
  7. TG2
  8. [HDU 5608]Function(莫比乌斯反演 + 杜教筛)
  9. bootstrap之collapse
  10. 54、servlet3.0-ServletContainerInitializer