Let's install Kafka in cluster mode across three machines:

192.168.131.128
192.168.131.130
192.168.131.131

A Kafka cluster depends on ZooKeeper, so ZooKeeper needs to be installed first.

Download the Kafka package:

kafka_2.11-1.1.0.tgz

Extract it under /usr/local/.
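A minimal sketch of this step; the Apache archive URL and the kafka convenience symlink are my additions, not from the original:

wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -zxf kafka_2.11-1.1.0.tgz -C /usr/local/
# optional symlink so the /usr/local/kafka paths used below stay version-independent
ln -s /usr/local/kafka_2.11-1.1.0 /usr/local/kafka
cd /usr/local/kafka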

Go into Kafka's config directory:

You'll see a ZooKeeper configuration file there; this is the ZooKeeper bundled with Kafka. If you haven't installed ZooKeeper separately, you can use the one that ships with Kafka; it is configured the same way as a standalone installation.

We'll assume ZooKeeper is already installed separately, so edit the server.properties file. The main configuration items look roughly like this (the # annotations are for this article only; don't leave trailing comments in the real file, since Java properties files treat everything after the = as part of the value):

broker.id=0        # must be unique for each broker instance
listeners=PLAINTEXT://192.168.131.128:9092 # change to this host's IP
advertised.host.name=192.168.131.128     # change to this host's IP
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/log # create this directory manually; Kafka will not create it from the config file
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.131.128:2181,192.168.131.130:2181,192.168.131.131:2181 # change to the ip:port of your ZooKeeper hosts
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
auto.create.topics.enable=false

The items you need to change are marked above. (Note that advertised.host.name has been deprecated in favor of advertised.listeners in newer Kafka releases, but it still works in 1.1.0.)
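Also, since the log directory is not created automatically, create it by hand on every machine before starting anything:

mkdir -p /usr/local/kafka/log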

Next, sync the Kafka directory to the other two machines:

scp -r kafka hadoop@hadoopslaver1:/usr/local
scp -r kafka hadoop@hadoopslaver2:/usr/local
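After copying, remember that broker.id, listeners, and advertised.host.name must differ on every host. A sketch of the edits for the second machine; the sed one-liners are only an illustration, and editing the file by hand works just as well:

# on 192.168.131.130
sed -i 's|^broker.id=.*|broker.id=1|' /usr/local/kafka/config/server.properties
sed -i 's|^listeners=.*|listeners=PLAINTEXT://192.168.131.130:9092|' /usr/local/kafka/config/server.properties
sed -i 's|^advertised.host.name=.*|advertised.host.name=192.168.131.130|' /usr/local/kafka/config/server.properties
# on 192.168.131.131, use broker.id=2 and 192.168.131.131 instead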

Now we're ready to start. First make sure ZooKeeper is running; if you haven't installed it separately, you can start the one bundled with Kafka:

bin/zookeeper-server-start.sh config/zookeeper.properties &

Then start Kafka:

bin/kafka-server-start.sh -daemon config/server.properties

Run the startup command on all three machines; if no errors are reported, the startup succeeded.
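Beyond the absence of errors, you can confirm that all three brokers registered with ZooKeeper. A quick sketch (jps ships with the JDK; zookeeper-shell.sh ships with Kafka):

jps
# each machine should list a process named Kafka

bin/zookeeper-shell.sh 192.168.131.128:2181 ls /brokers/ids
# a healthy three-node cluster reports all broker ids, e.g. [0, 1, 2]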

Next, let's run some tests.
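One prerequisite: we set auto.create.topics.enable=false above, so the page_visits topic used by the code below has to be created by hand first. For example (3 partitions with replication factor 3 is my choice for a three-broker cluster, not a value from the original setup):

bin/kafka-topics.sh --create --zookeeper 192.168.131.128:2181 --replication-factor 3 --partitions 3 --topic page_visits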

Consumer:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Renamed from Consumer to ConsumerDemo: a class named Consumer would clash
// with the org.apache.kafka.clients.consumer.Consumer interface imported above.
public class ConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.131.128:9092,192.168.131.130:9092,192.168.131.131:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");      // auto-commit offsets
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // once per second
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("page_visits"));
        while (true) {
            // block for up to 100 ms waiting for new records
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}

Producer:

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Renamed from Producer to ProducerDemo: a class named Producer would clash
// with the org.apache.kafka.clients.producer.Producer interface imported above.
public class ProducerDemo {
    public static void main(String[] args) {
        long events = 1;
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.131.128:9092,192.168.131.130:9092,192.168.131.131:9092");
        props.put("acks", "all");   // wait for all in-sync replicas to acknowledge
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Optional: plug in the custom partitioner defined below (renamed IpPartitioner; see note there)
        props.put("partitioner.class", "com.rickiyang.service.IpPartitioner");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            ProducerRecord<String, String> data = new ProducerRecord<>("page_visits", ip, msg);
            producer.send(data, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("The offset of the record we just sent is: " + metadata.offset());
                    }
                }
            });
        }
        producer.close();
    }
}

Custom partitioning strategy:

// Package matches the partitioner.class value configured in the producer above.
package com.rickiyang.service;

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

// Renamed from Partitioner to IpPartitioner: the original class name collided
// with the imported Partitioner interface it implements.
public class IpPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int partition = 0;
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // Route by the last octet of the IP key, e.g. "192.168.2.37" -> 37 % numPartitions
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }

    @Override
    public void close() {
        // nothing to release
    }
}
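With the keys the producer above generates (192.168.2.X), this partitioner maps each record to partition X % numPartitions, so all messages from the same source IP always land on the same partition and keep their relative order.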

Let's run it:

Producer output: (screenshot)

Consumer output: (screenshot)

The consumer receives the messages sent by the producer.
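If you'd rather verify from the shell than through the Java client, the console consumer that ships with Kafka does the same job:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.131.128:9092 --topic page_visits --from-beginning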
