PHP如何解決網(wǎng)站大流量與高并發(fā)的問題(二)
686
2025-03-31
本文主要內(nèi)容
考察Kafka架構(gòu)。
生產(chǎn)者發(fā)送消息。
消費者讀取消息。
Kafka安裝與運行。
雖然這是一本關(guān)于Kafka Streams的書,但是要研究Kafka Streams不可能不探討Kafka,畢竟,Kafka Streams是一個運行在Kafka之上的庫。
Kafka Streams設(shè)計得非常好,因此即使具有很少或者零Kafka經(jīng)驗的人都可以啟動和運行Kafka Streams。但是,你所取得的進步和對Kafka調(diào)優(yōu)的能力將是有限的。掌握Kafka的基礎(chǔ)知識對有效使用Kafka Streams來說是必要的。
{注意}
本章面向的讀者是對Kafka Streams有興趣,但對Kafka本身具有很少或零經(jīng)驗的開發(fā)者。如果讀者對Kafka具備很好的應(yīng)用知識,那么就可以跳過本章,直接閱讀第3章。
Kafka是一個很大的話題,很難通過一章進行完整論述。本章將會覆蓋足以使讀者很好地理解Kafka的工作原理和一些核心配置項設(shè)置的必備知識。要想更深入了解Kafka的知識,請看Dylan Scott寫的_Kafka in Action_(Manning,2018)
1 數(shù)據(jù)問題
如今,各組織都在研究數(shù)據(jù)。互聯(lián)網(wǎng)公司、金融企業(yè)以及大型零售商現(xiàn)在比以往任何時候都更善于利用這些數(shù)據(jù)。通過利用數(shù)據(jù),既能更好地服務(wù)于客戶,又能找到更有效的經(jīng)營方式(我們要對這種情況持積極態(tài)度,并且在看待客戶數(shù)據(jù)時要從好的意圖出發(fā))。
讓我們考慮一下在ZMart數(shù)據(jù)管理解決方案中的各種需求。
需要一種將數(shù)據(jù)快速發(fā)送到中央存儲的方法。
由于服務(wù)器經(jīng)常發(fā)生故障,這就需要復(fù)制數(shù)據(jù)的能力,有了這種能力,不可避免的故障就不會導(dǎo)致停機和數(shù)據(jù)丟失。
需要能夠擴展到任意數(shù)量消費者的數(shù)據(jù),而不必跟蹤不同的應(yīng)用程序。需要讓組織中的任何人都能使用這些數(shù)據(jù),而不必跟蹤哪些人已經(jīng)查看了數(shù)據(jù),哪些人還沒有查看。
2 使用Kafka處理數(shù)據(jù)
在第1章中,已介紹過大型零售公司ZMart。那時,ZMart需要一個流式處理平臺來利用公司的銷售數(shù)據(jù),以便更好地提供客戶服務(wù)并提升銷售總額。但在那時的6個月前,ZMart期待了解它的數(shù)據(jù)情況,ZMart最初有一個定制的非常有效的解決方案,但是很快就發(fā)現(xiàn)該解決方案變得難以駕馭了,接下來將看到其原因。
2.1 ZMart原始的數(shù)據(jù)平臺
最初,ZMart是一家小公司,零售銷售數(shù)據(jù)從各分離的應(yīng)用程序流入系統(tǒng)。這種方法起初效果還是不錯的,但隨著時間的推移,顯然需要一種新的方法。一個部門的銷售數(shù)據(jù)不再只是該部門所感興趣的,公司的其他部門也可能感興趣,并且不同的部門對數(shù)據(jù)的重要性和數(shù)據(jù)結(jié)構(gòu)都有不同的需求。圖2-1展示了ZMart原始的數(shù)據(jù)平臺。
圖2-1 ZMart原始數(shù)據(jù)架構(gòu)簡單,足夠使每個信息源流入和流出信息
隨著時間的推移,ZMart通過收購其他公司以及擴大其現(xiàn)有商店的產(chǎn)品而持續(xù)增長。隨著應(yīng)用程序的添加,應(yīng)用程序之間的連接變得更加復(fù)雜,由最初的少量的應(yīng)用程序之間的通信演變成了一堆名副其實的意大利面條。如圖2-2所示,即使只有3個應(yīng)用程序,連接的數(shù)量也很煩瑣且令人困惑。可以看到,隨著時間的推移,添加新的應(yīng)用程序?qū)⑹惯@種數(shù)據(jù)架構(gòu)變得難以管理。
圖2-2 隨著時間的推移,越來越多的應(yīng)用程序被添加進來,連接所有這些信息源變得非常復(fù)雜
2.2 一個Kafka銷售交易數(shù)據(jù)中心
一個解決ZMart問題的方案是創(chuàng)建一個接收進程來控制所有的交易數(shù)據(jù),即建立一個交易數(shù)據(jù)中心。這個交易數(shù)據(jù)中心應(yīng)該是無狀態(tài)的,它以一種方式接受交易數(shù)據(jù)并存儲,這種方式是任何消費應(yīng)用程序可以根據(jù)自己的需要從數(shù)據(jù)中心提取信息。對哪些數(shù)據(jù)的追蹤取決于消費應(yīng)用程序,交易數(shù)據(jù)中心只知道需要將交易數(shù)據(jù)保存多久,以及在什么時候切分或刪除這些數(shù)據(jù)。
也許你還沒有猜到,我們有Kafka完美的用例。Kafka是一個具有容錯能力、健壯的發(fā)布/訂閱系統(tǒng)。一個Kafka節(jié)點被稱為一個代理,多個Kafka服務(wù)器組成一個集群。Kafka將生產(chǎn)者寫入的消息存儲在Kafka的主題之中,消費者訂閱Kafka主題,與Kafka進行通信以查看訂閱的主題是否有可用的消息。圖?2-3?展示了如何將Kafka想象為銷售交易數(shù)據(jù) 中心。
現(xiàn)在大家已經(jīng)對Kafka的概況有了大致的了解,在下面的幾節(jié)中將進行仔細研究。
圖2-3 使用Kafka作為銷售交易中心顯著簡化了ZMart數(shù)據(jù)架構(gòu),現(xiàn)在每臺服務(wù)器不需要知道其他的信息來源,它們只需要知道如何從Kafka讀取數(shù)據(jù)和將數(shù)據(jù)寫入Kafka
3 Kafka架構(gòu)
在接下來的幾個小節(jié)中,我們將介紹Kafka體系架構(gòu)的關(guān)鍵部分以及Kafka的工作原理。如果想盡早地體驗運行Kafka,可以直接跳到2.6節(jié),安裝和運行Kafka。等Kafka安裝之后,再回到這里來繼續(xù)學習Kafka。
3.1 Kafka是一個消息代理
在前一節(jié)中,我曾說過Kafka是一個發(fā)布/訂閱系統(tǒng),但更精確地說法是Kafka充當了消息代理。代理是一個中介,將進行互利交換或交易但不一定相互了解的兩部分匯聚在一起。圖2-4展示了ZMart數(shù)據(jù)架構(gòu)的演化。生產(chǎn)者和消費者被添加到圖中以展示各單獨部分如何與Kafka進行通信,它們之間不會直接進行通信。
Kafka將消息存儲在主題中,并從主題檢索消息。消息的生產(chǎn)者和消費者之間不會直接連接。此外,Kafka并不會保持有關(guān)生產(chǎn)者和消費者的任何狀態(tài),它僅作為一個消息交換中心。
Kafka主題底層的技術(shù)是日志,它是Kafka追加輸入記錄的文件。為了幫助管理進入主題的消息負載,Kafka使用分區(qū)。在第1章我們討論了分區(qū),大家可以回憶一下,分區(qū)的一個應(yīng)用是將位于不同服務(wù)器上的數(shù)據(jù)匯集到同一臺服務(wù)器上,稍后我們將詳細討論分區(qū)。
圖2-4 Kafka是一個消息代理,生產(chǎn)者將消息發(fā)送到Kafka,這些消息被存儲,并通過主題訂閱的方式提供給消費者
3.2 Kafka是一個日志
Kafka底層的機制就是日志。大多數(shù)軟件工程師都對日志很熟悉,日志用于記錄應(yīng)用程序正在做什么。如果在應(yīng)用程序中出現(xiàn)性能問題或者錯誤,首先檢查的是應(yīng)用程序的日志,但這是另一種類型的日志。在Kafka(或者其他分布式系統(tǒng))的上下文中,日志是“一種只能追加的,完全按照時間順序排列的記錄序列”[1]。
圖2-5展示了日志的樣子,當記錄到達時,應(yīng)用程序?qū)⑺鼈冏芳拥饺罩镜哪┪病S涗浻幸粋€隱含的時間順序,盡管有可能不是與每條記錄相關(guān)聯(lián)的時間戳,因為最早的記錄在左邊,后達到的記錄在右端。
日志是具有強大含義的簡單數(shù)據(jù)抽象,如果記錄按時間有序,解決沖突或確定將哪個更新應(yīng)用到不同的機器就變得明確了:最新記錄獲勝。
Kafka中的主題是按主題名稱分隔的日志,幾乎可以將主題視為有標簽的日志。如果日志在一個集群中有多個副本,那么當一臺服務(wù)器宕機后,就能夠很容易使服務(wù)器恢復(fù)正常:只需重放日志文件。從故障中恢復(fù)的能力正是分布式提交日志具有的。
圖2-5 日志是追加傳入記錄的文件——每條新到達的記錄都被立即放在接收到的最后一條記錄之后,這個過程按時間順序?qū)τ涗涍M行排序
我們只觸及了關(guān)于分布式應(yīng)用程序和數(shù)據(jù)一致性的深入話題的表面,但到目前為止所講解的知識應(yīng)該能讓讀者對Kafka涉及的內(nèi)容有了一個基本的了解。
3.3 Kafka日志工作原理
當安裝Kafka時,其中一個配置項是log.dir,該配置項用來指定Kafka存儲日志數(shù)據(jù)的路徑。每個主題都映射到指定日志路徑下的一個子目錄。子目錄數(shù)與主題對應(yīng)的分區(qū)數(shù)相同,目錄名格式為“主題名_分區(qū)編號”(將在下一節(jié)介紹分區(qū))。每個目錄里面存放的都是用于追加傳入消息的日志文件,一旦日志文件達到某個規(guī)模(磁盤上的記錄總數(shù)或者記錄的大小),或者消息的時間戳間的時間間隔達到了所配置的時間間隔時,日志文件就會被切分,傳入的消息將會被追加到一個新的日志文件中(如圖2-6所示)。
圖2-6 logs目錄是消息存儲的根目錄,/logs目錄下的每個目錄代表一個主題的分區(qū),目錄中的文件名以主題的名稱打頭,然后是下劃線,后面接一個分區(qū)的編號
可以看到日志和主題是高度關(guān)聯(lián)的概念,可以說一個主題是一個日志,或者說一個主題代表一個日志。通過主題名可以很好地處理經(jīng)由生產(chǎn)者發(fā)送到Kafka的消息將被存儲到哪個日志當中。既然已經(jīng)討論了日志的概念,那么我們再來討論Kafka另一個基本概念——分區(qū)。
3.4 Kafka和分區(qū)
分區(qū)是Kafka設(shè)計的一個重要部分,它對性能來說必不可少。分區(qū)保證了同一個鍵的數(shù)據(jù)將會按序被發(fā)送給同一個消費者。圖2-7展示了分區(qū)的工作原理。
圖2-7 Kafka使用分區(qū)來實現(xiàn)高吞吐量,并將一個主題的消息在集群的不同服務(wù)器中傳播
對主題作分區(qū)的本質(zhì)是將發(fā)送到主題的數(shù)據(jù)切分到多個平行流之中,這是Kafka能夠?qū)崿F(xiàn)巨大吞吐量的關(guān)鍵。我們解釋過每個主題就是一個分布式日志,每個分區(qū)類似于一個它自己的日志,并遵循相同的規(guī)則。Kafka將每個傳入的消息追加到日志末尾,并且所有的消息都嚴格按時間順序排列,每條消息都有一個分配給它的偏移量。Kafka不保證跨分區(qū)的消息有序,但是能夠保證每個分區(qū)內(nèi)的消息是有序的。
除了增加吞吐量,分區(qū)還有另一個目的,它允許主題的消息分散在多臺機器上,這樣給定主題的容量就不會局限于一臺服務(wù)器上的可用磁盤空間。
現(xiàn)在讓我們看看分區(qū)扮演的另一個關(guān)鍵角色:確保具有相同鍵的消息最終在一起。
3.5 分區(qū)按鍵對數(shù)據(jù)進行分組
Kafka處理鍵/值對格式的數(shù)據(jù),如果鍵為空,那么生產(chǎn)者將采用輪詢(round-robin)方式選擇分區(qū)寫入記錄。圖2-8展示了用非空鍵如何分配分區(qū)的操作。
如果鍵不為空,Kafka會使用以下公式(如下偽代碼所示)確定將鍵/值對發(fā)送到哪個分區(qū):
HashCode.(key)?%?number?of?partitions
通過使用確定性方法來選擇分區(qū),使得具有相同鍵的記錄將會按序總是被發(fā)送到同一個分區(qū)。默認的分區(qū)器使用此方法,如果需要使用不同的策略選擇分區(qū),則可以提供自定義的分區(qū)器。
圖2-8 “foo”被發(fā)送到分區(qū)0,“bar”被發(fā)送到分區(qū)1。通過鍵的
字節(jié)散列與分區(qū)總數(shù)取模來獲得數(shù)據(jù)被分配的分區(qū)
3.6 編寫自定義分區(qū)器
為什么要編寫自定義分區(qū)器呢?在幾個可能的原因中,下面將舉一個簡單的例子——組合鍵的使用。
假設(shè)將購買數(shù)據(jù)寫入Kafka,該數(shù)據(jù)的鍵包括兩個值,即客戶ID和交易日期,需要根據(jù)客戶ID對值進行分組,因此對客戶ID和交易日期進行散列是行不通的。在這種情況下,就需要編寫一個自定義分區(qū)器,該分區(qū)器知道組合鍵的哪一部分決定使用哪個分區(qū)。例如,/src/main/java/ bbejeck/model/PurchaseKey.java中的組合鍵,如代碼清單2-1所示。
代碼清單2-1 組合鍵PurchaseKey類
當提及分區(qū)時,需要保證特定用戶的所有交易信息都會被發(fā)送到同一個分區(qū)中。但是整體作為鍵就無法保證,因為購買行為會在多個日期發(fā)生,包括交易日期的記錄對一個用戶而言就會導(dǎo)致不同的鍵值,就會將交易數(shù)據(jù)隨機分布到不同的分區(qū)中。若需要確保具有相同客戶ID的交易信息都發(fā)送到同一個分區(qū),唯一的方法就是在確定分區(qū)時使用客戶ID作為鍵。
代碼清單2-2所示的自定義分區(qū)器的例子就滿足需求。PurchaseKeyPartitioner類(源代碼見src/ main/java/bbejeck/chapter_2/partitioner/PurchaseKeyPartitioner.java)從鍵中提取客戶ID來確定使用哪個分區(qū)。
代碼清單2-2 自定義分區(qū)器PurchaseKeyPartitioner類
該自定義分區(qū)器繼承自DefaultPartitioner類,當然也可以直接實現(xiàn)Partitioner接口,但是在這個例子中,在DefaultPartitioner類中有一個已存在的邏輯。
請注意,在創(chuàng)建自定義分區(qū)器時,不僅局限于使用鍵,單獨使用值或與鍵組合使用都是有效的。
{注意}
Kafka API提供了一個可以用來實現(xiàn)自定義分區(qū)器的Partitioner接口,本書不打算講解從頭開始寫一個分區(qū)器,但是實現(xiàn)原則與代碼清單2-2相同。
已經(jīng)看到如何構(gòu)造一個自定義分區(qū)器,接下來,將分區(qū)器與Kafka結(jié)合起來。
3.7 指定一個自定義分區(qū)器
既然已編寫了一個自定義分區(qū)器,那就需要告訴Kafka使用自定義的分區(qū)器代替默認的分區(qū)器。雖然還沒有討論生產(chǎn)者,但在設(shè)置Kafka生產(chǎn)者配置時可以指定一個不同的分區(qū)器[2],配置如下:
通過為每個生產(chǎn)者實例設(shè)置分區(qū)器的方式,就可以隨意地為任何生產(chǎn)者指定任何分區(qū)器類。在討論Kafka生產(chǎn)者時再對生產(chǎn)者的配置做詳細介紹。
{警告}
在決定使用的鍵以及選擇鍵/值對的部分作為分區(qū)依據(jù)時,一定要謹慎行事。要確保所選擇的鍵在所有數(shù)據(jù)中具有合理的分布,否則,由于大多數(shù)數(shù)據(jù)都分布在少數(shù)幾個分區(qū)上,最終導(dǎo)致數(shù)據(jù)傾斜。
3.8 確定恰當?shù)姆謪^(qū)數(shù)
在創(chuàng)建主題時決定要使用的分區(qū)數(shù)既是一門藝術(shù)也是一門科學。其中一個重要的考慮因素是流入該主題的數(shù)據(jù)量。更多的數(shù)據(jù)意味著更多的分區(qū)以獲得更高的吞吐量,但與生活中的任何事物一樣,也要有取舍。
增加分區(qū)數(shù)的同時也增加了TCP連接數(shù)和打開的文件句柄數(shù)。此外,消費者處理傳入記錄所花費的時間也會影響吞吐量。如果消費者線程有重量級處理操作,那么增加分區(qū)數(shù)可能有幫助,但是較慢的處理操作最終將會影響性能。
3.9 分布式日志
我們已經(jīng)討論了日志和對主題進行分區(qū)的概念,現(xiàn)在,花點時間結(jié)合這兩個概念來闡述分布式日志。
到目前為止,我們討論日志和對主題進行分區(qū)都是基于一臺Kafka服務(wù)器或者代理,但典型的Kafka生產(chǎn)集群環(huán)境包括多臺服務(wù)器。故意將討論集中單個節(jié)點上,是因為考慮一個節(jié)點更容易理解概念。但在實踐中,總是使用包括多臺服務(wù)器的Kafka集群。
當對主題進行分區(qū)時,Kafka不會將這些分區(qū)分布在一臺服務(wù)上,而是將分區(qū)分散到集群中的多臺服務(wù)器上。由于Kafka是在日志中追加記錄,因此Kafka通過分區(qū)將這些記錄分發(fā)到多臺服務(wù)器上。圖2-9展示了這個過程。
讓我們通過使用圖2-9作為一個向?qū)硗瓿梢粋€快速實例。對于這個實例,我們假設(shè)有一個主題,并且鍵為空,因此生產(chǎn)者將通過輪詢的方式分配分區(qū)。
生產(chǎn)者將第1條消息發(fā)送到位于Kafka代理1上的分區(qū)0中[3],第2條消息被發(fā)送到位于Kafka代理1上的分區(qū)1中,第3條消息被發(fā)送到位于Kafka代理2上的分區(qū)2中。當生產(chǎn)者發(fā)送第6條消息時,消息將會被發(fā)送到Kafka代理3上的分區(qū)5中,從下一條消息開始,又將重復(fù)該步驟,消息將被發(fā)送到位于Kafka代理1上的分區(qū)0中。以這種方式繼續(xù)分配消息,將消息分配到Kafka集群的所有節(jié)點中。
圖2-9 生產(chǎn)者將消息寫入主題的分區(qū)中,如果消息沒有關(guān)聯(lián)鍵,那么生產(chǎn)者就會通過輪詢方式選擇一個分區(qū),否則通過鍵的散列值與分區(qū)總數(shù)取模來決定分區(qū)
雖然遠程存儲數(shù)據(jù)聽起來會有風險,因為服務(wù)器有可能會宕機,但Kafka提供了數(shù)據(jù)冗余。當數(shù)據(jù)被寫入Kafka的一個代理時,數(shù)據(jù)會被復(fù)制到集群中一臺或多臺機器上(在后面小節(jié)會介紹副本)。
3.10 ZooKeeper:領(lǐng)導(dǎo)者、追隨者和副本
到目前為止,我們已經(jīng)討論了主題在Kafka中的作用,以及主題如何及為什么要進行分區(qū)。可以看到,分區(qū)并不都位于同一臺服務(wù)器上,而是分布在整個集群的各個代理上。現(xiàn)在是時候來看看當服務(wù)器故障時Kafka如何提供數(shù)據(jù)可用性。
Kafka代理有領(lǐng)導(dǎo)者(leader)和追隨者(follower)的概念。在Kafka中,對每一個主題分區(qū)(topic partition),會選擇其中一個代理作為其他代理(追隨者)的領(lǐng)導(dǎo)者。領(lǐng)導(dǎo)者的一個主要職責是分配主題分區(qū)的副本給追隨者代理服務(wù)器。就像Kafka在集群中為一個主題分配分區(qū)一樣,Kafka也會在集群的多臺服務(wù)器中復(fù)制分區(qū)數(shù)據(jù)。在深入探討領(lǐng)導(dǎo)者、追隨者和副本是如何工作之前,先來介紹Kafka為實現(xiàn)這一點所使用的技術(shù)。
3.11 Apache ZooKeeper
如果你是個Kafka菜鳥,你可能會問自己:“為什么在Kafka的書中會談?wù)?a target="_blank" href="http://m.bai1xia.com/news/tags-2555.html"style="font-weight:bold;">Apache ZooKeeper?”Apache ZooKeeper是Kafka架構(gòu)不可或缺的部分,正是由于ZooKeeper才使得Kafka有領(lǐng)導(dǎo)者代理,并使領(lǐng)導(dǎo)者代理做諸如跟蹤主題副本的事情,ZooKeeper官網(wǎng)對其介紹如下:
ZooKeeper是一個集中式服務(wù),用于維護配置信息、命名、提供分布式同步和組服務(wù)。這些類型的所有服務(wù)都是通過分布式應(yīng)用程序以某種形式使用。
既然Kafka是一個分布式應(yīng)用程序,那么它一開始就應(yīng)該知道ZooKeeper在其架構(gòu)中的作用。在這里的討論中,我們只考慮兩個或多個Kafka服務(wù)器的安裝問題。
在Kafka集群中,其中一個代理會被選為控制器。在2.3.4節(jié)我們介紹了分區(qū)以及如何在集群的不同服務(wù)器之間分配分區(qū)。主題分區(qū)有一個領(lǐng)導(dǎo)者分區(qū)和一到多個追隨者分區(qū)(復(fù)制的級別決定復(fù)制的程度[4]),當生成消息時,Kafka將記錄發(fā)送到領(lǐng)導(dǎo)者分區(qū)對應(yīng)的代理上。
3.12 選擇一個控制器
Kafka使用ZooKeeper來選擇代理控制器,對于其中涉及的一致性算法的探討已超出本書所講內(nèi)容的范圍,因此我們不做深入探討,只聲明ZooKeeper從集群中選擇一個代理作為控制器。
如果代理控制器發(fā)生故障或者由于任何原因而不可用時,ZooKeeper從與領(lǐng)導(dǎo)者保持同步的一系列代理(已同步的副本[ISR])中選出一個新的控制器,構(gòu)成該系列的代理是動態(tài)的[5],ZooKeeper只會從這個代理系列中選擇一個領(lǐng)導(dǎo)者[6]。
3.13 副本
Kafka在代理之間復(fù)制記錄,以確保當集群中的節(jié)點發(fā)生故障時數(shù)據(jù)可用。可以為每個主題(正如前面介紹的消息發(fā)布或消費實例中的主題)單獨設(shè)置復(fù)制級別也可以為集群中的所有主題設(shè)置復(fù)制級別[7]。圖2-10演示了代理之間的復(fù)制流。
Kafka復(fù)制過程非常簡單,一個主題分區(qū)對應(yīng)的各代理從該主題分區(qū)的領(lǐng)導(dǎo)者分區(qū)消費消息,并將消息追加到自己的日志當中。正如2.3.12節(jié)所論述的,與領(lǐng)導(dǎo)者代理保持同步的追隨者代理被認為是ISR,這些ISR代理在當前領(lǐng)導(dǎo)者發(fā)生故障或者不可用時有資格被選舉為領(lǐng)導(dǎo)者。[8]
圖2-10 代理1和代理3是一個主題分區(qū)的領(lǐng)導(dǎo)者,同時也是另外一個分區(qū)的追隨者,而代理2只是追隨者,追隨者代理從領(lǐng)導(dǎo)者代理復(fù)制數(shù)據(jù)
3.14 控制器的職責
代理控制器的職責是為一個主題的所有分區(qū)建立領(lǐng)導(dǎo)者分區(qū)和追隨者分區(qū)的關(guān)系,如果一個Kafka節(jié)點宕機或者沒有響應(yīng)(與ZooKeeper之間的心跳),那么所有已分配的分區(qū)(包括領(lǐng)導(dǎo)者和追隨者)都將由代理控制器重新進行分配。圖2-11演示了一個正在運行的代理控制器。[9]
圖2-11展示了一個簡單的故障情景。第1步,代理控制器檢測到代理3不可用。第2步,代理控制器將代理3上分區(qū)的領(lǐng)導(dǎo)權(quán)重新分配給代理2。
ZooKeeper也參與了Kafka以下幾個方面的操作。
集群成員——代理加入集群和維護集群中成員關(guān)系。如果一個代理不可用,則ZooKeeper將該代理從集群成員中移除。
主題配置——跟蹤集群中的主題,記錄哪個代理是主題的領(lǐng)導(dǎo)者,各主題有多少個分區(qū)以及主題的哪些特定的配置被覆蓋。
訪問控制——識別誰可以從特定主題中讀取或?qū)懭胂ⅰ?/p>
圖2-11 代理控制器負責將其他代理分配為某些主題/分區(qū)的領(lǐng)導(dǎo)者代理和另一些主題/分區(qū)的追隨者代理, 當代理不可用時,代理控制器將已分配給不可用代理的重新分配給集群中的其他代理
現(xiàn)在可知Kafka為什么依賴于Apache ZooKeeper了,正是ZooKeeper使得Kafka有了一個帶著追隨者的領(lǐng)導(dǎo)者代理,領(lǐng)導(dǎo)者代理的關(guān)鍵角色是為追隨者分配主題分區(qū),以便進行復(fù)制,以及在代理成員出現(xiàn)故障時重新分配主題分區(qū)。
3.15 日志管理
對追加日志已進行了介紹,但還沒有談到隨著日志持續(xù)增長如何對其進行管理。一個集群中旋轉(zhuǎn)磁盤的空間是一個有限的資源,因此對Kafka而言,隨著時間的推移,刪除消息是很重要的事。在談到刪除Kafka中的舊數(shù)據(jù)時,有兩種方法,即傳統(tǒng)的日志刪除和日志壓縮。
3.16 日志刪除
日志刪除策略是一個兩階段的方法:首先,將日志分成多個日志段,然后將最舊的日志段刪除。為了管理Kafka不斷增加的日志,Kafka將日志切分成多個日志段。日志切分的時間基于消息中內(nèi)置的時間戳。當一條新消息到達時,如果它的時間戳大于日志中第一個消息的時間戳加上log.roll.ms配置項配置的值時,Kafka就會切分日志。此時,日志被切分,一個新的日志段會被創(chuàng)建并作為一個活躍的日志段,而以前的活躍日志段仍然為消費者提供消息檢索[10]。
日志切分是在設(shè)置Kafka代理時進行設(shè)置的[11]。日志切分有兩個可選的配置項。
log.roll.ms——這個是主配置項,但沒有默認值。
log.roll.hours——這是輔助配置項,僅當log.roll.ms沒有被設(shè)置時使用,該配置項默認值是168小時。
隨著時間的推移,日志段的數(shù)據(jù)也將不斷增加,為了為傳入的數(shù)據(jù)騰出空間,需要將較舊的日志段刪除。為了刪除日志段,可以指定日志段保留的時長。圖2-12說明了日志切分的過程。
圖2-12 左邊是當前日志段,右上角是一個已被刪除的日志段,在其下面是最近切分的仍然在使用的日志段
與日志切分一樣,日志段的刪除也基于消息的時間戳,而不僅是時鐘時間或文件最后被修改的時間,日志段的刪除基于日志中最大的時間戳。用來設(shè)置日志段刪除策略的3個配置項按優(yōu)先級依次列出如下,這里按優(yōu)先級排列意味著排在前面的配置項會覆蓋后面的配置項。
log.retention.ms——以毫秒(ms)為單位保留日志文件的時長。
log.retention.minutes——以分鐘(min)為單位保留日志文件的時長。
log.retention.hours——以小時(h)為單位保留日志文件。
提出這些設(shè)置的前提是基于大容量主題的假設(shè),這里大容量是指在一個給定的時間段內(nèi)保證能夠達到文件最大值。另一個配置項log.retention.bytes,可以指定較長的切分時間閾值,以控制I/O操作。最后,為了防止日志切分閾值設(shè)置得相對較大而出現(xiàn)日志量顯著增加的情況,請使用配置項log.segment.bytes來控制單個日志段的大小。
對于鍵為空的記錄以及獨立的記錄[12],刪除日志的效果很好。但是,如果消息有鍵并需要預(yù)期的更新操作,那么還有一種方法更適合。
3.17 日志壓縮
假設(shè)日志中已存儲的消息都有鍵,并且還在不停地接收更新的消息,這意味著具有相同鍵的新記錄將會更新先前的值。例如,股票代碼可以作為消息的鍵,每股的價格作為定期更新的值。想象一下,使用這些信息來展示股票的價值,并出現(xiàn)程序崩潰或者重啟,這就需要能夠讓每個鍵恢復(fù)到最新數(shù)據(jù)[13]。
如果使用刪除策略,那么從最后一次更新到應(yīng)用程序崩潰或重啟之間的日志段就可能被去除,啟動時就得不到所有的記錄[14]。一種較好的方式是保留給定鍵的最近已知值,用與更新數(shù)據(jù)庫表鍵一樣的方式對待下一條記錄[15]。
按鍵更新記錄是實現(xiàn)壓縮主題(日志)的表現(xiàn)形式。與基于時間和日志大小直接刪除整個日志段的粗粒度方式不同,壓縮是一種更加細粒度的方式,該方式是刪除日志中每個鍵的舊數(shù)據(jù)。從一個很高的層面上來說,一個日志清理器(一個線程池)運行在后臺,如果后面的日志中出現(xiàn)了相同的鍵,則日志清理器就會重新復(fù)制日志段文件并將該鍵對應(yīng)的舊記錄去除。圖2-13闡明了日志壓縮是如何為每個鍵保留最新消息的。
這種方式保證了給定鍵的最后一條記錄在日志中。可以為每個主題指定日志保留策略,因此完全有可能某些主題使用基于時間的保留,而其他主題使用壓縮。
默認情況下,日志清理功能是開啟的。如果要對主題使用壓縮,那么需要在創(chuàng)建主題時設(shè)置屬性log.cleanup.policy=compact。
在Kafka Streams中使用應(yīng)用狀態(tài)存儲時就要用到壓縮,不過并不需要我們自己來創(chuàng)建相應(yīng)的日志或主題——框架會處理。然而,理解壓縮的原理是很重要的,日志壓縮是一個寬泛的話題,我們僅談?wù)撝链恕H绻肓私鈮嚎s方面的更多信息,參見Kafka官方文檔。
{注意}
當使用cleanup.policy為壓縮時,你可能好奇如何從日志中去除一條記錄。對于一個壓縮的主題,刪除操作會為給定鍵設(shè)置一個null值,作為一個墓碑標記。任何值為null的鍵都確保先前與其鍵相同的記錄被去除,之后墓碑標記自身也會被去除。
圖2-13 左邊是壓縮前的日志,可以看到具有不同值的重復(fù)鍵,這些值是用來更新給定鍵的。右邊是壓縮后的日志,保留了每個鍵的最新值,但日志變小了
本節(jié)的關(guān)鍵內(nèi)容是:如果事件或消息是獨立、單獨的,那么就使用日志刪除,如果要對事件或消息進行更新,那就使用日志壓縮。
我們已經(jīng)花了很多時間介紹Kafka內(nèi)部是如何處理數(shù)據(jù)的,現(xiàn)在,讓我們轉(zhuǎn)移到Kafka外部,探討如何通過生產(chǎn)者向Kafka發(fā)送消息,以及消費者如何從Kafka讀取消息。
4 生產(chǎn)者發(fā)送消息
回到ZMart對集中銷售交易數(shù)據(jù)中心的需求,看看如何將購買交易數(shù)據(jù)發(fā)送到Kafka。在Kafka中,生產(chǎn)者是用于發(fā)送消息的客戶端。圖2-14重述ZMart的數(shù)據(jù)結(jié)構(gòu),突出顯示生產(chǎn)者,以強調(diào)它們在數(shù)據(jù)流中適合的位置。
盡管ZMart有很多的銷售交易,但現(xiàn)在我們只考慮購買一個單一物品:一本10.99美元的書。當消費者完成銷售交易時,交易信息將被轉(zhuǎn)換為一個鍵/值對并通過生產(chǎn)者發(fā)送到Kafka。
鍵是客戶ID,即123447777,值是一個JSON格式的值,即"{\"item\":\"book\",\ "price\":10.99}"(這里已把雙引號轉(zhuǎn)義了,這樣JSON可以被表示為Java中的字符串)。有了這種格式的數(shù)據(jù),就可以使用生產(chǎn)者將數(shù)據(jù)發(fā)送到Kafka集群。代碼清單2-3所示的示例代碼可以在源代碼/src/main/java/bbejeck.chapter_2/producer/SimpleProducer.java類中找到。
圖2-14 生產(chǎn)者用于向Kafka發(fā)送消息,它們并不知道哪個消費者會讀取消息,也不知道消費者在什么時候會讀取消息
代碼清單2-3 SimpleProducer示例
Kafka生產(chǎn)者是線程安全的。所有消息被異步發(fā)送到Kafka——一旦生產(chǎn)者將記錄放到內(nèi)部緩沖區(qū),就立即返回Producer.send。緩沖區(qū)批量發(fā)送記錄,具體取決于配置,如果在生產(chǎn)者緩沖區(qū)滿時嘗試發(fā)送消息,則可能會有阻塞。
這里描述的Producer.send方法接受一個Callback實例,一旦領(lǐng)導(dǎo)者代理確認收到記錄,生產(chǎn)者就會觸發(fā)Callback.onComplete方法,Callback.onComplete方法中僅有一個參數(shù)為非空。在本例中,只關(guān)心在發(fā)生錯誤時打印輸出異常堆棧信息,因此檢驗異常對象是否為空。一旦服務(wù)器確認收到記錄,返回的Future就會產(chǎn)生一個RecordMetadata對象。
{定義}
在代碼清單2-3中,Producer.send方法返回一個Future對象,一個Future對象代表一個異步操作的結(jié)果。更重要的是,F(xiàn)uture可以選擇惰性地檢索異步結(jié)果,而不是等它們完成。更多信息請參考Java文檔“Interface Future?”(接口Future?)。
4.1 生產(chǎn)者屬性
當創(chuàng)建KafkaProducer實例時,傳遞了一個包含生產(chǎn)者配置信息的java.util. Properties參數(shù)。KafkaProducer的配置并不復(fù)雜,但在設(shè)置時需要考慮一些關(guān)鍵屬性,例如,可以在配置中指定自定義的分區(qū)器。這里要介紹的屬性太多了,因此我們只看一下代碼清單2-3中使用的屬性。
服務(wù)器引導(dǎo)——bootstrap.servers是一個用逗號分隔的host:port值列表。最終,生產(chǎn)者將使用集群中的所有代理。此外,此列表用于初始連接到Kafka集群。
序列化器——key.serializer和value.serializer通知Kafka如何將鍵和值轉(zhuǎn)化為字節(jié)數(shù)組。在內(nèi)部,Kafka使用鍵和值的字節(jié)數(shù)組,因此在將消息通過網(wǎng)絡(luò)發(fā)送之前需要向Kafka提供正確的序列化器,以將對象轉(zhuǎn)換為字節(jié)數(shù)組。
確認應(yīng)答——acks指定生產(chǎn)者認為在一條記錄發(fā)送完成之前需要等待的從代理返回的最小確認數(shù)。acks的有效值為all、0和1。當值為all時,生產(chǎn)者需要等待一個代理接收到所有追隨者代理都已提交記錄的確認。當值為1時,代理將記錄寫入其日志,但不用等待所有的追隨者代理來確認提交了記錄。當值為0時,意味著生產(chǎn)者不用等待任何確認——這基本上是“即發(fā)即棄”。
重試——如果發(fā)送一批消息失敗,retries指定失敗后嘗試重發(fā)的次數(shù)。如果記錄的順序很重要,那么應(yīng)該考慮設(shè)置max.in.flight.requests.connection為1,以防止失敗的記錄在重試發(fā)送之前第二批記錄成功發(fā)送的情景。
壓縮類型——如果使用數(shù)據(jù)壓縮的話,compression.type配置項用來指定要采用的壓縮算法。如果設(shè)置了compression.type,compression.type會通知生產(chǎn)者在發(fā)送數(shù)據(jù)前對本批次的數(shù)據(jù)進行壓縮。注意,是對整個批次進行壓縮,而不是單條記錄。
分區(qū)器類——partitioner.class指定實現(xiàn)Partitioner接口的類的名稱。partitioner.class與我們在2.3.7節(jié)中介紹的自定義分區(qū)器有關(guān)。
更多生產(chǎn)者相關(guān)的配置信息請參見Kafka官方文檔。
4.2 指定分區(qū)和時間戳
當創(chuàng)建一個ProducerRecord對象時,可以選擇指定分區(qū)、時間戳或者兩者都指定。在代碼清單2-3中實例化ProducerRecord時,使用了4個重載構(gòu)造方法中的一個。其他構(gòu)造方法允許設(shè)置分區(qū)和時間戳,或者只設(shè)置分區(qū),代碼如下:
4.3 指定分區(qū)
在2.3.5節(jié)中,我們討論了Kafka分區(qū)的重要性。我們也討論了DefaultPartitioner的工作原理以及如何提供一個自定義分區(qū)器。為什么要顯式設(shè)置分區(qū)?可能有多種業(yè)務(wù)上的原因,下面是其中一個例子。
假設(shè)傳入的記錄都有鍵,但是記錄被分發(fā)到哪個分區(qū)并不重要,因為消費者有邏輯來處理該鍵包含的任何數(shù)據(jù)。此外,鍵的分布可能不均勻,但你希望確保所有的分區(qū)接收到的數(shù)據(jù)量大致相同,代碼清單2-4給出的是一個粗略的實現(xiàn)方案。
代碼清單2-4 手動設(shè)置分區(qū)
上面的代碼調(diào)用Math.abs,因此對于Math.abs求得的整型值,如果該值超出Integer. MAX_VALUE,也不必關(guān)注。
{定義}
AtomicInteger屬于java.util.concurrent.atomic包,該包包含支持對單個變量進行無鎖、線程安全的操作的類。若需要更多信息,請參考Java官方文檔關(guān)于java.util.concurrent.atomic包的介紹。
4.4 Kafka中的時間戳
Kafka從0.10版本開始在記錄中增加了時間戳,在創(chuàng)建ProducerRecord對象時調(diào)用以下重載的構(gòu)造函數(shù)設(shè)置了時間戳。
如果沒有設(shè)置時間戳,那么生產(chǎn)者在將記錄發(fā)送到Kafka代理之前將會使用系統(tǒng)當前的時鐘時間。時間戳也受代理級別的配置項log.message.timestamp.type的影響,該配置項可以被設(shè)置為CreateTime(默認類型)和LogAppendTime中的一種。與許多其他代理級別的配置一樣,代理級別的配置將作為所有主題的默認值,但是在創(chuàng)建主題時可以為每個主題指定不同的值[16]。如果時間戳類型設(shè)置為LogAppendTime,并且在創(chuàng)建主題時沒有覆蓋代理級別對時間戳類型的配置,那么當將記錄追加到日志時,代理將使用當前的時間覆蓋時間戳,否則,使用來自ProducerRecord的時間戳。
兩種時間戳類型該如何選擇呢?LogAppendTime被認為是“處理時間”,而CreateTime被認為是“事件時間”,選擇哪一種類型取決于具體的業(yè)務(wù)需求。這就要確定你是否需要知道Kafka什么時候處理記錄,或者真實的事件發(fā)生在什么時候。在后面的章節(jié),將會看到時間戳對于控制Kafka Streams中的數(shù)據(jù)流所起的重要作用。(未完)
本文轉(zhuǎn)載自異步社區(qū)
Kafka 存儲 大數(shù)據(jù)
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔相應(yīng)法律責任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔相應(yīng)法律責任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。