Spark-Redis工作篇:執(zhí)行海量數(shù)據(jù)插入、查詢(xún)作業(yè)時(shí)碰到的問(wèn)題
前一篇博客介紹了Spark-Redis入門(mén)篇:包括一些基礎(chǔ)概念和重要的類(lèi)、方法。Spark-Redis是用Spark在redis上面進(jìn)行讀寫(xiě)數(shù)據(jù)操作的包。其支持redis的所有數(shù)據(jù)結(jié)構(gòu):String(字符串), Hash(哈希), List(列表), Set and Sorted Set(集合和有序集合)。此模塊既可以用于Redis的standalone模式,也可用于集群情況。由于redis是基于內(nèi)存的數(shù)據(jù)庫(kù),穩(wěn)定性并不是很高,尤其是standalone模式下的redis。于是工作中在使用Spark-Redis時(shí)也會(huì)碰到很多問(wèn)題,尤其是執(zhí)行海量數(shù)據(jù)插入與查詢(xún)的場(chǎng)景中。
海量數(shù)據(jù)查詢(xún)
Redis是基于內(nèi)存讀取的數(shù)據(jù)庫(kù),相比其它的數(shù)據(jù)庫(kù),Redis的讀取速度會(huì)更快。但是當(dāng)我們要查詢(xún)上千萬(wàn)條的海量數(shù)據(jù)時(shí),即使是Redis也需要花費(fèi)較長(zhǎng)時(shí)間。這時(shí)候如果我們想要終止select作業(yè)的執(zhí)行,我們希望的是所有的running task立即killed。
Spark是有作業(yè)調(diào)度機(jī)制的。SparkContext是Spark的入口,相當(dāng)于應(yīng)用程序的main函數(shù)。SparkContext中的cancelJobGroup函數(shù)可以取消正在運(yùn)行的job。
/** ??*?Cancel?active?jobs?for?the?specified?group.?See?`org.apache.spark.SparkContext.setJobGroup` ??*?for?more?information. ??*/ ?def?cancelJobGroup(groupId:?String)?{ ???assertNotStopped() ???dagScheduler.cancelJobGroup(groupId) ?}
按理說(shuō)取消job之后,job下的所有task應(yīng)該也終止。而且當(dāng)我們?nèi)∠鹲elect作業(yè)時(shí),executor會(huì)throw TaskKilledException,而這個(gè)時(shí)候負(fù)責(zé)task作業(yè)的TaskContext在捕獲到該異常之后,會(huì)執(zhí)行killTaskIfInterrupted。
//?If?this?task?has?been?killed?before?we?deserialized?it,?let's?quit?now.?Otherwise, ?//?continue?executing?the?task. ?val?killReason?=?reasonIfKilled ?if?(killReason.isDefined)?{ ???//?Throw?an?exception?rather?than?returning,?because?returning?within?a?try{}?block ???//?causes?a?NonLocalReturnControl?exception?to?be?thrown.?The?NonLocalReturnControl ???//?exception?will?be?caught?by?the?catch?block,?leading?to?an?incorrect?ExceptionFailure ???//?for?the?task. ???throw?new?TaskKilledException(killReason.get) ?}
/** ?*?If?the?task?is?interrupted,?throws?TaskKilledException?with?the?reason?for?the?interrupt. ?*/ ?private[spark]?def?killTaskIfInterrupted():?Unit
但是Spark-Redis中還是會(huì)出現(xiàn)終止作業(yè)但是task仍然running。因?yàn)閠ask的計(jì)算邏輯最終是在RedisRDD中實(shí)現(xiàn)的,RedisRDD的compute會(huì)從Jedis中取獲取keys。所以說(shuō)要解決這個(gè)問(wèn)題,應(yīng)該在RedisRDD中取消正在running的task。這里有兩種方法:
方法一:參考Spark的JDBCRDD,定義close(),結(jié)合InterruptibleIterator。
def?close()?{ ???if?(closed)?return ???try?{ ?????if?(null?!=?rs)?{ ???????rs.close() ?????} ???}?catch?{ ?????case?e:?Exception?=>?logWarning("Exception?closing?resultset",?e) ???} ???try?{ ?????if?(null?!=?stmt)?{ ???????stmt.close() ?????} ???}?catch?{ ?????case?e:?Exception?=>?logWarning("Exception?closing?statement",?e) ???} ???try?{ ?????if?(null?!=?conn)?{ ???????if?(!conn.isClosed?&&?!conn.getAutoCommit)?{ ?????????try?{ ???????????conn.commit() ?????????}?catch?{ ???????????case?NonFatal(e)?=>?logWarning("Exception?committing?transaction",?e) ?????????} ???????} ???????conn.close() ?????} ?????logInfo("closed?connection") ???}?catch?{ ?????case?e:?Exception?=>?logWarning("Exception?closing?connection",?e) ???} ???closed?=?true ?} ? ?context.addTaskCompletionListener{?context?=>?close()?}? CompletionIterator[InternalRow,?Iterator[InternalRow]]( ???new?InterruptibleIterator(context,?rowsIterator),?close())
方法二:異步線程執(zhí)行compute,主線程中判斷task isInterrupted
try{ ???val?thread?=?new?Thread()?{ ?????override?def?run():?Unit?=?{ ???????try?{ ??????????keys?=?doCall ???????}?catch?{ ?????????case?e?=> ???????????logWarning(s"execute?http?require?failed.") ???????} ???????isRequestFinished?=?true ?????} ???} ? ???//?control?the?http?request?for?quite?if?user?interrupt?the?job ???thread.start() ???while?(!context.isInterrupted()?&&?!isRequestFinished)?{ ?????Thread.sleep(GetKeysWaitInterval) ???} ???if?(context.isInterrupted()?&&?!isRequestFinished)?{ ?????logInfo(s"try?to?kill?task?${context.getKillReason()}") ?????context.killTaskIfInterrupted() ???} ???thread.join() ???CompletionIterator[T,?Iterator[T]]( ?????new?InterruptibleIterator(context,?keys),?close)
我們可以異步線程來(lái)執(zhí)行compute,然后在另外的線程中判斷是否task isInterrupted,如果是的話(huà)就執(zhí)行TaskContext的killTaskIfInterrupted。防止killTaskIfInterrupted無(wú)法殺掉task,再結(jié)合InterruptibleIterator:一種迭代器,以提供任務(wù)終止功能。通過(guò)檢查[TaskContext]中的中斷標(biāo)志來(lái)工作。
海量數(shù)據(jù)插入
我們都已經(jīng)redis的數(shù)據(jù)是保存在內(nèi)存中的。當(dāng)然Redis也支持持久化,可以將數(shù)據(jù)備份到硬盤(pán)中。當(dāng)插入海量數(shù)據(jù)時(shí),如果Redis的內(nèi)存不夠的話(huà),很顯然會(huì)丟失部分?jǐn)?shù)據(jù)。這里讓使用者困惑的點(diǎn)在于: 當(dāng)Redis已使用內(nèi)存大于最大可用內(nèi)存時(shí),Redis會(huì)報(bào)錯(cuò):command not allowed when used memory > ‘maxmemory’。但是當(dāng)insert job的數(shù)據(jù)大于Redis的可用內(nèi)存時(shí),部分?jǐn)?shù)據(jù)丟失了,并且還沒(méi)有任何報(bào)錯(cuò)。
因?yàn)椴还苁荍edis客戶(hù)端還是Redis服務(wù)器,當(dāng)插入數(shù)據(jù)時(shí)內(nèi)存不夠,不會(huì)插入成功,但也不會(huì)返回任何response。所以目前能想到的解決辦法就是當(dāng)insert數(shù)據(jù)丟失時(shí),擴(kuò)大Redis內(nèi)存。
總結(jié)
Spark-Redis是一個(gè)應(yīng)用還不是很廣泛的開(kāi)源項(xiàng)目,不像Spark JDBC那樣已經(jīng)商業(yè)化。所以Spark-Redis還是存在很多問(wèn)題。相信隨著commiter的努力,Spark-Redis也會(huì)越來(lái)越強(qiáng)大。
EI企業(yè)智能 智能數(shù)據(jù) 數(shù)據(jù)湖探索 DLI HUAWEI CONNECT
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶(hù)投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。