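Kafka Source Code Analysis, Part 2: Producer Code Walkthrough (Scala Version), Second Half
Here the enqueue strategy is chosen according to the config.queueEnqueueTimeoutMs parameter, whose values have the following meaning: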
# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
#queue.enqueue.timeout.ms=
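The three cases line up with the three insertion methods of java.util.concurrent.BlockingQueue. The following is a minimal sketch of that mapping, not the producer's actual code; the object name EnqueueSketch and its enqueue helper are hypothetical.

```scala
import java.util.concurrent.{BlockingQueue, TimeUnit}

object EnqueueSketch {
  // Hypothetical helper: returns true if the event was queued, false if it was dropped.
  def enqueue[T](queue: BlockingQueue[T], event: T, timeoutMs: Long): Boolean =
    if (timeoutMs == 0) queue.offer(event)                     // 0: enqueue now or drop
    else if (timeoutMs < 0) { queue.put(event); true }         // -ve: block until there is room
    else queue.offer(event, timeoutMs, TimeUnit.MILLISECONDS)  // +ve: wait at most timeoutMs
}
```

With a queue of capacity 1 that already holds one item, a timeout of 0 drops the new event at once, while a positive timeout waits that long before giving up.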
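Finally, let us look at what the asynchronous send thread actually does. The thread's run function is very simple: it just calls processEvents, whose code is as follows: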
private def processEvents() {
  var lastSend = SystemTime.milliseconds
  var events = new ArrayBuffer[KeyedMessage[K,V]]
  var full: Boolean = false
  // drain the queue until you get a shutdown command
  Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
    .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
    currentQueueItem =>
      val elapsed = (SystemTime.milliseconds - lastSend)
      // check if the queue time is reached. This happens when the poll method above returns after a timeout and
      // returns a null object
      val expired = currentQueueItem == null
      if(currentQueueItem != null) {
        trace("Dequeued item for topic %s, partition key: %s, data: %s"
          .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
        events += currentQueueItem
      }
      // check if the batch size is reached
      full = events.size >= batchSize
      // Two conditions trigger a send: the events buffer has reached the batch size, or the poll has timed out
      if(full || expired) {
        if(expired)
          debug(elapsed + " ms elapsed. Queue time reached. Sending..")
        if(full)
          debug("Batch full. Sending..")
        // if either queue time has reached or batch size has reached, dispatch to event handler
        tryToHandle(events)
        lastSend = SystemTime.milliseconds
        events = new ArrayBuffer[KeyedMessage[K,V]]
      }
  }
  // send the last batch of events
  tryToHandle(events)
  if(queue.size > 0)
    throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
      .format(queue.size))
}
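This function polls messages from the queue until it receives a shutdown command.
Each message is added to the events buffer. When the number of buffered events reaches config.batchNumMessages, or the queue timeout expires, the batch is sent.
The send itself ends up calling handler.handle(events), and from there the flow is the same as for synchronous sending.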
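The batching rule described above (send when the batch is full or when the poll times out, then flush whatever is left at shutdown) can be sketched as a plain loop. This is a simplified illustration under stated assumptions, not the Kafka code: it uses String messages, a hypothetical Shutdown sentinel in place of shutdownCommand, and a hypothetical send callback in place of the event handler. It also skips empty batches itself.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object BatchLoopSketch {
  // Hypothetical sentinel; a real message equal to this string would also stop the loop.
  val Shutdown = "__shutdown__"

  // Drains `queue` until the sentinel arrives, handing each batch to `send`.
  def drain(queue: LinkedBlockingQueue[String], batchSize: Int, queueTimeMs: Long)
           (send: Seq[String] => Unit): Unit = {
    var lastSend = System.currentTimeMillis()
    var events = new ArrayBuffer[String]
    var running = true
    while (running) {
      val waitMs = math.max(0L, lastSend + queueTimeMs - System.currentTimeMillis())
      val item = queue.poll(waitMs, TimeUnit.MILLISECONDS) // null means the wait timed out
      if (item == Shutdown) running = false
      else {
        if (item != null) events += item
        val full = events.size >= batchSize
        val expired = item == null
        if (full || expired) {
          if (events.nonEmpty) send(events.toList)
          lastSend = System.currentTimeMillis()
          events = new ArrayBuffer[String]
        }
      }
    }
    if (events.nonEmpty) send(events.toList) // flush the final partial batch
  }
}
```

Feeding five messages followed by the sentinel with a batch size of 2 yields two full batches and one final batch of a single message.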
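3. How the Server Handles a Produce Request
When a Kafka server starts, it launches N threads to handle requests. N is set by the num.io.threads property and defaults to 8. Kafka recommends setting this value to at least the number of disks on the machine. The threads are created in KafkaServer's startup method, as the code shows: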
def startup() {
  ...
  // Create the request handler thread pool; its constructor starts the threads so they are ready to receive requests
  requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
  ...
}
class KafkaRequestHandlerPool {
  ...
  for(i <- 0 until numThreads) {
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    threads(i).start() // start each request handler thread
  }
  ...
}
KafkaRequestHandler is in fact a Runnable. The core of its run method is a while (true) loop that keeps taking requests and passing them to apis.handle(request), as the following code shows:
class KafkaRequestHandler... extends Runnable {
  ...
  def run() {
    ...
    while (true) {
      ...
      apis.handle(request) // hand the request to apis.handle for processing
    }
    ...
  }
  ...
}
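The pattern of a fixed pool of daemon threads, each looping over a shared request queue, can be sketched independently of Kafka. The names below (HandlerPoolSketch, start, the "sketch-request-handler-" thread prefix) are hypothetical, and a LinkedBlockingQueue stands in for the request channel; this is an illustration of the idea rather than the broker's implementation.

```scala
import java.util.concurrent.LinkedBlockingQueue

object HandlerPoolSketch {
  // Starts `numThreads` daemon threads. Each one loops forever, taking one
  // request at a time from the shared queue and passing it to `handle`.
  def start[R](numThreads: Int, requests: LinkedBlockingQueue[R])(handle: R => Unit): Seq[Thread] =
    (0 until numThreads).map { i =>
      val t = new Thread(new Runnable {
        // take() blocks while the queue is empty, so idle threads do not spin
        def run(): Unit = while (true) handle(requests.take())
      }, "sketch-request-handler-" + i)
      t.setDaemon(true) // daemon threads do not keep the JVM alive on exit
      t.start()
      t
    }
}
```

Because every thread takes from the same queue, requests are spread across whichever handlers happen to be idle, which is why the thread count bounds how many requests are processed at once.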
The main job of handle in KafkaApis is to accept requests of every type and dispatch them. This article looks only at ProducerRequest:
def handle(request: RequestChannel.Request) {
  ...
  request.requestId match {
    case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request) // a ProducerRequest is passed to handleProducerOrOffsetCommitRequest
    case ...
  }
  ...
}
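So the core method is handleProducerOrOffsetCommitRequest. It has this name because it handles both ProducerRequest and OffsetCommitRequest; the latter is really a special kind of ProducerRequest. Starting with Kafka 0.8.2, Kafka stores committed offsets in a special topic named __consumer_offsets. This article focuses on genuine ProducerRequests. Let us look at the logic of this method, as shown in the figure below:
The overall logic looks quite simple, as the following code shows: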
def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
  ...
  val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) // append the messages to the local commit log
  val numPartitionsInError = localProduceResults.count(_.error.isDefined) // count the partitions whose append failed
  if(produceRequest.requiredAcks == 0) { // code path for request.required.acks = 0
    if (numPartitionsInError != 0) {
      info(("Send the close connection response due to error handling produce request " +
        "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
        .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
      requestChannel.closeConnection(request.processor, request) // close the underlying socket to tell the client that a send failed
    } else {
      ...
    }
  } else if (produceRequest.requiredAcks == 1 || // code path for request.required.acks = 1, plus two other conditions
             produceRequest.numPartitions <= 0 ||
             numPartitionsInError == produceRequest.numPartitions) {
    val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
                                         .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) // send the response to the client
  } else { // code path for request.required.acks = -1
    // create a list of (topic, partition) pairs to use as keys for this delayed request
    val producerRequestKeys = produceRequest.data.keys.toSeq
    val statuses = localProduceResults.map(r =>
      r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
    val delayedRequest = new DelayedProduce(...) // a delayed request must be built here; this logic is more involved and relies on the Purgatory concept, so it is left for a later article
    ...
  }
}
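As the code above shows, whatever the value of request.required.acks, the message set is first appended to the local commit log. When request.required.acks is 0, which is the default, the broker then checks after the write whether the append succeeded for every partition. If any partition failed, it closes the client's socket connection to the broker, which is how the client program learns that an error occurred.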
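The three-way branch on request.required.acks can be condensed into a small pure function. The sketch below mirrors only the conditions visible in the excerpt; the Action names are hypothetical, and NoResponse is an assumption for the acks = 0 success branch, which the excerpt elides.

```scala
object AckPathSketch {
  sealed trait Action
  case object CloseConnection extends Action      // acks = 0 and at least one partition failed
  case object NoResponse extends Action           // acks = 0, no failures (assumed; elided in the excerpt)
  case object RespondNow extends Action           // reply right after the local append
  case object DelayUntilReplicated extends Action // acks = -1: a delayed request is built

  // Picks the code path from the same three inputs the broker code inspects.
  def choose(requiredAcks: Int, numPartitions: Int, numPartitionsInError: Int): Action =
    if (requiredAcks == 0) {
      if (numPartitionsInError != 0) CloseConnection else NoResponse
    } else if (requiredAcks == 1 || numPartitions <= 0 || numPartitionsInError == numPartitions)
      RespondNow
    else
      DelayUntilReplicated
}
```

Note that with acks = -1 the broker still replies immediately when every partition failed, since there is then nothing left to wait for.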
The last method in this call chain is Partition's appendMessagesToLeader. Its main code is as follows:
def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int=0) = {
  inReadLock(leaderIsrUpdateLock) {
    val leaderReplicaOpt = leaderReplicaIfLocal() // check whether the target partition's leader replica is on this broker
    leaderReplicaOpt match {
      case Some(leaderReplica) => // the leader replica is on this broker
        val log = leaderReplica.log.get // get the handle to the local commit log
        val minIsr = log.config.minInSyncReplicas
        val inSyncSize = inSyncReplicas.size
        // Avoid writing to leader if there are not enough insync replicas to make it safe
        if (inSyncSize < minIsr && requiredAcks == -1) { // the ISR size is checked only when request.required.acks is -1
          // note: as quoted, minIsr and inSyncSize are passed in the opposite order to the placeholders in the message
          throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
            .format(topic,partitionId,minIsr,inSyncSize))
        }
        val info = log.append(messages, assignOffsets = true) // the actual log write; it involves Kafka's low-level log handling, which deserves an article of its own
        // probably unblock some follower fetch requests since log end offset has been updated
        replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))
        // we may need to increment high watermark since ISR could be down to 1
        maybeIncrementLeaderHW(leaderReplica)
        info
      case None => // otherwise throw an exception stating that the leader is not on this broker
        throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
          .format(topic, partitionId, localBrokerId))
    }
  }
}
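The guard that protects the leader from writing with too few in-sync replicas is easy to isolate. Here is a minimal sketch of just that condition, with a hypothetical MinIsrSketch.check helper that returns an Either instead of throwing:

```scala
object MinIsrSketch {
  // Left(reason) if the append must be rejected, Right(()) if it may proceed.
  // The ISR size matters only when the producer asked for acks = -1.
  def check(inSyncSize: Int, minIsr: Int, requiredAcks: Int): Either[String, Unit] =
    if (inSyncSize < minIsr && requiredAcks == -1)
      Left("in-sync replicas [%d] below required minimum [%d]".format(inSyncSize, minIsr))
    else
      Right(())
}
```

For example, with one in-sync replica and a minimum of two, a request with acks = -1 is rejected, while the same request with acks = 1 is still appended.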
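This completes the walkthrough of the simplest case, the Scala synchronous producer. Kafka's design is straightforward: each broker starts a server that continuously handles the various requests sent by clients, performs the corresponding work, and returns a response where one is required.
4. Questions and Answers
1. Does the producer client interact with ZooKeeper?
No. The producer does not talk to ZooKeeper; it obtains all metadata from the brokers. Only the Consumer needs to interact with ZooKeeper.
2. How is the compression.codec parameter used?
This is a producer-side parameter that controls whether message batches are compressed. According to the official documentation, the following options are currently supported: This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip", "snappy" and "lz4".
For example, set it to: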
compression.codec=none
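In code, the same setting is just one entry in the Properties object handed to the producer. The sketch below only builds the properties; the ProducerPropsSketch name and the localhost:9092 broker address are placeholders, and creating the producer itself is left out.

```scala
import java.util.Properties

object ProducerPropsSketch {
  // Builds producer properties with the given compression codec.
  def build(codec: String): Properties = {
    val props = new Properties()
    props.setProperty("metadata.broker.list", "localhost:9092") // placeholder broker address
    props.setProperty("serializer.class", "kafka.serializer.StringEncoder")
    props.setProperty("compression.codec", codec) // "none", "gzip", "snappy" or "lz4"
    props
  }
}
```

Calling ProducerPropsSketch.build("snappy") gives a Properties object whose compression.codec entry is snappy.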
3. Is blockingChannel.send(request) blocking or non-blocking?
It is a blocking send, because the connect method of BlockingChannel configures the socket channel as blocking:
class BlockingChannel( val host: String,
                       val port: Int,
                       val readBufferSize: Int,
                       val writeBufferSize: Int,
                       val readTimeoutMs: Int ) extends Logging {
  …
  def connect() = lock synchronized {
    if(!connected) {
      try {
        channel = SocketChannel.open()
        if(readBufferSize > 0)
          channel.socket.setReceiveBufferSize(readBufferSize)
        if(writeBufferSize > 0)
          channel.socket.setSendBufferSize(writeBufferSize)
        channel.configureBlocking(true)
        …
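The effect of configureBlocking can be checked on an unconnected java.nio SocketChannel, with no broker involved. The following sketch uses a hypothetical helper, blockingModeAfter, that opens a channel, sets the mode, and reports what the channel says about itself:

```scala
import java.nio.channels.SocketChannel

object BlockingModeSketch {
  // Opens an unconnected channel, sets its blocking mode, and returns the
  // mode the channel reports. The channel is always closed afterwards.
  def blockingModeAfter(blocking: Boolean): Boolean = {
    val channel = SocketChannel.open()
    try {
      channel.configureBlocking(blocking)
      channel.isBlocking
    } finally channel.close()
  }
}
```

A channel in blocking mode makes write calls wait until the bytes have been handed to the socket, which is the behaviour the answer above relies on.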
5. Appendix
1. Case classes
http://nerd-is.in/2013-09/scala-learning-pattern-matching-and-case-classes/
6. References
1. Kafka Producer synchronous-mode message sending: source code analysis
http://blog.csdn.net/itleochen/article/details/19926785
2. How the Kafka producer works (Scala synchronous producer)