Informer 獲取K8s event 保存至ES

      網友投稿 1247 2025-04-01

      一 背景


      Informer 是 Client-go 中的一個核心工具包。在Kubernetes源碼中,如果 Kubernetes 的某個組件,需要 List/Get Kubernetes 中的 Object,在絕大多 數情況下,會直接使用Informer實例中的Lister()方法(該方法包含 了 Get 和 List 方法),而很少直接請求Kubernetes API。Informer 最基本 的功能就是List/Get Kubernetes中的 Object。

      二 Informer 機制

      整個架構大體分為以下幾個部分:

      2.1 Index

      tools/cache/thread_safe_store.go中,定義了實現了線程安全的存儲接口ThreadSafeStore:

      type ThreadSafeStore interface { Add(key string, obj interface{}) Update(key string, obj interface{}) Delete(key string) Get(key string) (item interface{}, exists bool) List() []interface{} ListKeys() []string Replace(map[string]interface{}, string) Index(indexName string, obj interface{}) ([]interface{}, error) //傳入indexName和obj,返回所有和obj有相同index key的obj IndexKeys(indexName, indexKey string) ([]string, error) // 傳入indexName和index key,返回index key指定的所有obj key ListIndexFuncValues(name string) []string //獲取indexName對應的index內的所有index key ByIndex(indexName, indexKey string) ([]interface{}, error) //和IndexKeys方法類似,只是返回的是index key指定的所有obj GetIndexers() Indexers //返回目前所有的indexers AddIndexers(newIndexers Indexers) error //存儲數據前調用,添加indexer Resync() error // Resync is a no-op and is deprecated }

      結構體threadSafeMap將資源對象數據存儲于一個內存中的map數據結構中:

      type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} //實際存所有資源對象的地方 indexers Indexers indices Indices }

      每次的增、刪、改、查操作都會都會加鎖,以保證數據的一致性。

      k8s.io/client-go/tools/cache/store.go中,定義了存儲接口Store:

      type Store interface { Add(obj interface{}) error Update(obj interface{}) error Delete(obj interface{}) error List() []interface{} ListKeys() []string Get(obj interface{}) (item interface{}, exists bool, err error) GetByKey(key string) (item interface{}, exists bool, err error) Replace([]interface{}, string) error Resync() error }

      在tools/cache/index.go中,定義了Indexer接口:

      type Indexer interface { Store // 繼承接口Store Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexedValue string) ([]string, error) ListIndexFuncValues(indexName string) []string ByIndex(indexName, indexedValue string) ([]interface{}, error) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error }

      還定義了一些數據結構:

      type IndexFunc func(obj interface{}) ([]string, error) // 計算資源對象的index key的函數類型,值得注意的是,返回的是多個index key組成的列表 type Indexers map[string]IndexFunc // 計算索引的方法不止一個,通過給他們命名來加以區別,存儲索引名(name)與索引方法(IndexFunc)的映射 type Indices map[string]Index // 索引名(name)與索引(index)的映射 type Index map[string]sets.String // 索引鍵(index key)與值(Obj Key)列表的映射

      它們之間的關系如圖:

      具體實現在tools/cache/store.go中的cache結構體:

      `type` `cache ``struct` `{ `` ``cacheStorage ThreadSafeStore ``//cacheStorage是一個ThreadSafeStore類型的對象,實際使用的是threadSafeMap類型`` ``keyFunc KeyFunc ``//用于計算資源對象的index key``}``type` `KeyFunc ``func``(obj ``interface``{}) (string, error)`

      cache結構體封裝了threadSafeMap的很多方法,對外提供了Add、Update、Delete等方法;Indexer接口中規定需要實現的那些方法都是調用的threadSafeMap的實現

      通過cache.NewIndexer(keyFunc, Indexers)初始化Indexer對象

      keyFunc:k8s內部目前使用的自定義的indexFunc有PodPVCIndexFunc 、indexByPodNodeName 、MetaNamespaceIndexFunc

      默認使用MetaNamespaceKeyFunc:根據資源對象計算出/格式的key,如果資源對象的為空,則作為key

      Indexers:通過NewThreadSafeStore(indexers, Indices{})得到結構體內的cacheStorage

      // 定義一個IndexFunc,功能為:根據Annotations的users字段返回index key func UsersIndexFunc(obj interface{}) ([]string, error){ pod := obj.(*v1.Pod) usersString := pod.Annotations["users"] return strings.Split(usersString, ","), nil } func main() { index := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"byUser":UsersIndexFunc}) pod1 := &v1.Pod{ObjectMeta:metav1.ObjectMeta{Name:"one",Annotations:map[string]string{"users":"ernie,bert"}}} pod2 := &v1.Pod{ObjectMeta:metav1.ObjectMeta{Name:"two",Annotations:map[string]string{"users":"bert,oscar"}}} pod3 := &v1.Pod{ObjectMeta:metav1.ObjectMeta{Name:"three",Annotations:map[string]string{"users":"ernie,elmo"}}} //添加3個Pod資源對象 index.Add(pod1) index.Add(pod2) index.Add(pod3) //通過index.ByIndex函數(通過執行索引器函數得到索引結果)查詢byUser索引器下匹配ernie字段的Pod列表 erniePods, err := index.ByIndex("byUser","ernie") if err != nil{ panic(err) } for _, erniePods := range erniePods{ fmt.Println(erniePods.(*v1.Pod).Name) } }

      2.2 DeltaFIFO

      tools/cache/delta_fifo.go中定義了DeltaFIFO。Delta代表變化, FIFO則是先入先出的隊列。

      DeltaFIFO將接受來的資源event,轉化為特定的變化類型,存儲在隊列中,周期性的POP出去,分發到事件處理器,并更新Indexer中的本地緩存。

      DeltaType是string的別名,代表一種變化:

      DeltaType是string的別名,代表一種變化:

      `type` `DeltaType string`

      類型定義:

      `const` `(`` ``Added DeltaType = ``"Added"`` ``Updated DeltaType = ``"Updated"`` ``Deleted DeltaType = ``"Deleted"`` ``Replaced DeltaType = “Replaced” ``// 替換,list出錯時,會觸發relist,此時會替換`` ``Sync DeltaType = “Sync” ``// 周期性的同步,底層會當作一個update類型處理``)`

      Delta由變化類型+資源對象組成:

      type Delta struct { Type DeltaType Object interface{} }

      Deltas是[]delta切片:

      type Deltas []Delta

      DeltaFIFO的定義:

      type DeltaFIFO struct { lock sync.RWMutex //讀寫鎖 cond sync.Cond //條件變量 items map[string]Deltas //通過map數據結構的方式存儲,value存儲的是對象的Deltas數組 queue []string //存儲資源對象的key,該key通過KeyOf(obj)函數計算得到 populated bool //通過Replace()接口將第一批對象放入隊列,或者第一次調用增、刪、改接口時標記為true initialPopulationCount int //通過Replace()接口將第一批對象放入隊列,或者第一次調用增、刪、改接口時標記為true keyFunc KeyFunc knownObjects KeyListerGetter //indexer closed bool closedLock sync.Mutex emitDeltaTypeReplaced bool }

      向隊列里添加元素:

      func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { id, err := f.KeyOf(obj) //獲取obj key if err != nil { return KeyError{obj, err} } //向items中添加delta,并對操作進行去重,目前來看,只有連續兩次操作都是刪除操作的情況下,才可以合并,其他操作不會合并 newDeltas := append(f.items[id], Delta{actionType, obj}) newDeltas = dedupDeltas(newDeltas) if len(newDeltas) > 0 { //向queue和items中添加元素,添加以后,條件變量發出消息,通知可能正在阻塞的POP方法有事件進隊列了 if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } f.items[id] = newDeltas f.cond.Broadcast() } else { // 冗余判斷,其實是不會走到這個分支的,去重后的delta list長度怎么也不可能小于1 delete(f.items, id) } return nil }

      從隊列里Pop元素:

      func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { // 如果隊列是空的,利用條件變量阻塞住,直到有新的delta if f.IsClosed() { // 如果Close()被調用,則退出 return nil, ErrFIFOClosed } f.cond.Wait() } id := f.queue[0] f.queue = f.queue[1:] if f.initialPopulationCount > 0 { f.initialPopulationCount-- } item, ok := f.items[id] if !ok { // Item may have been deleted subsequently. continue } delete(f.items, id) err := process(item) // 如果處理失敗了,調用addIfNotPresent:如果queue中沒有則添加。本身剛剛從queue和items中取出對象,應該不會存在重復的對象,這里調用addIfNotPresent應該只是為了保險起見 if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err } // Don't need to copyDeltas here, because we're transferring // ownership to the caller. return item, err } }

      2.3 Reflector

      tools/cache/reflector.go中定義了Reflector:

      type Reflector struct { name string expectedTypeName string //被監控的資源的類型名 expectedType reflect.Type // 監控的對象類型 expectedGVK *schema.GroupVersionKind store Store // 存儲,就是Delta_FIFO,這里的Store類型實際是Delta_FIFO的父類 listerWatcher ListerWatcher // 用來進行list&watch的接口對象 backoffManager wait.BackoffManager resyncPeriod time.Duration //重新同步的周期 ShouldResync func() bool //周期性的判斷是否需要重新同步 clock clock.Clock //時鐘對象,主要是為了給測試留后門,方便修改時間 …… }

      同一類資源Informer共享一個Reflector。Reflector通過ListAndWatch函數來ListAndWatch apiserver來獲取資源的數據。

      獲取時需要基于ResourceVersion(Etcd生成的全局唯一且遞增的資源版本號)。通過此序號,客戶端可以知道目前與服務端信息同步的狀態,每次只取大于等于本地序號的事件。好處是可以實現事件的全局唯一,實現”斷點續傳“功能,不用擔心本地客戶端偶爾出現的網絡異常

      ListAndwatch是k8s統一的異步消息處理機制,保證了消息的實時性、可靠性、順序性、性能等,為聲明式風格的API奠定了良好的基礎,是k8s架構的精髓。

      List在Controller重啟或Watch中斷的情況下,調用資源的list API羅列資源對象以進行全量更新,基于HTTP短鏈接實現

      (1)r.listerWatcher.List用于獲取資源下的所有對象的數據,例如,獲取所有Pod的資源數據。獲取資源數據是由options的ResourceVersion控制的。如果ResourceVersion為0,則表示獲取所有Pod的資源數據;如果ResourceVersion非0,則表示根據資源版本號繼續獲取。

      (2)listMetaInterface.GetResourceVersion用于獲取資源版本號。

      (3)meta.ExtractList用于將資源數據(runtime.Object對象)轉換成資源對象列表([]runtime.Object對象)。

      因為r.listerWatcher.List獲取的是資源下的所有對象的數據,例如所有的Pod資源數據,所以它是一個資源列表。

      (4)r.syncWith用于將資源對象列表中的資源對象和資源版本號存儲至DeltaFIFO中,并會替換已存在的對象。

      (5)r.setLastSyncResourceVersion用于設置最新的資源版本號。

      Watch則在多次List之間進行,調用資源的watch API,基于當前的資源版本號監聽資源變更(如Added、Updated、Deleted)事件。

      通過在Http請求中帶上watch=true,表示采用Http長連接持續監聽apiserver發來的資源變更事件。

      apiserver在response的HTTP Header中設置Transfer-Encoding的值為chunked,表示采用分塊傳輸編碼。每當有事件來臨,返回一個WatchEvent。

      Reflector在獲取新的資源數據后,調用的Add方法將資源對象的Delta記錄存放到本地緩存DeltaFIFO中。

      2.4 Controller

      在tool/cache/controller.go中定義了Controller接口:

      type Controller interface { Run(stopCh <-chan struct{}) HasSynced() bool LastSyncResourceVersion() string }

      controller結構體實現了此接口:

      type controller struct { config Config reflector *Reflector reflectorMutex sync.RWMutex clock clock.Clock }

      config結構體中是所有配置:

      type Config struct { Queue //DeltaFIFO ListerWatcher Process ProcessFunc //從DeltaFIFO Pop調用時,調用的回調 ObjectType runtime.Object //期待處理的資源對象的類型 FullResyncPeriod time.Duration //全量resync的周期 ShouldResync ShouldResyncFunc //delta fifo周期性同步判斷時使用 RetryOnError bool }

      Controller的processLoop方法會不斷地調用的Pop方法從Delta隊列中消費彈出delta記錄(隊列中沒有數據時阻塞等待數據):

      func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) } } } }

      Pop方法須傳入Process函數——用于接收并處理對象的回調方法,默認的Process函數是Informer模塊中的HandleDeltas

      2.5 informer

      Kubernetes的其他組件都是通過client-go的Informer機制與Kubernetes API Server進行通信的。

      Informer也被稱為Shared Informer,它是可以共享使用的。

      在clientgo的informer/factory.go中,有接口定義:

      type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory ForResource(resource schema.GroupVersionResource) (GenericInformer, error) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool // 所有已知資源的shared informer Admissionregistration() admissionregistration.Interface Apps() apps.Interface Auditregistration() auditregistration.Interface Autoscaling() autoscaling.Interface Batch() batch.Interface Certificates() certificates.Interface Coordination() coordination.Interface Core() core.Interface Discovery() discovery.Interface Events() events.Interface Extensions() extensions.Interface Flowcontrol() flowcontrol.Interface Networking() networking.Interface Node() node.Interface Policy() policy.Interface Rbac() rbac.Interface Scheduling() scheduling.Interface Settings() settings.Interface Storage() storage.Interface }

      Informer 獲取K8s event 保存至ES

      sharedInformerFactory結構體實現了此接口:

      type sharedInformerFactory struct { client kubernetes.Interface namespace string tweakListOptions internalinterfaces.TweakListOptionsFunc lock sync.Mutex defaultResync time.Duration customResync map[reflect.Type]time.Duration informers map[reflect.Type]cache.SharedIndexInformer startedInformers map[reflect.Type]bool //用于追蹤哪種informer被啟動了,避免同一資源的Informer被實例化多次,運行過多相同的ListAndWatch }

      新建一個sharedInformerFactory結構體:

      sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)

      第1個參數是用于與Kubernetes API Server交互的客戶端;第2個參數用于設置多久進行一次resync(周期性的List操作),如果該參數為0,則禁用resync功能。

      sharedInformerFactory結構體實現了所有已知資源的shared informer,例如在clientgo的informer/core/vi/pod.go中,定義了如下接口:

      type PodInformer interface{   Informer() cache.SharedIndexInformer   Listen() v1.PodLister }

      podInformer結構體實現了Informer方法和Listen方法:

      func (f *podInformer) Informer() cache.SharedIndexInformer { return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer) // 如果已經存在同類型的資源Informer,則返回當前Informer,不再繼續添加 } func (f *podInformer) Lister() v1.PodLister { return v1.NewPodLister(f.Informer().GetIndexer()) }

      通過調用sharedInformers.Core().V1().Pods()獲得podInformer結構體

      得到具體Pod資源的informer對象:

      informer := sharedInformers.Core().V1().Pods().Informer()

      最終獲得的,是clientgo/tool/cache/shared_informer.go中的sharedIndexInformer結構體,它實現的接口為:

      type SharedIndexInformer interface { SharedInformer AddIndexers(indexers Indexers) error //啟動informer前為其添加indexers GetIndexer() Indexer }

      它的定義為:

      type sharedIndexInformer struct { indexer Indexer controller Controller processor *sharedProcessor cacheMutationDetector MutationDetector listerWatcher ListerWatcher objectType runtime.Object resyncCheckPeriod time.Duration defaultEventHandlerResyncPeriod time.Duration clock clock.Clock started, stopped bool startedLock sync.Mutex blockDeltas sync.Mutex }

      通過informer.AddEventHandler函數可以為資源對象添加資源事件回調方法,支持3種資源事件回調方法:AddFunc、UpdateFunc、DeleteFunc

      sharedIndexInformer結構體定義了HandleDeltas函數,作為process回調函數(通過Config結構體傳給controller)

      當資源對象的操作類型為Added、Updated、Deleted時,會將該資源對象存儲至Indexer,并通過distribute函數將資源對象分發至用戶自定義的事件處理函數(通過informer.AddEventHandler添加)中

      通過informer.Run(stopCH)運行該informer,它是一個持久化的goroutine,通過clientset對象與apiserver交互。

      它會啟動controller,啟動時傳入的Config結構體包含了

      stopCH用于在程序進程退出前通知Informer退出

      調用Pod的Informer的示例:

      stopCH := make(chan struct{}) defer close(stopCH) sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute) informer := sharedInformers.Core().V1().Pods().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ //為Pod資源添加資源事件回調方法 AddFunc: func(obj interface{}){ mObj := obj.(v1.Object) log.Print("創建新Pod:",mObj.GetName()) }, UpdateFunc: func(oldObj, newObj interface{}){ oObj := oldObj.(v1.Object) nObj := newObj.(v1.Object) log.Print(oObj.GetName(),",",nObj.GetName()) }, DeleteFunc: func(obj interface{}) { mObj :=obj.(v1.Object) log.Print("刪除舊Pod:",mObj.GetName()) }, }) informer.Run(stopCH)

      2.6 Work Queue(選用)

      在開發并行程序時,需要頻繁的進行數據同步,本身golang擁有channel 機制,但不能滿足一些復雜場景的需求。例如:延時隊列、限速隊列。

      client-go中提供了多種隊列以供選擇,可以勝任更多的場景。工作隊列會對存儲的對象進行去重,從而避免多個woker 處理同一個資源的情況。

      用戶可以在回調函數里,將資源對象推送到WorkQueue(或其他隊列)中,也可以直接處理。

      三 本地啟動es

      docker run --name es01 -d -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" elasticsearch:latest # 使用head客戶端鏈接 docker pull mobz/elasticsearch-head:5 # 啟動header 容器 docker run -d --name my-es_admin -p 9100:9100 mobz/elasticsearch-head:5

      啟動后,es正常,為了更方便操作es,使用head插件來,插件鏈接異常,需要修改es配置,并重啟容器

      # 進入容器 $ docker exec -it es01 /bin/bash # 進入容器后設置參數 # http.cors.enabled: true # http.cors.allow-origin: "*" echo 'http.cors.enabled: true' >> config/elasticsearch.yml echo 'http.cors.allow-origin: "*"' >> config/elasticsearch.yml # 設置完成,退出后重啟容器 docker restart es01

      修改配置重啟后,可以已經可以通過head組件正常鏈接es集群

      ![image-20210927143317543](/Users/xuel/Library/Application Support/typora-user-images/image-20210927143317543.png)

      創建索引:

      三 代碼

      import ( "context" "encoding/json" "fmt" "gopkg.in/olivere/elastic.v5" "k8s.io/api/events/v1beta1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "log" "math/rand" "os" "time" ) var client *elastic.Client var host = "http://127.0.0.1:9200/" //初始化 func init() { errorlog := log.New(os.Stdout, "APP", log.LstdFlags) var err error // //這個地方有個小坑 不加上elastic.SetSniff(false) 會連接不上 client, err = elastic.NewClient(elastic.SetSniff(false), elastic.SetErrorLog(errorlog), elastic.SetURL(host)) if err != nil { panic(err) } info, code, err := client.Ping(host).Do(context.Background()) if err != nil { panic(err) } fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number) esversion, err := client.ElasticsearchVersion(host) if err != nil { panic(err) } fmt.Printf("Elasticsearch version %s\n", esversion) } func mustSuccess(err error) { if err != nil { panic(err) } } func main() { rand.Seed(time.Now().UnixNano()) config, err := clientcmd.BuildConfigFromFlags("", "/Users/xuel/.kube/config") mustSuccess(err) clientset, err := kubernetes.NewForConfig(config) mustSuccess(err) sharedInformers := informers.NewSharedInformerFactory(clientset, 0) stopChan := make(chan struct{}) defer close(stopChan) eventInformer := sharedInformers.Events().V1beta1().Events().Informer() addChan := make(chan v1beta1.Event) deleteChan := make(chan v1beta1.Event) eventInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { unstructObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) mustSuccess(err) event := &v1beta1.Event{} err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj, event) mustSuccess(err) addChan <- *event }, UpdateFunc: func(oldObj, newObj interface{}) { }, DeleteFunc: func(obj interface{}) { }, }, 0) go func() { for { select { case event := <-addChan: str, err := json.Marshal(&event) mustSuccess(err) fmt.Printf("插入k8s事件內容:%s", string(str)) esinsert(str) break case <-deleteChan: break } } }() eventInformer.Run(stopChan) } func esinsert(str []byte) { index := "k8s_informer" dbtype := "doc" put1, err := client.Index(). Index(index). Type(dbtype). Id("1").BodyString(string(str)). Do(context.Background()) if err != nil { fmt.Println("Insert es error: %s", err) } fmt.Println("insert success", put1) }

      四 測試

      插入數據后使用es查詢:

      curl -H "Content-Type: application/json" -XGET 'http://127.0.0.1:9200/k8s_informer/doc/_search?pretty' -d '{"query":{"match_all":{}}}'

      觸發k8s事件,會自動記錄下來

      參考鏈接

      https://www.cnblogs.com/yangyuliufeng/p/13611126.html

      https://www.cnblogs.com/you-men/p/13391265.html

      https://edu.aliyun.com/roadmap/cloudnative

      API Elasticsearch

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:Vue的生命周期
      下一篇:【面試真經】2020年7月 Linux運維面試題(含答案版)
      相關文章
      中文字幕精品亚洲无线码一区| 亚洲综合日韩久久成人AV| 国产亚洲精品精华液| 亚洲精品A在线观看| 日本亚洲中午字幕乱码| 国产精品久久亚洲一区二区| 亚洲爆乳精品无码一区二区| 亚洲精品自偷自拍无码| 亚洲av永久无码一区二区三区| 亚洲欧美第一成人网站7777| 亚洲一本一道一区二区三区| 亚洲欧洲专线一区| 亚洲第一第二第三第四第五第六| 亚洲国产精品美女久久久久| 亚洲狠狠婷婷综合久久蜜芽| 欧美色欧美亚洲另类二区| 在线亚洲v日韩v| 亚洲日韩人妻第一页| 中文字幕亚洲激情| 亚洲精品无码久久久久sm| 精品国产_亚洲人成在线高清| 亚洲国产精品成人久久| 亚洲AV日韩精品久久久久久| 亚洲综合一区二区精品导航| 亚洲综合激情视频| 亚洲人6666成人观看| 亚洲欧美不卡高清在线| 国产精品久久亚洲一区二区| 国产亚洲精品线观看动态图| 亚洲人成精品久久久久| 久久精品亚洲综合| 亚洲欧洲国产成人精品| 亚洲永久在线观看| 亚洲AV永久无码精品一福利| 亚洲国产人成中文幕一级二级| 日韩亚洲变态另类中文| 亚洲AV日韩AV永久无码绿巨人| 亚洲女人影院想要爱| 亚洲色中文字幕在线播放| 国产亚洲精品欧洲在线观看| 国产av无码专区亚洲av果冻传媒 |