kafka原理深入研究 (轉 )

      網友投稿 727 2022-05-29

      一、為什么需要消息系統

      1.解耦:

      允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

      2.冗余:

      消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。

      3.擴展性:

      因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。

      4.靈活性 & 峰值處理能力:

      在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

      5.可恢復性:

      系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。

      6.順序保證:

      在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數據會按照特定的順序來處理。(Kafka 保證一個 Partition 內的消息的有序性)

      kafka原理深入研究 (轉 )

      7.緩沖:

      有助于控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。

      8.異步通信:

      很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。

      二、kafka 架構

      2.1 拓撲結構

      如下圖:

      圖.1

      2.2 相關概念

      如圖.1中,kafka 相關名詞解釋如下:

      1.producer:  消息生產者,發布消息到?kafka?集群的終端或服務。 2.broker:  kafka?集群中包含的服務器。 3.topic:  每條發布到?kafka?集群的消息屬于的類別,即?kafka?是面向?topic?的。 4.partition:  partition?是物理上的概念,每個?topic?包含一個或多個?partition。kafka?分配的單位是?partition。 5.consumer:  從?kafka?集群中消費消息的終端或服務。 6.Consumer?group:  high-level?consumer?API?中,每個?consumer?都屬于一個?consumer?group,每條消息只能被?consumer?group?中的一個?Consumer?消費,但可以被多個?consumer?group?消費。 7.replica:  partition?的副本,保障?partition?的高可用。 8.leader:  replica?中的一個角色,?producer?和?consumer?只跟?leader?交互。 9.follower:  replica?中的一個角色,從?leader?中復制數據。 10.controller:  kafka?集群中的其中一個服務器,用來進行?leader?election?以及?各種?failover。12.zookeeper:  kafka?通過?zookeeper?來存儲集群的?meta?信息。

      2.3 zookeeper 節點

      kafka 在 zookeeper 中的存儲結構如下圖所示:

      圖.2

      三、producer 發布消息

      3.1 寫入方式

      producer 采用 push 模式將消息發布到 broker,每條消息都被 append 到 patition 中,屬于順序寫磁盤(順序寫磁盤效率比隨機寫內存要高,保障 kafka 吞吐率)。

      3.2 消息路由

      producer 發送消息到 broker 時,會根據分區算法選擇將其存儲到哪一個 partition。其路由機制為:

      1.?指定了?patition,則直接使用; 2.?未指定?patition?但指定?key,通過對?key?的?value?進行hash?選出一個?patition 3.?patition?和?key?都未指定,使用輪詢選出一個?patition。

      附上 java 客戶端分區源碼,一目了然:

      //創建消息實例 public?ProducerRecord(String?topic,?Integer?partition,?Long?timestamp,?K?key,?V?value)?{ ?????if?(topic?==?null) ??????????throw?new?IllegalArgumentException("Topic?cannot?be?null"); ?????if?(timestamp?!=?null?&&?timestamp??record,?byte[]?serializedKey?,?byte[]?serializedValue,?Cluster?cluster)?{ ?????Integer?partition?=?record.partition(); ?????if?(partition?!=?null)?{ ??????????List?partitions?=?cluster.partitionsForTopic(record.topic()); ??????????int?lastPartition?=?partitions.size()?-?1; ??????????if?(partition??lastPartition)?{ ???????????????throw?new?IllegalArgumentException(String.format("Invalid?partition?given?with?record:?%d?is?not?in?the?range?[0...%d].",?partition,?lastPartition)); ??????????} ??????????return?partition; ?????} ?????return?this.partitioner.partition(record.topic(),?record.key(),?serializedKey,?record.value(),?serializedValue,?cluster); } //?使用?key?選取?patition public?int?partition(String?topic,?Object?key,?byte[]?keyBytes,?Object?value,?byte[]?valueBytes,?Cluster?cluster)?{ ?????List?partitions?=?cluster.partitionsForTopic(topic); ?????int?numPartitions?=?partitions.size(); ?????if?(keyBytes?==?null)?{ ??????????int?nextValue?=?counter.getAndIncrement(); ??????????List?availablePartitions?=?cluster.availablePartitionsForTopic(topic); ??????????if?(availablePartitions.size()?>?0)?{ ???????????????int?part?=?DefaultPartitioner.toPositive(nextValue)?%?availablePartitions.size(); ???????????????return?availablePartitions.get(part).partition(); ??????????}?else?{ ???????????????return?DefaultPartitioner.toPositive(nextValue)?%?numPartitions; ??????????} ?????}?else?{ ??????????//對?keyBytes?進行?hash?選出一個?patition ??????????return?DefaultPartitioner.toPositive(Utils.murmur2(keyBytes))?%?numPartitions; ?????} }

      3.3 寫入流程

      producer 寫入消息序列圖如下所示:

      圖.3

      流程說明:

      1.?producer?先從?zookeeper?的?"/brokers/.../state"?節點找到該?partition?的?leader 2.?producer?將消息發送給該?leader 3.?leader?將消息寫入本地?log 4.?followers?從?leader?pull?消息,寫入本地?log?后?leader?發送?ACK 5.?leader?收到所有?ISR?中的?replica?的?ACK?后,增加?HW(high?watermark,最后?commit?的?offset)?并向?producer?發送?ACK

      3.4 producer delivery guarantee

      一般情況下存在三種情況:

      1.?At?most?once?消息可能會丟,但絕不會重復傳輸 2.?At?least?one?消息絕不會丟,但可能會重復傳輸 3.?Exactly?once?每條消息肯定會被傳輸一次且僅傳輸一次

      當 producer 向 broker 發送消息時,一旦這條消息被 commit,由于 replication 的存在,它就不會丟。但是如果 producer 發送數據給 broker 后,遇到網絡問題而造成通信中斷,那 Producer 就無法判斷該條消息是否已經 commit。雖然 Kafka 無法確定網絡故障期間發生了什么,但是 producer 可以生成一種類似于主鍵的東西,發生故障時冪等性的重試多次,這樣就做到了 Exactly once,但目前還并未實現。所以目前默認情況下一條消息從 producer 到 broker 是確保了 At least once,可通過設置 producer 異步發送實現At most once。

      四、broker 保存消息

      4.1 存儲方式

      物理上把 topic 分成一個或多個 patition(對應 server.properties 中的 num.partitions=3 配置),每個 patition 物理上對應一個文件夾(該文件夾存儲該 patition 的所有消息和索引文件),如下:

      圖.4

      4.2 存儲策略

      無論消息是否被消費,kafka 都會保留所有消息。有兩種策略可以刪除舊數據:

      1.?基于時間:log.retention.hours=168 2.?基于大小:log.retention.bytes=1073741824

      需要注意的是,因為Kafka讀取特定消息的時間復雜度為O(1),即與文件大小無關,所以這里刪除過期文件與提高 Kafka 性能無關。

      4.3 topic 創建與刪除

      創建 topic 的序列圖如下所示:

      圖.5

      流程說明:

      1.?controller?在?ZooKeeper?的?/brokers/topics?節點上注冊?watcher,當?topic?被創建,則?controller?會通過?watch?得到該?topic?的?partition/replica?分配。 2.?controller從?/brokers/ids?讀取當前所有可用的?broker?列表,對于?set_p?中的每一個?partition: 2.1?從分配給該?partition?的所有?replica(稱為AR)中任選一個可用的?broker?作為新的?leader,并將AR設置為新的?ISR 2.2?將新的?leader?和?ISR?寫入?/brokers/topics/[topic]/partitions/[partition]/state 3.?controller?通過?RPC?向相關的?broker?發送?LeaderAndISRRequest。

      刪除 topic 的序列圖如下所示:

      圖.6

      流程說明:

      1.?controller?在?zooKeeper?的?/brokers/topics?節點上注冊?watcher,當?topic?被刪除,則?controller?會通過?watch?得到該?topic?的?partition/replica?分配。 2.?若?delete.topic.enable=false,結束;否則?controller?注冊在?/admin/delete_topics?上的?watch?被?fire,controller?通過回調向對應的?broker?發送?StopReplicaRequest。

      五、kafka HA

      5.1 replication

      如圖.1所示,同一個 partition 可能會有多個 replica(對應 server.properties 配置中的 default.replication.factor=N)。沒有 replica 的情況下,一旦 broker 宕機,其上所有 patition 的數據都不可被消費,同時 producer 也不能再將數據存于其上的 patition。引入replication 之后,同一個 partition 可能會有多個 replica,而這時需要在這些 replica 之間選出一個 leader,producer 和 consumer 只與這個 leader 交互,其它 replica 作為 follower 從 leader 中復制數據。

      Kafka 分配 Replica 的算法如下:

      1.?將所有?broker(假設共?n?個?broker)和待分配的?partition?排序 2.?將第?i?個?partition?分配到第(i?mod?n)個?broker?上 3.?將第?i?個?partition?的第?j?個?replica?分配到第((i?+?j)?mode?n)個?broker上

      5.2 leader failover

      當 partition 對應的 leader 宕機時,需要從 follower 中選舉出新 leader。在選舉新leader時,一個基本的原則是,新的 leader 必須擁有舊 leader commit 過的所有消息。

      kafka 在 zookeeper 中(/brokers/.../state)動態維護了一個 ISR(in-sync replicas),由3.3節的寫入流程可知 ISR 里面的所有 replica 都跟上了 leader,只有 ISR 里面的成員才能選為 leader。對于 f+1 個 replica,一個 partition 可以在容忍 f 個 replica 失效的情況下保證消息不丟失。

      當所有 replica 都不工作時,有兩種可行的方案:

      1.?等待?ISR?中的任一個?replica?活過來,并選它作為?leader。可保障數據不丟失,但時間可能相對較長。 2.?選擇第一個活過來的?replica(不一定是?ISR?成員)作為?leader。無法保障數據不丟失,但相對不可用時間較短。

      kafka 0.8.* 使用第二種方式。

      kafka 通過 Controller 來選舉 leader,流程請參考5.3節。

      5.3 broker failover

      kafka broker failover 序列圖如下所示:

      圖.7

      流程說明:

      1.?controller?在?zookeeper?的?/brokers/ids/[brokerId]?節點注冊?Watcher,當?broker?宕機時?zookeeper?會?fire?watch 2.?controller?從?/brokers/ids?節點讀取可用broker 3.?controller決定set_p,該集合包含宕機?broker?上的所有?partition 4.?對?set_p?中的每一個?partition ????4.1?從/brokers/topics/[topic]/partitions/[partition]/state?節點讀取?ISR ????4.2?決定新?leader(如4.3節所描述) ????4.3?將新?leader、ISR、controller_epoch?和?leader_epoch?等信息寫入?state?節點 5.?通過?RPC?向相關?broker?發送?leaderAndISRRequest?命令

      5.4 controller failover

      當 controller 宕機時會觸發 controller failover。每個 broker 都會在 zookeeper 的 "/controller" 節點注冊 watcher,當 controller 宕機時 zookeeper 中的臨時節點消失,所有存活的 broker 收到 fire 的通知,每個 broker 都嘗試創建新的 controller path,只有一個競選成功并當選為 controller。

      當新的 controller 當選時,會觸發 KafkaController.onControllerFailover 方法,在該方法中完成如下操作:

      1.?讀取并增加?Controller?Epoch。 2.?在?reassignedPartitions?Patch(/admin/reassign_partitions)?上注冊?watcher。 3.?在?preferredReplicaElection?Path(/admin/preferred_replica_election)?上注冊?watcher。 4.?通過?partitionStateMachine?在?broker?Topics?Patch(/brokers/topics)?上注冊?watcher。 5.?若?delete.topic.enable=true(默認值是?false),則?partitionStateMachine?在?Delete?Topic?Patch(/admin/delete_topics)?上注冊?watcher。 6.?通過?replicaStateMachine在?Broker?Ids?Patch(/brokers/ids)上注冊Watch。 7.?初始化?ControllerContext?對象,設置當前所有?topic,“活”著的?broker?列表,所有?partition?的?leader?及?ISR等。 8.?啟動?replicaStateMachine?和?partitionStateMachine。 9.?將?brokerState?狀態設置為?RunningAsController。 10.?將每個?partition?的?Leadership?信息發送給所有“活”著的?broker。 11.?若?auto.leader.rebalance.enable=true(默認值是true),則啟動?partition-rebalance?線程。 12.?若?delete.topic.enable=true?且Delete?Topic?Patch(/admin/delete_topics)中有值,則刪除相應的Topic。

      6. consumer 消費消息

      6.1 consumer API

      kafka 提供了兩套 consumer API:

      1.?The?high-level?Consumer?API 2.?The?SimpleConsumer?API

      其中 high-level consumer API 提供了一個從 kafka 消費數據的高層抽象,而 SimpleConsumer API 則需要開發人員更多地關注細節。

      high-level consumer API 提供了 consumer group 的語義,一個消息只能被 group 內的一個 consumer 所消費,且?consumer 消費消息時不關注 offset,最后一個 offset 由 zookeeper 保存。

      使用 high-level consumer API 可以是多線程的應用,應當注意:

      1.?如果消費線程大于?patition?數量,則有些線程將收不到消息 2.?如果?patition?數量大于線程數,則有些線程多收到多個?patition?的消息 3.?如果一個線程消費多個?patition,則無法保證你收到的消息的順序,而一個?patition?內的消息是有序的

      如果你想要對 patition 有更多的控制權,那就應該使用 SimpleConsumer API,比如:

      1.?多次讀取一個消息 2.?只消費一個?patition?中的部分消息 3.?使用事務來保證一個消息僅被消費一次

      但是使用此 API 時,partition、offset、broker、leader 等對你不再透明,需要自己去管理。你需要做大量的額外工作:

      1.?必須在應用程序中跟蹤?offset,從而確定下一條應該消費哪條消息 2.?應用程序需要通過程序獲知每個?Partition?的?leader?是誰 3.?需要處理?leader?的變更

      使用 SimpleConsumer API 的一般流程如下:

      1.?查找到一個“活著”的?broker,并且找出每個?partition?的?leader 2.?找出每個?partition?的?follower 3.?定義好請求,該請求應該能描述應用程序需要哪些數據 4.?fetch?數據 5.?識別?leader?的變化,并對之作出必要的響應

      以下針對 high-level Consumer API 進行說明。

      6.2 consumer group

      如 2.2 節所說, kafka 的分配單位是 patition。每個 consumer 都屬于一個 group,一個 partition 只能被同一個 group 內的一個 consumer 所消費(也就保障了一個消息只能被 group 內的一個 consuemr 所消費),但是多個 group 可以同時消費這個 partition。

      kafka 的設計目標之一就是同時實現離線處理和實時處理,根據這一特性,可以使用 spark/Storm 這些實時處理系統對消息在線處理,同時使用 Hadoop 批處理系統進行離線處理,還可以將數據備份到另一個數據中心,只需要保證這三者屬于不同的 consumer group。如下圖所示:

      圖.8

      6.3 消費方式

      consumer 采用 pull 模式從 broker 中讀取數據。

      push 模式很難適應消費速率不同的消費者,因為消息發送速率是由 broker 決定的。它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成 consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 pull 模式則可以根據 consumer 的消費能力以適當的速率消費消息。

      對于 Kafka 而言,pull 模式更合適,它可簡化 broker 的設計,consumer 可自主控制消費消息的速率,同時 consumer 可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。

      6.4 consumer?delivery guarantee

      如果將 consumer 設置為 autocommit,consumer 一旦讀到數據立即自動 commit。如果只討論這一讀取消息的過程,那 Kafka 確保了 Exactly once。

      但實際使用中應用程序并非在 consumer 讀取完數據就結束了,而是要進行進一步處理,而數據處理與 commit 的順序在很大程度上決定了consumer delivery guarantee:

      1.讀完消息先?commit?再處理消息。 ????這種模式下,如果?consumer?在?commit?后還沒來得及處理消息就?crash?了,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息,這就對應于?At?most?once2.讀完消息先處理再?commit。 ????這種模式下,如果在處理完消息之后?commit?之前?consumer?crash?了,下次重新開始工作時還會處理剛剛未?commit?的消息,實際上該消息已經被處理過了。這就對應于?At?least?once。3.如果一定要做到?Exactly?once,就需要協調?offset?和實際操作的輸出。 ????精典的做法是引入兩階段提交。如果能讓?offset?和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,因為許多輸出系統可能不支持兩階段提交。比如,consumer?拿到數據后可能把數據放到?HDFS,如果把最新的?offset?和數據本身一起寫到?HDFS,那就可以保證數據的輸出和?offset?的更新要么都完成,要么都不完成,間接實現?Exactly?once。(目前就?high-level?API而言,offset?是存于Zookeeper?中的,無法存于HDFS,而SimpleConsuemr?API的?offset?是由自己去維護的,可以將之存于?HDFS?中)

      總之,Kafka 默認保證 At least once,并且允許通過設置 producer 異步提交來實現 At most once(見文章《kafka consumer防止數據丟失》)。而 Exactly once 要求與外部存儲系統協作,幸運的是 kafka 提供的 offset 可以非常直接非常容易得使用這種方式。

      更多關于 kafka 傳輸語義的信息請參考《Message Delivery Semantics》。

      6.5 consumer rebalance

      當有 consumer 加入或退出、以及 partition 的改變(如 broker 加入或退出)時會觸發 rebalance。consumer rebalance算法如下:

      1.?將目標?topic?下的所有?partirtion?排序,存于PT 2.?對某?consumer?group?下所有?consumer?排序,存于?CG,第?i?個consumer?記為?Ci 3.?N=size(PT)/size(CG),向上取整 4.?解除?Ci?對原來分配的?partition?的消費權(i從0開始) 5.?將第i*N到(i+1)*N-1個?partition?分配給?Ci

      在 0.8.*版本,每個 consumer 都只負責調整自己所消費的 partition,為了保證整個consumer group 的一致性,當一個 consumer 觸發了 rebalance 時,該 consumer group 內的其它所有其它 consumer 也應該同時觸發 rebalance。這會導致以下幾個問題:

      1.Herd?effect  任何?broker?或者?consumer?的增減都會觸發所有的?consumer?的?rebalance2.Split?Brain  每個?consumer?分別單獨通過?zookeeper?判斷哪些?broker?和?consumer?宕機了,那么不同?consumer?在同一時刻從?zookeeper?看到的?view?就可能不一樣,這是由?zookeeper?的特性決定的,這就會造成不正確的?reblance?嘗試。3.?調整結果不可控  所有的?consumer?都并不知道其它?consumer?的?rebalance?是否成功,這可能會導致?kafka?工作在一個不正確的狀態。

      基于以上問題,kafka 設計者考慮在0.9.*版本開始使用中心 coordinator 來控制 consumer rebalance,然后又從簡便性和驗證要求兩方面考慮,計劃在 consumer 客戶端實現分配方案。

      七、注意事項

      7.1 producer 無法發送消息的問題

      最開始在本機搭建了kafka偽集群,本地 producer 客戶端成功發布消息至 broker。隨后在服務器上搭建了 kafka 集群,在本機連接該集群,producer 卻無法發布消息到 broker(奇怪也沒有拋錯)。最開始懷疑是 iptables 沒開放,于是開放端口,結果還不行(又開始是代碼問題、版本問題等等,倒騰了很久)。最后沒辦法,一項一項查看 server.properties 配置,發現以下兩個配置:

      #?The?address?the?socket?server?listens?on.?It?will?get?the?value?returned?from? #?java.net.InetAddress.getCanonicalHostName()?if?not?configured. #???FORMAT: #?????listeners?=?security_protocol://host_name:port #???EXAMPLE: #?????listeners?=?PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://:9092  #?Hostname?and?port?the?broker?will?advertise?to?producers?and?consumers.?If?not?set,?  #?it?uses?the?value?for?"listeners"?if?configured.?Otherwise,?it?will?use?the?value  #?returned?from?java.net.InetAddress.getCanonicalHostName().  #advertised.listeners=PLAINTEXT://your.host.name:9092

      以上說的就是?advertised.listeners 是 broker 給 producer 和 consumer 連接使用的,如果沒有設置,就使用?listeners,而如果 host_name 沒有設置的話,就使用?java.net.InetAddress.getCanonicalHostName() 方法返回的主機名。

      修改方法:

      1.?listeners=PLAINTEXT://121.10.26.XXX:9092 2.?advertised.listeners=PLAINTEXT://121.10.26.XXX:9092

      修改后重啟服務,正常工作。

      ZooKeeper Kafka

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:6000字面試總結,兩周連續面試字節,餓了么,喜馬,b站,哈羅,得物,越卷越成長~
      下一篇:語境偏移如何解決?專有領域端到端ASR之路(三)
      相關文章
      亚洲人成在线播放网站| 中文字幕亚洲乱码熟女一区二区| 亚洲精品蜜夜内射| 7777久久亚洲中文字幕蜜桃| 亚洲第一中文字幕| 亚洲三级电影网址| 亚洲精品免费视频| 777亚洲精品乱码久久久久久| 亚洲高清专区日韩精品| 久久久久亚洲爆乳少妇无| 亚洲日韩国产一区二区三区| 亚洲成?v人片天堂网无码| 国产成人亚洲精品电影| 久久精品国产亚洲av天美18| 亚洲精品国产精品| 国产精品日本亚洲777| www.亚洲一区| 国产偷国产偷亚洲高清日韩| 中文字幕专区在线亚洲| 亚洲中文字幕不卡无码| 亚洲精品无码永久在线观看你懂的| 亚洲韩国精品无码一区二区三区 | 亚洲专区中文字幕| 亚洲av无码不卡久久| 91丁香亚洲综合社区| 亚洲精品无码中文久久字幕| 亚洲AV成人片无码网站| 成人亚洲综合天堂| 超清首页国产亚洲丝袜| 九月丁香婷婷亚洲综合色| 久久久久亚洲精品影视| 亚洲精品在线不卡| 亚洲不卡中文字幕| 亚洲女子高潮不断爆白浆| jjzz亚洲亚洲女人| 亚洲尤码不卡AV麻豆| 亚洲国产精品乱码一区二区| 亚洲精品无码久久久久久久| 亚洲人成在久久综合网站| 亚洲国产成人五月综合网| 国产成人综合亚洲绿色|