[Kafka Notes] Consuming Kafka Messages with Multiple Threads

      網(wǎng)友投稿 929 2025-03-31

The Kafka Java consumer appears single-threaded to the user, but its entry class, KafkaConsumer, actually uses a dual-threaded design: a user main thread and a heartbeat thread.



The user main thread is the thread that runs the consumer application's main method. The heartbeat thread (Heartbeat Thread) only sends periodic heartbeat requests to the corresponding broker, signaling that the consumer application is still alive.
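As a rough illustration of how the two threads relate through configuration, the sketch below assembles the liveness-related consumer settings and checks the usual rule of thumb that heartbeat.interval.ms should be no more than one third of session.timeout.ms. The property names are real Kafka consumer configs; the class name and the specific values are illustrative assumptions only.

```java
import java.util.Properties;

public class ConsumerLivenessConfig {

    // Hypothetical helper: builds only the liveness-related settings.
    static Properties livenessProps() {
        Properties props = new Properties();
        props.put("session.timeout.ms", "30000");    // broker evicts the consumer if no heartbeat arrives within this window
        props.put("heartbeat.interval.ms", "10000"); // how often the heartbeat thread pings the group coordinator
        props.put("max.poll.interval.ms", "300000"); // max gap between poll() calls before a rebalance is triggered
        return props;
    }

    public static void main(String[] args) {
        Properties p = livenessProps();
        int session = Integer.parseInt(p.getProperty("session.timeout.ms"));
        int heartbeat = Integer.parseInt(p.getProperty("heartbeat.interval.ms"));
        // Common guidance: heartbeat.interval.ms <= session.timeout.ms / 3
        System.out.println(heartbeat * 3 <= session); // prints true
    }
}
```

Note that the heartbeat thread covers liveness, but a worker stuck between poll() calls is caught separately by max.poll.interval.ms.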

      官網(wǎng)文檔對于consumer多線程的處理方式 :

      每個(gè)線程一個(gè)消費(fèi)者

      每個(gè)線程自己的消費(fèi)者實(shí)例。這里是這種方法的優(yōu)點(diǎn)和缺點(diǎn):

PRO: It is the easiest to implement.

      PRO: 因?yàn)樗恍枰诰€程之間協(xié)調(diào),所以通常它是最快的。

      CON: 更多的消費(fèi)者意味著更多的TCP連接到集群(每個(gè)線程一個(gè))。一般kafka處理連接非常的快,所以這是一個(gè)小成本。

      CON: 更多的消費(fèi)者意味著更多的請求被發(fā)送到服務(wù)器,但稍微較少的數(shù)據(jù)批次可能導(dǎo)致I/O吞吐量的一些下降。

CON: The total number of threads across all processes is limited by the total number of partitions.

Decouple consumption and processing

      另一個(gè)替代方式是一個(gè)或多個(gè)消費(fèi)者線程,它來消費(fèi)所有數(shù)據(jù),其消費(fèi)所有數(shù)據(jù)并將ConsumerRecords實(shí)例切換到由實(shí)際處理記錄處理的處理器線程池來消費(fèi)的阻塞隊(duì)列。這個(gè)選項(xiàng)同樣有利弊:

      可擴(kuò)展消費(fèi)者和處理進(jìn)程的數(shù)量。這樣單個(gè)消費(fèi)者的數(shù)據(jù)可分給多個(gè)處理器線程來執(zhí)行,避免對分區(qū)的任何限制。

CON: Guaranteeing order across multiple processors requires particular care, since the threads execute independently and a later message may be processed before an earlier one simply due to scheduling luck. If ordering does not matter, this is not a problem.

CON: Manually committing offsets becomes harder, because it requires coordinating all the threads to ensure that processing for a partition is actually complete.
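One way to picture that coordination, sketched here in plain Java with no Kafka dependency (the class, the int partition id, and the long offsets are all illustrative stand-ins): each worker records the highest offset it has finished per partition, and the polling thread periodically turns that into a commit map. Kafka expects the offset of the next record to read, hence the +1. Note this simplification can skip records still in flight; a real implementation must only advance past contiguously completed offsets.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OffsetTracker {

    // Highest processed offset per partition; merged with max so a slow
    // worker finishing an old record cannot move the mark backwards.
    private final ConcurrentHashMap<Integer, Long> processed = new ConcurrentHashMap<>();

    void markProcessed(int partition, long offset) {
        processed.merge(partition, offset, Math::max);
    }

    // The poll/commit thread would feed this map into commitSync();
    // the committed position is the offset of the NEXT record, hence +1.
    Map<Integer, Long> offsetsToCommit() {
        Map<Integer, Long> out = new HashMap<>();
        processed.forEach((p, o) -> out.put(p, o + 1));
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        OffsetTracker tracker = new OffsetTracker();
        ExecutorService pool = Executors.newFixedThreadPool(3);
        // Simulate workers finishing records of partition 0 out of order.
        for (long offset : new long[]{2, 0, 1}) {
            pool.submit(() -> tracker.markProcessed(0, offset));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(tracker.offsetsToCommit()); // prints {0=3}
    }
}
```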

These are two different approaches; each is demonstrated below.

      解釋: 消費(fèi)者程序啟動多個(gè)線程,每個(gè)線程維護(hù)專屬的KafkaConsumer,負(fù)責(zé)完整的消息獲取、消息處理流程。 (其實(shí)就是一個(gè)消費(fèi)者客戶端開啟多個(gè)線程,每個(gè)線程都有各自的Consumer對同一個(gè)topic或者多個(gè)topic進(jìn)行消費(fèi),這些消費(fèi)者(線程)組成了一個(gè)消費(fèi)者組)

      借用網(wǎng)上的圖:

      topic數(shù)據(jù)實(shí)例:

Code:

```java
public class KafkaConsumerThread implements Runnable {

    private KafkaConsumer<String, String> consumer;
    private AtomicBoolean closed = new AtomicBoolean(false);

    public KafkaConsumerThread() {
    }

    // Constructor: each thread creates its own consumer
    public KafkaConsumerThread(Properties props) {
        this.consumer = new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        try {
            // All threads subscribe to the same topic
            consumer.subscribe(Collections.singletonList("six-topic"));
            String threadName = Thread.currentThread().getName();
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(3000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",
                            threadName, record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } catch (WakeupException e) {
            // Expected when shutdown() interrupts a blocking poll(); only a real
            // problem if we were not asked to close
            if (!closed.get()) {
                throw e;
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Stop consuming
     */
    public void shutdown() {
        closed.set(true);
        // wakeup() can safely interrupt a blocking operation from another thread
        consumer.wakeup();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "XXXXXXX:9093");
        props.put("group.id", "thread-1"); // consumers sharing a group.id form one consumer group
        props.put("enable.auto.commit", "true"); // auto-commit offsets
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("session.timeout.ms", "30000");
        props.put("max.poll.records", 6);
        // Run three threads consuming the same topic; the topic needs at least 3
        // partitions, otherwise some consumers will receive no data
        for (int i = 0; i < 3; i++) {
            new Thread(new KafkaConsumerThread(props), "Thread" + i).start();
        }
    }
}
```

Log output:

```
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 0, key= ImKey-0-one,value= ImValue-0-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 0, key= ImKey-1-one,value= ImValue-1-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 1, key= ImKey-5-one,value= ImValue-5-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 2, key= ImKey-8-one,value= ImValue-8-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 3, key= ImKey-10-one,value= ImValue-10-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 4, key= ImKey-13-one,value= ImValue-13-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 5, key= ImKey-14-one,value= ImValue-14-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 0, key= ImKey-4-one,value= ImValue-4-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 1, key= ImKey-6-one,value= ImValue-6-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 2, key= ImKey-7-one,value= ImValue-7-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 3, key= ImKey-11-one,value= ImValue-11-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 4, key= ImKey-15-one,value= ImValue-15-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 5, key= ImKey-21-one,value= ImValue-21-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 6, key= ImKey-25-one,value= ImValue-25-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 7, key= ImKey-27-one,value= ImValue-27-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 8, key= ImKey-29-one,value= ImValue-29-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 1, key= ImKey-2-one,value= ImValue-2-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 2, key= ImKey-3-one,value= ImValue-3-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 3, key= ImKey-9-one,value= ImValue-9-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 4, key= ImKey-12-one,value= ImValue-12-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 5, key= ImKey-16-one,value= ImValue-16-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 6, key= ImKey-17-one,value= ImValue-17-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 7, key= ImKey-24-one,value= ImValue-24-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 8, key= ImKey-32-one,value= ImValue-32-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 6, key= ImKey-18-one,value= ImValue-18-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 7, key= ImKey-19-one,value= ImValue-19-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 8, key= ImKey-20-one,value= ImValue-20-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 9, key= ImKey-22-one,value= ImValue-22-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 10, key= ImKey-23-one,value= ImValue-23-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 11, key= ImKey-26-one,value= ImValue-26-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 12, key= ImKey-28-one,value= ImValue-28-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 13, key= ImKey-30-one,value= ImValue-30-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 14, key= ImKey-31-one,value= ImValue-31-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 15, key= ImKey-33-one,value= ImValue-33-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 16, key= ImKey-34-one,value= ImValue-34-one
```

As the log shows: three threads, one consumer group, and each thread's consumer is assigned one partition of the topic to consume.

      解釋: 消費(fèi)者程序使用單或多線程獲取消息,同時(shí)創(chuàng)建多個(gè)消費(fèi)線程執(zhí)行消息處理邏輯。獲取消息的線程可以是一個(gè)或多個(gè),每個(gè)維護(hù)專屬KafkaConsumer實(shí)例,處理消息交由特定線程池來做,從而實(shí)現(xiàn)消息獲取與消息處理的真正解耦。

The multi-threaded processing logic can be implemented in several ways; a few are listed here:

      使用隊(duì)列存儲消息,多線程處理隊(duì)列:

      使用獨(dú)立鎖(FIFO)隊(duì)列LinkedBlockingQueue

      該隊(duì)列是線程安全的先進(jìn)先出隊(duì)列

```java
public class KafkaConsumerThread2 implements Runnable {

    // FIFO queue holding fetched batches
    private LinkedBlockingQueue<ConsumerRecords<String, String>> list;
    private AtomicBoolean closed = new AtomicBoolean(false);

    public KafkaConsumerThread2() {
    }

    public KafkaConsumerThread2(LinkedBlockingQueue<ConsumerRecords<String, String>> list) {
        this.list = list;
    }

    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();
        // Process batches taken from the queue
        while (!closed.get()) {
            try {
                ConsumerRecords<String, String> records = list.take();
                System.out.println("Batch size: " + records.count());
                if (records.isEmpty()) {
                    System.out.printf("Empty batch, nothing to process, Thread-name= %s\n", threadName);
                } else {
                    for (ConsumerRecord<String, String> record : records) {
                        Thread.sleep(3000); // simulate slow processing
                        System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",
                                threadName, record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<ConsumerRecords<String, String>> list = new LinkedBlockingQueue<>();
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.33.68.68:9093");
        props.put("group.id", "thread-5"); // consumers sharing a group.id form one consumer group
        props.put("enable.auto.commit", "true"); // auto-commit offsets
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("session.timeout.ms", "30000");
        props.put("max.poll.records", 5);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("six-topic"));
        // Start three threads to process batches from the queue
        for (int i = 0; i < 3; i++) {
            new Thread(new KafkaConsumerThread2(list), "thread-" + i).start();
        }
        // The main thread does the fetching and feeds the queue
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            try {
                list.put(records);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
```

      創(chuàng)建線程池,使用線程池處理消息邏輯

Processing logic class ConsumerDealThread:

```java
public class ConsumerDealThread implements Runnable {

    private ConsumerRecord<String, String> record;

    public ConsumerDealThread(ConsumerRecord<String, String> record) {
        this.record = record;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(2000); // simulate slow processing
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",
                Thread.currentThread().getName(), record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}
```

      運(yùn)行類KafkaConsumerThread3:

```java
public class KafkaConsumerThread3 {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.33.68.68:9093");
        props.put("group.id", "thread-18"); // consumers sharing a group.id form one consumer group
        props.put("enable.auto.commit", "true"); // auto-commit offsets
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("session.timeout.ms", "30000");
        props.put("max.poll.records", 5);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("six-topic"));
        ExecutorService executor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            try {
                for (ConsumerRecord<String, String> record : records) {
                    executor.submit(new ConsumerDealThread(record));
                }
            } catch (Exception e) {
                e.printStackTrace();
                consumer.wakeup();
                executor.shutdown();
                try {
                    if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        System.out.println("Timed out; thread pool did not shut down cleanly");
                    }
                } catch (InterruptedException e2) {
                    e2.printStackTrace();
                }
            }
            BlockingQueue<Runnable> queue = ((ThreadPoolExecutor) executor).getQueue();
            System.out.println("Queued tasks: " + queue.size());
        }
    }
}
```
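The CallerRunsPolicy chosen above is worth a closer look: when the pool's queue is full, submitting a task runs it synchronously on the calling thread, which naturally throttles the poll loop instead of dropping records or throwing. The sketch below demonstrates this behavior in isolation (the class name and the tiny pool/queue sizes are illustrative; no Kafka dependency):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CallerRunsDemo {

    static boolean demo() throws InterruptedException {
        // 1 worker, 1 queue slot: the third task must overflow.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker (and the queue slot) occupied
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String submitterName = Thread.currentThread().getName();
        AtomicBoolean ranOnCaller = new AtomicBoolean(false);

        pool.execute(blocker); // taken directly by the single worker thread
        pool.execute(blocker); // parked in the one-slot queue, which is now full
        // Rejected by the full queue, so CallerRunsPolicy runs it right here:
        pool.execute(() -> ranOnCaller.set(Thread.currentThread().getName().equals(submitterName)));

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return ranOnCaller.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints true
    }
}
```

In the consumer above, this means a saturated processing pool slows down the fetching thread itself, a simple form of backpressure; the trade-off is that a long task can then delay the next poll(), so max.poll.interval.ms must be generous enough.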

      EI企業(yè)智能 Kafka 可信智能計(jì)算服務(wù) TICS 多線程 智能數(shù)據(jù)

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。
