先模拟产生一些数据

我把这些数据摘一部分下来

 2017-06-10 18:25:56,092 [main] [org.apache.kafka.common.utils.AppInfoParser] [INFO] - Kafka version : 0.9.0.1
2017-06-10 18:25:56,092 [main] [org.apache.kafka.common.utils.AppInfoParser] [INFO] - Kafka commitId : 23c69d62a0cabf06
{"id":"865456863256327","vid":"1495267869123452","uid":"964226522333222","gold":3936,"platform":"ios","timestamp":1497090356094}
{"id":"865456863256323","vid":"1495267869123454","uid":"964226522333224","gold":6824,"platform":"android","timestamp":1497090356194}
{"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":9389,"platform":"ios","timestamp":1497090356294}
{"id":"865456863256326","vid":"1495267869123452","uid":"964226522333222","gold":3054,"platform":"ios","timestamp":1497090356394}
{"id":"865456863256329","vid":"1495267869123454","uid":"964226522333224","gold":1518,"platform":"android","timestamp":1497090356494}
{"id":"865456863256324","vid":"1495267869123452","uid":"964226522333222","gold":7668,"platform":"ios","timestamp":1497090356594}
{"id":"865456863256321","vid":"1495267869123454","uid":"964226522333224","gold":1665,"platform":"android","timestamp":1497090356694}
{"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":1727,"platform":"ios","timestamp":1497090356794}
{"id":"865456863256324","vid":"1495267869123451","uid":"964226522333221","gold":6371,"platform":"ios","timestamp":1497090356894}
{"id":"865456863256328","vid":"1495267869123452","uid":"964226522333222","gold":495,"platform":"android","timestamp":1497090356994}
{"id":"865456863256323","vid":"1495267869123454","uid":"964226522333224","gold":7543,"platform":"ios","timestamp":1497090417094}
{"id":"865456863256322","vid":"1495267869123454","uid":"964226522333224","gold":1901,"platform":"android","timestamp":1497090417194}
{"id":"865456863256329","vid":"1495267869123452","uid":"964226522333222","gold":8043,"platform":"ios","timestamp":1497090417294}
{"id":"865456863256321","vid":"1495267869123452","uid":"964226522333222","gold":9325,"platform":"ios","timestamp":1497090417394}
{"id":"865456863256327","vid":"1495267869123452","uid":"964226522333222","gold":4408,"platform":"android","timestamp":1497090417494}
{"id":"865456863256320","vid":"1495267869123450","uid":"964226522333220","gold":8715,"platform":"android","timestamp":1497090417594}
{"id":"865456863256321","vid":"1495267869123450","uid":"964226522333220","gold":592,"platform":"ios","timestamp":1497090417694}
{"id":"865456863256321","vid":"1495267869123450","uid":"964226522333220","gold":4319,"platform":"android","timestamp":1497090417794}
{"id":"865456863256321","vid":"1495267869123453","uid":"964226522333223","gold":416,"platform":"ios","timestamp":1497090417894}
{"id":"865456863256329","vid":"1495267869123454","uid":"964226522333224","gold":4410,"platform":"android","timestamp":1497090417994}
{"id":"865456863256324","vid":"1495267869123451","uid":"964226522333221","gold":7197,"platform":"ios","timestamp":1497090478095}
{"id":"865456863256327","vid":"1495267869123451","uid":"964226522333221","gold":1737,"platform":"ios","timestamp":1497090478195}
{"id":"865456863256324","vid":"1495267869123453","uid":"964226522333223","gold":2425,"platform":"android","timestamp":1497090478295}
{"id":"865456863256326","vid":"1495267869123454","uid":"964226522333224","gold":6847,"platform":"ios","timestamp":1497090478395}
{"id":"865456863256322","vid":"1495267869123454","uid":"964226522333224","gold":1932,"platform":"android","timestamp":1497090478495}
{"id":"865456863256324","vid":"1495267869123454","uid":"964226522333224","gold":4428,"platform":"ios","timestamp":1497090478595}
{"id":"865456863256320","vid":"1495267869123453","uid":"964226522333223","gold":3708,"platform":"android","timestamp":1497090478695}
{"id":"865456863256321","vid":"1495267869123452","uid":"964226522333222","gold":5290,"platform":"ios","timestamp":1497090478795}
{"id":"865456863256328","vid":"1495267869123452","uid":"964226522333222","gold":5080,"platform":"android","timestamp":1497090478895}
{"id":"865456863256321","vid":"1495267869123453","uid":"964226522333223","gold":9643,"platform":"android","timestamp":1497090478995}
{"id":"865456863256324","vid":"1495267869123452","uid":"964226522333222","gold":3766,"platform":"ios","timestamp":1497090539095}
{"id":"865456863256326","vid":"1495267869123451","uid":"964226522333221","gold":3758,"platform":"android","timestamp":1497090539195}
{"id":"865456863256328","vid":"1495267869123451","uid":"964226522333221","gold":2522,"platform":"android","timestamp":1497090539295}
{"id":"865456863256322","vid":"1495267869123450","uid":"964226522333220","gold":8746,"platform":"android","timestamp":1497090539395}
{"id":"865456863256328","vid":"1495267869123451","uid":"964226522333221","gold":7616,"platform":"ios","timestamp":1497090539495}
{"id":"865456863256325","vid":"1495267869123454","uid":"964226522333224","gold":527,"platform":"android","timestamp":1497090539595}
{"id":"865456863256327","vid":"1495267869123451","uid":"964226522333221","gold":3887,"platform":"ios","timestamp":1497090539695}
{"id":"865456863256325","vid":"1495267869123450","uid":"964226522333220","gold":2137,"platform":"ios","timestamp":1497090539795}
{"id":"865456863256329","vid":"1495267869123453","uid":"964226522333223","gold":6965,"platform":"android","timestamp":1497090539895}
{"id":"865456863256325","vid":"1495267869123451","uid":"964226522333221","gold":350,"platform":"android","timestamp":1497090539995}
{"id":"865456863256323","vid":"1495267869123454","uid":"964226522333224","gold":863,"platform":"android","timestamp":1497090600096}
{"id":"865456863256320","vid":"1495267869123454","uid":"964226522333224","gold":9597,"platform":"ios","timestamp":1497090600196}
{"id":"865456863256324","vid":"1495267869123454","uid":"964226522333224","gold":9504,"platform":"ios","timestamp":1497090600296}
{"id":"865456863256322","vid":"1495267869123451","uid":"964226522333221","gold":1598,"platform":"ios","timestamp":1497090600396}
{"id":"865456863256325","vid":"1495267869123451","uid":"964226522333221","gold":1126,"platform":"android","timestamp":1497090600496}
{"id":"865456863256324","vid":"1495267869123453","uid":"964226522333223","gold":3606,"platform":"android","timestamp":1497090600596}
{"id":"865456863256326","vid":"1495267869123450","uid":"964226522333220","gold":1866,"platform":"ios","timestamp":1497090600696}
{"id":"865456863256323","vid":"1495267869123453","uid":"964226522333223","gold":1282,"platform":"android","timestamp":1497090600796}
{"id":"865456863256325","vid":"1495267869123450","uid":"964226522333220","gold":542,"platform":"ios","timestamp":1497090600896}
{"id":"865456863256326","vid":"1495267869123450","uid":"964226522333220","gold":4168,"platform":"android","timestamp":1497090600996}
{"id":"865456863256321","vid":"1495267869123453","uid":"964226522333223","gold":4766,"platform":"android","timestamp":1497090661096}
{"id":"865456863256323","vid":"1495267869123451","uid":"964226522333221","gold":3867,"platform":"ios","timestamp":1497090661196}
{"id":"865456863256324","vid":"1495267869123451","uid":"964226522333221","gold":7825,"platform":"ios","timestamp":1497090661296}
{"id":"865456863256320","vid":"1495267869123454","uid":"964226522333224","gold":4518,"platform":"ios","timestamp":1497090661396}
{"id":"865456863256326","vid":"1495267869123453","uid":"964226522333223","gold":4280,"platform":"ios","timestamp":1497090661496}
{"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":4909,"platform":"android","timestamp":1497090661596}
{"id":"865456863256325","vid":"1495267869123452","uid":"964226522333222","gold":7227,"platform":"ios","timestamp":1497090661696}
{"id":"865456863256324","vid":"1495267869123451","uid":"964226522333221","gold":9937,"platform":"android","timestamp":1497090661796}
{"id":"865456863256321","vid":"1495267869123451","uid":"964226522333221","gold":7840,"platform":"ios","timestamp":1497090661896}
{"id":"865456863256326","vid":"1495267869123453","uid":"964226522333223","gold":2762,"platform":"ios","timestamp":1497090661996}
{"id":"865456863256322","vid":"1495267869123454","uid":"964226522333224","gold":7941,"platform":"ios","timestamp":1497090722097}
{"id":"865456863256320","vid":"1495267869123452","uid":"964226522333222","gold":6188,"platform":"android","timestamp":1497090722197}
{"id":"865456863256325","vid":"1495267869123454","uid":"964226522333224","gold":2387,"platform":"android","timestamp":1497090722297}
{"id":"865456863256322","vid":"1495267869123450","uid":"964226522333220","gold":2980,"platform":"ios","timestamp":1497090722397}
{"id":"865456863256321","vid":"1495267869123452","uid":"964226522333222","gold":9403,"platform":"android","timestamp":1497090722497}
{"id":"865456863256323","vid":"1495267869123453","uid":"964226522333223","gold":3482,"platform":"android","timestamp":1497090722597}
{"id":"865456863256324","vid":"1495267869123454","uid":"964226522333224","gold":3290,"platform":"android","timestamp":1497090722697}
{"id":"865456863256323","vid":"1495267869123454","uid":"964226522333224","gold":1439,"platform":"android","timestamp":1497090722797}
{"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":6758,"platform":"ios","timestamp":1497090722897}
{"id":"865456863256327","vid":"1495267869123452","uid":"964226522333222","gold":3501,"platform":"ios","timestamp":1497090722997}
{"id":"865456863256325","vid":"1495267869123454","uid":"964226522333224","gold":7904,"platform":"ios","timestamp":1497090783097}
{"id":"865456863256326","vid":"1495267869123453","uid":"964226522333223","gold":9900,"platform":"android","timestamp":1497090783197}
{"id":"865456863256320","vid":"1495267869123452","uid":"964226522333222","gold":1841,"platform":"ios","timestamp":1497090783297}
{"id":"865456863256322","vid":"1495267869123453","uid":"964226522333223","gold":8857,"platform":"ios","timestamp":1497090783397}
{"id":"865456863256328","vid":"1495267869123450","uid":"964226522333220","gold":7855,"platform":"android","timestamp":1497090783497}
{"id":"865456863256324","vid":"1495267869123451","uid":"964226522333221","gold":7165,"platform":"android","timestamp":1497090783597}
{"id":"865456863256326","vid":"1495267869123450","uid":"964226522333220","gold":2247,"platform":"ios","timestamp":1497090783697}
{"id":"865456863256329","vid":"1495267869123454","uid":"964226522333224","gold":1742,"platform":"android","timestamp":1497090783797}
{"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":9122,"platform":"ios","timestamp":1497090783897}
{"id":"865456863256325","vid":"1495267869123453","uid":"964226522333223","gold":1623,"platform":"android","timestamp":1497090783997}
{"id":"865456863256324","vid":"1495267869123450","uid":"964226522333220","gold":8354,"platform":"ios","timestamp":1497090844098}
{"id":"865456863256321","vid":"1495267869123454","uid":"964226522333224","gold":3808,"platform":"ios","timestamp":1497090844198}
{"id":"865456863256326","vid":"1495267869123451","uid":"964226522333221","gold":9875,"platform":"android","timestamp":1497090844298}
{"id":"865456863256327","vid":"1495267869123452","uid":"964226522333222","gold":2714,"platform":"ios","timestamp":1497090844398}
{"id":"865456863256326","vid":"1495267869123454","uid":"964226522333224","gold":3660,"platform":"ios","timestamp":1497090844498}
{"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":8545,"platform":"ios","timestamp":1497090844598}
{"id":"865456863256325","vid":"1495267869123453","uid":"964226522333223","gold":5757,"platform":"android","timestamp":1497090844698}
{"id":"865456863256320","vid":"1495267869123450","uid":"964226522333220","gold":7898,"platform":"android","timestamp":1497090844798}
{"id":"865456863256329","vid":"1495267869123453","uid":"964226522333223","gold":3633,"platform":"ios","timestamp":1497090844898}
{"id":"865456863256329","vid":"1495267869123452","uid":"964226522333222","gold":6500,"platform":"android","timestamp":1497090844998}
{"id":"865456863256323","vid":"1495267869123450","uid":"964226522333220","gold":8859,"platform":"ios","timestamp":1497090905098}
{"id":"865456863256322","vid":"1495267869123452","uid":"964226522333222","gold":3897,"platform":"android","timestamp":1497090905198}
{"id":"865456863256326","vid":"1495267869123451","uid":"964226522333221","gold":5786,"platform":"ios","timestamp":1497090905298}
{"id":"865456863256321","vid":"1495267869123451","uid":"964226522333221","gold":2667,"platform":"android","timestamp":1497090905398}
{"id":"865456863256321","vid":"1495267869123453","uid":"964226522333223","gold":4038,"platform":"android","timestamp":1497090905499}
{"id":"865456863256328","vid":"1495267869123451","uid":"964226522333221","gold":361,"platform":"android","timestamp":1497090905599}
{"id":"865456863256326","vid":"1495267869123454","uid":"964226522333224","gold":7074,"platform":"android","timestamp":1497090905699}
{"id":"865456863256323","vid":"1495267869123451","uid":"964226522333221","gold":89,"platform":"android","timestamp":1497090905799}
{"id":"865456863256325","vid":"1495267869123450","uid":"964226522333220","gold":1354,"platform":"android","timestamp":1497090905899}
{"id":"865456863256326","vid":"1495267869123452","uid":"964226522333222","gold":221,"platform":"ios","timestamp":1497090905999}
{"id":"865456863256325","vid":"1495267869123451","uid":"964226522333221","gold":436,"platform":"android","timestamp":1497090966099}
{"id":"865456863256327","vid":"1495267869123451","uid":"964226522333221","gold":8000,"platform":"android","timestamp":1497090966199}
{"id":"865456863256324","vid":"1495267869123453","uid":"964226522333223","gold":9952,"platform":"android","timestamp":1497090966299}
{"id":"865456863256321","vid":"1495267869123451","uid":"964226522333221","gold":2216,"platform":"android","timestamp":1497090966400}
{"id":"865456863256320","vid":"1495267869123452","uid":"964226522333222","gold":2042,"platform":"android","timestamp":1497090966500}
{"id":"865456863256329","vid":"1495267869123451","uid":"964226522333221","gold":8739,"platform":"ios","timestamp":1497090966600}
{"id":"865456863256322","vid":"1495267869123452","uid":"964226522333222","gold":2500,"platform":"ios","timestamp":1497090966701}
{"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":9803,"platform":"ios","timestamp":1497090966801}
{"id":"865456863256328","vid":"1495267869123450","uid":"964226522333220","gold":7246,"platform":"android","timestamp":1497090966901}
{"id":"865456863256320","vid":"1495267869123454","uid":"964226522333224","gold":5220,"platform":"android","timestamp":1497090967001}

