一、pom.xml

 <dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.1</version>
</dependency> <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency> <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3</version>
</dependency>
</dependencies> <build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-${project.version}</finalName>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/LICENSE*</exclude>
<exclude>META-INF/NOTICE*</exclude>
<exclude>license/*</exclude>
<exclude>LICENSE*</exclude>
<exclude>NOTICE*</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.bigData.DataProducer</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

二、相关配置文件

producer.properties

#acks=1
bootstrap.servers=alary001:9092,alary002:9092,alary003:9092
retries=2
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

log4j.properties

# Output pattern : date [thread] priority category - message   FATAL 0  ERROR 3  WARN 4  INFO 6  DEBUG 7
log4j.rootLogger=INFO, Console #Console
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d %-5p [%c{5}] - %m%n

base.properties

topic=Data_Server

三、Producer客户端

在集群上启动zookeeper

zkServer.sh start

查看zookeeper的状态

zkServer.sh status

启动kafka集群:

kafka-server-start.sh config/server.properties &

创建新的topic

kafka-topics.sh --create --zookeeper alary001:2181/home/hadoop/app/kafka_2.12-2.2.0,alary002:2181/home/hadoop/app/kafka_2.12-2.2.0,alary003:2181/home/hadoop/app/kafka_2.12-2.2.0 --replication-factor 3 --partitions 3 --topic Data_Server

查看topic副本信息

kafka-topics.sh --describe alary001:2181/home/hadoop/app/kafka_2.12-2.2.0,alary002:2181/home/hadoop/app/kafka_2.12-2.2.0,alary003:2181/home/hadoop/app/kafka_2.12-2.2.0 --replication-factor 3 --partitions 3 --topic Data_Server

查看已经创建的topic信息

kafka-topics.sh --list --zookeeper alary001:2181/home/hadoop/app/kafka_2.12-2.2.0,alary002:2181/home/hadoop/app/kafka_2.12-2.2.0,alary003:2181/home/hadoop/app/kafka_2.12-2.2.0

测试生产者发送消息

bin/kafka-console-producer.sh --broker-list alary001:9092,alary002:9092,alary003:9092 --topic Data_Server

测试消费者消费消息

kafka-console-consumer.sh --bootstrap-server alary001:9092,alary002:9092,alary003:9092 --from-beginning --topic Data_Server

删除topic

bin/kafka-topics.sh --zookeeper alary001:2181/home/hadoop/app/kafka_2.12-2.2.0,alary002:2181/home/hadoop/app/kafka_2.12-2.2.0,alary003:2181/home/hadoop/app/kafka_2.12-2.2.0 --delete --topic Data_Server

需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

停止Kafka服务

kafka-server-stop.sh stop

停止zookeeper集群

zkServer.sh stop

package com.zlkj.producer;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties; public class DataTransmission { private static final Logger logger = LoggerFactory.getLogger(com.zlkj.producer.DataTransmission.class); public static void main(String[] args) {
Properties baseConfiguration = new Properties();
Properties producerConfiguration = new Properties();
try {
baseConfiguration.load(com.zlkj.producer.DataTransmission.class.getResourceAsStream("/base.properties"));
if (args != null && args.length > 0 && StringUtils.isNoneBlank(args[0])) {
producerConfiguration.load(new FileInputStream(args[0]));
} else {
producerConfiguration.load(com.zlkj.producer.DataTransmission.class.getResourceAsStream("/producer.properties"));
}
} catch (IOException e) {
logger.error("=================加载配置异常=================");
} //发送消息
Producer producer = new KafkaProducer<String, String>(producerConfiguration);
for (int i = 1; i <= 10; i++) {
String value = "value_" + i;
logger.info("发送的消息: {}", value);
ProducerRecord<String, String> msg = new ProducerRecord<String, String>(baseConfiguration.getProperty("topic"), value);
producer.send(msg);
}
producer.close();
}
}

最新文章

  1. POCO库——Foundation组件之核心Core
  2. Sublime更换默认字体的方法
  3. POJ 1269 Intersecting Lines --计算几何
  4. iOS如何上传代码到Github
  5. cocos2d-x知识巩固-基础篇(2)
  6. Ext入门学习系列(五)表格控件(2)
  7. InkPicture 控件使用_01
  8. 分页过滤SQL求总条数SQL正则
  9. 自制单片机之十五……可串行驱动LCD12864的应用
  10. spring bean初始化和销毁
  11. docker入门实战笔记
  12. 用ECMAScript4 ( ActionScript3) 实现Unity的热更新 -- 使用第三方组件
  13. Throughtput收集器
  14. DB 查询分析器 方便地创建DB2自定义函数
  15. angular.isNumber()
  16. 如何量化考核技术人的KPI?
  17. [UE4]判断2个向量是否相等
  18. JavaEE正常开发怎么做
  19. scp出现Permission denied,please try again的解决办法
  20. M1事后分析报告

热门文章

  1. Pytest测试框架(三):pytest fixture 用法
  2. 死磕以太坊源码分析之MPT树-上
  3. Hive 中的四种排序详解,再也不会混淆用法了
  4. 辅助调用函数【call,apply,bind】
  5. 索引失效 -- 使用Between范围查询时
  6. 任意文件下载漏洞的接口URL构造分析与讨论
  7. python模块详解 | unittest(单元测试框架)(持续更新中)
  8. Linux学习笔记 | 常见错误之无法获得锁
  9. Pulsar vs Kafka,CTO 如何抉择?
  10. Spring Bean详解