Streaming+Sparksql使用sql实时分析 rabbitmq+mongodb+hive
2024-08-31 08:03:57
SparkConf sparkConf = new SparkConf()
//此处使用一个链接切记使用一个链接否则汇报有多个sparkcontext错误
.setAppName("SparkConsumerRabbit")
.setMaster("local[2]")
.set("hive.metastore.uris", thrift)
.set("spark.sql.warehouse.dir", hdfs)
.set("spark.mongodb.input.uri", "mongodb://" + rule.getMUName(jsonStr) + ":" + rule.getMpwd(jsonStr) + "@" + rule.getMIp(jsonStr) + ":" + rule.getMport(jsonStr) + "/" + rule.getMDBName(jsonStr) + "." + rule.getMtable(jsonStr))
.set("spark.mongodb.output.uri", "mongodb://root:123456@192.168.4.51:27010/pachong.test");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
//Duration参数秒
//Streaming 方式
JavaStreamingContext jsc = new JavaStreamingContext(sc, Durations.seconds(5));
//hivesql 方式
HiveContext hiveContext = new HiveContext(sc);
hiveContext.sql("show databases").show();
hiveContext.sql("use" + " " + db);
//mongodb 方式
JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
Map<String, String> params = new HashMap<>();
//map中参数设置,加载map连接rabbit
params.put("hosts", "192.168.7.96");
params.put("port", "5672");
params.put("userName", "admin");
params.put("password", "admin");
params.put("queueName", "cj_ack");
params.put("durable", "false");
Function<QueueingConsumer.Delivery, String> handler = message -> new String(message.getBody());
JavaReceiverInputDStream<String> messages = RabbitMQUtils.createJavaStream(jsc,String.class,params,handler);
messages.print();
最新文章
- Bootstrap<;基础十六>; 导航元素
- 在Android中自定义捕获Application全局异常,可以替换掉系统的强制退出对话框(很有参考价值与实用价值)
- Springmvc mybatis
- find_first_of()和 find_last_of() 【获取路径、文件名】
- hdu 2665 Kth number
- ajax方法完整的事件流
- YAPF:Google开源的Python代码格式化工具
- 无需图片,使用CSS3实现圆角按钮[转]
- js实现跨域(jsonp, iframe+window.name, iframe+window.domain, iframe+window.postMessage)
- c++和c#之间的类型转换
- Java基础知识强化之IO流笔记62:三种方式实现键盘录入
- C#迭代语句
- asp.net验证码及怎么获取里面的数值(整合)
- Swift 2.0 字符串学习笔记(建议掌握OC字符串知识的翻阅)
- MERGE语法详解
- [Swift]LeetCode222. 完全二叉树的节点个数 | Count Complete Tree Nodes
- Spring Boot (十):邮件服务
- 文件拷贝, 使用 BIO,NIO的对比,四种写法性能分析。
- Js_图片轮换
- LINE 不被封锁的技巧