Kafaka .NET连接

Kafka目前主流在用的.NET客户端有两个:一个是kafka-net,另外一个是Confluent.Kafka,这里给出使用示例:

kafka-net示例:

    public class NetKafka
    {
        public static void Push()
        {
            var options = new KafkaOptions(new Uri("http://192.168.253.133:9092"));
            var router = new BrokerRouter(options);
            var producer = new Producer(router);
            List<Message> msgArr = new List<Message>();
            msgArr.Add(new Message("你好"));
            producer.SendMessageAsync("MyTopic", msgArr.ToArray()).Wait(3000);

        }

        public static void Pull()
        {
            var options = new KafkaOptions(new Uri("http://192.168.253.133:9092"));
            var router = new BrokerRouter(options);
            var consumer = new Consumer(new ConsumerOptions("MyTopic", router));

            var msgs = consumer.Consume();
            foreach (var msg in msgs)
            {
                Console.WriteLine(Encoding.UTF8.GetString(msg.Value));
            }
        }
    }

confulen-kafkat示例:

    public class ConfulentKafka
    {
        public static void Push()
        {
            var config = new Dictionary<string, object>
            {
                { "group.id", "test-group" },
                {"bootstrap.servers","192.168.253.133:9092"}
            };
            using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
            {
                var dr = producer.ProduceAsync("MyTopic", null, "hello").Result;
                Console.WriteLine(dr.TopicPartitionOffset);
            }
        }

        public static void Pull()
        {
            var config = new Dictionary<string, object>
            {
                //同一个Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息
                { "group.id", "test-group" },
                 //kafka的集群消费地址
                { "bootstrap.servers", "192.168.253.133:9092" },
                 //consumer向consumer提交offset的频率,单位ms
                {"auto.commit.interval.ms",5000},
                 //earliest和latest:当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,earliest是从头开始消费、latest从末尾开始消费;
                 //none:当各分区下存在已提交的offset时,从offset后开始消费,但只要有一个分区不存在已提交的offset时,则抛出异常
                {"auto.offset.reset","earliest"},

            };

            using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
            {
                consumer.OnMessage += (_, msg)
                  => Console.WriteLine(msg.Value);

                consumer.OnError += (_, error)
                  => Console.WriteLine(error);

                consumer.OnConsumeError += (_, msg)
                  => Console.WriteLine(msg);

                consumer.Subscribe("MyTopic");

                while (true)
                {
                    consumer.Poll(TimeSpan.FromMilliseconds(100));
                }
            }
        }
    }

最新文章

  1. Apache shiro之权限校验流程
  2. 浅谈我眼中的ASP.NET MVC
  3. 对单片机的modbus RTU的详细解释(转载)
  4. 【转】Windows环境下.NET 操作Oracle问题
  5. 异常处理与调试4 - 零基础入门学习Delphi53
  6. 源代码版本控制工具TortoiseSVN,AnkhSVN最新版本下载地址
  7. Java Ant Could not find the main class: org.eclipse.ant.internal.launching.remote.InternalAntRunner. Program
  8. python学习随笔(三)
  9. 3.Java 加解密技术系列之 SHA
  10. jquery快速入门(二)
  11. 6. svg学习笔记-路径
  12. JS_高程3.基本概念(5)语句
  13. python3学习笔记五(列表2)
  14. nyoj-0469-擅长排列的小明 II(找规律)
  15. centos7 部署 open-falcon 0.2.0
  16. NodeJS类型定义方式
  17. 2018.08.30 Tyvj1952 Easy(期望dp)
  18. Spark中文文本分析建模
  19. 1106-冒泡算法C程序(语法树)
  20. DOS中的CD命令详解

热门文章

  1. slf4j日志只输出到控制台,没输出到日志文件
  2. 用批处理实现垃圾文件清除/自动关机/清除copy病毒
  3. Office 2013 提示找不到 Office.zh-cn\XXXXX
  4. eas中删除原来的监听事件添加新的监听事件
  5. POJ2431-Expedition【优先队列+贪心】
  6. [bzoj1050 HAOI2006] 旅行comf (kruskal)
  7. 最小化安装CentOS-7-x86_64-Minimal-1511图文教程
  8. 24.基于groovy脚本进行partial update
  9. springcloud(一):初识springcloud
  10. 0622centos下coreseek安装及使用方法