Kafka Source Code Analysis, Part 2: Producer Code Analysis (Scala Version), Second Half


Here the producer chooses between different enqueue strategies based on the config.queueEnqueueTimeoutMs parameter. The meaning of its value is documented in the shipped configuration comments:

      # Timeout for event enqueue:

      # 0: events will be enqueued immediately or dropped if the queue is full

      # -ve: enqueue will block indefinitely if the queue is full

      # +ve: enqueue will block up to this many milliseconds if the queue is full

      #queue.enqueue.timeout.ms=
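The three cases correspond to the standard blocking-queue operations in java.util.concurrent. Below is a minimal sketch, not the exact producer source: the names tryEnqueue, queue and enqueueTimeoutMs are illustrative, showing how the setting could select the enqueue call on a LinkedBlockingQueue:

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Sketch only: illustrates the three queue.enqueue.timeout.ms behaviours.
def tryEnqueue[T](queue: LinkedBlockingQueue[T], message: T, enqueueTimeoutMs: Int): Boolean =
  enqueueTimeoutMs match {
    case 0 =>
      // 0: offer() returns immediately; the message is dropped if the queue is full
      queue.offer(message)
    case t if t < 0 =>
      // negative: put() blocks indefinitely until space is available
      queue.put(message)
      true
    case t =>
      // positive: block up to t milliseconds, then give up
      queue.offer(message, t, TimeUnit.MILLISECONDS)
  }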

Finally, let's look at what the asynchronous send thread actually does. Its run method simply calls processEvents, whose code is as follows:

private def processEvents() {
  var lastSend = SystemTime.milliseconds
  var events = new ArrayBuffer[KeyedMessage[K,V]]
  var full: Boolean = false

  // drain the queue until you get a shutdown command
  Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
        .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
    currentQueueItem =>
      val elapsed = (SystemTime.milliseconds - lastSend)
      // check if the queue time is reached. This happens when the poll method above returns after a timeout and
      // returns a null object
      val expired = currentQueueItem == null
      if(currentQueueItem != null) {
        trace("Dequeued item for topic %s, partition key: %s, data: %s"
          .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
        events += currentQueueItem
      }

      // check if the batch size is reached
      full = events.size >= batchSize

      // Two conditions trigger a send: the events buffer reaches the batch size, or the poll above times out
      if(full || expired) {
        if(expired)
          debug(elapsed + " ms elapsed. Queue time reached. Sending..")
        if(full)
          debug("Batch full. Sending..")
        // if either queue time has reached or batch size has reached, dispatch to event handler
        tryToHandle(events)
        lastSend = SystemTime.milliseconds
        events = new ArrayBuffer[KeyedMessage[K,V]]
      }
  }
  // send the last batch of events
  tryToHandle(events)
  if(queue.size > 0)
    throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
      .format(queue.size))
}

This function keeps polling messages off the queue until it receives the shutdown command, appending each one to the events buffer. When the buffer reaches config.batchNumMessages entries, or the poll times out, the batch is sent. The actual send goes through handler.handle(events), so from that point on the flow is the same as for synchronous sending.
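For reference, batchSize and queueTime in processEvents come from the batch.num.messages and queue.buffering.max.ms producer settings. A minimal usage sketch of the old Scala async producer might look like the following; the broker address, topic name and concrete values are placeholders:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "localhost:9092")   // placeholder broker address
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("producer.type", "async")                   // use the async send path analysed above
props.put("batch.num.messages", "200")                // batchSize: flush once 200 messages are buffered
props.put("queue.buffering.max.ms", "5000")           // queueTime: flush at least every 5 seconds
props.put("queue.enqueue.timeout.ms", "-1")           // block the caller if the queue is full

val producer = new Producer[String, String](new ProducerConfig(props))
producer.send(new KeyedMessage[String, String]("test-topic", "key", "value"))
producer.close()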

3. How the Server Handles Produce Requests

On startup the Kafka server spawns N threads to handle requests, where N is given by the num.io.threads property (default 8). Kafka recommends setting this to at least the number of disks on the machine. In KafkaServer's startup method:

def startup() {
  ...
  // Create the request-handling thread pool; its constructor starts the threads that receive requests
  requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
  ...
}

class KafkaRequestHandlerPool {
  ...
  for(i <- 0 until numThreads) {
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    threads(i).start() // start each request-handler thread
  }
  ...
}

KafkaRequestHandler is really just a Runnable; its run method loops with while (true), calling apis.handle(request) to keep taking requests and processing them, as shown below:

class KafkaRequestHandler... extends Runnable {
  ...
  def run() {
    ...
    while (true) {
      ...
      apis.handle(request) // hand the request to KafkaApis for processing
    }
    ...
  }
  ...
}

The handle method in KafkaApis dispatches every type of request; here we only care about ProducerRequest:

def handle(request: RequestChannel.Request) {
  ...
  request.requestId match {
    case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request) // a ProducerRequest is handed to handleProducerOrOffsetCommitRequest
    case ...
  }
  ...
}

The core method, then, is handleProducerOrOffsetCommitRequest. It carries this name because it handles both ProducerRequest and OffsetCommitRequest; the latter is really just a special kind of ProducerRequest, since from Kafka 0.8.2 onwards committed offsets are stored in a dedicated topic named __consumer_offsets. Here we focus on a genuine ProducerRequest. The overall logic is quite simple, as the code below shows:

def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
  ...
  val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) // append the messages to the local commit log
  val numPartitionsInError = localProduceResults.count(_.error.isDefined) // count the partitions whose append failed
  if(produceRequest.requiredAcks == 0) { // code path for request.required.acks = 0
    if (numPartitionsInError != 0) {
      info(("Send the close connection response due to error handling produce request " +
        "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
        .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
      requestChannel.closeConnection(request.processor, request) // close the underlying socket so the client learns that some sends failed
    } else {
      ...
    }
  } else if (produceRequest.requiredAcks == 1 || // code path for request.required.acks = 1, plus two other conditions
             produceRequest.numPartitions <= 0 ||
             numPartitionsInError == produceRequest.numPartitions) {
    val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
      .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) // send the response back to the client
  } else { // code path for request.required.acks = -1
    // create a list of (topic, partition) pairs to use as keys for this delayed request
    val producerRequestKeys = produceRequest.data.keys.toSeq
    val statuses = localProduceResults.map(r =>
      r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
    val delayedRequest = new DelayedProduce(...) // a delayed request is built here; this involves the Purgatory concept and is left for a later article
    ...
  }

As the code shows, whatever request.required.acks is set to, the message set is first appended to the local commit log. With the default value of 0, the broker then checks whether all messages were appended successfully; if any failed, it closes the client's socket connection so the client program learns that an error occurred.
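Which of the three server-side branches runs is decided by the client's request.required.acks setting. A small client-side sketch, with the values other than the one set shown as comments:

import java.util.Properties

val props = new Properties()
// "0": fire-and-forget; the broker sends no response and closes the connection on error
props.put("request.required.acks", "0")
// "1":  the broker responds as soon as the leader write succeeds
// "-1": the broker responds only after ISR replication, via a DelayedProduce request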

The last method to look at is Partition's appendMessagesToLeader; its main code is as follows:

def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = {
  inReadLock(leaderIsrUpdateLock) {
    val leaderReplicaOpt = leaderReplicaIfLocal() // check whether the leader replica of the target partition lives on this broker
    leaderReplicaOpt match {
      case Some(leaderReplica) => // the leader replica is on this broker
        val log = leaderReplica.log.get // handle to the local commit log
        val minIsr = log.config.minInSyncReplicas
        val inSyncSize = inSyncReplicas.size

        // Avoid writing to leader if there are not enough insync replicas to make it safe
        if (inSyncSize < minIsr && requiredAcks == -1) { // the ISR size is only checked when request.required.acks is -1
          throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
            .format(topic, partitionId, minIsr, inSyncSize))
        }

        val info = log.append(messages, assignOffsets = true) // the actual log write; the low-level log layer deserves its own article
        // probably unblock some follower fetch requests since log end offset has been updated
        replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))
        // we may need to increment high watermark since ISR could be down to 1
        maybeIncrementLeaderHW(leaderReplica)
        info
      case None => // otherwise throw to indicate the leader is not on this broker
        throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
          .format(topic, partitionId, localBrokerId))
    }
  }
}

This concludes the walkthrough of the simplest synchronous Scala producer. The design that emerges is that every broker runs a server which keeps processing the various requests coming from clients, performs the corresponding work, and returns a response when one is required.

4. Questions and Answers

1. Does the producer client talk to ZooKeeper?

No. All metadata is fetched from the brokers; only the consumer needs to talk to ZooKeeper.

2. How is the compression.codec parameter used?

This is a producer-side setting that decides whether message batches are compressed. According to the official documentation: "This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are 'none', 'gzip', 'snappy' and 'lz4'."

For example:

      compression.codec=none
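Equivalently, the codec can be set programmatically on the producer properties; a small sketch where the "snappy" value and the compressed.topics entry are only illustrations:

import java.util.Properties

val props = new Properties()
// Any of "none", "gzip", "snappy" or "lz4" is accepted
props.put("compression.codec", "snappy")
// Optionally limit compression to particular topics
props.put("compressed.topics", "test-topic") // placeholder topic name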

3. Is blockingChannel.send(request) blocking or non-blocking?

The send is blocking, because BlockingChannel's connect method configures the socket channel in blocking mode:

class BlockingChannel( val host: String,
                       val port: Int,
                       val readBufferSize: Int,
                       val writeBufferSize: Int,
                       val readTimeoutMs: Int ) extends Logging {
  ...
  def connect() = lock synchronized {
    if(!connected) {
      try {
        channel = SocketChannel.open()
        if(readBufferSize > 0)
          channel.socket.setReceiveBufferSize(readBufferSize)
        if(writeBufferSize > 0)
          channel.socket.setSendBufferSize(writeBufferSize)
        channel.configureBlocking(true)
        ...