参考代码KafkaProducer.java

 package yehua.kafkaDemo;

 import java.util.Properties;
import java.util.Random; import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Generates simulated gift ("gold") log records as JSON strings and prints them to stdout.
 *
 * <p>Records are produced in bursts: one record every 100 ms, and after every tenth record
 * the generator pauses for one minute. The call that would publish each record to Kafka is
 * left commented out, so as written the program only prints the records.
 */
public class KafkaProducer {

    /**
     * Runs the generator until the process is killed or the thread is interrupted.
     *
     * @param args unused
     * @throws Exception if the producer cannot be created or a sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //String topic = "gold_log_r2p5";
        String topic = "test";
        // Fully qualified because this class shares its simple name with Kafka's KafkaProducer.
        Producer<String, String> producer =
                new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
        try {
            int count = 0;
            // Example record:
            // {"id":"865456863256326","vid":"1495267869123456","uid":"965406863256326","gold":150,"platform":"ios","timestamp":1495267869}
            // Simulated gift-sender ids
            String[] idArr = {"865456863256320","865456863256321","865456863256322","865456863256323","865456863256324","865456863256325","865456863256326","865456863256327","865456863256328","865456863256329"};
            // Simulated live-room video ids
            String[] vidArr = {"1495267869123450","1495267869123451","1495267869123452","1495267869123453","1495267869123454"};
            // Simulated broadcaster user ids
            String[] uidArr = {"964226522333220","964226522333221","964226522333222","964226522333223","964226522333224"};
            // Simulated phone platforms
            String[] platformArr = {"android","ios"};
            Random random = new Random();
            while (true) {
                int rint1 = random.nextInt(10);
                // The same index selects both vid and uid, so a given video id is always
                // paired with the same user id.
                int rint2 = random.nextInt(5);
                int rint3 = random.nextInt(2);
                String log = "{\"id\":\""+idArr[rint1]+"\",\"vid\":\""+vidArr[rint2]+"\",\"uid\":\""+uidArr[rint2]+"\",\"gold\":"+random.nextInt(10000)+",\"platform\":\""+platformArr[rint3]+"\",\"timestamp\":"+System.currentTimeMillis()+"}";
                // Uncomment to publish the record to Kafka instead of only printing it.
                //producer.send(new ProducerRecord<String, String>(topic, log));
                System.out.println(log);
                count++;
                Thread.sleep(100);
                if (count % 10 == 0) {
                    //break;
                    // Pause one minute after each batch of ten records.
                    Thread.sleep(1000 * 60);
                }
            }
        } finally {
            // The loop only exits through an exception (e.g. an interrupted sleep);
            // release the producer's resources on that path.
            producer.close();
        }
    }
}
 

先在kafka创建topic

两个副本5个分区

可以看到topic创建成功

总结一下前面的流程

下面再新建一个maven项目stormpProject0521

依赖文件:

依赖文件pom.xml参考代码

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>yehua</groupId>
<artifactId>stormpProject0521</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging> <name>stormpProject0521</name>
<url>http://maven.apache.org</url> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> <dependencies>
<!-- Needed only at compile time; not needed at runtime because the Storm cluster already provides it -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.2</version>
<!-- "provided": used when compiling the code, left out when packaging -->
<scope>provided</scope>
</dependency>
<!-- Mainly for KafkaSpout -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.1</version>
<!-- Exclude slf4j-log4j12 -->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Note: starting with Kafka 0.9 the kafka-clients dependency must be declared here, otherwise this error is raised: java.lang.NoSuchMethodError:
org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.1</version>
</dependency>
<!-- commons-dbutils -->
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
<version>1.6</version>
</dependency>
<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.29</version>
</dependency>
<!-- json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.10</version>
</dependency>
<!-- redis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compiler plugin: sets the JDK version -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.8</source>
<target>1.8</target>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<!-- Assembly plugin: runs the "single" goal in the package phase with the jar-with-dependencies descriptor -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

  

参考代码LogProcessTopology.java

 package yehua.stormpProject0521;

 import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder; import yehua.stormpProject0521.bolt.LogProcessBolt1;
import yehua.stormpProject0521.bolt.LogProcessBolt2;
import yehua.stormpProject0521.bolt.ParseLogBolt;

/**
 * Builds the gold-log processing topology and submits it.
 *
 * <p>Wiring: a {@link KafkaSpout} feeds {@link ParseLogBolt}, whose output is consumed by both
 * {@link LogProcessBolt1} (global grouping) and {@link LogProcessBolt2} (shuffle grouping).
 * With no command-line arguments the topology runs in a {@link LocalCluster}; otherwise it is
 * submitted through {@link StormSubmitter}.
 */
public class LogProcessTopology {

