SparkAPI Java版】JavaPairRDD——cache、persist、unpersist、getStorageL

      網(wǎng)友投稿 694 2025-04-02

      這四個(gè)方法都和緩存有關(guān),所以寫在一塊。


      Persist this RDD with the default storage level (MEMORY_ONLY).

      cache就是在內(nèi)存中緩存數(shù)據(jù),其實(shí)也是使用的persist。使用非序列化的方式將RDD的數(shù)據(jù)全部嘗試持久化到內(nèi)存中,cache()只是一個(gè)transformtion,是lazy的,必須通過(guò)一個(gè)action觸發(fā),才能真正的將該RDD cache到內(nèi)存中。

      //scala def cache(): JavaPairRDD[K, V] //java public JavaPairRDD cache()

      Set this RDD's storage level to persist its values across operations after the first time it is computed. Can only be called once on each RDD.

      將此RDD的存儲(chǔ)級(jí)別設(shè)置為在第一次計(jì)算后跨操作持久化其值。每個(gè)RDD只能調(diào)用一次。

      //scala def persist(newLevel: StorageLevel): JavaPairRDD[K, V] //java public JavaPairRDD persist(StorageLevel newLevel)

      Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. blocking

      將RDD標(biāo)記為非持久性的,并從內(nèi)存和磁盤中刪除它的所有塊。

      //scala def unpersist(): JavaPairRDD[K, V] def persist(newLevel: StorageLevel): JavaPairRDD[K, V] //java public JavaPairRDD unpersist() public JavaPairRDD persist(StorageLevel newLevel)

      Get the RDD's current storage level, or StorageLevel.NONE if none is set.

      獲取RDD的當(dāng)前存儲(chǔ)級(jí)別,如果未設(shè)置,則獲取StorageLevel.NONE。

      //scala def getStorageLevel: StorageLevel //java public static StorageLevel getStorageLevel()

      cache和persist都是用于將一個(gè)RDD進(jìn)行緩存的,這樣在之后使用的過(guò)程中就不需要重新計(jì)算了,可以大大節(jié)省程序運(yùn)行時(shí)間。

      其中cache這個(gè)方法是一個(gè)Tranformation,當(dāng)?shù)谝淮斡龅紸ction算子的時(shí)才會(huì)進(jìn)行持久化。

      cache的源碼:

      /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): this.type = persist()

      persist源碼:

      /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

      persist(StorageLevel.MEMORY_ONLY)源碼:

      /** * Set this RDD's storage level to persist its values across operations after the first time * it is computed. This can only be used to assign a new storage level if the RDD does not * have a storage level set yet.. */ def persist(newLevel: StorageLevel): this.type = { // TODO: Handle changes of StorageLevel if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) { throw new UnsupportedOperationException( "Cannot change storage level of an RDD after it was already assigned a level") } sc.persistRDD(this) // Register the RDD with the ContextCleaner for automatic GC-based cleanup sc.cleaner.foreach(_.registerRDDForCleanup(this)) storageLevel = newLevel this }

      從源碼中可以看得出來(lái)cache內(nèi)部調(diào)用了persist方法,persist方法又調(diào)用了persist(StorageLevel.MEMORY_ONLY)方法,所以執(zhí)行cache算子其實(shí)就是執(zhí)行了persist算子且持久化級(jí)別為MEMORY_ONLY

      兩者的區(qū)別:cache只有一個(gè)默認(rèn)的緩存級(jí)別MEMORY_ONLY ,而persist可以根據(jù)情況設(shè)置其它的緩存級(jí)別。

      persist有一個(gè) StorageLevel 類型的參數(shù),這個(gè)表示的是RDD的緩存級(jí)別。

      StorageLevel 說(shuō)明(參考博客):

      object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false) ...... }

      1.MEMORY_ONLY

      使用未序列化的Java對(duì)象格式,將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù),則數(shù)據(jù)可能就不會(huì)進(jìn)行持久化。那么下次對(duì)這個(gè)RDD執(zhí)行算子操作時(shí),那些沒有被持久化的數(shù)據(jù),需要從源頭處重新計(jì)算一遍。這是默認(rèn)的持久化策略,使用cache()方法時(shí),實(shí)際就是使用的這種持久化策略。

      2.MEMORY_AND_DISK

      使用未序列化的Java對(duì)象格式,優(yōu)先嘗試將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù),會(huì)將數(shù)據(jù)寫入磁盤文件中,下次對(duì)這個(gè)RDD執(zhí)行算子時(shí),持久化在磁盤文件中的數(shù)據(jù)會(huì)被讀取出來(lái)使用。

      3.MEMORY_ONLY_SER

      基本含義同MEMORY_ONLY。唯一的區(qū)別是,會(huì)將RDD中的數(shù)據(jù)進(jìn)行序列化,RDD的每個(gè)partition會(huì)被序列化成一個(gè)字節(jié)數(shù)組。這種方式更加節(jié)省內(nèi)存,從而可以避免持久化的數(shù)據(jù)占用過(guò)多內(nèi)存導(dǎo)致頻繁GC。

      4.MEMORY_AND_DISK_SER

      基本含義同MEMORY_AND_DISK。唯一的區(qū)別是,會(huì)將RDD中的數(shù)據(jù)進(jìn)行序列化,RDD的每個(gè)partition會(huì)被序列化成一個(gè)字節(jié)數(shù)組。這種方式更加節(jié)省內(nèi)存,從而可以避免持久化的數(shù)據(jù)占用過(guò)多內(nèi)存導(dǎo)致頻繁GC。

      5.DISK_ONLY

      使用未序列化的Java對(duì)象格式,將數(shù)據(jù)全部寫入磁盤文件中。

      6.MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等

      對(duì)于上述任意一種持久化策略,如果加上后綴_2,代表的是將每個(gè)持久化的數(shù)據(jù),都復(fù)制一份副本,并將副本保存到其他節(jié)點(diǎn)上。這種基于副本的持久化機(jī)制主要用于進(jìn)行容錯(cuò)。假如某個(gè)節(jié)點(diǎn)掛掉,節(jié)點(diǎn)的內(nèi)存或磁盤中的持久化數(shù)據(jù)丟失了,那么后續(xù)對(duì)RDD計(jì)算時(shí)還可以使用該數(shù)據(jù)在其他節(jié)點(diǎn)上的副本。如果沒有副本的話,就只能將這些數(shù)據(jù)從源頭處重新計(jì)算一遍了。

      這里列出了12種緩存級(jí)別,但這些有什么區(qū)別呢?可以看到每個(gè)緩存級(jí)別后面都跟了一個(gè)StorageLevel的構(gòu)造函數(shù),里面包含了4個(gè)或5個(gè)參數(shù),如下

      val MEMORY_ONLY = new StorageLevel(false, true, false, true)

      查看其構(gòu)造函數(shù)

      class StorageLevel private( private var _useDisk: Boolean, private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean, private var _replication: Int = 1) extends Externalizable { ...... def useDisk: Boolean = _useDisk def useMemory: Boolean = _useMemory def useOffHeap: Boolean = _useOffHeap def deserialized: Boolean = _deserialized def replication: Int = _replication ...... }

      【SparkAPI JAVA版】JavaPairRDD——cache、persist、unpersist、getStorageL

      可以看到StorageLevel類的主構(gòu)造器包含了5個(gè)參數(shù):

      useDisk:使用硬盤(外存)

      useMemory:使用內(nèi)存

      useOffHeap:使用堆外內(nèi)存,這是Java虛擬機(jī)里面的概念,堆外內(nèi)存意味著把內(nèi)存對(duì)象分配在Java虛擬機(jī)的堆以外的內(nèi)存,這些內(nèi)存直接受操作系統(tǒng)管理(而不是虛擬機(jī))。這樣做的結(jié)果就是能保持一個(gè)較小的堆,以減少垃圾收集對(duì)應(yīng)用的影響。

      deserialized:反序列化,其逆過(guò)程序列化(Serialization)是java提供的一種機(jī)制,將對(duì)象表示成一連串的字節(jié);而反序列化就表示將字節(jié)恢復(fù)為對(duì)象的過(guò)程。序列化是對(duì)象永久化的一種機(jī)制,可以將對(duì)象及其屬性保存起來(lái),并能在反序列化后直接恢復(fù)這個(gè)對(duì)象

      replication:備份數(shù)(在多個(gè)節(jié)點(diǎn)上備份)

      理解了這5個(gè)參數(shù),StorageLevel 的12種緩存級(jí)別就不難理解了。

      val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用這種緩存級(jí)別的RDD將存儲(chǔ)在硬盤以及內(nèi)存中,使用序列化(在硬盤中),并且在多個(gè)節(jié)點(diǎn)上備份2份(正常的RDD只有一份)

      另外還注意到有一種特殊的緩存級(jí)別

      val OFF_HEAP = new StorageLevel(false, false, true, false)

      使用了堆外內(nèi)存,StorageLevel 類的源碼中有一段代碼可以看出這個(gè)的特殊性,它不能和其它幾個(gè)參數(shù)共存。

      if (useOffHeap) { require(!useDisk, "Off-heap storage level does not support using disk") require(!useMemory, "Off-heap storage level does not support using heap memory") require(!deserialized, "Off-heap storage level does not support deserialized storage") require(replication == 1, "Off-heap storage level does not support multiple replication") }

      建議:

      默認(rèn)情況下,性能最高的當(dāng)然是MEMORY_ONLY,但前提是你的內(nèi)存必須足夠足夠大,可以綽綽有余地存放下整個(gè)RDD的所有數(shù)據(jù)。因?yàn)椴贿M(jìn)行序列化與反序列化操作,就避免了這部分的性能開銷;對(duì)這個(gè)RDD的后續(xù)算子操作,都是基于純內(nèi)存中的數(shù)據(jù)的操作,不需要從磁盤文件中讀取數(shù)據(jù),性能也很高;而且不需要復(fù)制一份數(shù)據(jù)副本,并遠(yuǎn)程傳送到其他節(jié)點(diǎn)上。但是這里必須要注意的是,在實(shí)際的生產(chǎn)環(huán)境中,恐怕能夠直接用這種策略的場(chǎng)景還是有限的,如果RDD中數(shù)據(jù)比較多時(shí)(比如幾十億),直接用這種持久化級(jí)別,會(huì)導(dǎo)致JVM的OOM內(nèi)存溢出異常。

      如果使用MEMORY_ONLY級(jí)別時(shí)發(fā)生了內(nèi)存溢出,那么建議嘗試使用MEMORY_ONLY_SER級(jí)別。該級(jí)別會(huì)將RDD數(shù)據(jù)序列化后再保存在內(nèi)存中,此時(shí)每個(gè)partition僅僅是一個(gè)字節(jié)數(shù)組而已,大大減少了對(duì)象數(shù)量,并降低了內(nèi)存占用。這種級(jí)別比MEMORY_ONLY多出來(lái)的性能開銷,主要就是序列化與反序列化的開銷。但是后續(xù)算子可以基于純內(nèi)存進(jìn)行操作,因此性能總體還是比較高的。此外,可能發(fā)生的問(wèn)題同上,如果RDD中的數(shù)據(jù)量過(guò)多的話,還是可能會(huì)導(dǎo)致OOM內(nèi)存溢出的異常。

      如果純內(nèi)存的級(jí)別都無(wú)法使用,那么建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因?yàn)榧热坏搅诉@一步,就說(shuō)明RDD的數(shù)據(jù)量很大,內(nèi)存無(wú)法完全放下。序列化后的數(shù)據(jù)比較少,可以節(jié)省內(nèi)存和磁盤的空間開銷。同時(shí)該策略會(huì)優(yōu)先盡量嘗試將數(shù)據(jù)緩存在內(nèi)存中,內(nèi)存緩存不下才會(huì)寫入磁盤。

      通常不建議使用DISK_ONLY和后綴為_2的級(jí)別:因?yàn)橥耆诖疟P文件進(jìn)行數(shù)據(jù)的讀寫,會(huì)導(dǎo)致性能急劇降低,有時(shí)還不如重新計(jì)算一次所有RDD。后綴為_2的級(jí)別,必須將所有數(shù)據(jù)都復(fù)制一份副本,并發(fā)送到其他節(jié)點(diǎn)上,數(shù)據(jù)復(fù)制以及網(wǎng)絡(luò)傳輸會(huì)導(dǎo)致較大的性能開銷,除非是要求作業(yè)的高可用性,否則不建議使用。

      unpersist表示取消緩存,刪除掉緩存塊。默認(rèn)unpersist的blocking參數(shù)是true

      getStorageLevel 獲取的是緩存存儲(chǔ)級(jí)別,例如: StorageLevel(memory, deserialized, 1 replicas)

      cache之后一定不能直接去接算子。因?yàn)閏ache后有算子的話,它每次都會(huì)重新觸發(fā)這個(gè)計(jì)算過(guò)程,從而導(dǎo)致cache失效。

      cache操作需要當(dāng)?shù)谝粋€(gè)使用到它的job執(zhí)行后才會(huì)生效,而不是cache后馬上可用,這是spark框架的延遲計(jì)算導(dǎo)致的。可能粗想起來(lái)也不會(huì)有什么問(wèn)題,但是不正確的使用unpersist操作,也可能會(huì)導(dǎo)致cache失效。如下例子所示,在action操作之前就把緩存釋放掉:

      val data = sc.textFile(“data.csv”).flatMap(_.split(“,”)).cache() val data1 = data.map(word => (word, 1)).reduceByKey(_ + _) val data2 = data.map(word => (word, word.length)).reduceByKey(_ + _) data.unpersist() val wordCount1 = data1.count() val wordCount2 = data2.count()

      如何釋放cache緩存:unpersist,它是立即執(zhí)行的。persist是lazy級(jí)別的(沒有計(jì)算),unpersist是eager級(jí)別的。RDD cache的生命周期是application級(jí)別的,也就是如果不顯示unpersist釋放緩存,RDD會(huì)一直存在(雖然當(dāng)內(nèi)存不夠時(shí)按LRU算法進(jìn)行清除),如果不正確地進(jìn)行unpersist,讓無(wú)用的RDD占用executor內(nèi)存,會(huì)導(dǎo)致資源的浪費(fèi),影響任務(wù)的效率。

      public class Cache { public static void main(String[] args) { System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1"); SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO"); JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaRDD javaRDD = sc.parallelize(Lists.newArrayList("a","b")); JavaRDD javaRDD1 = sc.parallelize(Lists.newArrayList("1","2","3")); // cache 數(shù)據(jù) javaRDD.cache(); System.out.println(javaRDD.count()); System.out.println(javaRDD.getStorageLevel()); // persist javaRDD1.persist(StorageLevel.MEMORY_AND_DISK_SER()); System.out.println(javaRDD1.count()); System.out.println(javaRDD1.getStorageLevel()); // unpersist javaRDD.unpersist(); javaRDD1.unpersist(); } }

      2 StorageLevel(memory, deserialized, 1 replicas) 3 StorageLevel(disk, memory, 1 replicas)

      EI企業(yè)智能 Java spark 可信智能計(jì)算服務(wù) TICS 智能數(shù)據(jù)

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:裝修項(xiàng)目形象進(jìn)度表(裝飾工程進(jìn)度計(jì)劃表)
      下一篇:低代碼概念低代碼行業(yè)發(fā)展怎么樣?
      相關(guān)文章
      看亚洲a级一级毛片| 亚洲午夜无码久久久久小说| 无码亚洲成a人在线观看| 亚洲一区二区观看播放| 亚洲精品天堂在线观看| 亚洲精品视频免费在线观看| 777亚洲精品乱码久久久久久| 精品亚洲综合在线第一区| 国产亚洲精品a在线无码| 国产精品久久久亚洲| 狠狠色伊人亚洲综合成人| 亚洲精品成人片在线观看精品字幕| 亚洲综合无码精品一区二区三区 | 国产亚洲福利在线视频| 国产精品亚洲专区在线观看| 亚洲校园春色另类激情| 亚洲欧洲日本在线观看| 亚洲精品欧美综合四区| 亚洲精品无码不卡在线播放| 亚洲AV无码一区二区三区性色| 蜜芽亚洲av无码一区二区三区| 相泽南亚洲一区二区在线播放| 国产精品亚洲二区在线| 亚洲国产激情一区二区三区| 久久亚洲高清综合| 亚洲国产精品一区二区第一页| 亚洲电影一区二区| 亚洲美女人黄网成人女| 亚洲乱码中文字幕小综合| 亚洲最大av资源站无码av网址| 久久久久亚洲国产AV麻豆| 亚洲电影日韩精品 | 中日韩亚洲人成无码网站| 亚洲精品无码久久久久YW| 四虎亚洲国产成人久久精品| 亚洲一区二区三区国产精品| 亚洲国产精品嫩草影院在线观看| 久久亚洲私人国产精品vA| 亚洲人成电影在线观看青青| 亚洲色偷偷色噜噜狠狠99| 亚洲Av无码国产情品久久|