Storm最常用的消息源就是Kafka,在对接的时候大多需要使用KafkaSpout;

在网上大概有两种KafkaSpout,一种是只有几十行,一种却有一大啪啦类文件。


在kafka中,同一个partition中的消息只能被同一个组的一个consumer消费,不能并发,所以kafka的并发说的是多partition的并发;

kafka的consumer API分为high level consumer和low level consumer,官方建议使用前者,以为不用关心partition、offset那些,但是后者也有其存在的意义:1.多次读取的时候;2.选择性读取部分消息;3.控制消费过程。


写法比较简单的KafkaSpout:

 public class KafkaSpouttest implements IRichSpout {

     private static final long serialVersionUID = 1L;
private SpoutOutputCollector collector;
private ConsumerConnector consumer;
private String topic; public KafkaSpouttest() {} public KafkaSpouttest(String topic) {
this.topic = topic;
} public void ack(Object arg0) { } private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
// 设置zookeeper的链接地址
props.put("zookeeper.connect", "localhost:2181");
// 设置group id
props.put("group.id", "1");
// kafka的group 消费记录是保存在zookeeper上的, 但这个信息在zookeeper上不是实时更新的, 需要有个间隔时间更新
props.put("auto.commit.interval.ms", "1000");
props.put("zookeeper.session.timeout.ms", "10000");
return new ConsumerConfig(props);
} public void activate() {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
Map < String,
Integer > topickMap = new HashMap < String,
Integer > ();
topickMap.put(topic, 1); System.out.println("*********Results********topic:" + topic); Map < String,
List < KafkaStream < byte[],
byte[] >>> streamMap = consumer.createMessageStreams(topickMap);
KafkaStream < byte[],
byte[] > stream = streamMap.get(topic).get(0);
ConsumerIterator < byte[],
byte[] > it = stream.iterator();
while (it.hasNext()) {
String value = new String(it.next().message());
SimpleDateFormat formatter = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss SSS");
Date curDate = new Date(System.currentTimeMillis()); //获取当前时间
String str = formatter.format(curDate); System.out.println("storm接收到来自kafka的消息------->" + value); collector.emit(new Values(value, 1, str), value);
}
} public void close() {
// TODO Auto-generated method stub
} public void deactivate() {
// TODO Auto-generated method stub
} public void fail(Object arg0) {
// TODO Auto-generated method stub
} public void nextTuple() {
// TODO Auto-generated method stub
} public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
this.collector = collector;
} public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "id", "time"));
} public Map < String,
Object > getComponentConfiguration() {
System.out.println("getComponentConfiguration被调用");
topic = "admln";
return null;
} }

方法相关的不解释,和本主题相关的一句话是:

byte[] >>> streamMap = consumer.createMessageStreams(topickMap);

想说的是它用的是High Level API


复杂的代码就多了,在github上有好几个,最官方的还是apache storm自带的:

里面和本主题相关的一句话是DynamicPartitionConnections.java中的60行:

_connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));

它用的是low level API


apache KafkaSpout 在 topology 中的配置

String zkConnString = "node1:2181,node2:2181,node3:2181";
String topicName = "testtopic";
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.forceFromStart = false;
spoutConfig.zkPort = ;
spoutConfig.zkServers = Arrays.asList(new String[]{"node1","node2","node3"}); spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); TopologyBuilder builder = new TopologyBuilder();
// 构造NC数据流向图
builder.setSpout("mrspout", kafkaSpout, );
builder.setBolt("mrverifybolt", new MRVerifyBolt(), )
.shuffleGrouping("mrspout");
builder.setBolt("mr2storagebolt", new MR2StorageBolt(), )
.shuffleGrouping("mrverifybolt");
// 以类名作为STORM任务名
String name = MRTopology.class.getSimpleName();
// 传主机名则为集群运行模式,不传则为本地运行模式
if (args != null && args.length > ) {
Config conf = new Config();
// 通过指定nimbus主机
conf.put(Config.NIMBUS_HOST, args[]);
conf.setNumWorkers();
conf.setNumAckers();
conf.setMaxSpoutPending();
StormSubmitter.submitTopologyWithProgressBar(name, conf,
builder.createTopology());
} else {
Map conf = new HashMap();
conf.put(Config.TOPOLOGY_WORKERS, );
conf.put(Config.TOPOLOGY_DEBUG, true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, builder.createTopology());
}
}

关于 spoutConfig.servers 和 spoutConfig.port 在实际应用中其实不设置也可以,因为在集群中如果不设置 storm 默认会把 storm 配置中的 zookeeper 地址和端口,设置的用处是在 eclipse 中测试运行的时候因为是模拟 storm cluster, 所以主动设置。


两者各有优劣,相同点性能,简单测试过,low level的要好点,但是相差不大(都在合适的配置下,小集群);

不同点是high level 的代码简单,而low level的代码很多,配置也多,用着麻烦(也不是很麻烦);

low level的优点是支持重读,就是配置中的 spoutConfig.forceFromStart = false; ,支持重读的另一个好处是和storm的acker结合,可以重发,防止丢数据,这一点比low level的要安全一点,另一个好处是配置多,使用就很难灵活,比如设置KafkaSpout的fetchSizeBytes,和kafka的bufferSizeBytes对应,是优化的一个手段。

至于选择哪种,支持后者,反正storm中已经自带了,不需要自己写,配置就好,而且0.9.4中优化了很多KafkaSpout的问题。


最新文章

  1. 使用CocoaPods过程中 Unable to find a specification for
  2. 高性能PHP框架thinkphp5.0.0 Beta发布-为API开发而设计
  3. Post model至Web Api
  4. SICP— 第一章 构造过程抽象
  5. 图片上传代码(C#)
  6. 20135220谈愈敏Blog6_进程的描述和创建
  7. java初探native
  8. Android 注解的一些应用以及原理
  9. Js 赋值传值和引用传址
  10. 类型“GridView”的控件必须放在具有 runat=server 的窗体标记内?
  11. 浅谈CDN、SEO、XSS、CSRF
  12. 再也不用c++的string了
  13. 剑指offer 05:用两个栈实现队列
  14. swagger配置和简单使用
  15. 【基础配置】Dubbo的配置及使用
  16. Python内存管理机制及优化简析(转载)
  17. SDOI 2017 Round2 滚粗了
  18. header field token is not allowed by Access-Control-Allow-Headers in preflight response问题解决
  19. Python爬虫教程-24-数据提取-BeautifulSoup4(二)
  20. 一道算法题-八皇后问题(C++实现)

热门文章

  1. wp 取消button按下效果
  2. C# socket请求的名称有效 但是找不到请求的类型的数据
  3. 七月小说网 Python + GraphQL (三)
  4. .NET平台的资源文件管理
  5. 十七、创建一个 WEB 服务器(一)
  6. constexpr函数------c++ primer
  7. CPU 和 GPU 的区别
  8. 【洛谷2416】泡芙(Tarjan+LCA)
  9. 条目十五《注意strng实现的多样性》
  10. Java 实现手机发送短信验证码