分布式消息中間件實踐讀書筆記

      網友投稿 890 2025-03-31

      寫在前面

      聽人家總說這些,之前項目里也沒有用到,其實挺好奇的,之前也沒時間看。這里準備用這本書入門,查了一下,書評很一般,但是入門應該夠了,而且200多頁,正合適,生活加油

      筆記主要是書的摘寫,不懂的地方寫一些自己的理解。

      嗯,看了一些發現東西挺多,是我唐突了…,不簡單。

      筆記還在更新中。

      夫人之相與,俯仰一世,或取諸懷抱,悟言一室之內;或因寄所托,放浪形骸之外--------《蘭亭集序》——東晉·王羲之

      第1章消息隊列

      1.1 系統間通信技術介紹

      如果是一個業務被拆分成多個子業務部署在不同的服務器上,那就是分布式應用;

      如果是同一個業務部署在多臺服務器上,那就是集群。

      分布式應用的子系統之間并不是完全獨立的,它們需要相互通信來共同完成某個功能,這就涉及系統間通信了。

      目前,業界通常有兩種方式來實現系統間通信,

      基于遠程過程調用的方式(RPC調用);

      基于消息隊列的方式。

      RPC是一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的協議。

      它是協議,是一種規范,就需要有遵循這套規范的實現。典型的RPC實現包括Dubbo、Thrift、?GRPC等。

      網絡通信的實現是透明的,調用方不需要關心網絡之間的通信協議、網絡IO模型、通信的信息格式等。

      跨語言,調用方實際上并不清楚對端服務器使用的是什么程序語言。對于調用方來說,無論其使用的是何種程序語言,調用都應該成功,并且返回值也應按照調用方程序語言能理解的形式來描述。

      基于消息隊列的方式是指由應用中的某個系統負責發送信息,由關心這條消息的相應系統負責接收消息,并在收到消息后進行各自系統內的業務處理。消息可以非常簡單,比如只包含文本字符串;也可以很復雜,比如包含字節流、字節數組,還可能包含嵌入對象,甚至是Java·對象(經過序列化的對象)。

      1.2 為何要使用消息隊列

      消息隊列的典型場景-異步處理,解耦、流量削峰、日志收集、事務最終一致性等問題。

      異步處理:這個可以結合AJAX理解,比如AJAX通過異步請求,這個異步請求是可以隨時發出的,那么在Tomcat里就有相應的工作隊列來存儲請求信息。

      模塊解耦:隨著需求的疊加,各模塊之間逐漸變成了相互調用的關系,這種模塊間緊密關聯的關系就是緊耦合。緊耦合帶來的問題是對一個模塊的功能變更將導致其關聯模塊發生變化,因此各個模塊難以獨立演化。要解決這個問題,可以在模塊之間調用時增加一個中間層來實現解耦,這也方便了以后的擴展。所謂解耦,簡單地講,就是一個模塊只關心自己的核心流程,而依賴該模塊執行結果的其他模塊如果做的不是很重要的事情,有通知即可,無須等待結果。換句話說,基于消息隊列的模型,關心的是通知,而非處理。

      流量削峰?某一時刻網站突然迎來用戶請求高峰期的情況,如果在設計上考慮不周甚至會發生雪崩(在分布式系統中,經常會出現某個基礎服務不可用造成整個系統不可用的情況,這種現象被稱為“服務雪崩效應”),從而發生整個系統不可用的嚴重生產事故。當訪問量劇增時系統依然可以繼續使用,該怎么做呢?首先想到的是購買更多的服務器進行擴展,以增強系統處理并發請求的能力。如果都以能處理此類流量峰值為標準投入大量資源隨時待命無疑是很大的浪費。在業界的諸多實踐中,常見的是使用消息隊列,先將短時間高并發的請求持久化,然后逐步處理,從而削平高峰期的并發流量,改善系統的性能

      日志收集?利用消息隊列產品在接收和持久化消息方面的高性能,引入消息隊列快速接收日志消息,避免因為寫入日志時的某些故障導致業務系統訪問阻塞、請求延遲等。所以很多公司會選擇構建一個日志收集系統,由它來統一收集業務日志數據,供離線和在線的分析系統使用。

      事務最終一致性?:業界曾經提出過一個處理分布式事務的規范-XAXA主要定義了全局事務管理器(Transaction Manager)和局部資源管理器(Resource Manager)之間的接口.XA接口是雙向的系統接口,在事務管理器及一個或多個資源管理器之間形成通信橋梁。XA引入的事務管理器充當全局事務中的協調者的角色。事務管理器控制著全局事務,管理事務生命周期,并協調資源。資源管理器負責控制和管理實際資源(如數據庫或JMS隊列)。目前各主流數據庫都提供了對XA規范的支持。XA所以它的最大缺陷是性能很差,因此并不適合在生產環境下有高并發和高性能要求的場景。在業界的很多實踐方案中,都可以借助消息隊列來處理此問題。

      1.3 消息隊列的功能特點

      一個典型意義上的消息隊列,至少需要包含消息的發送、接收和暫存功能

      在生產環境應用中,對消息隊列的要求遠不止基本的消息發送、接收和暫存。在不同的業務場景中,需要消息隊列產品能解決諸如消息堆積、消息持久化、可靠投遞、消息重復、嚴格有序、集群等各種問題。

      消息堆積:消息在處理中心逐漸積壓而得不到釋放。比如給消息隊列設置一個閾值,將超過閾值的消息不再放入處理中心,以防止系統資源被耗盡,導致機器掛掉甚至整個消息隊列不可用。

      消息持久化: 將消息放在內存中存在的最大問題是,一旦機器宿掉消息將丟失。持久化方案有很多種,比如將消息存到本地文件、分布式文件系統、數據庫系統中等。

      可靠投遞:可靠投遞是不允許存在消息丟失的情況的。從消息的整個生命周期來分析,消息丟失的情況一般發生在如下過程中:

      從生產者到消息處理中心。

      從消息處理中心到消息消費者。

      消息處理中心持久化消息。

      消息重復:為了支持消息可靠投遞,當消息發送失敗或者不知道是否發送成功時(比如超時),消息的狀態是待發送,定時任務不停地輪詢所有的待發送消息,最終保證消息不會丟失,這就帶來了消息可能會重復的問題。

      嚴格有序?: 在實際的業務場景中,經常會碰到需要按生產消息時的順序來消費的情形。需要消息隊列能夠提供有序消息的保證。但順序消費卻不一定需要消息在整個產品中全局有序,有的產品可能只需要提供局部有序的保證。

      集群:系統架構一般都需要實現高可用性,以排除單點故障引起的服務中斷,保證7x24小時不間斷運行,所以可能需要消息隊列產品提供對集群模式的支持。集群不僅可以讓消費者和生產者在某個節點崩潰的情況下繼續運行,集群之間的多個節點還能夠共享負載,當某臺機器或網絡出現故障時能自動進行負載均衡,而且可以通過增加更多的節點來提高消息通信的吞吐量。

      消息中間件:消息中間件關注于數據的發送和接收,利用高效、可靠的異步消息傳遞機制集成分布式系統。消息傳輸中間件(MOM)簡化了應用之間數據的傳輸,屏蔽了底層的異構操作系統和網絡平臺,提供了一致的通信和應用開發標準,確保在分布式計算網絡環境下可靠、跨平臺的信息傳輸和數據交換。它基于消息隊列的存儲-轉發機制,并提供了特有的異步傳輸機制,能夠基于消息傳輸和異步事務處理實現應用整合與數據交換。

      中間件:非底層操作系統軟件、非業務應用軟件,不是直接給最終用戶使用的,不能直接給客戶帶來價值的軟件統稱為中間件。

      1.4 設計一個簡單的消息中間件

      1.4.1 消息處理中心

      實現一個消息隊列。代碼詳見https://gitee.com/liruilonger/workspack/tree/master/src/com/msg_queue/jkd

      package com.msg_queue.jkd; import java.util.concurrent.ArrayBlockingQueue; /** * @Classname Broker * @Description TODO 消息處理中心類( Broker ) * @Date 2021/7/5 0:37 * @Created Li Ruilong */ public class Broker { ///隊列存儲消息的最大數量 private final static int MAX_SIZE = 3; //保存消息數據的容 private static ArrayBlockingQueue messageQueue = new ArrayBlockingQueue<>(MAX_SIZE); // 生產消息 public static void produce(String msg){ if (messageQueue.offer(msg)){ System.out.printf("投遞成功%s,當前暫存消息數量%d",msg,messageQueue.size()); }else { System.out.println("消息處理中心內暫存的消息達到最大負荷,不能放入消息"); } } //消費消息 public static String consume(){ // String msg = messageQueue.poll(); if (msg != null){ System.out.printf("已經消費消息 %s,當前暫存消息數量 %s",msg,messageQueue.size()); }else { System.out.println("消息處理中心沒有消息可供消費"); } return msg; } }

      /** * @Classname BrokerServer * @Description TODO 定義了BrokerServer類用來對外提供Broker類的服務。 * @Date 2021/7/5 12:32 * @Created Li Ruilong */ public class BrokerServer implements Runnable{ public static int SERVICE_PORT = 9999; private final Socket socket; //該Socket是由客戶端請求的得到的Socket實例。 public BrokerServer(Socket socket) { this.socket = socket; } @Override public void run() { try( // 拿到輸入流 BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 拿到輸出流 PrintWriter out = new PrintWriter(socket.getOutputStream()) ){ while (true){ // 拿到輸入的數據 String str = in.readLine(); if (str == null){ continue; } System.out.println("接受原始消息"+str); // CONSUME 表示消費一條消息 if ("CONSUME".equals(str)){ // 消費消息 String s = Broker.consume(); out.println(s); out.flush(); }else { // 其他情況表示生產消息放到消息隊列里面 Broker.produce(str); } } }catch (Exception e){ e.printStackTrace(); } } public static void main(String[] args) throws Exception { // 服務端套接字,監聽9999 端口 ServerSocket serverSocket = new ServerSocket(SERVICE_PORT); while (true){ // todo serverSocket.accept()接受客戶端Socket連接請求,并返回一個與客戶端Socket對應的Socket實例,該方法是一個阻塞方法, // 如果沒有接受到客戶端發送的Socket,則一直處于等待狀態,線程也會被阻塞。 BrokerServer server = new BrokerServer(serverSocket.accept()); new Thread(server).start(); } } }

      1.4.2 客戶端訪問

      /** * @Classname MqClient * @Description TODO 客戶端 * @Date 2021/7/5 13:36 * @Created Li Ruilong */ public class MqClient { /* * @return * @Description 消息生產者 * @author Liruilong * @date 2021/7/5 14:06 **/ public static void produce(String message) throws Exception { // 利用Socket模擬發送消息的一方。 Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT); try ( PrintWriter out = new PrintWriter(socket.getOutputStream()) ){ out.println(message); out.flush(); } } /* * @return * @Description 消息消費者 * @author Liruilong * @date 2021/7/5 15:20 **/ public static String consume() throws Exception{ // 利用Socket模擬消費消息的一方。 Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT); try ( BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream()); ){ out.println("CONSUME"); out.flush(); String message = in.readLine(); return message; } } }

      生產消息

      /** * @Classname ProduceClient * @Description TODO 生產消息 * @Date 2021/7/5 14:36 * @Created Li Ruilong */ public class ProduceClient { public static void main(String[] args) throws Exception { MqClient client = new MqClient(); client.produce("Hello Word"); } }

      消費消息

      /** * @Classname ConsumeClient * @Description TODO 消費消息 * @Date 2021/7/5 14:39 * @Created Li Ruilong */ public class ConsumeClient { public static void main(String[] args) throws Exception { MqClient mqClient = new MqClient(); String consume = mqClient.consume(); System.out.println("獲取的消息為:"+consume); } }

      第2章消息協議

      類似于上免得CONSUME,用與區分連接Socket的是生產消息的客戶端還是消費消息的客戶端。

      消息協議則是指用于實現消息隊列功能時所涉及的協議。按照是否向行業開放消息規范文檔,可以將消息協議分為開放協議和私有協議。

      常見協議有AMOP, MQTT, STOMP,XMPP等。有些特殊框架(如Redis, Kafka, ZeroMQ)根據自身需要未嚴格遵循MQ規范,而是基于TCP/IP自行封裝了一套協議,通過網絡Socket接口進行傳輸,實現了MQ的功能。

      這里的協議可以簡單地理解成對雙方通信的一個約定.

      2.1 AMQP

      在2004年,摩根大通和iMatrix開始著手Advanced Message Queuing Protocol (AMQP)開放標準的開發。2006年,發布了AMQP規范。目前AMQP協議的版本為1.0。

      一般來說,將AMQP協議的內容分為三部分:基本概念、功能命令和傳輸層協議。

      基本概念是指AMQP內部定義的各組件及組件的功能說明。

      功能命令是指該協議所定義的一系列命令,應用程序可以基于這些命令來實現相應的功能。

      傳輸層協議是一個網絡級協議,它定義了數據的傳輸格式,消息隊列的客戶端可以基于這個協議與消息代理和AMQP的相關模型進行交互通信,該協議的內容包括數據幀處理、信道復用、內容編碼、心跳檢測、數據表示和錯誤處理等。

      主要概念

      Message (消息):消息服務器所處理數據的原子單元。消息可以攜帶內容,從格式上看,消息包括一個內容頭、一組屬性和一個內容體。這里所說的消息可以對應到許多不同應用程序的實體,比如一個應用程序級消息、一個傳輸文件、一個數據流幀等。消息可以被保存到磁盤上,這樣即使發生嚴重的網絡故障、服務器崩潰也可確保投遞消息可以有優先級,高優先級的消息會在等待同一個消息隊列時在低優先級的消息之前發送,當消息必須被丟棄以確保消息服務器的服務質量時,服務器將會優先丟棄低優先級的消息。消息服務器不能修改所接收到的并將傳遞給消費者應用程序的消息內容體。消息服務器可以在內容頭中添加額外信息,但不能刪除或修改現有信息。

      Publisher (消息生產者):也是一個向交換器發布消息的客戶端應用程序。

      Exchange (交換器):用來接收消息生產者所發送的消息并將這些消息路由給服務器中的隊列。

      Binding (綁定):用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表(路由控制表)。

      Virtual Host (虛擬主機):它是消息隊列以及相關對象的集合,是共享同一個身份驗證和加密環境的獨立服務器域。每個虛擬主機本質上都是一個mini版的消息服務器,擁有自己的隊列、交換器、綁定和權限機制。

      Broker (消息代理):表示消息隊列服務器,接受客戶端連接,實現AMQP消息隊列和路由功能的過程。

      Routing Key (路由規則):虛擬機可用它來確定如何路由一個特定消息。

      Queue (消息隊列):用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可被投入一個或多個隊列中。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。

      Connection (連接):可以理解成客戶端和消息隊列服務器之間的一個TCP連接。

      Channel (信道):僅僅當創建了連接后,若客戶端還是不能發送消息,則需要為連接創建一個信道。信道是一條獨立的雙向數據流通道,它是建立在真實的TCP連接內的虛擬連接,AMQP命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,它們都通過信道完成。一個連接可以包含多個信道,之所以需要信道,是因為TCP連接的建立和釋放都是十分昂貴的,如果客戶端的每一個線程都需要與消息服務器交互,如果每一個線程都建立了一個TCP連接,則暫且不考慮TCP連接是否浪費,就算操作系統也無法承受每秒建立如此多的TCP連接。

      Consumer (消息消費者):表示一個從消息隊列中取得消息的客戶端應用程序。

      核心組件的生命周期

      (1)消息的生命周期一條消息的流轉過程通常是這樣的:

      Publisher產生一條數據,發送到Broker,?Broker中的Exchange可以被理解為一個規則表(Routing Key和Queue的映射關系-Binding),?Broker收到消息后根據Routing Key查詢投遞的目標Queue.

      Consumer向Broker發送訂閱消息時會指定自己監聽哪個Queue,當有數據到達Queue時Broker會推送數據到Consumer.

      (2)交換器的生命周期

      每臺AMQP服務器都預先創建了許多交換器實例,它們在服務器啟動時就存在并且不能被銷毀。如果你的應用程序有特殊要求,則可以選擇自己創建交換器,并在完成工作后進行銷毀。

      (3)隊列的生命周期

      這里主要有兩種消息隊列的生命周期,即持久化消息隊列和臨時消息隊列。持久化消息隊列可被多個消費者共享,不管是否有消費者接收,它們都可以獨立存在。臨時消息隊列對某個消費者是私有的,只能綁定到此消費者,當消費者斷開連接時,該消息隊列將被刪除。

      功能命令

      AMQP協議文本是分層描述的,在不同主版本中劃分的層次是有一定區別的。

      0-9?版本共分兩層:?Functional Layer (功能層)和Transport Layer (傳輸層)

      功能層定義了一系列命令,這些命令按功能邏輯組合成不同的類(Class),客戶端應用可以利用它們來實現自己的業務功能。

      傳輸層將功能層所接收的消息傳遞給服務器經過相應處理后再返回,處理的事情包括信道復用、幀同步、內容編碼、心跳檢測、數據表示和錯誤處理等.

      0-10?版本則分為三層:?Model Layer (模型層)、Session Layer (會話層)和Transport Layer(傳輸層)。

      模型層定義了一套命令,客戶端應用利用這些命令來實現業務功能。

      會話層負責將命令從客戶端應用傳遞給服務器,再將服務器的響應返回給客戶端應用,會話層為這個傳遞過程提供了可靠性、同步機制和錯誤處理。

      傳輸層負責提供幀處理、信道復用、錯誤檢測和數據表示

      消息數據格式

      所有的消息必須有特定的格式來支持,這部分就是在傳輸層中定義的。AMQP是二進制協議,協議的不同版本在該部分的描述有所不同。0-9-1版本為例,看一下該版本中的消息格式

      所有的消息數據都被組織成各種類型的幀(Frame),幀可以攜帶協議方法和其他信息,所有幀都有同樣的格式,都由一個幀頭(header, 7個字節)、任意大小的負載(payload)和一個檢測錯誤的結束幀(frame-end)字節組成。其中:

      幀頭包括一個type字段、一個channel字段和一個size字段;

      幀負載的格式依賴幀類型(type)

      要讀取一個幀需要三步。

      ①讀取幀頭,檢查幀類型和通道(channel).

      ②根據幀類型讀取幀負載并進行處理。

      ③讀取結束幀字節。

      AMQP定義了如下幀類型。

      type =1, “METHOD”:方法幀;

      type=2, “HEADER”:內容頭幀;

      type=3,“BODY”:內容體幀;

      type=4, “HEARTBEAT”:心跳幀通道

      編號為0的代表全局連接中的所有幀, 1-65535代表特定通道的幀。size字段是指幀負載的大小,它的數值不包括結束幀字節。AMQP使用結束幀來檢測錯誤客戶端和服務器實現引起的錯誤。

      2.2 MQTT

      MQTT (Message Queuing Telemetry Transport,消息隊列遙測傳輸)是IBM開發的一個即時通信協議,該協議支持所有平臺,幾乎可以把所有聯網物品和外部連接起來,被用來當作傳感器和制動器的通信協議。

      目前MQTT協議版本為2014年發布的MQTT 3.1.1,它是一個基于TCP/IP協議、可提供發布/訂閱消息模式、十分輕量級的通信協議。除標準版外,還有一個簡化版MQTI-SN,它基于非TCP/IP協議(如ZigBee協議),該協議主要為嵌入式設備提供消息通信。

      這里主要介紹標準版MQTT 3.1.1,該協議是一個基于客戶端-服務器的消息發布/訂閱傳輸協議,其特點是輕量、簡單、開放和易于實現。正因為這些特點,使它常應用于很多機器計算能力有限、低帶寬、網絡不可靠的遠程通信應用場景中。

      IBM WebSphere, MQ Telemetry, IBM Messagesight. Mosquitto, Eclipse Paho, emqttd?Xively.m2m.io, webMethods, Nirvana Messaging,?RabbitMQ?Apache?ActiveMQ, Apache Apollo,Moquette, HiveMQ, Mosca, Litmus Automation Loop. JoramMQ, ThingMQ, VerneMQ.

      主要概念

      所有基于網絡連接的應用都會有客戶端(Client)和服務器(Server),而在MQTT協議中使用者有三種身份:發布者(Publisher)、代理(Broker)和訂閱者(Subscriber)。其中消息的·發布者和訂閱者都是客戶端,消息代理是服務器,消息發布者可以同時是訂閱者。一條消息的流轉過程是這樣的:先由消息發布者發布消息到代理服務器,在消息中會包含主題(Topic),之后消息訂閱者如果訂閱了該主題的消息,將會收到代理服務器推送的消息.(基于觀察者模式理解)

      下面介紹MQTT協議中的基本組件。

      (1)網絡連接(Network Connection):網絡連接指客戶端連接到服務器時所使,用的底層傳輸協議, 由該連接來負責提供有序的、可靠的、基于字節流的雙向傳輸

      (2)應用消息(Application Message):應用消息指通過網絡所傳輸的應用數據,該數據一般包括主題和負載兩部分。

      (3)主題(Topic):主題相當于應用消息的類型,消息訂閱者訂閱后,就會收到該主題的消息內容。

      (4)負載(Payload):負載指消息訂閱者具體接收的內容

      (5)客戶端(Client):客戶端指使用MQTT的程序或設備。客戶端總是通過網絡連接到服務端,它可以發布應用消息給其他相關的客戶端、訂閱消息用以請求接收相關的應用消息、取消訂閱應用消息、從服務器斷開連接等。

      (6)服務器(Server): 服務器也是指程序或設備,它作為發送消息的客戶端和請求訂閱的客戶端之間的中介。服務器的功能包括接收來自客戶端的網絡連接、接收客戶端發布的應用消息、處理客戶端的訂閱和取消訂閱的請求、轉發應用消息給相應的客戶端等。

      (7) 會話(Session):客戶端與服務器建立連接之后就是一個會話,客戶端和服務器之間通過會話來進行狀態交互。會話存在于一個網絡連接之間,也可能會跨越多個連續的網絡連接。會話主要用于客戶端和服務器之間的邏輯層面的通信。

      (8)訂閱(Subscription):訂閱一般與一個會話關聯,會話可以包含多于一個的訂閱。訂閱包含一個主題過濾器和一個服務質量(Qos)等級。會話的每個訂閱都有一個不同的主題過濾器。

      (9)主題名(Topic Name):主題名是附加在消息上的一個標簽,該標簽與服務器的訂閱相匹配,服務器會根據該標簽將消息發送給與訂閱所匹配的每個客戶端。

      (10)主題過濾器(Topic Filter):?主題過濾器是訂閱中包含的一個表達式,用于表示相關聯的一個或多個主題。主題過濾器可以使用通配符。

      (11) MQTT控制報文(MQTT Control Packet):?MQTT控制報文實際上就是通過網絡連接發送的信息數據包。

      消息數據格式

      MQTT協議是通過交換預定義的MQTT控制報文來通信的,·內容由三部分組成

      固定報頭(Fixed header):存在于所有控制報文中,內容包含控制報文類型、相應的標識位和剩余長度

      可變報頭(Variable header):存在于部分控制報文中,由固定報頭中的控制報文類型決定是否需要可變報頭,以及可變報頭的具體內容。

      消息體(Payload):存在于部分控制報文中,表示客戶端接收到的具體內容。

      嗯,有些多,先不看額,遇到在深入學習。:)

      2.3 STOMP

      STOMP (Streaming Text Orientated Messaging Protocol,流文本定向消息協議)是一個簡單的文本消息傳輸協議,它提供了一種可互操作的連接格式,允許客戶端與任意消息服務器(Broker)進行交互。在設計STOMP時借鑒了HTTP的一些理念,將簡易性、互通性作為其主要設計哲學,這使得STOMP協議的客戶端的實現很容易。

      主要介紹STOMP 1.2版本協議的相關內容。STOMP被設計成輕量級的協議,使得很容易用其他語言來實現客戶端和服務器端,因此它在多種語言和平臺上得到廣泛應用。目前有很多STOMP消息中間件服務器,如下都是STOMP協議的服務器端實現。

      Apache Apollo, Apache?ActiveMQ,?RabbitMQ?HornetQ, Stampy, StompServer.

      嗯,有些多,簡單了解下,先不看額,遇到在深入學習。:)

      2.4 XMPP

      XMPP (可擴展通信與表示協議)是一種基于XML的流式即時通信協議,它的特點是將上下文信息等嵌入到用XML表示的結構化數據中,使得人與人之間、人與應用系統之間,以及應用系統之間能即時相互通信。XMPP的基本語法和語義最初主要是由Jabber開放源代碼社區于1999年開發的,其基礎部分早在2002-2004年就得到了互聯網工程任務組(IETF)的批準。

      XMPP定義了用于通信網絡實體之間的開放協議的規范,其規范說明由一系列作用不同的RFC文檔組成, 目前核心規范主要包括RFC 6120, RFC 6121,RFC 7622及RFC 7395中定義的WebSocket綁定。

      2.5 JMS

      JMS (Java Message Service)即Java消息服務應用程序接口,是Java平臺中面向消息中間件的一套規范的Java API接口,用于在兩個應用程序之間或分布式系統中發送消息,進行異步通信。這套規范由SUN提出, 目前主要使用的版本有兩個:一個是2002年發布的1.1版;yi個是2013年發布的2.0版。

      不同于本章上面所介紹的AMQP, MQTT,STOMP,XMPP等協議,JMS并不是消息隊列協議的一種,更不是消息隊列產品,它是與具體平臺無關的API, 目前市面上的絕大多數消息中間件廠商都支持JMS接口規范。換句話說,你可以使用JMS API來連接支持AMQP, STOMP等協議的消息中間件產品(比如ActiveMQ, RabbitMQ等),在這一點上它與Java中的JDBC的作用很像,我們可以用JDBC API來訪問具體的數據庫產品(比如OracleMySQL等)。

      1 體系架構

      JMS的作用是提供通用接口保證基于JMS API編寫的程序適用于任何一種模型,使得在更換消息隊列提供商的情況下應用程序相關代碼也不需要做太大的改動。

      [x] (1)點對點模型在點對點(Point to Point)模型中,應用程序由隊列(Queue)、發送者(Sender)和接收者(Receiver)組成。每條消息都被發送到一個特定的隊列中,接收者從隊列中獲取消息

      隊列中一直保留著消息,直到它們被接收或超時。點對點?模型的特點如下

      每條消息只有一個接收者,消息一旦被接收就不再保留在消息隊列中了。

      發送者和接收者之間在時間上沒有依賴。也就是說,當消息被發送之后,不管接收者有沒有在運行,都不會影響消息被發送到隊列中。

      [x] (2)發布/訂閱模型在發布/訂閱(Pub/Sub)模型中,應用程序由主題(Topic)、發布者(Publisher)和訂閱者(Subscriber)組成。發布者發布一條消息,該消息通過主題傳遞給所有的訂閱者

      在這種模型中,發布者和訂閱者彼此不知道對方,它們是匿名的并且可以動態發布和訂閱主題。主題用于保存和傳遞消息,并且會一直保存消息直到消息被傳遞給訂閱者。發布/訂閱模型的特點如下:

      每條消息可以有多個訂閱者。

      發布者和訂閱者之間有時間上的依賴。一般情況下,某個主題的訂閱者需要在創建了訂閱之后才能接收到消息,而且為了接收消息訂閱者必須保持運行的狀態。

      JMS允許訂閱者創建一個可持久化的訂閱,這樣即使訂閱者沒有運行也能接收到所訂閱的消息。

      每條消息都會傳送給該主題下的所有訂閱者。通常發布者不會知道也意識不到哪一個訂閱者正在接收消息。

      2.基本概念

      按照JMS規范中所說的,一個JMS應用由如下幾個部分組成。

      JMS客戶端(JMS Client):指發送和接收消息的Java程序。

      非JMS客戶端(Non-JMS Client):指使用消息系統原生的客戶端API代替JMS的客戶端。如果應用程序在JMS規范前就已存在,則它可能同時包含JMS客戶端和非JMS客戶端。

      消息(Message):每個應用都定義了一組消息,用于多個客戶端之間的消息通信。

      JMS提供商(JMS Provider):指實現了JMS API的實際消息系統。

      受管對象(Administered Object):指由管理員創建,并預先配置好給客戶端使用的JMS對象。JMS中的受管對象分為兩種,即ConnectionFactory (客戶端使用這個對象來創建到提供者的連接)和Destination (客戶端使用這個對象來指定發送或接收消息的目的地)。

      而具體到JMS應用程序,則主要涉及以下基本概念。

      生產者(Producer):創建并發送消息的JMS客戶端,在點對點模型中就是發送者,在發布/訂閱模型中就是發布者。

      消費者(Consumer):接收消息的JMS客戶端,在點對點模型中就是接收者,在發布/訂閱模型中就是訂閱者。

      客戶端(Client):生產或消費消息的基于Java的應用程序或對象。

      隊列(Queue ):一個容納被發送的等待閱讀的消息的區域。它是點對點模型中的隊列。

      主題(Topic):一種支持發送消息給多個訂閱者的機制。它是發布/訂閱模型中的主題。

      消息(Message):在JMS客戶端之間傳遞的數據對象。JMS消息又包括消息頭、屬性和消息體三部分。

      編程接口

      (1) ConnectionFactory接口(連接工廠)?: ConnectionFactory是創建Connection對象的工廠,根據不同的消息類型用戶可選擇用隊列連接工廠或者主題連接工廠,分別對應QueueConnectionFactory和TopicConnectionFactory。可以通過JNDI來查找ConnectionFactory對象。

      (2) Destination接口(目的地)?: Destination是一個包裝了消息目的地標識符的受管對象。消息目的地是指消息發布和接收的地點,消息目的地要么是隊列要么是主題。對于消息生產者來說,它的Destination是某個隊列或某個主題;對于消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。所以Destination實際上就是兩種類型的對象:Queue和Topic,可以通過JNDI來查找Destination.

      (3) Connection接口(連接): Connection表示在客戶端和JMS系統之間建立的連接(實際上是對TCP/IP Socket的包裝)。Connection可以產生一個或多個Session,跟ConnectionFactory一樣, Connection也有兩種類型:QueueConnection和TopicConnection.

      (4) Session接口(會話): Session是實際操作消息的接口,表示一個單線程的上下文,用于發送和接收消息。因為會話是單線程的,所以消息是按照發送的順序一個個接收的。可以通過Session創建生產者、消費者、消息等。在規范中Session還提供了事務的功能。Session也分為兩種類型: QueueSession和TopicSession

      (5) MessageProducer接口(消息生產者): 消息生產者由Session創建并用于將消息發送到Destination.消費者可以同步(阻塞模式)或異步(非阻塞模式)接收隊列和主題類型的消息。消息生產者有兩種類型:QueueSender和TopicPublisher.

      (6) MessageConsumer接口(消息消費者):消息消費者由Session創建,用于接收被發送到Destination的消息。消息消費者有兩種類型: QueueReceiver和TopicSubscriber.

      (7) Message接口(消息): 消息是在消費者和生產者之間傳送的對象,即將消息從一個應用程序發送到另一個應用程序。

      (8) MessageListener (消息-): 如果注冊了消息-,那么當消息到達時將自動調用-的onMessage方法。

      JMS 1.0 示例

      消息消費著

      package msg_queue.jms; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.util.concurrent.TimeUnit; /** * @Classname QueueConsumer * @Description TODO 消息消費著 * @Date 2021/7/9 15:12 * @Created Li Ruilong */ public class QueueConsumer { public static final String USERNAME = ActiveMQConnection.DEFAULT_USER; public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKER_URL); Connection connection = null; try { connection = connectionFactory.createConnection(); connection.start(); final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //創建隊列,作為消費者消費消息的目的地 Queue sessionQueue = session.createQueue("test"); // 消費者 MessageConsumer consumer = session.createConsumer(sessionQueue); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); }finally { try { session.commit(); } catch (JMSException e) { e.printStackTrace(); } } } }); TimeUnit.MICROSECONDS.sleep(100000); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }

      消息生產者

      package msg_queue.jms; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @Classname QueueProducer * @Description TODO 消息生產者 * @Date 2021/7/9 0:07 * @Created Li Ruilong */ public class QueueProducer { public static final String USERNAME = ActiveMQConnection.DEFAULT_USER; public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { // 創建連接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKER_URL); Session session = null; Connection connection = null; try { // 創建連接 connection = connectionFactory.createConnection(); // 啟動連接 connection.start(); // 創建會話 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 創建隊列,需要指定隊列名稱,消息生產者和消費者將根據它來發送、接收對應的消息 Queue sessionQueue = session.createQueue("test"); // 消息生產者 MessageProducer producer = session.createProducer(sessionQueue); TextMessage message = session.createTextMessage("測試一個點對點的一條消息"); producer.send(message); session.commit(); } catch (JMSException e) { e.printStackTrace(); }finally { try { session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }

      JMS 2.0 示例

      在JMS 2.0中主要進行了易用性方面的改進,這樣可以幫助開發者減少代碼的編寫量。新的API被稱作簡化的API(Simplified API),其比JMS 1.1 API更簡單易用;后者被稱作經典API?(Classic API).?簡化的API由三個新接口構成:?JMSContext.JMSProducer和JMSConsumer.

      JMSContext:用于替換經典API中單獨的Connection和Session

      JMSProducer:用于替換經典API中的MessageProducer,其支持以鏈式操作(方法鏈)方式配置消息傳遞選項、消息頭和消息屬性。

      JMSConsumer:用于替換經典API中的MessageConsumer,其使用方式與JMSProducer類似。

      簡化的API不僅提供了經典API的所有特性,還增加了一些其他特性。經典API并沒有被棄用,而是作為JMS的一部分被保留下來。下面通過發送文本消息的例子來看一下兩者之間的區別。

      第3章RabbitMQ

      3.1 簡介

      1. RabbitMQ特點

      RabbitMQ是一個由Erlang語言開發的基于AMOP標準的開源實現。RabbitMQ最初起源于金融系統,用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。其具體特點包括:

      《分布式消息中間件實踐》 讀書筆記

      保證可靠性( Reliability),?RabbitMQ使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認等。

      具有靈活的路由(Flexible Routing)功能。在消息進入隊列之前,是通過Exchange (交換器)來路由消息的。對于典型的路由功能,?RabbitMQ已經提供了一些內置的Exchange來實現。針對更復雜的路由功能,可以將多個Exchange綁定在一起,也可以通過插件機制來實現自己的Exchange.

      支持消息集群(Clustering),多臺RabbiMQ服務器可以組成一個集群,形成一個邏輯Broker.

      具有高可用性(Highly Available),隊列可以在集群中的機器上進行鏡像,使得在部分節點出現問題的情況下隊列仍然可用。

      支持多種協議(Multi-protocol),?RabbitMQ除支持AMQP協議之外,還通過插件的方式支持其他消息隊列協議,比如STOMP, MQTT等。

      支持多語言客戶端(Many Client),RabbitMQ幾乎支持所有常用的語言,比如Java. .NET, Ruby等

      提供管理界面(Management UI),?RabbitMQ提供了一個易用的用戶界面,使得用戶可以監控和管理消息Broker的許多方面

      提供跟蹤機制(Tracing),?RabbitMQ提供了消息跟蹤機制,如果消息異常,使用者可以查出發生了什么情況。

      提供插件機制(Plugin System),?RabbitMQ提供了許多插件,從多方面進行擴展,也可以編寫自己的插件.

      2. RabbitMQ基本概念

      RabbitMQ是AMQP協議的一個開源實現,所以其基本概念也就是AMQPt中的基本概念。如圖是RabbitMQ的整體架構圖。

      Message (消息):消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列可選屬性組成,這些屬性包括?routing-key (路由鍵),priority (相對于其他消息的優先級)、?delivery-mode (指出該消息可能需要持久化存儲)等。

      Publisher (消息生產者):一個向交換器發布消息的客戶端應用程序。

      Exchange (交換器):用來接收生產者發送的消息,并將這些消息路由給服務器中的隊列。.

      Binding (綁定):用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。

      Queue (消息隊列):用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一條消息可被投入一個或多個隊列中。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。

      Connection (網絡連接):比如一個TCP連接。

      Channel (信道):多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內的虛擬連接,?AMQP命令都是通過信道發送出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成的。

      因為對于操作系統來說,建立和銷毀TCP連接都是非常昂貴的開銷,所以引入了信道的概念,以復用一個TCP連接。.

      Consumer (消息消費者):表示一個從消息隊列中取得消息的客戶端應用程序。

      Virtual Host (虛擬主機,在RabbitMQ中叫vhost):表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。本質上每個vhost就是一臺縮小版的RabbitMQ服務器,它擁有自己的隊列、交換器、綁定和權限機制。vhost是AMQP概念的基礎,必須在連接時指定, RabbitMQ默認的vhost是"1"

      Broker:表示消息隊列服務器實體。

      (1) AMQP中的消息路由

      在AMQP中增加了Exchange和Binding的角色。生產者需要把消息發布到Exchange上,消息最終到達隊列并被消費者接收,而Binding決定交換器上的消息應該被發送到哪個隊列中。

      (2)交換器類型

      不同類型的交換器分發消息的策略也不同,目前交換器有4種類型:?Direct,?Fanout,?Topic,Headers。其中Headers交換器匹配AMQP消息的Header而不是路由鍵。此外,?Headers交換器和Direct交換器完全一致,但性能相差很多,目前幾乎不用了,所以下面我們看另外三種類型。

      如果消息中的路由鍵(routing key)和Binding中的綁定鍵(binding key)一致,交換器就將消息發送到對應的隊列中.

      路由鍵與隊列名稱要完全匹配,如果將一個隊列綁定到交換機要求路由鍵為“dog",則只轉發routing key標記為"dog"的消息,不會轉發"dog.puppy"消息,也不會轉發"dog.guard "消息等。Direct交換器是完全匹配、單播的模式。

      Fanout交換器

      Fanout交換器不處理路由鍵,只是簡單地將隊列綁定到交換器

      發送到交換器的每條消息都會被轉發到與該交換器綁定的所有隊列中,這很像子網廣播,子網內的每個主機都獲得了一份復制的消息。通過Fanout交換器轉發消息是最快的。

      Topic交換器

      Topic交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某種模式進行匹配,此時隊列需要綁定一種模式。

      Topic交換器將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點"."隔開,該交換器會識別兩個通配符:?“#”和“*”,其中“#”匹配0個或多個單詞, “*”匹配不多不少一個單詞。

      3.2 工程實例

      RabbitMQ官網:https://www.rabbitmq.com/

      基于Docker的安裝:https://registry.hub.docker.com/_/rabbitmq?tab=description&page=2&ordering=last_updated

      這里我們先在阿里云上裝一個RabbitMQ,用Docker來安裝,直接拉去鏡像。

      # 啟動docker服務 [root@liruilong ~]# systemctl restart docker # 查看鏡像 [root@liruilong ~]# docker images #指定版本,該版本包含了web控制頁面 [root@liruilong ~]# docker pull rabbitmq:management #運行容器: #方式一:默認guest 用戶,密碼也是 guest [root@liruilong ~]# docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management #方式二:設置用戶名和密碼 [root@liruilong ~]# docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management # 發布Docker服務,將端口映射到15672,5672 [root@liruilong ~]# docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management 2189f2fa53f1e76306a2ad422e0fa33bca1ae0f3ee77514573d71aca9ce24801 [root@liruilong ~]#

      這里需要注意的是端口綁定,需要把訪問端口和管理端口同時綁定。如果是ESC的話,需要配置安全組

      3.2.1Java訪問RabbitMQ實例

      RabbitMQ支持多種語音訪問。添加依賴

      com.rabbitmq amqp-client 4.1.0

      消息生產者

      package msg_queue.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /* * @return * @Description TODO 消息生產者 * @author Liruilong * @date 2021/7/9 22:16 **/ public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //設置 RabbitMQ 地址 factory.setHost("39.97.241.18"); //默認訪問5672端口 factory.setPort(5672); factory.setVirtualHost("/"); //建立到代理服務器到連接 Connection conn = factory.newConnection(); //創建信道 Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "hello-exchange"; // direct 類型,路由鍵與隊列名稱要`完全匹配` channel.exchangeDeclare(exchangeName, "direct", true); // 定義 路由鍵 String routingKey = "testRoutingKey"; //發布消息 byte[] messageBodyBytes = "這是我第一次學習Rabbitmq".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); channel.close(); conn.close(); } }

      首先創建一個連接工廠,再根據連接工廠創建連接,之后從連接中創建信道,接著聲明一個交換器和指定路由鍵,然后才發布消息,最后將所創建的信道、連接等資源關閉。代碼中的ConnectionFactory, Connection、 Channel都是RabbitMQ提供的API中最基本的類。

      ConnectionFactory是Connection的制造工廠

      Connection代表RabbitMQ的Socket連接,它封裝了Socket操作的相關邏輯。

      Channel是與RabbitMQ打交道的最重要的接口,大部分業務操作都是在Channel中完成的,比如定義隊列、定義交換器、隊列與交換器的綁定、發布消息等。

      消息消費者

      package msg_queue.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /* * @return * @Description TODO 消息消費者 * @author Liruilong * @date 2021/7/9 23:45 **/ public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("39.97.241.18"); factory.setVirtualHost("/"); //建立到代理服務器到連接 Connection conn = factory.newConnection(); //創建信道 final Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); //聲明隊列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "testRoutingKey"; //綁定隊列,通過鍵 testRoutingKey 將隊列和交換器綁定起來 channel.queueBind(queueName, exchangeName, routingKey); while (true) { //消費消息 boolean autoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費的路由鍵:" + envelope.getRoutingKey()); System.out.println("消費的內容類型:" + properties.getContentType()); long deliveryTag = envelope.getDeliveryTag(); //確認消息 channel.basicAck(deliveryTag, false); System.out.printf("消費的消息體內容:%s",new String(body, "UTF-8")); } }); } } }

      消息消費者通過不斷循環等待服務器推送消息,一旦有消息過來,就在控制臺輸出消息的相關內容。一開始的創建連接、創建信道、聲明交換器的代碼和發布消息時是一樣的,但在消費消息時需要指定隊列名稱,所以這里多了綁定隊列這一步,接下來是循環等待消息過來并打印消息內容

      3.3.6 通道

      消息客戶端和消息服務器之間的通信是雙向的,不管是對客戶端還是服務器來說,保持它們之間的網絡連接是很耗費資源的。為了在不占用大量TCP/P連接的情況下也能有大量的邏輯連接, AMQP增加了通道(Channel)的概念, RabbitMQ支持并鼓勵在一個連接中創建多個通道,因為相對來說創建和銷毀通道的代價會小很多。需要提醒的是,作為經驗法則,應該盡量避免在線程之間共享通道,你的應用應該使用每個線程單獨的通道,而不是在多個線程上共享同一個通道,因為大多數客戶端不會讓通道線程安全(因為這將對性能產生嚴重的負面影響)。

      3.3.7 總結

      個人認為,?RabbitMQ最大的優勢在于提供了比較靈活的消息路由策略、高可用性、可靠性,以及豐富的插件、多種平臺支持和完善的文檔。不過,由于AMQP協議本身導致它的實現比較重量,從而使得與其他MQ (比如Kafka)對比其吞吐量處于下風。在選擇MQ時關鍵還是看需求-是更看重消息的吞吐量、消息堆積能力還是消息路由的靈活性、高可用性、可靠性等方面,先確定場景,再對不同產品進行有針對性的測試和分析,最終得到的結論才能作為技術選型的依據

      TCP/IP 分布式

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:云制造與智能制造的區別是什么
      下一篇:Excel鼠標右擊不出現復制粘貼等等東西?怎么回事?(excel表格的鼠標右鍵怎么沒有復制)
      相關文章
      亚洲国产午夜精品理论片在线播放 | 亚洲午夜精品一区二区| 久久亚洲国产视频| 亚洲国产黄在线观看| 亚洲av无码国产精品色午夜字幕| 色偷偷亚洲第一综合| 亚洲一区精彩视频| 亚洲乱码卡一卡二卡三| 亚洲人成电影福利在线播放| 亚洲综合在线另类色区奇米| 77777亚洲午夜久久多人| 伊人久久亚洲综合| 亚洲一区视频在线播放| 超清首页国产亚洲丝袜| 伊人久久大香线蕉亚洲| 亚洲乱码中文字幕综合| 亚洲国产精品一区二区成人片国内| 亚洲精品国产美女久久久| 亚洲精品国偷自产在线| 亚洲AV永久青草无码精品| 亚洲中文久久精品无码ww16| 国产日韩成人亚洲丁香婷婷| 在线亚洲午夜理论AV大片| 亚洲精品蜜桃久久久久久| 亚洲成AV人片在线观看| 亚洲国产精品久久66| 亚洲精品网站在线观看你懂的| 亚洲福利视频一区二区三区| 亚洲一级大黄大色毛片| 亚洲欧美中文日韩视频| 国产亚洲人成在线播放| 亚洲人成网站在线观看青青| 亚洲中文字幕日产乱码高清app| 亚洲av无码一区二区三区乱子伦 | 久久被窝电影亚洲爽爽爽| 亚洲AV本道一区二区三区四区| 精品亚洲成a人片在线观看| 亚洲免费观看在线视频| 亚洲综合色一区二区三区| 久久精品熟女亚洲av麻豆| 亚洲精品乱码久久久久久蜜桃 |