1.环境

jdk : 1.8

scala : 2.11.7

hadoop:2.7

spark : 2.2.0

2. 开发工具

idea 2017.2

3.maven的pom文件

<dependencies>
<!-- https://mvnrepository.com/artifact/com.sun/tools -->
<!-- https://mvnrepository.com/artifact/org.apache.maven/maven-core -->
<dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-core</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.39</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>

4.sparkStreaming通过本地的socket端口解析日志

package test02

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext object demo5 {
def main (args : Array[String]) {
val conf = new SparkConf().setMaster("local[4]").setAppName("SaprkApp")
val ssc = new StreamingContext(conf, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999) //测试数据 : 2017-07-18 11:02:52.032 INFO 172.31.20.232:56965 18583551@170718110244697@172.31.33.81 PUBLIC_USER_LOGIN(0202)-0 create session for 18583551/18583551@170718110244697@172.31.33.81
val loginRDD = lines.map(line =>{
val pattern = """^(\S+\s\d+\:\d+\:\d+\.\d{3})\s+\S+\s+(\d+\.\d+.\d+\.\d+):\d+\S+\s+(\d+)@(\S+)@\S+\s+PUBLIC_USER_LOGIN\(0202\)-0\s+create session for.*""".r
var login_time = ""
var ip_address = ""
var passport_id = ""
var session_id = ""
var scene_name = "login"
pattern.findAllIn(line).matchData foreach{m =>{
login_time = m.group(1)
ip_address = m.group(2)
passport_id = m.group(3)
session_id = m.group(4) println(login_time)
println(ip_address)
println(passport_id)
println(session_id) }}
(login_time,ip_address,passport_id,session_id,scene_name)
})
val loginRes = loginRDD.filter(_._1 != "").filter(_._2 != "")
loginRes.print() //测试数据 : 2017-07-18 11:03:44.312 INFO 0.0.0.0:18402 26639185@170718110334147@172.31.32.135 ADMIN_SYSTEM_SUCCESS(00FE)-84581 -> USER_DISCONNECTED Time cost 9.09ms
val logoutRDD = lines.map(line =>{
val pattern = """^(\S+\s\d+\:\d+\:\d+\.\d{3})\s+\S+\s+(\d+\.\d+\.\d+\.\d+):\d+\S+\s+(\d+)@(\S+)@.*(disconnected|DISCONNECTED).*""".r
var login_id = ""
var ip = ""
var passport_id = ""
var str = ""
var scene_name = "logout"
pattern.findAllIn(line).matchData foreach{m =>{
login_id = m.group(1)
ip = m.group(2)
passport_id = m.group(3)
str = m.group(4) println(login_id)
println(ip)
println(passport_id)
println(str) }}
(login_id,ip,passport_id,str)
})
val logoutRes = logoutRDD.filter(_._1 != "").filter(_._2 != "")
logoutRes.print()
logoutRes.saveAsTextFiles("/Users/huiliyang/streaming/aa") //测试数据 : 2017-08-27 06:04:38.420 [info] <0.3471.83> 172.31.2.201:59154 70281275 PUBLIC_SERVER_CLIENT_LOG(258)-0 LovelyStreet:1228
val eventRDD = lines.map(line =>{
val pattern = """^(\S+\s\d+\:\d+\:\d+\.\d{3})\s+\[info\]\s<[\d\.]*>\s?(\d+\.\d+\.\d+.\d+):\d+\S+\s+\S?([1-9]\d{7})(@\d+@\d+\.\d+\.\d+\.\d+)?\S?\s+PUBLIC_(SERVER|SYSTEM)_CLIENT\S+\s(\S+):(\d+)""".r
var login_time = ""
var ip_address = ""
var passport_id = ""
var session_id = ""
var str1 = ""
var str2 = ""
var str3 = ""
pattern.findAllIn(line).matchData foreach{m =>{
login_time = m.group(1)
ip_address = m.group(2)
passport_id = m.group(3)
session_id = m.group(4)
str1 = m.group(5)
str2 = m.group(6)
str3 = m.group(7) println(login_time)
println(ip_address)
println(passport_id)
println(session_id)
println(str1)
println(str2)
println(str3) }}
(login_time,ip_address,passport_id,session_id,str1,str2,str3)
})
val eventRes = eventRDD.filter(_._1 != "").filter(_._2 != "")
eventRes.print() ssc.start()
ssc.awaitTermination() }
}

最新文章

  1. C#开发微信门户及应用(40)--使用微信JSAPI实现微信支付功能
  2. Java监听器
  3. iOS开发之使用Runtime给Model类赋值
  4. Nodejs进阶:如何玩转子进程(child_process)
  5. 25款顶级的jQuery表格插件
  6. AVR/Arduino定时/计数器、中断入门
  7. Python 全栈开发 -- 开发环境篇
  8. 关于Servlet中重定向
  9. COJN 0583 800602分苹果
  10. struts2处理请求流程详解
  11. CentOS配置smaba与Windows共享文件
  12. xls添加 序号列技巧
  13. sql测验,like 和 = 的区别
  14. redis持久化方案(十)
  15. Linux—CentOS7下python开发环境配置
  16. 【Dubbo 源码解析】02_Dubbo SPI
  17. 结对项目 https://github.com/quchengyu/jiedui/tree/quchengyu-patch-1
  18. 【转】Error:JAVA_HOME is not set and could not be found
  19. Python基础6 面向对象编程
  20. shell 命令 mkdir -p

热门文章

  1. MySQL主键跟外键
  2. Mac OS 快速查询技巧
  3. GoF著作中未提到的设计模式(2):Interceptor
  4. HDU6395-Sequence 矩阵快速幂+除法分块 矩阵快速幂模板
  5. Java-Class-C:org.springframework.web.client.RestTemplate
  6. nginx中reuqest_uri与uri的区别说明
  7. 在Debian中安装VNC Server
  8. jquery实现点击按钮弹出层和点击空白处隐藏层
  9. 打包的@font-face包
  10. 码云的使用以及pycharm