【PMP】Study Notes 2017-06-01
2022-05-29
Table of Contents
0. Learning Objectives
1. Getting to Know Spark
(1) Overview of Spark
(2) The Spark Website
(3) Spark's Development History
(4) Features of Spark
1) Speed
2) Ease of Use
3) Generality
4) Runs Everywhere
5) Concise Code
a) Word Count with MapReduce
b) Word Count with Spark
(5) Spark's Storage Layers
(6) The Spark Ecosystem
1) Spark SQL
2) Spark Streaming
3) MLlib
4) GraphX
(7) Spark Use Cases
1) Tencent
2) Yahoo
3) Taobao
4) Youku Tudou
2. Setting Up a Spark Environment
(1) Setting Up a Standalone Environment
1) Remove the OpenJDK Bundled with CentOS 7
2) Download and Install the JDK
3) Download the Spark Package to the Local Windows Machine
4) Upload the Spark Package to /opt on Linux
5) Extract the Spark Package to /usr/local
6) Configure Spark Environment Variables
7) Compute the Value of Pi with SparkPi
8) Use the Scala Version of the Spark Shell
9) Use the Python Version of the Spark Shell
Example 1: Create an RDD of strings with textFile() in Python
Example 2: Call the filter() transformation
Example 3: Call the first() action
3. Spark's Runtime Architecture and Principles
0. Learning Objectives
Understand Spark's history and features
Learn how to set up a Spark environment
Understand Spark's runtime architecture and principles
1. Getting to Know Spark
(1) Overview of Spark
Spark is a fast, distributed, scalable, and fault-tolerant cluster computing framework.
Spark is an in-memory distributed computing framework for big data.
Spark delivers complex analytics with low latency.
Spark is an alternative to Hadoop MapReduce. MapReduce is a poor fit for iterative and interactive workloads, while Spark was designed for interactive queries and iterative algorithms, with support for in-memory storage and efficient fault recovery. Spark retains the strengths of MapReduce, but unlike MapReduce it can keep intermediate results in memory, which reduces the number of reads and writes to HDFS.
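To make the in-memory advantage concrete, here is a minimal PySpark sketch of an iterative job that caches its input; the file path, parsing logic, and iteration count are made up for illustration.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("cache-demo")
sc = SparkContext(conf=conf)

# Hypothetical input: one numeric value per line on HDFS.
points = sc.textFile("hdfs://hadoop:9000/data/points.txt") \
           .map(lambda line: float(line.strip()))

# cache() keeps the parsed RDD in memory after the first pass,
# so the loop below does not re-read and re-parse the file every time.
points.cache()

total = 0.0
for i in range(10):
    total += points.sum()

print(total)
sc.stop()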
(2) The Spark Website
Official website: https://spark.apache.org
(3) Spark's Development History
As of this writing, the latest release is Spark 3.2.1, published on January 26, 2022.
(4) Features of Spark
1) Speed
For applications with many iterations, a Spark program typically runs more than 100 times faster than the equivalent Hadoop MapReduce job when the data fits in memory, and more than 10 times faster when running from disk.
2) Ease of Use
Spark lets you write applications quickly in Scala, Python, Java, or R. It also provides more than 80 high-level operators that make it easy to build parallel applications, and Spark can be used interactively from the Scala, Python, or R shells.
3) Generality
Spark combines well with SQL, streaming, and complex analytics. It ships with a stack of higher-level tools, including Spark SQL, MLlib (machine learning), GraphX (graph processing), and Spark Streaming, and these components can be combined in a single application.
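As a rough illustration of combining components in one application, the sketch below mixes the core RDD API with Spark SQL; the sample data, application name, and table name are invented for the example.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("combined-demo").getOrCreate()
sc = spark.sparkContext

# Core RDD API: build (word, count) pairs from an in-memory list.
pairs = sc.parallelize(["hello spark", "hello hadoop", "hello spark"]) \
          .flatMap(lambda s: s.split(" ")) \
          .map(lambda w: (w, 1)) \
          .reduceByKey(lambda a, b: a + b)

# Spark SQL: expose the same data as a table and query it with SQL.
df = pairs.toDF(["word", "cnt"])
df.createOrReplaceTempView("wordcount")
spark.sql("SELECT word, cnt FROM wordcount WHERE cnt > 1").show()

spark.stop()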
4) Runs Everywhere
You can run Spark in its standalone cluster mode, on EC2 (Amazon Elastic Compute Cloud), on Hadoop YARN, or on Apache Mesos, and it can read data from HDFS, Cassandra, HBase, Hive, Tachyon, and any Hadoop-compatible distributed file system.
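The deployment mode is selected by the master URL handed to Spark. A small sketch of the common values follows; the host name "master" is a placeholder.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("master-demo")

conf.setMaster("local[*]")                 # run locally, one worker thread per CPU core
# conf.setMaster("spark://master:7077")    # Spark standalone cluster
# conf.setMaster("yarn")                   # Hadoop YARN
# conf.setMaster("mesos://master:5050")    # Apache Mesos

sc = SparkContext(conf=conf)
print(sc.master)
sc.stop()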
5) Concise Code
See "Implementing Word Count in Multiple Ways" below.
Write WordCountMapper

package net.hw.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Created by howard on 2018/2/6.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split each input line on spaces and emit (word, 1) for every word.
        String[] words = value.toString().split(" ");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
Write WordCountReducer

package net.hw.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by howard on 2018/2/6.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts for each word and emit (word, total).
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
Write WordCountDriver
package net.hw.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

/**
 * Created by howard on 2018/2/6.
 */
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountDriver.class);

        // Wire up the mapper and its output types.
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Wire up the reducer and the job's final output types.
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        String uri = "hdfs://hadoop:9000";
        Path inputPath = new Path(uri + "/word");
        Path outputPath = new Path(uri + "/word/result");

        // Delete any previous output directory so the job can run again.
        FileSystem fs = FileSystem.get(new URI(uri), conf);
        fs.delete(outputPath, true);

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        job.waitForCompletion(true);

        // Print the result files to the console.
        System.out.println("Word count results:");
        FileStatus[] fileStatuses = fs.listStatus(outputPath);
        for (int i = 1; i < fileStatuses.length; i++) {
            System.out.println(fileStatuses[i].getPath());
            FSDataInputStream in = fs.open(fileStatuses[i].getPath());
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
    }
}
Run WordCountDriver and check the results.
Write WordCount
package net.hw.spark.wc

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by howard on 2018/2/6.
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("wordcount")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("test.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    rdd.foreach(println)
    rdd.saveAsTextFile("result")
  }
}
Run WordCount and check the results.
As you can see, Spark needs far less code than MapReduce to accomplish the same word count task.
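For readers who prefer Python, a roughly equivalent PySpark sketch follows; the input and output paths are the same placeholders as in the Scala version.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("wordcount")
sc = SparkContext(conf=conf)

# Same pipeline as the Scala version: split lines, pair each word with 1, sum the counts.
rdd = sc.textFile("test.txt") \
        .flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)

for pair in rdd.collect():
    print(pair)
rdd.saveAsTextFile("result")
sc.stop()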
(5) Spark's Storage Layers
Spark can read files from any Hadoop Distributed File System (HDFS) into distributed datasets, and it also supports other storage systems that expose the Hadoop interfaces, such as the local file system, Amazon S3, Cassandra, Hive, and HBase. It is important to understand that Hadoop is not a prerequisite for Spark: Spark works with any storage system that implements the Hadoop interfaces. The Hadoop input formats Spark supports include text files, SequenceFile, Avro, Parquet, and others.
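As a hedged sketch of that flexibility, the same textFile() call works against different storage layers simply by changing the URI scheme; every path below is a placeholder.

from pyspark import SparkContext

sc = SparkContext("local[*]", "storage-demo")

# Local file system.
local_rdd = sc.textFile("file:///opt/test.txt")

# HDFS (placeholder NameNode address).
hdfs_rdd = sc.textFile("hdfs://hadoop:9000/word/words.txt")

# Amazon S3 (requires the hadoop-aws connector and credentials).
# s3_rdd = sc.textFile("s3a://my-bucket/words.txt")

# A Hadoop SequenceFile can be read with the dedicated method.
# seq_rdd = sc.sequenceFile("hdfs://hadoop:9000/word/seq")

print(local_rdd.count())
sc.stop()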
(6) The Spark Ecosystem
1) Spark SQL
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val prop = new java.util.Properties
prop.put("user", "root")
prop.put("password", "root")
val df = sqlContext.read.jdbc("jdbc:mysql://hadoop:3306/studb", "student", prop)
df.show()
2) Spark Streaming
3) MLlib
4) GraphX
(7) Spark Use Cases
1) Tencent
Guangdiantong (Tencent's ad platform) was one of the earliest applications of Spark. Tencent's big-data precision recommendation system leverages Spark's fast iteration to build a "data + algorithms + systems" solution in which data collection, model training, and prediction all run in real time and in parallel. It was successfully deployed in the Guangdiantong pCTR (Predicted Click-Through Rate) serving system, which handles tens of billions of requests per day.
2) Yahoo
Yahoo uses Spark for Audience Expansion, a technique for finding target users for advertising: advertisers supply sample customers who viewed an ad and bought the product, a model is trained on those samples, and the model then finds more users likely to convert so they can be targeted. Yahoo's algorithm of choice is logistic regression. Because some SQL workloads required a higher quality of service, Yahoo also added a dedicated large-memory cluster running Shark to replace commercial BI/OLAP tools, serving reports, dashboards, and interactive/ad-hoc queries and integrating with desktop BI tools.
3) Taobao
Taobao's engineering team uses Spark for machine learning algorithms that require many iterations and for computationally complex algorithms, applying it to Taobao's recommendation pipeline. They also use GraphX to solve a range of production problems, including hub-node discovery based on degree distribution, community detection based on maximal connected components, relationship-strength measurement based on triangle counting, and user-attribute propagation based on random walks.
4) Youku Tudou
Spark is now widely used at Youku Tudou for video recommendation and advertising. Compared with Hadoop, Spark's interactive queries respond quickly and deliver several times the performance. On one hand, simulating ad placement with Spark is efficient and low-latency (at least an order of magnitude lower latency than Hadoop). On the other hand, video recommendation often involves machine learning and graph computation, and running these iterative workloads on Spark greatly reduces network transfers and writes to disk, which substantially improves performance.
2. Setting Up a Spark Environment
(1) Setting Up a Standalone Environment
See the study notes "Big Data Study Notes 03: Installing and Configuring a CentOS 7 Virtual Machine". Download link: https://pan.baidu.com/s/1wxRh3ggzxZtzQshqMy_A8g, extraction code: 71yw
A virtual machine named ied was created in VMware Workstation.
1) Remove the OpenJDK Bundled with CentOS 7
List the installed Java packages with rpm -qa | grep java.
Remove each installed OpenJDK package with rpm -e --nodeps <package>.
rpm -e --nodeps java-1.8.0-openjdk-1.8.0.262.b10-1.el7.x86_64
rpm -e --nodeps java-1.7.0-openjdk-1.7.0.261-2.6.22.2.el7_8.x86_64
rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.262.b10-1.el7.x86_64
rpm -e --nodeps java-1.7.0-openjdk-headless-1.7.0.261-2.6.22.2.el7_8.x86_64
Confirm that the packages have been removed.
2) Download and Install the JDK
Download link: https://pan.baidu.com/s/1RcqHInNZjcV-TnxAMEtjzA, extraction code: jivr
Upload the package to the /opt directory of the virtual machine.
Extract the Java package to /usr/local:
tar -zxvf jdk-8u231-linux-x64.tar.gz -C /usr/local
Configure the environment variables:
JAVA_HOME=/usr/local/jdk1.8.0_231
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME PATH CLASSPATH
Save and exit, then source the file so the settings take effect.
The JDK version can now be checked from any directory (and it is no longer the CentOS-bundled OpenJDK).
3) Download the Spark Package to the Local Windows Machine
Download link: https://pan.baidu.com/s/1dLKt5UJgpqehRNNDcoY2DQ, extraction code: zh0x
4) Upload the Spark Package to /opt on Linux
Change to the /opt directory.
Upload the Spark package with the rz command.
5) Extract the Spark Package to /usr/local
tar -zxvf spark-2.4.4-bin-hadoop2.7.tgz -C /usr/local
6) Configure Spark Environment Variables
Run vim /etc/profile and add:
JAVA_HOME=/usr/local/jdk1.8.0_231
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
SPARK_HOME=/usr/local/spark-2.4.4-bin-hadoop2.7
PATH=$JAVA_HOME/bin:$SPARK_HOME/bin:$PATH
export JAVA_HOME SPARK_HOME PATH CLASSPATH
Save and exit, then source the file so the settings take effect.
7) Compute the Value of Pi with SparkPi
run-example SparkPi 2    # the argument 2 is the number of partitions (the parallelism)
[root@ied opt]# run-example SparkPi 2
22/02/20 04:24:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
22/02/20 04:24:34 INFO SparkContext: Running Spark version 2.4.4
22/02/20 04:24:34 INFO SparkContext: Submitted application: Spark Pi
22/02/20 04:24:34 INFO SecurityManager: Changing view acls to: root
22/02/20 04:24:34 INFO SecurityManager: Changing modify acls to: root
22/02/20 04:24:34 INFO SecurityManager: Changing view acls groups to:
22/02/20 04:24:34 INFO SecurityManager: Changing modify acls groups to:
22/02/20 04:24:34 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
22/02/20 04:24:35 INFO Utils: Successfully started service 'sparkDriver' on port 41942.
22/02/20 04:24:35 INFO SparkEnv: Registering MapOutputTracker
22/02/20 04:24:36 INFO SparkEnv: Registering BlockManagerMaster
22/02/20 04:24:36 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
22/02/20 04:24:36 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
22/02/20 04:24:36 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8de32b0e-530a-47ba-ad2d-efcfaa2af498
22/02/20 04:24:36 INFO MemoryStore: MemoryStore started with capacity 413.9 MB
22/02/20 04:24:36 INFO SparkEnv: Registering OutputCommitCoordinator
22/02/20 04:24:36 INFO Utils: Successfully started service 'SparkUI' on port 4040.
22/02/20 04:24:36 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://ied:4040
22/02/20 04:24:36 INFO SparkContext: Added JAR file:///usr/local/spark-2.4.4-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.4.jar at spark://ied:41942/jars/spark-examples_2.11-2.4.4.jar with timestamp 1645302276946
22/02/20 04:24:36 INFO SparkContext: Added JAR file:///usr/local/spark-2.4.4-bin-hadoop2.7/examples/jars/scopt_2.11-3.7.0.jar at spark://ied:41942/jars/scopt_2.11-3.7.0.jar with timestamp 1645302276946
22/02/20 04:24:37 INFO Executor: Starting executor ID driver on host localhost
22/02/20 04:24:37 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 33814.
22/02/20 04:24:37 INFO NettyBlockTransferService: Server created on ied:33814
22/02/20 04:24:37 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
22/02/20 04:24:37 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ied, 33814, None)
22/02/20 04:24:37 INFO BlockManagerMasterEndpoint: Registering block manager ied:33814 with 413.9 MB RAM, BlockManagerId(driver, ied, 33814, None)
22/02/20 04:24:37 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ied, 33814, None)
22/02/20 04:24:37 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, ied, 33814, None)
22/02/20 04:24:39 INFO SparkContext: Starting job: reduce at SparkPi.scala:38
22/02/20 04:24:39 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:38) with 2 output partitions
22/02/20 04:24:39 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at SparkPi.scala:38)
22/02/20 04:24:39 INFO DAGScheduler: Parents of final stage: List()
22/02/20 04:24:39 INFO DAGScheduler: Missing parents: List()
22/02/20 04:24:39 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no missing parents
22/02/20 04:24:40 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1936.0 B, free 413.9 MB)
22/02/20 04:24:40 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1256.0 B, free 413.9 MB)
22/02/20 04:24:40 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ied:33814 (size: 1256.0 B, free: 413.9 MB)
22/02/20 04:24:40 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1161
22/02/20 04:24:40 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34) (first 15 tasks are for partitions Vector(0, 1))
22/02/20 04:24:40 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
22/02/20 04:24:40 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7866 bytes)
22/02/20 04:24:40 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
22/02/20 04:24:40 INFO Executor: Fetching spark://ied:41942/jars/scopt_2.11-3.7.0.jar with timestamp 1645302276946
22/02/20 04:24:41 INFO TransportClientFactory: Successfully created connection to ied/192.168.225.100:41942 after 185 ms (0 ms spent in bootstraps)
22/02/20 04:24:41 INFO Utils: Fetching spark://ied:41942/jars/scopt_2.11-3.7.0.jar to /tmp/spark-1426c39a-4d28-40e6-84da-d2d5f6071ddf/userFiles-3f7a473d-50b4-46ed-be1f-d77e07167e09/fetchFileTemp2787747616090799670.tmp
22/02/20 04:24:42 INFO Executor: Adding file:/tmp/spark-1426c39a-4d28-40e6-84da-d2d5f6071ddf/userFiles-3f7a473d-50b4-46ed-be1f-d77e07167e09/scopt_2.11-3.7.0.jar to class loader
22/02/20 04:24:42 INFO Executor: Fetching spark://ied:41942/jars/spark-examples_2.11-2.4.4.jar with timestamp 1645302276946
22/02/20 04:24:42 INFO Utils: Fetching spark://ied:41942/jars/spark-examples_2.11-2.4.4.jar to /tmp/spark-1426c39a-4d28-40e6-84da-d2d5f6071ddf/userFiles-3f7a473d-50b4-46ed-be1f-d77e07167e09/fetchFileTemp5384793568751348333.tmp
22/02/20 04:24:42 INFO Executor: Adding file:/tmp/spark-1426c39a-4d28-40e6-84da-d2d5f6071ddf/userFiles-3f7a473d-50b4-46ed-be1f-d77e07167e09/spark-examples_2.11-2.4.4.jar to class loader
22/02/20 04:24:42 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 910 bytes result sent to driver
22/02/20 04:24:42 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7866 bytes)
22/02/20 04:24:42 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
22/02/20 04:24:42 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 867 bytes result sent to driver
22/02/20 04:24:42 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1654 ms on localhost (executor driver) (1/2)
22/02/20 04:24:42 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 139 ms on localhost (executor driver) (2/2)
22/02/20 04:24:42 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
22/02/20 04:24:42 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 2.597 s
22/02/20 04:24:42 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 2.956212 s
Pi is roughly 3.1441757208786045
22/02/20 04:24:42 INFO SparkUI: Stopped Spark web UI at http://ied:4040
22/02/20 04:24:42 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/02/20 04:24:42 INFO MemoryStore: MemoryStore cleared
22/02/20 04:24:42 INFO BlockManager: BlockManager stopped
22/02/20 04:24:42 INFO BlockManagerMaster: BlockManagerMaster stopped
22/02/20 04:24:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/02/20 04:24:42 INFO SparkContext: Successfully stopped SparkContext
22/02/20 04:24:42 INFO ShutdownHookManager: Shutdown hook called
22/02/20 04:24:42 INFO ShutdownHookManager: Deleting directory /tmp/spark-1426c39a-4d28-40e6-84da-d2d5f6071ddf
22/02/20 04:24:42 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8fe131d-a733-466f-9665-4277ace75a06
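SparkPi estimates Pi by Monte Carlo sampling: it scatters random points in the unit square and counts how many fall inside the quarter circle. The sketch below reproduces the same idea in PySpark; the sample count and application name are arbitrary choices for illustration.

import random
from pyspark import SparkContext

sc = SparkContext("local[2]", "pi-demo")

num_samples = 1000000  # arbitrary sample count

def inside(_):
    # Throw one random dart at the unit square; return 1 if it lands in the quarter circle.
    x, y = random.random(), random.random()
    return 1 if x * x + y * y <= 1.0 else 0

count = sc.parallelize(range(num_samples), 2).map(inside).reduce(lambda a, b: a + b)
print("Pi is roughly", 4.0 * count / num_samples)
sc.stop()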
8) Use the Scala Version of the Spark Shell
Run the spark-shell command to start the Scala version of the Spark shell.
9) Use the Python Version of the Spark Shell
Run the pyspark command to start the Python version of the Spark shell.
Upload the test.txt file to the /opt directory.
Run pyspark to start the Spark shell.
An RDD (Resilient Distributed Dataset) in Spark is an immutable, distributed collection of objects. Each RDD is split into multiple partitions, which run on different nodes of the cluster. RDDs can contain objects of any type in Python, Java, or Scala, including user-defined classes. There are two ways to create an RDD: load an external dataset, or distribute a collection of objects (such as a list or set) that lives in the driver program.
>>> lines = sc.textFile('test.txt')
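The textFile() call above is the external-dataset route. The second way to create an RDD, parallelizing a collection held in the driver program, looks like this in the same shell (the list contents are made up):
>>> words = sc.parallelize(['hello spark', 'hello hadoop'])
>>> words.count()
2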
Once created, an RDD supports two kinds of operations: transformations and actions. A transformation derives a new RDD from an existing one. An action, on the other hand, computes a result from an RDD and either returns it to the driver program or writes it to an external storage system (such as HDFS).
>>> sparkLines = lines.filter(lambda line: 'spark' in line)
>>> sparkLines.first()
'hello hadoop hello spark'
The difference between transformations and actions comes from how Spark computes RDDs. Although you can define a new RDD at any time, Spark computes RDDs lazily: they are only actually computed the first time they are used in an action. This strategy may seem odd at first, but it makes a lot of sense for big data. For example, in Examples 2 and 3 above we defined a dataset from a text file and then filtered it down to the lines containing "spark". If Spark had read and stored every line of the file the moment we ran lines = sc.textFile(...), it would waste a great deal of storage on data we were about to filter out. Instead, once Spark knows the full chain of transformations, it can compute only the data actually needed for the result. In fact, for the first() action, Spark only scans the file until it finds the first matching line; it does not need to read the whole file.
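Continuing the same shell session, persist() is the standard way to keep a lazily computed RDD in memory once it is going to be reused; a brief sketch (output omitted, since it depends on the contents of test.txt):
>>> sparkLines.persist()    # mark the filtered RDD to be kept in memory once computed
>>> sparkLines.count()      # first action: scans test.txt and caches the matching lines
>>> sparkLines.count()      # second action: answered from the cached partitions, no rescan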
3. Spark's Runtime Architecture and Principles