Kafka.net使用编程入门(四)
2024-10-16 00:45:03
新建一个cmd窗口,zkServer命令启动zookeeper
打开另一个cmd窗口,输入:
cd D:\Worksoftware\Apachekafka2.11\bin\windows
kafka-server-start D:\Worksoftware\Apachekafka2.11\config\server.properties
删除主题:E:\WorkSoftWare\kafka2.11\bin\windows>kafka-run-class.bat kafka.admin.TopicCommand --delete --topic TestSiso --zookeeper localhost:2181
kafka 删除topic 提示marked for deletion
并没有真正删除,如果要真正删除
在每一台机器中的kafka_2.10/config/server.properties 文件加入 delete.topic.enable=true
最后所有机器重新启动kafka
启动kafka成功后,就可以运行项目了
引用了kafka-net.dll
Program.cs
internal class Program
{
private static void Main(string[] args)
{
string header = "kafka测试";
Console.Title = header;
Console.WriteLine(header);
ConsoleColor color = Console.ForegroundColor;
var pub = new KafkaHelper("Test", true);
var sub = new KafkaHelper("Test", false);
Task.Run(() =>
{
while (true)
{
string msg = string.Format("{0}这是一条测试消息", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"));
pub.Pub(new List<string> {msg});
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("发送消息:" + msg);
//Console.ForegroundColor = color;
Thread.Sleep(2000);
}
});
Task.Run(() => sub.Sub(msg =>
{
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("收到消息:{0}", msg);
//Console.ForegroundColor = color;
}));
Console.ReadLine();
}
}
KafkaHelper.cs代码:
/// <summary>
/// kafka辅助类
/// </summary>
public sealed class KafkaHelper
{
private readonly KafkaConfig _config;
private readonly ConsumerHelper _consumerHelper;
private readonly bool _isProducer = true;
private readonly ProduceHelper _produceHelper;
private BrokerHelper _brokerHelper;
/// <summary>
/// kafka辅助类构造方法
/// </summary>
/// <param name="sectionName">config中配置节点名称</param>
/// <param name="isProducer"></param>
public KafkaHelper(string sectionName, bool isProducer = true)
{
_isProducer = isProducer;
_config = KafkaConfig.GetConfig(sectionName);
_brokerHelper = new BrokerHelper(_config.Broker);
if (isProducer)
_produceHelper = new ProduceHelper(_brokerHelper);
else
_consumerHelper = new ConsumerHelper(_brokerHelper);
}
/// <summary>
/// 是否是生产者模式
/// </summary>
public bool IsProducer
{
get { return _isProducer; }
}
/// <summary>
/// 发送消息到队列
/// </summary>
/// <param name="topic"></param>
/// <param name="datas"></param>
/// <param name="acks"></param>
/// <param name="timeout"></param>
public void Pub(List<string> datas, short acks = 1, TimeSpan? timeout = default(TimeSpan?))
{
_produceHelper.Pub(_config.Topic, datas, acks, timeout, MessageCodec.CodecNone);
}
/// <summary>
/// 订阅消息
/// </summary>
/// <param name="onMsg"></param>
public void Sub(Action<string> onMsg)
{
_consumerHelper.Sub(_config.Topic, onMsg);
}
/// <summary>
/// 取消订阅
/// </summary>
public void UnSub()
{
_consumerHelper.UnSub();
}
}
KafkaConfig.cs代码:
/// <summary>
/// kafka配置类
/// </summary>
public class KafkaConfig : ConfigurationSection
{
/// <summary>
/// 当前配置名称
/// 此属性为必须
/// </summary>
public string SectionName { get; set; }
/// <summary>
/// 代理
/// </summary>
[ConfigurationProperty("broker", IsRequired = true)]
public string Broker
{
get { return (string) base["broker"]; }
set { base["broker"] = value; }
}
/// <summary>
/// 主题
/// </summary>
[ConfigurationProperty("topic", IsRequired = true)]
public string Topic
{
get { return (string) base["topic"]; }
set { base["topic"] = value; }
}
#region 从配置文件中创建kafka配置类
/// <summary>
/// 获取默认kafka配置类
/// </summary>
/// <returns></returns>
public static KafkaConfig GetConfig()
{
return (KafkaConfig) ConfigurationManager.GetSection("kafkaConfig");
}
/// <summary>
/// 获取指定的kafka配置类
/// </summary>
/// <param name="sectionName"></param>
/// <returns></returns>
public static KafkaConfig GetConfig(string sectionName)
{
var section = (KafkaConfig) ConfigurationManager.GetSection(sectionName);
// 跟默认配置相同的,可以省略
if (section == null)
section = GetConfig();
if (section == null)
throw new ConfigurationErrorsException("kafkacofng节点 " + sectionName + " 未配置.");
section.SectionName = sectionName;
return section;
}
/// <summary>
/// 从指定位置读取配置
/// </summary>
/// <param name="fileName"></param>
/// <param name="sectionName"></param>
/// <returns></returns>
public static KafkaConfig GetConfig(string fileName, string sectionName)
{
return GetConfig(ConfigurationManager.OpenMappedMachineConfiguration(new ConfigurationFileMap(fileName)),
sectionName);
}
/// <summary>
/// 从指定Configuration中读取配置
/// </summary>
/// <param name="config"></param>
/// <param name="sectionName"></param>
/// <returns></returns>
public static KafkaConfig GetConfig(Configuration config, string sectionName)
{
if (config == null)
throw new ConfigurationErrorsException("传入的配置不能为空");
var section = (KafkaConfig) config.GetSection(sectionName);
if (section == null)
throw new ConfigurationErrorsException("kafkacofng节点 " + sectionName + " 未配置.");
section.SectionName = sectionName;
return section;
}
#endregion
}
BrokerHelper.cs代码:
/// <summary>
/// 代理人辅助类
/// </summary>
internal class BrokerHelper
{
private readonly string _broker;
public BrokerHelper(string broker)
{
_broker = broker;
}
/// <summary>
/// 获取代理的路由对象
/// </summary>
/// <returns></returns>
public BrokerRouter GetBroker()
{
var options = new KafkaOptions(new Uri(string.Format("http://{0}", _broker)));
return new BrokerRouter(options);
}
}
ConsumerHelper.cs代码:
/// <summary>
/// 消费者辅助类
/// </summary>
internal class ConsumerHelper
{
private readonly BrokerHelper _brokerHelper;
private Consumer _consumer;
private bool _unSub;
public ConsumerHelper(BrokerHelper brokerHelper)
{
_brokerHelper = brokerHelper;
}
public void Sub(string topic, Action<string> onMsg)
{
_unSub = false;
var opiton = new ConsumerOptions(topic, _brokerHelper.GetBroker());
_consumer = new Consumer(opiton);
Task.Run(() =>
{
while (!_unSub)
{
IEnumerable<Message> msgs = _consumer.Consume();
Parallel.ForEach(msgs, msg => onMsg(Encoding.UTF8.GetString(msg.Value)));
}
});
}
public void UnSub()
{
_unSub = true;
}
}
ProduceHelper.cs代码:
/// <summary>
/// 生产者辅助类
/// </summary>
internal class ProduceHelper : IDisposable
{
private readonly Producer _producer;
private BrokerHelper _brokerHelper;
public ProduceHelper(BrokerHelper brokerHelper)
{
_brokerHelper = brokerHelper;
_producer = new Producer(_brokerHelper.GetBroker());
}
public void Dispose()
{
if (_producer != null)
_producer.Dispose();
}
/// <summary>
/// 发送消息到队列
/// </summary>
/// <param name="topic"></param>
/// <param name="datas"></param>
/// <param name="acks"></param>
/// <param name="timeout"></param>
/// <param name="codec"></param>
public void Pub(string topic, List<string> datas, short acks = 1, TimeSpan? timeout = default(TimeSpan?), MessageCodec codec = MessageCodec.CodecNone)
{
//var msgs = new List<Message>();
//foreach (string item in datas)
//{
// msgs.Add(new Message(item));
//}
var msgs = datas.Select(item => new Message(item)).ToList();
_producer.SendMessageAsync(topic, msgs, acks, timeout, codec);
}
}
App.config
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<configSections>
<section name="Test" type="xxxxx.sssss.KafkaConfig, xxxxx.sssss" />
</configSections>
<Test broker="127.0.0.1:9092" topic="TestSiso" />
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
</startup>
</configuration>
运行结果如图:
最新文章
- 【转】使用Spring MVC统一异常处理实战
- 探究JavaScript中的五种事件处理程序
- 如何实现ZBrush中部分模型的选择和隐藏
- netty 粘包问题处理
- PHP学习之中数组--创建数组【1】
- The Suspects(POJ 1611 并查集)
- Maven 工程下 Spring MVC 站点配置 (二) Mybatis数据操作
- 苹果新手MacBook 目录认识
- wepy 初探
- 百度map 3.0初探
- asp.net mvc 三层加EF两表联查
- 01-oracle学习环境配置
- P2V后,VMWare ESX 上RedHat AS5网络不通问题的解决办法
- mysql update where
- 2018-2019-2 20175310 实验二《Java面向对象程序设计》实验报告
- sklearn:Python语言开发的通用机器学习库
- ADO.Net创建数据模型和数据访问类及泛型集合
- Python的两种运行方式
- smali-2.2.4.jar &; baksmali-2.2.4.jar
- Android-自定义TabHost