【Kafka筆記】Kafka API詳細解析 Java版本(Producer API,Consumer API,攔截器等)

      網(wǎng)友投稿 872 2025-03-31

      Kafka的API有Producer API,Consumer API還有自定義Interceptor (自定義-),以及處理的流使用的Streams API和構(gòu)建連接器的Kafka Connect API。


      Kafka的Producer發(fā)送消息采用的是異步發(fā)送的方式。在消息發(fā)送過程中,涉及兩個線程:main線程和Sender線程,以及一個線程共享變量RecordAccumulator。main線程將消息發(fā)送給RecordAccmulator,Sender線程不斷地從RecordAccumulator中拉取消息發(fā)送給Kafka broker。

      這里的ACk機制,不是生產(chǎn)者得到ACK返回信息才開始發(fā)送,ACK保證的是生產(chǎn)者不丟失數(shù)據(jù)。例如:

      而是只要有消息數(shù)據(jù),就向broker發(fā)送。

      生產(chǎn)者使用send方法,經(jīng)過-之后在經(jīng)過序列化器,然后在走分區(qū)器。然后通過分批次把數(shù)據(jù)發(fā)送到PecordAccumulator,main線程到此過程就結(jié)束了,然后在回去執(zhí)行send。

      Sender線程不斷的獲取RecordAccumulator的數(shù)據(jù)發(fā)送到topic。

      消息發(fā)送流程是異步發(fā)送的,并且順序是一定的--》序列化器-》分區(qū)器

      需要用到的類:

      KafkaProducer: 需要創(chuàng)建一個生產(chǎn)者對象,用來發(fā)送數(shù)據(jù) ProducerConfig:獲取所需要的一系列配置參數(shù) ProducerRecord:每條數(shù)據(jù)都要封裝成一個ProducerRecord對象

      實例:

      public class KafkaProducerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXXXXXX:9093");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重試次數(shù) props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待時間 props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區(qū)大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 創(chuàng)建KafkaProducer客戶端 KafkaProducer producer = new KafkaProducer<>(props); for (int i = 0; i < 10 ; i++) { producer.send(new ProducerRecord<>("my-topic","ImKey-"+i,"ImValue-"+i)); } // 關(guān)閉資源 producer.close(); } }

      配置參數(shù)說明:

      send():方法是異步的,添加消息到緩沖區(qū)等待發(fā)送,并立即返回。生產(chǎn)者將單個的消息批量在一起發(fā)送來提高效率。

      ack:是判斷請求是否完整的條件(就會判斷是不是成功發(fā)送了,也就是上次說的ACK機制),指定all將會阻塞消息,性能低但是最可靠。

      retries:如果請求失敗,生產(chǎn)者會自動重試,我們指定是1次,但是啟動重試就有可能出現(xiàn)重復(fù)數(shù)據(jù)。

      batch.size:指定緩存的大小,生產(chǎn)者緩存每個分區(qū)未發(fā)送的消息。值越大的話將會產(chǎn)生更大的批量,并需要更大的內(nèi)存(因為每個活躍的分區(qū)都有一個緩存區(qū))。

      linger.ms:指示生產(chǎn)者發(fā)送請求之前等待一段時間,設(shè)置等待時間是希望更多地消息填補到未滿的批中。默認緩沖可以立即發(fā)送,即便緩沖空間還沒有滿,但是如果想減少請求的數(shù)量可以設(shè)置linger.ms大于0。需要注意的是在高負載下,相近的時間一般也會組成批,即使等于0。

      buffer.memory:控制生產(chǎn)者可用的緩存總量,如果消息發(fā)送速度比其傳輸?shù)椒?wù)器的快,將會耗盡這個緩存空間。當緩存空間耗盡,其他發(fā)送調(diào)用將被阻塞,阻塞時間的閾值通過max.block.ms設(shè)定,之后將會拋出一個TimeoutException

      key.serializer和value.serializer將用戶提供的key和value對象ProducerRecord轉(zhuǎn)換成字節(jié),你可以使用附帶的ByteArraySerializaer或StringSerializer處理簡單的string或byte類型。

      運行日志:

      [Godway] INFO 2019-11-14 14:46 - org.apache.kafka.clients.producer.ProducerConfig[main] - ProducerConfig values: acks = all batch.size = 16384 bootstrap.servers = [XXXXXX:9093] buffer.memory = 33554432 client.id = compression.type = none connections.max.idle.ms = 540000 enable.idempotence = false interceptor.classes = null key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 1 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 1 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer [Godway] INFO 2019-11-14 14:46 - org.apache.kafka.common.utils.AppInfoParser[main] - Kafka version : 0.11.0.3 [Godway] INFO 2019-11-14 14:46 - org.apache.kafka.common.utils.AppInfoParser[main] - Kafka commitId : 26ddb9e3197be39a [Godway] WARN 2019-11-14 14:46 - org.apache.kafka.clients.NetworkClient[kafka-producer-network-thread | producer-1] - Error while fetching metadata with correlation id 1 : {my-topic=LEADER_NOT_AVAILABLE} [Godway] INFO 2019-11-14 14:46 - org.apache.kafka.clients.producer.KafkaProducer[main] - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. Process finished with exit code 0

      有一條警告{my-topic=LEADER_NOT_AVAILABLE} 提示該topic不存在,但是沒有關(guān)系kafka會自動給你創(chuàng)建一個topic,不過創(chuàng)建的topic是有一個分區(qū)和一個副本:

      查看一下該topic的消息:

      消息已經(jīng)在topic里了

      上面的實例是沒有回調(diào)函數(shù)的,send方法是有回調(diào)函數(shù)的:

      public class KafkaProducerCallbackDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXX:9093");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重試次數(shù) props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待時間 props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區(qū)大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 創(chuàng)建KafkaProducer客戶端 KafkaProducer producer = new KafkaProducer<>(props); for (int i = 10; i < 20 ; i++) { producer.send(new ProducerRecord("my-topic", "ImKey-" + i, "ImValue-" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null){ System.out.println("消息發(fā)送成功!"+recordMetadata.offset()); }else { System.err.println("消息發(fā)送失敗!"); } } }); } producer.close(); } }

      回調(diào)函數(shù)會在producer收到ack時調(diào)用,為異步調(diào)用,該方法有兩個參數(shù),分別是RecordMetadata和Exception,如果Exception為null,說明消息發(fā)送成功,如果Exception不為null說明消息發(fā)送失敗。

      注意: 消息發(fā)送失敗會自動重試,不需要我們在回調(diào)函數(shù)中手動重試,使用回調(diào)也是無阻塞的。而且callback一般在生產(chǎn)者的IO線程中執(zhí)行,所以是非常快的,否則將延遲其他的線程消息發(fā)送。如果需要執(zhí)行阻塞或者計算的回調(diào)(耗時比較長),建議在callbanck主體中使用自己的Executor來并行處理!

      同步發(fā)送的意思就是,一條消息發(fā)送之后,會阻塞當前的線程,直到返回ack(此ack和異步的ack機制不是一個ack)。

      此ack是Future阻塞main線程,當發(fā)送完成就返回一個ack去通知main線程已經(jīng)發(fā)送完畢,繼續(xù)往下走了

      public Future send(ProducerRecord record,Callback callback)

      send是異步的,并且一旦消息被保存在等待發(fā)送的消息緩存中,此方法就立即返回。這樣并行發(fā)送多條消息而不阻塞去等待每一條消息的響應(yīng)。

      發(fā)送的結(jié)果是一個RecordMetadata,它指定了消息發(fā)送的分區(qū),分配的offset和消息的時間戳。如果topic使用的是CreateTime,則使用用戶提供的時間戳或發(fā)送的時間(如果用戶沒有指定指定消息的時間戳)如果topic使用的是LogAppendTime,則追加消息時,時間戳是broker的本地時間。

      由于send調(diào)用是異步的,它將為分配消息的此消息的RecordMetadata返回一個Future。如果future調(diào)用get(),則將阻塞,直到相關(guān)請求完成并返回該消息的metadata,或拋出發(fā)送異常。

      InterruptException - 如果線程在阻塞中斷。

      SerializationException - 如果key或value不是給定有效配置的serializers。

      TimeoutException - 如果獲取元數(shù)據(jù)或消息分配內(nèi)存話費的時間超過max.block.ms。

      KafkaException - Kafka有關(guān)的錯誤(不屬于公共API的異常)。

      public class KafkaProducerDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXX:9093");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重試次數(shù) props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待時間 props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區(qū)大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 創(chuàng)建KafkaProducer客戶端 KafkaProducer producer = new KafkaProducer<>(props); for (int i = 20; i < 30 ; i++) { RecordMetadata metadata = producer.send(new ProducerRecord<>("my-topic", "ImKey-" + i, "ImValue-" + i)).get(); System.out.println(metadata.offset()); } producer.close(); } }

      生產(chǎn)者在向topic發(fā)送消息的時候的分區(qū)規(guī)則:

      public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable

      headers) public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) public ProducerRecord(String topic, Integer partition, K key, V value, Iterable
      headers) public ProducerRecord(String topic, Integer partition, K key, V value) public ProducerRecord(String topic, K key, V value) public ProducerRecord(String topic, V value)

      根據(jù)send方法的參數(shù)的構(gòu)造方法就可以看出來,

      指定分區(qū)就發(fā)送到指定分區(qū)

      沒有指定分區(qū),有key值,就按照key值的Hash值分配分區(qū)

      沒有指定分區(qū),也沒有指定key值,輪詢分區(qū)分配(只分配一次,以后都按照第一次的分區(qū)順序)

      自定義分區(qū)器需要實現(xiàn)org.apache.kafka.clients.producer.Partitioner接口。并且實現(xiàn)三個方法

      public class KafkaMyPartitions implements Partitioner { @Override public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { return 0; } @Override public void close() { } @Override public void configure(Map map) { } }

      自定義分區(qū)實例:

      KafkaMyPartitions:

      public class KafkaMyPartitions implements Partitioner { @Override public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { // 這里寫自己的分區(qū)策略 // 我這里指定為1 return 1; } @Override public void close() { } @Override public void configure(Map map) { } }

      KafkaProducerCallbackDemo:

      public class KafkaProducerCallbackDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXX:9093");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重試次數(shù) props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待時間 props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區(qū)大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定自定義分區(qū) props.put("partitioner.class","com.firehome.newkafka.KafkaMyPartitions"); // 創(chuàng)建KafkaProducer客戶端 KafkaProducer producer = new KafkaProducer<>(props); for (int i = 20; i < 25 ; i++) { producer.send(new ProducerRecord("th-topic", "ImKey-" + i, "ImValue-" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null){ System.out.printf("消息發(fā)送成功!topic=%s,partition=%s,offset=%d \n",recordMetadata.topic(),recordMetadata.partition(),recordMetadata.offset()); }else { System.err.println("消息發(fā)送失敗!"); } } }); } producer.close(); } }

      返回日志:

      消息發(fā)送成功!topic=th-topic,partition=1,offset=27 消息發(fā)送成功!topic=th-topic,partition=1,offset=28 消息發(fā)送成功!topic=th-topic,partition=1,offset=29 消息發(fā)送成功!topic=th-topic,partition=1,offset=30 消息發(fā)送成功!topic=th-topic,partition=1,offset=31

      可以看到直接發(fā)送到了分區(qū)1上了。

      Producer API是線程安全的,直接就可以使用多線程發(fā)送消息,實例:

      public class KafkaProducerThread implements Runnable { private KafkaProducer kafkaProducer; public KafkaProducerThread(){ } public KafkaProducerThread(KafkaProducer kafkaProducer){ this.kafkaProducer = kafkaProducer; } @Override public void run() { for (int i = 0; i < 20 ; i++) { String key = "ImKey-" + i+"-"+Thread.currentThread().getName(); String value = "ImValue-" + i+"-"+Thread.currentThread().getName(); kafkaProducer.send(new ProducerRecord<>("th-topic", key, value)); System.out.printf("Thread-name = %s, key = %s, value = %s",Thread.currentThread().getName(),key,value); } } public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXXXXX:9093");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重試次數(shù) props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待時間 props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區(qū)大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 創(chuàng)建KafkaProducer客戶端 KafkaProducer producer = new KafkaProducer<>(props); KafkaProducerThread producerThread1 = new KafkaProducerThread(producer); //KafkaProducerThread producerThread2 = new KafkaProducerThread(producer); Thread one = new Thread(producerThread1, "one"); Thread two = new Thread(producerThread1, "two"); System.out.println("線程開始"); one.start(); two.start(); } }

      這里只是一個簡單的實例。

      kafka客戶端通過TCP長連接從集群中消費消息,并透明地處理kafka集群中出現(xiàn)故障服務(wù)器,透明地調(diào)節(jié)適應(yīng)集群中變化的數(shù)據(jù)分區(qū)。也和服務(wù)器交互,平衡均衡消費者。

      kafka為分區(qū)中的每條消息保存一個偏移量(offset),這個偏移量是該分區(qū)中一條消息的唯一標示符。也表示消費者在分區(qū)的位置。例如,一個位置是5的消費者(說明已經(jīng)消費了0到4的消息),下一個接收消息的偏移量為5的消息。實際上有兩個與消費者相關(guān)的“位置”概念:

      消費者的位置給出了下一條記錄的偏移量。它比消費者在該分區(qū)中看到的最大偏移量要大一個。 它在每次消費者在調(diào)用poll(long)中接收消息時自動增長。

      “已提交”的位置是已安全保存的最后偏移量,如果進程失敗或重新啟動時,消費者將恢復(fù)到這個偏移量。消費者可以選擇定期自動提交偏移量,也可以選擇通過調(diào)用commit API來手動的控制(如:commitSync 和 commitAsync)。

      這個區(qū)別是消費者來控制一條消息什么時候才被認為是已被消費的,控制權(quán)在消費者。

      Kafka的消費者組概念,通過進程池瓜分消息并處理消息。這些進程可以在同一臺機器運行,也可分布到多臺機器上,以增加可擴展性和容錯性,相同group.id的消費者將視為同一個消費者組。

      分組中的每個消費者都通過subscribe API動態(tài)的訂閱一個topic列表。kafka將已訂閱topic的消息發(fā)送到每個消費者組中。并通過平衡分區(qū)在消費者分組中所有成員之間來達到平均。因此每個分區(qū)恰好地分配1個消費者(一個消費者組中)。所有如果一個topic有4個分區(qū),并且一個消費者分組有只有2個消費者。那么每個消費者將消費2個分區(qū)。

      消費者組的成員是動態(tài)維護的:如果一個消費者故障。分配給它的分區(qū)將重新分配給同一個分組中其他的消費者。同樣的,如果一個新的消費者加入到分組,將從現(xiàn)有消費者中移一個給它。這被稱為重新平衡分組。當新分區(qū)添加到訂閱的topic時,或者當創(chuàng)建與訂閱的正則表達式匹配的新topic時,也將重新平衡。將通過定時刷新自動發(fā)現(xiàn)新的分區(qū),并將其分配給分組的成員。

      從概念上講,你可以將消費者分組看作是由多個進程組成的單一邏輯訂閱者。作為一個多訂閱系統(tǒng),Kafka支持對于給定topic任何數(shù)量的消費者組,而不重復(fù)。

      這是在消息系統(tǒng)中常見的功能的略微概括。所有進程都將是單個消費者分組的一部分(類似傳統(tǒng)消息傳遞系統(tǒng)中的隊列的語義),因此消息傳遞就像隊列一樣,在組中平衡。與傳統(tǒng)的消息系統(tǒng)不同的是,雖然,你可以有多個這樣的組。但每個進程都有自己的消費者組(類似于傳統(tǒng)消息系統(tǒng)中pub-sub的語義),因此每個進程都會訂閱到該主題的所有消息。

      此外,當分組重新分配自動發(fā)生時,可以通過ConsumerRebalanceListener通知消費者,這允許他們完成必要的應(yīng)用程序級邏輯,例如狀態(tài)清除,手動偏移提交等

      它也允許消費者通過使用assign(Collection)手動分配指定分區(qū),如果使用手動指定分配分區(qū),那么動態(tài)分區(qū)分配和協(xié)調(diào)消費者組將失效。

      訂閱一組topic,當調(diào)用poll(long)時,消費者將自動加入到消費者組中。只要持續(xù)調(diào)用poll,消費者將一直保持可用,并繼續(xù)從分配的分區(qū)中接收數(shù)據(jù)。此外,消費者向服務(wù)器定時發(fā)送心跳。如果消費者崩潰或無法再session.timeout.ms配置的時間內(nèi)發(fā)送心跳,則消費者就被視為死亡,并且其分區(qū)將被重新分配。

      還有一種可能,消費者可能遇到活鎖的情況,它持續(xù)的發(fā)送心跳,但是沒有處理。為了預(yù)防消費者在這總情況下一直擁有分區(qū),我們使用max.poll.interval.ms活躍監(jiān)測機制。在此基礎(chǔ)上,如果你調(diào)用的poll的頻率大于最大間隔,則客戶端將主動地離開組,以便其他消費者接管該分區(qū)。發(fā)生這種情況時,你會看到offset提交失敗( 調(diào)用commitSync()引發(fā)的CommitFailedException )。這是一種安全機制,保障只有活動成員能夠提交offset。所以要留在組中,你必須持續(xù)調(diào)用poll。

      消費者提供兩種配置設(shè)置來控制poll循環(huán):

      max.poll.interval.ms: 增大poll的間隔,可以為消費者提供更多的時間去處理返回的消息(調(diào)用poll(long)返回的消息,通常返回的消息都是一批),缺點是此值越大將會延遲組重新平衡。

      max.poll.records:此設(shè)置限制每次調(diào)用poll返回的消息數(shù),這樣可以更容易的預(yù)測每次poll間隔要處理的最大值。通過調(diào)整此值,可以減少poll間隔,減少重新平衡分組的

      對于消息處理時間不可預(yù)測地情況,這些選項是不夠的。 處理這種情況的推薦方法是將消息處理移到另一個線程中,讓消費者繼續(xù)調(diào)用poll。 但是必須注意確保已提交的offset不超過實際位置。另外,你必須禁用自動提交,并只有在線程完成處理后才為記錄手動提交偏移量。 還要注意, 你需要pause暫停分區(qū),不會從poll接收到新消息,讓線程處理完之前返回的消息(如果你的處理能力比拉取消息的慢,那創(chuàng)建新線程將導(dǎo)致機器內(nèi)存溢出)。

      實例:

      public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers","xxxxxxxxxx:9093"); props.put("group.id","test-6");//消費者組,只要group.id相同,就屬于同一個消費者組 props.put("enable.auto.commit","true");//自動提交offset props.put("auto.commit.interval.ms","1000"); // 自動提交時間間隔 props.put("max.poll.records","5"); // 拉取的數(shù)據(jù)條數(shù) props.put("session.timeout.ms","10000"); // 維持session的時間。超過這個時間沒有心跳 就會剔出消費者組 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); KafkaConsumer consumer = new KafkaConsumer<>(props); // 可以寫多個topic consumer.subscribe(Arrays.asList("my-topic")); while (true){ ConsumerRecords records = consumer.poll(5000); for (ConsumerRecord record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } System.out.println("處理了一批數(shù)據(jù)!"); } }

      配置說明:

      bootstrap.servers: 集群是通過配置bootstrap.servers指定一個或多個broker。不用指定全部的broker,它將自動發(fā)現(xiàn)集群中的其余的borker(最好指定多個,萬一有服務(wù)器故障)

      enable.auto.commit: 自動提交偏移量,如果設(shè)置了自動提交偏移量,下面這個設(shè)置就必須要用到了。

      auto.commit.interval.ms:自動提交時間間隔,和自動提交偏移量配合使用

      max.poll.records:控制從 broker拉取的消息條數(shù)

      poll(long time): 當消費者獲取不到消息時,就會使用這個參數(shù),為了減輕無效的循環(huán)請求消息,消費者會每隔long time的時間請求一次消息,單位是毫秒。

      session.timeout.ms: broker通過心跳機器自動檢測消費者組中失敗的進程,消費者會自動ping集群,告訴進群它還活著。只要消費者能夠做到這一點,它就被認為是活著的,并保留分配給它分區(qū)的權(quán)利,如果它停止心跳的時間超過session.timeout.ms,那么就會認為是故障的,它的分區(qū)將被分配到別的進程。

      auto.offset.reset:這個屬性很重要,一會詳細講解

      這里說明一下auto.commit.interval.ms以及何時提交消費者偏移量,經(jīng)過測試:

      設(shè)置props.put("auto.commit.interval.ms","60000");

      自動提交時間為一分鐘,也就是你在這一分鐘內(nèi)拉取任何數(shù)量的消息都不會被提交消費的當前偏移量,如果你此時關(guān)閉消費者(一分鐘內(nèi)),下次消費還是從和第一次的消費數(shù)據(jù)一樣,即使你在一分鐘內(nèi)消費完所有的消息,只要你在一分鐘內(nèi)關(guān)閉程序,導(dǎo)致提交不了offset,就可以一直重復(fù)消費數(shù)據(jù)。

      設(shè)置props.put("auto.commit.interval.ms","3000");

      但是在消費過程中設(shè)置sleep。

      public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers","xxxxxxxxxxxx:9093"); props.put("group.id","test-6");//消費者組,只要group.id相同,就屬于同一個消費者組 props.put("enable.auto.commit","true");//自動提交offset props.put("auto.commit.interval.ms","100000"); // 自動提交時間間隔 props.put("max.poll.records","5"); // 拉取的數(shù)據(jù)條數(shù) props.put("session.timeout.ms","10000"); // 維持session的時間。超過這個時間沒有心跳 就會剔出消費者組 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); // KafkaConsumer consumer = new KafkaConsumer<>(props); // 可以寫多個topic consumer.subscribe(Arrays.asList("my-topic")); while (true){ ConsumerRecords records = consumer.poll(5000); for (ConsumerRecord record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } try { Thread.sleep(5000L); System.out.println("等待了5秒了!!!!!!!!!!!!開始等待15秒了"); Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("處理了一批數(shù)據(jù)!"); } }

      這里如果你消費了第一批數(shù)據(jù),在執(zhí)行第二次poll的時候,關(guān)閉程序也不會提交偏移量,只有在執(zhí)行第二次poll的時候才會把上一次的最后一個offset提交上去。

      auto.offset.reset講解:

      auto.offset.reset的值有三種:earliest,latest,none,代表者不同的意思

      earliest: 當各分區(qū)下有已經(jīng)提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費,最常用的值 latest: 當各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產(chǎn)生的該分區(qū)下的數(shù)據(jù) none: topic各分區(qū)都存在已提交的offset時,從offset后開始消費,只要有一個分區(qū)不存在已提交的offset,則拋出異常 !!注意:當使用了latest,并且分區(qū)沒有已提交的offset時,消費新產(chǎn)生的該分區(qū)下的數(shù)據(jù),其實是把offset的值直接設(shè)置到最后一個消息的位置。例如,有個30條數(shù)據(jù)的demo的topic,各分區(qū)無提交offset,使用了latest,再看offset就會發(fā)現(xiàn)已經(jīng)在30的位置了,所以才只能消費新產(chǎn)生的數(shù)據(jù)!!!!

      不需要定時提交偏移量,可以自己控制offset,當消息已經(jīng)被我們消費過后,再去手動提交他們的偏移量。這個很適合我們的一些處理邏輯。

      手動提交offset的方法有兩種:分別是commitSync(同步提交) 和commitAsync(異步提交)。兩者的相同點,都會將本次poll的一批數(shù)據(jù)最高的偏移量提交;不同點是commitSync會失敗重試,一直到提交成功(如果有不可恢復(fù)的原因?qū)е拢矔峤皇。湃ダ⌒聰?shù)據(jù)。而commitAsync則沒有重試機制(提交了就去拉取新數(shù)據(jù),不管這次的提交有沒有成功),故有可能提交失敗。

      實例:

      public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers","XXXXXC:9093"); props.put("group.id","test-11");//消費者組,只要group.id相同,就屬于同一個消費者組 props.put("enable.auto.commit","false");//自動提交offset props.put("auto.commit.interval.ms","1000"); // 自動提交時間間隔 props.put("max.poll.records","20"); // 拉取的數(shù)據(jù)條數(shù) props.put("session.timeout.ms","10000"); // 維持session的時間。超過這個時間沒有心跳 就會剔出消費者組 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); int i= 0; while (true){ ConsumerRecords records = consumer.poll(5000); for (ConsumerRecord record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); i++; } if (i == 20){ System.out.println("i_num:"+i); // 同步提交 consumer.commitSync(); // 異步提交 // consumer.commitAsync(); }else { System.out.println("不足二十個,不提交"+i); } i=0; } }

      這些都是全部提交偏移量,如果我們想更細致的控制偏移量提交,可以自定義提交偏移量:

      public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers","XXXXXXXXXX:9093"); props.put("group.id","test-18");//消費者組,只要group.id相同,就屬于同一個消費者組 props.put("enable.auto.commit","false");//自動提交offset props.put("auto.commit.interval.ms","1000000"); // 自動提交時間間隔 props.put("max.poll.records","5"); // 拉取的數(shù)據(jù)條數(shù) props.put("session.timeout.ms","10000"); // 維持session的時間。超過這個時間沒有心跳 就會剔出消費者組 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true){ ConsumerRecords records = consumer.poll(5000); for (TopicPartition partition : records.partitions()) { List> partitionRecords = records.records(partition); for (ConsumerRecord record : partitionRecords) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)), new OffsetCommitCallback() { @Override public void onComplete(Map map, Exception e) { for (Map.Entry entry : map.entrySet()){ System.out.println("提交的分區(qū):"+entry.getKey().partition()+",提交的偏移量:"+entry.getValue().offset()); } } }); } } }

      通過消費者Kafka會通過分區(qū)分配分給消費者一個分區(qū),但是我們也可以指定分區(qū)消費消息,要使用指定分區(qū),只需要調(diào)用assign(Collection)消費指定的分區(qū)即可:

      public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers","XXXXXXXXX:9093"); props.put("group.id","test-19");//消費者組,只要group.id相同,就屬于同一個消費者組 props.put("enable.auto.commit","false");//自動提交offset props.put("auto.commit.interval.ms","1000000"); // 自動提交時間間隔 props.put("max.poll.records","5"); // 拉取的數(shù)據(jù)條數(shù) props.put("session.timeout.ms","10000"); // 維持session的時間。超過這個時間沒有心跳 就會剔出消費者組 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); KafkaConsumer consumer = new KafkaConsumer<>(props); // 你可以指定多個不同topic的分區(qū)或者相同topic的分區(qū) 我這里只指定一個分區(qū) TopicPartition topicPartition = new TopicPartition("my-topic", 0); // 調(diào)用指定分區(qū)用assign,消費topic使用subscribe consumer.assign(Arrays.asList(topicPartition)); while (true){ ConsumerRecords records = consumer.poll(5000); for (TopicPartition partition : records.partitions()) { List> partitionRecords = records.records(partition); for (ConsumerRecord record : partitionRecords) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)), new OffsetCommitCallback() { @Override public void onComplete(Map map, Exception e) { for (Map.Entry entry : map.entrySet()){ System.out.println("提交的分區(qū):"+entry.getKey().partition()+",提交的偏移量:"+entry.getValue().offset()); } } }); } } }

      一旦手動分配分區(qū),你可以在循環(huán)中調(diào)用poll。消費者分區(qū)任然需要提交offset,只是現(xiàn)在分區(qū)的設(shè)置只能通過調(diào)用assign 修改,因為手動分配不會進行分組協(xié)調(diào),因此消費者故障或者消費者的數(shù)量變動都不會引起分區(qū)重新平衡。每一個消費者是獨立工作的(即使和其他的消費者共享GroupId)。為了避免offset提交沖突,通常你需要確認每一個consumer實例的groupId都是唯一的。

      注意:

      手動分配分區(qū)(assgin)和動態(tài)分區(qū)分配的訂閱topic模式(subcribe)不能混合使用。

      【Kafka筆記】Kafka API詳細解析 Java版本(Producer API,Consumer API,-等)

      EI企業(yè)智能 Java Kafka 可信智能計算服務(wù) TICS 智能數(shù)據(jù)

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:WPS文字如何設(shè)置跨頁表頭的標題行重復(fù) ?
      下一篇:android移動支付——銀聯(lián)支付
      相關(guān)文章
      亚洲国产精品无码专区在线观看 | 亚洲六月丁香婷婷综合| 精品亚洲福利一区二区| 亚洲网站在线播放| 亚洲综合日韩中文字幕v在线| 亚洲人精品午夜射精日韩| 中文字幕亚洲综合久久菠萝蜜| 2022中文字字幕久亚洲| jlzzjlzz亚洲乱熟在线播放| jlzzjlzz亚洲乱熟在线播放| 国产综合亚洲专区在线| 中文字幕中韩乱码亚洲大片| 国产亚洲视频在线播放| 亚洲欧洲成人精品香蕉网| 亚洲第一AV网站| 亚洲电影中文字幕| 亚洲毛片在线观看| 久久精品国产亚洲av麻豆小说| 亚洲视频在线观看网址| 亚洲国产韩国一区二区| 亚洲看片无码在线视频| 亚洲男人的天堂网站| 国产亚洲日韩在线a不卡| 欧美色欧美亚洲另类二区| 亚洲成A人片在线观看中文 | 亚洲国产精品无码久久| 国产精品亚洲а∨天堂2021| 亚洲乱码日产精品a级毛片久久| 中国亚洲女人69内射少妇| 亚洲大成色www永久网站| 精品亚洲成a人片在线观看| 亚洲第一网站免费视频| 亚洲午夜一区二区三区| 亚洲欧洲专线一区| 亚洲国产精品成人| 国产av天堂亚洲国产av天堂| 亚洲精品综合久久| 亚洲AV无码一区二区三区DV| 7777久久亚洲中文字幕蜜桃| 中中文字幕亚洲无线码| 无码一区二区三区亚洲人妻|