我在使用 Structured Streaming 的 ForeachWriter,写 HDFS 文件时,出现了这个异常

这个异常出现的原因是HDFS作为一个分布式文件系统,支持多线程读,但是不支持多线程写入。所以HDFS引入了一个时间类型的锁机制,也就是HDFS的租约机制(** lease holder**)。

这个知识点来源于这篇文章 http://blog.csdn.net/weixin_44252761/article/details/89517393

大数据计算时,多线程与分布式的并行可以很好的加速数据的处理速度。可在大数据存储时,分布式的文件存储系统对并发的写请求支持存在天然的缺陷。这是一对天然的矛盾,暂时无法解决,只能缓和。

怎么缓和呢?不得不崇拜Spark开发者的智商,非常的简单和实用。不能同时写一个文件,但是可以同时写多个文件啊,只要我(spark或者程序)认为这多个文件是一个文件,那写一个和多个就没有区别了。

按照这个想法,修改我的代码,真正代码篇幅太长,主要就是一个地方:

val hdfsWritePath = new Path(path) 改为 val hdfsWritePath = new Path(path + "/" + partitionId) 即可。

有兴趣的朋友可以看看更全面的代码,原来的源代码如下:

       inputStream match {
case Some(is) =>
is.writeStream
.foreach(new ForeachWriter[Row]() {
var successBufferedWriter: Option[BufferedWriter] = None def openHdfs(path: String, partitionId: Long, version: Long): Option[BufferedWriter] = {
val configuration: Configuration = new Configuration()
configuration.set("fs.defaultFS", hdfsAddr) val fileSystem: FileSystem = FileSystem.get(configuration)
val hdfsWritePath = new Path(path) val fsDataOutputStream: FSDataOutputStream =
if (fileSystem.exists(hdfsWritePath))
fileSystem.append(hdfsWritePath)
else
fileSystem.create(hdfsWritePath) Some(new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)))
} override def open(partitionId: Long, version: Long): Boolean = {
successBufferedWriter =
if (successBufferedWriter.isEmpty) openHdfs(successPath, partitionId, version)
else successBufferedWriter
true
} override def process(value: Row): Unit = {
successBufferedWriter.get.write(value.mkString(","))
successBufferedWriter.get.newLine()
} override def close(errorOrNull: Throwable): Unit = {
successBufferedWriter.get.flush()
successBufferedWriter.get.close()
}
})
.start()
.awaitTermination()

上述代码初看没问题,却会导致标题错误,修改如下:

       inputStream match {
case Some(is) =>
is.writeStream
.foreach(new ForeachWriter[Row]() {
var successBufferedWriter: Option[BufferedWriter] = None def openHdfs(path: String, partitionId: Long, version: Long): Option[BufferedWriter] = {
val configuration: Configuration = new Configuration()
configuration.set("fs.defaultFS", hdfsAddr) val fileSystem: FileSystem = FileSystem.get(configuration)
val hdfsWritePath = new Path(path + "/" + partitionId) val fsDataOutputStream: FSDataOutputStream =
if (fileSystem.exists(hdfsWritePath))
fileSystem.append(hdfsWritePath)
else
fileSystem.create(hdfsWritePath) Some(new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)))
} override def open(partitionId: Long, version: Long): Boolean = {
successBufferedWriter =
if (successBufferedWriter.isEmpty) openHdfs(successPath, partitionId, version)
else successBufferedWriter
true
} override def process(value: Row): Unit = {
successBufferedWriter.get.write(value.mkString(","))
successBufferedWriter.get.newLine()
} override def close(errorOrNull: Throwable): Unit = {
successBufferedWriter.get.flush()
successBufferedWriter.get.close()
}
})
.start()
.awaitTermination()

如此轻松(其实困扰了我一天)就解决了这个可能大家都会遇到的问题,读取时路径到 successPath 即可,分享出来。

如果有什么问题或不足,希望大家可以与我联系,共同进步。

完~~~~

最新文章

  1. SQL Server 2012基本知识
  2. 大公司的PHP面试题
  3. Hadoop核心组件
  4. Maven3简介
  5. ORACLE 回收站导致的故障
  6. GridView总结二:GridView自带编辑删除更新
  7. 浅试 JNI编程
  8. {转自MC}NVIDIA DirectX 11演示DEMO详解
  9. iOS开发之Runtime函数
  10. catalan 数——卡特兰数(转)
  11. window.history.back()的改进方法window.history.go()
  12. 基于Android的ELF PLT/GOT符号和重定向过程ELF Hook实现(by 低端农业代码 2014.10.27)
  13. SSH登录与增删改查demo详解+源代码
  14. Nodejs学习笔记(十五)--- Node.js + Koa2 构建网站简单示例
  15. 测试驱动开发实践2————从testList开始
  16. Vue-admin工作整理(十四):Vuex和双向绑定
  17. Python的函数基础
  18. letsencrypt续期 最简单的续期方法更新证书
  19. Hyperledger Fabric 1.0.1至Hyperledger Fabric 1.0.5所升级的内容及修复的问题
  20. 利用Array Prototype的方法来实现对dom集合的筛选、indexOf、map等功能

热门文章

  1. php 正则判断是否是手机号码 最新
  2. Java 网络爬虫,就是这么的简单
  3. Kafka 学习笔记之 删除Topic
  4. 2019 中国.NET 开发者峰会正式启动
  5. JAVA MyBatis配置文件用properties引入外部配置文件
  6. Kylin构建Cube过程详解
  7. 基于KVM的SRIOV直通配置及性能测试
  8. Ubuntu安装exfat工具
  9. python犯傻之题目解答思路比较与反思
  10. Flash安全总结