    /**
     * Entry point.
     *
     * @param args empty to run in local mode; any argument submits to the cluster
     */
    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        String topologyName = LogProcessTopology.class.getSimpleName();
        String spoutId = KafkaSpout.class.getSimpleName();
        String parseBoltId = ParseLogBolt.class.getSimpleName();
        String totalBoltId = LogProcessBolt1.class.getSimpleName();
        String provinceBoltId = LogProcessBolt2.class.getSimpleName();

        // ZooKeeper address, used to locate Kafka.
        BrokerHosts hosts = new ZkHosts("hadoop100:2181");
        String topic = "gold_log_r2p5";
        // Storm consumes Kafka through the KafkaSpout; the consumed offsets are stored under this node.
        String zkRoot = "/kafkaSpout";
        // Can be thought of as the consumer group id.
        String id = "consumer_gold_log";
        SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
        // Decode the spout output as strings so bolts can read the string directly.
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        // Note: on first consumption the default is to start from the earliest data in the topic.
        // When zkRoot holds no stored offset, startOffsetTime decides where consumption starts.
        //spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // earliest data
        spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // latest data

        topologyBuilder.setSpout(spoutId, new KafkaSpout(spoutConf), 5);
        // More than one KafkaSpout can be registered:
        //topologyBuilder.setSpout("newSpout",new KafkaSpout(spoutConfNew),5);
        topologyBuilder.setBolt(parseBoltId, new ParseLogBolt(), 2).setNumTasks(6).shuffleGrouping(spoutId);
        // LogProcessBolt1 must be executed by a single thread; globalGrouping ensures
        // that only one thread processes the data.
        topologyBuilder.setBolt(totalBoltId, new LogProcessBolt1()).globalGrouping(parseBoltId);
        topologyBuilder.setBolt(provinceBoltId, new LogProcessBolt2(), 2).shuffleGrouping(parseBoltId);

