/**
* In a Scala job, main(args: Array[String]) is the entry point where the business logic runs.
* import org.apache.spark.{SparkConf, SparkContext}
* import org.apache.spark.streaming.{Seconds, StreamingContext}
* import org.apache.spark.sql.hive.HiveContext
* val sparkConf =new SparkConf().setAppName(appName)
* val ssc = new StreamingContext(sparkConf, Seconds(batchNum))
* val sc = ssc.sparkContext // if the job does not use a StreamingContext and only needs a SparkContext, create one directly: val sc = new SparkContext(sparkConf)
*
* val sqlContext = new HiveContext(sc) // HiveContext is an extension of SQLContext; a plain SQLContext also works: val sqlContext = new SQLContext(sc)
* val result:DataFrame = sqlContext.sql(sql)
* // since Spark 2.0, HiveContext and SQLContext can be replaced by SparkSession: val result = SparkSession.builder().appName("test").config("key", "value").getOrCreate().sql(sql)
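*
* A minimal self-contained sketch of that SparkSession route (the appName, config key/value, and sql are placeholders; enableHiveSupport() assumes Hive support is on the classpath):
* import org.apache.spark.sql.{DataFrame, SparkSession}
*
* val spark = SparkSession.builder()
*   .appName("test")
*   .config("key", "value")
*   .enableHiveSupport()
*   .getOrCreate()
* val result: DataFrame = spark.sql(sql)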
*
* In projects the result is usually handled as JSON, e.g. for publishing to Kafka or for format conversion and filtering:
* val resultRdd = result.toJSON.rdd.map(x => {
  val json = new JSONObject(x)                // e.g. org.json.JSONObject
  val computerIp = json.optString("ip", "")
  val rowKey = json.optString("name", "")
  ......
  val dataMap = new util.HashMap[String, String]()
  dataMap.put("computerip", computerIp)
  (rowKey, dataMap)
})
val hbaseRdd = resultRdd.filter(r => {
  r._1 != null && r._1.nonEmpty               // keep only records with a valid rowKey
}).map(line => {
  val put = new Put(Bytes.toBytes(line._1))   // one Put per rowKey
  val keys = line._2.keySet().iterator()      // iterate over the dataMap entries
  while (keys.hasNext) {
    val k = keys.next().toString
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(k), Bytes.toBytes(line._2.get(k)))
  }
  (new ImmutableBytesWritable(), put)
})
val hadoopconf = sc.hadoopConfiguration
val jobconf = new JobConf(hadoopconf)
jobconf.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
jobconf.setOutputValueClass(classOf[Mutation]) // the values written are Put, a subclass of Mutation
jobconf.setClass("mapreduce.job.outputformat.class", classOf[TableOutputFormat[ImmutableBytesWritable]],classOf[OutputFormat[ImmutableBytesWritable, Mutation]])
jobconf.set(TableOutputFormat.OUTPUT_TABLE, table)
hbaseRdd.saveAsNewAPIHadoopDataset(jobconf) // write the RDD into HBase
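* The same output setup can also be written with the new-API Job helper instead of setting the "mapreduce.job.outputformat.class" key by hand; a minimal sketch under the same assumptions (table holds the HBase table name, hbaseRdd is the (ImmutableBytesWritable, Put) RDD built above):
import org.apache.hadoop.hbase.client.Mutation
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job

val job = Job.getInstance(sc.hadoopConfiguration)
job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, table)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Mutation])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
hbaseRdd.saveAsNewAPIHadoopDataset(job.getConfiguration)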
*-----------------------------------------------------------------------------------------------------------
* class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  // KafkaProducer is not serializable, so the sink carries a factory function and
  // creates the producer lazily, once per executor, on first use
  lazy val producer = createProducer()
  def send(topic: String, value: String): Unit = {
    producer.send(new ProducerRecord(topic, value))
  }
}

object KafkaSink {
  def apply(config: java.util.Map[String, Object]): KafkaSink = {
    val f = () => {
      val producer = new KafkaProducer[String, String](config)
      producer
    }
    new KafkaSink(f)
  }
}
* val kafka = sc.broadcast(KafkaSink(Configs.kafka_props))
* selectDatas.toJSON.rdd.foreach(x => {
  val json = new JSONObject(x)
  kafka.value.send(topic, json.toString)      // publish each record to the Kafka topic
})
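*
* Configs.kafka_props above is a producer configuration map defined elsewhere in the project; a minimal assumed shape (broker addresses are placeholders):
import java.util

val props = new util.HashMap[String, Object]()
props.put("bootstrap.servers", "broker1:9092,broker2:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val kafka = sc.broadcast(KafkaSink(props))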
*-------------------------------------------------------------------
* // receiver-based 0.8 API: kafka_param holds the Kafka/ZooKeeper config and topic maps topic name -> receiver thread count
* val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafka_param, topic, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
* kafkaStream.foreachRDD(rdd => {
*   rdd.foreach(data => {
*     // consume each Kafka record here (parse, transform, write out, ...)
*   })
* })
* ssc.start()
* ssc.awaitTermination()
*/

  

												
