學(xué)習(xí)筆記20170601">【PMP】學(xué)習(xí)筆記20170601
929
2025-03-31
Kafka Java Consumer采用的是單線程的設(shè)計(jì)。其入口類KafkaConsumer是一個(gè)雙線程的設(shè)計(jì),即用戶主線程和心跳線程。
用戶主線程,指的是啟動Consumer應(yīng)用程序main方法的線程,心跳線程(Heartbeat Thread)只負(fù)責(zé)定期給對應(yīng)的Broker機(jī)器發(fā)送心跳請求,以表示消費(fèi)者應(yīng)用的存活性。
官網(wǎng)文檔對于consumer多線程的處理方式 :
每個(gè)線程一個(gè)消費(fèi)者
每個(gè)線程自己的消費(fèi)者實(shí)例。這里是這種方法的優(yōu)點(diǎn)和缺點(diǎn):
PRO: 這是最容易實(shí)現(xiàn)的
PRO: 因?yàn)樗恍枰诰€程之間協(xié)調(diào),所以通常它是最快的。
CON: 更多的消費(fèi)者意味著更多的TCP連接到集群(每個(gè)線程一個(gè))。一般kafka處理連接非常的快,所以這是一個(gè)小成本。
CON: 更多的消費(fèi)者意味著更多的請求被發(fā)送到服務(wù)器,但稍微較少的數(shù)據(jù)批次可能導(dǎo)致I/O吞吐量的一些下降。
CON: 所有進(jìn)程中的線程總數(shù)受到分區(qū)總數(shù)的限制。
解耦消費(fèi)和處理
另一個(gè)替代方式是一個(gè)或多個(gè)消費(fèi)者線程,它來消費(fèi)所有數(shù)據(jù),其消費(fèi)所有數(shù)據(jù)并將ConsumerRecords實(shí)例切換到由實(shí)際處理記錄處理的處理器線程池來消費(fèi)的阻塞隊(duì)列。這個(gè)選項(xiàng)同樣有利弊:
可擴(kuò)展消費(fèi)者和處理進(jìn)程的數(shù)量。這樣單個(gè)消費(fèi)者的數(shù)據(jù)可分給多個(gè)處理器線程來執(zhí)行,避免對分區(qū)的任何限制。
CON: 跨多個(gè)處理器的順序保證需要特別注意,因?yàn)榫€程是獨(dú)立的執(zhí)行,后來的消息可能比遭到的消息先處理,這僅僅是因?yàn)榫€程執(zhí)行的運(yùn)氣。如果對排序沒有問題,這就不是個(gè)問題。
CON: 手動提交變得更困難,因?yàn)樗枰獏f(xié)調(diào)所有的線程以確保處理對該分區(qū)的處理完成。
這是兩種不同的處理方式。
解釋: 消費(fèi)者程序啟動多個(gè)線程,每個(gè)線程維護(hù)專屬的KafkaConsumer,負(fù)責(zé)完整的消息獲取、消息處理流程。 (其實(shí)就是一個(gè)消費(fèi)者客戶端開啟多個(gè)線程,每個(gè)線程都有各自的Consumer對同一個(gè)topic或者多個(gè)topic進(jìn)行消費(fèi),這些消費(fèi)者(線程)組成了一個(gè)消費(fèi)者組)
借用網(wǎng)上的圖:
topic數(shù)據(jù)實(shí)例:
代碼:
public class KafkaConsumerThread implements Runnable{ private KafkaConsumer
日志:
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 0, key= ImKey-0-one,value= ImValue-0-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 0, key= ImKey-1-one,value= ImValue-1-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 1, key= ImKey-5-one,value= ImValue-5-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 2, key= ImKey-8-one,value= ImValue-8-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 3, key= ImKey-10-one,value= ImValue-10-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 4, key= ImKey-13-one,value= ImValue-13-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 5, key= ImKey-14-one,value= ImValue-14-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 0, key= ImKey-4-one,value= ImValue-4-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 1, key= ImKey-6-one,value= ImValue-6-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 2, key= ImKey-7-one,value= ImValue-7-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 3, key= ImKey-11-one,value= ImValue-11-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 4, key= ImKey-15-one,value= ImValue-15-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 5, key= ImKey-21-one,value= ImValue-21-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 6, key= ImKey-25-one,value= ImValue-25-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 7, key= ImKey-27-one,value= ImValue-27-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 8, key= ImKey-29-one,value= ImValue-29-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 1, key= ImKey-2-one,value= ImValue-2-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 2, key= ImKey-3-one,value= ImValue-3-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 3, key= ImKey-9-one,value= ImValue-9-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 4, key= ImKey-12-one,value= ImValue-12-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 5, key= ImKey-16-one,value= ImValue-16-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 6, key= ImKey-17-one,value= ImValue-17-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 7, key= ImKey-24-one,value= ImValue-24-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 8, key= ImKey-32-one,value= ImValue-32-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 6, key= ImKey-18-one,value= ImValue-18-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 7, key= ImKey-19-one,value= ImValue-19-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 8, key= ImKey-20-one,value= ImValue-20-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 9, key= ImKey-22-one,value= ImValue-22-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 10, key= ImKey-23-one,value= ImValue-23-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 11, key= ImKey-26-one,value= ImValue-26-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 12, key= ImKey-28-one,value= ImValue-28-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 13, key= ImKey-30-one,value= ImValue-30-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 14, key= ImKey-31-one,value= ImValue-31-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 15, key= ImKey-33-one,value= ImValue-33-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 16, key= ImKey-34-one,value= ImValue-34-one
可以看到三個(gè)線程,一個(gè)消費(fèi)者組,每個(gè)線程消費(fèi)者得到一個(gè)topic的分區(qū)去消費(fèi)消息。
解釋: 消費(fèi)者程序使用單或多線程獲取消息,同時(shí)創(chuàng)建多個(gè)消費(fèi)線程執(zhí)行消息處理邏輯。獲取消息的線程可以是一個(gè)或多個(gè),每個(gè)維護(hù)專屬KafkaConsumer實(shí)例,處理消息交由特定線程池來做,從而實(shí)現(xiàn)消息獲取與消息處理的真正解耦。
這里的多線程處理消息邏輯可以有多種方法,這里就列出來幾種:
使用隊(duì)列存儲消息,多線程處理隊(duì)列:
使用獨(dú)立鎖(FIFO)隊(duì)列LinkedBlockingQueue
該隊(duì)列是線程安全的先進(jìn)先出隊(duì)列
public class KafkaConsumerThread2 implements Runnable { // 存儲消息 先進(jìn)先出隊(duì)列 private LinkedBlockingQueue
創(chuàng)建線程池,使用線程池處理消息邏輯
邏輯處理類ConsumerDealThread:
public class ConsumerDealThread implements Runnable{ private ConsumerRecord record; public ConsumerDealThread(ConsumerRecord record) { this.record = record; } public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",Thread.currentThread().getName(),record.topic(),record.partition(),record.offset(),record.key(),record.value()); } }
運(yùn)行類KafkaConsumerThread3:
public class KafkaConsumerThread3 { public static void main(String[] args) { LinkedBlockingQueue
EI企業(yè)智能 Kafka 可信智能計(jì)算服務(wù) TICS 多線程 智能數(shù)據(jù)
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。