输入输出转化工具类

package com.rz.mobile_tag.log

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} /**
* 访问日志转换(输入==>输出)工具类
*/
object AccessConvertUtil {
// 定义的输出字段
val structType = StructType(
Array(
StructField("url", StringType),
StructField("cmsType", StringType),
StructField("cmsId", LongType),
StructField("traffic", LongType),
StructField("ip", StringType),
StructField("city", StringType),
StructField("time", StringType),
StructField("day", StringType) )
) /**
* 根据输入的每一行信息转换成输出的样式
* @param log 输入的每一行记录信息
*/
def parseLog(log:String)={
try{
val splits = log.split("\t",-) val url = splits()
val traffic = splits().toLong
val ip = splits() val domain = "http://www.rz.com/"
val cms = url.substring(url.indexOf(domain)+domain.length)
val cmsTypeId = cms.split("/") var cmsType = ""
var cmsId = 0l
if (cmsTypeId.length>){
cmsType = cmsTypeId()
cmsId = cmsTypeId().toLong
} val city=""
val time = splits()
val day = time.substring(, ).replaceAll("-","") // 这个Row里面的字段要和Struct中的字段对应上
Row(url, cmsType, cmsId, traffic, ip, city, time, day)
}catch {
case e:Exception =>{
Row()
}
}
} }

读取数据,清洗输出目标数据

package com.rz.mobile_tag.log

import org.apache.spark.sql.{SaveMode, SparkSession}

/**
* 使用Spark完成我们的数据清洗操作
*/
object SparkStatCleanJob {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName(s"${this.getClass.getSimpleName}")
.master("local[2]")
.getOrCreate() val accessRDD = spark.sparkContext.textFile(args())
// debug查看数据
// accessRDD.take(10).foreach(println) val accessDF = spark.createDataFrame(accessRDD.map(log =>AccessConvertUtil.parseLog(log)),AccessConvertUtil.structType) // accessDF.printSchema()
// accessDF.show(false)
accessDF.coalesce().write.format("parquet").mode(SaveMode.Overwrite).partitionBy("day").save(args()) spark.stop()
}
}

最新文章

  1. Java的几个同步辅助类
  2. 让hadoop-0.20.2自带的eclipse插件支持eclipse-3.5以上
  3. 【原创】angularjs1.3.0源码解析之directive
  4. o4.数组指针和指针数组的区别
  5. C++类的复制构造函数和赋值运算符
  6. easyui-textbox
  7. 蓝牙-b
  8. 简述WebService与.NET Remoting的区别及适应场合 WCF
  9. auDemo
  10. 【Java深入研究】2、JVM类加载机制
  11. Django web编程3 -- 创建用户账户
  12. BaiduMap路程计算
  13. 命令:curl
  14. Android 常见异常及解决办法
  15. [PHP] 算法-找出两个链表的第一个公共结点的PHP实现
  16. JS字符串截取函数slice(),substring(),substr()的区别
  17. iOS通过URL构建UIImage
  18. 剑指offer十三之调整数组顺序使奇数位于偶数前面
  19. seq2seq模型以及其tensorflow的简化代码实现
  20. 给JSON中put的value=null时,这对key=value会被隐藏掉。

热门文章

  1. React 学习推荐
  2. 笔试真题解析 ALBB-2015 系统project师研发笔试题
  3. leetCode(37):Implement Queue using Stacks
  4. 【数据挖掘】分类之kNN(转载)
  5. Linux安装php-7.0.16,完成php和apache的配置
  6. saltstack内置执行模块shadow
  7. mysql海量数据条件删除
  8. 自定义tabpageindicator,可以自定义tab是三角形还是矩形,但是tab不具有滑动的功能
  9. 使用 Xcode 5 生成和使用静态库
  10. Myecplise Tomcat 启动很慢