一、环境准备

1、jdk 8+

2、zookeeper

3、kafka

说明:在kafka较新版本中已经集成了zookeeper,所以不用单独安装zookeeper,只需要在kafka文件目录中启动zookeeper即可

二、下载地址

https://kafka.apache.org/downloads

三、部署

1、启动zookeeper

-- 启动
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties -- 查看是否启动成功
ps -ef | grep zoo

2、进入解压的kafka目录,修改/config/kafka-server的配置文件

vi config/server.properties
-- 重点配置节点说明

listeners=PLAINTEXT://localhost:9092   -- 将localhost修改为主机ip

log.dirs=/bigdata/kafka/logs-1  -- 默认不修改也可

3、使用kafka-server-start.sh,启动kafka服务

./bin/kafka-server-start.sh config/server.properties

四、使用客户端kafka tools连接kafka

客户端下载地址:https://www.kafkatool.com/download.html

关于客户端如何使用可查看:https://www.cnblogs.com/frankdeng/p/9452982.html

五、kafka实战简单使用

NuGet:Confluent.Kafka

1、新建.Net Core控制台项目,代码如下:

static void Main(string[] args)
{
// 发送消息
var producerConfig = new ProducerConfig
{
BootstrapServers = "192.168.140.131:9092",
MessageTimeoutMs = 50000
};
var builder = new ProducerBuilder<string, string>(producerConfig);
using (var producer = builder.Build())
{
var data = new { key = "1", value = "001" };
var json = JsonConvert.SerializeObject(data);
var dr = producer.ProduceAsync("order", new Message<string, string> { Key = "order", Value = json }).GetAwaiter().GetResult();
Console.WriteLine($"发送事件{dr.Value}到{dr.TopicPartitionOffset}成功");
} // 消费消息
var consumerConfig = new ConsumerConfig {
BootstrapServers = "192.168.140.131:9092",
AutoOffsetReset=AutoOffsetReset.Earliest,
GroupId="1111", // 自定义
EnableAutoCommit=true
};
var consumerBuilder = new ConsumerBuilder<string, string>(consumerConfig); using (var consumer = consumerBuilder.Build())
{
// 1、订阅
consumer.Subscribe("order");
while (true)
{
try
{
// 2、消费(自动确认)
var result = consumer.Consume(); // 3、业务逻辑
string key = result.Key;
string value = result.Value; Console.WriteLine($"创建商品:Key:{key}");
Console.WriteLine($"创建商品:Order:{value}");
consumer.Commit(result); }
catch (Exception e)
{
Console.WriteLine($"异常:Order:{e}");
}
}
}
}

2、使用kafka客户端查看消息投递

最新文章

  1. (转)配置Log4j(很详细)
  2. Android入门(五):程序架构——MVC设计模式在Android中的应用
  3. Codeforces Round #372 (Div. 2) C. Plus and Square Root
  4. 如何在java中拟合正态分布
  5. Socket 多线程FTP软件开发
  6. PR 创建
  7. mysql关键字讲解(join 、order by、group by、having、distinct)
  8. HDU-1406 完数
  9. 案例:计算1!+2!+3!+......+n!
  10. Telegram学习解析系列(二):这我怎么给后台传输数据?
  11. Lambda表达式与函数式接口
  12. 「UVA10766」Organising the Organisation(生成树计数)
  13. centos 升级python2.6 到python3.3(实测可行)
  14. linux开启swap(磁盘缓存)操作
  15. 2010-2011 ACM-ICPC, NEERC, Moscow Subregional Contest Problem J. Joke 水题
  16. 获取公钥证书的DN(Distinguished Name)
  17. EOS token 代币兑换的资料
  18. poj1177 Picture 矩形周长并
  19. Css3中拖拽效果的实例(带有注释~欢迎指教)
  20. 3dsmax2020卸载/安装失败/如何彻底卸载清除干净3dsmax2020注册表和文件的方法

热门文章

  1. while和for循环的补充与数据类型的内置方法(int, float, str)
  2. String、StringBuilder、StringBuffer——JavaSE基础
  3. 命令行传参——JavaSE基础
  4. Java学习-第一阶段-第一节:Java概述
  5. Java开发学习(五)----bean的生命周期
  6. ABP Framework 5.3.0 版本新增功能和变更说明
  7. 探索链路追踪在.NET6工业物联网项目的应用
  8. Moriis神级遍历!
  9. Flex &amp; Bison 开始
  10. rhel安装vmtools