废话不说,先来个示例,有个感性认识再介绍。

这个示例来自spark自带的example,基本步骤如下:

(1)使用以下命令输入流消息:

$ nc -lk 9999

(2)在一个新的终端中运行NetworkWordCount,统计上面的词语数量并输出:

$ bin/run-example streaming.NetworkWordCount localhost 9999

(3)在第一步创建的输入流程中敲入一些内容,在第二步创建的终端中会看到统计结果,如:

第一个终端输入的内容:

hello world again

第二个端口的输出

-------------------------------------------
Time: 1436758706000 ms
-------------------------------------------
(again,1)
(hello,1)
(world,1)

简单解释一下,上面的示例通过手工敲入内容,并传给spark streaming统计单词数量,然后将结果打印出来。

附上代码:

package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel /**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
*
* Usage: NetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
*/
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
} StreamingExamples.setStreamingLogLevels() // Create the context with a 1 second batch size
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1)) // Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
 
 
(一)构建自己的项目
本示例使用java+maven来构建一个wordcount
1、创建项目,在pom.xml添加如下的依赖关系

<dependency>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-api</artifactId>

<version>1.7.0</version>

</dependency>

<dependency>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-log4j12</artifactId>

<version>1.7.0</version>

</dependency>

<dependency>

<groupId>log4j</groupId>

<artifactId>log4j</artifactId>

<version>1.2.17</version>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-core_2.10</artifactId>

<version>1.4.0</version>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming_2.10</artifactId>

<version>1.4.0</version>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-kafka_2.10</artifactId>

<version>1.4.0</version>

</dependency>

 

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_2.10</artifactId>

<version>0.8.2.1</version>

</dependency>

 
2、写代码,此部分代码使用了官方的代码:
package com.netease.gdc.kafkaStreaming;

import java.util.Map;
import java.util.HashMap;
import java.util.regex.Pattern; import scala.Tuple2;
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils; /**
* Consumes messages from one or more topics in Kafka and does wordcount.
*
* Usage: JavaKafkaWordCount
* is a list of one or more zookeeper servers that make quorum
* is the name of kafka consumer group
* is a list of one or more kafka topics to consume from
*is the number of threads the kafka consumer should use
*
* To run this example:
* `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
* zoo03 my-consumer-group topic1,topic2 1`
*/ public final class JavaKafkaWordCount {
private static final Pattern SPACE = Pattern.compile(" "); private JavaKafkaWordCount() {
} public static void main(String[] args) {
if (args.length < 4) {
System.err.println("Usage: JavaKafkaWordCount
");
System.exit(1);
} SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
// Create the context with a 1 second batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); int numThreads = Integer.parseInt(args[3]);
Map topicMap = new HashMap();
String[] topics = args[2].split(",");
for (String topic: topics) {
topicMap.put(topic, numThreads);
} JavaPairReceiverInputDStream messages =
KafkaUtils.createStream(jssc, args[0], args[1], topicMap); JavaDStream lines = messages.map(new Function<tuple2, String>() {
@Override
public String call(Tuple2 tuple2) {
return tuple2._2();
}
}); JavaDStream words = lines.flatMap(new FlatMapFunction() {
@Override
public Iterable call(String x) {
return Lists.newArrayList(SPACE.split(x));
}
}); JavaPairDStream wordCounts = words.mapToPair(
new PairFunction() {
@Override
public Tuple2 call(String s) {
return new Tuple2(s, 1);
}
}).reduceByKey(new Function2() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
}); wordCounts.print();
jssc.start();
jssc.awaitTermination();
}
}
 
3、上传到服务器中然后编译
mvn clean package
4、提交job到spark中
/home/hadoop/spark/bin/spark-submit --jars ../mylib/metrics-core-2.2.0.jar,../mylib/zkclient-0.3.jar,../mylib/spark-streaming-kafka_2.10-1.4.0.jar,../mylib/kafka-clients-0.8.2.1.jar,../mylib/kafka_2.10-0.8.2.1.jar  --class com.netease.gdc.kafkaStreaming.JavaKafkaWordCount --master spark://192.168.165.102:7077  target/kafkaStreaming-0.0.1-SNAPSHOT.jar 192.168.172.111:2181/kafka my-consumer-group test 3
当然,前提是kafka集群已经正常运行,且存在test这个topic
 
5、验证
打开一个console producer,输入内容,然后观察wordcount的结果。
结果形式如下:
(hi,1)

  

最新文章

  1. jq中数组应用的错误
  2. c++模板使用出错情况error LNK2019: unresolved external symbol &quot;public: float __thiscall Compare&lt;float&gt;::min(void)&quot; (?min@?$Compare@M@@QAEMXZ) referenced in function _main
  3. [转]反向ajax项目
  4. OC中的深拷贝与浅拷贝
  5. VC++ 将资源位图画到窗口上去的方法
  6. 不可或缺 Windows Native (9) - C 语言: 动态分配内存,链表,位域
  7. 解决调用context.Session[&quot;NAME&quot;]时总出现Object reference not set to an instance of an object.异常的方法
  8. demo15 AlertDialog
  9. CentOS 6.5 安装 Nginx 1.7.8 教程
  10. [JavaScript]&#39;this&#39;详解
  11. C语言中,如何通过socket得到对端IP地址
  12. hdu 4915 Parenthese sequence(模拟)2014多培训学校5现场
  13. 企业应用打包的时候 修改ipa包的bundle identifier
  14. 《PHP, MySQL, Javascript和CSS》读书随手记----php篇
  15. symfony 事务提交
  16. Xamarin版的C# SVG路径解析器
  17. HDU 2883 kebab(最大流)
  18. 利用 localStorage 储存css js
  19. 洛谷P4458 /loj#2512.[BJOI2018]链上二次求和(线段树)
  20. Uva 11419 我是SAM

热门文章

  1. Domino系统从UNIX平台到windows平台的迁移及备份
  2. 平板电脑上完美体验Windows 8 (视频)
  3. go语言的一个gui 开源 项目 https://github.com/andlabs/ui
  4. 二分图简单概念&amp;&amp;HDU 2063
  5. js--基于面向对象的组件开发及案例
  6. angular路由(自带路由篇)
  7. 洛谷 P1801 黑匣子_NOI导刊2010提高(06)(未完)
  8. 利用JAVA反射机制实现调用私有方法
  9. 【翻译自mos文章】OGG的集成捕捉模式支持Oracle database标准版么?
  10. 可变参数的实现my_sprintf