【云駐共創】教你學習 Kafka分布式消息訂閱系統
前言
本章主要講述Kafka基本概念、架構及功能。重在了解Kafka是如何保證數據存儲、傳輸的可靠性,以及對于舊數據的處理方式。
目標
學完本章后,您將能夠:
掌握消息系統的基本概念掌握Kafka系統架構
一、kafka簡介
Kafka是最初由Linkedin公司開發,是一個分布式、分區的、多副本的、多訂閱者,基于zookeeper協調的分布式日志系統。
主要應用場景是:目志收集系統和消息系統。
分布式消息傳遞基于可靠的消息隊列,在客戶端應用和消息系統之間異步傳遞消息。有兩種主要的消息傳遞模式:點對點傳遞模式、發布-訂閱模式。大部分的消息系統選用發布-訂閱模式。Kafka就是一種發布-訂閱模式。
點對點消息傳遞模式
在點對點消息系統中,消息持久化到一個隊列中。此時,將有一個或多個消費者消費隊列中的數據』但是一條消息只能被消費一次。當-個消費者消費了隊列中的某條數據之后,該條數據則從消息隊列中刪除。該模式即使有多個消費者同時消費數據,也能保證數據處理的順序。生產者發送一條消息到queue,只有一個消費者能收到。
點對點的特點:
1.每個消息只有一個接收者(Consumer)(即一旦被消費,消息就不再在消息隊列中);
2.發送者和接收者間沒有依賴性,發送者發送消息之后,不管有沒有接收者在運行,都不會影響到發送者下次發送消息;
3.接收者在成功接收消息之后需向隊列應答成功,以便消息隊列刪除當前接收的消息;
發布-訂閱消息傳遞模式
在發布-訂閱消息系統中,消息被持久化到一個topic中。與點對點消息系統不同的是,消費者可以訂閱一個或多個topic,消費者可以消費該topic中所有的數據,同一條數據可以被多個消費者消費,數據被消費后不會立馬刪除。在發布-訂閱消息系統中,消息的生產者稱為發布者,消費者稱為訂閱者。發布者發送到topic的消息,只有訂閱了topic的訂閱者才會收到消息。
發布/訂閱模式特點:
1.每個消息可以有多個訂閱者;
2.發布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創建一個訂閱者之后,才能消費發布者的消息。
3.為了消費消息,訂閱者需要提前訂閱該角色主題,并保持在線運行;
Kafka特點
以時間復雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間的訪問性能。
高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒100K條消息的傳輸。
支持Kafka Server間的消息分區,及分布式消費,同時保證每個partition內的消息順序傳輸。
同時支持離線數據處理和實時數據處理。
Scale out:支持在線水平擴展
高吞吐量:Kafka 每秒可以生產約 25 萬消息(50 MB),每秒處理 55 萬消息(110 MB)
持久化數據存儲:可進行持久化操作。將消息持久化到磁盤,因此可用于批量消費,例如 ETL,以及實時應用程序。通過將數據持久化到硬盤以及replication 防止數據丟失。
分布式系統易于擴展:所有的 producer、broker 和 consumer 都會有多個,均為分布式的。無需停機即可擴展機器。
客戶端狀態維護:消息被處理的狀態是在 consumer 端維護,而不是由 server 端維護。當失敗時能自動平衡。
kafka支持的在線水平擴展,當kafka的性能不能夠滿足目前業務計算能力的需求,或者是存儲能力的需求,我們可以給kafka去增加一些節點,也就是說熱插拔節點能夠去提升它的水平的擴展能力、水平的服務能力。
二、kafka架構與功能
kafka拓撲結構圖
kafka可以包含多個Push,Push就是生產者可以包含多個Broker。Broker實際上是kafka的實例,可以認為是一個軟件運行起來的進程。kafka它可以包含了多個消費者,并且kafka還需要使用的Zookeeper來進行通信一致性協調服務。
Broker: 最基本的組件,Kafka集群包含一個或多個服務實例,這些服務實例被稱為Broker。
Topic:每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic.
比如說我們針對了日志系統,我們可以去劃分不同的日志的類型。比如說有一些日志屬于是數據庫的日志,有些日志是網站日志,我們都可以在kafka上面進行存儲,
去命名不同的Topic,但Topic的名字可以是自身去命名的。
Partition: Kafka將Topic分成一個或者多個Partition,每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息。
通常一個topic包含了大量的消息,這些消息我們存儲的時候,我們可以對它進行分區,也就是Partition。Partition需要進行落盤存儲,每個Partition都會在kafka的某個相關目錄創建文件夾,這個文件夾存儲Partition的所有消息。
Producer:負責發布消息到Kafka Broker。
Consumer:消息消費者,從Kafka Broker讀取消息的客戶端。
Consumer Group:每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定group name )。
Kafka Topics
每條發布到kafka的消息都有一個類別,這個類別被稱為Topic,也可以理解為一個存儲消息的隊列。所有的消息都已topic作為單位進行歸類。例如:天氣作為一個Topic,每天的溫度消息就可以存儲在“天氣”這個隊列里。
Kafka Partition
為了提高Kafka的吞吐量,物理上把Topic分成一個或多個Partition,每個Partition都是有序且不可變的消息隊列。每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件。
在多分區的情況下,需要有自己的一套機制來如何來把消息寫入到相應的分區?
一般情況下有三種情況,一種情況是指定了分區的編號,可以直接寫入到相應的分區。如果說沒有指定分區的編號。這個消息有k值、有鍵值,就會根據哈希算法,把相同鍵值的消息放到同一個分區,當然鍵值不同,可能會放到跟一些鍵值相同的消息在同一個分區,也可能放到不同分區,是由哈希算法來決定的。如果說不滿足前兩種情況啊,默認情況,它可以去采用隨機的方式,隨機的放入到一個分區中。
Kafka Partition offset
每條消息在文件中的位置稱為offset(偏移量),offset是一個long型數字,它唯一標記一條消息。消費者通過(offset、partition、topic)跟蹤記錄。
如圖所示,消費消息的時候它是有順序的,它實際上是根據這個offset決定消費哪一條消息。一般情況下它會如果是新的一個連接讀取里面數據,它會消費就是offset的下一條數據。
offset存儲機制
Consumer在從broker讀取消息后,可以選擇commit,該操作會在Kakfa中保存該Consumer在該Partition中讀取的消息的offset。該Consumer下一次再讀該Partition時會從下一條開始讀取。
通過這一特性可以保證同一消費者從Kafka中不會重復消費數據。
消費者group位移保存在_consumer_offsets的目錄上:
計算公式: Math.abs(grouplD.hashCode()) % 50
kafka-logs目錄,里面有多個目錄,因為kafka默認會生成50個__consumer_offsets-n目錄
Consumer group
每個consumer都屬于一個consumer group,每條消息只能被consumer group中的一個Consumer消費,但可以被多個consumer group消費。即組間數據是共享的,,組內數據是競爭的。consumer group是kafka提供的可擴展且具有容錯性的消費者機制。既然是一個組,那么組內必然可以有多個消費者或消費者實例(consumer instance),它們共享一個公共的ID,即group ID。組內的所有消費者協調在一起來消費訂閱主題(subscribed topics)的所有分區(partition)。當然,每個分區只能由同一個消費組內的一個consumer來消費。
Kafka其他重要概念
replica:partition的副本,保障partition的高可用。
leader:replica中的一個角色,producer和consumer只跟leader交互。
follower:replica中的一個角色,從leader中復制數據。
controler:kafka集群中的其中一個服務器,用來進行leader election以及各種failover
三、kafka數據管理
Kafka Partition Replica
kafka是一種高可靠性以及高效率的發布訂閱消息和管理信息系統。kafka它如何去做到數據存儲的可靠性,以及我們數據消費的高吞吐量呢?這是kafka的數據備份機制。如圖所示,這里面有Broker1、Broker2、Broker3、Broker4,4個Broker,每個Broker里面又分別存儲了不同的分區。假如說我們說目前一個topic有了3個分區,大家看一看這3個分區是怎么存儲的。我們說在Broker1里面有Partition-0,備份存儲在了Broker2中,Partition-1存儲在Broker2中,備份存儲在Broker3中。
每一個分區它的備份的數量,我們可以在kafka的配置文件中進行設置。我們這個圖來演示的是每個分區是兩個備份,那采用分區它有什么好處呢?采用分區的備份機制有什么好處?它的好處就是當一個分區失效的時候,它可以用其他的分區提供了服務,保證了數據的可靠性。另外我們提供服務的分區稱為了leader,出于了備用的分區呢稱為follower。如果說當提供服務的分區發生了故障,我們會選舉處于備用的分區做為的leader繼續提供服務。同時為了提高了kafka集群的吞吐量,我們會把不同的分區放在了不同的Broker上,也就是節點上,并且不同的節點都可以去充當了某個分區的leader。因此我們去讀取數據的時候、消費數據的時候,我們就不會只訪問某一個節點,這樣可以去提高了kafka的高吞吐量。
我們看一看kafka的分區是如何來進行備份的,如上圖所示,生產者處于leader的分區寫入了數據,commit后,我們的follower Partition就會從Leader分區中拉取相應的分析數據。那Leader和Follower是怎么樣去拉取這個分區數據呢?它會產生了一個線程,這個線程叫Replica Fetcher Thread這個線程,然后從里面去拉取。采用這種方式可以去提高了kafka的高吞吐量。
Kafka HA
同一個partition可能會有多個replica (對應server.properties配置中的default.replication.factor=N )。
沒有replica的情況下,一旦broker宕機,其上所有patition的數據都不可被消費,同時producer也不能再將數據存于其上的 patition。
引入replication之后,同一個partition可能會有多個replica,而這時需要在這些replica之間選出一個leader,producer和consumer只與這個leader交互,其它replica作為follower從leader中復制數據。
Leader Failover
當partition對應的leader宕機時,需要從follower中選舉出新leader。在選舉新leader時,一個基本原則是,新的leader必須擁有舊leader commit過的所有消息。由寫入流程可知ISR里面的所有replica都跟上了leader,只有ISR里面的成員才能選為leader。
對于f+1個replica,partition可以在容忍f個replica失效的情況下保證消息不丟失。
當所有replica都不工作時,有兩種可行的方案:
等待ISR中的任一個replica活過來,并選它作為leader??杀U蠑祿粊G失,但時間可能相對較長。
選擇第一個活過來的replica(不一定是ISR成員)作為leader。無法保障數據不丟失,但相對不可用時間較短。
Kafka數據可靠性
Kafka所有消息都會被持久化到硬盤中,同時Kafka通過對Topic Partition設置Replication來保障數據可靠。
那么,在消息傳輸過程中有沒有可靠性保證呢?
消息傳輸語義是消息傳輸保證的依據
消息傳輸保障通常有以下三種:
最多一次(At Most Once):消息可能丟失。消息不會重復發送和處理。
最少一次(At Lease Once):消息不會丟失。消息可能會重復發送和處理。
僅有一次(Exactly Once):消息不會丟失。消息僅被處理一次。
可靠性保證–冪等性
一個冪等性的操作就是一種被執行多次造成的影響和只執行一次造成的影響一樣。
原理:
每發送到Kafka的消息都將包含一個序列號,broker將使用這個序列號來刪除重復數據
這個序列號被持久化到副本日志,所以,即使分區的leader掛了,其他的broker接管了leader,新leader仍可以判斷重新發送的是否重復了。
這種機制的開銷非常低:每批消息只有幾個額外的字段。
可靠性保證– acks機制
producer需要server接收到數據之后發出的確認接收的信號,此項配置就是指procuder需要多少個這樣的確認信號。此配置實際上代表了數據備份的可用性。以下設置為常用選項:
acks=0:設置為0表示producer不需要等待任何確認收到的信息。副本將立即加到socket buffer并認為已經發送。沒有任何保障可以保證此種情況下server已經成功接收數據,同時重試配置不會發生作用(因為客戶端不知道是否失敗)回饋的offset會總是設置為-1;
acks=1:這意味著至少要等待leader已經成功將數據寫入本地log,但是并沒有等待所有follower是否成功寫入。這種情況下,如果follower沒有成功備份數據,而此時leader又掛掉,則消息會丟失。
acks=all:這意味著leader需要等待所有備份都成功寫入日志,這種策略會保證只要有一個備份存活就不會丟失數據。這是最強的保證。
舊數據處理方式
Kafka把Topic中一個Parition大文件分成多個小文件段,通過多個小文件段,就容易定期清除或刪除已經消費完文件,減少磁盤占用。
對于傳統的message queue而言,一般會刪除已經被消費的消息,而Kafka集群會保留所有的消息,無論其被消費與否。當然,因為磁盤限制,不可能永久保留所有數據(實際上也沒必要),因此Kafka需要處理舊數據。
配置位置:$KAFKA_HOME/config/server.properties
Kafka Log Cleanupess
日志的清理方式有兩種: delete和compact。
刪除的閾值有兩種:過期的時間和分區內總日志大小。
在Kafka中,存在數據過期的機制,稱為data expire。如何處理過期數據是根據指定的policy(策略)決定的,而處理過期數據的行為,即為log cleanup。
Kafka Log Compact
壓縮前的原日志文件,原日志文件它存儲了偏移量,同時也存儲了它的鍵以及值,我們進行壓縮的時候,我們就把具有相同鍵的數據,也就是消息只保留一條,而多余的都給刪除掉。那它具體選擇哪一條,它肯定是要選擇最新的一條。那怎么樣知道是最新的呢?我們根據它的偏移量就可以知道了哪一條數據、哪條信息是最新的。對于每一個kafka partition的日志,以segment為單位,都會被分為兩部分,已清理和未清理的部分。同時,未清理的那部分又分為可以清理的和不可清理的。
總結
本章主要介紹了消息系統的基本概念和Kafka的應用場景,以及Kafka的系統架構和數據管理的內容。
本文整理自華為云社區【內容共創系列】活動。
查看活動詳情:https://bbs.huaweicloud.com/blogs/314887
相關任務詳情:Kafka分布式消息訂閱系統
Kafka 分布式
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。