PySpark 教程 - 使用 Python 學(xué)習(xí) Apache Spark
在數(shù)據(jù)以如此驚人的速度生成的世界中,在正確的時間正確分析該數(shù)據(jù)非常有用。Apache Spark 是實(shí)時處理大數(shù)據(jù)和執(zhí)行分析的最令人驚奇的框架之一。?總之,Python for Spark或 PySpark 是最受歡迎的認(rèn)證課程之一,讓 Scala for Spark 物超所值。所以在這個PySpark 教程博客中,我將討論以下主題:
什么是 PySpark?
行業(yè)中的 PySpark
為什么選擇 Python?
Spark RDD
使用 PySpark 進(jìn)行機(jī)器學(xué)習(xí)
PySpark 教程:什么是 PySpark?
Apache Spark 是一個快速的集群計(jì)算框架,用于處理、查詢和分析大數(shù)據(jù)。基于內(nèi)存計(jì)算,它比其他幾個大數(shù)據(jù)框架有優(yōu)勢。
開源社區(qū)最初用 Scala 編程語言編寫,開發(fā)了一個了不起的工具來支持 Python for Apache Spark。PySpark 通過其庫Py4j幫助數(shù)據(jù)科學(xué)家與 Apache Spark 和 Python 中的RDD 交互。?有許多特性使 PySpark 成為比其他框架更好的框架:
速度:?比傳統(tǒng)的大規(guī)模數(shù)據(jù)處理框架快 100 倍
強(qiáng)大的緩存:?簡單的編程層提供強(qiáng)大的緩存和磁盤持久化能力
部署:?可以通過 Mesos、Hadoop 通過 Yarn 或 Spark 自己的集群管理器進(jìn)行部署
實(shí)時:?由于內(nèi)存計(jì)算,實(shí)時計(jì)算和低延遲
Polyglot:?支持 Scala、Java、Python 和 R 編程
讓我們繼續(xù)我們的 PySpark 教程博客,看看 Spark 在行業(yè)中的哪些應(yīng)用。
行業(yè)中的 PySpark
每個行業(yè)都圍繞大數(shù)據(jù)展開,哪里有大數(shù)據(jù),哪里就有分析。那么讓我們來看看使用 Apache Spark 的各個行業(yè)。
媒體是向在線流媒體發(fā)展的最大行業(yè)之一。?Netflix使用 Apache Spark 進(jìn)行實(shí)時流處理,為其客戶提供個性化的在線推薦。它每天處理4500 億個流向服務(wù)器端應(yīng)用程序的事件。
金融是 Apache Spark 的實(shí)時處理發(fā)揮重要作用的另一個領(lǐng)域。銀行正在使用 Spark 訪問和分析社交媒體資料,以獲取洞察力,從而幫助他們?yōu)樾庞蔑L(fēng)險(xiǎn)評估、有針對性的廣告和客戶細(xì)分做出正確的業(yè)務(wù)決策。使用 Spark 也減少了客戶流失。?欺詐檢測是 Spark 涉及的機(jī)器學(xué)習(xí)中使用最廣泛的領(lǐng)域之一。
醫(yī)療保健提供者正在使用 Apache Spark分析患者記錄以及過去的臨床數(shù)據(jù),以確定哪些患者在出院后可能面臨健康問題。Apache Spark 用于基因組測序以減少處理基因組數(shù)據(jù)所需的時間。
零售和電子商務(wù)是一個如果不使用分析和定向廣告就無法想象的行業(yè)。作為當(dāng)今最大的電子商務(wù)平臺之一,阿里巴巴運(yùn)行著一些世界上最大的 Spark Jobs,以分析 PB 級數(shù)據(jù)。阿里巴巴在圖像數(shù)據(jù)中進(jìn)行特征提取。eBay?使用 Apache Spark 提供Targeted Offers,增強(qiáng)客戶體驗(yàn)并優(yōu)化整體性能。
Travel?Industries 也使用 Apache Spark。?TripAdvisor是幫助用戶規(guī)劃完美旅行的領(lǐng)先旅游網(wǎng)站,它正在使用 Apache Spark 加速其個性化客戶推薦。TripAdvisor 使用 apache spark 通過比較數(shù)百個網(wǎng)站為客戶找到最優(yōu)惠的酒店價(jià)格,為數(shù)百萬旅客提供建議。.
本 PySpark 教程的一個重要方面是了解我們?yōu)槭裁葱枰褂?Python?為什么不是 Java、Scala 或 R?
為什么選擇 Python?
易學(xué):對于程序員來說,Python 相對容易學(xué)習(xí),因?yàn)樗恼Z法和標(biāo)準(zhǔn)庫。此外,它是一種動態(tài)類型語言,這意味著 RDD 可以保存多種類型的對象。
大量庫:??Scala 沒有足夠的數(shù)據(jù)科學(xué)工具和庫,例如 Python 用于機(jī)器學(xué)習(xí)和自然語言處理。此外,Scala 缺乏良好的可視化和本地?cái)?shù)據(jù)轉(zhuǎn)換。
巨大的社區(qū)支持:??Python 擁有一個全球社區(qū),擁有數(shù)百萬開發(fā)人員,他們在數(shù)以千計(jì)的虛擬和物理位置進(jìn)行在線和離線交互。
本 PySpark 教程中最重要的主題之一是 RDD 的使用。讓我們了解什么是 RDD
Spark RDD
當(dāng)談到迭代分布式計(jì)算,即在計(jì)算中處理多個作業(yè)的數(shù)據(jù)時,我們需要在多個作業(yè)之間重用或共享數(shù)據(jù)。Hadoop 等早期框架在處理多個操作/作業(yè)時遇到問題,例如
將數(shù)據(jù)存儲在中間存儲中,例如 HDFS
多個 I/O 作業(yè)使計(jì)算變慢
復(fù)制和序列化反過來使過程更慢
RDD 試圖通過啟用容錯分布式內(nèi)存計(jì)算來解決所有問題。RDD 是彈性分布式數(shù)據(jù)集的縮寫?。?RDD 是一種分布式內(nèi)存抽象,它允許程序員以容錯方式在大型集群上執(zhí)行內(nèi)存計(jì)算。它們是跨一組機(jī)器分區(qū)的對象的只讀集合,如果分區(qū)丟失,可以重建這些對象。在 RDD 上執(zhí)行了幾種操作:
轉(zhuǎn)換:?轉(zhuǎn)換從現(xiàn)有數(shù)據(jù)集創(chuàng)建一個新數(shù)據(jù)集。懶惰評估
動作:??Spark 僅在 RDD 上調(diào)用動作時才強(qiáng)制執(zhí)行計(jì)算
讓我們了解一些轉(zhuǎn)換、動作和函數(shù)
讀取文件并顯示前 n 個元素:
rdd = sc.textFile("file:///home/edureka/Desktop/Sample") rdd.take(n)
輸出:
[u'森林砍伐正在成為主要的環(huán)境和社會問題,現(xiàn)在已經(jīng)不僅僅是一個強(qiáng)大的惡魔。', u'我們必須了解因森林砍伐而出現(xiàn)的問題的原因、影響和解決方法。', u'我們提供了許多關(guān)于森林砍伐的長篇和短文,以幫助您的孩子和孩子了解這個問題,并參與學(xué)校或校外的論文寫作比賽。', 你可以根據(jù)班級標(biāo)準(zhǔn)選擇下面給出的任何毀林論文。', u'森林砍伐正在成為社會和環(huán)境面臨的主要全球問題。']
轉(zhuǎn)換為小寫和拆分:(降低和拆分)
def Func(lines): lines = lines.lower() lines = lines.split() return lines rdd1 = rdd.map(Func) rdd1.take(5)
輸出:
[[你'砍伐森林', 你是, 你起床了, 你是, 你那個', 你主要, 你'環(huán)保', 你和', 你'社會', 你'問題', 你'哪個', 你有, 你現(xiàn)在, 你被帶走了, ..... . . . ]
刪除停用詞:(過濾器)
stop_words = ['a','all','the','as','is','am','an','and','be','been','from','had','I','I’d','why','with'] rdd2 = rdd1.filter(lambda z: z not in stop_words) rdd2.take(10)
輸出:
[你'森林砍伐', 你起床了, 你主要, 你'環(huán)保', 你'社會', 你'問題', 你'哪個', 你有, 你現(xiàn)在, 你被帶走了]
從 1 到 500 的數(shù)字總和:(減少)
sum_rdd = sc.parallelize(range(1,500)) sum_rdd.reduce(lambda x,y: x+y)
輸出:
124750
使用 PySpark 進(jìn)行機(jī)器學(xué)習(xí)
繼續(xù)我們的 PySpark 教程博客,讓我們分析一些籃球數(shù)據(jù)并做一些未來預(yù)測。所以,這里我們將使用自1980 年?[三分球引進(jìn)年]以來 NBA 的所有球員的籃球數(shù)據(jù)。
數(shù)據(jù)加載:
df = spark.read.option('header','true') .option('inferSchema','true') .csv("file:///home/edureka/Downloads/season_totals.csv")
印刷欄目:
print(df.columns)
輸出:
['_c0','玩家','pos','年齡','team_id','g','gs','mp','fg','fga','fg_pct','fg3',' fg3a','fg3_pct','fg2','fg2a','fg2_pct','efg_pct','ft','fta','ft_pct','orb','drb','trb','ast' , 'stl', 'blk', 'tov', 'pf', 'pts', 'yr']
排序玩家(OrderBy)和 toPandas:
在這里,我們根據(jù)一個賽季的得分對球員進(jìn)行排序。
df.orderBy('pts',ascending = False).limit(10).toPandas()[['yr','player','age','pts','fg3']]
輸出:
使用 DSL 和 matplotlib:
在這里,我們正在分析每個賽季在36 分鐘的時間限制內(nèi)平均出手3 分的次數(shù)[這個時間間隔對應(yīng)于一個近似完整的 NBA 比賽并有足夠的休息]。我們使用三分球出手次數(shù) (fg3a) 和上場時間 (mp) 計(jì)算此指標(biāo),然后使用matlplotlib繪制結(jié)果。
from pyspark.sql.functions import col fga_py = df.groupBy('yr') .agg({'mp' : 'sum', 'fg3a' : 'sum'}) .select(col('yr'), (36*col('sum(fg3a)')/col('sum(mp)')).alias('fg3a_p36m')) .orderBy('yr') from matplotlib import pyplot as plt import seaborn as sns plt.style.use('fivethirtyeight') _df = fga_py.toPandas() plt.plot(_df.yr,_df.fg3a_p36m, color = '#CD5C5C') plt.xlabel('Year') _=plt.title('Player average 3-point attempts (per 36 minutes)') plt.annotate('3 pointer introduced', xy=(1980, .5), xytext=(1981, 1.1), fontsize = 9, arrowprops=dict(facecolor='grey', shrink=0, linewidth = 2)) plt.annotate('NBA moved in 3-point line', xy=(1996, 2.4), xytext=(1991.5, 2.7), fontsize = 9, arrowprops=dict(facecolor='grey', shrink=0, linewidth = 2)) plt.annotate('NBA moved back 3-point line', xy=(1998, 2.), xytext=(1998.5, 2.4), fontsize = 9, arrowprops=dict(facecolor='grey', shrink=0, linewidth = 2))
輸出:
線性回歸和 VectorAssembler:
我們可以將線性回歸模型擬合到這條曲線上,以模擬未來 5 年的投籃次數(shù)。我們必須使用 VectorAssembler 函數(shù)將數(shù)據(jù)轉(zhuǎn)換為單列。這是一個必要條件為在MLlib線性回歸API。
from pyspark.ml.feature import VectorAssembler t = VectorAssembler(inputCols=['yr'], outputCol = 'features') training = t.transform(fga_py) .withColumn('yr',fga_py.yr) .withColumn('label',fga_py.fg3a_p36m) training.toPandas().head()
輸出:
建筑模型:
然后我們使用轉(zhuǎn)換后的數(shù)據(jù)構(gòu)建我們的線性回歸模型對象。
from pyspark.ml.regression import LinearRegression lr = LinearRegression(maxIter=10) model = lr.fit(training)
將訓(xùn)練好的模型應(yīng)用于數(shù)據(jù)集:
我們將經(jīng)過訓(xùn)練的模型對象模型與 5 年的未來數(shù)據(jù)一起應(yīng)用于我們的原始訓(xùn)練集
from pyspark.sql.types import Row # apply model for the 1979-80 season thru 2020-21 season training_yrs = training.select('yr').rdd.map(lambda x: x[0]).collect() training_y = training.select('fg3a_p36m').rdd.map(lambda x: x[0]).collect() prediction_yrs = [2017, 2018, 2019, 2020, 2021] all_yrs = training_yrs + prediction_yrs # built testing DataFrame test_rdd = sc.parallelize(all_yrs) row = Row('yr')< all_years_features = t.transform(test_rdd.map(row).toDF()) # apply linear regression model df_results = model.transform(all_years_features).toPandas()
繪制最終預(yù)測:
然后我們可以繪制我們的結(jié)果并將圖表保存在指定的位置。
plt.plot(df_results.yr,df_results.prediction, linewidth = 2, linestyle = '--',color = '#224df7', label = 'L2 Fit') plt.plot(training_yrs, training_y, color = '#f08080', label = None) plt.xlabel('Year') plt.ylabel('Number of attempts') plt.legend(loc = 4) _=plt.title('Player average 3-point attempts (per 36 minutes)') plt.tight_layout() plt.savefig("/home/edureka/Downloads/Images/REGRESSION.png")
輸出:
有了這張圖,我們就結(jié)束了這個 PySpark 教程博客。
所以就是這樣,伙計(jì)們!
我希望你們對 PySpark 是什么、為什么 Python 最適合 Spark、RDD 以及在此 PySpark 教程博客中使用 Pyspark 進(jìn)行機(jī)器學(xué)習(xí)的一瞥有所了解。恭喜,您不再是 PySpark 的新手。如果您想了解有關(guān) PySpark 的更多信息并了解不同的行業(yè)用例,請查看我們的Spark 與 Python博客。
Apache Python spark 大數(shù)據(jù)
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。