圖解Kafka服務(wù)端網(wǎng)絡(luò)模型

      網(wǎng)友投稿 721 2022-05-29

      大家好,我是彥祖

      Kafka的網(wǎng)絡(luò)模型

      關(guān)鍵類解析

      SocketServer

      ConnectionQuotas

      AbstractServerThread

      Acceptor 和 Processor

      RequestChannel

      KafkaApis

      KafkaRequestHandlerPool

      KafkaRequestHandler

      通信流程總結(jié)

      數(shù)據(jù)面板(DataPlane)

      控制器面板(ControllerPlane)

      線程模型: Reactor模式

      問答

      Kafka網(wǎng)絡(luò)模型使用的是什么線程模型?

      什么是ControllerPlane(控制器面板),什么是DataPlane(數(shù)據(jù)面板)?

      Kafka整個請求流程是什么樣子的

      與Kafka網(wǎng)絡(luò)通信相關(guān)的配置。

      Kafka的網(wǎng)絡(luò)模型

      Kafka中的網(wǎng)絡(luò)模型就是基于 主從Reactor多線程進(jìn)行設(shè)計的, 在整體講述Kafka網(wǎng)絡(luò)模型之前,我們現(xiàn)在按照源碼中的相關(guān)類來講解一下他們分別都是用來做什么的.

      關(guān)鍵類解析

      這個類是網(wǎng)絡(luò)通信的核心類,它持有這Acceptor和 Processor對象。

      這個是控制連接數(shù)配額的類,

      涉及到的Broker配置有:

      AbstractServerThread 類:這是Acceptor線程和Processor線程的抽象基類,它定義了一個抽象方法wakeup() ,主要是用來喚醒Acceptor 線程和 Processor 對應(yīng)的Selector的, 當(dāng)然還有一些共用方法

      Acceptor 線程類:繼承自AbstractServerThread, 這是接收和創(chuàng)建外部 TCP 連接的線程。每個 SocketServer 實例一般會創(chuàng)建一個 Acceptor 線程(如果listeners配置了多個就會創(chuàng)建多個Acceptor)。它的唯一目的就是創(chuàng)建連接,并將接收到的 SocketChannel(SocketChannel通道用于傳輸數(shù)據(jù)) 傳遞給下游的 Processor 線程處理,Processor主要是處理連接之后的事情,例如讀寫I/O。

      涉及到的Broker配置有:

      Processor 線程類:這是處理單個TCP 連接上所有請求的處理線程。每個Acceptor 實例創(chuàng)建若干個(num.network.threads)Processor 線程。Processor 線程負(fù)責(zé)將接收到的 SocketChannel(SocketChannel通道用于傳輸數(shù)據(jù)。), 注冊讀寫事件,當(dāng)數(shù)據(jù)傳送過來的時候,會立即讀取Request數(shù)據(jù),通過解析之后, 然后將其添加到 RequestChannel 的 requestQueue 隊列上,同時還負(fù)責(zé)將 Response 返還給 Request 發(fā)送方。

      涉及到的Broker配置有:

      簡單畫了一張兩個類之間的關(guān)系圖

      這兩個類都是 AbstractServerThead的實現(xiàn)類,超類是Runnable 可運(yùn)行的。

      每個Acceptor持有num.network.threads個 Processor 線程, 假如配置了多個listeners,那么總共Processor線程數(shù)是 listeners*num.network.threads.

      Acceptor 創(chuàng)建的是ServerSocketChannel通道,這個通道是用來監(jiān)聽新進(jìn)來的TCP鏈接的通道,

      通過serverSocketChannel.accept()方法可以拿到SocketChannel通道用于傳輸數(shù)據(jù)。

      每個Processor 線程都有一個唯一的id,并且通過Acceptor拿到的SocketChannel會被暫時放入到newConnections隊列中

      每個Processor 都創(chuàng)建了自己的Selector

      Processor會不斷的從自身的newConnections隊列里面獲取新SocketChannel,并注冊讀寫事件,如果有數(shù)據(jù)傳輸過來,則會讀取數(shù)據(jù),并解析成Request請求。

      既然兩個都是可執(zhí)行線程,那我們看看兩個線程的run方法都做了哪些事情

      Acceptor.run

      def run(): Unit = { //將serverChannel 注冊到nioSelector上,并且對 Accept事件感興趣:表示服務(wù)器監(jiān)聽到了客戶連接,那么服務(wù)器可以接收這個連接了 serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) try { var currentProcessorIndex = 0 while (isRunning) { try { //返回感興趣的事件數(shù)量 這里是感興趣的是SelectionKey.OP_ACCEPT,監(jiān)聽到新的鏈接 val ready = nioSelector.select(500) if (ready > 0) { //獲取所有就緒通道 val keys = nioSelector.selectedKeys() val iter = keys.iterator() //遍歷所有就緒通道 while (iter.hasNext && isRunning) { try { val key = iter.next iter.remove() //只處理 Accept事件,其他的事件則拋出異常,ServerSocketChannel是 監(jiān)聽Tcp的鏈接通道 if (key.isAcceptable) { //根據(jù)Key 拿到SocketChannle = serverSocketChannel.accept(),然后再遍歷 accept(key).foreach { socketChannel => //將socketChannel分配給我們的 processor來處理,如果有多個socketChannel 則按照輪訓(xùn)分配的原則 //如果一個processor 中能夠處理的newconnection 隊列滿了放不下了,則找下一個 // 如果所有的都放不下,則會一直循環(huán)直到有processor能夠處理。 var retriesLeft = synchronized(processors.length) var processor: Processor = null do { retriesLeft -= 1 //輪訓(xùn)每個processors來處理 processor = synchronized { // adjust the index (if necessary) and retrieve the processor atomically for // correct behaviour in case the number of processors is reduced dynamically currentProcessorIndex = currentProcessorIndex % processors.length processors(currentProcessorIndex) } currentProcessorIndex += 1 } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0)) } } else throw new IllegalStateException("Unrecognized key state for acceptor thread.") } catch { case e: Throwable => error("Error while accepting connection", e) } } } } catch { 省略 } } } finally { 省略 } }

      將ServerSocketChannel通道注冊到nioSelector 上,并關(guān)注事件SelectionKey.OP_ACCEPT

      serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)

      while循環(huán),持續(xù)阻塞監(jiān)聽事件,超時時間500ms

      // 阻塞查詢Selector是否有監(jiān)聽到新的事件 val ready = nioSelector.select(500) // 如果有事件,則查詢具體的事件和通道 if(ready>0>{ //獲取所有就緒事件準(zhǔn)備處理 val keys = nioSelector.selectedKeys() }

      遍歷剛剛監(jiān)聽到的事件, 如果該SelectionKey不包含OP_ACCEPT(建立連接)事件,則拋出異常,通常不會出現(xiàn)這個異常。

      Unrecognized key state for acceptor thread

      如果SelectionKey包含OP_ACCEPT(建立連接)事件,則可以通過這個SelectionKey拿到serverSocketChannel,通過serverSocketChannel 拿到socketChannel,并且將SocketChannel設(shè)置為非阻塞模式。

      val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel] // 調(diào)用accept方法就可以拿到ScoketChannel了。 val socketChannel = serverSocketChannel.accept() //設(shè)置為非阻塞模式 就可以在異步模式下調(diào)用connect(), read() 和write()了。 socketChannel.configureBlocking(false)

      接下來,把上面拿到的SocketChannel以遍歷的形式給Acceptor下面的Procesor, 讓Processor來執(zhí)行后面的處理。分配的體現(xiàn)形式是, 將拿到的SocketChannel保存在Processor中的newConnections阻塞隊列中,這個newConnections上限是20,在代碼里面寫死了的,也就是說一個Processor同時最多只能處理20個連接, 那么所有的Processor能處理的最大連接就是Processor數(shù)量 * 20;如果你的連接請求并發(fā)度很高,可以嘗試調(diào)大num.network.threads

      最后,如果newConnections隊列放入了一個新的SocketChannel,則會調(diào)用一下對應(yīng)Processor實例的wakeup()方法。

      Procesor.run

      override def run(): Unit = { startupComplete() try { while (isRunning) { try { // setup any new connections that have been queued up // 將之前監(jiān)聽到的TCP鏈接(暫時保存在newConnections中) 開始注冊監(jiān)聽OP_READ事件到每個Processor的 KSelector選擇器中。 configureNewConnections() // register any new responses for writing processNewResponses() //在不阻塞的情況下對每個連接執(zhí)行任何 I/O 操作。這包括完成連接、完成斷開連接、啟動新發(fā)送或在進(jìn)行中的發(fā)送或接收上取得進(jìn)展。 // 當(dāng)此調(diào)用完成時,用戶可以使用completedSends() 、 completedReceives() 、 connected() 、 disconnected()檢查已完成的發(fā)送、接收、連接或斷開連接。 poll() // 把請求解析后放到 requestChannels 隊列中,異步處理 processCompletedReceives() //處理已經(jīng)發(fā)送完成的請求 processCompletedSends() processDisconnected() closeExcessConnections() } catch { // We catch all the throwables here to prevent the processor thread from exiting. We do this because // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would // be either associated with a specific socket channel or a bad request. These exceptions are caught and // processed by the individual methods above which close the failing channel and continue processing other // channels. So this catch block should only ever see ControlThrowables. case e: Throwable => processException("Processor got uncaught exception.", e) } } } finally { debug(s"Closing selector - processor $id") CoreUtils.swallow(closeAll(), this, Level.ERROR) shutdownComplete() } }

      configureNewConnections(): 之前Acceptor監(jiān)聽到的SocketChannel保存在Procesor中的newConnections阻塞隊列中, 現(xiàn)在開始將newConnections阻塞隊列一個個取出來,向Procesor的Selector注冊SocketChannel通道,并且感興趣的事件為SelectionKey.OP_READ讀事件。

      processNewResponses() : 去Processor里面的無邊界阻塞隊列responseQueue里面獲取RequestChannel.Response數(shù)據(jù), 如果有數(shù)據(jù)并且需要返回Response的話, 則通過channel返回數(shù)據(jù). 具體的Channel是根據(jù)connectionId 獲取之前構(gòu)建的KafkaChannel, KafkaChannel則會通過監(jiān)聽SelectionKey.OP_WRITE。然后調(diào)用writeTo方法。

      至于responseQueue這個隊列是什么時候入隊的,我們后面再分析

      poll(): 這個方法里面執(zhí)行的就很多了, 這個方法底層調(diào)用的是selector.poll(); 將監(jiān)聽到的事件批量處理,它才是執(zhí)行I/O請求的最終地方, 它正對每個連接執(zhí)行任何的I/O操作,這包括了 完成連接、完成斷開連接、啟動新發(fā)送等等。

      像校驗身份信息,還有handshake等等這些也都是在這里執(zhí)行的。

      processCompletedReceives(): 處理所有completedReceives(已完成接收的請求)進(jìn)行接下來的處理, 處理的方式是解析一下收到的請求,最終調(diào)用了requestChannel.sendRequest(req). 也就是說所有的請求最終通過解析放入到了RequestChannel中的requestQueue阻塞隊列中, 這個阻塞隊列的大小為queued.max.requests默認(rèn)500;表示的是在阻塞網(wǎng)絡(luò)線程之前,數(shù)據(jù)平面允許的排隊請求數(shù)

      PS: 這個completedReceives 是在 poll()方法中添加的元素。

      processCompletedSends(): 它負(fù)責(zé)處理 Response 的回調(diào)邏輯,通過遍歷completedSends(已完成發(fā)送)集合 可以從inflightResponses中移除并拿到response對象,然后再調(diào)用回調(diào)邏輯。

      PS: 這個completedSends 是在 poll()方法中添加的元素。

      processDisconnected(): 處理斷開鏈接的情況, connectionQuotas連接限流減掉這個鏈接,inflightResponses也移除對應(yīng)連接。

      closeExcessConnections(): 關(guān)閉超限連接 ,當(dāng)總連接數(shù) >max.connections && (inter.broker.listener.name!=listener|| listeners 數(shù)量==1) 則需要關(guān)閉一些連接.

      簡單來說就是:就算Broker已經(jīng)達(dá)到了最大連接數(shù)的限制了, 也應(yīng)該允許 broker之間-上的連接, 這種情況下,將會關(guān)閉另外一個-上最近最少使用的連接。broker之間的-是配置inter.broker.listener.name 決定的

      所謂優(yōu)先關(guān)閉,是指在諸多 TCP 連接中找出最近未被使用的那個。這里“未被使用”就是說,在最近一段時間內(nèi),沒有任何 Request 經(jīng)由這個連接被發(fā)送到 Processor 線程。

      這個類保存這所有的Processor,還有一個阻塞隊列保存這待處理請求。這個隊列最大長度由queued.max.requests控制,當(dāng)待處理請求超過這個數(shù)值的時候網(wǎng)絡(luò)就會阻塞

      涉及到的Broker配置有:

      具體Request的處理類, 所有的請求方法處理邏輯都放在這個里面。

      KafkaRequestHandler的線程池,KafkaRequestHandler線程的數(shù)量由配置num.io.threads決定。

      涉及到的Broker配置有:

      請求處理類, 每個Handler都會去 requestChannel的requestQueue隊列里面poll請求, 然后去處理,最終調(diào)用的處理方法是KafkaApis.handle()

      這幾個類之間的關(guān)系如下

      通信流程總結(jié)

      KafkaServer啟動的時候,會根據(jù)listeners的配置來初始化對應(yīng)的實例。

      一個listeners對應(yīng)一個Acceptor,一個Acceptor持有若干個(num.network.threads)Processor實例。

      Acceptor 中的nioSelector注冊的是ServerSocketChannel通道,并監(jiān)聽OP_ACCEPT事件,它只負(fù)責(zé) TCP 創(chuàng)建和連接,不包含讀寫數(shù)據(jù)。

      當(dāng)Acceptor監(jiān)聽到新的連接之后,就會通過調(diào)用socketChannel = serverSocketChannel.accept()拿到SocketChannel,然后把SocketChannel保存在Processor里面的newConnection隊列中。 那么具體保存在哪個Processor中呢?當(dāng)然是輪詢分配了,確保負(fù)載均衡嘛。當(dāng)然每個Processor的newConnection隊列最大只有20,并且是代碼寫死的。如果一個Processor滿了,則會尋找下一個存放,如果所有的都滿了,那么就會阻塞。一個Acceptor的所有Processor最大能夠并發(fā)處理的請求是 20 * num.network.threads。

      Processor會持續(xù)的從自己的newConnection中poll數(shù)據(jù),拿到SocketChannel之后,就把它注冊到自己的Selector中,并且監(jiān)聽事件 OP_READ。 如果newConnection是空的話,poll的超時時間是 300ms。

      監(jiān)聽到有新的事件,比較READ,則會讀取數(shù)據(jù),并且解析成Request, 把Request放入到 RequestChannel中的requestQueue阻塞隊列中。所有的待處理請求都是臨時放在這里面。這個隊列也有最大值queued.max.requests(默認(rèn)500),超過這個大小就會阻塞。

      KafkaRequestHandlerPool中創(chuàng)建了很多(num.io.threads(默認(rèn)8))的KafkaRequestHandler,用來處理Request, 他們都會一直從RequestChannel中的requestQueue隊列中poll新的Request,來進(jìn)行處理。

      處理Request的具體邏輯是KafkaApis里面。當(dāng)Request處理完畢之后,會調(diào)用requestChannel.sendResponse()返回Response。

      當(dāng)然,請求Request和返回Response必須是一一對應(yīng)的, 你這個請求是哪個Processor監(jiān)聽到的,則需要哪個Processor返回, 他們通過id來標(biāo)識。

      Response也不是里面返回的,而是先放到Processor中的ResponseQueue隊列中,然后慢慢返回給客戶端。

      數(shù)據(jù)面板(DataPlane)

      數(shù)據(jù)面板是用來處理 Broker與Broker/Client之間的網(wǎng)絡(luò)模型模塊, 與之相對的是控制器面板

      控制器面板 是專門用于Controller與Broker之間的網(wǎng)絡(luò)通信模塊。

      其實本質(zhì)上他們都是一模一樣的, 但是為了將Controller的通信和普通通信隔離,才有這么兩個概念。

      上面的網(wǎng)絡(luò)通信模型就是以數(shù)據(jù)面板來分析的,因為本質(zhì)是一樣的, 只是有一些配置不一樣。

      那么.數(shù)據(jù)面板 就不詳細(xì)講了,我們主要講下控制器面板的不一樣的地方

      控制器面板(ControllerPlane)

      控制器面板是用來專門處理 Controller相關(guān)請求的獨立通信模塊。

      大家都知道,Controller是一個很重要的角色,基本上大部分協(xié)調(diào)整個集群的相關(guān)請求都跟它有關(guān)系, 比如創(chuàng)建Topic、刪除Topic、分區(qū)副本重分配、等等。他們都很重要

      但是一般情況下數(shù)據(jù)面板的請求很多,如果因為請求過多而導(dǎo)致Controller相關(guān)請求被阻塞不能執(zhí)行,那么可能會造成一些影響, 所以我們可以讓Controller類的請求有一個單獨的通信模塊。

      首先,要啟用控制器面板,必須配置control.plane.listener.name. 并且這個-名稱必須在listeners里面有配置

      否則的話,是不會專用的控制器鏈接的EndPoint的。

      例如:

      Broker配置

      ## 所有的- isteners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094 ## -對應(yīng)的安全協(xié)議 listener.security.protocol.map = INTERNAL: PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL ## 控制器 control.plane.listener.name = CONTROLLER

      在啟動時,代理將開始使用安全協(xié)議“SSL”監(jiān)聽“192.1.1.8:9094”。

      在控制器端,當(dāng)它通過 zookeeper 發(fā)現(xiàn)代理發(fā)布的端點時,它將使用 control.plane.listener.name 找到端點,它將用于建立與代理的連接。

      必須配置control.plane.listener.name 才能使用獨立的控制器面板

      控制器面板的RequestChannel中的requestQueue不是由queued.max.requests控制的,而是寫死的 20. 因為控制類請求不會有那么大的并發(fā)

      跟DataPlane相關(guān)隔離,互不影響。但是連接限流ConnectionQuotas是共享的,限流的時候,兩個是算在一起的

      控制類面板只有一個Acceptor和一個Processor,這個跟數(shù)據(jù)面板的區(qū)別是 DataPlane的Processor可以有多個。

      涉及到的Broker配置有:

      上面我們主要分析了一下, Kafka中的網(wǎng)絡(luò)通信模型, 那么聰明的你應(yīng)該肯定能夠看的出來,它是使用線程模型中的 Reactor模式來實現(xiàn)的。

      線程模型: Reactor模式

      該模塊詳細(xì)請參考Reactor 模型

      Reactor 模式,是指通過一個或多個輸入同時傳遞給服務(wù)處理器的服務(wù)請求的事件驅(qū)動處理模式。

      服務(wù)端程序處理傳入多路請求,并將它們同步分派給請求對應(yīng)的處理線程,Reactor 模式也叫 Dispatcher 模式。

      即 I/O 多路復(fù)用統(tǒng)一監(jiān)聽事件,收到事件后分發(fā)(Dispatch 給某進(jìn)程),是編寫高性能網(wǎng)絡(luò)服務(wù)器的必備技術(shù)之一。

      根據(jù) Reactor 的數(shù)量和處理資源池線程的數(shù)量不同,有 3 種典型的實現(xiàn):

      單 Reactor 單線程;

      單 Reactor 多線程;

      主從 Reactor 多線程。

      我們主要了解一下 主從Reactor 多線程

      針對單 Reactor 多線程模型中,Reactor 在單線程中運(yùn)行,高并發(fā)場景下容易成為性能瓶頸,可以讓 Reactor 在多線程中運(yùn)行。

      方案說明:

      圖解Kafka服務(wù)端網(wǎng)絡(luò)模型

      Reactor 主線程 MainReactor 對象通過 Select 監(jiān)控建立連接事件,收到事件后通過 Acceptor 接收,處理建立連接事件;

      Acceptor 處理建立連接事件后,MainReactor 將連接分配 Reactor 子線程給 SubReactor 進(jìn)行處理;

      SubReactor 將連接加入連接隊列進(jìn)行監(jiān)聽,并創(chuàng)建一個 Handler 用于處理各種連接事件;

      當(dāng)有新的事件發(fā)生時,SubReactor 會調(diào)用連接對應(yīng)的 Handler 進(jìn)行響應(yīng);

      Handler 通過 Read 讀取數(shù)據(jù)后,會分發(fā)給后面的 Worker 線程池進(jìn)行業(yè)務(wù)處理;

      Worker 線程池會分配獨立的線程完成真正的業(yè)務(wù)處理,如何將響應(yīng)結(jié)果發(fā)給 Handler 進(jìn)行處理;

      Handler 收到響應(yīng)結(jié)果后通過 Send 將響應(yīng)結(jié)果返回給 Client。

      更詳細(xì)的介紹可以看 Reactor 模型

      問答

      Kafka的網(wǎng)絡(luò)模型使用了Reactor模式的哪種實現(xiàn)方式?

      單 Reactor 單線程;

      單 Reactor 多線程;

      主從 Reactor 多線程。

      答案: 3 。 使用了主從Reactor多線程的實現(xiàn)方式.

      MainReactor(Acceptor)只負(fù)責(zé)監(jiān)聽OP_ACCEPT事件, 監(jiān)聽到之后把SocketChannel 傳遞給 SubReactor(Processor), 每個Processor都有自己的Selector。SubReactor會監(jiān)聽并處理其他的事件,并最終把具體的請求傳遞給KafkaRequestHandlerPool。

      很典型的主從Reactor多線程模式。

      什么是ControllerPlane(控制器面板),什么是DataPlane(數(shù)據(jù)面板)?

      控制器面板: 主要處理控制器類的的請求

      數(shù)據(jù)面板: 主要處理數(shù)據(jù)類的請求。

      讓他們隔離,互不影響,比如說普通的請求太多,導(dǎo)致了阻塞, 那么Controller相關(guān)的請求也可能被阻塞了,所以讓他們隔離,不會互相影響。

      但是默認(rèn)情況下, ControllerPlane是沒有設(shè)置的,也就是Controller相關(guān)的請求還是走的DataPlane。 想要隔離的話必須設(shè)置control.plane.listener.name .

      必須配置control.plane.listener.name

      控制器面板的RequestChannel中的requestQueue不是由queued.max.requests控制的,而是寫死的 20. 因為控制類請求不會有那么大的并發(fā)

      跟DataPlane相關(guān)隔離,互不影響。但是連接限流ConnectionQuotas是共享的,限流的時候,兩個是算在一起的

      控制類面板只有一個Acceptor和一個Processor,這個跟數(shù)據(jù)面板的區(qū)別是 DataPlane的Processor可以有多個。

      Kafka整個請求流程是什么樣子的

      請看上面網(wǎng)絡(luò)通信總結(jié)部分。

      Kafka 任務(wù)調(diào)度 網(wǎng)絡(luò)

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:51開發(fā)板萬年歷基礎(chǔ)
      下一篇:用管理產(chǎn)品和管理球隊的方式管理員工,讀《奈飛文化手冊》有感
      相關(guān)文章
      亚洲高清无码综合性爱视频| 色九月亚洲综合网| 久久精品国产亚洲Aⅴ香蕉| 日韩亚洲人成在线综合| 亚洲一卡一卡二新区无人区| 亚洲一区二区三区深夜天堂| 亚洲国产精品成人综合色在线婷婷| 久久久亚洲AV波多野结衣| 亚洲人成影院在线| 亚洲资源在线观看| 亚洲人成网站影音先锋播放| 亚洲美女视频网址| 亚洲成AV人综合在线观看| 亚洲成人福利网站| 亚洲宅男精品一区在线观看| 亚洲一级片在线观看| 中文字幕无码亚洲欧洲日韩| 亚洲色欲啪啪久久WWW综合网| 亚洲欧美国产日韩av野草社区| 亚洲人成电影网站免费| 亚洲国产成人无码AV在线| 无码亚洲成a人在线观看| 亚洲精品国产va在线观看蜜芽| 亚洲成人影院在线观看| 久久亚洲中文字幕精品一区四| 国产亚洲精品免费视频播放 | 亚洲综合在线另类色区奇米| 亚洲中文字幕不卡无码| 亚洲AV日韩精品久久久久久| 日本久久久久亚洲中字幕| 亚洲伊人久久大香线蕉| 亚洲日韩亚洲另类激情文学| 色偷偷亚洲男人天堂| 中文字幕精品无码亚洲字| 亚洲AV无码久久寂寞少妇| 亚洲成aⅴ人片在线观| 亚洲大码熟女在线观看| 亚洲一级黄色视频| 无码乱人伦一区二区亚洲| 亚洲国产精品综合一区在线| 亚洲中文字幕乱码熟女在线|