1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>com.cyf</groupId>
<artifactId>TestStorm</artifactId>
<version>1.0-SNAPSHOT</version> <repositories>
<repository>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories> <dependencies> <dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<!--<scope>provided</scope>-->
<version>0.9.5</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.5</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.2</artifactId>
<version>0.8.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.cyf.StormTopologyDriver</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

2  MyLocalFileSpout.java

package kfk;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import org.apache.commons.lang.StringUtils; import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; /**
* Created by Administrator on 2019/2/19.
*/
public class MyLocalFileSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private BufferedReader bufferedReader; //初始化方法
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
try {
this.bufferedReader = new BufferedReader(new FileReader("/root/1.log"));
// this.bufferedReader = new BufferedReader(new FileReader("D:\\1.log"));
} catch (FileNotFoundException e) {
e.printStackTrace();
} } //循环调用的方法
//Storm实时计算的特性就是对数据一条一条的处理 public void nextTuple() {
//每调用一次就会发送一条数据出去
try {
String line = bufferedReader.readLine(); if (StringUtils.isNotBlank(line)) {
List<Object> arrayList = new ArrayList<Object>();
arrayList.add(line);
collector.emit(arrayList);
}
} catch (IOException e) {
e.printStackTrace();
} } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("juzi"));
}
}

3 MySplitBolt.java

package kfk;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values; /**
* Created by Administrator on 2019/2/19.
*/
public class MySplitBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { //1.数据如何获取
byte[] juzi = (byte[]) tuple.getValueByField("bytes");
//2.进行切割
String[] strings = new String(juzi).split(" ");
//3.发送数据
for (String word : strings) {
basicOutputCollector.emit(new Values(word, 1));
}
} public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word", "num"));
}
}

4  MyWordCountAndPrintBolt.java

package kfk;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis; import java.util.HashMap;
import java.util.Map; /**
* Created by Administrator on 2019/2/19.
*/
public class MyWordCountAndPrintBolt extends BaseBasicBolt { private Map<String, String> wordCountMap = new HashMap<String, String>();
private Jedis jedis; @Override
public void prepare(Map stormConf, TopologyContext context) {
//连接redis——代表可以连接任何事物
jedis=new Jedis("127.0.0.1",6379);
super.prepare(stormConf, context);
} public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
String word = (String) tuple.getValueByField("word");
Integer num = (Integer) tuple.getValueByField("num"); //1查看单词对应的value是否存在
Integer integer = wordCountMap.get(word)==null?0:Integer.parseInt(wordCountMap.get(word)) ;
if (integer == null || integer.intValue() == 0) {
wordCountMap.put(word, num+"");
} else {
wordCountMap.put(word, (integer.intValue() + num)+"");
}
//2.打印数据
// System.out.println(wordCountMap);
//保存数据到redis
//redis key wordcount:Map
jedis.hmset("wordcount",wordCountMap);
} public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { }
}

5 StormTopologyDriver.java

package kfk;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts; /**
* Created by Administrator on 2019/2/21.
*/
public class StormTopologyDriver {
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
//1准备任务信息
TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("KafkaSpout", new KafkaSpout(new SpoutConfig(new ZkHosts("mini1:2181"),"wordCount","/wc","wc")));
topologyBuilder.setBolt("bolt1", new MySplitBolt()).shuffleGrouping("KafkaSpout");
topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt()).shuffleGrouping("bolt1"); //2任务提交
//提交给谁,提交什么内容
Config config=new Config();
StormTopology stormTopology=topologyBuilder.createTopology(); //本地模式
LocalCluster localCluster=new LocalCluster();
localCluster.submitTopology("wordcount",config,stormTopology); //集群模式
// StormSubmitter.submitTopology("wordcount",config,stormTopology);
}
}

6 TestRedis.java

package kfk;

import redis.clients.jedis.Jedis;

import java.util.Map;

/**
* Created by Administrator on 2019/2/25.
*/
public class TestRedis {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379); Map<String, String> wordcount = jedis.hgetAll("wordcount");
System.out.println(wordcount);
}
}

在mini1的/root/apps/kafka目录下

创建topic

bin/kafka-topics.sh --create --zookeeper mini1: --replication-factor  --partitions  --topic wordCount

生产数据

bin/kafka-console-producer.sh --broker-list mini1: --topic wordCount

启动 StormTopologyDriver.java

运行 redis-cli.exe

启动TestRedis.java

最新文章

  1. 完全移除TFS2013的版本控制
  2. HTTP2试用小记
  3. Asp.net Mvc 身份验证、异常处理、权限验证(拦截器)实现代码
  4. C/C++宏中#与##的讲解
  5. ZedBoard 引脚约束参考
  6. 【58测试】【贪心】【离散】【搜索】【LIS】【dp】
  7. Insert select 带选择复制一张表到另一张表
  8. oracle第一章
  9. light oj 1148 - Mad Counting
  10. VM的Linux CentOS系统的VMTools的手动安装
  11. hdu 5493 Queue(线段树)
  12. MyEclipse使用经验归纳
  13. Java 枚举详解
  14. java程序设计-算术表达式的运算
  15. MySQL集群PXC的搭建
  16. jQuery的 ready() 和原生 Js onload() 的主要区别:
  17. 【20190223】HTTP-知识点整理:HTTPS
  18. BZOJ2212 [Poi2011]Tree Rotations 线段树合并 逆序对
  19. [U3D Demo] 手机FPS射击游戏
  20. MySQL中Checkpoint技术

热门文章

  1. C# 连接 Oracle,读取Blob字段数据,存到文件中去,包括pdf等等
  2. Minikube-Kubernetes本地环境进行开发
  3. Web 前端开发代码规范(基础)
  4. 正则表达说明—Pattern API
  5. MFC技术积累——基于MFC对话框类的那些事儿
  6. Linux OpenGL 实践篇-13-geometryshader
  7. 使用python批量建立文件
  8. [Vue warn]: Failed to mount component: template or render function not defined.解决方案
  9. 如何移除 Navicat Premium for Mac 的所有文件
  10. c++ 将输入存储到数组,然后反转数组,最后输出