【Kafka運(yùn)維】Kafka全網(wǎng)最全最詳細(xì)運(yùn)維命令合集(精品強(qiáng)烈建議收藏!!!)
推薦一款非常好用的kafka管理平臺(tái),
kafka的靈魂伴侶
滴滴開(kāi)源Logi-KafkaManager 一站式Kafka監(jiān)控與管控平臺(tái)
本文所有命令,博主均全部操作驗(yàn)證過(guò),保證準(zhǔn)確性; 非復(fù)制粘貼拼湊文章; 如果想了解更多工具命令,可在評(píng)論區(qū)留下評(píng)論,博主會(huì)擇期加上;
博主正在連載 Kafka源碼、Kafka運(yùn)維、Kafka實(shí)踐系列文章 并且相關(guān)文章會(huì)配套錄制視頻
本文為專欄第一篇?dú)g迎關(guān)注
<石臻臻的雜貨鋪>
不迷路!!!
以下大部分運(yùn)維操作,都可以使用 LogI-Kafka-Manager 在平臺(tái)上可視化操作;
@[TOC]
1.TopicCommand
1.1.Topic創(chuàng)建
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
相關(guān)可選參數(shù)
1.2.刪除Topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
支持正則表達(dá)式匹配Topic來(lái)進(jìn)行刪除,只需要將topic 用雙引號(hào)包裹起來(lái)
例如: 刪除以create_topic_byhand_zk為開(kāi)頭的topic;
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic “create_topic_byhand_zk.*”
.表示任意匹配除換行符 \n 之外的任何單字符。要匹配 . ,請(qǐng)使用 . 。
·*·:匹配前面的子表達(dá)式零次或多次。要匹配 * 字符,請(qǐng)使用 *。
.* : 任意字符
刪除任意Topic (慎用)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic “.*?”
更多的用法請(qǐng)參考正則表達(dá)式
1.3.Topic分區(qū)擴(kuò)容
zk方式(不推薦)
>bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
kafka版本 >= 2.2 支持下面方式(推薦)
單個(gè)Topic擴(kuò)容
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4
批量擴(kuò)容 (將所有正則表達(dá)式匹配到的Topic分區(qū)擴(kuò)容到4個(gè))
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4
".*?" 正則表達(dá)式的意思是匹配所有; 您可按需匹配
PS: 當(dāng)某個(gè)Topic的分區(qū)少于指定的分區(qū)數(shù)時(shí)候,他會(huì)拋出異常;但是不會(huì)影響其他Topic正常進(jìn)行;
相關(guān)可選參數(shù)
PS: 雖然這里配置的是全部的分區(qū)副本分配配置,但是正在生效的是新增的分區(qū);
比如: 以前3分區(qū)1副本是這樣的
現(xiàn)在新增一個(gè)分區(qū),--replica-assignment 2,1,3,4 ; 看這個(gè)意思好像是把0,1號(hào)分區(qū)互相換個(gè)Broker
但是實(shí)際上不會(huì)這樣做,Controller在處理的時(shí)候會(huì)把前面3個(gè)截掉; 只取新增的分區(qū)分配方式,原來(lái)的還是不會(huì)變
1.4.查詢Topic描述
1.查詢單個(gè)Topic
sh bin/kafka-topics.sh --topic test --bootstrap-server xxxx:9092 --describe --exclude-internal
2.批量查詢Topic(正則表達(dá)式匹配,下面是查詢所有Topic)
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server xxxx:9092 --describe --exclude-internal
支持正則表達(dá)式匹配Topic,只需要將topic 用雙引號(hào)包裹起來(lái)
相關(guān)可選參數(shù)
5.查詢Topic列表
1.查詢所有Topic列表
sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal
2.查詢匹配Topic列表(正則表達(dá)式)
查詢test_create_開(kāi)頭的所有Topic列表
sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal --topic "test_create_.*"
相關(guān)可選參數(shù)
2.ConfigCommand
Config相關(guān)操作; 動(dòng)態(tài)配置可以覆蓋默認(rèn)的靜態(tài)配置;
2.1 查詢配置
展示關(guān)于Topic的動(dòng)靜態(tài)配置
1.查詢單個(gè)Topic配置(只列舉動(dòng)態(tài)配置)
sh bin/kafka-configs.sh --describe --bootstrap-server xxxxx:9092 --topic test_create_topic
或者
sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics --entity-name test_create_topic
2.查詢所有Topic配置(包括內(nèi)部Topic)(只列舉動(dòng)態(tài)配置)
sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics
3.查詢Topic的詳細(xì)配置(動(dòng)態(tài)+靜態(tài))
只需要加上一個(gè)參數(shù)--all
同理 ;只需要將--entity-type 改成對(duì)應(yīng)的類型就行了 (topics/clients/users/brokers/broker-loggers)
sh bin/kafka-configs.sh --describe --bootstrap-server xxxx:9092 --version
所有可配置的動(dòng)態(tài)配置 請(qǐng)看最后面的 附件 部分
2.2 增刪改 配置 --alter
–alter
刪除配置: --delete-config k1=v1,k2=v2
添加/修改配置: --add-config k1,k2
選擇類型: --entity-type (topics/clients/users/brokers/broker-
loggers)
類型名稱: --entity-name
--add-config
sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999
--delete-config
sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms
類型有: (topics/clients/users/brokers/broker- loggers)
哪些配置可以修改 請(qǐng)看最后面的附件:ConfigCommand 的一些可選配置
3.副本擴(kuò)縮、分區(qū)遷移、跨路徑遷移 kafka-reassign-partitions
請(qǐng)戳 【kafka運(yùn)維】副本擴(kuò)縮容、數(shù)據(jù)遷移、副本重分配、副本跨路徑遷移 (如果點(diǎn)不出來(lái),表示文章暫未發(fā)表,請(qǐng)耐心等待)
4.Topic的發(fā)送kafka-console-producer.sh
4.1 生產(chǎn)無(wú)key消息
## 生產(chǎn)者 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties
4.2 生產(chǎn)有key消息
加上屬性--property parse.key=true
## 生產(chǎn)者 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties --property parse.key=true
默認(rèn)消息key與消息value間使用“Tab鍵”進(jìn)行分隔,所以消息key以及value中切勿使用轉(zhuǎn)義字符(\t)
可選參數(shù)
5. Topic的消費(fèi)kafka-console-consumer.sh
1. 新客戶端從頭消費(fèi)--from-beginning (注意這里是新客戶端,如果之前已經(jīng)消費(fèi)過(guò)了是不會(huì)從頭消費(fèi)的)
下面沒(méi)有指定客戶端名稱,所以每次執(zhí)行都是新客戶端都會(huì)從頭消費(fèi)
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
2. 正則表達(dá)式匹配topic進(jìn)行消費(fèi)--whitelist
消費(fèi)所有的topic
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’
消費(fèi)所有的topic,并且還從頭消費(fèi)
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’ --from-beginning
3.顯示key進(jìn)行消費(fèi)--property print.key=true
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true
4. 指定分區(qū)消費(fèi)--partition 指定起始偏移量消費(fèi)--offset
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100
5. 給客戶端命名--group
注意給客戶端命名之后,如果之前有過(guò)消費(fèi),那么--from-beginning就不會(huì)再?gòu)念^消費(fèi)了
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group
6. 添加客戶端屬性--consumer-property
這個(gè)參數(shù)也可以給客戶端添加屬性,但是注意 不能多個(gè)地方配置同一個(gè)屬性,他們是互斥的;比如在下面的基礎(chǔ)上還加上屬性--group test-group 那肯定不行
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-consumer-group
7. 添加客戶端屬性--consumer.config
跟--consumer-property 一樣的性質(zhì),都是添加客戶端的屬性,不過(guò)這里是指定一個(gè)文件,把屬性寫在文件里面, --consumer-property 的優(yōu)先級(jí)大于 --consumer.config
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties
6.kafka-leader-election Leader重新選舉
6.1 指定Topic指定分區(qū)用重新PREFERRED:優(yōu)先副本策略 進(jìn)行Leader重選舉
> sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic test_create_topic4 --election-type PREFERRED --partition 0
6.2 所有Topic所有分區(qū)用重新PREFERRED:優(yōu)先副本策略 進(jìn)行Leader重選舉
sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --election-type preferred --all-topic-partitions
6.3 設(shè)置配置文件批量指定topic和分區(qū)進(jìn)行Leader重選舉
先配置leader-election.json文件
{ "partitions": [ { "topic": "test_create_topic4", "partition": 1 }, { "topic": "test_create_topic4", "partition": 2 } ] }
sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred --path-to-json-file config/leader-election.json
相關(guān)可選參數(shù)
7. 持續(xù)批量推送消息kafka-verifiable-producer.sh
單次發(fā)送100條消息--max-messages 100
一共要推送多少條,默認(rèn)為-1,-1表示一直推送到進(jìn)程關(guān)閉位置
sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --max-messages 100
每秒發(fā)送最大吞吐量不超過(guò)消息 --throughput 100
推送消息時(shí)的吞吐量,單位messages/sec。默認(rèn)為-1,表示沒(méi)有限制
sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --throughput 100
發(fā)送的消息體帶前綴--value-prefix
sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --value-prefix 666
注意--value-prefix 666必須是整數(shù),發(fā)送的消息體的格式是加上一個(gè) 點(diǎn)號(hào). 例如: 666.
其他參數(shù):
--producer.config CONFIG_FILE 指定producer的配置文件
--acks ACKS 每次推送消息的ack值,默認(rèn)是-1
8. 持續(xù)批量拉取消息kafka-verifiable-consumer
持續(xù)消費(fèi)
sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4
單次最大消費(fèi)10條消息--max-messages 10
sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4 --max-messages 10
相關(guān)可選參數(shù)
9.生產(chǎn)者壓力測(cè)試kafka-producer-perf-test.sh
1. 發(fā)送1024條消息--num-records 100并且每條消息大小為1KB--record-size 1024 最大吞吐量每秒10000條--throughput 100
sh bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9092 --record-size 1024
你可以通過(guò)LogIKM查看分區(qū)是否增加了對(duì)應(yīng)的數(shù)據(jù)大小
從LogIKM 可以看到發(fā)送了1024條消息; 并且總數(shù)據(jù)量=1M; 1024條*1024byte = 1M;
2. 用指定消息文件--payload-file發(fā)送100條消息最大吞吐量每秒100條--throughput 100
先配置好消息文件batchmessage.txt
然后執(zhí)行命令
發(fā)送的消息會(huì)從batchmessage.txt里面隨機(jī)選擇; 注意這里我們沒(méi)有用參數(shù)--payload-delimeter指定分隔符,默認(rèn)分隔符是\n換行;
bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100 --producer-props bootstrap.servers=localhost:9090 --payload-file config/batchmessage.txt
驗(yàn)證消息,可以通過(guò) LogIKM 查看發(fā)送的消息
相關(guān)可選參數(shù)
10.消費(fèi)者壓力測(cè)試kafka-consumer-perf-test.sh
消費(fèi)100條消息--messages 100
sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 --bootstrap-server localhost:9090 --messages 100
相關(guān)可選參數(shù)
11.刪除指定分區(qū)的消息kafka-delete-records.sh
刪除指定topic的某個(gè)分區(qū)的消息刪除至offset為1024
先配置json文件offset-json-file.json
{"partitions": [{"topic": "test1", "partition": 0, "offset": 1024}], "version":1 }
在執(zhí)行命令
sh bin/kafka-delete-records.sh --bootstrap-server 172.23.250.249:9090 --offset-json-file config/offset-json-file.json
驗(yàn)證 通過(guò) LogIKM 查看發(fā)送的消息
從這里可以看出來(lái),配置"offset": 1024 的意思是從最開(kāi)始的地方刪除消息到 1024的offset; 是從最前面開(kāi)始刪除的
12. 查看Broker磁盤信息
查詢指定topic磁盤信息--topic-list topic1,topic2
sh bin/kafka-log-dirs.sh --bootstrap-server xxxx:9090 --describe --topic-list test2
查詢指定Broker磁盤信息--broker-list 0 broker1,broker2
sh bin/kafka-log-dirs.sh --bootstrap-server xxxxx:9090 --describe --topic-list test2 --broker-list 0
例如我一個(gè)3分區(qū)3副本的Topic的查出來(lái)的信息
logDir Broker中配置的log.dir
{ "version": 1, "brokers": [{ "broker": 0, "logDirs": [{ "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0", "error": null, "partitions": [{ "partition": "test2-1", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-0", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-2", "size": 0, "offsetLag": 0, "isFuture": false }] }] }, { "broker": 1, "logDirs": [{ "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1", "error": null, "partitions": [{ "partition": "test2-1", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-0", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-2", "size": 0, "offsetLag": 0, "isFuture": false }] }] }, { "broker": 2, "logDirs": [{ "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2", "error": null, "partitions": [{ "partition": "test2-1", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-0", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-2", "size": 0, "offsetLag": 0, "isFuture": false }] }] }, { "broker": 3, "logDirs": [{ "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3", "error": null, "partitions": [] }] }] }
如果你覺(jué)得通過(guò)命令查詢磁盤信息比較麻煩,你也可以通過(guò) LogIKM 查看
12. 消費(fèi)者組管理 kafka-consumer-groups.sh
1. 查看消費(fèi)者列表--list
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list
先調(diào)用MetadataRequest拿到所有在線Broker列表
再給每個(gè)Broker發(fā)送ListGroupsRequest請(qǐng)求獲取 消費(fèi)者組數(shù)據(jù)
2. 查看消費(fèi)者組詳情--describe
DescribeGroupsRequest
查看消費(fèi)組詳情--group 或 --all-groups
查看指定消費(fèi)組詳情--group
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group
查看所有消費(fèi)組詳情--all-groups
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups
查看該消費(fèi)組 消費(fèi)的所有Topic、及所在分區(qū)、最新消費(fèi)offset、Log最新數(shù)據(jù)offset、Lag還未消費(fèi)數(shù)量、消費(fèi)者ID等等信息
查詢消費(fèi)者成員信息--members
所有消費(fèi)組成員信息
sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090
指定消費(fèi)組成員信息
sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090
查詢消費(fèi)者狀態(tài)信息--state
所有消費(fèi)組狀態(tài)信息
sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090
指定消費(fèi)組狀態(tài)信息
sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090
3. 刪除消費(fèi)者組--delete
DeleteGroupsRequest
刪除消費(fèi)組–delete
刪除指定消費(fèi)組--group
sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090
刪除所有消費(fèi)組--all-groups
sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090
PS: 想要?jiǎng)h除消費(fèi)組前提是這個(gè)消費(fèi)組的所有客戶端都停止消費(fèi)/不在線才能夠成功刪除;否則會(huì)報(bào)下面異常
Error: Deletion of some consumer groups failed: * Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
4. 重置消費(fèi)組的偏移量 --reset-offsets
能夠執(zhí)行成功的一個(gè)前提是 消費(fèi)組這會(huì)是不可用狀態(tài);
下面的示例使用的參數(shù)是: --dry-run ;這個(gè)參數(shù)表示預(yù)執(zhí)行,會(huì)打印出來(lái)將要處理的結(jié)果;
等你想真正執(zhí)行的時(shí)候請(qǐng)換成參數(shù)--excute ;
下面示例 重置模式都是 --to-earliest 重置到最早的;
請(qǐng)根據(jù)需要參考下面 相關(guān)重置Offset的模式 換成其他模式;
重置指定消費(fèi)組的偏移量 --group
重置指定消費(fèi)組的所有Topic的偏移量--all-topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topic
重置指定消費(fèi)組的指定Topic的偏移量--topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2
重置所有消費(fèi)組的偏移量 --all-group
重置所有消費(fèi)組的所有Topic的偏移量--all-topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --all-topic
重置所有消費(fèi)組中指定Topic的偏移量--topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --topic test2
--reset-offsets 后面需要接重置的模式
相關(guān)重置Offset的模式
--from-file著重講解一下
上面其他的一些模式重置的都是匹配到的所有分區(qū); 不能夠每個(gè)分區(qū)重置到不同的offset;不過(guò)**--from-file**可以讓我們更靈活一點(diǎn);
先配置cvs文檔
格式為: Topic:分區(qū)號(hào): 重置目標(biāo)偏移量
test2,0,100 test2,1,200 test2,2,300
執(zhí)行命令
sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv
5. 刪除偏移量delete-offsets
以上大部分運(yùn)維操作,都可以使用 LogI-Kafka-Manager 在平臺(tái)上可視化操作;
滴滴開(kāi)源Logi-KafkaManager 一站式Kafka監(jiān)控與管控平臺(tái)
技術(shù)交流
有想進(jìn)
滴滴LogI開(kāi)源用戶群
的加我個(gè)人-
jjdlmn_
進(jìn)群(備注:進(jìn)群)
群里面主要交流
** kakfa**、es、agent、LogI-kafka-manager、
等等相關(guān)技術(shù);
群內(nèi)有專人解答你的問(wèn)題
對(duì)~ 相關(guān)技術(shù)領(lǐng)域的解答人員都有; 你問(wèn)的問(wèn)題都會(huì)得到回應(yīng)
Kafka 大數(shù)據(jù) 運(yùn)維
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。