Spark on Kudu: DML operations
2024-10-18 10:11:21
Kudu supports a number of DML-style operations, several of which are exposed through the Spark on Kudu integration:
INSERT - inserts the rows of a DataFrame into the Kudu table. Note that while the API fully supports INSERT, using it from Spark is discouraged. INSERT is risky because a Spark task may be re-executed, which means rows that have already been inserted may be inserted again; since INSERT fails when a row with the same primary key already exists, the retry itself then fails. Instead, we encourage using INSERT_IGNORE, described below (see the sketch after this list).
INSERT_IGNORE - inserts the rows of a DataFrame into the Kudu table, ignoring (rather than failing on) any row whose primary key already exists.
DELETE - deletes the rows of a DataFrame from the Kudu table.
UPSERT - updates rows of the DataFrame in the Kudu table if they already exist, otherwise inserts them.
UPDATE - updates rows in the Kudu table with the values from the DataFrame.
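Since the list above recommends INSERT_IGNORE over plain INSERT, here is a minimal sketch of that path using KuduContext.insertIgnoreRows, which takes the same (DataFrame, table name) arguments as insertRows. It is self-contained but assumes the same spark_kudu_tbl table and Customer record used in the examples below; note that newer kudu-spark releases deprecate insertIgnoreRows in favor of insertRows with a KuduWriteOptions whose ignoreDuplicateRowErrors flag is set.

import org.apache.kudu.spark.kudu._
import org.apache.spark.sql.SparkSession

// Assumed record type; name is taken to be the primary key of spark_kudu_tbl.
case class Customer(name: String, age: Int, city: String)

object InsertIgnoreSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("InsertIgnoreSketch").master("local").getOrCreate()
    import spark.implicits._
    val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
    val kuduContext = new KuduContext(kuduMasters, spark.sparkContext)
    val customersDF = Seq(Customer("jane", 30, "new york")).toDF()
    // Unlike insertRows, rows whose primary key already exists are silently
    // skipped, so a re-executed Spark task cannot fail on a duplicate key.
    kuduContext.insertIgnoreRows(customersDF, "spark_kudu_tbl")
  }
}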
Insert operation
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.kudu.spark.kudu._

/**
 * Created by angel;
 */
// Record type used throughout these examples; name is assumed to be
// the primary key of spark_kudu_tbl.
case class Customer(name: String, age: Int, city: String)

object Insert {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      // set the master and tune the Spark parameters
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext
    // connect Spark to Kudu via a KuduContext
    val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
    val kuduContext = new KuduContext(kuduMasters, sqlContext.sparkContext)
    // TODO 1: name the Kudu table
    val kuduTableName = "spark_kudu_tbl"
    // TODO 2: configure the Kudu options
    val kuduOptions: Map[String, String] = Map(
      "kudu.table" -> kuduTableName,
      "kudu.master" -> kuduMasters)
    import sqlContext.implicits._
    // TODO 3: define the data
    val customers = Array(
      Customer("jane", 30, "new york"),
      Customer("jordan", 18, "toronto"))
    // TODO 4: create an RDD
    val customersRDD = sparkContext.parallelize(customers)
    // TODO 5: convert the RDD to a DataFrame
    val customersDF = customersRDD.toDF()
    // TODO 6: insert the data into the Kudu table
    kuduContext.insertRows(customersDF, kuduTableName)
    // TODO 7: read the inserted data back
    sqlContext.read.options(kuduOptions).kudu.show
  }
}
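The final read above scans the whole table. Here is a small sketch of a filtered read, assuming the same sqlContext and kuduOptions as in the example above (the val name adultsDF is just illustrative); Kudu's Spark data source pushes simple column predicates down to the tablet servers, so only matching rows are scanned and shipped back to Spark:

// Only rows satisfying age > 20 are scanned and returned by Kudu.
val adultsDF = sqlContext.read.options(kuduOptions).kudu
  .filter("age > 20")
  .select("name", "age")
adultsDF.show()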
Delete operation
import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

/**
 * Created by angel;
 */
// Same record type as in the Insert example; name is assumed to be
// the primary key of spark_kudu_tbl.
case class Customer(name: String, age: Int, city: String)

object Delete {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      // set the master and tune the Spark parameters
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext
    // connect Spark to Kudu via a KuduContext
    val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
    val kuduContext = new KuduContext(kuduMasters, sqlContext.sparkContext)
    // TODO 1: name the Kudu table
    val kuduTableName = "spark_kudu_tbl"
    // TODO 2: configure the Kudu options
    val kuduOptions: Map[String, String] = Map(
      "kudu.table" -> kuduTableName,
      "kudu.master" -> kuduMasters)
    import sqlContext.implicits._
    // TODO 3: define the data
    val customers = Array(
      Customer("jane", 30, "new york"),
      Customer("jordan", 18, "toronto"))
    // TODO 4: create an RDD
    val customersRDD = sparkContext.parallelize(customers)
    // TODO 5: convert the RDD to a DataFrame
    val customersDF = customersRDD.toDF()
    // TODO 6: register the DataFrame as a temporary view
    // (registerTempTable is deprecated since Spark 2.0)
    customersDF.createOrReplaceTempView("customers")
    // TODO 7: use SQL to select the key column of the rows to delete
    val deleteKeysDF = sqlContext.sql("select name from customers where age > 20")
    // TODO 8: delete those rows from the Kudu table
    kuduContext.deleteRows(deleteKeysDF, kuduTableName)
    // TODO 9: inspect the remaining data in the Kudu table
    sqlContext.read.options(kuduOptions).kudu.show
  }
}
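One detail worth calling out here: the DataFrame handed to deleteRows should contain only the table's primary key column(s), which is why the SQL above selects just name. Kudu identifies the rows to delete by key, and the client does not accept non-key columns on a delete operation.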
Upsert operation
If a row from the DataFrame already exists in the Kudu table, it is updated; otherwise it is inserted.
import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

/**
 * Created by angel;
 */
// Same record type as in the Insert example; name is assumed to be
// the primary key of spark_kudu_tbl.
case class Customer(name: String, age: Int, city: String)

object Upsert {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      // set the master and tune the Spark parameters
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext
    // connect Spark to Kudu via a KuduContext
    val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
    val kuduContext = new KuduContext(kuduMasters, sqlContext.sparkContext)
    // TODO 1: name the Kudu table
    val kuduTableName = "spark_kudu_tbl"
    // TODO 2: configure the Kudu options
    val kuduOptions: Map[String, String] = Map(
      "kudu.table" -> kuduTableName,
      "kudu.master" -> kuduMasters)
    import sqlContext.implicits._
    // TODO 3: define the data set
    val newAndChangedCustomers = Array(
      Customer("michael", 25, "chicago"),
      Customer("denise", 43, "winnipeg"),
      Customer("jordan", 19, "toronto"))
    // TODO 4: convert the data set to a DataFrame
    val newAndChangedRDD = sparkContext.parallelize(newAndChangedCustomers)
    val newAndChangedDF = newAndChangedRDD.toDF()
    // TODO 5: upsert the rows into the Kudu table
    kuduContext.upsertRows(newAndChangedDF, kuduTableName)
    // TODO 6: read the data back from Kudu
    sqlContext.read.options(kuduOptions).kudu.show
  }
}
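Run after the Insert example, this upsert exercises both paths: michael and denise are new keys, so those rows are inserted, while jordan already exists and is updated in place (age 18 becomes 19). This is also where UPSERT differs from INSERT_IGNORE: an insert-ignore would have left the existing jordan row untouched.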
Update operation
Updates existing rows in the Kudu table.
import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

/**
 * Created by angel;
 */
// Same record type as in the Insert example; name is assumed to be
// the primary key of spark_kudu_tbl.
case class Customer(name: String, age: Int, city: String)

object Update {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      // set the master and tune the Spark parameters
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext
    // connect Spark to Kudu via a KuduContext
    val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
    val kuduContext = new KuduContext(kuduMasters, sqlContext.sparkContext)
    // TODO 1: name the Kudu table
    val kuduTableName = "spark_kudu_tbl"
    // TODO 2: configure the Kudu options
    val kuduOptions: Map[String, String] = Map(
      "kudu.table" -> kuduTableName,
      "kudu.master" -> kuduMasters)
    // TODO 3: prepare the data set
    val modifiedCustomers = Array(Customer("michael", 25, "toronto"))
    val modifiedCustomersRDD = sparkContext.parallelize(modifiedCustomers)
    // TODO 4: convert the data set to a DataFrame
    import sqlContext.implicits._
    val modifiedCustomersDF = modifiedCustomersRDD.toDF()
    // TODO 5: update the matching rows in the Kudu table
    kuduContext.updateRows(modifiedCustomersDF, kuduTableName)
    // TODO 6: read the data back from Kudu
    sqlContext.read.options(kuduOptions).kudu.show
  }
}
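Unlike upsertRows, updateRows expects every key in the DataFrame to already exist in the table (here michael was inserted by the Upsert example); updating a missing key surfaces as a row error rather than falling back to an insert. To verify the change with SQL instead of show, a minimal sketch, assuming the same sqlContext and kuduOptions (the view name spark_kudu_view is arbitrary):

// Register the Kudu table as a temporary view and query it with Spark SQL.
sqlContext.read.options(kuduOptions).kudu.createOrReplaceTempView("spark_kudu_view")
sqlContext.sql("select name, age, city from spark_kudu_view where name = 'michael'").show()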