Spark 編程指南 (一) [Spa
- 2020 年 1 月 3 日
- 筆記
Python Programming Guide – Spark(Python)
Spark應用基本概念
每一個運行在cluster上的spark應用程序,是由一個運行main函數的driver program和運行多種並行操作的executes組成
其中spark的核心是彈性分佈式數據集(Resilient Distributed Dataset—RDD)
- Resilient(彈性):易變化、易計算
- Distributed(分佈式):可橫跨多台機器,集群分佈
- Dataset(數據集):大批量數據的集合
<!– more –>
RDD基本概念
RDD是邏輯集中的實體,代表一個分區的只讀數據集,不可發生改變
【RDD的重要內部屬性】
- 分區列表(partitions) 對於一個RDD而言,分區的多少涉及對這個RDD並行計算的粒度,每一個RDD分區的計算都會在一個單獨的任務中執行,每一個分區對應一個Task,分區後的數據存放在內存當中
- 計算每個分區的函數(compute) 對於Spark中每個RDD都是以分區進行計算的,並且每個分區的compute函數是在對迭代器進行複合操作,不需要每次計算,直到提交動作觸發才會將之前所有的迭代操作進行計算,lineage在容錯中有重要作用
- 對父級RDD的依賴(dependencies) 由於RDD存在轉換關係,所以新生成的RDD對上一個RDD有依賴關係,RDD之間通過lineage產生依賴關係
【窄依賴】 每一個父RDD的分區最多只被子RDD的一個分區所使用,可以類似於流水線一樣,計算所有父RDD的分區;在節點計算失敗的恢復上也更有效,可以直接計算其父RDD的分區,還可以進行並行計算 子RDD的每個分區依賴於常數個父分區(即與數據規模無關) 輸入輸出一對一的算子,且結果RDD的分區結構不變,主要是map、flatmap 輸入輸出一對一,但結果RDD的分區結構發生了變化,如union、coalesce 從輸入中選擇部分元素的算子,如filter、distinct、subtract、sample 【寬依賴】 多個子RDD的分區會依賴於同一個父RDD的分區,需要取得其父RDD的所有分區數據進行計算,而一個節點的計算失敗,將會導致其父RDD上多個分區重新計算 子RDD的每個分區依賴於所有父RDD分區 對單個RDD基於key進行重組和reduce,如groupByKey、reduceByKey 對兩個RDD基於key進行jion和重組,如jion
- 對key-value數據類型RDD的分區器,控制分區策略和分區數(partitioner) partitioner就是RDD的分區函數,即HashPartitioner(哈希分區)和RangePartitioner(區域分區),分區函數決定了每個RDD的分區策略和分區數,並且這個函數只在(k-v)類型的RDD中存在,在非(k-v)結構的RDD中是None
- 每個數據分區的地址列表(preferredLocations) 與Spark中的調度相關,返回的是此RDD的每個partition所出儲存的位置,按照「移動數據不如移動計算」的理念,在spark進行任務調度的時候,儘可能將任務分配到數據塊所存儲的位置
- 控制操作(control operation) spark中對RDD的持久化操作是很重要的,可以將RDD存放在不同的存儲介質中,方便後續的操作可以重複使用。
主要有cache、persist、checkpoint,checkpoint接口是將RDD持久化到HDFS中,與persist的區別是checkpoint會切斷此RDD之前的依賴關係,而persist會保留依賴關係。checkpoint的兩大作用:一是spark程序長期駐留,過長的依賴會佔用很多的系統資源,定期checkpoint可以有效的節省資源;二是維護過長的依賴關係可能會出現問題,一旦spark程序運行失敗,RDD的容錯成本會很高
Python連接Spark
Spark 1.6.0 支持 Python 2.6+ 或者 Python 3.4+,它使用標準的CPython解釋器, 所以像NumPy這樣的C語言類庫也可以使用,同樣也支持PyPy 2.3+
可以用spark目錄里的bin/spark-submit腳本在python中運行spark應用程序,這個腳本可以加載Java/Scala類庫,讓你提交應用程序到集群當中。你也可以使用bin/pyspark腳本去啟動python交互界面
如果你希望訪問HDFS上的數據集,你需要建立對應HDFS版本的PySpark連接。
最後,你的程序需要import一些spark類庫:
from pyspark import SparkContext, SparkConf
PySpark 要求driver和workers需要相同的python版本,它通常引用環境變量PATH默認的python版本;你也可以自己指定PYSPARK_PYTHON所用的python版本,例如:
PYSPARK_PYTHON=python3.4 bin/pyspark PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py
初始化Spark
一個Spark應用程序的第一件事就是去創建SparkContext對象,它的作用是告訴Spark如何建立一個集群。創建SparkContext之前,先要創建SparkConf對象,SparkConf包含了應用程序的相關信息。
conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
- appName:應用的名稱,用戶顯示在集群UI上
- master:Spark、Mesos或者YARN集群的URL,如果是本地運行,則應該是特殊的'local'字符串
在實際運行時,你不會講master參數寫死在程序代碼里,而是通過spark-submit來獲取這個參數;在本地測試和單元測試中,你仍然需要'local'去運行Spark應用程序
使用Shell
在PySpark Shell中,一個特殊SparkContext已經幫你創建好了,變量名是:sc,然而在Shell中創建你自己的SparkContext是不起作用的。
你可以通過–master參數設置master所連接的上下文主機;你也可以通過–py-files參數傳遞一個用逗號作為分割的列表,將Python中的.zip、.egg、.py等文件添加到運行路徑當中;你同樣可以通過–packages參數,傳遞一個用逗號分割的maven列表,來個這個Shell會話添加依賴(例如Spark的包)
任何額外的包含依賴的倉庫(如SonaType),都可以通過–repositories參數添加進來。 Spark中所有的Python依賴(requirements.txt的依賴包列表),在必要時都必須通過pip手動安裝
例如用4個核來運行bin/pyspark:
./bin/pyspark --master local[4]
或者,將code.py添加到搜索路徑中(為了後面可以import):
./bin/pyspark --master local[4] --py-files code.py
通過運行pyspark –help來查看完整的操作幫助信息,在這種情況下,pyspark會調用一個通用的spark-submit腳本
在IPython這樣增強Python解釋器中,也可以運行PySpark Shell;支持IPython 1.0.0+;在利用IPython運行bin/pyspark時,必須將PYSPARK_DRIVER_PYTHON變量設置成ipython:
PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
你可以通過PYSPARK_DRIVER_PYTHON_OPTS參數來自己定製ipython命令,比如在IPython Notebook中開啟PyLab圖形支持:
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" ./bin/pyspark
參考:Spark Programming Guide 官方文檔
原博鏈接,請註明出處。