AssetBundle使用,卸載,校驗
872
2025-03-31
Kafka的API有Producer API,Consumer API還有自定義Interceptor (自定義-),以及處理的流使用的Streams API和構(gòu)建連接器的Kafka Connect API。
Kafka的Producer發(fā)送消息采用的是異步發(fā)送的方式。在消息發(fā)送過程中,涉及兩個線程:main線程和Sender線程,以及一個線程共享變量RecordAccumulator。main線程將消息發(fā)送給RecordAccmulator,Sender線程不斷地從RecordAccumulator中拉取消息發(fā)送給Kafka broker。
這里的ACk機制,不是生產(chǎn)者得到ACK返回信息才開始發(fā)送,ACK保證的是生產(chǎn)者不丟失數(shù)據(jù)。例如:
而是只要有消息數(shù)據(jù),就向broker發(fā)送。
生產(chǎn)者使用send方法,經(jīng)過-之后在經(jīng)過序列化器,然后在走分區(qū)器。然后通過分批次把數(shù)據(jù)發(fā)送到PecordAccumulator,main線程到此過程就結(jié)束了,然后在回去執(zhí)行send。
Sender線程不斷的獲取RecordAccumulator的數(shù)據(jù)發(fā)送到topic。
消息發(fā)送流程是異步發(fā)送的,并且順序是一定的--》序列化器-》分區(qū)器
需要用到的類:
KafkaProducer: 需要創(chuàng)建一個生產(chǎn)者對象,用來發(fā)送數(shù)據(jù) ProducerConfig:獲取所需要的一系列配置參數(shù) ProducerRecord:每條數(shù)據(jù)都要封裝成一個ProducerRecord對象
實例:
public class KafkaProducerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXXXXXX:9093");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重試次數(shù) props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待時間 props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區(qū)大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 創(chuàng)建KafkaProducer客戶端 KafkaProducer
配置參數(shù)說明:
send():方法是異步的,添加消息到緩沖區(qū)等待發(fā)送,并立即返回。生產(chǎn)者將單個的消息批量在一起發(fā)送來提高效率。
ack:是判斷請求是否完整的條件(就會判斷是不是成功發(fā)送了,也就是上次說的ACK機制),指定all將會阻塞消息,性能低但是最可靠。
retries:如果請求失敗,生產(chǎn)者會自動重試,我們指定是1次,但是啟動重試就有可能出現(xiàn)重復(fù)數(shù)據(jù)。
batch.size:指定緩存的大小,生產(chǎn)者緩存每個分區(qū)未發(fā)送的消息。值越大的話將會產(chǎn)生更大的批量,并需要更大的內(nèi)存(因為每個活躍的分區(qū)都有一個緩存區(qū))。
linger.ms:指示生產(chǎn)者發(fā)送請求之前等待一段時間,設(shè)置等待時間是希望更多地消息填補到未滿的批中。默認緩沖可以立即發(fā)送,即便緩沖空間還沒有滿,但是如果想減少請求的數(shù)量可以設(shè)置linger.ms大于0。需要注意的是在高負載下,相近的時間一般也會組成批,即使等于0。
buffer.memory:控制生產(chǎn)者可用的緩存總量,如果消息發(fā)送速度比其傳輸?shù)椒?wù)器的快,將會耗盡這個緩存空間。當緩存空間耗盡,其他發(fā)送調(diào)用將被阻塞,阻塞時間的閾值通過max.block.ms設(shè)定,之后將會拋出一個TimeoutException
key.serializer和value.serializer將用戶提供的key和value對象ProducerRecord轉(zhuǎn)換成字節(jié),你可以使用附帶的ByteArraySerializaer或StringSerializer處理簡單的string或byte類型。
運行日志:
[Godway] INFO 2019-11-14 14:46 - org.apache.kafka.clients.producer.ProducerConfig[main] - ProducerConfig values: acks = all batch.size = 16384 bootstrap.servers = [XXXXXX:9093] buffer.memory = 33554432 client.id = compression.type = none connections.max.idle.ms = 540000 enable.idempotence = false interceptor.classes = null key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 1 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 1 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer [Godway] INFO 2019-11-14 14:46 - org.apache.kafka.common.utils.AppInfoParser[main] - Kafka version : 0.11.0.3 [Godway] INFO 2019-11-14 14:46 - org.apache.kafka.common.utils.AppInfoParser[main] - Kafka commitId : 26ddb9e3197be39a [Godway] WARN 2019-11-14 14:46 - org.apache.kafka.clients.NetworkClient[kafka-producer-network-thread | producer-1] - Error while fetching metadata with correlation id 1 : {my-topic=LEADER_NOT_AVAILABLE} [Godway] INFO 2019-11-14 14:46 - org.apache.kafka.clients.producer.KafkaProducer[main] - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. Process finished with exit code 0
有一條警告{my-topic=LEADER_NOT_AVAILABLE} 提示該topic不存在,但是沒有關(guān)系kafka會自動給你創(chuàng)建一個topic,不過創(chuàng)建的topic是有一個分區(qū)和一個副本:
查看一下該topic的消息:
消息已經(jīng)在topic里了
上面的實例是沒有回調(diào)函數(shù)的,send方法是有回調(diào)函數(shù)的:
public class KafkaProducerCallbackDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXX:9093");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重試次數(shù) props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待時間 props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區(qū)大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 創(chuàng)建KafkaProducer客戶端 KafkaProducer
回調(diào)函數(shù)會在producer收到ack時調(diào)用,為異步調(diào)用,該方法有兩個參數(shù),分別是RecordMetadata和Exception,如果Exception為null,說明消息發(fā)送成功,如果Exception不為null說明消息發(fā)送失敗。
注意: 消息發(fā)送失敗會自動重試,不需要我們在回調(diào)函數(shù)中手動重試,使用回調(diào)也是無阻塞的。而且callback一般在生產(chǎn)者的IO線程中執(zhí)行,所以是非常快的,否則將延遲其他的線程消息發(fā)送。如果需要執(zhí)行阻塞或者計算的回調(diào)(耗時比較長),建議在callbanck主體中使用自己的Executor來并行處理!
同步發(fā)送的意思就是,一條消息發(fā)送之后,會阻塞當前的線程,直到返回ack(此ack和異步的ack機制不是一個ack)。
此ack是Future阻塞main線程,當發(fā)送完成就返回一個ack去通知main線程已經(jīng)發(fā)送完畢,繼續(xù)往下走了
public Future
send是異步的,并且一旦消息被保存在等待發(fā)送的消息緩存中,此方法就立即返回。這樣并行發(fā)送多條消息而不阻塞去等待每一條消息的響應(yīng)。
發(fā)送的結(jié)果是一個RecordMetadata,它指定了消息發(fā)送的分區(qū),分配的offset和消息的時間戳。如果topic使用的是CreateTime,則使用用戶提供的時間戳或發(fā)送的時間(如果用戶沒有指定指定消息的時間戳)如果topic使用的是LogAppendTime,則追加消息時,時間戳是broker的本地時間。
由于send調(diào)用是異步的,它將為分配消息的此消息的RecordMetadata返回一個Future。如果future調(diào)用get(),則將阻塞,直到相關(guān)請求完成并返回該消息的metadata,或拋出發(fā)送異常。
InterruptException - 如果線程在阻塞中斷。
SerializationException - 如果key或value不是給定有效配置的serializers。
TimeoutException - 如果獲取元數(shù)據(jù)或消息分配內(nèi)存話費的時間超過max.block.ms。
KafkaException - Kafka有關(guān)的錯誤(不屬于公共API的異常)。
public class KafkaProducerDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXX:9093");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重試次數(shù) props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待時間 props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區(qū)大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 創(chuàng)建KafkaProducer客戶端 KafkaProducer
生產(chǎn)者在向topic發(fā)送消息的時候的分區(qū)規(guī)則:
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable
根據(jù)send方法的參數(shù)的構(gòu)造方法就可以看出來,
指定分區(qū)就發(fā)送到指定分區(qū)
沒有指定分區(qū),有key值,就按照key值的Hash值分配分區(qū)
沒有指定分區(qū),也沒有指定key值,輪詢分區(qū)分配(只分配一次,以后都按照第一次的分區(qū)順序)
自定義分區(qū)器需要實現(xiàn)org.apache.kafka.clients.producer.Partitioner接口。并且實現(xiàn)三個方法
public class KafkaMyPartitions implements Partitioner { @Override public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { return 0; } @Override public void close() { } @Override public void configure(Map
自定義分區(qū)實例:
KafkaMyPartitions:
public class KafkaMyPartitions implements Partitioner { @Override public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { // 這里寫自己的分區(qū)策略 // 我這里指定為1 return 1; } @Override public void close() { } @Override public void configure(Map
KafkaProducerCallbackDemo:
public class KafkaProducerCallbackDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXX:9093");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重試次數(shù) props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待時間 props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區(qū)大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定自定義分區(qū) props.put("partitioner.class","com.firehome.newkafka.KafkaMyPartitions"); // 創(chuàng)建KafkaProducer客戶端 KafkaProducer
返回日志:
消息發(fā)送成功!topic=th-topic,partition=1,offset=27 消息發(fā)送成功!topic=th-topic,partition=1,offset=28 消息發(fā)送成功!topic=th-topic,partition=1,offset=29 消息發(fā)送成功!topic=th-topic,partition=1,offset=30 消息發(fā)送成功!topic=th-topic,partition=1,offset=31
可以看到直接發(fā)送到了分區(qū)1上了。
Producer API是線程安全的,直接就可以使用多線程發(fā)送消息,實例:
public class KafkaProducerThread implements Runnable { private KafkaProducer
這里只是一個簡單的實例。
kafka客戶端通過TCP長連接從集群中消費消息,并透明地處理kafka集群中出現(xiàn)故障服務(wù)器,透明地調(diào)節(jié)適應(yīng)集群中變化的數(shù)據(jù)分區(qū)。也和服務(wù)器交互,平衡均衡消費者。
kafka為分區(qū)中的每條消息保存一個偏移量(offset),這個偏移量是該分區(qū)中一條消息的唯一標示符。也表示消費者在分區(qū)的位置。例如,一個位置是5的消費者(說明已經(jīng)消費了0到4的消息),下一個接收消息的偏移量為5的消息。實際上有兩個與消費者相關(guān)的“位置”概念:
消費者的位置給出了下一條記錄的偏移量。它比消費者在該分區(qū)中看到的最大偏移量要大一個。 它在每次消費者在調(diào)用poll(long)中接收消息時自動增長。
“已提交”的位置是已安全保存的最后偏移量,如果進程失敗或重新啟動時,消費者將恢復(fù)到這個偏移量。消費者可以選擇定期自動提交偏移量,也可以選擇通過調(diào)用commit API來手動的控制(如:commitSync 和 commitAsync)。
這個區(qū)別是消費者來控制一條消息什么時候才被認為是已被消費的,控制權(quán)在消費者。
Kafka的消費者組概念,通過進程池瓜分消息并處理消息。這些進程可以在同一臺機器運行,也可分布到多臺機器上,以增加可擴展性和容錯性,相同group.id的消費者將視為同一個消費者組。
分組中的每個消費者都通過subscribe API動態(tài)的訂閱一個topic列表。kafka將已訂閱topic的消息發(fā)送到每個消費者組中。并通過平衡分區(qū)在消費者分組中所有成員之間來達到平均。因此每個分區(qū)恰好地分配1個消費者(一個消費者組中)。所有如果一個topic有4個分區(qū),并且一個消費者分組有只有2個消費者。那么每個消費者將消費2個分區(qū)。
消費者組的成員是動態(tài)維護的:如果一個消費者故障。分配給它的分區(qū)將重新分配給同一個分組中其他的消費者。同樣的,如果一個新的消費者加入到分組,將從現(xiàn)有消費者中移一個給它。這被稱為重新平衡分組。當新分區(qū)添加到訂閱的topic時,或者當創(chuàng)建與訂閱的正則表達式匹配的新topic時,也將重新平衡。將通過定時刷新自動發(fā)現(xiàn)新的分區(qū),并將其分配給分組的成員。
從概念上講,你可以將消費者分組看作是由多個進程組成的單一邏輯訂閱者。作為一個多訂閱系統(tǒng),Kafka支持對于給定topic任何數(shù)量的消費者組,而不重復(fù)。
這是在消息系統(tǒng)中常見的功能的略微概括。所有進程都將是單個消費者分組的一部分(類似傳統(tǒng)消息傳遞系統(tǒng)中的隊列的語義),因此消息傳遞就像隊列一樣,在組中平衡。與傳統(tǒng)的消息系統(tǒng)不同的是,雖然,你可以有多個這樣的組。但每個進程都有自己的消費者組(類似于傳統(tǒng)消息系統(tǒng)中pub-sub的語義),因此每個進程都會訂閱到該主題的所有消息。
此外,當分組重新分配自動發(fā)生時,可以通過ConsumerRebalanceListener通知消費者,這允許他們完成必要的應(yīng)用程序級邏輯,例如狀態(tài)清除,手動偏移提交等
它也允許消費者通過使用assign(Collection)手動分配指定分區(qū),如果使用手動指定分配分區(qū),那么動態(tài)分區(qū)分配和協(xié)調(diào)消費者組將失效。
訂閱一組topic,當調(diào)用poll(long)時,消費者將自動加入到消費者組中。只要持續(xù)調(diào)用poll,消費者將一直保持可用,并繼續(xù)從分配的分區(qū)中接收數(shù)據(jù)。此外,消費者向服務(wù)器定時發(fā)送心跳。如果消費者崩潰或無法再session.timeout.ms配置的時間內(nèi)發(fā)送心跳,則消費者就被視為死亡,并且其分區(qū)將被重新分配。
還有一種可能,消費者可能遇到活鎖的情況,它持續(xù)的發(fā)送心跳,但是沒有處理。為了預(yù)防消費者在這總情況下一直擁有分區(qū),我們使用max.poll.interval.ms活躍監(jiān)測機制。在此基礎(chǔ)上,如果你調(diào)用的poll的頻率大于最大間隔,則客戶端將主動地離開組,以便其他消費者接管該分區(qū)。發(fā)生這種情況時,你會看到offset提交失敗( 調(diào)用commitSync()引發(fā)的CommitFailedException )。這是一種安全機制,保障只有活動成員能夠提交offset。所以要留在組中,你必須持續(xù)調(diào)用poll。
消費者提供兩種配置設(shè)置來控制poll循環(huán):
max.poll.interval.ms: 增大poll的間隔,可以為消費者提供更多的時間去處理返回的消息(調(diào)用poll(long)返回的消息,通常返回的消息都是一批),缺點是此值越大將會延遲組重新平衡。
max.poll.records:此設(shè)置限制每次調(diào)用poll返回的消息數(shù),這樣可以更容易的預(yù)測每次poll間隔要處理的最大值。通過調(diào)整此值,可以減少poll間隔,減少重新平衡分組的
對于消息處理時間不可預(yù)測地情況,這些選項是不夠的。 處理這種情況的推薦方法是將消息處理移到另一個線程中,讓消費者繼續(xù)調(diào)用poll。 但是必須注意確保已提交的offset不超過實際位置。另外,你必須禁用自動提交,并只有在線程完成處理后才為記錄手動提交偏移量。 還要注意, 你需要pause暫停分區(qū),不會從poll接收到新消息,讓線程處理完之前返回的消息(如果你的處理能力比拉取消息的慢,那創(chuàng)建新線程將導(dǎo)致機器內(nèi)存溢出)。
實例:
public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers","xxxxxxxxxx:9093"); props.put("group.id","test-6");//消費者組,只要group.id相同,就屬于同一個消費者組 props.put("enable.auto.commit","true");//自動提交offset props.put("auto.commit.interval.ms","1000"); // 自動提交時間間隔 props.put("max.poll.records","5"); // 拉取的數(shù)據(jù)條數(shù) props.put("session.timeout.ms","10000"); // 維持session的時間。超過這個時間沒有心跳 就會剔出消費者組 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); KafkaConsumer
配置說明:
bootstrap.servers: 集群是通過配置bootstrap.servers指定一個或多個broker。不用指定全部的broker,它將自動發(fā)現(xiàn)集群中的其余的borker(最好指定多個,萬一有服務(wù)器故障)
enable.auto.commit: 自動提交偏移量,如果設(shè)置了自動提交偏移量,下面這個設(shè)置就必須要用到了。
auto.commit.interval.ms:自動提交時間間隔,和自動提交偏移量配合使用
max.poll.records:控制從 broker拉取的消息條數(shù)
poll(long time): 當消費者獲取不到消息時,就會使用這個參數(shù),為了減輕無效的循環(huán)請求消息,消費者會每隔long time的時間請求一次消息,單位是毫秒。
session.timeout.ms: broker通過心跳機器自動檢測消費者組中失敗的進程,消費者會自動ping集群,告訴進群它還活著。只要消費者能夠做到這一點,它就被認為是活著的,并保留分配給它分區(qū)的權(quán)利,如果它停止心跳的時間超過session.timeout.ms,那么就會認為是故障的,它的分區(qū)將被分配到別的進程。
auto.offset.reset:這個屬性很重要,一會詳細講解
這里說明一下auto.commit.interval.ms以及何時提交消費者偏移量,經(jīng)過測試:
設(shè)置props.put("auto.commit.interval.ms","60000");
自動提交時間為一分鐘,也就是你在這一分鐘內(nèi)拉取任何數(shù)量的消息都不會被提交消費的當前偏移量,如果你此時關(guān)閉消費者(一分鐘內(nèi)),下次消費還是從和第一次的消費數(shù)據(jù)一樣,即使你在一分鐘內(nèi)消費完所有的消息,只要你在一分鐘內(nèi)關(guān)閉程序,導(dǎo)致提交不了offset,就可以一直重復(fù)消費數(shù)據(jù)。
設(shè)置props.put("auto.commit.interval.ms","3000");
但是在消費過程中設(shè)置sleep。
public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers","xxxxxxxxxxxx:9093"); props.put("group.id","test-6");//消費者組,只要group.id相同,就屬于同一個消費者組 props.put("enable.auto.commit","true");//自動提交offset props.put("auto.commit.interval.ms","100000"); // 自動提交時間間隔 props.put("max.poll.records","5"); // 拉取的數(shù)據(jù)條數(shù) props.put("session.timeout.ms","10000"); // 維持session的時間。超過這個時間沒有心跳 就會剔出消費者組 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); // KafkaConsumer
這里如果你消費了第一批數(shù)據(jù),在執(zhí)行第二次poll的時候,關(guān)閉程序也不會提交偏移量,只有在執(zhí)行第二次poll的時候才會把上一次的最后一個offset提交上去。
auto.offset.reset講解:
auto.offset.reset的值有三種:earliest,latest,none,代表者不同的意思
earliest: 當各分區(qū)下有已經(jīng)提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費,最常用的值 latest: 當各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產(chǎn)生的該分區(qū)下的數(shù)據(jù) none: topic各分區(qū)都存在已提交的offset時,從offset后開始消費,只要有一個分區(qū)不存在已提交的offset,則拋出異常 !!注意:當使用了latest,并且分區(qū)沒有已提交的offset時,消費新產(chǎn)生的該分區(qū)下的數(shù)據(jù),其實是把offset的值直接設(shè)置到最后一個消息的位置。例如,有個30條數(shù)據(jù)的demo的topic,各分區(qū)無提交offset,使用了latest,再看offset就會發(fā)現(xiàn)已經(jīng)在30的位置了,所以才只能消費新產(chǎn)生的數(shù)據(jù)!!!!
不需要定時提交偏移量,可以自己控制offset,當消息已經(jīng)被我們消費過后,再去手動提交他們的偏移量。這個很適合我們的一些處理邏輯。
手動提交offset的方法有兩種:分別是commitSync(同步提交) 和commitAsync(異步提交)。兩者的相同點,都會將本次poll的一批數(shù)據(jù)最高的偏移量提交;不同點是commitSync會失敗重試,一直到提交成功(如果有不可恢復(fù)的原因?qū)е拢矔峤皇。湃ダ⌒聰?shù)據(jù)。而commitAsync則沒有重試機制(提交了就去拉取新數(shù)據(jù),不管這次的提交有沒有成功),故有可能提交失敗。
實例:
public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers","XXXXXC:9093"); props.put("group.id","test-11");//消費者組,只要group.id相同,就屬于同一個消費者組 props.put("enable.auto.commit","false");//自動提交offset props.put("auto.commit.interval.ms","1000"); // 自動提交時間間隔 props.put("max.poll.records","20"); // 拉取的數(shù)據(jù)條數(shù) props.put("session.timeout.ms","10000"); // 維持session的時間。超過這個時間沒有心跳 就會剔出消費者組 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); KafkaConsumer
這些都是全部提交偏移量,如果我們想更細致的控制偏移量提交,可以自定義提交偏移量:
public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers","XXXXXXXXXX:9093"); props.put("group.id","test-18");//消費者組,只要group.id相同,就屬于同一個消費者組 props.put("enable.auto.commit","false");//自動提交offset props.put("auto.commit.interval.ms","1000000"); // 自動提交時間間隔 props.put("max.poll.records","5"); // 拉取的數(shù)據(jù)條數(shù) props.put("session.timeout.ms","10000"); // 維持session的時間。超過這個時間沒有心跳 就會剔出消費者組 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); KafkaConsumer
通過消費者Kafka會通過分區(qū)分配分給消費者一個分區(qū),但是我們也可以指定分區(qū)消費消息,要使用指定分區(qū),只需要調(diào)用assign(Collection)消費指定的分區(qū)即可:
public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers","XXXXXXXXX:9093"); props.put("group.id","test-19");//消費者組,只要group.id相同,就屬于同一個消費者組 props.put("enable.auto.commit","false");//自動提交offset props.put("auto.commit.interval.ms","1000000"); // 自動提交時間間隔 props.put("max.poll.records","5"); // 拉取的數(shù)據(jù)條數(shù) props.put("session.timeout.ms","10000"); // 維持session的時間。超過這個時間沒有心跳 就會剔出消費者組 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); KafkaConsumer
一旦手動分配分區(qū),你可以在循環(huán)中調(diào)用poll。消費者分區(qū)任然需要提交offset,只是現(xiàn)在分區(qū)的設(shè)置只能通過調(diào)用assign 修改,因為手動分配不會進行分組協(xié)調(diào),因此消費者故障或者消費者的數(shù)量變動都不會引起分區(qū)重新平衡。每一個消費者是獨立工作的(即使和其他的消費者共享GroupId)。為了避免offset提交沖突,通常你需要確認每一個consumer實例的groupId都是唯一的。
注意:
手動分配分區(qū)(assgin)和動態(tài)分區(qū)分配的訂閱topic模式(subcribe)不能混合使用。
EI企業(yè)智能 Java Kafka 可信智能計算服務(wù) TICS 智能數(shù)據(jù)
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。