环境:

  1、kafka+zookeeper

  2、window平台

  3、eclipse

设置:

1、kafka和zookeeper安装,另一篇有介绍(https://www.cnblogs.com/51python/p/10870258.html

2、eclipse代码(建立maven工程)

  pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>hadoop</groupId>
<artifactId>eclipseandmaven</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging> <name>eclipseandmaven</name>
<url>http://maven.apache.org</url> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies> <dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.1</version>
<!-- 本地测试注释集群运行打开 -->
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

  主函数

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder; public class MainTopology {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
KafkaSpoutConfig.Builder<String, String> kafkaBuilder = KafkaSpoutConfig.builder("127.0.0.1:9092", "test0811");
// .builder("127.0.0.1:9092,node-2:9092,node-3:9092", "test0811");
// 设置kafka属于哪个组
kafkaBuilder.setGroupId("testgroup");
// 创建kafkaspoutConfig
KafkaSpoutConfig<String, String> build = kafkaBuilder.build();
// 通过kafkaspoutConfig获得kafkaspout
KafkaSpout<String, String> kafkaSpout = new KafkaSpout<String, String>(build);
// 设置5个线程接收数据
builder.setSpout("kafkaSpout", kafkaSpout, 5);
// 设置2个线程处理数据
builder.setBolt("printBolt", new PrintBolt(), 2).localOrShuffleGrouping("kafkaSpout");
Config config = new Config();
if (args.length > 0) {
// 集群提交模式
config.setDebug(false);
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} else {
// 本地测试模式
config.setDebug(true);
// 设置2个进程
config.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafkaSpout", config, builder.createTopology());
}
}
}

 storm输出

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple; public class PrintBolt extends BaseBasicBolt {
/**
* execute会被storm一直调用
*
* @param tuple
* @param basicOutputCollector
*/
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
// 为了便于查看消息用err标红
System.err.println(tuple.getValue(4));
System.err.println(tuple.getValues());
} public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { }
}

3、运行

  1)启动zookeeper

zkserver

  2)启动kafka服务(在D:\bigdata\kafka_2.11-0.9.0.1安装目录打开cmd)

.\bin\windows\kafka-server-start.bat .\config\server.properties  

  3)创建主题(在D:\bigdata\kafka_2.11-0.9.0.1\bin\windows安装目录打开cmd)

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test0811

  4)创建生产者(在D:\bigdata\kafka_2.11-0.9.0.1\bin\windows安装目录打开cmd)

kafka-console-producer.bat --broker-list localhost:9092 --topic test0811 

  5)启动主函数

  运行eclipse主函数

结果:

  在4中的cmd窗口输入字符串,会在eclipse中收到。

这是单机版,后面会做多机通信,敬请期待!

参考:https://blog.csdn.net/qq_41455420/article/details/79385566

最新文章

  1. NSURLCache
  2. HTML5 aria- and role
  3. EntityFramework Core 学习笔记 —— 创建模型
  4. 知识总结提炼-AP模块
  5. 部门树形结构,使用Treeview控件显示部门
  6. dom 拖拽回放
  7. iOS 获取设备的ip地址
  8. java匿名对象
  9. [解决]Windows Server 2012 不能安装IE版的Flash
  10. 为JQuery EasyUI 表单组件增加“焦点切换”功能
  11. 移动端 H5图片裁剪插件,内置简单手势操作
  12. Linuxc - 多c文件程序编译执行
  13. 箱型图boxplot函数的使用
  14. SSM-网站前台博客系统制作(2)---完善版Google的Kaptcha
  15. Python开发——9.面向对象编程
  16. 自然语言处理之jieba分词
  17. android studio中编译单个文件
  18. MVC输出缓存(OutputCache参数详解)
  19. linux环境下安装qt过程
  20. 【转】js生成接口请求参数签名加密

热门文章

  1. js-学习方法
  2. SQL优化--使用 EXISTS 代替 IN 和 inner join来选择正确的执行计划
  3. 备份-泛函编程(23)-泛函数据类型-Monad
  4. Node.js常用express方法
  5. 平衡二叉树(Self-balancing Binary Search Tree)
  6. 构造函数和初始化表、this指针与常函数、析构函数、拷贝构造与拷贝赋值(day05)
  7. ExtJs之Ext.Model的MemoryProxy
  8. 【ACM】nyoj_7_街区最短路径问题_201308051737
  9. 洛谷—— P1962 斐波那契数列
  10. HDU 1238