Flink狀態(tài)容錯介紹

      網(wǎng)友投稿 1270 2022-05-29

      時間、窗口、狀態(tài)、容錯可以稱為是flink的四大基石,本文將介紹下flink中的狀態(tài)和容錯。

      1. 狀態(tài)

      flink中的計算分為有狀態(tài)和無狀態(tài)兩種,有狀態(tài)算子需要使用內(nèi)置狀態(tài)和流入的事件來計算結(jié)果,本文基于有狀態(tài)計算來講述狀態(tài)機制。

      1.1 狀態(tài)類型

      1.1.1 按鍵值區(qū)分

      根據(jù)key值分區(qū),flink可以分為keyed state 和 operator state兩種。

      對于flink數(shù)據(jù)流的每個key分區(qū),都會有一個keystate與其對應(yīng),鍵值相同的記錄訪問到的是一樣的狀態(tài)。

      對于非keyedstream 與其對應(yīng)的是opeartor state, operator state只與算子的并行實例相關(guān),每個算子實例只持有一部分狀態(tài)數(shù)據(jù),其對應(yīng)的state類型只有一種,為listState。

      flink中的狀態(tài)可以分為managed state(托管狀態(tài))、和raw state 原生狀態(tài),managedState是flink自己管理的state,rawstate需要用戶自己管理。使用byte[]來讀寫狀態(tài)。

      1.1.3 廣播狀態(tài)

      即broadcaststate,廣播至下游所有任務(wù),在本地存儲,broadcaststate必須是mapstate類型。對于dataset api中,廣播在每個計算節(jié)點存儲一份,對于datastream 而言,在每個并行度存在一份,算子的每個任務(wù)的狀態(tài)都相同。同時flink watermark和CheckpointBarrier向下游傳遞時也是基于廣播機制。

      1.2 狀態(tài)后端

      狀態(tài)后端是一種可插拔的機制,按照存儲介質(zhì),狀態(tài)后端分為MemoryStateBackend、FsStateBackend、RocksDBStateBackend三種。狀態(tài)后端主要負責(zé)本地狀態(tài)管理和將狀態(tài)checkpoint到遠程存儲。checkpoint機制將在下節(jié)講解。

      1. memoryStateBackend

      這種形式狀態(tài)存儲在堆內(nèi)存中,狀態(tài)過大可能導(dǎo)致oom問題,checkpoint時快照到j(luò)obmanager內(nèi)存中。

      2. FsStateBackend

      狀態(tài)保存在taskmanager內(nèi)存中,與memoryStateBackend不同的是,checkpoint會把state快照到外部文件系統(tǒng)中,相對memoryStateBackend可用性更高。

      上述兩種狀態(tài)都依賴于heapkeyedstatebackend,使用statetable存儲數(shù)據(jù),statetable有兩種實現(xiàn)

      CopyOnWriteStateTable 支持copy-on-write實現(xiàn),可以支持異步快照,

      另外一種是NestedMapsStateTable,其使用一個雙層的hashmap和單層的hashmap作為狀態(tài)存儲。

      3. RocksDBStateBackend

      rocksdb是一種類似于hbase的kv存儲本地數(shù)據(jù)庫,依賴于lsm實現(xiàn),可以將數(shù)據(jù)保存到本地磁盤上,讀寫狀態(tài)時會涉及到序列化反序列化操作,與內(nèi)存相比,性能會偏低些。但其可以保存比較大的狀態(tài),受限于磁盤大小,但其key value依賴于byte數(shù)組,大小受byte[]限制。在一些與外部系統(tǒng)交互的場景可以適當(dāng)?shù)氖褂胷ocksdb減少依賴外部系統(tǒng)。同時rocksdb后端支持增量checkpoint。

      1.3 算子擴縮容

      有狀態(tài)算子并行度調(diào)整時,同時伴隨著狀態(tài)的重分布。

      1. 對于broadcaststate,并行度改變時,狀態(tài)會發(fā)到新的task上,以確保所有的task狀態(tài)相同。

      2. 對于liststate,并行度改變時,將每個list取出,合并成一個,根據(jù)元素均勻的分配給新的task。

      3. 對于keyedState,flink會將所有的鍵值分為不同的鍵值組,每個鍵值組包含部分鍵值,每個鍵值隸屬于唯一的鍵值組,flink以鍵值為單位將鍵值分配給不同的任務(wù)。`鍵值組分配算法如下:

      /** ?*?Assigns?the?given?key?to?a?key-group?index. ?* ?*?@param?key?the?key?to?assign ?*?@param?maxParallelism?the?maximum?supported?parallelism,?aka?the?number?of?key-groups. ?*?@return?the?key-group?to?which?the?given?key?is?assigned ?*/ public?static?int?assignToKeyGroup(Object?key,?int?maxParallelism)?{ ???Preconditions.checkNotNull(key,?"Assigned?key?must?not?be?null!"); ???return?computeKeyGroupForKeyHash(key.hashCode(),?maxParallelism); } /** ?*?Assigns?the?given?key?to?a?key-group?index. ?* ?*?@param?keyHash?the?hash?of?the?key?to?assign ?*?@param?maxParallelism?the?maximum?supported?parallelism,?aka?the?number?of?key-groups. ?*?@return?the?key-group?to?which?the?given?key?is?assigned ?*/ public?static?int?computeKeyGroupForKeyHash(int?keyHash,?int?maxParallelism)?{ ???return?MathUtils.murmurHash(keyHash)?%?maxParallelism; }

      該圖展現(xiàn)了keyedState的擴縮容流程

      1.4 持久化策略

      heapsnapshotStrategy策略對應(yīng)heapKeyedStateBackend,rocksdbStateBackend支持rocksfullSnapshotStratey和rocksIncementalSnapshotStrategy全量持久化策略和增量持久化策略。

      1. 全量持久化

      全量持久化策略每次講全量的state寫到狀態(tài)存儲中,在執(zhí)行持久化策略時,使用異步的方式,后臺啟動一個線程去做持久化工作,基于memory的狀態(tài)后端會使用CopyOnWriteStateTable來保證線程安全,基于rocksdb的則使用rocksdb的快照機制來確保線程安全。

      2. 增量持久化

      只有rocksdb后端支持增量持久化,rocksdb基于lsm-tree實現(xiàn),

      基于lsm-tree的memtable、sstable、以及Compaction來實現(xiàn)增量更新。

      1.5 基于max算子講解managedState

      會有同學(xué)有疑問,我沒看到狀態(tài),狀態(tài)在哪里被使用,下面基于max算子講下managedstate的使用流程。

      StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();?????? ?????????env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args)); ????????env.fromElements(WORDS)?? ??????????.flatMap(new?FlatMapFunction>()?{????????})? ???????????.keyBy(0)????datastream?以key劃分成keyedStream,相同的key流入一個分區(qū) ???????????.max(1)???max為有狀態(tài)算子,每個key上都有一個對應(yīng)的狀態(tài),每來一條數(shù)據(jù)的時候,都會取出取出狀態(tài)和當(dāng)前數(shù)據(jù)比較,并更新放回狀態(tài)。是個???????????reducefunction ???????????.print();

      下面可以看下max的具體實現(xiàn)

      public?SingleOutputStreamOperator?maxBy(int?positionToMaxBy,?boolean?first)?{ ???return?aggregate(new?ComparableAggregator<>(positionToMaxBy,?getType(),?AggregationFunction.AggregationType.MAXBY,?first,??//?重置compare方法 ?????????getExecutionConfig())); } protected?SingleOutputStreamOperator?aggregate(AggregationFunction?aggregate)?{ ???StreamGroupedReduce?operator?=?new?StreamGroupedReduce( ?????????clean(aggregate),?getType().createSerializer(getExecutionConfig())); ???return?transform("Keyed?Aggregation",?getType(),?operator); }/ //?StreamGroupedReduce??繼承于AbstractUdfStreamOperator?--》?AbstractStreamOperator?--》?StreamOperator算子體系 public?class?StreamGroupedReduce?extends?AbstractUdfStreamOperator> ??????implements?OneInputStreamOperator?{ ? ???private?transient?ValueState?values; ?public?StreamGroupedReduce(ReduceFunction?reducer,?TypeSerializer?serializer)?{ ??????super(reducer);????//?傳入reduce函數(shù)即上文重寫compare方法后的?ComparableAggregator?extends?reducefunction ??????this.serializer?=?serializer; ???} ???@Override ???public?void?open()?throws?Exception?{ ????ValueStateDescriptor?stateId?=?new?ValueStateDescriptor<>(STATE_NAME,?serializer);??//?初始化?valuestate?描述符 ??????values?=?getPartitionedState(stateId);?//?獲取當(dāng)前key對應(yīng)的valuestate ???} ???@Override ???public?void?processElement(StreamRecord?element)?throws?Exception?{ ??????IN?value?=?element.getValue();??//?拿到當(dāng)前流數(shù)據(jù) ??????IN?currentValue?=?values.value();??//?取出狀態(tài) ??????if?(currentValue?!=?null)?{ ?????????IN?reduced?=?userFunction.reduce(currentValue,?value); ?????????values.update(reduced);?//?更新狀態(tài) ?????????output.collect(element.replace(reduced)); ??????}?else?{ ?????????values.update(value);??//?第一次接受流數(shù)據(jù) ?????????output.collect(element.replace(value)); ??????} ???}

      1.6 實現(xiàn)有狀態(tài)函數(shù)

      1.用戶可以使用flink自帶的richfunction操作狀態(tài)。richfunction可以通過getRunctionContext獲取RuntimeContext對象,以此操作狀態(tài)。首先注冊一個狀態(tài)描述符,包含狀態(tài)名稱和類型,狀態(tài)名稱的作用域是整個算子,可以在算子中注冊多個狀態(tài)描述符來創(chuàng)建多個狀態(tài)對象。狀態(tài)引用通常在richfunction中的open方法中初始化。

      2. 處理函數(shù) ProcessFunction,不過其也是繼承與richfunction體系。

      3. 通過實現(xiàn)ListCheckpointed和實現(xiàn)CheckpointedFunction接口

      Flink狀態(tài)與容錯介紹

      public?interface?ListCheckpointed?{ //?生成檢查點時調(diào)用,返回列表狀態(tài)快照 List?snapshotState(long?checkpointId,?long?timestamp)?throws?Exception; //?初始化函數(shù)時調(diào)用,作業(yè)啟動或者故障恢復(fù)時使用, ?void?restoreState(List?state)?throws?Exception; }

      snapshotState可以將算子狀態(tài)拆分成多個部分,restoreState可以對多個子狀態(tài)進行組裝,算子進行狀態(tài)恢復(fù)時,flink會將狀態(tài)的各個部分分發(fā)到相關(guān)并行實例上。

      public?interface?CheckpointedFunction?{ ???/** ????*?This?method?is?called?when?a?snapshot?for?a?checkpoint?is?requested.?This?acts?as?a?hook?to?the?function?to ????*?ensure?that?all?state?is?exposed?by?means?previously?offered?through?{@link?FunctionInitializationContext}?when ????*?the?Function?was?initialized,?or?offered?now?by?{@link?FunctionSnapshotContext}?itself. ?????*/??生成檢查點時被調(diào)用 ???void?snapshotState(FunctionSnapshotContext?context)?throws?Exception; ???/** ????*?This?method?is?called?when?the?parallel?function?instance?is?created?during?distributed ????*?execution.?Functions?typically?set?up?their?state?storing?data?structures?in?this?method. ????*/??任務(wù)啟動或故障重啟時觸發(fā) ???void?initializeState(FunctionInitializationContext?context)?throws?Exception; }

      checkpointedFunction 可使用列表狀態(tài)和鍵值分區(qū)狀態(tài)。

      1.7 狀態(tài)過期

      2. 容錯機制

      流式系統(tǒng)通常被設(shè)計用來7 * 24 運行,這樣就要確保作業(yè)能夠從失敗中恢復(fù),失敗時,系統(tǒng)需要重啟任務(wù),并且恢復(fù)狀態(tài)。

      2.1 容錯語義

      1. at more once

      至多一次

      2. at least once

      至少一次

      3. exactly once

      flink內(nèi)精確一次

      4. end to end exactly once

      端到端精確一次會關(guān)注三個問題,sources、shuffle、sinks間的exactly once。

      How Dataflow guarantees that every source record is processed exactly once

      How Dataflow guarantees that every record is shuffled exactly once

      How Dataflow guarantees that every sink produces accurate output.

      本文中的內(nèi)容將回答這三個問題。

      2.2 組件容錯

      對于flink這種分布式組件,每個組件失敗都會導(dǎo)致作業(yè)失敗,下文將基于jobmaster、taskmanager、resourcemanager三種角色來講解容錯,這三種組件通過akka進行網(wǎng)絡(luò)通信,保持心跳。故障檢測需要使用心跳的方式,故障恢復(fù)需要狀態(tài)快照和恢復(fù)。重要組件使用至少兩個的HA機制,一個leader和若干個standby。

      2.2.1 jobmaster容錯

      jobmaster在flink中負責(zé)應(yīng)用程序執(zhí)行調(diào)度和管理,將客戶端提交過來的jobGraph轉(zhuǎn)化為executionGraph,向resourcemanager申請資源,并將executionGraph的任務(wù)分發(fā)給taskmanager執(zhí)行,在執(zhí)行過程中,jm還會觸發(fā)checkpoint執(zhí)行,通常一個leader,一個standby。

      jm失敗時,所有的task都會被自動取消,首先會從zk中取到j(luò)obgraph、jar、最近的checkpoint地址,向resourcemanager去申請資源重啟任務(wù),任務(wù)重啟后將從最近的一次checkpoint狀態(tài)恢復(fù)task。 如圖所示:

      private?JobGraph?jobGraph; private?final?HighAvailabilityServices?highAvailabilityServices; //?心跳管理 ivate?final?HeartbeatManager?taskManagerHeartbeatManager; private?final?HeartbeatManager?resourceManagerHeartbeatManager; //?slot資源池 private?final?SlotPool?slotPool;??? /**?Managers?for?the?different?slot?sharing?groups.?*/ protected?final?Map?slotSharingManagers; private?final?SchedulingStrategy?schedulingStrategy; //重啟策略 private?final?RestartStrategy?restartStrategy; //?---------?BackPressure?--------?反壓追蹤 private?final?BackPressureStatsTracker?backPressureStatsTracker; //?---------?ResourceManager?-------- private?final?LeaderRetrievalService?resourceManagerLeaderRetriever; //?---------?TaskManagers?-------- private?final?Map>?registeredTaskManagers; //?--------?Mutable?fields?--------- private?ExecutionGraph?executionGraph;

      2.2.2 taskmanager容錯

      taskmanager在flink中的工作進程,是flink處理資源單元,同時是checkpoint的執(zhí)行者,每個taskmanager有1到多個slot,tm啟動后,會向rm注冊它的處理槽,jm就可以像slot分配任務(wù)來執(zhí)行任務(wù),task在tm中執(zhí)行,tm負責(zé)task的啟動、執(zhí)行、取消,task異常時會向jm匯報,tm為task數(shù)據(jù)交換提供依托,如同一個tm內(nèi)不同線程間的通過緩沖區(qū)的數(shù)據(jù)交換,不同tm的線程間通過網(wǎng)絡(luò)傳輸機制進行數(shù)據(jù)交換并提供反壓機制。執(zhí)行期間如果一個taskmanager失敗了,jobmaster將向resourcemanager申請可用的slot,有足夠的slot后,這個taskmanager將會重啟。重啟策略分為三種,fixed-delay、failure-rate、no-restart。

      /**?The?access?to?the?leader?election?and?retrieval?services.?*/ private?final?HighAvailabilityServices?haServices; /**?The?heartbeat?manager?for?job?manager?in?the?task?manager.?*/ private?final?HeartbeatManager?jobManagerHeartbeatManager; /**?The?heartbeat?manager?for?resource?manager?in?the?task?manager.?*/ private?final?HeartbeatManager?resourceManagerHeartbeatManager; //?slot清單 private?final?TaskSlotTable?taskSlotTable;

      2.2.3 resourcemanager容錯

      resourcemanager在flink中是一個可插拔組件,對于不用的環(huán)境有不同的實現(xiàn),如對用k8s 和 yarn 有對應(yīng)的KubernetesResourceManager和YarnResourceManager等。其負責(zé)資源的管理,同時和tm、jm保持心跳。rm中持有一個slotmanager組件,負責(zé)維護當(dāng)前有多少tm,各個slot的使用情況。 rm發(fā)生故障時,jm會通過leader選舉通知得到新的rm,并重新嘗試和新的rm建立連接。tm也會將自己的slot注冊到新的rm。

      //?高可用管理 /**?High?availability?services?for?leader?retrieval?and?election.?*/ private?final?HighAvailabilityServices?highAvailabilityServices; //?心跳管理 /**?The?heartbeat?manager?with?task?managers.?*/ private?final?HeartbeatManager?taskManagerHeartbeatManager; /**?The?heartbeat?manager?with?job?managers.?*/ private?final?HeartbeatManager?jobManagerHeartbeatManager; //?slot管理器 /**?The?slot?manager?maintains?the?available?slots.?*/ private?final?SlotManager?slotManager;

      2.3 checkpoint

      flink的checkpoint機制是輕量級異步分布式快照,那它是如何做到它的名字中說的異步分布式的呢,又是如何做到at least once 和 exactly once的呢?

      2.3.1 分布式快照

      分布式是按算子將數(shù)據(jù)流切分,flink使用Chandy-Lamport分布式快照算法來生成檢查點,將生成檢查點的過程與處理過程分離。流計算計算的正確性語義需要依賴state + checkpoint + connector特性來支持。flink jm中使用CheckpointCoordinator周期性的以廣播的形式發(fā)送barrier到數(shù)據(jù)流中,每個barrier都有一個檢查點編號,用來從邏輯上切分數(shù)據(jù)流,barrier在流中的位置不會提前或延后,隨數(shù)據(jù)流鄉(xiāng)下流動,在數(shù)據(jù)源注入barrier后,觸發(fā)向狀態(tài)后端生成檢查點,barrier所在位置就是恢復(fù)數(shù)據(jù)時的起始位置。下游算子收齊barrier后,會執(zhí)行自己的算子state快照,并向下游廣播barrier,直至下游sink算子收集齊barrier后,對自己的state執(zhí)行快照,完成快照后,并通知CheckpointCoordinator快照地址,CheckpointCoordinator收集齊barriar確認信息后,確認本次快照完成。

      2.3.2 at least once

      對齊barrier時,早來的數(shù)據(jù)持續(xù)處理就是at least once。該數(shù)據(jù)依舊被包含在checkpoint備份的狀態(tài)之中,當(dāng)故障發(fā)生時,從備份的狀態(tài)恢復(fù)時,該數(shù)據(jù)依舊會被處理。

      2.3.3 exactly once

      對齊barrier時,早來的數(shù)據(jù)進行buffer就是flink引擎內(nèi)exectly once。就是對齊環(huán)節(jié),將數(shù)據(jù)收集緩存起來,對齊完成之后再處理。但只能保證flink引擎內(nèi)的exactly once 語義。

      2.3.4 end to end exactly once

      如果要做到end to end exactly once 語義,需要Source端和sink端的配合,

      1. 數(shù)據(jù)源需要支持斷點讀取

      2. sink端需要支持回滾機制或滿足冪等性。回滾--將部分寫入結(jié)果回滾到寫入之前的狀態(tài),冪等-- 多次寫入一致

      源端和目的端支持的語義如下:

      flink中采用的是2pc解決方案,即 two phase commit 兩階段提交,所謂的兩個階段是指:第一階段:準備階段(投票階段)和第二階段:提交階段(執(zhí)行階段)。我們將提議的節(jié)點稱為協(xié)調(diào)者(coordinator),其他參與決議節(jié)點稱為參與者(participants)。另還有3pc的方式。

      sink算子收到上游所有的barrier后,執(zhí)行state快照,并預(yù)提交事務(wù),再通知CheckpointCoordinator,CheckpointCoordinator確認本次快照完成, sink算子提交事務(wù)。

      flink提供了一個2pc的TwoPhaseCommitSinkFunction的抽象 類

      public?abstract?class?TwoPhaseCommitSinkFunction ??????extends?RichSinkFunction ??????implements?CheckpointedFunction,?CheckpointListener?{??//?繼承于checkpointedfunction?和?checkpointlistener?和richfunction /** ?*?Method?that?starts?a?new?transaction.??開啟事務(wù)?創(chuàng)建臨時文件 ?*/ protected?abstract?TXN?beginTransaction()?throws?Exception; /** ?*?Pre?commit?previously?created?transaction.?Pre?commit?must?make?all?of?the?necessary?steps?to?prepare?the ?*?transaction?for?a?commit?that?might?happen?in?the?future.?After?this?point?the?transaction?might?still?be ?*?aborted,?but?underlying?implementation?must?ensure?that?commit?calls?on?already?pre?committed?transactions ?*?will?always?succeed. ?*??預(yù)提交階段??寫到臨時文件,?關(guān)閉文件,同時開啟新事務(wù),執(zhí)行屬于下一個檢查點的寫入操作。 */ protected?abstract?void?preCommit(TXN?transaction)?throws?Exception; /** ?*?Commit?a?pre-committed?transaction.?If?this?method?fail,?Flink?application?will?be ?*?restarted?and?{@link?TwoPhaseCommitSinkFunction#recoverAndCommit(Object)}?will?be?called?again?for?the ?*?same?transaction.???提交階段,原子操作上一階段的文件,寫入真正的文件下,?提交失敗,?重啟嘗試恢復(fù)并重新提交事務(wù),恢復(fù)時冪等提交。 ?*/ protected?abstract?void?commit(TXN?transaction); *?Abort?a?transaction.???終止事務(wù),?刪除臨時文件 ?*/ protected?abstract?void?abort(TXN?transaction); }

      TwoPhaseCommitSinkFunction 繼承了checkpointfunction接口,可以在預(yù)提交階段,將檢查點寫到可靠性存儲。其又繼承了checkpointlistener接口,在提交階段接收jm的確認通知,觸發(fā)提交外部事務(wù)。

      2.4 savepoint

      savepoint和checkpoint都是一致性快照,checkpoint由flink自動創(chuàng)建,故障發(fā)生時會自動加載恢復(fù),flink根據(jù)配置自動刪除,而保存點需要用戶手動觸發(fā),不會被自動刪除,啟動時,從savepoint啟動即可。savepoint主要用于作業(yè)遷移和集群升級等場景中。

      bin/flink/?savepoint?jobId?savepointpath? bin/flin/?run?-s?savepointpath?...

      3。 后記

      后續(xù)將更多的投向flink sql,so,上云就上華為云,流計算服務(wù)推薦選擇華為云EI 數(shù)據(jù)湖探索 DLI-FLINK serverless云服務(wù)。

      參考:

      https://ci.apache.org/projects/flink/flink-docs-master/

      https://github.com/apache/flink/tree/blink

      https://www.oreilly.com/library/view/stream-processing-with/9781491974285/

      分布式 任務(wù)調(diào)度

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:Three.js案例分析系列1--webgl_animation_cloth 草坪上漂浮的白布
      下一篇:關(guān)于Java基礎(chǔ)你不得不會的34個問題
      相關(guān)文章
      亚洲 暴爽 AV人人爽日日碰| 亚洲视频一区二区在线观看| 亚洲中文字幕无码一去台湾| 亚洲欧洲视频在线观看| 久久久无码精品亚洲日韩按摩| 亚洲国产精品无码AAA片| 亚洲成AV人片在线观看| 亚洲国产精品乱码一区二区 | 国产亚洲精AA在线观看SEE| 狠狠亚洲狠狠欧洲2019| 亚洲小说区图片区另类春色| 亚洲五月综合缴情在线观看| 亚洲无线观看国产精品| 国产l精品国产亚洲区在线观看| 精品亚洲综合久久中文字幕| 久久亚洲综合色一区二区三区 | 亚洲国产精品一区二区成人片国内| 亚洲日本乱码在线观看| 国产成人A人亚洲精品无码| 亚洲AV无码乱码在线观看裸奔| 久久亚洲国产视频| 久久精品a亚洲国产v高清不卡| 亚洲黄色免费观看| 久久精品国产亚洲αv忘忧草| 国产日本亚洲一区二区三区| 亚洲欧美不卡高清在线| 毛片亚洲AV无码精品国产午夜| 亚洲精品tv久久久久| 国产亚洲av片在线观看18女人| 亚洲精品无码久久久久去q| 亚洲va在线va天堂va不卡下载| 亚洲人成电影在线天堂| 亚洲图片中文字幕| 亚洲午夜无码久久久久软件| 美国毛片亚洲社区在线观看 | 久久久久亚洲av毛片大| 亚洲国产精品成人精品无码区| 亚洲男人天堂2017| 亚洲avav天堂av在线网爱情| 亚洲国产精品无码久久久秋霞1| 精品国产亚洲第一区二区三区 |