Environment:
Kafka 2.0.0 (the kafka_2.xx-2.0.0.tgz binary release)
CentOS 6.9, NAT with a dynamic IP
准备工作:
().将防火墙关闭
service iptables stop 临时关闭
chkconfig iptables off 永久关闭 ().修改C:\Windows\System32\drivers\etc 下的hosts文件
增加映射

Start the ZooKeeper service (using the ZooKeeper bundled with Kafka)

From the Kafka installation directory (e.g. /root/kafka_2.xx-2.0.0), run:
bin/zookeeper-server-start.sh config/zookeeper.properties
When the last log line shows "INFO binding to port 0.0.0.0/0.0.0.0:2181", ZooKeeper has started successfully.

Start the Kafka service

From the same Kafka directory, run:
bin/kafka-server-start.sh config/server.properties
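If a client on another machine (for example, IDEA on the Windows host) cannot reach the broker, the listener settings in config/server.properties are the usual cause. A minimal sketch of the relevant lines, assuming the VM's IP is 192.168.15.140 (these values are assumptions for this setup, not the file's defaults):

# config/server.properties (relevant lines only)
broker.id=0
# accept connections on all interfaces
listeners=PLAINTEXT://0.0.0.0:9092
# address advertised to clients; must be reachable from the client machine
advertised.listeners=PLAINTEXT://192.168.15.140:9092
zookeeper.connect=localhost:2181

With the hosts mapping from the preparation step, advertising PLAINTEXT://test:9092 works as well.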

Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic

My hosts mapping is 192.168.15.140 test, so replacing localhost with test also works.
(If you get a "replication factor ... larger than available brokers" style error, make sure the firewall is really off.)
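Topics can also be created from Java instead of the shell script. A minimal sketch using the AdminClient API from kafka-clients 2.0.0 (the class name CreateTopicDemo is my own; the address matches this setup):

package com.xuliugen.kafka.demo;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.15.140:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 -- matching the single-broker setup above
            NewTopic topic = new NewTopic("testTopic", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
            System.out.println("Created topic: " + topic.name());
        }
    }
}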

Produce messages with the Kafka console producer

bin/kafka-console-producer.sh --broker-list test:9092 --topic testTopic

Consume messages with the console consumer

bin/kafka-console-consumer.sh --bootstrap-server test:9092 --topic testTopic --from-beginning

Code test:

IntelliJ IDEA is used here.

Producer

package com.xuliugen.kafka.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerDemo {

    // Topic to send to
    private static final String topic = "testTopic";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.15.140:9092");
        props.put("acks", "0");
        props.put("group.id", "1111");   // group.id is a consumer setting; the producer ignores it
        props.put("retries", "0");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Producer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        int i = 1;
        // Send business messages: in practice these might come from a file,
        // an in-memory database, or a socket
        while (true) {
            Thread.sleep(1000);
            producer.send(new ProducerRecord<String, String>(topic, "key:" + i, "value:" + i));
            System.out.println("key:" + i + " " + "value:" + i);
            i++;
        }
    }
}
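The send() call above is fire-and-forget, and with acks=0 the producer never learns whether the broker stored the record. A minimal variant using send() with a callback (the class name ProducerCallbackDemo is my own; acks is raised to "1" so the broker actually replies):

package com.xuliugen.kafka.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerCallbackDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.15.140:9092");
        props.put("acks", "1");   // leader acknowledgment, so the callback gets real metadata
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("testTopic", "key:1", "value:1"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();   // delivery failed
                        } else {
                            System.out.println("delivered to partition " + metadata.partition()
                                    + ", offset " + metadata.offset());
                        }
                    });
        }   // close() flushes pending records before returning
    }
}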

Consumer

package com.xuliugen.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);
    private static final String topic = "testTopic";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.15.140:9092");
        props.put("group.id", "1111");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
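In kafka-clients 2.0.0 the poll(long) overload used above is deprecated in favor of poll(Duration), which bounds the total blocking time. A minimal sketch of the same loop on the newer API (the class name ConsumerDurationDemo is my own):

package com.xuliugen.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDurationDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.15.140:9092");
        props.put("group.id", "1111");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("testTopic"));
        while (true) {
            // poll(Duration) replaces the deprecated poll(long)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}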

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xuliugen.kafka</groupId>
    <artifactId>kafka.demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.12</version>
        </dependency>
    </dependencies>
</project>
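One thing the pom does not include is a log4j configuration: slf4j-log4j12 looks for log4j.properties on the classpath, and without one the demos print "log4j:WARN No appenders could be found" and show no client logs. A minimal sketch to drop into src/main/resources/log4j.properties (the levels and pattern are my own choices):

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c{1} - %m%n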
Copied from:
https://blog.csdn.net/xlgen157387/article/details/77312569

Code download: https://pan.baidu.com/s/1hjJ7IRMTQEFdV-8SCf7VlA (extraction code: 286w)
