Java的面向?qū)ο缶幊?/a>">Java的面向?qū)ο缶幊?/a>
694
2025-04-02
這四個(gè)方法都和緩存有關(guān),所以寫在一塊。
Persist this RDD with the default storage level (MEMORY_ONLY).
cache就是在內(nèi)存中緩存數(shù)據(jù),其實(shí)也是使用的persist。使用非序列化的方式將RDD的數(shù)據(jù)全部嘗試持久化到內(nèi)存中,cache()只是一個(gè)transformtion,是lazy的,必須通過(guò)一個(gè)action觸發(fā),才能真正的將該RDD cache到內(nèi)存中。
//scala def cache(): JavaPairRDD[K, V] //java public JavaPairRDD
Set this RDD's storage level to persist its values across operations after the first time it is computed. Can only be called once on each RDD.
將此RDD的存儲(chǔ)級(jí)別設(shè)置為在第一次計(jì)算后跨操作持久化其值。每個(gè)RDD只能調(diào)用一次。
//scala def persist(newLevel: StorageLevel): JavaPairRDD[K, V] //java public JavaPairRDD
Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. blocking
將RDD標(biāo)記為非持久性的,并從內(nèi)存和磁盤中刪除它的所有塊。
//scala def unpersist(): JavaPairRDD[K, V] def persist(newLevel: StorageLevel): JavaPairRDD[K, V] //java public JavaPairRDD
Get the RDD's current storage level, or StorageLevel.NONE if none is set.
獲取RDD的當(dāng)前存儲(chǔ)級(jí)別,如果未設(shè)置,則獲取StorageLevel.NONE。
//scala def getStorageLevel: StorageLevel //java public static StorageLevel getStorageLevel()
cache和persist都是用于將一個(gè)RDD進(jìn)行緩存的,這樣在之后使用的過(guò)程中就不需要重新計(jì)算了,可以大大節(jié)省程序運(yùn)行時(shí)間。
其中cache這個(gè)方法是一個(gè)Tranformation,當(dāng)?shù)谝淮斡龅紸ction算子的時(shí)才會(huì)進(jìn)行持久化。
cache的源碼:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): this.type = persist()
persist源碼:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
persist(StorageLevel.MEMORY_ONLY)源碼:
/** * Set this RDD's storage level to persist its values across operations after the first time * it is computed. This can only be used to assign a new storage level if the RDD does not * have a storage level set yet.. */ def persist(newLevel: StorageLevel): this.type = { // TODO: Handle changes of StorageLevel if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) { throw new UnsupportedOperationException( "Cannot change storage level of an RDD after it was already assigned a level") } sc.persistRDD(this) // Register the RDD with the ContextCleaner for automatic GC-based cleanup sc.cleaner.foreach(_.registerRDDForCleanup(this)) storageLevel = newLevel this }
從源碼中可以看得出來(lái)cache內(nèi)部調(diào)用了persist方法,persist方法又調(diào)用了persist(StorageLevel.MEMORY_ONLY)方法,所以執(zhí)行cache算子其實(shí)就是執(zhí)行了persist算子且持久化級(jí)別為MEMORY_ONLY
兩者的區(qū)別:cache只有一個(gè)默認(rèn)的緩存級(jí)別MEMORY_ONLY ,而persist可以根據(jù)情況設(shè)置其它的緩存級(jí)別。
persist有一個(gè) StorageLevel 類型的參數(shù),這個(gè)表示的是RDD的緩存級(jí)別。
StorageLevel 說(shuō)明(參考博客):
object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false) ...... }
1.MEMORY_ONLY
使用未序列化的Java對(duì)象格式,將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù),則數(shù)據(jù)可能就不會(huì)進(jìn)行持久化。那么下次對(duì)這個(gè)RDD執(zhí)行算子操作時(shí),那些沒有被持久化的數(shù)據(jù),需要從源頭處重新計(jì)算一遍。這是默認(rèn)的持久化策略,使用cache()方法時(shí),實(shí)際就是使用的這種持久化策略。
2.MEMORY_AND_DISK
使用未序列化的Java對(duì)象格式,優(yōu)先嘗試將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù),會(huì)將數(shù)據(jù)寫入磁盤文件中,下次對(duì)這個(gè)RDD執(zhí)行算子時(shí),持久化在磁盤文件中的數(shù)據(jù)會(huì)被讀取出來(lái)使用。
3.MEMORY_ONLY_SER
基本含義同MEMORY_ONLY。唯一的區(qū)別是,會(huì)將RDD中的數(shù)據(jù)進(jìn)行序列化,RDD的每個(gè)partition會(huì)被序列化成一個(gè)字節(jié)數(shù)組。這種方式更加節(jié)省內(nèi)存,從而可以避免持久化的數(shù)據(jù)占用過(guò)多內(nèi)存導(dǎo)致頻繁GC。
4.MEMORY_AND_DISK_SER
基本含義同MEMORY_AND_DISK。唯一的區(qū)別是,會(huì)將RDD中的數(shù)據(jù)進(jìn)行序列化,RDD的每個(gè)partition會(huì)被序列化成一個(gè)字節(jié)數(shù)組。這種方式更加節(jié)省內(nèi)存,從而可以避免持久化的數(shù)據(jù)占用過(guò)多內(nèi)存導(dǎo)致頻繁GC。
5.DISK_ONLY
使用未序列化的Java對(duì)象格式,將數(shù)據(jù)全部寫入磁盤文件中。
6.MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等
對(duì)于上述任意一種持久化策略,如果加上后綴_2,代表的是將每個(gè)持久化的數(shù)據(jù),都復(fù)制一份副本,并將副本保存到其他節(jié)點(diǎn)上。這種基于副本的持久化機(jī)制主要用于進(jìn)行容錯(cuò)。假如某個(gè)節(jié)點(diǎn)掛掉,節(jié)點(diǎn)的內(nèi)存或磁盤中的持久化數(shù)據(jù)丟失了,那么后續(xù)對(duì)RDD計(jì)算時(shí)還可以使用該數(shù)據(jù)在其他節(jié)點(diǎn)上的副本。如果沒有副本的話,就只能將這些數(shù)據(jù)從源頭處重新計(jì)算一遍了。
這里列出了12種緩存級(jí)別,但這些有什么區(qū)別呢?可以看到每個(gè)緩存級(jí)別后面都跟了一個(gè)StorageLevel的構(gòu)造函數(shù),里面包含了4個(gè)或5個(gè)參數(shù),如下
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
查看其構(gòu)造函數(shù)
class StorageLevel private( private var _useDisk: Boolean, private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean, private var _replication: Int = 1) extends Externalizable { ...... def useDisk: Boolean = _useDisk def useMemory: Boolean = _useMemory def useOffHeap: Boolean = _useOffHeap def deserialized: Boolean = _deserialized def replication: Int = _replication ...... }
可以看到StorageLevel類的主構(gòu)造器包含了5個(gè)參數(shù):
useDisk:使用硬盤(外存)
useMemory:使用內(nèi)存
useOffHeap:使用堆外內(nèi)存,這是Java虛擬機(jī)里面的概念,堆外內(nèi)存意味著把內(nèi)存對(duì)象分配在Java虛擬機(jī)的堆以外的內(nèi)存,這些內(nèi)存直接受操作系統(tǒng)管理(而不是虛擬機(jī))。這樣做的結(jié)果就是能保持一個(gè)較小的堆,以減少垃圾收集對(duì)應(yīng)用的影響。
deserialized:反序列化,其逆過(guò)程序列化(Serialization)是java提供的一種機(jī)制,將對(duì)象表示成一連串的字節(jié);而反序列化就表示將字節(jié)恢復(fù)為對(duì)象的過(guò)程。序列化是對(duì)象永久化的一種機(jī)制,可以將對(duì)象及其屬性保存起來(lái),并能在反序列化后直接恢復(fù)這個(gè)對(duì)象
replication:備份數(shù)(在多個(gè)節(jié)點(diǎn)上備份)
理解了這5個(gè)參數(shù),StorageLevel 的12種緩存級(jí)別就不難理解了。
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用這種緩存級(jí)別的RDD將存儲(chǔ)在硬盤以及內(nèi)存中,使用序列化(在硬盤中),并且在多個(gè)節(jié)點(diǎn)上備份2份(正常的RDD只有一份)
另外還注意到有一種特殊的緩存級(jí)別
val OFF_HEAP = new StorageLevel(false, false, true, false)
使用了堆外內(nèi)存,StorageLevel 類的源碼中有一段代碼可以看出這個(gè)的特殊性,它不能和其它幾個(gè)參數(shù)共存。
if (useOffHeap) { require(!useDisk, "Off-heap storage level does not support using disk") require(!useMemory, "Off-heap storage level does not support using heap memory") require(!deserialized, "Off-heap storage level does not support deserialized storage") require(replication == 1, "Off-heap storage level does not support multiple replication") }
建議:
默認(rèn)情況下,性能最高的當(dāng)然是MEMORY_ONLY,但前提是你的內(nèi)存必須足夠足夠大,可以綽綽有余地存放下整個(gè)RDD的所有數(shù)據(jù)。因?yàn)椴贿M(jìn)行序列化與反序列化操作,就避免了這部分的性能開銷;對(duì)這個(gè)RDD的后續(xù)算子操作,都是基于純內(nèi)存中的數(shù)據(jù)的操作,不需要從磁盤文件中讀取數(shù)據(jù),性能也很高;而且不需要復(fù)制一份數(shù)據(jù)副本,并遠(yuǎn)程傳送到其他節(jié)點(diǎn)上。但是這里必須要注意的是,在實(shí)際的生產(chǎn)環(huán)境中,恐怕能夠直接用這種策略的場(chǎng)景還是有限的,如果RDD中數(shù)據(jù)比較多時(shí)(比如幾十億),直接用這種持久化級(jí)別,會(huì)導(dǎo)致JVM的OOM內(nèi)存溢出異常。
如果使用MEMORY_ONLY級(jí)別時(shí)發(fā)生了內(nèi)存溢出,那么建議嘗試使用MEMORY_ONLY_SER級(jí)別。該級(jí)別會(huì)將RDD數(shù)據(jù)序列化后再保存在內(nèi)存中,此時(shí)每個(gè)partition僅僅是一個(gè)字節(jié)數(shù)組而已,大大減少了對(duì)象數(shù)量,并降低了內(nèi)存占用。這種級(jí)別比MEMORY_ONLY多出來(lái)的性能開銷,主要就是序列化與反序列化的開銷。但是后續(xù)算子可以基于純內(nèi)存進(jìn)行操作,因此性能總體還是比較高的。此外,可能發(fā)生的問(wèn)題同上,如果RDD中的數(shù)據(jù)量過(guò)多的話,還是可能會(huì)導(dǎo)致OOM內(nèi)存溢出的異常。
如果純內(nèi)存的級(jí)別都無(wú)法使用,那么建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因?yàn)榧热坏搅诉@一步,就說(shuō)明RDD的數(shù)據(jù)量很大,內(nèi)存無(wú)法完全放下。序列化后的數(shù)據(jù)比較少,可以節(jié)省內(nèi)存和磁盤的空間開銷。同時(shí)該策略會(huì)優(yōu)先盡量嘗試將數(shù)據(jù)緩存在內(nèi)存中,內(nèi)存緩存不下才會(huì)寫入磁盤。
通常不建議使用DISK_ONLY和后綴為_2的級(jí)別:因?yàn)橥耆诖疟P文件進(jìn)行數(shù)據(jù)的讀寫,會(huì)導(dǎo)致性能急劇降低,有時(shí)還不如重新計(jì)算一次所有RDD。后綴為_2的級(jí)別,必須將所有數(shù)據(jù)都復(fù)制一份副本,并發(fā)送到其他節(jié)點(diǎn)上,數(shù)據(jù)復(fù)制以及網(wǎng)絡(luò)傳輸會(huì)導(dǎo)致較大的性能開銷,除非是要求作業(yè)的高可用性,否則不建議使用。
unpersist表示取消緩存,刪除掉緩存塊。默認(rèn)unpersist的blocking參數(shù)是true
getStorageLevel 獲取的是緩存存儲(chǔ)級(jí)別,例如: StorageLevel(memory, deserialized, 1 replicas)
cache之后一定不能直接去接算子。因?yàn)閏ache后有算子的話,它每次都會(huì)重新觸發(fā)這個(gè)計(jì)算過(guò)程,從而導(dǎo)致cache失效。
cache操作需要當(dāng)?shù)谝粋€(gè)使用到它的job執(zhí)行后才會(huì)生效,而不是cache后馬上可用,這是spark框架的延遲計(jì)算導(dǎo)致的。可能粗想起來(lái)也不會(huì)有什么問(wèn)題,但是不正確的使用unpersist操作,也可能會(huì)導(dǎo)致cache失效。如下例子所示,在action操作之前就把緩存釋放掉:
val data = sc.textFile(“data.csv”).flatMap(_.split(“,”)).cache() val data1 = data.map(word => (word, 1)).reduceByKey(_ + _) val data2 = data.map(word => (word, word.length)).reduceByKey(_ + _) data.unpersist() val wordCount1 = data1.count() val wordCount2 = data2.count()
如何釋放cache緩存:unpersist,它是立即執(zhí)行的。persist是lazy級(jí)別的(沒有計(jì)算),unpersist是eager級(jí)別的。RDD cache的生命周期是application級(jí)別的,也就是如果不顯示unpersist釋放緩存,RDD會(huì)一直存在(雖然當(dāng)內(nèi)存不夠時(shí)按LRU算法進(jìn)行清除),如果不正確地進(jìn)行unpersist,讓無(wú)用的RDD占用executor內(nèi)存,會(huì)導(dǎo)致資源的浪費(fèi),影響任務(wù)的效率。
public class Cache { public static void main(String[] args) { System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1"); SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO"); JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaRDD
2 StorageLevel(memory, deserialized, 1 replicas) 3 StorageLevel(disk, memory, 1 replicas)
EI企業(yè)智能 Java spark 可信智能計(jì)算服務(wù) TICS 智能數(shù)據(jù)
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。