Spark系列——關於 mapPartitions的誤區
- 2019 年 10 月 30 日
- 筆記
前言
今天 Review 了一下同事的代碼, 發現其代碼中有非常多的 mapPartitions
, 問其原因,他說性能比 map
更好。 我說為什麼性能好呢? 於是就有了這篇文章
網上推崇 mapPartitions 的原因
- 執行次數變少,速度更快 按照某些文章的原話來說
一次函數調用會處理一個partition所有的數據,而不是一次函數調用處理一條,性能相對來說會高一些。
我想說的是: 一次函數調用會處理一個partition所有的數據, 確實是可以節省你調用函數的那微乎其微的時間開銷, 但是這個節省的時間真的太小了, 尤其是對與spark這種框架, 本身就不是用來做毫秒級響應的東西, 甚至硬要扯的話,你引入迭代器, 做迭代器的操作難道就不要消耗時間的么? 如果說上面這種說法還有那麼一丟丟靠譜的話, 有些說法就真的讓我很無語了, 比如說:如果是普通的map,比如一個partition中有1萬條數據;ok, 那麼你的function要執行和計算1萬次。 但是,使用MapPartitions操作之後,一個task僅僅會執行一次function, function一次接收所有的partition數據。只要執行一次就可以了,性能比較高
這種說法如果按照上面的方式來理解其實也是那麼一回事, 但是也很容易讓一些新人理解為:map要執行1萬次,而 MapPartitions 只需要一次,這速度杠杠的提升了啊
實際上,你使用MapPartitions迭代的時候,還是是一條條數據處理的,這個次數其實完全沒變。
mapPartitions 帶來的問題
其實就我個人經驗來看, mapPartitions 的正確使用其實並不會造成什麼大的問題, 當然我也沒看出普通場景 mapPartitions 比 map 有什麼優勢, 所以 完全沒必要刻意使用 mapPartitions 反而,mapPartitions 會帶來一些問題。
- 使用起來並不是很方便,這個寫過代碼的人應該都知道。 當然這個問題並不是不能解決,我們可以寫類似下面的代碼, 確實也變的和 map 簡潔性也差不太多, 恩,我不會告訴你可以嘗試在生產環境中用用噢。 //抽象出一個函數,以後所有的 mapPartitions 都可以用 def mapFunc[T, U](iterator: Iterator[T], f: T => U) = { iterator.map(x => { f(x) }) } //使用 rdd.mapPartitions(x => { mapFunc(x, line => { s"${line}轉換數據" }) })
- 容易造成 OOM,這個也是很多博客提到的問題, 他們大致會寫出如下的代碼來做測試, rdd.mapPartitions(x => { xxxx操作 while (x.hasNext){ val next = x.next() } xxx操作 }) 如果你的代碼是上面那樣,那OOM也就不足為奇了, 不知道你注意到了沒有,mapPartitions 是接受一個迭代器, 再返回一個迭代器的, 如果你這麼寫代碼,就完全沒有使用到迭代器的懶執行特性。 將數據都堆積到了內存, 真就變成了一次處理一個partition的數據了, 在某種程度上已經破壞了 Spark Pipeline 的計算模式了。
mapPartitions 到底該怎麼用
存在即是道理, 雖然上面一直在吐槽, 但是其確實有存在的理由。 其一個分區只會被調用一次的特性, 在一些寫數據庫的時候確實很有幫助, 因為我們的 Spark 是分佈式執行的, 所以連接數據庫的操作必須放到算子內部才能正確的被Executor執行, 那麼 mapPartitions 就顯示比 map 要有優勢的多了。 比如下面這段偽代碼
rdd.mapPartitions(x => { println("連接數據庫") val res = x.map(line=>{ print("寫入數據:" + line) line }) println("斷開數據庫") res })
這樣我就一個分區只要連接一次數據庫, 而如果是 map 算子,那可能就要連接 n 多次了。
另外一點就是 mapPartitions 提供給了我們更加強大的數據控制力, 怎麼理解呢?我們可以一次拿到一個分區的數據, 那麼我們就可以對一個分區的數據進行統一處理, 雖然會加大內存的開銷,但是在某些場景下還是很有用的, 比如一些矩陣的乘法。
後記
不管你要使用哪個算子,其實都是可以的, 但是大多數時候,我還是推薦你使用 map 算子, 當然遇到一些map算子不合適的場景, 那就沒辦法了… 不過就算你是真的要使用 mapPartitions, 那麼請記得充分發揮一下 迭代器的 懶執行特性。