        Config config = new Config();
        //config.setNumWorkers(2); // use two workers
        config.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx1024m"); // memory for each worker
        config.setMaxSpoutPending(1000); // at most 1000 unprocessed tuples held in memory
        StormTopology createTopology = topologyBuilder.createTopology();

        if (args.length == 0) {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology(topologyName, config, createTopology);
        } else {
            try {
                StormSubmitter.submitTopology(topologyName, config, createTopology);
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        }
    }
}

参考代码ParseLogBolt.java

 package yehua.stormpProject0521.bolt;

 import java.util.HashMap;
import java.util.Map; import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

/**
 * Parses each raw JSON log record, extracts the key fields and emits them as
 * {@code (province, gold)}.
 *
 * <p>The province is looked up from the sender id through an in-memory map; ids that are not
 * in the map are reported under the province {@code "其它"}.
 *
 * @author yehua
 */
public class ParseLogBolt extends BaseRichBolt {
    private Map stormConf;
    private TopologyContext context;
    private OutputCollector collector;
    private Map<String, String> idCountryMap;

    /**
     * Stores the collector and loads the sender-id to province map.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.stormConf = stormConf;
        this.context = context;
        this.collector = collector;
        idCountryMap = new HashMap<String, String>();
        // Intended production behaviour: load sender id / province pairs from Redis here, then
        // refresh every half hour from the tick handler to pick up newly registered users.
        /*RedisUtils redisUtils = new RedisUtils();
        List<String> list = redisUtils.lrange("all_id_province", 0, -1);
        for (String id_country : list) {
        String[] splits = id_country.split("\t");
        idCountryMap.put(splits[0], splits[1]);
        }
        redisUtils.close();*/
        idCountryMap.put("865456863256320", "京");
        idCountryMap.put("865456863256321", "津");
        idCountryMap.put("865456863256322", "冀");
        idCountryMap.put("865456863256323", "晋");
        idCountryMap.put("865456863256324", "辽");
        idCountryMap.put("865456863256325", "黑");
        idCountryMap.put("865456863256326", "沪");
        idCountryMap.put("865456863256327", "苏");
        idCountryMap.put("865456863256328", "浙");
        idCountryMap.put("865456863256329", "皖");
    }

