Kafka【入門】看這一篇就夠了!
前言:在之前的文章里面已經(jīng)了解到了「消息隊(duì)列」是怎么樣的一種存在(傳送門),Kafka 作為當(dāng)下流行的一種中間件,我們現(xiàn)在開始學(xué)習(xí)它!

一、Kafka 簡(jiǎn)介
Kafka 創(chuàng)建背景
Kafka?是一個(gè)消息系統(tǒng),原本開發(fā)自 LinkedIn,用作 LinkedIn 的活動(dòng)流(Activity Stream)和運(yùn)營(yíng)數(shù)據(jù)處理管道(Pipeline)的基礎(chǔ)。現(xiàn)在它已被多家不同類型的公司 作為多種類型的數(shù)據(jù)管道和消息系統(tǒng)使用。
活動(dòng)流數(shù)據(jù)是幾乎所有站點(diǎn)在對(duì)其網(wǎng)站使用情況做報(bào)表時(shí)都要用到的數(shù)據(jù)中最常規(guī)的部分。活動(dòng)數(shù)據(jù)包括頁面訪問量(Page View)、被查看內(nèi)容方面的信息以及搜索情況等內(nèi)容。這種數(shù)據(jù)通常的處理方式是先把各種活動(dòng)以日志的形式寫入某種文件,然后周期性地對(duì)這些文件進(jìn)行統(tǒng)計(jì)分析。運(yùn)營(yíng)數(shù)據(jù)指的是服務(wù)器的性能數(shù)據(jù)(CPU、IO 使用率、請(qǐng)求時(shí)間、服務(wù)日志等等數(shù)據(jù))。運(yùn)營(yíng)數(shù)據(jù)的統(tǒng)計(jì)方法種類繁多。
近年來,活動(dòng)和運(yùn)營(yíng)數(shù)據(jù)處理已經(jīng)成為了網(wǎng)站軟件產(chǎn)品特性中一個(gè)至關(guān)重要的組成部分,這就需要一套稍微更加復(fù)雜的基礎(chǔ)設(shè)施對(duì)其提供支持。
Kafka 簡(jiǎn)介
Kafka 是一種分布式的,基于發(fā)布 / 訂閱的消息系統(tǒng)。主要設(shè)計(jì)目標(biāo)如下:
以時(shí)間復(fù)雜度為 O(1) 的方式提供消息持久化能力,即使對(duì) TB 級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間復(fù)雜度的訪問性能。
高吞吐率。即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒 100K 條以上消息的傳輸。
支持 Kafka Server 間的消息分區(qū),及分布式消費(fèi),同時(shí)保證每個(gè) Partition 內(nèi)的消息順序傳輸。
同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理。
Scale out:支持在線水平擴(kuò)展。
Kafka 基礎(chǔ)概念
概念一:生產(chǎn)者與消費(fèi)者
對(duì)于 Kafka 來說客戶端有兩種基本類型:生產(chǎn)者(Producer)和消費(fèi)者(Consumer)。除此之外,還有用來做數(shù)據(jù)集成的 Kafka Connect API 和流式處理的 Kafka Streams 等高階客戶端,但這些高階客戶端底層仍然是生產(chǎn)者和消費(fèi)者API,它們只不過是在上層做了封裝。
這很容易理解,生產(chǎn)者(也稱為發(fā)布者)創(chuàng)建消息,而消費(fèi)者(也稱為訂閱者)負(fù)責(zé)消費(fèi)or讀取消息。
概念二:主題(Topic)與分區(qū)(Partition)
在 Kafka 中,消息以主題(Topic)來分類,每一個(gè)主題都對(duì)應(yīng)一個(gè)「消息隊(duì)列」,這有點(diǎn)兒類似于數(shù)據(jù)庫(kù)中的表。但是如果我們把所有同類的消息都塞入到一個(gè)“中心”隊(duì)列中,勢(shì)必缺少可伸縮性,無論是生產(chǎn)者/消費(fèi)者數(shù)目的增加,還是消息數(shù)量的增加,都可能耗盡系統(tǒng)的性能或存儲(chǔ)。
我們使用一個(gè)生活中的例子來說明:現(xiàn)在 A 城市生產(chǎn)的某商品需要運(yùn)輸?shù)?B 城市,走的是公路,那么單通道的高速公路不論是在「A 城市商品增多」還是「現(xiàn)在 C 城市也要往 B 城市運(yùn)輸東西」這樣的情況下都會(huì)出現(xiàn)「吞吐量不足」的問題。所以我們現(xiàn)在引入分區(qū)(Partition)的概念,類似“允許多修幾條道”的方式對(duì)我們的主題完成了水平擴(kuò)展。
概念三:Broker 和集群(Cluster)
一個(gè) Kafka 服務(wù)器也稱為 Broker,它接受生產(chǎn)者發(fā)送的消息并存入磁盤;Broker 同時(shí)服務(wù)消費(fèi)者拉取分區(qū)消息的請(qǐng)求,返回目前已經(jīng)提交的消息。使用特定的機(jī)器硬件,一個(gè) Broker 每秒可以處理成千上萬的分區(qū)和百萬量級(jí)的消息。(現(xiàn)在動(dòng)不動(dòng)就百萬量級(jí)..我特地去查了一把,好像確實(shí)集群的情況下吞吐量挺高的..摁..)
若干個(gè) Broker 組成一個(gè)集群(Cluster),其中集群內(nèi)某個(gè) Broker 會(huì)成為集群控制器(Cluster Controller),它負(fù)責(zé)管理集群,包括分配分區(qū)到 Broker、監(jiān)控 Broker 故障等。在集群內(nèi),一個(gè)分區(qū)由一個(gè) Broker 負(fù)責(zé),這個(gè) Broker 也稱為這個(gè)分區(qū)的 Leader;當(dāng)然一個(gè)分區(qū)可以被復(fù)制到多個(gè) Broker 上來實(shí)現(xiàn)冗余,這樣當(dāng)存在 Broker 故障時(shí)可以將其分區(qū)重新分配到其他 Broker 來負(fù)責(zé)。下圖是一個(gè)樣例:
Kafka 的一個(gè)關(guān)鍵性質(zhì)是日志保留(retention),我們可以配置主題的消息保留策略,譬如只保留一段時(shí)間的日志或者只保留特定大小的日志。當(dāng)超過這些限制時(shí),老的消息會(huì)被刪除。我們也可以針對(duì)某個(gè)主題單獨(dú)設(shè)置消息過期策略,這樣對(duì)于不同應(yīng)用可以實(shí)現(xiàn)個(gè)性化。
概念四:多集群
隨著業(yè)務(wù)發(fā)展,我們往往需要多集群,通常處于下面幾個(gè)原因:
基于數(shù)據(jù)的隔離;
基于安全的隔離;
多數(shù)據(jù)中心(容災(zāi))
當(dāng)構(gòu)建多個(gè)數(shù)據(jù)中心時(shí),往往需要實(shí)現(xiàn)消息互通。舉個(gè)例子,假如用戶修改了個(gè)人資料,那么后續(xù)的請(qǐng)求無論被哪個(gè)數(shù)據(jù)中心處理,這個(gè)更新需要反映出來。又或者,多個(gè)數(shù)據(jù)中心的數(shù)據(jù)需要匯總到一個(gè)總控中心來做數(shù)據(jù)分析。
上面說的分區(qū)復(fù)制冗余機(jī)制只適用于同一個(gè) Kafka 集群內(nèi)部,對(duì)于多個(gè) Kafka 集群消息同步可以使用 Kafka 提供的 MirrorMaker 工具。本質(zhì)上來說,MirrorMaker 只是一個(gè) Kafka 消費(fèi)者和生產(chǎn)者,并使用一個(gè)隊(duì)列連接起來而已。它從一個(gè)集群中消費(fèi)消息,然后往另一個(gè)集群生產(chǎn)消息。
二、Kafka 的設(shè)計(jì)與實(shí)現(xiàn)
上面我們知道了 Kafka 中的一些基本概念,但作為一個(gè)成熟的「消息隊(duì)列」中間件,其中有許多有意思的設(shè)計(jì)值得我們思考,下面我們簡(jiǎn)單列舉一些。
討論一:Kafka 存儲(chǔ)在文件系統(tǒng)上
是的,您首先應(yīng)該知道 Kafka 的消息是存在于文件系統(tǒng)之上的。Kafka 高度依賴文件系統(tǒng)來存儲(chǔ)和緩存消息,一般的人認(rèn)為 “磁盤是緩慢的”,所以對(duì)這樣的設(shè)計(jì)持有懷疑態(tài)度。實(shí)際上,磁盤比人們預(yù)想的快很多也慢很多,這取決于它們?nèi)绾伪皇褂茫灰粋€(gè)好的磁盤結(jié)構(gòu)設(shè)計(jì)可以使之跟網(wǎng)絡(luò)速度一樣快。
現(xiàn)代的操作系統(tǒng)針對(duì)磁盤的讀寫已經(jīng)做了一些優(yōu)化方案來加快磁盤的訪問速度。比如,預(yù)讀會(huì)提前將一個(gè)比較大的磁盤快讀入內(nèi)存。后寫會(huì)將很多小的邏輯寫操作合并起來組合成一個(gè)大的物理寫操作。并且,操作系統(tǒng)還會(huì)將主內(nèi)存剩余的所有空閑內(nèi)存空間都用作磁盤緩存,所有的磁盤讀寫操作都會(huì)經(jīng)過統(tǒng)一的磁盤緩存(除了直接 I/O 會(huì)繞過磁盤緩存)。綜合這幾點(diǎn)優(yōu)化特點(diǎn),如果是針對(duì)磁盤的順序訪問,某些情況下它可能比隨機(jī)的內(nèi)存訪問都要快,甚至可以和網(wǎng)絡(luò)的速度相差無幾。
上述的 Topic 其實(shí)是邏輯上的概念,面相消費(fèi)者和生產(chǎn)者,物理上存儲(chǔ)的其實(shí)是 Partition,每一個(gè) Partition 最終對(duì)應(yīng)一個(gè)目錄,里面存儲(chǔ)所有的消息和索引文件。默認(rèn)情況下,每一個(gè) Topic 在創(chuàng)建時(shí)如果不指定 Partition 數(shù)量時(shí)只會(huì)創(chuàng)建 1 個(gè) Partition。比如,我創(chuàng)建了一個(gè) Topic 名字為 test ,沒有指定 Partition 的數(shù)量,那么會(huì)默認(rèn)創(chuàng)建一個(gè) test-0 的文件夾,這里的命名規(guī)則是:
任何發(fā)布到 Partition 的消息都會(huì)被追加到 Partition 數(shù)據(jù)文件的尾部,這樣的順序?qū)懘疟P操作讓 Kafka 的效率非常高(經(jīng)驗(yàn)證,順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存還要高,這是 Kafka 高吞吐率的一個(gè)很重要的保證)。
每一條消息被發(fā)送到 Broker 中,會(huì)根據(jù) Partition 規(guī)則選擇被存儲(chǔ)到哪一個(gè) Partition。如果 Partition 規(guī)則設(shè)置的合理,所有消息可以均勻分布到不同的 Partition中。
討論二:Kafka 中的底層存儲(chǔ)設(shè)計(jì)
假設(shè)我們現(xiàn)在 Kafka 集群只有一個(gè) Broker,我們創(chuàng)建 2 個(gè) Topic 名稱分別為:「topic1」和「topic2」,Partition 數(shù)量分別為 1、2,那么我們的根目錄下就會(huì)創(chuàng)建如下三個(gè)文件夾:
在 Kafka 的文件存儲(chǔ)中,同一個(gè) Topic 下有多個(gè)不同的 Partition,每個(gè) Partition 都為一個(gè)目錄,而每一個(gè)目錄又被平均分配成多個(gè)大小相等的?Segment File?中,Segment File 又由 index file 和 data file 組成,他們總是成對(duì)出現(xiàn),后綴 ".index" 和 ".log" 分表表示 Segment 索引文件和數(shù)據(jù)文件。
現(xiàn)在假設(shè)我們?cè)O(shè)置每個(gè) Segment 大小為 500 MB,并啟動(dòng)生產(chǎn)者向 topic1 中寫入大量數(shù)據(jù),topic1-0 文件夾中就會(huì)產(chǎn)生類似如下的一些文件:
Segment 是 Kafka 文件存儲(chǔ)的最小單位。Segment 文件命名規(guī)則:Partition 全局的第一個(gè) Segment 從 0 開始,后續(xù)每個(gè) Segment 文件名為上一個(gè) Segment 文件最后一條消息的 offset 值。數(shù)值最大為 64 位 long 大小,19 位數(shù)字字符長(zhǎng)度,沒有數(shù)字用0填充。如 00000000000000368769.index 和 00000000000000368769.log。
以上面的一對(duì) Segment File 為例,說明一下索引文件和數(shù)據(jù)文件對(duì)應(yīng)關(guān)系:
其中以索引文件中元數(shù)據(jù)?<3, 497>?為例,依次在數(shù)據(jù)文件中表示第 3 個(gè) message(在全局 Partition 表示第 368769 + 3 = 368772 個(gè) message)以及該消息的物理偏移地址為 497。
注意該 index 文件并不是從0開始,也不是每次遞增1的,這是因?yàn)?Kafka 采取稀疏索引存儲(chǔ)的方式,每隔一定字節(jié)的數(shù)據(jù)建立一條索引,它減少了索引文件大小,使得能夠把 index 映射到內(nèi)存,降低了查詢時(shí)的磁盤 IO 開銷,同時(shí)也并沒有給查詢帶來太多的時(shí)間消耗。
因?yàn)槠湮募麨樯弦粋€(gè) Segment 最后一條消息的 offset ,所以當(dāng)需要查找一個(gè)指定 offset 的 message 時(shí),通過在所有 segment 的文件名中進(jìn)行二分查找就能找到它歸屬的 segment ,再在其 index 文件中找到其對(duì)應(yīng)到文件上的物理位置,就能拿出該 message 。
由于消息在 Partition 的 Segment 數(shù)據(jù)文件中是順序讀寫的,且消息消費(fèi)后不會(huì)刪除(刪除策略是針對(duì)過期的 Segment 文件),這種順序磁盤 IO 存儲(chǔ)設(shè)計(jì)師 Kafka 高性能很重要的原因。
Kafka 是如何準(zhǔn)確的知道 message 的偏移的呢?這是因?yàn)樵?Kafka 定義了標(biāo)準(zhǔn)的數(shù)據(jù)存儲(chǔ)結(jié)構(gòu),在 Partition 中的每一條 message 都包含了以下三個(gè)屬性:
offset:表示 message 在當(dāng)前 Partition 中的偏移量,是一個(gè)邏輯上的值,唯一確定了 Partition 中的一條 message,可以簡(jiǎn)單的認(rèn)為是一個(gè) id;
MessageSize:表示 message 內(nèi)容 data 的大小;
data:message 的具體內(nèi)容
討論三:生產(chǎn)者設(shè)計(jì)概要
當(dāng)我們發(fā)送消息之前,先問幾個(gè)問題:每條消息都是很關(guān)鍵且不能容忍丟失么?偶爾重復(fù)消息可以么?我們關(guān)注的是消息延遲還是寫入消息的吞吐量?
舉個(gè)例子,有一個(gè)信用卡交易處理系統(tǒng),當(dāng)交易發(fā)生時(shí)會(huì)發(fā)送一條消息到 Kafka,另一個(gè)服務(wù)來讀取消息并根據(jù)規(guī)則引擎來檢查交易是否通過,將結(jié)果通過 Kafka 返回。對(duì)于這樣的業(yè)務(wù),消息既不能丟失也不能重復(fù),由于交易量大因此吞吐量需要盡可能大,延遲可以稍微高一點(diǎn)。
再舉個(gè)例子,假如我們需要收集用戶在網(wǎng)頁上的點(diǎn)擊數(shù)據(jù),對(duì)于這樣的場(chǎng)景,少量消息丟失或者重復(fù)是可以容忍的,延遲多大都不重要只要不影響用戶體驗(yàn),吞吐則根據(jù)實(shí)時(shí)用戶數(shù)來決定。
不同的業(yè)務(wù)需要使用不同的寫入方式和配置。具體的方式我們?cè)谶@里不做討論,現(xiàn)在先看下生產(chǎn)者寫消息的基本流程:
圖片來源:http://www.dengshenyu.com/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/11/12/kafka-producer.html
流程如下:
首先,我們需要?jiǎng)?chuàng)建一個(gè)ProducerRecord,這個(gè)對(duì)象需要包含消息的主題(topic)和值(value),可以選擇性指定一個(gè)鍵值(key)或者分區(qū)(partition)。
發(fā)送消息時(shí),生產(chǎn)者會(huì)對(duì)鍵值和值序列化成字節(jié)數(shù)組,然后發(fā)送到分配器(partitioner)。
如果我們指定了分區(qū),那么分配器返回該分區(qū)即可;否則,分配器將會(huì)基于鍵值來選擇一個(gè)分區(qū)并返回。
選擇完分區(qū)后,生產(chǎn)者知道了消息所屬的主題和分區(qū),它將這條記錄添加到相同主題和分區(qū)的批量消息中,另一個(gè)線程負(fù)責(zé)發(fā)送這些批量消息到對(duì)應(yīng)的Kafka broker。
當(dāng)broker接收到消息后,如果成功寫入則返回一個(gè)包含消息的主題、分區(qū)及位移的RecordMetadata對(duì)象,否則返回異常。
生產(chǎn)者接收到結(jié)果后,對(duì)于異常可能會(huì)進(jìn)行重試。
討論四:消費(fèi)者設(shè)計(jì)概要
消費(fèi)者與消費(fèi)組
假設(shè)這么個(gè)場(chǎng)景:我們從Kafka中讀取消息,并且進(jìn)行檢查,最后產(chǎn)生結(jié)果數(shù)據(jù)。我們可以創(chuàng)建一個(gè)消費(fèi)者實(shí)例去做這件事情,但如果生產(chǎn)者寫入消息的速度比消費(fèi)者讀取的速度快怎么辦呢?這樣隨著時(shí)間增長(zhǎng),消息堆積越來越嚴(yán)重。對(duì)于這種場(chǎng)景,我們需要增加多個(gè)消費(fèi)者來進(jìn)行水平擴(kuò)展。
Kafka消費(fèi)者是消費(fèi)組的一部分,當(dāng)多個(gè)消費(fèi)者形成一個(gè)消費(fèi)組來消費(fèi)主題時(shí),每個(gè)消費(fèi)者會(huì)收到不同分區(qū)的消息。假設(shè)有一個(gè)T1主題,該主題有4個(gè)分區(qū);同時(shí)我們有一個(gè)消費(fèi)組G1,這個(gè)消費(fèi)組只有一個(gè)消費(fèi)者C1。那么消費(fèi)者C1將會(huì)收到這4個(gè)分區(qū)的消息,如下所示:
如果我們?cè)黾有碌南M(fèi)者C2到消費(fèi)組G1,那么每個(gè)消費(fèi)者將會(huì)分別收到兩個(gè)分區(qū)的消息,如下所示:
如果增加到4個(gè)消費(fèi)者,那么每個(gè)消費(fèi)者將會(huì)分別收到一個(gè)分區(qū)的消息,如下所示:
但如果我們繼續(xù)增加消費(fèi)者到這個(gè)消費(fèi)組,剩余的消費(fèi)者將會(huì)空閑,不會(huì)收到任何消息:
總而言之,我們可以通過增加消費(fèi)組的消費(fèi)者來進(jìn)行水平擴(kuò)展提升消費(fèi)能力。這也是為什么建議創(chuàng)建主題時(shí)使用比較多的分區(qū)數(shù),這樣可以在消費(fèi)負(fù)載高的情況下增加消費(fèi)者來提升性能。另外,消費(fèi)者的數(shù)量不應(yīng)該比分區(qū)數(shù)多,因?yàn)槎喑鰜淼南M(fèi)者是空閑的,沒有任何幫助。
Kafka一個(gè)很重要的特性就是,只需寫入一次消息,可以支持任意多的應(yīng)用讀取這個(gè)消息。換句話說,每個(gè)應(yīng)用都可以讀到全量的消息。為了使得每個(gè)應(yīng)用都能讀到全量消息,應(yīng)用需要有不同的消費(fèi)組。對(duì)于上面的例子,假如我們新增了一個(gè)新的消費(fèi)組G2,而這個(gè)消費(fèi)組有兩個(gè)消費(fèi)者,那么會(huì)是這樣的:
在這個(gè)場(chǎng)景中,消費(fèi)組G1和消費(fèi)組G2都能收到T1主題的全量消息,在邏輯意義上來說它們屬于不同的應(yīng)用。
最后,總結(jié)起來就是:如果應(yīng)用需要讀取全量消息,那么請(qǐng)為該應(yīng)用設(shè)置一個(gè)消費(fèi)組;如果該應(yīng)用消費(fèi)能力不足,那么可以考慮在這個(gè)消費(fèi)組里增加消費(fèi)者。
消費(fèi)組與分區(qū)重平衡
可以看到,當(dāng)新的消費(fèi)者加入消費(fèi)組,它會(huì)消費(fèi)一個(gè)或多個(gè)分區(qū),而這些分區(qū)之前是由其他消費(fèi)者負(fù)責(zé)的;另外,當(dāng)消費(fèi)者離開消費(fèi)組(比如重啟、宕機(jī)等)時(shí),它所消費(fèi)的分區(qū)會(huì)分配給其他分區(qū)。這種現(xiàn)象稱為重平衡(rebalance)。重平衡是 Kafka 一個(gè)很重要的性質(zhì),這個(gè)性質(zhì)保證了高可用和水平擴(kuò)展。不過也需要注意到,在重平衡期間,所有消費(fèi)者都不能消費(fèi)消息,因此會(huì)造成整個(gè)消費(fèi)組短暫的不可用。而且,將分區(qū)進(jìn)行重平衡也會(huì)導(dǎo)致原來的消費(fèi)者狀態(tài)過期,從而導(dǎo)致消費(fèi)者需要重新更新狀態(tài),這段期間也會(huì)降低消費(fèi)性能。后面我們會(huì)討論如何安全的進(jìn)行重平衡以及如何盡可能避免。
消費(fèi)者通過定期發(fā)送心跳(hearbeat)到一個(gè)作為組協(xié)調(diào)者(group coordinator)的 broker 來保持在消費(fèi)組內(nèi)存活。這個(gè) broker 不是固定的,每個(gè)消費(fèi)組都可能不同。當(dāng)消費(fèi)者拉取消息或者提交時(shí),便會(huì)發(fā)送心跳。
如果消費(fèi)者超過一定時(shí)間沒有發(fā)送心跳,那么它的會(huì)話(session)就會(huì)過期,組協(xié)調(diào)者會(huì)認(rèn)為該消費(fèi)者已經(jīng)宕機(jī),然后觸發(fā)重平衡。可以看到,從消費(fèi)者宕機(jī)到會(huì)話過期是有一定時(shí)間的,這段時(shí)間內(nèi)該消費(fèi)者的分區(qū)都不能進(jìn)行消息消費(fèi);通常情況下,我們可以進(jìn)行優(yōu)雅關(guān)閉,這樣消費(fèi)者會(huì)發(fā)送離開的消息到組協(xié)調(diào)者,這樣組協(xié)調(diào)者可以立即進(jìn)行重平衡而不需要等待會(huì)話過期。
在 0.10.1 版本,Kafka 對(duì)心跳機(jī)制進(jìn)行了修改,將發(fā)送心跳與拉取消息進(jìn)行分離,這樣使得發(fā)送心跳的頻率不受拉取的頻率影響。另外更高版本的 Kafka 支持配置一個(gè)消費(fèi)者多長(zhǎng)時(shí)間不拉取消息但仍然保持存活,這個(gè)配置可以避免活鎖(livelock)。活鎖,是指應(yīng)用沒有故障但是由于某些原因不能進(jìn)一步消費(fèi)。
Partition 與消費(fèi)模型
上面提到,Kafka 中一個(gè) topic 中的消息是被打散分配在多個(gè) Partition(分區(qū)) 中存儲(chǔ)的, Consumer Group 在消費(fèi)時(shí)需要從不同的 Partition 獲取消息,那最終如何重建出 Topic 中消息的順序呢?
答案是:沒有辦法。Kafka 只會(huì)保證在 Partition 內(nèi)消息是有序的,而不管全局的情況。
下一個(gè)問題是:Partition 中的消息可以被(不同的 Consumer Group)多次消費(fèi),那 Partition中被消費(fèi)的消息是何時(shí)刪除的?Partition 又是如何知道一個(gè) Consumer Group 當(dāng)前消費(fèi)的位置呢?
無論消息是否被消費(fèi),除非消息到期 Partition 從不刪除消息。例如設(shè)置保留時(shí)間為 2 天,則消息發(fā)布 2 天內(nèi)任何 Group 都可以消費(fèi),2 天后,消息自動(dòng)被刪除。
Partition 會(huì)為每個(gè) Consumer Group 保存一個(gè)偏移量,記錄 Group 消費(fèi)到的位置。如下圖:
為什么 ?Kafka 是 pull 模型
消費(fèi)者應(yīng)該向 Broker 要數(shù)據(jù)(pull)還是 Broker 向消費(fèi)者推送數(shù)據(jù)(push)?作為一個(gè)消息系統(tǒng),Kafka 遵循了傳統(tǒng)的方式,選擇由 Producer 向 broker push 消息并由 Consumer 從 broker pull 消息。一些 logging-centric system,比如 Facebook 的Scribe和 Cloudera 的Flume,采用 push 模式。事實(shí)上,push 模式和 pull 模式各有優(yōu)劣。
push 模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄l(fā)送速率是由 broker 決定的。push 模式的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成 Consumer 來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而 pull 模式則可以根據(jù) Consumer 的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。
對(duì)于 Kafka 而言,pull 模式更合適。pull 模式可簡(jiǎn)化 broker 的設(shè)計(jì),Consumer 可自主控制消費(fèi)消息的速率,同時(shí) Consumer 可以自己控制消費(fèi)方式——即可批量消費(fèi)也可逐條消費(fèi),同時(shí)還能選擇不同的提交方式從而實(shí)現(xiàn)不同的傳輸語義。
討論五:Kafka 如何保證可靠性
當(dāng)我們討論可靠性的時(shí)候,我們總會(huì)提到保證*這個(gè)詞語。可靠性保證是基礎(chǔ),我們基于這些基礎(chǔ)之上構(gòu)建我們的應(yīng)用。比如關(guān)系型數(shù)據(jù)庫(kù)的可靠性保證是ACID,也就是原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)和持久性(Durability)。
Kafka 中的可靠性保證有如下四點(diǎn):
對(duì)于一個(gè)分區(qū)來說,它的消息是有序的。如果一個(gè)生產(chǎn)者向一個(gè)分區(qū)先寫入消息A,然后寫入消息B,那么消費(fèi)者會(huì)先讀取消息A再讀取消息B。
當(dāng)消息寫入所有in-sync狀態(tài)的副本后,消息才會(huì)認(rèn)為已提交(committed)。這里的寫入有可能只是寫入到文件系統(tǒng)的緩存,不一定刷新到磁盤。生產(chǎn)者可以等待不同時(shí)機(jī)的確認(rèn),比如等待分區(qū)主副本寫入即返回,后者等待所有in-sync狀態(tài)副本寫入才返回。
一旦消息已提交,那么只要有一個(gè)副本存活,數(shù)據(jù)不會(huì)丟失。
消費(fèi)者只能讀取到已提交的消息。
使用這些基礎(chǔ)保證,我們構(gòu)建一個(gè)可靠的系統(tǒng),這時(shí)候需要考慮一個(gè)問題:究竟我們的應(yīng)用需要多大程度的可靠性?可靠性不是無償?shù)模c系統(tǒng)可用性、吞吐量、延遲和硬件價(jià)格息息相關(guān),得此失彼。因此,我們往往需要做權(quán)衡,一味的追求可靠性并不實(shí)際。
想了解更多戳這里:http://www.dengshenyu.com/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/11/21/kafka-data-delivery.html
三、動(dòng)手搭一個(gè) Kafka
通過上面的描述,我們已經(jīng)大致了解到了「Kafka」是何方神圣了,現(xiàn)在我們開始嘗試自己動(dòng)手本地搭一個(gè)來實(shí)際體驗(yàn)一把。
第一步:下載 Kafka
這里以 Mac OS 為例,在安裝了 Homebrew 的情況下執(zhí)行下列代碼:
由于 Kafka 依賴了 Zookeeper,所以在下載的時(shí)候會(huì)自動(dòng)下載。
第二步:?jiǎn)?dòng)服務(wù)
我們?cè)趩?dòng)之前首先需要修改 Kafka 的監(jiān)聽地址和端口為?localhost:9092:
然后修改成下圖的樣子:
依次啟動(dòng) Zookeeper 和 Kafka:
然后執(zhí)行下列語句來創(chuàng)建一個(gè)名字為 "test" 的 Topic:
我們可以通過下列的命令查看我們的 Topic 列表:
第三步:發(fā)送消息
然后我們新建一個(gè)控制臺(tái),運(yùn)行下列命令創(chuàng)建一個(gè)消費(fèi)者關(guān)注剛才創(chuàng)建的 Topic:
用控制臺(tái)往剛才創(chuàng)建的 Topic 中添加消息,并觀察剛才創(chuàng)建的消費(fèi)者窗口:
能通過消費(fèi)者窗口觀察到正確的消息:
參考資料
https://www.infoq.cn/article/kafka-analysis-part-1 - Kafka 設(shè)計(jì)解析(一):Kafka 背景及架構(gòu)介紹
http://www.dengshenyu.com/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/11/06/kafka-Meet-Kafka.html - Kafka系列(一)初識(shí)Kafka
https://lotabout.me/2018/kafka-introduction/ - Kafka 入門介紹
https://www.zhihu.com/question/28925721 - Kafka 中的 Topic 為什么要進(jìn)行分區(qū)? - 知乎
https://blog.joway.io/posts/kafka-design-practice/ - Kafka 的設(shè)計(jì)與實(shí)踐思考
http://www.dengshenyu.com/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/11/21/kafka-data-delivery.html - Kafka系列(六)可靠的數(shù)據(jù)傳輸
本文轉(zhuǎn)載自微信公眾號(hào)JavaGuide
Kafka 存儲(chǔ) 大數(shù)據(jù)
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。