kafka系統(tǒng)入門(mén)教程

      網(wǎng)友投稿 996 2025-03-31

      一、Kafka簡(jiǎn)介

      kafka is a distributed,partitioned,replicated commit logservice。它提供了類似于JMS的特性,但是在實(shí)現(xiàn)上完全不同,此外它并不是JMS規(guī)范的實(shí)現(xiàn)。kafka對(duì)消息保存時(shí)根據(jù)Topic進(jìn)行歸類,發(fā)送消息者成為Producer,消息接受者成為Consumer,此外kafka集群有多個(gè)kafka實(shí)例組成,每個(gè)實(shí)例()成為broker。無(wú)論是kafka集群,還是producer和consumer都依賴于zookeeper來(lái)保證系統(tǒng)可用性集群保存一些meta信息。

      二、基本架構(gòu)圖

      三、基本概念解釋

      1)Broker

      Kafka集群包含一個(gè)或多個(gè)服務(wù)器,這種服務(wù)器被稱為broker。broker端不維護(hù)數(shù)據(jù)的消費(fèi)狀態(tài),提升了性能。直接使用磁盤(pán)進(jìn)行存儲(chǔ),線性讀寫(xiě),速度快:避免了數(shù)據(jù)在JVM內(nèi)存和系統(tǒng)內(nèi)存之間的復(fù)制,減少耗性能的創(chuàng)建對(duì)象和垃圾回收。

      2)Producer

      負(fù)責(zé)發(fā)布消息到Kafka broker;Producer將消息發(fā)布到指定的Topic中,同時(shí)Producer也能決定將此消息歸屬于哪個(gè)partition;比如基于"round-robin"方式或者通過(guò)其他的一些算法等.

      3)Consumer

      消息消費(fèi)者,向Kafka broker讀取消息的客戶端,consumer從broker拉取(pull)數(shù)據(jù)并進(jìn)行處理。

      本質(zhì)上kafka只支持Topic.每個(gè)consumer屬于一個(gè)consumer group;反過(guò)來(lái)說(shuō),每個(gè)group中可以有多個(gè)consumer.發(fā)送到Topic的消息,只會(huì)被訂閱此Topic的每個(gè)group中的一個(gè)consumer消費(fèi).

      如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息將會(huì)在consumers之間負(fù)載均衡.

      如果所有的consumer都具有不同的group,那這就是"發(fā)布-訂閱";消息將會(huì)廣播給所有的消費(fèi)者.

      在kafka中,一個(gè)partition中的消息只會(huì)被group中的一個(gè)consumer消費(fèi);每個(gè)group中consumer消息消費(fèi)互相獨(dú)立;我們可以認(rèn)為一個(gè)group是一個(gè)"訂閱"者,一個(gè)Topic中的每個(gè)partions,只會(huì)被一個(gè)"訂閱者"中的一個(gè)consumer消費(fèi),不過(guò)一個(gè)consumer可以消費(fèi)多個(gè)partitions中的消息.kafka只能保證一個(gè)partition中的消息被某個(gè)consumer消費(fèi)時(shí),消息是順序的.事實(shí)上,從Topic角度來(lái)說(shuō),消息仍不是有序的.

      4)Topic

      每條發(fā)布到Kafka集群的消息都有一個(gè)類別,這個(gè)類別被稱為T(mén)opic。(物理上不同Topic的消息分開(kāi)存儲(chǔ),邏輯上一個(gè)Topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)

      5)Partition

      Parition是物理上的概念,每個(gè)topic將被分成多個(gè)partition(區(qū)),每個(gè)partition在存儲(chǔ)層面是append log文件

      6)Consumer Group

      每個(gè)Consumer屬于一個(gè)特定的Consumer Group(可為每個(gè)Consumer指定group name,若不指定group name則屬于默認(rèn)的group)

      7)Topic & Partition

      Topic在邏輯上可以被認(rèn)為是一個(gè)queue,每條消費(fèi)都必須指定它的Topic,可以簡(jiǎn)單理解為必須指明把這條消息放進(jìn)哪個(gè)queue里。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個(gè)或多個(gè)Partition,每個(gè)Partition在物理上對(duì)應(yīng)一個(gè)文件夾,該文件夾下存儲(chǔ)這個(gè)Partition的所有消息和索引文件。若創(chuàng)建topic1和topic2兩個(gè)topic,且分別有13個(gè)和19個(gè)分區(qū),則整個(gè)集群上會(huì)相應(yīng)會(huì)生成共32個(gè)文件夾(本文所用集群共8個(gè)節(jié)點(diǎn),此處topic1和topic2 replication-factor均為1)。

      partitions的目的有多個(gè).最根本原因是kafka基于文件存儲(chǔ).通過(guò)分區(qū),可以將日志內(nèi)容分散到多個(gè)上,來(lái)避免文件尺寸達(dá)到單機(jī)磁盤(pán)的上限,每個(gè)partiton都會(huì)被當(dāng)前broker(kafka實(shí)例)保存;可以將一個(gè)topic切分多任意多個(gè)partitions,來(lái)提高消息保存/消費(fèi)的效率.此外越多的partitions意味著可以容納更多的consumer,有效提升并發(fā)消費(fèi)的能力.

      當(dāng)segment文件尺寸達(dá)到一定閥值時(shí)(可以通過(guò)配置文件設(shè)定,默認(rèn)1G),將會(huì)創(chuàng)建一個(gè)新的文件;當(dāng)buffer中消息的條數(shù)達(dá)到閥值時(shí)將會(huì)觸發(fā)日志信息flush到日志文件中,同時(shí)如果"距離最近一次flush的時(shí)間差"達(dá)到閥值時(shí),也會(huì)觸發(fā)flush到日志文件.如果broker失效,極有可能會(huì)丟失那些尚未flush到文件的消息.因?yàn)橐馔鈱?shí)現(xiàn),仍然會(huì)導(dǎo)致log文件格式的破壞(文件尾部),那么就要求當(dāng)server啟東是需要檢測(cè)最后一個(gè)segment的文件結(jié)構(gòu)是否合法并進(jìn)行必要的修復(fù).

      獲取消息時(shí),需要指定offset和最大chunk尺寸,offset用來(lái)表示消息的起始位置,chunk size用來(lái)表示最大獲取消息的總長(zhǎng)度(間接的表示消息的條數(shù)).根據(jù)offset,可以找到此消息所在segment文件,然后根據(jù)segment的最小offset取差值,得到它在file中的相對(duì)位置,直接讀取輸出即可.

      日志文件的刪除策略非常簡(jiǎn)單:啟動(dòng)一個(gè)后臺(tái)線程定期掃描log file列表,把保存時(shí)間超過(guò)閥值的文件直接刪除(根據(jù)文件的創(chuàng)建時(shí)間).為了避免刪除文件時(shí)仍然有read操作(consumer消費(fèi)),采取copy-on-write方式.

      8、分配

      kafka使用zookeeper來(lái)存儲(chǔ)一些meta信息,并使用了zookeeper watch機(jī)制來(lái)發(fā)現(xiàn)meta信息的變更并作出相應(yīng)的動(dòng)作(比如consumer失效,觸發(fā)負(fù)載均衡等)

      Broker node registry: 當(dāng)一個(gè)kafka broker啟動(dòng)后,首先會(huì)向zookeeper注冊(cè)自己的節(jié)點(diǎn)信息(臨時(shí)znode),同時(shí)當(dāng)broker和zookeeper斷開(kāi)連接時(shí),此znode也會(huì)被刪除.

      格式: /broker/ids/[0…N] -->host:port;其中[0…N]表示broker id,每個(gè)broker的配置文件中都需要指定一個(gè)數(shù)字類型的id(全局不可重復(fù)),znode的值為此broker的host:port信息.

      Broker Topic Registry: 當(dāng)一個(gè)broker啟動(dòng)時(shí),會(huì)向zookeeper注冊(cè)自己持有的topic和partitions信息,仍然是一個(gè)臨時(shí)znode.

      格式: /broker/topics/[topic]/[0…N] 其中[0…N]表示partition索引號(hào).

      Consumer and Consumer group: 每個(gè)consumer被創(chuàng)建時(shí),會(huì)向zookeeper注冊(cè)自己的信息;此作用主要是為了"負(fù)載均衡".

      一個(gè)group中的多個(gè)consumer可以交錯(cuò)的消費(fèi)一個(gè)topic的所有partitions;簡(jiǎn)而言之,保證此topic的所有partitions都能被此group所消費(fèi),且消費(fèi)時(shí)為了性能考慮,讓partition相對(duì)均衡的分散到每個(gè)consumer上.

      Consumer id Registry: 每個(gè)consumer都有一個(gè)唯一的ID(host:uuid,可以通過(guò)配置文件指定,也可以由系統(tǒng)生成),此id用來(lái)標(biāo)記消費(fèi)者信息.

      格式:/consumers/[group_id]/ids/[consumer_id]

      仍然是一個(gè)臨時(shí)的znode,此節(jié)點(diǎn)的值為{“topic_name”:#streams…},即表示此consumer目前所消費(fèi)的topic + partitions列表.

      Consumer offset Tracking: 用來(lái)跟蹤每個(gè)consumer目前所消費(fèi)的partition中最大的offset.

      格式:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]–>offset_value

      此znode為持久節(jié)點(diǎn),可以看出offset跟group_id有關(guān),以表明當(dāng)group中一個(gè)消費(fèi)者失效,其他consumer可以繼續(xù)消費(fèi).

      Partition Owner registry: 用來(lái)標(biāo)記partition被哪個(gè)consumer消費(fèi).臨時(shí)znode

      格式:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]–>consumer_node_id當(dāng)consumer啟動(dòng)時(shí),所觸發(fā)的操作:

      A) 首先進(jìn)行"Consumer id Registry";

      B) 然后在"Consumer id Registry"節(jié)點(diǎn)下注冊(cè)一個(gè)watch用來(lái)監(jiān)聽(tīng)當(dāng)前group中其他consumer的"leave"和"join";只要此znode path下節(jié)點(diǎn)列表變更,都會(huì)觸發(fā)此group下consumer的負(fù)載均衡.(比如一個(gè)consumer失效,那么其他consumer接管partitions).

      C) 在"Broker id registry"節(jié)點(diǎn)下,注冊(cè)一個(gè)watch用來(lái)監(jiān)聽(tīng)broker的存活情況;如果broker列表變更,將會(huì)觸發(fā)所有的groups下的consumer重新balance.

      9、生產(chǎn)者消費(fèi)者通信圖

      Producer端使用zookeeper用來(lái)"發(fā)現(xiàn)"broker列表,以及和Topic下每個(gè)partition leader建立socket連接并發(fā)送消息.

      Broker端使用zookeeper用來(lái)注冊(cè)broker信息,以及監(jiān)測(cè)partition leader存活性.

      Consumer端使用zookeeper用來(lái)注冊(cè)consumer信息,其中包括consumer消費(fèi)的partition列表等,同時(shí)也用來(lái)發(fā)現(xiàn)broker列表,并和partition leader建立socket連接,并獲取消息

      六、主要配置

      1.Broker配置

      /usr/local/software/kafka/kafka_2.11-1.1.0/config/server.properties

      #每一個(gè)boker都有一個(gè)唯一的id作為它們的名字。當(dāng)該服務(wù)器的IP地址發(fā)生改變時(shí),broker.id沒(méi)有變化,則不會(huì)影響consumers的消息情況

      broker.id=0

      #broker server服務(wù)端口

      port=9092

      #broker的主機(jī)地址,若是設(shè)置了,那么會(huì)綁定到這個(gè)地址上,若是沒(méi)有,會(huì)綁定到所有的接口上,并將其中之一發(fā)送到ZK

      #host.name=

      #broker處理消息的最大線程數(shù),一般情況下數(shù)量為cpu核數(shù)

      num.network.threads=3

      #處理IO的線程數(shù)

      num.io.threads=8

      kafka系統(tǒng)入門(mén)教程

      socket.send.buffer.bytes=102400

      socket.receive.buffer.bytes=102400

      socket.request.max.bytes=104857600

      #kafka數(shù)據(jù)的存放地址,多個(gè)地址的話用逗號(hào)分割,多個(gè)目錄分布在不同磁盤(pán)上可以提高讀寫(xiě)性能? /data/kafka-logs-1,/data/kafka-logs-2

      log.dirs=/tmp/kafka-logs

      #默認(rèn)分區(qū)數(shù)

      num.partitions=2

      num.recovery.threads.per.data.dir=1

      offsets.topic.replication.factor=1

      transaction.state.log.replication.factor=1

      transaction.state.log.min.isr=1

      #控制一個(gè)log保留多長(zhǎng)個(gè)小時(shí)

      log.retention.hours=168

      #單一的log segment文件大小

      log.segment.bytes=1073741824

      log.retention.check.interval.ms=300000

      #是否log cleaning

      log.cleaner.enable=false

      #指定zookeeper連接字符串, 格式如hostname:port/chroot。chroot是一個(gè)namespace

      zookeeper.connect=localhost:2181

      #連接zk的session超時(shí)時(shí)間

      zookeeper.connection.timeout.ms=6000

      group.initial.rebalance.delay.ms=0

      2.消費(fèi)者主要配置

      bootstrap.servers=localhost:9092

      #當(dāng)前消費(fèi)者的Group名稱,需要指定

      group.id=test-consumer-group

      3.生產(chǎn)者的主要配置

      bootstrap.servers=localhost:9092

      compression.type=none

      以上是關(guān)于kafka一些基礎(chǔ)說(shuō)明,在其中我們知道如果要kafka正常運(yùn)行,必須配置zookeeper,否則無(wú)論是kafka集群還是的生存者和消費(fèi)者都無(wú)法正常的工作的,以下是對(duì)zookeeper進(jìn)行一些簡(jiǎn)單的介紹:

      七、Zookeeper集群

      zookeeper是一個(gè)為分布式應(yīng)用提供一致性的服務(wù)的軟件,它是開(kāi)源的Hadoop項(xiàng)目的一個(gè)子項(xiàng)目,并根據(jù)google發(fā)表的一篇論文來(lái)實(shí)現(xiàn)的。zookeeper為分布式系統(tǒng)提供了高效且易于使用的協(xié)同服務(wù),它可以為分布式應(yīng)用提供相當(dāng)多的服務(wù),諸如統(tǒng)一命名服務(wù),配置管理,狀態(tài)同步和組服務(wù)等。zookeeper接口簡(jiǎn)單,我們不必過(guò)多地糾結(jié)在分布式系統(tǒng)難于處理的同步和一致性問(wèn)題上,你可以使用zookeeper提供的現(xiàn)成(off-the-shelf)服務(wù)來(lái)實(shí)現(xiàn)來(lái)實(shí)現(xiàn)分布式系統(tǒng)額配置管理,組管理,Leader選舉等功能。

      Zookeeper安裝方式有三種,單機(jī)模式和集群模式以及偽集群模式。

      ■ 單機(jī)模式:Zookeeper只運(yùn)行在一臺(tái)服務(wù)器上,適合測(cè)試環(huán)境;

      ■ 偽集群模式:就是在一臺(tái)物理機(jī)上運(yùn)行多個(gè)Zookeeper 實(shí)例;

      ■ 集群模式:Zookeeper運(yùn)行于一個(gè)集群上,適合生產(chǎn)環(huán)境,這個(gè)計(jì)算機(jī)集群被稱為一個(gè)“集合體”(ensemble)

      Zookeeper通過(guò)復(fù)制來(lái)實(shí)現(xiàn)高可用性,只要集合體中半數(shù)以上的機(jī)器處于可用狀態(tài),它就能夠保證服務(wù)繼續(xù)。為什么一定要超過(guò)半數(shù)呢?這跟Zookeeper的復(fù)制策略有關(guān):zookeeper確保對(duì)znode 樹(shù)的每一個(gè)修改都會(huì)被復(fù)制到集合體中超過(guò)半數(shù)的機(jī)器上。

      關(guān)于Zookeeper的功能和工作原理可以參考:https://www.cnblogs.com/felixzh/p/5869212.html

      分布式 Kafka 存儲(chǔ)

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:工資流程圖制作模板(工資流程圖制作模板軟件
      下一篇:頁(yè)腳文字上方如何添加橫線(頁(yè)腳橫線怎么添加在文字上方)
      相關(guān)文章
      亚洲av成本人无码网站| 亚洲理论在线观看| 亚洲另类视频在线观看| 久久噜噜噜久久亚洲va久| 成人午夜亚洲精品无码网站| 亚洲国产V高清在线观看| 精品无码专区亚洲| 国产精品亚洲精品久久精品| 亚洲欧洲精品成人久久曰| 亚洲日韩AV无码一区二区三区人| 亚洲sss综合天堂久久久| 亚洲天堂2016| 亚洲日本成本人观看| 亚洲精品无码久久久久秋霞 | 亚洲GV天堂无码男同在线观看| 久久久久亚洲国产| 亚洲欧美日韩综合久久久| 亚洲精品9999久久久久无码 | 亚洲国产精品久久久天堂| 久久久久亚洲精品美女| 亚洲第一精品在线视频| 少妇中文字幕乱码亚洲影视| 亚洲黄色网站视频| 亚洲中文字幕无码av在线| 亚洲人成综合在线播放| 波多野结衣亚洲一级| 亚洲国产AV一区二区三区四区| 亚洲AV香蕉一区区二区三区| 国产天堂亚洲精品| 相泽亚洲一区中文字幕| 国产亚洲精品a在线观看app| 亚洲国产精品久久| 亚洲国产精品久久久久秋霞影院| 国产成人亚洲精品| 亚洲AV无码成人精品区日韩| 亚洲美日韩Av中文字幕无码久久久妻妇 | 亚洲AV无码一区二区二三区软件 | 久久精品国产亚洲夜色AV网站| 久久久无码精品亚洲日韩按摩| 亚洲欧洲精品久久| 亚洲另类无码专区首页|