kafka+storm 单机运行
2024-09-08 10:57:46
环境:
1、kafka+zookeeper
2、window平台
3、eclipse
设置:
1、kafka和zookeeper安装,另一篇有介绍(https://www.cnblogs.com/51python/p/10870258.html)
2、eclipse代码(建立maven工程)
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>hadoop</groupId>
<artifactId>eclipseandmaven</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging> <name>eclipseandmaven</name>
<url>http://maven.apache.org</url> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies> <dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.1</version>
<!-- 本地测试注释集群运行打开 -->
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
主函数
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder; public class MainTopology {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
KafkaSpoutConfig.Builder<String, String> kafkaBuilder = KafkaSpoutConfig.builder("127.0.0.1:9092", "test0811");
// .builder("127.0.0.1:9092,node-2:9092,node-3:9092", "test0811");
// 设置kafka属于哪个组
kafkaBuilder.setGroupId("testgroup");
// 创建kafkaspoutConfig
KafkaSpoutConfig<String, String> build = kafkaBuilder.build();
// 通过kafkaspoutConfig获得kafkaspout
KafkaSpout<String, String> kafkaSpout = new KafkaSpout<String, String>(build);
// 设置5个线程接收数据
builder.setSpout("kafkaSpout", kafkaSpout, 5);
// 设置2个线程处理数据
builder.setBolt("printBolt", new PrintBolt(), 2).localOrShuffleGrouping("kafkaSpout");
Config config = new Config();
if (args.length > 0) {
// 集群提交模式
config.setDebug(false);
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} else {
// 本地测试模式
config.setDebug(true);
// 设置2个进程
config.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafkaSpout", config, builder.createTopology());
}
}
}
storm输出
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple; public class PrintBolt extends BaseBasicBolt {
/**
* execute会被storm一直调用
*
* @param tuple
* @param basicOutputCollector
*/
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
// 为了便于查看消息用err标红
System.err.println(tuple.getValue(4));
System.err.println(tuple.getValues());
} public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { }
}
3、运行
1)启动zookeeper
zkserver
2)启动kafka服务(在D:\bigdata\kafka_2.11-0.9.0.1安装目录打开cmd)
.\bin\windows\kafka-server-start.bat .\config\server.properties
3)创建主题(在D:\bigdata\kafka_2.11-0.9.0.1\bin\windows安装目录打开cmd)
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test0811
4)创建生产者(在D:\bigdata\kafka_2.11-0.9.0.1\bin\windows安装目录打开cmd)
kafka-console-producer.bat --broker-list localhost:9092 --topic test0811
5)启动主函数
运行eclipse主函数
结果:
在4中的cmd窗口输入字符串,会在eclipse中收到。
这是单机版,后面会做多机通信,敬请期待!
参考:https://blog.csdn.net/qq_41455420/article/details/79385566
最新文章
- NSURLCache
- HTML5 aria- and role
- EntityFramework Core 学习笔记 —— 创建模型
- 知识总结提炼-AP模块
- 部门树形结构,使用Treeview控件显示部门
- dom 拖拽回放
- iOS 获取设备的ip地址
- java匿名对象
- [解决]Windows Server 2012 不能安装IE版的Flash
- 为JQuery EasyUI 表单组件增加“焦点切换”功能
- 移动端 H5图片裁剪插件,内置简单手势操作
- Linuxc - 多c文件程序编译执行
- 箱型图boxplot函数的使用
- SSM-网站前台博客系统制作(2)---完善版Google的Kaptcha
- Python开发——9.面向对象编程
- 自然语言处理之jieba分词
- android studio中编译单个文件
- MVC输出缓存(OutputCache参数详解)
- linux环境下安装qt过程
- 【转】js生成接口请求参数签名加密
热门文章
- js-学习方法
- SQL优化--使用 EXISTS 代替 IN 和 inner join来选择正确的执行计划
- 备份-泛函编程(23)-泛函数据类型-Monad
- Node.js常用express方法
- 平衡二叉树(Self-balancing Binary Search Tree)
- 构造函数和初始化表、this指针与常函数、析构函数、拷贝构造与拷贝赋值(day05)
- ExtJs之Ext.Model的MemoryProxy
- 【ACM】nyoj_7_街区最短路径问题_201308051737
- 洛谷—— P1962 斐波那契数列
- HDU 1238