    /**
     * Handles one tuple: tuples from the system component are treated as the periodic refresh
     * trigger; every other tuple is parsed and emitted as {@code (province, gold)}.
     *
     * <p>A record that cannot be parsed, or that has no {@code gold} value, is reported through
     * the collector and failed.
     */
    @Override
    public void execute(Tuple input) {
        if (input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)) {
            // Periodic sync of static user information. Only newly added entries need to be
            // read each time (incremental load).
            /*RedisUtils redisUtils = new RedisUtils();
            String poll = redisUtils.poll("new_id_country");
            while(poll!=null){
            String[] splits = poll.split("\t");
            idCountryMap.put(splits[0], splits[1]);
            poll = redisUtils.poll("new_id_country");
            }*/
            return;
        }
        try {
            //String log = new String(input.getBinaryByField("bytes"));
            String log = input.getStringByField("str");
            JSONObject logObj = JSON.parseObject(log);
            String id = logObj.getString("id");
            String province = idCountryMap.getOrDefault(id, "其它"); // sender's province
            Integer gold = logObj.getInteger("gold"); // gold amount
            if (gold == null) {
                // A null here would be emitted downstream, where the bolts unbox it.
                throw new IllegalArgumentException("log record has no \"gold\" value: " + log);
            }
            this.collector.emit(new Values(province, gold));
            this.collector.ack(input);
        } catch (Exception e) {
            // Make the failure visible instead of dropping it silently.
            // NOTE(review): a permanently malformed record may be replayed by the spout after
            // fail(); confirm the desired retry behaviour.
            this.collector.reportError(e);
            this.collector.fail(input);
        }
    }

    /** Declares the two output fields, {@code province} and {@code gold}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("province", "gold"));
    }

    /** Requests a tick tuple every 30 minutes (60*30 seconds). */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        HashMap<String, Object> hashMap = new HashMap<String, Object>();
        hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60*30);
        return hashMap;
    }
}

