Spark基礎(chǔ)學(xué)習(xí)筆記01:初步了解Spark

      網(wǎng)友投稿 955 2022-05-29

      文章目錄

      零、本講學(xué)習(xí)目標

      一、認識Spark

      (一)Spark簡介

      (二)Spark官網(wǎng)

      Spark基礎(chǔ)學(xué)習(xí)筆記01:初步了解Spark

      (三)Spark發(fā)展歷史

      (四)Spark的特點

      1、快速

      2、易用性

      3、通用性

      4、隨處運行

      5、代碼簡潔

      (1)采用MapReduce實現(xiàn)詞頻統(tǒng)計

      (2)采用Spark實現(xiàn)詞頻統(tǒng)計

      (五)Spark存儲層次

      (六)Spark生態(tài)圈

      1、Spark SQL

      2、Spark Streaming

      3、MLlib

      4、GraphX

      (七)Spark應(yīng)用場景

      1、騰訊

      2、Yahoo

      3、淘寶

      4、優(yōu)酷土豆

      二、搭建Spark環(huán)境

      (一)搭建單機版環(huán)境

      1、卸載CentOS7自帶的OpenJDK

      2、下載和安裝JDK

      3、下載Spark安裝包到Windows本地

      4、將Spark安裝包上傳到Linux的/opt目錄下

      5、將Spark安裝包解壓到/usr/local目錄下

      6、配置Spark環(huán)境變量

      7、使用SparkPi來計算Pi的值

      8、使用Scala版本Spark-Shell

      9、使用Python版本Spark-Shell

      例1、在Python 中使用textFile() 創(chuàng)建一個字符串的RDD

      例2、調(diào)用轉(zhuǎn)化操作filter()

      例3、調(diào)用first() 行動操作

      三、Spark運行架構(gòu)及原理

      零、本講學(xué)習(xí)目標

      了解Spark的發(fā)展歷史及特點

      學(xué)會搭建Spark環(huán)境

      了解Spark的運行架構(gòu)與原理

      一、認識Spark

      (一)Spark簡介

      快速、分布式、可擴展、容錯的集群計算框架;

      Spark是基于內(nèi)存計算的大數(shù)據(jù)分布式計算框架;

      Spark提供低延遲的復(fù)雜分析;

      Spark是Hadoop MapReduce的替代方案。MapReudce不適合迭代和交互式任務(wù),Spark主要為交互式查詢和迭代算法設(shè)計,支持內(nèi)存存儲和高效的容錯恢復(fù)。Spark擁有MapReduce具有的優(yōu)點,但不同于MapReduce,Spark中間輸出結(jié)果可以保存在內(nèi)存中,減少讀寫HDFS的次數(shù)。

      (二)Spark官網(wǎng)

      官網(wǎng)網(wǎng)址:https://spark.apache.org

      (三)Spark發(fā)展歷史

      Spark目前最新版本是2022年1月26日發(fā)布的Spark3.2.1

      (四)Spark的特點

      1、快速

      一般情況下,對于迭代次數(shù)較多的應(yīng)用程序,Spark程序在內(nèi)存中的運行速度是Hadoop MapReduce運行速度的100多倍,在磁盤上的運行速度是Hadoop MapReduce運行速度的10多倍。

      2、易用性

      Spark支持使用Scala、Python、Java及R語言快速編寫應(yīng)用。同時Spark提供超過80個高級運算符,使得編寫并行應(yīng)用程序變得容易并且可以在Scala、Python或R的交互模式下使用Spark。

      3、通用性

      Spark可以與SQL、Streaming及復(fù)雜的分析良好結(jié)合。Spark還有一系列的高級工具,包括Spark SQL、MLlib(機器學(xué)習(xí)庫)、GraphX(圖計算)和Spark Streaming,并且支持在一個應(yīng)用中同時使用這些組件。

      4、隨處運行

      用戶可以使用Spark的獨立集群模式運行Spark,也可以在EC2(亞馬遜彈性計算云)、Hadoop YARN或者Apache Mesos上運行Spark。并且可以從HDFS、Cassandra、HBase、Hive、Tachyon和任何分布式文件系統(tǒng)讀取數(shù)據(jù)。

      5、代碼簡潔

      參看【采用多種方式實現(xiàn)詞頻統(tǒng)計】

      編寫WordCountMapper

      package net.hw.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * Created by howard on 2018/2/6. */ public class WordCountMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] data = line.split(" "); for (int i = 0; i < data.length; i++) { context.write(new Text(data[i]), new IntWritable(1)); } } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      編寫WordCountReducer

      package net.hw.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * Created by howard on 2018/2/6. */ public class WordCountReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable value : values) { count = count + value.get(); } context.write(key, new IntWritable(count)); } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      編寫WordCountDriver

      package net.hw.wc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.net.URI; /** * Created by howard on 2018/2/6. */ public class WordCountDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); String uri = "hdfs://hadoop:9000"; Path inputPath = new Path(uri + "/word"); Path outputPath = new Path(uri + "/word/result"); FileSystem fs = FileSystem.get(new URI(uri), conf); fs.delete(outputPath, true); FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); job.waitForCompletion(true); System.out.println("統(tǒng)計結(jié)果:"); FileStatus[] fileStatuses = fs.listStatus(outputPath); for (int i = 1; i < fileStatuses.length; i++) { System.out.println(fileStatuses[i].getPath()); FSDataInputStream in = fs.open(fileStatuses[i].getPath()); IOUtils.copyBytes(in, System.out, 4096, false); } } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      40

      41

      42

      43

      44

      45

      46

      47

      48

      49

      50

      51

      52

      53

      54

      55

      運行程序WordCountDriver,查看結(jié)果

      編寫WordCount

      package net.hw.spark.wc import org.apache.spark.{SparkConf, SparkContext} /** * Created by howard on 2018/2/6. */ object WordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("wordcount") val sc = new SparkContext(conf) val rdd = sc.textFile("test.txt") .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) rdd.foreach(println) rdd.saveAsTextFile("result") } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      啟動WordCount,查看結(jié)果

      大家可以看出,完成同樣的詞頻統(tǒng)計任務(wù),Spark代碼比MapReduce代碼簡潔很多。

      (五)Spark存儲層次

      Spark 不僅可以將任何Hadoop 分布式文件系統(tǒng)(HDFS)上的文件讀取為分布式數(shù)據(jù)集,也可以支持其他支持Hadoop 接口的系統(tǒng),比如本地文件、亞馬遜S3、Cassandra、Hive、HBase 等。我們需要弄清楚的是,Hadoop 并非Spark 的必要條件,Spark 支持任何實現(xiàn)了Hadoop 接口的存儲系統(tǒng)。Spark 支持的Hadoop 輸入格式包括文本文件、SequenceFile、Avro、Parquet 等。

      (六)Spark生態(tài)圈

      1、Spark SQL

      val sqlContext = new org.apache.spark.sql.SQLContext(sc) val prop = new java.util.Properties prop.put("user","root") prop.put("password","root") val df = sqlContext.read.jdbc("jdbc:mysql://hadoop:3306/studb", "student", prop) df.show()

      1

      2

      3

      4

      5

      6

      2、Spark Streaming

      3、MLlib

      4、GraphX

      (七)Spark應(yīng)用場景

      1、騰訊

      廣點通是最早使用Spark的應(yīng)用之一。騰訊大數(shù)據(jù)精準推薦借助Spark快速迭代的優(yōu)勢,圍繞“數(shù)據(jù)+算法+系統(tǒng)”這套技術(shù)方案,實現(xiàn)了在“數(shù)據(jù)實時采集、算法實時訓(xùn)練、系統(tǒng)實時預(yù)測”的全流程實時并行高維算法,最終成功應(yīng)用于廣點通pCTR (Predict Click-Through Rate) 投放系統(tǒng)上,支持每天上百億的請求量。

      2、Yahoo

      Yahoo將Spark用在Audience Expansion中。Audience Expansion是廣告中尋找目標用戶的一種方法,首先廣告者提供一些觀看了廣告并且購買產(chǎn)品的樣本客戶,據(jù)此進行學(xué)習(xí),尋找更多可能轉(zhuǎn)化的用戶,對他們定向廣告。Yahoo采用的算法是Logistic Regression。同時由于某些SQL負載需要更高的服務(wù)質(zhì)量,又加入了專門跑Shark的大內(nèi)存集群,用于取代商業(yè)BI/OLAP工具,承擔報表/儀表盤和交互式/即席查詢,同時與桌面BI工具對接。

      3、淘寶

      淘寶技術(shù)團隊使用了Spark來解決多次迭代的機器學(xué)習(xí)算法、高計算復(fù)雜度的算法等,將Spark運用于淘寶的推薦相關(guān)算法上,同時還利用GraphX解決了許多生產(chǎn)問題,包括以下計算場景:基于度分布的中樞節(jié)點發(fā)現(xiàn)、基于最大連通圖的社區(qū)發(fā)現(xiàn)、基于三角形計數(shù)的關(guān)系衡量、基于隨機游走的用戶屬性傳播等。

      4、優(yōu)酷土豆

      目前Spark已經(jīng)廣泛使用在優(yōu)酷土豆的視頻推薦,廣告業(yè)務(wù)等方面,相比Hadoop,Spark交互查詢響應(yīng)快,性能比Hadoop提高若干倍。一方面,使用Spark模擬廣告投放的計算效率高、延遲?。ㄍ琀adoop比延遲至少降低一個數(shù)量級)。另一方面,優(yōu)酷土豆的視頻推薦往往涉及機器學(xué)習(xí)及圖計算,而使用Spark解決機器學(xué)習(xí)、圖計算等迭代計算能夠大大減少網(wǎng)絡(luò)傳輸、數(shù)據(jù)落地等的次數(shù),極大地提高了計算性能。

      二、搭建Spark環(huán)境

      (一)搭建單機版環(huán)境

      參看學(xué)習(xí)筆記《大數(shù)據(jù)學(xué)習(xí)筆記03:安裝配置CentOS7虛擬機》下載鏈接:https://pan.baidu.com/s/1wxRh3ggzxZtzQshqMy_A8g 提取碼:71yw

      在VMware Workstation上創(chuàng)建了虛擬機 - ied

      1、卸載CentOS7自帶的OpenJDK

      通過命令 rpm -qa | grep java 查詢已經(jīng)安裝的java包

      通過命令rpm -e --nodeps xxxxxx卸載已經(jīng)安裝的OpenJDK包

      rpm -e --nodeps java-1.8.0-openjdk-1.8.0.262.b10-1.el7.x86_64 rpm -e --nodeps java-1.7.0-openjdk-1.7.0.261-2.6.22.2.el7_8.x86_64 rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.262.b10-1.el7.x86_64 rpm -e --nodeps java-1.7.0-openjdk-headless-1.7.0.261-2.6.22.2.el7_8.x86_64

      1

      2

      3

      4

      確認是否已經(jīng)刪除成功

      2、下載和安裝JDK

      下載鏈接:https://pan.baidu.com/s/1RcqHInNZjcV-TnxAMEtjzA 提取碼:jivr

      上傳到虛擬機/opt目錄

      將Java安裝包解壓到/usr/local

      tar -zxvf jdk-8u231-linux-x64.tar.gz -C /usr/local

      配置環(huán)境變量

      JAVA_HOME=/usr/local/jdk1.8.0_231 CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar PATH=$JAVA_HOME/bin:$PATH export JAVA_HOME PATH CLASSPATH

      1

      2

      3

      4

      存盤退出,讓環(huán)境配置生效

      在任意目錄下都可以查看JDK版本(不是CentOS自帶的OpenJDK)

      3、下載Spark安裝包到Windows本地

      下載鏈接:https://pan.baidu.com/s/1dLKt5UJgpqehRNNDcoY2DQ 提取碼:zh0x

      4、將Spark安裝包上傳到Linux的/opt目錄下

      進入/opt目錄

      利用rz命令上傳Spark安裝包

      5、將Spark安裝包解壓到/usr/local目錄下

      tar -zxvf spark-2.4.4-bin-hadoop2.7.tgz -C /usr/local

      6、配置Spark環(huán)境變量

      執(zhí)行 vim /etc/profile

      JAVA_HOME=/usr/local/jdk1.8.0_231 CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar SPARK_HOME=/usr/local/spark-2.4.4-bin-hadoop2.7 PATH=$JAVA_HOME/bin:$SPARK_HOME/bin:$PATH export JAVA_HOME SPARK_HOME PATH CLASSPATH

      1

      2

      3

      4

      5

      存盤退出,讓環(huán)境配置生效

      7、使用SparkPi來計算Pi的值

      run-example SparkPi 2 # 其中參數(shù)2是指兩個并行度

      [root@ied opt]# run-example SparkPi 2 22/02/20 04:24:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 22/02/20 04:24:34 INFO SparkContext: Running Spark version 2.4.4 22/02/20 04:24:34 INFO SparkContext: Submitted application: Spark Pi 22/02/20 04:24:34 INFO SecurityManager: Changing view acls to: root 22/02/20 04:24:34 INFO SecurityManager: Changing modify acls to: root 22/02/20 04:24:34 INFO SecurityManager: Changing view acls groups to: 22/02/20 04:24:34 INFO SecurityManager: Changing modify acls groups to: 22/02/20 04:24:34 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set() 22/02/20 04:24:35 INFO Utils: Successfully started service 'sparkDriver' on port 41942. 22/02/20 04:24:35 INFO SparkEnv: Registering MapOutputTracker 22/02/20 04:24:36 INFO SparkEnv: Registering BlockManagerMaster 22/02/20 04:24:36 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 22/02/20 04:24:36 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 22/02/20 04:24:36 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8de32b0e-530a-47ba-ad2d-efcfaa2af498 22/02/20 04:24:36 INFO MemoryStore: MemoryStore started with capacity 413.9 MB 22/02/20 04:24:36 INFO SparkEnv: Registering OutputCommitCoordinator 22/02/20 04:24:36 INFO Utils: Successfully started service 'SparkUI' on port 4040. 22/02/20 04:24:36 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://ied:4040 22/02/20 04:24:36 INFO SparkContext: Added JAR file:///usr/local/spark-2.4.4-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.4.jar at spark://ied:41942/jars/spark-examples_2.11-2.4.4.jar with timestamp 1645302276946 22/02/20 04:24:36 INFO SparkContext: Added JAR file:///usr/local/spark-2.4.4-bin-hadoop2.7/examples/jars/scopt_2.11-3.7.0.jar at spark://ied:41942/jars/scopt_2.11-3.7.0.jar with timestamp 1645302276946 22/02/20 04:24:37 INFO Executor: Starting executor ID driver on host localhost 22/02/20 04:24:37 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 33814. 22/02/20 04:24:37 INFO NettyBlockTransferService: Server created on ied:33814 22/02/20 04:24:37 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 22/02/20 04:24:37 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ied, 33814, None) 22/02/20 04:24:37 INFO BlockManagerMasterEndpoint: Registering block manager ied:33814 with 413.9 MB RAM, BlockManagerId(driver, ied, 33814, None) 22/02/20 04:24:37 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ied, 33814, None) 22/02/20 04:24:37 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, ied, 33814, None) 22/02/20 04:24:39 INFO SparkContext: Starting job: reduce at SparkPi.scala:38 22/02/20 04:24:39 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:38) with 2 output partitions 22/02/20 04:24:39 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at SparkPi.scala:38) 22/02/20 04:24:39 INFO DAGScheduler: Parents of final stage: List() 22/02/20 04:24:39 INFO DAGScheduler: Missing parents: List() 22/02/20 04:24:39 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no missing parents 22/02/20 04:24:40 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1936.0 B, free 413.9 MB) 22/02/20 04:24:40 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1256.0 B, free 413.9 MB) 22/02/20 04:24:40 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ied:33814 (size: 1256.0 B, free: 413.9 MB) 22/02/20 04:24:40 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1161 22/02/20 04:24:40 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34) (first 15 tasks are for partitions Vector(0, 1)) 22/02/20 04:24:40 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 22/02/20 04:24:40 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7866 bytes) 22/02/20 04:24:40 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 22/02/20 04:24:40 INFO Executor: Fetching spark://ied:41942/jars/scopt_2.11-3.7.0.jar with timestamp 1645302276946 22/02/20 04:24:41 INFO TransportClientFactory: Successfully created connection to ied/192.168.225.100:41942 after 185 ms (0 ms spent in bootstraps) 22/02/20 04:24:41 INFO Utils: Fetching spark://ied:41942/jars/scopt_2.11-3.7.0.jar to /tmp/spark-1426c39a-4d28-40e6-84da-d2d5f6071ddf/userFiles-3f7a473d-50b4-46ed-be1f-d77e07167e09/fetchFileTemp2787747616090799670.tmp 22/02/20 04:24:42 INFO Executor: Adding file:/tmp/spark-1426c39a-4d28-40e6-84da-d2d5f6071ddf/userFiles-3f7a473d-50b4-46ed-be1f-d77e07167e09/scopt_2.11-3.7.0.jar to class loader 22/02/20 04:24:42 INFO Executor: Fetching spark://ied:41942/jars/spark-examples_2.11-2.4.4.jar with timestamp 1645302276946 22/02/20 04:24:42 INFO Utils: Fetching spark://ied:41942/jars/spark-examples_2.11-2.4.4.jar to /tmp/spark-1426c39a-4d28-40e6-84da-d2d5f6071ddf/userFiles-3f7a473d-50b4-46ed-be1f-d77e07167e09/fetchFileTemp5384793568751348333.tmp 22/02/20 04:24:42 INFO Executor: Adding file:/tmp/spark-1426c39a-4d28-40e6-84da-d2d5f6071ddf/userFiles-3f7a473d-50b4-46ed-be1f-d77e07167e09/spark-examples_2.11-2.4.4.jar to class loader 22/02/20 04:24:42 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 910 bytes result sent to driver 22/02/20 04:24:42 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7866 bytes) 22/02/20 04:24:42 INFO Executor: Running task 1.0 in stage 0.0 (TID 1) 22/02/20 04:24:42 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 867 bytes result sent to driver 22/02/20 04:24:42 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1654 ms on localhost (executor driver) (1/2) 22/02/20 04:24:42 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 139 ms on localhost (executor driver) (2/2) 22/02/20 04:24:42 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 22/02/20 04:24:42 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 2.597 s 22/02/20 04:24:42 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 2.956212 s Pi is roughly 3.1441757208786045 22/02/20 04:24:42 INFO SparkUI: Stopped Spark web UI at http://ied:4040 22/02/20 04:24:42 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 22/02/20 04:24:42 INFO MemoryStore: MemoryStore cleared 22/02/20 04:24:42 INFO BlockManager: BlockManager stopped 22/02/20 04:24:42 INFO BlockManagerMaster: BlockManagerMaster stopped 22/02/20 04:24:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 22/02/20 04:24:42 INFO SparkContext: Successfully stopped SparkContext 22/02/20 04:24:42 INFO ShutdownHookManager: Shutdown hook called 22/02/20 04:24:42 INFO ShutdownHookManager: Deleting directory /tmp/spark-1426c39a-4d28-40e6-84da-d2d5f6071ddf 22/02/20 04:24:42 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8fe131d-a733-466f-9665-4277ace75a06

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      40

      41

      42

      43

      44

      45

      46

      47

      48

      49

      50

      51

      52

      53

      54

      55

      56

      57

      58

      59

      60

      61

      62

      63

      64

      65

      66

      67

      68

      69

      70

      71

      8、使用Scala版本Spark-Shell

      執(zhí)行 spark-shell 命令啟動Scala版的Spark-Shell

      9、使用Python版本Spark-Shell

      執(zhí)行 pyspark 命令啟動Python版的Spark-Shell

      上傳test.txt文件到/opt目錄

      執(zhí)行 pyspark 啟動 spark shell

      Spark 中的RDD (Resilient Distributed Dataset) 就是一個不可變的分布式對象集合。每個RDD 都被分為多個分區(qū),這些分區(qū)運行在集群中的不同節(jié)點上。RDD 可以包含Python、Java、Scala 中任意類型的對象,甚至可以包含用戶自定義的對象。用戶可以使用兩種方法創(chuàng)建RDD:讀取一個外部數(shù)據(jù)集,或在驅(qū)動器程序里分發(fā)驅(qū)動器程序中的對象集合(比如list 和set)。

      >>> lines = sc.textFile('test.txt')

      創(chuàng)建出來后,RDD 支持兩種類型的操作: 轉(zhuǎn)化操作(transformation) 和行動操作(action)。轉(zhuǎn)化操作會由一個RDD 生成一個新的RDD。另一方面,行動操作會對RDD 計算出一個結(jié)果,并把結(jié)果返回到驅(qū)動器程序中,或把結(jié)果存儲到外部存儲系統(tǒng)(如HDFS)中。

      >>> sparkLines = lines.filter(lambda line: 'spark' in line)

      >>> sparkLines.first()

      ‘hello hadoop hello spark’

      轉(zhuǎn)化操作和行動操作的區(qū)別在于Spark 計算RDD 的方式不同。雖然你可以在任何時候定義新的RDD,但Spark 只會惰性計算這些RDD。它們只有第一次在一個行動操作中用到時,才會真正計算。這種策略剛開始看起來可能會顯得有些奇怪,不過在大數(shù)據(jù)領(lǐng)域是很有道理的。比如,看看例2 和例3,我們以一個文本文件定義了數(shù)據(jù),然后把其中包含spark的行篩選出來。如果Spark 在我們運行l(wèi)ines = sc.textFile(…) 時就把文件中所有的行都讀取并存儲起來,就會消耗很多存儲空間,而我們馬上就要篩選掉其中的很多數(shù)據(jù)。相反, 一旦Spark 了解了完整的轉(zhuǎn)化操作鏈之后,它就可以只計算求結(jié)果時真正需要的數(shù)據(jù)。事實上,在行動操作first() 中,Spark 只需要掃描文件直到找到第一個匹配的行為止,而不需要讀取整個文件。

      三、Spark運行架構(gòu)及原理

      Hadoop spark

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔相應(yīng)法律責任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:Laravel Redis操作大全
      下一篇:【元啟發(fā)式算法】基于序列優(yōu)化的遺傳算法常用變異算子
      相關(guān)文章
      久久99亚洲网美利坚合众国 | 亚洲av午夜精品一区二区三区| 久久亚洲sm情趣捆绑调教| 亚洲av无码精品网站| 日韩亚洲一区二区三区| 亚洲免费人成在线视频观看| 亚洲日韩精品一区二区三区无码 | 亚洲人成影院在线观看| 国产精品亚洲一区二区三区| 久久亚洲精品无码gv| 色噜噜噜噜亚洲第一| 国产成人亚洲精品蜜芽影院| 蜜芽亚洲av无码一区二区三区| 亚洲熟妇无码AV不卡在线播放| 亚洲精品无码av片| 亚洲AV日韩AV一区二区三曲| 国产精品自拍亚洲| 亚洲精品第一国产综合境外资源 | 无码不卡亚洲成?人片| 亚洲国产成人精品女人久久久| 亚洲国产中文字幕在线观看 | 亚洲五月综合网色九月色| 国产成人亚洲精品| 亚洲中文字幕精品久久| 亚洲国产aⅴ成人精品无吗| 亚洲AV无码片一区二区三区| 亚洲国产aⅴ综合网| 亚洲午夜久久久影院伊人| 亚洲国产另类久久久精品| 亚洲四虎永久在线播放| 亚洲国产午夜电影在线入口| 亚洲人成小说网站色| 欧洲亚洲国产精华液| 在线观看国产一区亚洲bd| 国产午夜亚洲不卡| 亚洲AV无码一区二区三区DV| 亚洲第一成年人网站| 亚洲乱码在线观看| 亚洲 自拍 另类小说综合图区| 精品国产亚洲男女在线线电影 | 亚洲日韩亚洲另类激情文学|