你遇到了嗎?Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.FileAlreadyExistsException)

  • 2019 年 11 月 7 日
  • 筆記

我在使用 Structured Streaming 的 ForeachWriter,寫 HDFS 文件時,出現了這個異常
image

這個異常出現的原因是HDFS作為一個分散式文件系統,支援多執行緒讀,但是不支援多執行緒寫入。所以HDFS引入了一個時間類型的鎖機制,也就是HDFS的租約機制(** lease holder**)。
這個知識點來源於這篇文章 http://blog.csdn.net/weixin_44252761/article/details/89517393

大數據計算時,多執行緒與分散式的並行可以很好的加速數據的處理速度。可在大數據存儲時,分散式的文件存儲系統對並發的寫請求支援存在天然的缺陷。這是一對天然的矛盾,暫時無法解決,只能緩和。

怎麼緩和呢?不得不崇拜Spark開發者的智商,非常的簡單和實用。不能同時寫一個文件,但是可以同時寫多個文件啊,只要我(spark或者程式)認為這多個文件是一個文件,那寫一個和多個就沒有區別了。

按照這個想法,修改我的程式碼,真正程式碼篇幅太長,主要就是一個地方:
val hdfsWritePath = new Path(path) 改為 val hdfsWritePath = new Path(path + "/" + partitionId) 即可。

有興趣的朋友可以看看更全面的程式碼,原來的源程式碼如下:

       inputStream match {              case Some(is) =>                  is.writeStream                          .foreach(new ForeachWriter[Row]() {                              var successBufferedWriter: Option[BufferedWriter] = None                                def openHdfs(path: String, partitionId: Long, version: Long): Option[BufferedWriter] = {                                  val configuration: Configuration = new Configuration()                                  configuration.set("fs.defaultFS", hdfsAddr)                                    val fileSystem: FileSystem = FileSystem.get(configuration)                                  val hdfsWritePath = new Path(path)                                    val fsDataOutputStream: FSDataOutputStream =                                      if (fileSystem.exists(hdfsWritePath))                                          fileSystem.append(hdfsWritePath)                                      else                                          fileSystem.create(hdfsWritePath)                                    Some(new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)))                              }                                override def open(partitionId: Long, version: Long): Boolean = {                                  successBufferedWriter =                                          if (successBufferedWriter.isEmpty) openHdfs(successPath, partitionId, version)                                          else successBufferedWriter                                  true                              }                                override def process(value: Row): Unit = {                                  successBufferedWriter.get.write(value.mkString(","))                                  successBufferedWriter.get.newLine()                              }                                override def close(errorOrNull: Throwable): Unit = {                                  successBufferedWriter.get.flush()                                  successBufferedWriter.get.close()                              }                          })                          .start()                          .awaitTermination()

上述程式碼初看沒問題,卻會導致標題錯誤,修改如下:

       inputStream match {              case Some(is) =>                  is.writeStream                          .foreach(new ForeachWriter[Row]() {                              var successBufferedWriter: Option[BufferedWriter] = None                                def openHdfs(path: String, partitionId: Long, version: Long): Option[BufferedWriter] = {                                  val configuration: Configuration = new Configuration()                                  configuration.set("fs.defaultFS", hdfsAddr)                                    val fileSystem: FileSystem = FileSystem.get(configuration)                                  val hdfsWritePath = new Path(path + "/" + partitionId)                                    val fsDataOutputStream: FSDataOutputStream =                                      if (fileSystem.exists(hdfsWritePath))                                          fileSystem.append(hdfsWritePath)                                      else                                          fileSystem.create(hdfsWritePath)                                    Some(new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)))                              }                                override def open(partitionId: Long, version: Long): Boolean = {                                  successBufferedWriter =                                          if (successBufferedWriter.isEmpty) openHdfs(successPath, partitionId, version)                                          else successBufferedWriter                                  true                              }                                override def process(value: Row): Unit = {                                  successBufferedWriter.get.write(value.mkString(","))                                  successBufferedWriter.get.newLine()                              }                                override def close(errorOrNull: Throwable): Unit = {                                  successBufferedWriter.get.flush()                                  successBufferedWriter.get.close()                              }                          })                          .start()                          .awaitTermination()

如此輕鬆(其實困擾了我一天)就解決了這個可能大家都會遇到的問題,讀取時路徑到 successPath 即可,分享出來。

如果有什麼問題或不足,希望大家可以與我聯繫,共同進步。

完~~~~