参考代码LogProcessBolt1.java

 package yehua.stormpProject0521.bolt;

 import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import java.util.HashMap;
import java.util.Map; import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.trident.operation.builtin.Sum;
import org.apache.storm.tuple.Tuple; import yehua.stormpProject0521.utils.MyDateUtils;
import yehua.stormpProject0521.utils.MyDbUtils;

/**
 * Totals the gold spent across the whole site and stores the total every two minutes
 * (data for a line chart).
 *
 * <p>Example rows (id, gold, time):
 * <pre>
 * 1 1526 2017-01-01 00:00:00
 * 2 2560 2017-01-01 00:02:00
 * 3 1560 2017-01-01 00:04:00
 * 4 1960 2017-01-01 00:06:00
 * </pre>
 *
 * @author yehua
 */
public class LogProcessBolt1 extends BaseRichBolt {

    /** Gold accumulated since the last successful write to the database. */
    int sum = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
    }

    /**
     * Adds the {@code gold} field of a data tuple to the running total; a tuple from the
     * system component triggers the periodic write instead.
     */
    @Override
    public void execute(Tuple input) {
        if (input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)) {
            flush();
        } else {
            Integer gold = input.getIntegerByField("gold");
            if (gold != null) { // skip rather than fail on unboxing a missing value
                sum += gold;
            }
        }
    }

    /**
     * Inserts the current total into {@code result1} and resets it. When the insert fails the
     * total is kept, so it is carried into the next period.
     */
    private void flush() {
        String curr_time = MyDateUtils.formatDate2(new Date());
        // try-with-resources closes the statement as well as the connection.
        try (Connection connection = MyDbUtils.getConnection();
                Statement state = connection.createStatement()) {
            state.executeUpdate("insert into result1 (gold,time) values("+sum+",'"+curr_time+"')");
            sum = 0; // the total must be reset after it has been stored
            System.out.println("入库成功!");
        } catch (SQLException e) {
            System.out.println("执行错误!");
            e.printStackTrace();
        }
    }

    /** This bolt emits nothing, so it declares no output fields. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /** Requests a tick tuple every two minutes (60*2 seconds). */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        HashMap<String, Object> hashMap = new HashMap<String, Object>();
        hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60*2);
        return hashMap;
    }
}

