<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>com.ht</groupId>
<artifactId>kafkatest</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build> <dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
</dependencies>
</project>

java 代码

 import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Collections;
import java.util.Properties; import static org.apache.kafka.clients.consumer.ConsumerConfig.*; /**
* @author sunzq
* @since 2017/8/29
*/
public class Application {
public static void main(String[] args) { Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, "node1:6667,node2:6667,node3:6667,node4:6667");
props.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(GROUP_ID_CONFIG, "test08291103");
// props.put(ConsumerConfig.CLIENT_ID_CONFIG, "test0829");
props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// topic name: test9
consumer.subscribe(Collections.singleton("test9"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}

启动参数

-Djava.security.krb5.conf=c:\\app\\conf\\krb5.conf -Djava.security.auth.login.config=c:\\app\\conf\\kafka_jaas.conf

windows 下记得用 \\

最新文章

  1. 【OpenWRT之旅】LuCI探究
  2. C语言数组删除增加一个元素
  3. Android之把手机的3g流量共享出来让其他人连接这个wifi
  4. Java Bean validation specification...
  5. Windwos平台上ffmpeg解码音频并且保存到wav文件中
  6. Ural-1018 Binary Apple Tree(树形dp+分组背包)
  7. luoguP1886 滑动窗口 [单调队列]
  8. Amazon email system中使用的字体
  9. DBoW2应用
  10. flask wtforms组件详解
  11. PYthon3:函数实现“自动售卖机”功能
  12. Hdoj 1785.You Are All Excellent 题解
  13. RabbitMQ入门-发布订阅模式
  14. 安装阿里云github提供的修改版minikube
  15. Hat’s Words HDU1247
  16. currentStyle&amp;getComputedStyle获取属性
  17. 编译JDK源代码【转】
  18. PHPDragon设计结构
  19. top命令之你不一定懂的cpu显示信息
  20. lnmp环境搭建错误集合

热门文章

  1. ActiveMQ使用详解---相关概念
  2. [OS] 进程相关知识点
  3. 微服务日志监控与查询logstash + kafka + elasticsearch
  4. DataBase -- FUNCTION
  5. 计蒜客16492 building(二分线段树/分块)
  6. php: Can&#39;t use function return value in write context
  7. [Leetcode] The minimum depth of binary tree二叉树的最小深度
  8. 部分经典IT书籍
  9. 做一个所见即所得的CSS效果
  10. CSS样式权重的级联cascade的概念