你遇到了嗎?Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.FileAlreadyExistsException)
- 2019 年 11 月 7 日
- 筆記
我在使用 Structured Streaming 的 ForeachWriter,寫 HDFS 文件時,出現了這個異常
這個異常出現的原因是HDFS作為一個分散式文件系統,支援多執行緒讀,但是不支援多執行緒寫入。所以HDFS引入了一個時間類型的鎖機制,也就是HDFS的租約機制(** lease holder**)。
這個知識點來源於這篇文章 http://blog.csdn.net/weixin_44252761/article/details/89517393
大數據計算時,多執行緒與分散式的並行可以很好的加速數據的處理速度。可在大數據存儲時,分散式的文件存儲系統對並發的寫請求支援存在天然的缺陷。這是一對天然的矛盾,暫時無法解決,只能緩和。
怎麼緩和呢?不得不崇拜Spark開發者的智商,非常的簡單和實用。不能同時寫一個文件,但是可以同時寫多個文件啊,只要我(spark或者程式)認為這多個文件是一個文件,那寫一個和多個就沒有區別了。
按照這個想法,修改我的程式碼,真正程式碼篇幅太長,主要就是一個地方:
將val hdfsWritePath = new Path(path)
改為 val hdfsWritePath = new Path(path + "/" + partitionId)
即可。
有興趣的朋友可以看看更全面的程式碼,原來的源程式碼如下:
inputStream match { case Some(is) => is.writeStream .foreach(new ForeachWriter[Row]() { var successBufferedWriter: Option[BufferedWriter] = None def openHdfs(path: String, partitionId: Long, version: Long): Option[BufferedWriter] = { val configuration: Configuration = new Configuration() configuration.set("fs.defaultFS", hdfsAddr) val fileSystem: FileSystem = FileSystem.get(configuration) val hdfsWritePath = new Path(path) val fsDataOutputStream: FSDataOutputStream = if (fileSystem.exists(hdfsWritePath)) fileSystem.append(hdfsWritePath) else fileSystem.create(hdfsWritePath) Some(new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8))) } override def open(partitionId: Long, version: Long): Boolean = { successBufferedWriter = if (successBufferedWriter.isEmpty) openHdfs(successPath, partitionId, version) else successBufferedWriter true } override def process(value: Row): Unit = { successBufferedWriter.get.write(value.mkString(",")) successBufferedWriter.get.newLine() } override def close(errorOrNull: Throwable): Unit = { successBufferedWriter.get.flush() successBufferedWriter.get.close() } }) .start() .awaitTermination()
上述程式碼初看沒問題,卻會導致標題錯誤,修改如下:
inputStream match { case Some(is) => is.writeStream .foreach(new ForeachWriter[Row]() { var successBufferedWriter: Option[BufferedWriter] = None def openHdfs(path: String, partitionId: Long, version: Long): Option[BufferedWriter] = { val configuration: Configuration = new Configuration() configuration.set("fs.defaultFS", hdfsAddr) val fileSystem: FileSystem = FileSystem.get(configuration) val hdfsWritePath = new Path(path + "/" + partitionId) val fsDataOutputStream: FSDataOutputStream = if (fileSystem.exists(hdfsWritePath)) fileSystem.append(hdfsWritePath) else fileSystem.create(hdfsWritePath) Some(new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8))) } override def open(partitionId: Long, version: Long): Boolean = { successBufferedWriter = if (successBufferedWriter.isEmpty) openHdfs(successPath, partitionId, version) else successBufferedWriter true } override def process(value: Row): Unit = { successBufferedWriter.get.write(value.mkString(",")) successBufferedWriter.get.newLine() } override def close(errorOrNull: Throwable): Unit = { successBufferedWriter.get.flush() successBufferedWriter.get.close() } }) .start() .awaitTermination()
如此輕鬆(其實困擾了我一天)就解決了這個可能大家都會遇到的問題,讀取時路徑到 successPath
即可,分享出來。
如果有什麼問題或不足,希望大家可以與我聯繫,共同進步。
完~~~~