参考代码LogProcessBolt2.java

 package yehua.stormpProject0521.bolt;

 import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry; import org.apache.commons.collections.MapUtils;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple; import yehua.stormpProject0521.utils.DistributedLock;
import yehua.stormpProject0521.utils.MyDateUtils;
import yehua.stormpProject0521.utils.MyDbUtils;

/**
 * Totals the gold spent per province and merges the totals into {@code result2} once a minute
 * (data for a bar chart).
 *
 * <p>Example rows (id, province, gold, time):
 * <pre>
 * 1 京 9200 2017-01-01
 * 2 津 5508 2017-01-01
 * 3 京 8562 2017-01-02
 * 4 津 4586 2017-01-02
 * 5 京 8954 2017-01-03
 * 6 津 2563 2017-01-03
 * </pre>
 *
 * @author yehua
 */
public class LogProcessBolt2 extends BaseRichBolt {
    private DistributedLock lock;
    /** Per-province gold accumulated since the last successful write. */
    private Map<String, Integer> province_gold_map = new HashMap<String, Integer>();

    /** Creates the distributed lock used to guard the database merge. */
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.lock = new DistributedLock("hadoop100:2181","test");
    }

    /**
     * Adds the {@code gold} field of a data tuple to its province's running total; a tuple from
     * the system component triggers the periodic merge instead.
     */
    @Override
    public void execute(Tuple input) {
        if (input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)) {
            flush();
        } else {
            String province = input.getStringByField("province"); // province of the sender
            Integer gold = input.getIntegerByField("gold");
            if (gold != null) { // skip rather than fail on unboxing a missing value
                province_gold_map.put(province, MapUtils.getInteger(province_gold_map, province, 0)+gold);
            }
        }
    }

    /**
     * Merges the accumulated per-province totals into {@code result2} while holding the
     * distributed lock: an existing row for the province and current time value is updated,
     * otherwise a new row is inserted. The in-memory totals are cleared only when every
     * province was written.
     */
    private void flush() {
        String curr_time = MyDateUtils.formatDate4(new Date());
        // Tracks whether lock() completed, so the lock is only released when it was taken.
        boolean locked = false;
        try (Connection connection = MyDbUtils.getConnection();
                Statement state = connection.createStatement()) {
            lock.lock();
            locked = true;
            for (Entry<String, Integer> entry : province_gold_map.entrySet()) {
                String province = entry.getKey();
                Integer gold = entry.getValue();
                // Query first: update when a row already exists, insert otherwise.
                try (ResultSet resultSet = state.executeQuery("select id,province,gold from result2 where province = '"+province+"' and time = '"+curr_time+"'")) {
                    if (resultSet.next()) {
                        // Read both columns before reusing the statement for the update.
                        int id = resultSet.getInt(1);
                        int count = resultSet.getInt(3);
                        count += gold;
                        state.executeUpdate("update result2 set gold = "+count+" where id = "+id);
                    } else {
                        state.executeUpdate("insert into result2(province,gold,time) values('"+province+"',"+gold+",'"+curr_time+"')");
                    }
                }
            }
            System.out.println("执行入库成功: "+province_gold_map.size());
            province_gold_map.clear(); // the temporary totals must be cleared after storing
        } catch (SQLException e) {
            // NOTE(review): if this happens part-way through the loop, provinces already written
            // stay in the map and would be added again on the next tick; confirm whether that
            // needs a transaction.
            e.printStackTrace();
        } finally {
            // The connection has already been closed by try-with-resources at this point.
            if (locked) {
                lock.unlock();
            }
        }
    }

    /** This bolt emits nothing, so it declares no output fields. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /** Requests a tick tuple every 60 seconds. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        HashMap<String, Object> hashMap = new HashMap<String, Object>();
        hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
        return hashMap;
    }

    /** Closes the ZooKeeper connection used by the distributed lock. */
    @Override
    public void cleanup() {
        this.lock.closeZk();
    }
}

