你知道Kafka創建Topic這個過程做了哪些事情嗎?(附視頻) (上)
【kafka源碼】kafka-topics.sh之創建Topic源碼解析

日常運維
、
問題排查
怎么能夠少了滴滴開源的
滴滴開源LogiKM一站式Kafka監控與管控平臺
配套視頻:
https://www.bilibili.com/video/BV1uf4y1V75V?share_source=copy_web
腳本參數
創建Topic腳本
創建Topic 源碼分析
1. 源碼入口
2. 創建AdminClientTopicService 對象
2.1 先創建 Admin
3. AdminClientTopicService.createTopic 創建Topic
3.1 KafkaAdminClient.createTopics(NewTopic) 創建Topic
4. 發起網絡請求
5. Controller角色的服務端接受請求處理邏輯
5.1 KafkaApis.handle(request) 根據請求傳遞Api調用不同接口
5.2 KafkaApis.handleCreateTopicsRequest 處理創建Topic的請求
5.3 adminManager.createTopics()
5.4 寫入zookeeper數據
6. Controller監聽 /brokers/topics/Topic名稱, 通知Broker將分區寫入磁盤
6.1 onNewPartitionCreation 狀態流轉
7. Broker收到LeaderAndIsrRequest 創建本地Log
腳本參數
sh bin/kafka-topic -help 查看更具體參數
下面只是列出了跟--create 相關的參數
創建Topic腳本
zk方式(不推薦)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
需要注意的是–zookeeper后面接的是kafka的zk配置, 假如你配置的是localhost:2181/kafka 帶命名空間的這種,不要漏掉了
kafka版本 >= 2.2 支持下面方式(推薦)
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
更多TopicCommand相關命令請看
1.【kafka運維】TopicCommand運維腳本
當前分析的kafka源碼版本為 kafka-2.5
創建Topic 源碼分析
溫馨提示: 如果閱讀源碼略顯枯燥,你可以直接看源碼總結以及后面部分
首先我們找到源碼入口處, 查看一下 kafka-topic.sh腳本的內容
exec $(dirname
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
)/kafka-run-class.sh kafka.admin.TopicCommand "$@"最終是執行了kafka.admin.TopicCommand這個類,找到這個地方之后就可以斷點調試源碼了,用IDEA啟動
記得配置一下入參
比如: --create --bootstrap-server 127.0.0.1:9092 --partitions 3 --topic test_create_topic3
1. 源碼入口
上面的源碼主要作用是
根據是否有傳入參數--zookeeper 來判斷創建哪一種 對象topicService
如果傳入了--zookeeper 則創建 類 ZookeeperTopicService的對象
否則創建類AdminClientTopicService的對象(我們主要分析這個對象)
根據傳入的參數類型判斷是創建topic還是刪除等等其他 判斷依據是 是否在參數里傳入了--create
2. 創建AdminClientTopicService 對象
val topicService = new AdminClientTopicService(createAdminClient(commandConfig, bootstrapServer))
object AdminClientTopicService { def createAdminClient(commandConfig: Properties, bootstrapServer: Option[String]): Admin = { bootstrapServer match { case Some(serverList) => commandConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, serverList) case None => } Admin.create(commandConfig) } def apply(commandConfig: Properties, bootstrapServer: Option[String]): AdminClientTopicService = new AdminClientTopicService(createAdminClient(commandConfig, bootstrapServer)) }
如果有入參--command-config ,則將這個文件里面的參數都放到map commandConfig里面, 并且也加入bootstrap.servers的參數;假如配置文件里面已經有了bootstrap.servers配置,那么會將其覆蓋
將上面的commandConfig 作為入參調用Admin.create(commandConfig)創建 Admin; 這個時候調用的Client模塊的代碼了, 從這里我們就可以看出,我們調用kafka-topic.sh腳本實際上是kafka模擬了一個客戶端Client來創建Topic的過程;
3. AdminClientTopicService.createTopic 創建Topic
topicService.createTopic(opts)
case class AdminClientTopicService private (adminClient: Admin) extends TopicService { override def createTopic(topic: CommandTopicPartition): Unit = { //如果配置了副本副本數--replication-factor 一定要大于0 if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1)) throw new IllegalArgumentException(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive") //如果配置了--partitions 分區數 必須大于0 if (topic.partitions.exists(partitions => partitions < 1)) throw new IllegalArgumentException(s"The partitions must be greater than 0") //查詢是否已經存在該Topic if (!adminClient.listTopics().names().get().contains(topic.name)) { val newTopic = if (topic.hasReplicaAssignment) //如果指定了--replica-assignment參數;則按照指定的來分配副本 new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get)) else { new NewTopic( topic.name, topic.partitions.asJava, topic.replicationFactor.map(_.toShort).map(Short.box).asJava) } // 將配置--config 解析成一個配置map val configsMap = topic.configsToAdd.stringPropertyNames() .asScala .map(name => name -> topic.configsToAdd.getProperty(name)) .toMap.asJava newTopic.configs(configsMap) //調用adminClient創建Topic val createResult = adminClient.createTopics(Collections.singleton(newTopic)) createResult.all().get() println(s"Created topic ${topic.name}.") } else { throw new IllegalArgumentException(s"Topic ${topic.name} already exists") } }
檢查各項入參是否有問題
adminClient.listTopics(),然后比較是否已經存在待創建的Topic;如果存在拋出異常;
判斷是否配置了參數--replica-assignment ; 如果配置了,那么Topic就會按照指定的方式來配置副本情況
解析配置--config 配置放到configsMap中; configsMap給到NewTopic對象
調用adminClient.createTopics創建Topic; 它是如何創建Topic的呢?往下分析源碼
@Override public CreateTopicsResult createTopics(final Collection
這個代碼里面主要看下Call里面的接口; 先不管Kafka如何跟服務端進行通信的細節; 我們主要關注創建Topic的邏輯;
createRequest會構造一個請求參數CreateTopicsRequest 例如下圖
選擇ControllerNodeProvider這個節點發起網絡請求
可以清楚的看到, 創建Topic這個操作是需要Controller來執行的;
4. 發起網絡請求
==>服務端客戶端網絡模型
5. Controller角色的服務端接受請求處理邏輯
首先找到服務端處理客戶端請求的 源碼入口 ? KafkaRequestHandler.run()
主要看里面的 apis.handle(request) 方法; 可以看到客戶端的請求都在request.bodyAndSize()里面
進入方法可以看到根據request.header.apiKey 調用對應的方法,客戶端傳過來的是CreateTopics
def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = { // 部分代碼省略 //如果當前Broker不是屬于Controller的話,就拋出異常 if (!controller.isActive) { createTopicsRequest.data.topics.asScala.foreach { topic => results.add(new CreatableTopicResult().setName(topic.name). setErrorCode(Errors.NOT_CONTROLLER.code)) } sendResponseCallback(results) } else { // 部分代碼省略 } adminManager.createTopics(createTopicsRequest.data.timeoutMs, createTopicsRequest.data.validateOnly, toCreate, authorizedForDescribeConfigs, handleCreateTopicsResults) } }
判斷當前處理的broker是不是Controller,如果不是Controller的話直接拋出異常,從這里可以看出,CreateTopic這個操作必須是Controller來進行, 出現這種情況有可能是客戶端發起請求的時候Controller已經變更;
鑒權 【Kafka源碼】kafka鑒權機制
調用adminManager.createTopics()
創建主題并等等主題完全創建,回調函數將會在超時、錯誤、或者主題創建完成時觸發
該方法過長,省略部分代碼
def createTopics(timeout: Int, validateOnly: Boolean, toCreate: Map[String, CreatableTopic], includeConfigsAndMetatadata: Map[String, CreatableTopicResult], responseCallback: Map[String, ApiError] => Unit): Unit = { // 1. map over topics creating assignment and calling zookeeper val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) } val metadata = toCreate.values.map(topic => try { //省略部分代碼 //檢查Topic是否存在 //檢查 --replica-assignment參數和 (--partitions || --replication-factor ) 不能同時使用 // 如果(--partitions || --replication-factor ) 沒有設置,則使用 Broker的配置(這個Broker肯定是Controller) // 計算分區副本分配方式 createTopicPolicy match { case Some(policy) => //省略部分代碼 adminZkClient.validateTopicCreate(topic.name(), assignments, configs) if (!validateOnly) adminZkClient.createTopicWithAssignment(topic.name, configs, assignments) case None => if (validateOnly) //校驗創建topic的參數準確性 adminZkClient.validateTopicCreate(topic.name, assignments, configs) else //把topic相關數據寫入到zk中 adminZkClient.createTopicWithAssignment(topic.name, configs, assignments) } }
做一些校驗檢查
①.檢查Topic是否存在
②. 檢查--replica-assignment參數和 (--partitions || --replication-factor ) 不能同時使用
③.如果(--partitions || --replication-factor ) 沒有設置,則使用 Broker的配置(這個Broker肯定是Controller)
④.計算分區副本分配方式
createTopicPolicy 根據Broker是否配置了創建Topic的自定義校驗策略; 使用方式是自定義實現org.apache.kafka.server.policy.CreateTopicPolicy接口;并 在服務器配置 create.topic.policy.class.name=自定義類; 比如我就想所有創建Topic的請求分區數都要大于10; 那么這里就可以實現你的需求了
createTopicWithAssignment把topic相關數據寫入到zk中; 進去分析一下
我們進入到adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)看看有哪些數據寫入到了zk中;
def createTopicWithAssignment(topic: String, config: Properties, partitionReplicaAssignment: Map[Int, Seq[Int]]): Unit = { validateTopicCreate(topic, partitionReplicaAssignment, config) // 將topic單獨的配置寫入到zk中 zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config) // 將topic分區相關信息寫入zk中 writeTopicPartitionAssignment(topic, partitionReplicaAssignment.mapValues(ReplicaAssignment(_)).toMap, isUpdate = false) }
源碼就不再深入了,這里直接詳細說明一下
寫入Topic配置信息
先調用SetDataRequest請求往節點/config/topics/Topic名稱 寫入數據; 這里
一般這個時候都會返回 NONODE (NoNode);節點不存在; 假如zk已經存在節點就直接覆蓋掉
節點不存在的話,就發起CreateRequest請求,寫入數據; 并且節點類型是持久節點
這里寫入的數據,是我們入參時候傳的topic配置--config; 這里的配置會覆蓋默認配置
寫入Topic分區副本信息
將已經分配好的副本分配策略寫入到 /brokers/topics/Topic名稱 中; 節點類型 持久節點
具體跟zk交互的地方在
ZookeeperClient.send() 這里包裝了很多跟zk的交互;
6. Controller監聽 /brokers/topics/Topic名稱, 通知Broker將分區寫入磁盤
Controller 有監聽zk上的一些節點; 在上面的流程中已經在zk中寫入了 /brokers/topics/Topic名稱 ; 這個時候Controller就監聽到了這個變化并相應;
KafkaController.processTopicChange
private def processTopicChange(): Unit = { //如果處理的不是Controller角色就返回 if (!isActive) return //從zk中獲取 `/brokers/topics 所有Topic val topics = zkClient.getAllTopicsInCluster //找出哪些是新增的 val newTopics = topics -- controllerContext.allTopics //找出哪些Topic在zk上被刪除了 val deletedTopics = controllerContext.allTopics -- topics controllerContext.allTopics = topics registerPartitionModificationsHandlers(newTopics.toSeq) val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics) deletedTopics.foreach(controllerContext.removeTopic) addedPartitionReplicaAssignment.foreach { case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment) } info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " + s"[$addedPartitionReplicaAssignment]") if (addedPartitionReplicaAssignment.nonEmpty) onNewPartitionCreation(addedPartitionReplicaAssignment.keySet) }
從zk中獲取 /brokers/topics 所有Topic跟當前Broker內存中所有BrokercontrollerContext.allTopics的差異; 就可以找到我們新增的Topic; 還有在zk中被刪除了的Broker(該Topic會在當前內存中remove掉)
從zk中獲取/brokers/topics/{TopicName} 給定主題的副本分配。并保存在內存中
執行onNewPartitionCreation;分區狀態開始流轉
關于Controller的狀態機 詳情請看: 【kafka源碼】Controller中的狀態機
/** * This callback is invoked by the topic change callback with the list of failed brokers as input. * It does the following - * 1. Move the newly created partitions to the NewPartition state * 2. Move the newly created partitions from NewPartition->OnlinePartition state */ private def onNewPartitionCreation(newPartitions: Set[TopicPartition]): Unit = { info(s"New partition creation callback for ${newPartitions.mkString(",")}") partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition) replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica) partitionStateMachine.handleStateChanges( newPartitions.toSeq, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy(false)) ) replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica) }
將待創建的分區狀態流轉為NewPartition;
將待創建的副本 狀態流轉為NewReplica;
將分區狀態從剛剛的NewPartition流轉為OnlinePartition
0. 獲取leaderIsrAndControllerEpochs; Leader為副本的第一個;
1. 向zk中寫入/brokers/topics/{topicName}/partitions/ 持久節點; 無數據
2. 向zk中寫入/brokers/topics/{topicName}/partitions/{分區號} 持久節點; 無數據
3. 向zk中寫入/brokers/topics/{topicName}/partitions/{分區號}/state 持久節點; 數據為leaderIsrAndControllerEpoch
向副本所屬Broker發送leaderAndIsrRequest請求
向所有Broker發送UPDATE_METADATA 請求
將副本狀態從剛剛的NewReplica流轉為OnlineReplica ,更新下內存
關于分區狀態機和副本狀態機詳情請看【kafka源碼】Controller中的狀態機
7. Broker收到LeaderAndIsrRequest 創建本地Log
上面步驟中有說到向副本所屬Broker發送leaderAndIsrRequest請求,那么這里做了什么呢
其實主要做的是 創建本地Log
代碼太多,這里我們直接定位到只跟創建Topic相關的關鍵代碼來分析
KafkaApis.handleLeaderAndIsrRequest->replicaManager.becomeLeaderOrFollower->ReplicaManager.makeLeaders...LogManager.getOrCreateLog
/** * 如果日志已經存在,只返回現有日志的副本否則如果 isNew=true 或者如果沒有離線日志目錄,則為給定的主題和給定的分區創建日志 否則拋出 KafkaStorageException */ def getOrCreateLog(topicPartition: TopicPartition, config: LogConfig, isNew: Boolean = false, isFuture: Boolean = false): Log = { logCreationOrDeletionLock synchronized { getLog(topicPartition, isFuture).getOrElse { // create the log if it has not already been created in another thread if (!isNew && offlineLogDirs.nonEmpty) throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline") val logDirs: List[File] = { val preferredLogDir = preferredLogDirs.get(topicPartition) if (isFuture) { if (preferredLogDir == null) throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory") else if (getLog(topicPartition).get.dir.getParent == preferredLogDir) throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition") } if (preferredLogDir != null) List(new File(preferredLogDir)) else nextLogDirs() } val logDirName = { if (isFuture) Log.logFutureDirName(topicPartition) else Log.logDirName(topicPartition) } val logDir = logDirs .toStream // to prevent actually mapping the whole list, lazy map .map(createLogDirectory(_, logDirName)) .find(_.isSuccess) .getOrElse(Failure(new KafkaStorageException("No log directories available. Tried " + logDirs.map(_.getAbsolutePath).mkString(", ")))) .get // If Failure, will throw val log = Log( dir = logDir, config = config, logStartOffset = 0L, recoveryPoint = 0L, maxProducerIdExpirationMs = maxPidExpirationMs, producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, scheduler = scheduler, time = time, brokerTopicStats = brokerTopicStats, logDirFailureChannel = logDirFailureChannel) if (isFuture) futureLogs.put(topicPartition, log) else currentLogs.put(topicPartition, log) info(s"Created log for partition $topicPartition in $logDir with properties " + s"{${config.originals.asScala.mkString(", ")}}.") // Remove the preferred log dir since it has already been satisfied preferredLogDirs.remove(topicPartition) log } } }
如果日志已經存在,只返回現有日志的副本否則如果 isNew=true 或者如果沒有離線日志目錄,則為給定的主題和給定的分區創建日志 否則拋出KafkaStorageException
Tips:如果關于本篇文章你有疑問,可以關注公眾號留言解答
Kafka 視頻
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。