大數據進階之路——Spark SQL小結

      網友投稿 791 2022-05-29

      文章目錄

      手寫 WordCount

      RDD、DAG、 Stage、 Task 、 Job

      Spark 作業提交流程

      Spark 的 Local 和 Standalone

      大數據進階之路——Spark SQL小結

      寬依賴、窄依賴

      Spark SQL比 Hive 快在哪

      打包的注意事項

      手寫 WordCount

      使用flatMap、reduceByKey 來計算

      //sc是SparkContext對象,該對象是提交spark程序的入口 sc.textFile("file:///home/hadoop/data/hello.txt") // 讀取文件, .flatMap(line => line.split(" ")) // 將文件中的每一行單詞按照分隔符(這里是空格)分隔 .map(word => (word,1)) //給每個單詞計數為1 .reduceByKey((x,y) => (x+y)) // 統計相同單詞的數量 .collect

      1

      2

      3

      4

      5

      6

      簡寫

      sc.textFile("file:///home/hadoop/data/hello.txt") .flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _) .collect

      1

      2

      3

      4

      5

      6

      RDD、DAG、 Stage、 Task 、 Job

      RDD(Resilient Distributed Datasets),彈性分布式數據集 DAG(Directed Acyclic Graph),有向無環圖

      1

      2

      RDD RDD 是 Spark 的靈魂,也稱為彈性分布式數據集。一個 RDD 代表一個可以被分區的只讀數據集。RDD 內部可以有許多分區(partitions),每個分區又擁有大量的記錄(records)。

      DAG Spark 中使用 DAG 對 RDD 的關系進行建模,描述了 RDD 的依賴關系,這種關系也被稱之為 lineage(血緣),RDD 的依賴關系使用 Dependency 維護。

      Stage 在 DAG 中又進行 Stage 的劃分,劃分的依據是依賴是否是 shuffle 的,每個 Stage 又可以劃分成若干 Task。接下來的事情就是 Driver 發送 Task 到 Executor,Executor 線程池去執行這些 task,完成之后將結果返回給 Driver。

      Job Spark 的 Job 來源于用戶執行 action 操作(這是 Spark 中實際意義的 Job),就是從 RDD 中獲取結果的操作,而不是將一個 RDD 轉換成另一個 RDD 的 transformation 操作。

      Task 一個 Stage 內,最終的 RDD 有多少個 partition,就會產生多少個 task。

      Spark 作業提交流程

      spark-submit 提交代碼,執行 new SparkContext(),在 SparkContext 里構造 DAGScheduler 和 TaskScheduler。

      TaskScheduler 會通過后臺的一個進程,連接 Master,向 Master 注冊 Application。

      Master 接收到 Application 請求后,會使用相應的資源調度算法,在 Worker 上為這個 Application 啟動多個 Executor

      Executor 啟動后,會自己反向注冊到 TaskScheduler 中。所有 Executor 都注冊到 Driver 上之后,SparkContext 結束初始化,接下來往下執行我們自己的代碼。

      每執行到一個 Action,就會創建一個 Job。Job 會提交給 DAGScheduler。

      DAGScheduler 會將 Job 劃分為多個 Stage,然后每個 Stage 創建一個 TaskSet。

      TaskScheduler 會把每一個 TaskSet 里的 Task,提交到 Executor 上執行。

      Executor 上有線程池,每接收到一個 Task,就用 TaskRunner 封裝,然后從線程池里取出一個線程執行這個 task。(TaskRunner 將我們編寫的代碼,拷貝,反序列化,執行 Task,每個 Task 執行 RDD 里的一個 partition)

      Spark 的 Local 和 Standalone

      Spark一共有6種運行模式:Local,Standalone,Yarn-Cluster,Yarn-Client, Mesos, Kubernetes

      Local: Local 模式即單機模式,如果在命令語句中不加任何配置,則默認是 Local 模式,在本地運行。這也是部署、設置最簡單的一種模式,所有的 Spark 進程都運行在一臺機器或一個虛擬機上面。

      Standalone: Standalone 是 Spark 自身實現的資源調度框架。如果我們只使用 Spark 進行大數據計算,不使用其他的計算框架時,就采用 Standalone 模式就夠了,尤其是單用戶的情況下。Standalone 模式是 Spark 實現的資源調度框架,其主要的節點有 Client 節點、Master 節點和 Worker 節點。其中 Driver 既可以運行在 Master 節點上中,也可以運行在本地 Client 端。當用 spark-shell 交互式工具提交 Spark 的 Job 時,Driver 在 Master 節點上運行;當使用 spark-submit 工具提交 Job 或者在 Eclipse、IDEA 等開發平臺上使用 new SparkConf.setManager(“spark://master:7077”) 方式運行 Spark 任務時,Driver 是運行在本地 Client 端上的。

      Standalone 模式的部署比較繁瑣,不過官方有提供部署腳本,需要把 Spark 的部署包安裝到每一臺節點機器上,并且部署的目錄也必須相同,而且需要 Master 節點和其他節點實現 SSH 無密碼登錄。啟動時,需要先啟動 Spark 的 Master 和 Slave 節點。提交命令類似于:

      ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://Oscar-2.local:7077 \ /tmp/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar \ 100

      1

      2

      3

      4

      5

      其中 master:7077是 Spark 的 Master 節點的主機名和端口號,當然集群是需要提前啟動。

      不管使用什么模式,Spark應用程序的代碼是一模一樣的,只需要在提交的時候通過–master參數來指定我們的運行模式即可

      Client

      Driver運行在Client端(提交Spark作業的機器)

      Client會和請求到的Container進行通信來完成作業的調度和執行,Client是不能退出的

      日志信息會在控制臺輸出:便于我們測試

      Cluster

      Driver運行在ApplicationMaster中

      Client只要提交完作業之后就可以關掉,因為作業已經在YARN上運行了

      日志是在終端看不到的,因為日志是在Driver上,只能通過yarn logs -applicationIdapplication_id

      ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --executor-memory 1G \ --num-executors 1 \ /home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \

      1

      2

      3

      4

      5

      6

      7

      此處的yarn就是我們的yarn client模式

      如果是yarn cluster模式的話,yarn-cluster

      Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.

      如果想運行在YARN之上,那么就必須要設置HADOOP_CONF_DIR或者是YARN_CONF_DIR

      1) export HADOOP_CONF_DIR=/home/hadoop/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop

      2) $SPARK_HOME/conf/spark-env.sh

      ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn-cluster \ --executor-memory 1G \ --num-executors 1 \ /home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \ 4

      1

      2

      3

      4

      5

      6

      7

      yarn logs -applicationId application_1495632775836_0002

      寬依賴、窄依賴

      窄依賴指的是每一個 Parent RDD 的 Partition 最多被子 RDD 的一個 Partition 使用(一子一親)

      寬依賴指的是多個子 RDD 的 Partition 會依賴同一個 parent RDD的 partition(多子一親)

      RDD 作為數據結構,本質上是一個只讀的分區記錄集合。一個 RDD 可以包含多個分區,每個分區就是一個數據集片段。

      首先,窄依賴可以支持在同一個節點上,以 pipeline 形式執行多條命令(也叫同一個 Stage 的操作),例如在執行了 map 后,緊接著執行 filter。相反,寬依賴需要所有的父分區都是可用的,可能還需要調用類似 MapReduce 之類的操作進行跨節點傳遞。

      其次,則是從失敗恢復的角度考慮。窄依賴的失敗恢復更有效,因為它只需要重新計算丟失的 parent partition 即可,而且可以并行地在不同節點進行重計算(一臺機器太慢就會重新調度到多個節點進行)。

      Spark SQL比 Hive 快在哪

      當Map的輸出結果要被Reduce使用時,輸出結果需要按key哈希,并且分發到每一個Reducer上去,這個過程就是shuffle。

      由于shuffle涉及到了磁盤的讀寫和網絡的傳輸,因此shuffle性能的高低直接影響到了整個程序的運行效率。

      Spark SQL 比 Hadoop Hive 快,是有一定條件的,而且不是 Spark SQL 的引擎比 Hive 的引擎快,相反,Hive 的 HQL 引擎還比 Spark SQL 的引擎更快。其實,關鍵還是在于 Spark 本身快。

      消除了冗余的 HDFS 讀寫: Hadoop 每次 shuffle 操作后,必須寫到磁盤,而 Spark 在 shuffle 后不一定落盤,可以 persist 到內存中,以便迭代時使用。如果操作復雜,很多的 shufle 操作,那么 Hadoop 的讀寫 IO 時間會大大增加,也是 Hive 更慢的主要原因了。

      消除了冗余的 MapReduce 階段: Hadoop 的 shuffle 操作一定連著完整的 MapReduce 操作,冗余繁瑣。而 Spark 基于 RDD 提供了豐富的算子操作,且 reduce 操作產生 shuffle 數據,可以緩存在內存中 。

      JVM 的優化: Hadoop 每次 MapReduce 操作,啟動一個 Task 便會啟動一次 JVM,基于進程的操作。而 Spark 每次 MapReduce 操作是基于線程的,只在啟動 Executor 是啟動一次 JVM,內存的 Task 操作是在線程復用的。每次啟動 JVM 的時間可能就需要幾秒甚至十幾秒,那么當 Task 多了,這個時間 Hadoop 不知道比 Spark 慢了多少。

      打包的注意事項

      打包時要注意,pom.xml中需要添加如下plugin

      maven-assembly-plugin jar-with-dependencies

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      mvn assembly:assembly

      ./bin/spark-submit \ --class com.hiszm.log.SparkStatCleanJobYARN \ --name SparkStatCleanJobYARN \ --master yarn \ --executor-memory 1G \ --num-executors 1 \ --files /home/hadoop/lib/ipDatabase.csv,/home/hadoop/lib/ipRegion.xlsx \ /home/hadoop/lib/sql-1.0-jar-with-dependencies.jar \ hdfs://hadoop001:8020/hiszm/input/* hdfs://hadoop001:8020/hiszm/clean

      1

      2

      3

      4

      5

      6

      7

      8

      9

      注意:–files在spark中的使用

      spark.read.format("parquet").load("/hiszm/clean/day=20170511/part-00000-71d465d1-7338-4016-8d1a-729504a9f95e.snappy.parquet").show(false)

      ./bin/spark-submit \ --class com.hiszm.log.TopNStatJobYARN \ --name TopNStatJobYARN \ --master yarn \ --executor-memory 1G \ --num-executors 1 \ /home/hadoop/lib/sql-1.0-jar-with-dependencies.jar \ hdfs://hadoop001:8020/hiszm/clean 20170511

      1

      2

      3

      4

      5

      6

      7

      8

      存儲格式的選擇:http://www.infoq.com/cn/articles/bigdata-store-choose/

      壓縮格式的選擇:https://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-compression-analysis/

      調整并行度

      ./bin/spark-submit \ --class com.hiszm.log.TopNStatJobYARN \ --name TopNStatJobYARN \ --master yarn \ --executor-memory 1G \ --num-executors 1 \ --conf spark.sql.shuffle.partitions=100 \ /home/hadoop/lib/sql-1.0-jar-with-dependencies.jar \ hdfs://hadoop001:8020/hiszm/clean 20170511

      1

      2

      3

      4

      5

      6

      7

      8

      9

      spark SQL 大數據

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:KubeSphere排錯實戰(三)
      下一篇:FusionInsight MRS備份恢復簡介
      相關文章
      亚洲人成色77777在线观看大| 老司机亚洲精品影院| 亚洲国产精品久久网午夜| 亚洲小视频在线观看| 久久精品国产亚洲香蕉| 亚洲国产精品一区二区成人片国内| 亚洲人成色77777在线观看大| 日韩亚洲精品福利| 亚洲免费日韩无码系列| 亚洲人成电影网站色| 中国亚洲呦女专区| 亚洲熟伦熟女专区hd高清| 亚洲国产AV一区二区三区四区| 亚洲AV无码XXX麻豆艾秋| 亚洲电影日韩精品| 久久亚洲精品无码播放| 亚洲深深色噜噜狠狠爱网站| 亚洲AV无码精品色午夜果冻不卡| 亚洲高清国产拍精品26U| 午夜影视日本亚洲欧洲精品一区| 亚洲国产精华液网站w| 亚洲嫩草影院久久精品| 亚洲精品美女在线观看| 精品亚洲AV无码一区二区三区 | 亚洲最大的成网4438| 亚洲日韩图片专区第1页| 亚洲成人高清在线观看| 精品日韩99亚洲的在线发布| 亚洲熟女精品中文字幕| 男人的天堂亚洲一区二区三区| 亚洲一日韩欧美中文字幕在线| 亚洲1区2区3区精华液| 亚洲国产精品一区二区第一页免 | 伊伊人成亚洲综合人网7777| 亚洲国产精品无码专区在线观看| 亚洲最大的成网4438| 亚洲最大福利视频| 无码天堂亚洲国产AV| 美腿丝袜亚洲综合| 亚洲国产综合91精品麻豆| 亚洲美女视频网站|