最新文章

  1. Cesium原理篇:6 Render模块(6: Instance实例化)
  2. 与你相遇好幸运,Waterline的属性
  3. js函数调用模式
  4. jquery.query.js 插件的用法
  5. Logger日志打印普通方法
  6. 分享:在微信公众平台做HTML5游戏经验谈(转载与http://software.intel.com/zh-cn/blogs/2013/04/03/html5)
  7. (二)Android 基本控件
  8. spring maven pom
  9. Android---OpenGL ES之添加动作
  10. 【Linux】 用户管理
  11. iOS体会篇 大学编程到公司的过程
  12. typeScript 学习
  13. 【python-时间戳】时间与时间戳之间的转换
  14. Asp.net Security框架(2)
  15. Python中GIL
  16. AtCoder Beginner Contest 088 (ABC)
  17. Git Diff 格式分析
  18. windows命令行 批量对源代码添加版权头/头信息
  19. hdu 2048 神上帝以及老天爷
  20. 如何定制Gtk版Emacs的Widget外观

热门文章

  1. iOS开发之旅:实现一个APP界面框架
  2. PAT 5-9 输出华氏-摄氏温度转换表 (10分)
  3. 小程序和ThinkPHP5结合实现登录状态(含代码)
  4. magento的布局文件之谜
  5. STM32 PWM输出(映射)
  6. Java-JDK & Android SDK下载安装及配置教程
  7. 哈尔滨理工大学第七届程序设计竞赛初赛(BFS多队列顺序)
  8. index.do
  9. TensorFlow笔记-08-过拟合,正则化,matplotlib 区分红蓝点
  10. smarty中调用php内置函数