SparkPython 簡介 – PySpark 初學者

      網友投稿 1932 2022-05-29

      在處理和使用大數據方面,?Apache Spark是使用最廣泛的框架之一,而Python是用于數據分析、機器學習等的最廣泛使用的編程語言之一。那么,為什么不一起使用它們呢?這就是Spark with Python也稱為PySpark出現的地方。

      Spark 與 Python 簡介 – PySpark 初學者

      Apache Spark 簡介

      Apache Spark 是由 Apache 軟件基金會開發的用于實時處理的開源集群計算框架。Spark 提供了一個接口,用于對具有隱式數據并行性和容錯性的整個集群進行編程。

      以下是 Apache Spark 的一些特性,這些特性使其比其他框架更具優勢:

      速度:比傳統的大規模數據處理框架快 100 倍。

      強大的緩存:簡單的編程層提供了強大的緩存和磁盤持久化能力。

      部署:?可以通過 Mesos、Hadoop 通過 Yarn 或 Spark 自己的集群管理器進行部署。

      實時:?由于內存計算,實時計算和低延遲。

      Polyglot:這是該框架最重要的特性之一,因為它可以用 Scala、Java、Python 和 R 進行編程。

      為什么選擇 Python?

      雖然 Spark 是用 Scala 設計的,這使它比 Python 快了近 10 倍,但 Scala 只有在使用的內核數量較少時才能更快。由于現在大部分的分析和處理都需要大量的內核,所以Scala的性能優勢并沒有那么大。

      對于程序員來說,Python相對容易?學習,因為它的語法和標準庫。此外,它是一種動態類型語言,這意味著 RDD 可以保存多種類型的對象。

      盡管 Scala 有SparkMLlib,但它沒有足夠的庫和工具用于機器學習和 NLP目的。此外,Scala 缺乏數據可視化。

      使用 Python (PySpark) 設置 Spark

      我希望你們知道如何下載火花并安裝它。所以,一旦你解壓了 spark 文件,安裝了它并將它的路徑添加到.bashrc文件中,你需要輸入source .bashrc

      export SPARK_HOME = /usr/lib/hadoop/spark-2.1.0-bin-hadoop2.7 export PATH = $PATH:/usr/lib/hadoop/spark-2.1.0-bin-hadoop2.7/bin

      要打開 pyspark shell,您需要輸入命令??./bin/pyspark

      Spark in Industry

      Apache Spark 由于其驚人的功能,如內存處理、多語言和快速處理,正被全球許多公司用于各種行業的各種目的:

      雅虎使用 Apache Spark 的機器學習功能來個性化其新聞、網頁以及目標廣告。他們使用 Spark 和 python 來找出什么樣的新聞——用戶有興趣閱讀和分類新聞故事,以找出什么樣的用戶有興趣閱讀每一類新聞。

      TripAdvisor使用 apache spark 通過比較數百個網站來為客戶找到最優惠的酒店價格,從而為數百萬旅客提供建議。以可讀格式閱讀和處理酒店評論所需的時間是在 Apache Spark 的幫助下完成的。

      作為全球最大的電子商務平臺之一,阿里巴巴?運行著一些世界上最大的 Apache Spark 作業,以分析其電子商務平臺上數百 PB 的數據。

      PySpark SparkContext 和數據流

      用 Python 談論 Spark,使用 RDD 是由庫 Py4j 實現的。PySpark Shell 將 Python API 鏈接到 Spark 核心并初始化 Spark 上下文。Spark Context是任何Spark?應用程序的核心。

      Spark 上下文設置內部服務并建立與 Spark 執行環境的連接。

      驅動程序中的 sparkcontext 對象協調所有分布式進程并允許資源分配。

      Cluster Managers 提供Executors,它是帶有邏輯的JVM 進程。

      SparkContext 對象將應用程序發送給執行程序。

      SparkContext 在每個執行器中執行任務。

      PySpark KDD 用例

      現在讓我們來看看KDD'99 Cup(國際知識發現和數據挖掘工具大賽)的一個用例。這里我們取一小部分數據集,因為原始數據集太大了

      import urllib f = urllib.urlretrieve ("

      創建 RDD:

      現在我們可以使用這個文件來創建我們的 RDD。

      data_file = "./kddcup.data_10_percent.gz" raw_data = sc.textFile(data_file)

      過濾:

      假設我們要計算有多少正常。我們在數據集中的交互。我們可以按如下方式過濾我們的 raw_data RDD。

      normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)

      數數:

      現在我們可以計算新 RDD 中有多少元素。

      from time import time t0 = time() normal_count = normal_raw_data.count() tt = time() - t0 print "There are {} 'normal' interactions".format(normal_count) print "Count completed in {} seconds".format(round(tt,3))

      輸出:

      有 97278 次“正常”交互 計數在 5.951 秒內完成

      映射:

      在這種情況下,我們希望將數據文件讀取為 CSV 格式的文件。我們可以通過將 lambda 函數應用于 RDD 中的每個元素來做到這一點,如下所示。這里我們將使用map() 和 take() 轉換。

      from pprint import pprint csv_data = raw_data.map(lambda x: x.split(",")) t0 = time() head_rows = csv_data.take(5) tt = time() - t0 print "Parse completed in {} seconds".format(round(tt,3)) pprint(head_rows[0])

      輸出:

      解析在 1.715 秒內完成 [u'0', u'tcp', u'http', u'SF', u'181', u'5450', u'0', u'0', . . 你很正常。”]

      拆分:

      現在我們希望 RDD 中的每個元素都作為一個鍵值對,其中鍵是標簽(例如?normal),值是代表 CSV 格式文件中行的整個元素列表。我們可以進行如下操作。這里我們使用line.split() 和 map()。

      def parse_interaction(line): elems = line.split(",") tag = elems[41] return (tag, elems) key_csv_data = raw_data.map(parse_interaction) head_rows = key_csv_data.take(5) pprint(head_rows[0])

      輸出: (u'normal.', [u'0', u'tcp', u'http', u'SF', u'181', u'5450', u'0', u'0', u'0.00' , u'1.00' , 。 。 。 。 u'normal。'])

      收集行動:

      在這里,我們將使用 collect() 操作。它將把 RDD 的所有元素都放入內存中。因此,在處理大型 RDD 時必須小心使用。

      t0 = time() all_raw_data = raw_data.collect() tt = time() - t0 print "Data collected in {} seconds".format(round(tt,3))

      輸出:

      17.927 秒內收集的數據

      當然,這比我們之前使用的任何其他操作花費的時間更長。每個擁有 RDD 片段的 Spark 工作節點都必須進行協調,以便檢索其部分,然后將所有內容減少到一起。

      作為結合前面所有內容的最后一個示例,我們希望將所有normal?交互收集?為鍵值對。

      # get data from file data_file = "./kddcup.data_10_percent.gz" raw_data = sc.textFile(data_file) # parse into key-value pairs key_csv_data = raw_data.map(parse_interaction) # filter normal key interactions normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.") # collect all t0 = time() all_normal = normal_key_interactions.collect() tt = time() - t0 normal_count = len(all_normal) print "Data collected in {} seconds".format(round(tt,3)) print "There are {} 'normal' interactions".format(normal_count)

      輸出:

      12.485秒采集數據 正常互動97278次

      就是這樣,伙計們!

      我希望你喜歡這個 Spark with Python 博客。如果您正在閱讀本文,恭喜您!您不再是 PySpark 的新手。現在就在您的系統上試試這個簡單的例子。

      Apache Python spark

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:linux_軟件包管理
      下一篇:Jetpack DataStore 你總要了解一下吧?
      相關文章
      亚洲色成人网一二三区| 亚洲美免无码中文字幕在线| 91亚洲视频在线观看| 亚洲国产成人久久综合一| 亚洲国产精品无码中文字| 久久激情亚洲精品无码?V| 亚洲日韩国产一区二区三区| mm1313亚洲国产精品美女| 国产精品亚洲精品爽爽| 国产91成人精品亚洲精品| 亚洲成a人在线看天堂无码| 国产精品亚洲专区一区| 婷婷综合缴情亚洲狠狠尤物| 亚洲av手机在线观看| 亚洲国产成人久久精品99| 亚洲精品色播一区二区| 亚洲AV无码国产一区二区三区| 亚洲国产AV一区二区三区四区| 亚洲AV香蕉一区区二区三区| 欧洲亚洲综合一区二区三区| 在线91精品亚洲网站精品成人| mm1313亚洲精品国产| 91麻豆精品国产自产在线观看亚洲| 三上悠亚亚洲一区高清| 亚洲日韩一页精品发布| 亚洲国产精品VA在线观看麻豆| 亚洲第一福利视频| 久久久无码精品亚洲日韩京东传媒 | 亚洲国产精品乱码一区二区| 亚洲av中文无码乱人伦在线r▽| 亚洲av日韩av高潮潮喷无码 | 亚洲AV噜噜一区二区三区| 日本系列1页亚洲系列| 国产精品xxxx国产喷水亚洲国产精品无码久久一区 | 亚洲国产成人久久精品99| 亚洲无线码一区二区三区| 亚洲av成人无码久久精品| 亚洲成av人片不卡无码| 亚洲国产综合AV在线观看| 亚洲国产精品无码久久青草| 亚洲色婷婷六月亚洲婷婷6月|