AssetBundle使用,卸載,校驗
1207
2025-03-31
本文翻譯自https://cwiki.apache.org/confluence/display/FLINK/Akka+and+actors
Akka和Actors
本頁討論了Flink 0.9版本采用的Akka分布式通信的實現。有了Akka,所有的遠程過程調用(RPC)被實現成異步消息。這主要影響了JobManager、TaskManager和JobClient三個組件。未來,很可能更多的組件將被轉換成一個actor,使它們可以發送和處理異步消息。
Akka和Actor Model
Akka是一個開發并發、容錯和可伸縮應用的框架。它是Actor Model的一個實現,因此和Erlang的并發模型很像。在Actor模型中,所有的實體被認為是獨立的actors。actors和其他actors通過發送異步消息通信。Actor模型的強大來自于異步(asynchronism)。它也可以顯式等待響應,這使得可以執行同步(synchronous)操作。但是,強烈不建議同步消息,因為它們限制了系統的伸縮性(scalability)。每個actor有一個郵箱(mailbox),它收到的消息存儲在里面。另外,每一個actor維護自身單獨的狀態.。一個Actors網絡如下所示:
每個actor有且只有一個“處理線程”輪詢(poll)自己的郵箱,并且連續地處理收到的消息。處理一條消息后,actor可以改變它的內部狀態,發送新的消息,或者產生新的actors。如果一個actor的內部狀態只被它的處理線程改變,那么無需確保actor的狀態是線程安全的。即使每個actor自身是有序的,但是一個由多個actor組成的系統是高并發和可伸縮的,因為各個處理線程在所有actors之間共享。這也是為什么我們不應該在一個actor線程內調用“阻塞調用”。這會阻塞正被其他actors使用并處理各自消息的線程。
Actor Systems
一個Actor系統是所有actors存在的容器。它提供了共享的服務,如調度、配置和日志等。Actor系統也包含一個產生所有actors線程的線程池。多個Actor系統可以存在于同一臺機器上。如果Actor系統開始于RemoteActorRefProvider,那么它可以被可能存在于另有一臺機器上的Actor系統訪問。Actor系統自動識別actor消息被一個同系統內的actor處理還是遠程的Actor系統。在本地通信中,消息通過共享內存被有效地傳輸。在遠程通信中,消息通過網絡棧被發送。
所有的actors被層次化組織。每個新創建的actor將創建它的actor作為父actor。層級被用于監督。每個父actor有義務監督它的子actor。如果它的某個子actor產生一個error,它會被通知。如果它可以解決這個問題,那么它可以繼續或重啟這個子actor。如果該問題超出它的處理能力范圍,它可以將error向上傳播給自己的父actor。傳播一個error僅意味著當前層以上的層級現在負責解決這個問題。關于Akka監督和管理的細節可以在這里找到。
系統創建的第一個actor由系統提供的guardian actor/user?監管。關于Actor層級更深入的解釋參閱這里。更多關于Actor系統的詳細信息參閱這里。
Actors in Flink
一個actor是一個自身狀態和行為的容器。它的actor線程連續處理收到的消息。這減輕了用戶編寫易出錯的鎖和線程管理任務,因為每個actor每一時刻只有一個線程是活動的。然而,必須保證一個actor的內部狀態只被這個actor線程訪問。一個actor的行為由一個接收函數定義,這個函數包含一個收到每條消息時被執行的邏輯。
Flink系統由三個分布式通信組件組成:JobClient、JobManager、TaskManager。JobClient接收一個來自用戶的Flink作業并提交給JobManager。JobManager然后負責協調作業的執行。首先,它分配需要的資源。這主要包括TaskManagers上的運行slots。資源分配后,JobManager部署作業的各個tasks到各個TaskManagers。一收到task,TaskManager創建一個執行該task的線程。狀態的改變,例如開始計算或完成計算,被發送回JobManager。JobManager基于狀態更新控制作業執行直到完成。一旦作業完成,結果將被發送到JobClient,由它告知用戶運行結果。作業執行過程如下圖所示:
JobManager & TaskManager
JobManager是負責執行一個Flink作業的中央控制單元。同樣地,它管理資源分配、任務調度和狀態報告。在任何Flink作業可以被執行前,一個JobManager和至少一個TaskManager必須被啟動。TaskManager然后發送一個RegisterTaskManager消息到JobManager注冊自己。JobManager發送一條注冊成功的確認消息。如果TaskManager已經在JobManager注冊過了,因為有多條RegisterTaskManager消息被發送,JobManager返回一條AlreadyRegistered消息。如果注冊被拒絕,JobManager將發送一條RefuseRegistration消息。
通過發送一條附帶相應JobGraph的SubmitJob消息提交一個作業給JobManager。一收到JobGraph,JobManager基于JobGraph創建一個ExecutionGraph,它是分布式執行的邏輯表示。ExecutionGraph包含了會被部署到TaskManagers執行的tasks的相關信息。
JobManager的調度器負責在可用的TaskManagers上分配運行slots。在一個TaskManager上分配一個運行slot后,一條附帶所有執行task必要信息的SubmitTask消息被發送到該TaskManager。TaskManager發送一條TaskOperationResult消息確認task部署成功。一旦已提交作業的源碼被部署和執行,則作業提交成功。JobManager發送一條附帶相應作業Id的Success消息通知JobClient作業提交成功。
運行在TaskManagers上的每個task的狀態更新通過UpdateTaskExecutionState消息發送回JobManager。有了這些更新消息,ExecutionGraph可以被更新以反映執行的當前狀態。
JobManager也作為數據源的輸入分片器(input split assigner)。它負責向所有TaskMangers分配任務,以便盡可能保證數據本地性(data locality)。為了動態平衡負載,tasks處理完上一個數據分片(input split)后請求一個新的數據分片。這個請求通過發送一條RequestNextInputSplit給JobManager實現。JobManager返回一條NextInputSplit消息響應。如果沒有更多的數據分片,包含在JobManager返回消息中的數據分片為null。
tasks被延遲部署在TaskManagers上。這意味著消費數據的tasks會在它的一個數據生產者(producer)產生數據后被部署。一旦生產者產生數據完成,它發送一條ScheduleOrUpdateConsumers消息給JobManager。這條消息表明消費者(consumer)現在可以讀取新產生的數據。如果消費數據的task還沒有啟動,它將被部署到一個TaskManager上。
JobClient
JobClient代表分布式系統中面向用戶的組件。它用于和JobManager通信,并且負責提交Flink作業、查詢已提交作業的狀態和接收運行中作業的狀態信息。
JobClient也是一個通過消息通信的actor。存在兩種和作業提交相關的消息:SubmitJobDetached和SubmitJobWait。第一個消息提交一個作業并且取消用于接收任何狀態消息和最終作業結果的注冊。如果你想以一種發送并忽略(fire and forget)的方式提交你的作業到Flink集群, 分離模式(detached mode)很有用。第二種消息提交一個作業并注冊以接收這個作業的狀態消息。在內部,這通過創建一個helper actor作為狀態消息的接收者而實現。一旦作業終止,JobManager發送一個附帶運行時長和累計結果的JobResultSuccess消息給helper actor。當收到這個消息的時候,helper actor將這個消息轉發給發送SubmitJobWait消息的JobClient,然后終止。
Asynchronous VS. Synchronous Messages
在可能的情況下,Flink試圖使用異步消息并將響應作為Futures處理。Futures和很少已存的阻塞調用有一個timeout,在timeout之后的操作被認為失敗。這避免了在一條消息丟失或一個分布式組件崩潰的情況下系統產生死鎖。然而,如果你正好有一個很大的集群或一個很慢的網絡,timeouts或許會被錯誤的觸發。因此,這些操作的timeout可以修改在配置 “akka.ask.timeout”中修改。
在一個actor可以和另一個actor通信前,它必須查找(retrieve)得到一個ActorRef。這個操作的查找也需要一個timeout。如果一個actor沒有啟動,為了使系統快速失敗,查找的timeout被設置成一個比常規timeout更小的值。在查找timeout的情況下,你可以在配置“akka.lookup.timeout”中增加查找timeout。
Akka的另一個特點是設置了一個它可發送消息大小的最大值限制。原因是它保留了一個同樣大小的序列化buffer并且它不想浪費內存。如果你遇到一個消息超出最大值的傳輸錯誤,你可以在配置“akka.framesize”中增大幀大小(framesize)。
Failure Detection
一個分布式系統中失敗檢測對于它的魯棒性(robustness)很重要。當在一個商用集群上運行的時候,分布式系統總會遇到一些組件失敗或者不可達。這樣一個失敗的原因是多種多樣的,可以是從硬件故障到網絡中斷。一個健壯的(robust)分布式系統應當能夠檢測失敗組件并恢復它。
Flink通過Akka的DeathWatch機制檢測失敗組件。DeathWatch允許actors監視其他actors,即使它們不受這個actor監督或者甚至它們屬于另一個actor系統。一旦一個被監視的actor死掉或是不可達,一個終止消息會被發送給這個actor的監視者。因此,一收到這個消息,系統可以對這個actor采取相應措施。在內部,DeathWatch被實現成心跳(heartbeat)和一個基于心跳間隔、心跳暫停、心跳閾值的失敗檢測器,它判斷一個actor什么時候很可能是dead。心跳間隔可以在配置“akka.watch.heartbeat.interval”中設置??山邮艿男奶鴷和?梢酝ㄟ^配置“akka.watch.heartbeat.pause”確定。心跳暫停應當是心跳間隔的幾倍,否則一個丟失的心跳會直接觸發DeathWatch。失敗(心跳)閾值可以通過配置“akka.watch.threshold”確定,并且它有效地控制失敗檢測器的敏感度。更多關于DeathWatch機制和失敗檢測器的細節可以參閱這里。
在Flink中,JobManager監視所有已注冊的TaskManagers并且所有的TaskManagers監視JobManager。這樣,兩類組件都知道什么時候另一個組件是不可達的。某個TaskManager不可達的時候,JobManager會將這個不能部署tasks的TaskManager標記為dead。另外,JobManager使運行在這個TaskManager上的所有tasks失敗,并且在另一個TaskManager上重新調度執行這些tasks。TaskManager在由于臨時連接丟失開而被標記為dead的情況下,當連接重新建立的時候,它可以向JobManager重新注冊自己。TaskManager也監視JobManager。這個監視允許TaskManager檢測到JobManager失敗的時候通過使所有正在運行的tasks失敗而進入一個清潔的(clean)狀態。另外,在只是由于網絡擁塞或連接丟失而觸發的death情況下,TaskManager將試圖重新連接JobManager。
Future Development
當前只有三個組件:JobClient、JobManager和TaskManager被實現成actor。為了更好的實現并發性以提高伸縮性,可以將更多的組件實現成actors。一個有希望的候選者是ExecutionGraph,它的ExecutionVertices或者其相關聯的Execution對象也可以實現成一個actor。這樣一個細粒度的Actor模型將有利于狀態更新直接發送各自的Execution對象。這樣的話,JobManger將顯著地從作為單一的通信節點中解放出來。
Configuration
akka.ask.timeout:用于所有Futures和阻塞的Akka調用的timeout。如果Flink由于timeouts而失敗,那么你應該增大這個值。timeouts可以由運行慢的機器或者擁塞的網絡造成。timeout值需要時間單元標識符(ms/s/min/h/d) ( 默認:100s )
akka.lookup.timeout:用于JobManager查找的timeout。timeout值需要時間單元區分符(ms/s/min/h/d)( 默認:10s )
akka.framesize:?JobManager和TaskManager之間發送的消息大小的最大值。如果Flink由于消息大小超出這個限制而失敗,那么你應該增大這個值。消息大小需要消息單元標識符。( 默認:10485760?b )
akka.watch.heartbeat.interval:Akka檢測dead?TaskManager的DeathWatch機制的時間間隔。如果TaskManagers由于丟失或延遲的心跳消息而錯誤地被標記為dead,那么你應該增大這個值。一個關于Akka的DeathWatch的詳細介紹可以在這里找到。(默認:akka.ask.timeout/10)
akka.watch.heartbeat.pause:Akka的DeathWatch機制可接受的心跳暫停值。一個較低的值不允許一個無規律的心跳。一個關于Akka的DeathWatch機制的詳細介紹可以在這里找到。(默認:akka.ask.timeout)
akka.watch.threshold:DeathWatch失敗檢測器的閾值。一個較低的值容易產生錯誤的判斷,反之,一個較大的值增加了檢測到dead?TaskManager的時間。一個關于Akka的DeathWatch機制的詳細介紹可以在這里找到。(默認:12)
智能數據 表格存儲服務 CloudTable EI企業智能
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。