kafka

kafka的操作相对来说简单很多

安装

下载kafka http://kafka.apache.org/downloads

tar -zxvf kafka_2.12-2.1.0.tgz
rm kafka_2.12-2.1.0.tgz
mv kafka_2.12-2.1.0 kafka sudo vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin source /etc/profile 准备 worker1 worker2 worker3 这四台机器 首先确保你的zookeeper集群能够正常运行worker1 worker2 worker3为zk集群
具体配置参照我的博客https://www.cnblogs.com/ye-hcj/p/9889585.html

修改配置文件

  1. server.properties

    sudo vim server.properties
    添加如下属性
    broker.id=0 # 3台机器分别设置成0 1 2
    log.dirs=/usr/local/kafka/logs
    zookeeper.connect=worker1:2181,worker2:2181,worker3:2181
  2. 运行

    运行
    bin/kafka-server-start.sh config/server.properties
    创建topic
    bin/kafka-topics.sh --create --zookeeper worker1:2181 --replication-factor 2 --partitions 2 --topic test
    查看topic
    bin/kafka-topics.sh --list --zookeeper worker1:2181
    订阅topic,利用worker2来订阅
    bin/kafka-console-consumer.sh --bootstrap-server worker1:9092 --topic test --from-beginning
    topic发送消息
    bin/kafka-console-producer.sh --broker-list worker1:9092 --topic test
    键入任何消息,worker2都能接收到
    查看topic详情
    bin/kafka-topics.sh --describe --zookeeper worker1:2181 --topic test

java操作kafka

  1. 依赖

    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.0</version>
    </dependency>
  2. 生产者

    public class Producer
    {
    public static void main( String[] args ){
    Properties props = new Properties();
    // 服务器ip
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker1:9092,worker2:9092,worker3:9092");
    // 属性键值对都序列化成字符串
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 创建一个生产者,向test主题发送数据
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<String, String>("test", "生产者传递的消息"));
    producer.close();
    }
    }
  3. 消费者

    public class Consumer
    {
    public static void main( String[] args ){
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker1:9092,worker2:9092,worker3:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消费者对象
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
    kafkaConsumer.subscribe(Arrays.asList("test"));
    while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.between(
    LocalDateTime.parse("2019-01-09T11:30:30"), LocalDateTime.now()));
    for (ConsumerRecord<String, String> record : records) {
    System.out.printf("offset = %d, value = %s", record.offset(), record.value());
    System.out.println();
    }
    }
    }
    }

最新文章

  1. Oracle迁移:Linux-&gt;Windows
  2. ACM : POJ 2676 SudoKu DFS - 数独
  3. 9.4用WebApi去连接外部认证服务
  4. sqlserver08评估期已过的解决方法
  5. angularJS中controller的通信
  6. QQ空间开放平台开发教程-SDK和API的使用
  7. Windows 8关机的三个最简单方法
  8. django_auth_ldap
  9. 二分图最大匹配(匈牙利算法Dfs模板)
  10. centos账户管理命令(root权限)
  11. PHP引用(&amp;)详解
  12. HDU1247 Hat’s Words 【trie树】
  13. CSS3字体模块
  14. Linux添加用户user到用户组group
  15. .project
  16. Java接口和抽象类的理解
  17. Saiku Table展示数据合并bug修复(二十五)
  18. PHP-1安装配置
  19. .NET 黑魔法 - asp.net core 日志系统
  20. Hash算法原理的简单分析

热门文章

  1. Selenium with Python 007 - Cookie处理
  2. 通过 HTTP 请求加载远程数据(ajax,axios)
  3. js数组,数字函数,字符串函数,表单验证,hashMap,堆栈,日期函数,call函数
  4. js最基础的作用域问题
  5. Lua基础---变量与赋值
  6. Android 命令行模拟按键
  7. php 中的杂项函数
  8. SCARA——OpenGL入门学习五六(三维变换、动画)
  9. NSNotificationCenter 通知中心传值
  10. 【2】基于zookeeper,quartz,rocketMQ实现集群化定时系统