阿里四面:kafka何時、如何刪除Topic?
Kafka有很多狀態機和管理器,如Controller通道管理器ControllerChannelManager、處理Controller事件的ControllerEventManager等。這些管理器和狀態機,大多與各自“宿主”聯系密切。就如Controller這倆管理器,必須與Controller組件緊耦合,才能實現各自功能。
Kafka還有一些狀態機和管理器,具有相對獨立的功能框架,不嚴重依賴使用方,如:
TopicDeletionManager(主題刪除管理器)
負責對指定Kafka主題執行刪除操作,清除待刪除主題在集群上的各類“痕跡”。
ReplicaStateMachine(副本狀態機)
負責定義Kafka副本狀態、合法的狀態轉換,以及管理狀態之間的轉換。
PartitionStateMachine(分區狀態機)
負責定義Kafka分區狀態、合法的狀態轉換,以及管理狀態之間的轉換。
本文看看Kafka是如何刪除一個主題的。
前言
以為成功執行kafka-topics.sh --delete命令后,主題就會被刪除。這種不正確的認知會導致經常發現主題沒被刪干凈。于是,網傳終極“武林秘籍”:手動刪除磁盤上的日志文件,手動刪除ZooKeeper下關于主題的各節點。但我不推薦這么干:
并不完整
除非你重啟Broker,否則,這套“秘籍”無法清理Controller端和各個Broker上元數據緩存中的待刪除主題的相關條目
并沒有被官方所認證,后果自負
與其琢磨刪除主題失敗之后怎么自救,還是研究Kafka到底如何執行該操作。TopicDeletionManager.scala包括:
DeletionClient接口:負責實現刪除主題以及后續的動作
如更新元數據
ControllerDeletionClient類:實現DeletionClient接口的類,分別實現了剛剛說到的那4個方法。
TopicDeletionManager類:主題刪除管理器類
定義方法維護主題刪除前后集群狀態的正確性。如,何時刪除主題、何時主題不能被刪除、主題刪除過程中要規避哪些操作等
DeletionClient接口及實現
刪除主題,并將刪除主題的事件同步給其他Broker。
DeletionClient接口目前只有一個實現類ControllerDeletionClient,構造器的兩個字段:
KafkaController實例
Controller組件對象
KafkaZkClient實例
Kafka與ZooKeeper交互的客戶端對象
API
刪除主題在zk上的所有“痕跡”。分別調用KafkaZkClient的3個方法刪除ZooKeeper下/brokers/topics/節點、/config/topics/節點和/admin/delete_topics/節點。
刪除zk下待刪除主題的標記節點。調用KafkaZkClient#deleteTopicDeletions,批量刪除一組主題在/admin/delete_topics下的子節點。注意,deleteTopicDeletions這個方法名結尾的Deletions,表示/admin/delete_topics下的子節點。所以:
deleteTopic是刪除主題
deleteTopicDeletions是刪除/admin/delete_topics下的對應子節點
這兩個方法里都有epochZkVersion字段,代表期望的Controller Epoch版本號。若使用一個舊Epoch版本號執行這些方法,zk會拒絕,因為和它自己保存的版本號不匹配。若一個Controller的Epoch<ZooKeeper中保存的,則該Controller很可能是已過期的Controller。這就是Zombie Controller。epochZkVersion字段的作用,就是隔離Zombie Controller發送的操作。
屏蔽主題分區數據變更-:取消/brokers/topics/節點數據變更的監聽。
當該主題的分區數據發生變更后,由于對應zk-已被取消,因此不會觸發Controller相應處理邏輯。
為何取消該-?為避免操作相互干擾:假設用戶A發起主題刪除,同時用戶B為這個主題新增分區。此時,這兩個操作就會沖突,若允許Controller同時處理這倆操作,勢必會造成邏輯混亂及狀態不一致。為應對這種情況,在移除主題副本和分區對象前,代碼要先執行這個方法,確保不再響應用戶對該主題的其它操作。
mutePartitionModifications調用unregisterPartitionModificationsHandlers,并接著調用KafkaZkClient#unregisterZNodeChangeHandler,取消zk上對給定主題的分區節點數據變更的監聽。
調用KafkaController#sendUpdateMetadataRequest,給集群所有Broker發送更新請求,告訴它們不要再為已刪除主題的分區提供服務:
該方法會給集群中的所有Broker發送更新元數據請求,告知它們要同步給定分區的狀態。
TopicDeletionManager定義及初始化
創建TopicDeletionManager類實例
在KafkaController類初始化時被創建:
實例化了一個全新的ControllerDeletionClient對象,然后利用該對象實例和replicaStateMachine、partitionStateMachine,一起創建TopicDeletionManager實例。
KafkaServerStartable.startup()=》KafkaServer.startup()=》KafkaController.init=》TopicDeletionManager
TopicDeletionManager重要API
除了類定義和初始化,還有resumeDeletions:重啟主題刪除操作過程。
主題因為某些事件可能一時無法完成刪除,如主題分區正在進行副本重分配等。一旦這些事件完成,主題重新具備可刪除資格。就需調用resumeDeletions重啟刪除操作。
從元數據緩存中獲取要刪除主題列表,之后定義了兩個空的主題列表,分別保存待重試刪除主題和待刪除主題
遍歷每個要刪除的主題,去看它所有副本的狀態。如果副本狀態都是ReplicaDeletionSuccessful,就表明該主題已經被成功刪除,此時,再調用completeDeleteTopic方法,完成后續的操作就可以了。對于那些刪除操作尚未開始,并且暫時無法執行刪除的主題,源碼會把這類主題加到待重試主題列表中,用于后續重試;如果主題是能夠被刪除的,就將其加入到待刪除列表中。
最后,調用retryDeletionForIneligibleReplicas重試待重試主題列表中的主題刪除操作。對待刪除主題列表中的主題則調用onTopicDeletion刪除。
retryDeletionForIneligibleReplicas重試主題刪除:將對應主題副本的狀態,從ReplicaDeletionIneligible變更到OfflineReplica。這樣,后續再次調用resumeDeletions,就會嘗試重新刪除主題。
下面,我再用一張圖來解釋下resumeDeletions方法的執行流程:
resumeDeletions串聯起了TopicDeletionManger中的很多方法,較關鍵的:
completeDeleteTopic:
onTopicDeletion:
onTopicDeletion會多次使用分區狀態機,調整待刪除主題的分區狀態。最后調用onPartitionDeletion執行真正的底層物理磁盤文件刪除。這是通過副本狀態機狀態轉換操作完成的。
總結
在主題刪除過程中,Kafka會調整集群中三個地方的數據:
ZooKeeper
刪除主題時,zk上與該主題相關的所有ZNode節點必須被清除
元數據緩存
Controller端元數據緩存中的相關項,也必須要被處理,并且要被同步到集群的其他Broker上
磁盤日志文件
要清理的首要目標
這三個地方須統一處理,就好似原子操作。回想“秘籍”,它無法清除Controller端的元數據緩存項。因此,避免使用這“大招”。
DeletionClient接口主要是操作ZooKeeper,實現ZooKeeper節點的刪除等操作。
TopicDeletionManager,是在KafkaController創建過程中被初始化的,主要通過與元數據緩存進行交互的方式,來更新各類數據。
Kafka ZooKeeper
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。