kafka介紹與安裝
Kafka介紹與安裝

Apache Kafka? 是 一個分布式流處理平臺,是一個發布訂閱系統
Kafka適合什么樣的場景?
構造實時流數據管道,它可以在系統或應用之間可靠地獲取數據。 (相當于message queue
構建實時流式應用程序,對這些流數據進行轉換或者影響。 (就是流處理,通過kafka stream topic和topic之間內部進行變化)
常用的場景將微服務日志輸入到kafka中,通過kafka的高速緩存特性降低系統延遲,然后異步消費寫入到ELK中。
kafka每條記錄中包含一個key,一個value和一個timestamp(時間戳)。
Kafka有四個核心的API:
The Producer API 允許一個應用程序發布一串流式的數據到一個或者多個Kafka topic。
The Consumer API 允許一個應用程序訂閱一個或多個 topic ,并且對發布給他們的流式數據進行處理。
The Streams API 允許一個應用程序作為一個流處理器,消費一個或者多個topic產生的輸入流,然后生產一個輸出流到一個或多個topic中去,在輸入輸出流中進行有效的轉換。
The Connector API 允許構建并運行可重用的生產者或者消費者,將Kafka topics連接到已存在的應用程序或者數據系統。比如,連接到一個關系型數據庫,捕捉表(table)的所有變更內容。
Topics
Topic 就是數據主題,是數據記錄發布的地方,可以用來區分業務系統。Kafka中的Topics總是多訂閱者模式,一個topic可以擁有一個或者多個消費者來訂閱它的數據。
Kafka 集群保留所有發布的記錄—無論他們是否已被消費—并通過一個可配置的參數——保留期限來控制. 舉個例子, 如果保留策略設置為2天,一條記錄發布后兩天內,可以隨時被消費,兩天過后這條記錄會被拋棄并釋放磁盤空間。Kafka的性能和數據大小無關.
Kafka 消費者是非常廉價的—消費者的增加和減少,對集群或者其他消費者沒有多大的影響。
分布式
日志的分區partition (分布)在Kafka集群的服務器上。每個服務器在處理數據和請求時,共享這些分區。每一個分區都會在已配置的服務器上進行備份,確保容錯性.
生產者
生產者可以將數據發布到所選擇的topic(主題)中。生產者負責將記錄分配到topic的哪一個 partition(分區)中。可以使用循環的方式來簡單地實現負載均衡,也可以根據某些語義分區函數(例如:記錄中的key)來完成。
消費者
消費者使用一個 消費組 名稱來進行標識,發布到topic中的每條記錄被分配給訂閱消費組中的一個消費者實例.消費者實例可以分布在多個進程中或者多個機器上。
如果所有的消費者實例在同一消費組中,消息記錄會負載平衡到每一個消費者實例.
如果所有的消費者實例在不同的消費組中,每條消息記錄會廣播到所有
Kafka作為消息系統
隊列的優點在于它允許你將處理數據的過程分給多個消費者實例,使你可以擴展處理過程。 不好的是,隊列不是多訂閱者模式的—一旦一個進程讀取了數據,數據就會被丟棄。 而發布-訂閱系統允許你廣播數據到多個進程,但是無法進行擴展處理,因為每條消息都會發送給所有的訂閱者。
與RabbitMq隊列相比,kafka具有嚴格的順序保證。
傳統隊列在服務器上保存有序的記錄,如果多個消費者消費隊列中的數據, 服務器將按照存儲順序輸出記錄。 雖然服務器按順序輸出記錄,但是記錄被異步傳遞給消費者, 因此記錄可能會無序的到達不同的消費者。這意味著在并行消耗的情況下, 記錄的順序是丟失的。因此消息系統通常使用“唯一消費者”的概念,即只讓一個進程從隊列中消費, 但這就意味著不能夠并行地處理數據。
比如 商品sku1的商品進行庫存變更先后有兩條記錄 msg1 庫存1,msg2 庫存2,有多個消費者時雖然先發送msg1后發送msg2,但是并發條件下可能msg2的消費者先處理完成,msg1的消費者還在消費的情況,導致時序丟失。
Kafka topic中的partition是一個并行的概念。 Kafka能夠為一個消費者池提供順序保證和負載平衡,是通過將topic中的partition分配給消費者組中的消費者來實現的, 以便每個分區由消費組中的一個消費者消耗。通過這樣,我們能夠確保消費者是該分區的唯一讀者,并按順序消費數據。 眾多分區保證了多個消費者實例間的負載均衡。但請注意,消費者組中的消費者實例個數不能超過分區的數量。
kafka可以通過記錄中的key進行分區分配。
下載與安裝
- http://kafka.apache.org/downloads
Binary downloads:
Scala 2.12 - kafka_2.12-2.8.0.tgz (asc, sha512)
解壓文件
$ tar -xzf kafka_2.13-2.8.0.tgz
$ cd kafka_2.13-2.8.0
啟動zookeeper 新版本中將不再依賴外部的zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties
啟動kafka
$ bin/kafka-server-start.sh config/server.properties
啟動成功 started (kafka.server.KafkaServer)
[2021-07-30 21:46:39,446] INFO Kafka version: 2.8.0 (org.apache.kafka.common.utils.AppInfoParser) [2021-07-30 21:46:39,446] INFO Kafka commitId: ebb1d6e21cc92130 (org.apache.kafka.common.utils.AppInfoParser) [2021-07-30 21:46:39,447] INFO Kafka startTimeMs: 1627652799443 (org.apache.kafka.common.utils.AppInfoParser) [2021-07-30 21:46:39,448] INFO [KafkaServer id=0] started (kafka.server.KafkaServer) [2021-07-30 21:46:39,495] INFO [broker-0-to-controller-send-thread]: Recorded new controller, from now on will use broker 172.20.25.8:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
創建一個主題
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Created topic quickstart-events.
查看主題
bin/kafka-topics.sh --list --zookeeper localhost:2181
啟動生產者
$ ./bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
發送消息 hello message
啟動消費者
啟動多個消費者,每個消費者都可以收到消息
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
若想清理數據
$ rm -rf /tmp/kafka-logs /tmp/zookeeper
集群
在本級上創建3個節點
復制配置文件
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
編輯這些新文件并設置如下屬性:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
broker.id屬性是集群中每個節點的名稱,這一名稱是唯一且永久的。我們必須重寫端口和日志目錄,因為我們在同一臺機器上運行這些,我們不希望所有的代理嘗試在同一個端口注冊,或者覆蓋彼此的數據。
啟動兩個新的節點:
> bin/kafka-server-start.sh config/server-1.properties & ... > bin/kafka-server-start.sh config/server-2.properties & ...
創建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
查看topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic: my-replicated-topic TopicId: U842ZMiFQAyZgdiUSJ5ICA PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
第一行給出了所有分區的摘要,下面的每行都給出了一個分區的信息。因為我們只有一個分區,所以只有一行。
“leader”是負責給定分區所有讀寫操作的節點。每個節點都是隨機選擇的部分分區的領導者。
“replicas”是復制分區日志的節點列表,不管這些節點是leader還是僅僅活著。
“isr”是一組“同步”replicas,是replicas列表的子集,它活著并被指到leader。
查看原先的topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic quickstart-events
Topic: quickstart-events TopicId: YmOLaKbBTxitS9AgDfs8LQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
原來的主題沒有副本且在服務器0上。我們創建集群時,這是唯一的服務器.
當有一個kafka服務掛掉時服務仍可正常運營。
Kafka RabbitMQ
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。