1. Remote cluster test (submitting to the cluster from the IDE)

import org.apache.spark.{SparkContext, SparkConf}
import scala.math.random

/**
 * Estimating Pi with Spark (Monte Carlo method)
 */
object test {
  def main(args: Array[String]) {
    // Test code that connects to a remote cluster master
    val conf = new SparkConf().setAppName("SparkPai").setMaster("spark://192.168.1.116:7077").setJars(List("D:\\IntelliJ IDEA 15.0.2\\workplace\\test\\out\\artifacts\\test_jar\\test.jar"))
    val sc = new SparkContext(conf)
    // Number of partitions (slices)
    val slices = if (args.length > 0) args(0).toInt else 2
    // Cap n at Int.MaxValue to avoid overflow
    val n = math.min(10000L * slices, Int.MaxValue).toInt
    // Count the points that fall inside the unit circle
    val count = sc.parallelize(1 until n, slices).map { _ =>
      // Random coordinates in (-1, 1)
      val x = random * 2 - 1
      val y = random * 2 - 1
      // Count the point if its distance to the origin is less than 1, otherwise ignore it
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _) // Total number of points that landed inside the circle
    // count / n approximates the probability of a point landing inside the circle
    println("Pi is roughly " + 4.0 * count / n)
    sc.stop()
  }
}
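Why 4.0 * count / n approximates Pi: each point (x, y) is drawn uniformly from the square [-1, 1] x [-1, 1], whose area is 4, while the inscribed unit circle has area \pi. The fraction of points that land inside the circle therefore converges to \pi / 4, so

    \pi \approx 4 \cdot \frac{count}{n}

which is exactly what the final println computes.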

2. Local test (generally not of much use)

import java.io.File

import org.apache.spark.{SparkContext, SparkConf}
import scala.math.random

/**
 * Estimating Pi with Spark (Monte Carlo method)
 * Created by 汪本成 on 2016/6/10.
 */
object T1 {
  def main(args: Array[String]) {
    // --- Windows workaround: point hadoop.home.dir at a fake HADOOP_HOME containing an empty winutils.exe so Spark can start locally
    val path = new File(".").getCanonicalPath()
    System.getProperties().put("hadoop.home.dir", path)
    new File("./bin").mkdirs()
    new File("./bin/winutils.exe").createNewFile()
    // ---
    val conf = new SparkConf().setAppName("SparkPai").setMaster("local[4]")
    val sc = new SparkContext(conf)
    // Number of partitions (slices)
    val slices = if (args.length > 0) args(0).toInt else 2
    // Cap n at Int.MaxValue to avoid overflow
    val n = math.min(10000L * slices, Int.MaxValue).toInt
    // Count the points that fall inside the unit circle
    val count = sc.parallelize(1 until n, slices).map { _ =>
      // Random coordinates in (-1, 1)
      val x = random * 2 - 1
      val y = random * 2 - 1
      // Count the point if its distance to the origin is less than 1, otherwise ignore it
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _) // Total number of points that landed inside the circle
    // count / n approximates the probability of a point landing inside the circle
    println("Pi is roughly " + 4.0 * count / n)
    sc.stop()
  }
}

------------------------------- Below: the Kafka producer and consumers -------------------------------

Producer

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerKafKa {

    private KafkaProducer<String, String> producer;
    private Properties properties;

    public ProducerKafKa() {
        properties = new Properties();
        properties.put("bootstrap.servers", "192.168.1.116:9092,192.168.1.118:9092,192.168.1.119:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(properties);
    }

    // Fire-and-forget send to the "world" topic
    public void sendRecorder(String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("world", key, value);
        producer.send(record);
    }

    public void assignPartitionSend(String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("world", key, value);
        producer.send(record);
    }

    // Send with a callback that logs where the record was stored
    public void sendRecordWithCallback(String key, String value) {
        final Logger logger = LoggerFactory.getLogger(ProducerKafKa.class);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("world", key, value);
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    logger.info("stored at partition:" + metadata.partition() + ",offset:" + metadata.offset() + ",ts:" + metadata.timestamp());
                } else {
                    exception.printStackTrace();
                }
            }
        });
    }

    public void close() {
        producer.flush();
        producer.close();
    }

    // Dump the producer's internal metrics
    public void getMetrics() {
        Logger logger = LoggerFactory.getLogger(ProducerKafKa.class);
        Map<MetricName, ? extends Metric> metrics = producer.metrics();
        for (MetricName name : metrics.keySet()) {
            logger.info(name.name() + ":" + metrics.get(name).value());
        }
    }

    public static void main(String[] args) {
        ProducerKafKa client = new ProducerKafKa();
        // The loop bounds were lost in the original post; 0 to 10 is assumed here
        for (int i = 0; i < 10; i++) {
            client.sendRecorder("key" + i, "value" + i);
        }
        client.close();
    }
}

Consumer (Type I)

package test;

/**
 * Created by guest2 on 2018/3/10.
 * Consumer example
 */
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * @author Joker
 * Committing offsets manually.
 * Often we only want to treat a message as consumed after it has been fetched and run through our own
 * processing logic; this is achieved by controlling the offset commit ourselves.
 */
public class ManualOffsetConsumer {

    private static Logger LOG = LoggerFactory.getLogger(ManualOffsetConsumer.class);

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker addresses
        props.put("bootstrap.servers", "192.168.1.116:9092,192.168.1.117:9092,192.168.1.119:9092");
        // Consumer group name
        props.put("group.id", "mygroup11");
        props.put("enable.auto.commit", "false");
        // Start from the earliest offset for this group.id. The default is "latest", i.e. the offset of the
        // newest message in the topic; with "latest" the consumer only sees messages produced after it starts.
        props.put("auto.offset.reset", "earliest");
        // Session (heartbeat) timeout
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to the "test" topic
        consumer.subscribe(Arrays.asList("test"));
        final int minBatchSize = 5; // commit in batches of this size
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(30000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("consumer message values is----: " + record.value() + " and the offset is " + record.offset());
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                System.out.println("---now commit offset" + buffer.size());
                // Commits the latest offsets returned by the last poll() (see the per-partition commit sketch below)
                consumer.commitSync();
                buffer.clear();
            }
        }
    }
}
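Note that the parameterless commitSync() above commits the latest offsets returned by poll(), including records still sitting in the buffer. To commit only up to the records actually processed, explicit offsets can be passed in. A minimal sketch, assuming the same props and topic as above; the class and method names here are hypothetical helpers, not part of the original post:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitProcessedHelper {

    // Commit the offset of the last processed record of each partition.
    // The committed offset means "next record to read", hence offset + 1.
    public static void commitProcessed(KafkaConsumer<String, String> consumer,
                                       List<ConsumerRecord<String, String>> processed) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (ConsumerRecord<String, String> record : processed) {
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
        }
        consumer.commitSync(offsets);
    }
}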

Screenshots of the messages consumed in Linux and in IDEA (images omitted here).

Consumer (Type II)

package com.you.bd17;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class MenulConsumer {

    private Properties properties = new Properties();
    private KafkaConsumer<String, String> consumer;

    public MenulConsumer() {
        properties.setProperty("bootstrap.servers", "master:9092,slave1:9092");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("group.id", "java_group");
        // Disable automatic offset commits
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.offset.reset", "none");
        consumer = new KafkaConsumer<String, String>(properties);
    }

    // Read the committed offset of a topic partition
    public void getOffsets() {
        // committed() returns null if this group has never committed an offset for the partition
        OffsetAndMetadata offsets = consumer.committed(new TopicPartition("from-java", 1));
        System.out.println(offsets + ":" + offsets.offset());
    }

    public void subscribeTopic() {
        List<String> topics = new ArrayList<String>();
        topics.add("from-java");
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition:" + record.partition() + ",offset:" + record.offset() + ",key:" + record.key() + ",value:" + record.value());
            }
            consumer.commitSync();
        }
    }

    // Consume from an explicitly assigned partition, starting at a given offset.
    // There are two ways to consume a topic: consumer.subscribe(topics) and consumer.assign(topicPartitions).
    // The two are mutually exclusive; only one of them can be used.
    public void consumerAssignerd() {
        /*List<String> topics = new ArrayList<String>();
        topics.add("from-java");
        consumer.subscribe(topics);*/
        // Assign a specific partition
        List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        topicPartitions.add(new TopicPartition("from-java", 0));
        consumer.assign(topicPartitions);
        // Seek to the offset at which to start consuming
        consumer.seek(new TopicPartition("from-java", 0), 20);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition:" + record.partition() + ",offset:" + record.offset() + ",key:" + record.key() + ",value:" + record.value());
            }
        }
    }

    // Set the committed offset explicitly
    public void setCommitOffset() {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(new TopicPartition("from-java", 0), new OffsetAndMetadata(20));
        // Commit an offset for a specific partition; this takes effect before the next poll
        consumer.commitSync(offsets);
        List<String> topics = new ArrayList<String>();
        topics.add("from-java");
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                if (record.partition() == 0) {
                    System.out.println("partition:" + record.partition() + ",offset:" + record.offset() + ",key:"
                            + record.key() + ",value:" + record.value());
                }
            }
        }
    }

    // Exactly-once consumption: keep offsets together with the results in an external store (e.g. MySQL).
    // 1. Configure the consumer with enable.auto.commit = false
    // 2. Subscribe to the topic or assign the partitions
    // 3. Read the saved offset of each partition of the topic from MySQL
    // 4. Either commitSync(offsets) to push them back to the Kafka servers, or
    //    consumer.seek(new TopicPartition("from-java", 0), offset) to set the starting position directly
    // 5. poll() the data
    // 6. Iterate over the records and run the computation
    // 7. When the computation finishes, obtain the offsets consumed so far
    //    (e.g. via consumer.committed(new TopicPartition("from-java", 1)))
    // 8. Save the results and the offsets to MySQL in one atomic operation (a transaction)
    // 9. Go back to step 5 for the next poll and the next round of computation
    // A sketch of these steps is shown after this class.
    public void exactlyOnceConsumer() {
        properties.setProperty("enable.auto.commit", "false");
        // consumer.subscribe(topics);
        // ... see the steps above and the sketch below
    }

    public static void main(String[] args) {
        MenulConsumer menulConsumer = new MenulConsumer();
        //menulConsumer.subscribeTopic();
        //menulConsumer.getOffsets();
        //menulConsumer.consumerAssignerd();
        menulConsumer.setCommitOffset();
    }
}
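A minimal sketch of the exactly-once flow described in exactlyOnceConsumer(), assuming the offsets and results live in a MySQL table; loadOffsetFromDb and saveResultAndOffsetInTransaction are hypothetical placeholders for whatever JDBC code is used, and the topic/partition match the examples above:

import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExactlyOnceSketch {

    public static void main(String[] args) {
        // 1. Configure the consumer with auto commit disabled
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "master:9092,slave1:9092");
        props.setProperty("group.id", "java_group");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // 2. Assign the partition we manage ourselves
        TopicPartition partition = new TopicPartition("from-java", 0);
        consumer.assign(Collections.singletonList(partition));

        // 3/4. Read the last processed offset from MySQL and seek to it
        long startOffset = loadOffsetFromDb(partition);
        consumer.seek(partition, startOffset);

        while (true) {
            // 5. Poll a batch of records
            ConsumerRecords<String, String> records = consumer.poll(1000);
            List<ConsumerRecord<String, String>> batch = records.records(partition);
            if (batch.isEmpty()) {
                continue;
            }
            // 6. Process the batch (the real computation goes here)
            for (ConsumerRecord<String, String> record : batch) {
                System.out.println("processing offset " + record.offset() + ": " + record.value());
            }
            // 7/8. Save the results and the next offset to MySQL in one transaction;
            // if the transaction fails, the offset is not advanced and the batch is re-read.
            long nextOffset = batch.get(batch.size() - 1).offset() + 1;
            saveResultAndOffsetInTransaction(partition, nextOffset);
            // 9. Loop back to poll again
        }
    }

    // Hypothetical helper: SELECT the saved offset for this partition from MySQL (0 if none yet)
    private static long loadOffsetFromDb(TopicPartition partition) {
        return 0L;
    }

    // Hypothetical helper: INSERT/UPDATE the results and the offset in a single MySQL transaction
    private static void saveResultAndOffsetInTransaction(TopicPartition partition, long nextOffset) {
    }
}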

Maven dependencies

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zhiyou.bd17</groupId>
    <artifactId>KafkaTest</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flume.flume-ng-sinks/flume-ng-kafka-sink -->
        <dependency>
            <groupId>org.apache.flume.flume-ng-sinks</groupId>
            <artifactId>flume-ng-kafka-sink</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>
</project>
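Note that the pom as shown does not declare the kafka-clients artifact that the producer and consumer classes above import from. Something along these lines would need to be added to the <dependencies> block; the version here is an assumption and should be chosen to match the broker in use:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- assumed version; pick the release that matches your Kafka broker -->
    <version>0.10.2.0</version>
</dependency>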

Consumer (Type III)

package com.zhiyou.bd17;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ProducerConsumer {

    private Properties properties = new Properties();
    private KafkaConsumer<String, String> consumer;

    // Initialize the configuration
    public ProducerConsumer() {
        properties.setProperty("bootstrap.servers", "192.168.1.116:9092,192.168.1.118:9092,192.168.1.119:9092");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("group.id", "java_group");
        consumer = new KafkaConsumer<String, String>(properties);
    }

    // Subscribe to the topic
    public void subscribeTopic() {
        List<String> topics = new ArrayList<String>();
        topics.add("kafkademo");
        consumer.subscribe(topics);
        // Keep pulling data from Kafka in a loop
        while (true) {
            // Pull data from Kafka (poll() requires a timeout in milliseconds)
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("received message: partition:" + record.partition() + ",offset:" + record.offset()
                        + ",key:" + record.key() + ",value:" + record.value());
            }
        }
    }

    public static void main(String[] args) {
        ProducerConsumer producerConsumer = new ProducerConsumer();
        producerConsumer.subscribeTopic();
    }
}

------------------------------- The examples above are a bit messy; below is one producer and one consumer that work as a matched pair -------------------------------

Consumer

The consumer side is the same ManualOffsetConsumer class already listed above under "Consumer (Type I)", so it is not repeated here.

Producer

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
import kafka.javaapi.producer.Producer;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * Created by zhanghuayan on 2017/1/22.
 */
public class ProducerTest extends Thread {

    private String topic;

    public ProducerTest(String topic) {
        super();
        this.topic = topic;
    }

    public void run() {
        Producer<Integer, String> producer = createProducer();
        int i = 0;
        // Send one message per second, forever
        while (true) {
            producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++));
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // Uses the old kafka.javaapi.producer API
    private Producer<Integer, String> createProducer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "192.168.1.116:2181"); // ZooKeeper address
        properties.put("serializer.class", StringEncoder.class.getName());
        properties.put("metadata.broker.list", "192.168.1.116:9092"); // Kafka broker address
        return new Producer<Integer, String>(new ProducerConfig(properties));
    }

    public static void main(String[] args) {
        new ProducerTest("test").start();
    }
}
