解析分布式應用框架Ray架構源碼 -2

      網友投稿 1214 2022-05-29

      Task的lifetime

      Owner負責確保提交的Task的執行,并促進將返回的ObjectRef解析為其基礎值。如下圖,提交Task的進程被視為結果的Owner,并負責從raylet獲取資源以執行Task,Driver擁有A的結果,Worker 1擁有B的結果。

      提交Task時,Owner會等待所有依賴項就緒,即作為參數傳遞給Task的ObjectRefs(請參見Object的lifetime)變得可用。依賴項不需要是本地的;Owner一旦認為依賴項在群集中的任何地方可用,就會立即就緒。當依賴關系就緒時,Owner從分布式調度程序請求資源以執行任務,一旦資源可用,調度程序就會授予請求,并使用分配給owner的worker的地址進行響應。

      Owner將task spec通過gRPC發送給租用的worker來調度任務。執行任務后,worker必須存儲返回值。如果返回值較小,則工作線程將值直接inline返回給Owner,Owner將其復制到其進程中對象存儲區。如果返回值很大,則worker將對象存儲在其本地共享內存存儲中,并向所有者返回分布式內存中的ref。讓owner可以引用對象,不必將對象提取到其本地節點。

      當Task以ObjectRef作為其參數提交時,必須在worker開始執行之前解析對象值。如果該值較小,則它將直接從所有者的進程中對象存儲復制到任務說明中,在任務說明中,執行worker線程可以引用它。如果該值較大,則必須從分布式內存中提取對象,以便worker在其本地共享內存存儲中具有副本。scheduler通過查找對象的位置并從其他節點請求副本來協調此對象傳輸。

      容錯:任務可能會以錯誤結束。Ray區分了兩種類型的任務錯誤:

      應用程序級。這是工作進程處于活動狀態,但任務以錯誤結束的任何場景。例如,在Python中拋出IndexError的任務。

      系統級。這是工作進程意外死亡的任何場景。例如,隔離故障的進程,或者如果工作程序的本地raylet死亡。

      由于應用程序級錯誤而失敗的任務永遠不會重試。異常被捕獲并存儲為任務的返回值。由于系統級錯誤而失敗的任務可以自動重試到指定的嘗試次數。

      代碼參考:

      src/ray/core_worker/core_worker.cc

      src/ray/common/task/task_spec.h

      src/ray/core_worker/transport/direct_task_transport.cc

      src/ray/core_worker/transport/依賴關系_解析器.cc

      src/ray/core_worker/task_manager.cc

      src/ray/protobuf/common.proto

      Object的lifetime

      下圖Ray中的分布式內存管理。worker可以創建和獲取對象。owner負責確定對象何時安全釋放。

      對象的owner就是通過提交創建task或調用ray.put創建初始ObjectRef的worker。owner管理對象的生存期。Ray保證,如果owner是活的,對象最終可能會被解析為其值(或者在worker失敗的情況下引發錯誤)。如果owner已死亡,則獲取對象值的嘗試永遠不會hang,但可能會引發異常,即使對象仍有物理副本。

      每個worker存儲其擁有的對象的引用計數。有關如何跟蹤引用的詳細信息,請參閱引用計數。Reference僅在下面兩種操作期間計算:

      1.將ObjectRef或包含ObjectRef的對象作為參數傳遞給Task。

      2.從Task中返回ObjectRef或包含ObjectRef的對象。

      對象可以存儲在owner的進程內內存存儲中,也可以存儲在分布式對象存儲中。此決定旨在減少每個對象的內存占用空間和解析時間。

      當沒有故障時,owner保證,只要對象仍在作用域中(非零引用計數),對象的至少一個副本最終將可用。。

      有兩種方法可以將ObjectRef解析為其值:

      1.在ObjectRef上調用ray.get。

      2.將ObjectRef作為參數傳遞給任務。執行工作程序將解析ObjectRefs,并將任務參數替換為解析的值。

      當對象較小時,可以通過直接從owner的進程內存儲中檢索它來解析。大對象存儲在分布式對象存儲中,必須使用分布式協議解析。

      當沒有故障時,解析將保證最終成功(但可能會引發應用程序級異常,例如worker segfault)。如果存在故障,解析可能會引發系統級異常,但永遠不會掛起。如果對象存儲在分布式內存中,并且對象的所有副本都因raylet故障而丟失,則該對象可能會失敗。Ray還提供了一個選項,可以通過重建自動恢復此類丟失的對象。如果對象的所有者進程死亡,對象也可能失敗。

      代碼參考:

      src/ray/core_worker/store_Provider/memory_store/memory_store.cc

      src/ray/core_worker/store_Provider/plasma_store_provider.cc

      src/ray/core_worker/reference_count.cc

      src/ray/object_manager/object_manager.cc

      Actor的lifetime

      Actor的lifetimes和metadata (如IP和端口)是由GCS service管理的.每一個Actor的Client都會在本地緩存metadata,使用metadata通過gRPC將task發送給Actor.

      如上圖,與Task提交不同,Task提交完全分散并由Task Owner管理,Actor lifetime由GCS服務集中管理。

      在Python中創建Actor時,worker首先同步向GCS注冊Actor。這確保了在創建Actor之前,如果創建worker失敗的情況下的正確性。一旦GCS響應,Actor創建過程的其余部分將是異步的。Worker進程在創建一個稱為Actor創建Task的特殊Task隊列。這與普通的非Actor任務類似,只是其指定的資源是在actor進程的生存期內獲取的。創建者異步解析actor創建task的依賴關系,然后將其發送到要調度的GCS服務。同時,創建actor的Python調用立即返回一個“actor句柄”,即使actor創建任務尚未調度,也可以使用該句柄。

      Actor的任務執行與普通Task 類似:它們返回futures,通過gRPC直接提交給actor進程,在解析所有ObjectRef依賴關系之前,不會運行。和普通Task主要有兩個區別:

      執行Actor任務不需要從調度器獲取資源。這是因為在計劃其創建任務時,參與者已在其生命周期內獲得資源。

      對于Actor的每個調用者,任務的執行順序與提交順序相同。

      當Actor的創建者退出時,或者群集中的作用域中沒有更多掛起的任務或句柄時,將被清理。不過對于detached Actor來說不是這樣的,因為detached actor被設計為可以通過名稱引用的長Actor,必須使用ray.kill(no_restart=True)顯式清理。

      Ray還支持async actor,這些Actor可以使用asyncio event loop并發運行任務。從調用者的角度來看,向這些actor提交任務與向常規actor提交任務相同。唯一的區別是,當task在actor上運行時,它將發布到在后臺線程或線程池中運行的異步事件循環中,而不是直接在主線程上運行。

      代碼參考:

      src/ray/core_worker/core_worker.cc

      src/ray/core_worker/transport/direct_actor_transport.cc

      src/ray/gcs/gcs_server/gcs_actor_manager.cc

      src/ray/gcs/gcs_server/gcs_actor_scheduler.cc

      src/ray/protobuf/core_worker.proto

      故障模型

      Ray工作節點設計的是完全同構,因此任何單個節點都可能丟失,而不會導致整個群集崩潰。當前的例外是頭節點,因為它承載的GCS目前未做高可用。

      所有節點都被分配一個唯一的標識符,并通過心跳相互通信。GCS負責群集的成員管理,如哪些節點當前處于活動狀態。GCS會對任何超時的節點ID進行處理,在該節點上使用不同的節點ID啟動新的raylet,以便重用物理資源。如果一個alive的raylet超時,會立刻退出。節點的故障檢測當前不處理網絡分區:如果工作節點與GCS所在分區隔離了,它就會超時并標記為已死。

      每個raylet向GCS報告所有本地worker的death事件。GCS廣播這些失敗事件,并使用它們來處理Actor death。所有worker進程都與其節點上的raylet fate-share。

      raylet負責防止單個工作進程故障后群集資源和系統狀態的泄漏。對于失敗的工作進程(本地或遠程),每個raylet負責:

      釋放任務執行所需的集群資源,如CPU。這是通過kill 失敗的worker 進程。Fail 的worker 提出的任何未完成的資源請求也將被取消。

      釋放用于該worker 擁有的對象的任何分布式對象存儲內存(請參見內存管理)。同時也會清理對象目錄中的關聯entries。

      系統故障模型意味著Ray中的任務和對象將與其owner共享命運。例如,如果運行a的worker在此場景中失敗,則將收集在其子樹中創建的任何對象和任務(灰色的b和z)。如果b是在a’的子樹中創建的actor,情況也是如此。主要影響是:

      如果試圖獲取此類對象的值,任何其他實時進程都將收到應用程序級異常。例如,如果在上述場景中z ObjectRef已傳遞回driver,則driver將在ray.get(z)上收到錯誤。

      通過修改程序將不同的任務放置在不同的子樹(即通過嵌套函數調用),可以將故障隔離。

      應用程序將與driver共享命運,driver是所有ownership tree的根。

      避免fate-shareing行為的選項是使用detached actor,該actor可能會超過其原始driver的生存期,并且只能通過程序的顯式調用銷毀。detached actor本身可以擁有任何其他任務和對象,一旦被摧毀,這些任務和對象將與actor分享命運。

      后續會支持對象溢出,這將允許對象在其所有者的生命周期內持續存在。

      Ray提供了一些選項來幫助透明恢復,包括自動任務重試、參與者重新啟動和對象重建。

      Object 管理

      進程內存儲 VS 分布式對象存儲,上圖描述了提交依賴于對象(x)的任務(a)時分配內存的方式的差異。

      在Ray中小對象存儲在其所有者的進程內存儲中,而大對象存儲在分布式對象存儲中。這個設計主要是為了減少每個對象的內存占用空間和解析時間。在后一種情況下,進程中存儲中會保存一個占位符,以指示該對象已提升到共享內存。

      進程內存儲中的對象可以通過直接內存副本快速解析,但由于額外的副本,許多進程引用時可能會占用更高的內存。單個worker中存儲的容量也僅限于該計算機的內存容量,限制了在任何給定時間可以引用的此類對象的總數。對于多次引用的對象,吞吐量也可能受到所有者進程的處理能力的限制。

      分布式對象存儲中的對象的解析需要至少一個IPC從worker到worker的本地共享內存存儲。如果worker的本地共享內存存儲尚未包含對象的副本,則可能需要額外的RPC。另一方面,由于共享內存存儲是用共享內存實現的,因此同一節點上的多個工作進程可以引用對象的同一副本。如果對象可以用零副本反序列化,這可以減少總體內存占用。使用分布式內存還允許進程引用對象,而不使對象本地,這意味著進程可以引用總大小超過單個計算機內存容量的對象。最后,吞吐量可以隨著分布式對象存儲中的節點數量而擴展,因為對象的多個副本可能存儲在不同的節點上。

      參考代碼:

      src/ray/core_worker/store_provider/memory_store/memory_store.cc

      src/ray/core_worker/store_provider/plasma_store_provider.cc

      src/ray/common/buffer.h

      src/ray/protobuf/object_manager.proto

      Object 解析

      對象的值可以使用ObjectRef解析,ObjectRef包括兩個字段:

      唯一的20字節標識符。這是生成對象的任務ID和該任務迄今創建的整數對象的級聯。

      對象所有者(worker)的地址。這包括工作進程的唯一ID、IP地址和端口以及本地Raylet的唯一ID。

      小對象可以通過直接從所有者的進程內存儲中復制來解析。例如,如果所有者調用ray.get,系統將從本地進程內存儲查找并反序列化值。如果所有者提交了從屬任務,它將通過將值直接復制到任務描述中來內聯對象。請注意,這些對象是所有者進程的本地對象:如果借用者嘗試解析值,則對象將升級到共享內存,在共享內存中,可以通過下面描述的分布式對象解析協議檢索它。

      上圖為大對象解析,對象x最初是在節點2上創建的(例如,返回值的任務在該節點上運行),所有者(任務的調用者)調用ray.get時的步驟:

      查找對象在GCS中的位置。

      選擇位置并發送對象副本的請求。

      接收對象。

      大對象存儲在分布式對象存儲中,必須使用分布式協議解析,如果對象已存儲在引用持有者的本地共享內存存儲中,則引用持有者可以通過IPC檢索對象。這將返回一個指向共享內存的指針,該指針可能會被同一節點上的其他工作線程同時引用。

      如果對象在本地共享內存存儲中不可用,則引用持有者通知其本地raylet,然后后者嘗試從遠程raylet獲取副本。raylet從對象目錄中查找位置并請求其中一個raylet傳輸對象。

      參考代碼:

      src/ray/common/id.h

      解析:分布式應用框架Ray架構源碼 -2

      src/ray/object_manager/object_directory.h

      內存管理

      遠程任務的對象值由執行的worker計算。如果值較小,工作線程將直接將值回復給owner,該值將復制到owner的進程中存儲;如果該值較大,則執行工作程序將該值存儲在其本地共享內存存儲中,共享內存對象的此初始副本稱為主副本。

      如上圖:主副本vs可驅逐副本。主副本(節點2)不能被刪除, 但是,節點1(通過ray.get創建)和節點3(通過任務提交創建)上的副本可以在內存壓力下被刪除。

      主副本是唯一的,因為只要對象的所有者引用計數大于0,它就不會被刪除,這與對象的其他副本形成了鮮明對比,后者可能會在本地內存壓力下被LRU淘汰刪除。因此,如果單個對象存儲包含所有主副本,占用內存容量,另一個對象試圖存儲的時候應用程序可能會收到OutOfMemoryError。

      在大多數情況下,主副本是要創建對象的第一個副本。如果初始副本因故障而丟失,owner將嘗試根據對象的可用位置指定新的主副本。

      一旦對象引用計數變為0,對象的所有副本最終將自動垃圾收集。owner將立即從進程中存儲中刪除小對象,大對象由Raylet異步從分布式對象存儲中擦除。

      Raylet還管理分布式對象傳輸,該傳輸根據對象當前需要的位置創建對象的額外副本,例如,如果依賴于對象的任務被調度到遠程節點。

      引用計數

      每個工作進程存儲其擁有的每個對象的引用計數, owner的本地引用計數包括本地Python引用計數和owner提交的依賴于對象的掛起任務數, 當Python ObjectRef被釋放時,前者將遞減, 當依賴于對象的任務成功完成時,后者將遞減(請注意,以應用程序級異常結束的任務算作成功)。

      ObjectRefs也可以通過將它們存儲在另一個對象中來復制到另一個進程, 接收ObjectRef副本的進程稱為借用者。例如:

      @ray.remote def temp_borrow(obj_refs): # Can use obj_refs temporarily as if I am the owner. x = ray.get(obj_refs[0]) @ray.remote class Borrower: def borrow(self, obj_refs): self.x = obj_refs[0] x_ref = foo.remote() temp_borrow.remote([x_ref]) # Passing x_ref in a list will allow `borrow` to run before the value is ready. b = Borrower.remote() b.borrow.remote([x_ref]) # x_ref can also be borrowed permanently by an actor

      如果worker仍在借用任何引用,owner將worker的ID添加到本地borrowers列表中。borrower保留第二個本地引用計數,類似于owner,一旦borrower的本地引用計數變為0,owner要求borrower響應。此時,owner可以從borrower列表中刪除工人并收集對象。在上面的示例中,Borrower actor正在永久借用引用,因此在 Borrower actor本身超出范圍或死亡之前,owner不會釋放對象。

      borrower也可以遞歸地添加到owner列表中。如果borrower本身將ObjectRef傳遞給另一個進程,則會發生這種情況。在這種情況下,當borrower響應owner其本地引用計數為0時,它還包括它創建的任何borrower,owner反過來使用相同的協議聯系這些新borrower。

      類似的協議用于跟蹤其owner返回的ObjectRef。例如:

      @ray.remote def parent(): y_ref = child.remote() x_ref = ray.get(y_ref) x = ray.get(x_ref) @ray.remote def child(): x_ref = foo.remote() return x_ref

      當child函數返回時,x_ref的owner(執行child的worker)將標記x_ref包含在y_ref中。然后,owner將parrentworker添加到x_ref的borrower列表中。從這里開始,協議與上面類似:owner向parentworker發送一條消息,要求borrower在其對y_ref和x_ref的引用超出范圍后回復。

      不同類型的引用及其更新方式的摘要:

      在遠程函數或類定義中捕獲的引用將被永久固定。例如:

      x_ref = foo.remote() @ray.remote def capture(): ray.get(x_ref) # x_ref is captured. It will be pinned as long as the driver lives.

      也可以通過使用ray.cloudPickle拾取ObjectRef來創建“帶外”引用。在這種情況下,將向對象的計數添加永久引用,以防止對象超出范圍。其他帶外序列化方法(例如,傳遞唯一標識ObjectRef的二進制字符串)不能保證有效,因為它們不包含所有者的地址,而且所有者不會跟蹤引用。

      代碼參考:

      src/ray/core_worker/reference_count.cc

      python/ray/includes/object_ref.pxi

      java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java

      上述相同的引用計數協議用于跟蹤(non-detached)actor的生存期。虛擬對象用于表示actor。此對象的ID是根據參與者創建任務的ID計算的。actor的創建者擁有虛擬對象。

      當Python actor句柄被釋放時,這將減少虛擬對象的本地引用計數。當在actor句柄上提交任務時,這將增加虛擬對象的已提交任務計數。當actor句柄傳遞給另一個進程時,接收進程將被計算為虛擬對象的借用者。一旦引用計數達到0,所有者就會通知GCS服務銷毀參與者是安全的。

      代碼參考:

      src/ray/core_worker/Actor_handler.cc

      python/ray/Actor.py

      java/api/src/main/java/io/ray/api/ActorCall.java

      當對象是Python中引用循環的一部分時,Python垃圾收集器不保證這些對象將及時被垃圾收集。由于未收集的Python ObjectRefs可以虛假地在分布式對象存儲中保持Ray對象的活動狀態,因此當對象存儲接近容量時,Ray會定期在所有Python工作線程中觸發gc.collect(),這確保了Python引用循環永遠不會導致虛假的對象存儲滿的狀態。

      Object 丟失

      小對象:存儲在進程中對象存儲中的小對象與其onwer共享命運。由于借用的對象被提升到共享內存,因此任何借用者都將通過下面描述的分布式協議檢測故障。

      如果對象從分布式內存中丟失:對象的非主副本可能會丟失,而不會產生任何后果。如果對象的主副本丟失,所有者將嘗試通過查找對象目錄中的剩余位置來指定新的主副本。如果不存在,則owner存儲在對象解析期間將引發的系統級錯誤。

      Ray支持對象重建,或通過重新執行創建對象的任務恢復丟失的對象。啟用此功能時,所有者緩存對象:在內存中重新創建對象所需任務的描述。然后,如果所有對象副本都因失敗而丟失,所有者將重新提交返回對象的任務。任務所依賴的任何對象都會遞歸重建。

      使用ray.put創建的對象不支持對象重建:這些對象的主副本始終是所有者的本地共享內存存儲。因此,如果主副本不能獨立于所有者進程丟失。

      如果存儲在分布式內存中的對象的所有者丟失:在對象解析期間,raylet將嘗試查找對象的副本。同時,raylet將定期聯系所有者,檢查所有者是否還活著。如果所有者已死亡,raylet將存儲一個系統級錯誤,該錯誤將在對象解析期間引發到引用持有者。

      對象溢出和持久化

      一旦對象存儲已滿,Ray 1.3+將對象溢出到外部存儲, 默認情況下,對象會溢出到本地文件系統。

      任務調度 分布式

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:共創 共享 共贏 | 2021年度華為云市場優秀伙伴獎項公布!
      下一篇:ModelArts算法外殼用戶手冊
      相關文章
      亚洲av无码片vr一区二区三区| 亚洲精品午夜无码专区| 亚洲精品国产高清不卡在线| 99亚偷拍自图区亚洲| 亚洲午夜免费视频| 亚洲色图国产精品| 婷婷久久久亚洲欧洲日产国码AV | 国产亚洲成人久久| 久久精品亚洲乱码伦伦中文| 亚洲国产精品尤物YW在线观看| 国产精品亚洲五月天高清| 亚洲成av人在线观看网站 | 亚洲老熟女五十路老熟女bbw| 亚洲精品精华液一区二区| 亚洲va中文字幕| 色欲aⅴ亚洲情无码AV蜜桃| 亚洲日韩国产AV无码无码精品| 亚洲人成色777777老人头| 欧美激情综合亚洲一二区| 国产亚洲视频在线观看网址| 亚洲AV伊人久久青青草原| 亚洲成A人片77777国产| 久久亚洲精品无码播放| 国产偷国产偷亚洲清高动态图| 亚洲精品乱码久久久久66| 亚洲Av熟妇高潮30p| 亚洲色av性色在线观无码| 亚洲国产日韩在线| 国产精品亚洲精品青青青| 亚洲人成欧美中文字幕| 最新亚洲人成网站在线观看| 亚洲乱码日产精品a级毛片久久| 亚洲中文字幕无码久久精品1 | 亚洲伊人久久大香线蕉AV| 久久亚洲色WWW成人欧美| 亚洲人成影院在线观看| 亚洲精品少妇30p| 91久久亚洲国产成人精品性色 | 丁香婷婷亚洲六月综合色| 亚洲av最新在线观看网址| 亚洲国产精品自产在线播放|