快速理解spark-on-k8s中的external-shuffle-service
如果你想在kubernetes集群中運行Spark任務,那么你可能會對:如何在k8s上運行external-shuffle-service感興趣。把Driver和Executor都當做容器,丟到k8s上(k8s集群則把他們當做一般的容器,和其他業務類app一樣對待),這種模式,可以使得集群資源池歸一,避免Spark一個資源池,業務類(K8S)集群一個資源池。提升整體資源利用率,統一維護也降低運維成本。這也是Spark官方在2.3版本后為什么要支持Spark-on-k8s的主要驅動力。
1????? external-shuffle-service作用
如果想要executor數量可以動態變化,就需要依賴external-shuffle-service功能(注意這句話,因為在k8s集群中,容器啟動關閉很方便。所以非常希望executor數量可以動態調整,提升資源利用率)。
原因是在shuffle過程中,一個executor會到另一個executor那里取數據。如果一個executor節點掛掉了,那么它也就無法處理其他executor發過來的 shuffle 的數據讀取請求了,它之前生成的數據都沒有意義了。為了解決“取shuffle數據”,和“目標executor是否運行”分開。Spark引入了external-shuffle-service服務。相當于先把shuffle數據暫存到external-shuffle-service那里,然后大家去external-shuffle-service那里取就行了(有點像個中介)。
好文參考:https://zhmin.github.io/2019/08/05/spark-external-shuffle-service/
2????? 原來怎么部署
在原Spark框架中,external-shuffle-service是部署在每個節點上的。
(圖還是來自上面的那個鏈接)
(1)executor 告訴 external-shuffle-service 數據存放在哪里,然后(2) external-shuffle-service 記下來,供別人查詢。所以問題的關鍵是,數據放“哪里”支持哪些格式呢。我們看(1)里面通知是結構是長這樣:
public class RegisterExecutor extends BlockTransferMessage {
public final String appId;???????? // spark application id
public final String execId;??????? // executor id
public final ExecutorShuffleInfo executorInfo;??? // 《==文件路徑
}
可以看出來,關鍵在 “在哪里” 要看(2)長什么樣:
public class ExecutorShuffleInfo implements Encodable {
public final String[] localDirs;???????? // 《== 第一級目錄列表
public final int subDirsPerLocalDir;???? // 第二級目錄列表
public final String shuffleManager;????? // shuffleManager的類型,目前只有一種類型 SortShuffleManager
}
可以看到,這個shuffle數據 “在哪里” 只能支持HostPath(本地路徑)。
問題的關鍵就來了:executor容器跑在k8s節點上面,external-shuffle-service跑在另一個容器里面。要想共享相同Path文件,那就必須使用節點路徑(k8s-hostpath)。要用這個Hostpath 還得擁有節點的所有權,這個對于多用戶共享的K8s集群來說,權限不安全,數據未隔離。
3????? 在k8s上要怎么解決(一)
Spark的external-shuffle-service要怎么在k8s上運行,這是個問題。Spark社區關于這個有個討論:https://docs.google.com/document/d/1uCkzGGVG17oGC6BJ75TpzLAZNorvrAU3FRd2X-rVHSM/edit#heading=h.btqugnmt2h40
這個文檔主要是說:
當前external-shuffle-service的實現有缺點:(1)多個Spark應用共用一個external-shuffle-service,如果external-shuffle-service出問題,多個Spark應用都受影響,即隔離性差。(2)一個節點一個external-shuffle-service,導致不同節點間壓力不均衡。同時如果節點掛了,external-shuffle-service也就沒了,這個節點上面的所有executor都受影響,可靠性差。(3)在當前較火熱的Docker容器環境下,executor寫入的shuffle數據(在一個容器內)。不一定就能被external-shuffle-service讀取到(在另一個容器內)。因為有些k8s集群中,管理員出于安全考慮,會強制隔離不同用戶的容器,禁止任何共享。
所以提出了改進方向:即executor保存shuffle數據時,不限定非得是保存在本地Path中。
具體實現方案可以有多種。
(1)?????? 保存shuffle數據時,通過external-shuffle-service上傳的方式。
(2)?????? external-shuffle-service支持shuffle數據為遠端uri地址,而不僅僅是主機路徑。
(3)?????? 由Driver來維護所有的shuffle數據信息,取消external-shuffle-service組件。
(4)?????? 將shuffle數據保存到分布式存儲中。
(5)?????? 將shuffle數據上傳到external-shuffle-service,然后由Driver跟蹤文件路徑。
總體思路就是:以前external-shuffle-service是本地寫,遠程讀。調整為:遠程寫,遠程讀。
4????? 在k8s上要怎么解決(二)
其實要在k8s上實現executor數量動態調整(dynamic resource allocation),還有另一條小路(即不通過external-shuffle-service的方式)。并且這條路已經實現了,在這個PR里面。https://github.com/apache/spark/pull/24817
?? 實現原理:
當發現executor里面是shuffle數據沒有用了,則可以刪除該executor。如果這個executor里面的shuffle數據,還會被其他Jop讀取,那么就保持這個executor存活著不被刪除。從而實現executor數量可以動態調整。
?? 缺點:
可以看出來,這種方式其實是一種緩兵之計。(1)刪除部分暫時不被使用executor,但是必須保留那些還會被使用的executor。所以動態效果并不是最優的。另外,(2)一個executor也許最近不被使用,被刪除了。但是后續其他Stage又有可能去訪問那個shuffle數據。結果發現找不到(被動態刪除嘛),這個時候又得重新計算,浪費性能。
PR里面的討論也說了,這個是無法用來完整替代external-shuffle-service的。
5????? 路標計劃
通過上面的分析,基本了解了在k8s上面跑external-shuffle-service的困難和思路。
所以要達到目的的路徑為:(1)external-shuffle-service支持遠端保存shuffle數據。(2)executor和external-shuffle-service共享云端shuffle數據。(3)executor數量可以動態調整,不影響功能。(4)在k8s上支持了executor數量動態調整(dynamic resource allocation)。
看Spark的規劃是在 3.0.0 版本提供完整能力,嗯,讓我們期待Spark on K8s越來越溜吧。
https://issues.apache.org/jira/browse/SPARK-24432
Spark
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。