GeminiDB for Cassandra 流功能介紹

      網友投稿 736 2025-04-03

      1????? 使用GeminiDB for Cassandra流捕獲表活動


      1.1????? 功能介紹

      當存儲在GeminiDB for Cassandra集群中某張表的某項目發生變更時,其他的程序能夠做出相應的變更,比如:一個應用程序更改了GeminiDB for Cassandra集群中的某行數據,另一個應用程序能夠讀取到這行數據變更并做出相應的動作。

      GeminiDB for Cassandra支持這種類似的場景。流表捕獲原表的變動,并存儲24小時之后過期??蛻舳送ㄟ^SDK可訪問流表并獲取數據修改前后的項目數據。

      GeminiDB for Cassandra流是一種有關表中的項目修改的有序信息流,當啟動某張表的流時,GeminiDB for Cassandra流表捕獲原表的數據項目的更改信息。當應用程序在表中插入、更新、或者刪除某條數據時,流表都會記錄一條修改數據的流記錄。流記錄包含對表中某條數據所做的數據修改的相關信息,包含修改前后的新舊數據記錄。原表中修改的每個項目,流表中的記錄將按照對應的實際修改的順序顯示。

      流表會實時的監控原表的記錄,包括插入、刪除、更新操作,以便能夠在其他情況使用,但不包含DDL操作記錄;流表使用GeminiDB for Cassandra的物化視圖實現,遵循物化視圖的相關限制,比如有必須先刪除物化視圖表之后才能刪除原表,對應必須要先刪除流表才能刪除原表。

      1.2????? 使用場景

      主要用于Cassandra往大數據/ES(Elasticsearch)同步數據變化的場景,支撐客戶對應業務開展。

      1.3????? 功能特色

      華為GeminiDB for Cassandra專有能力,原生Cassandra不支持。

      2????? GeminiDB for Cassandra流使用方法概述

      GeminiDB for Cassandra原表與流表維護兩個獨立的表。在開啟原表的流開關之后,訪問原表并且有操作原表數據會記錄到對應的流表中,要讀取處理流表記錄,通過訪問流表,訪問方式與數據庫其他表的訪問方式相同,見第四、五章節訪問方法。

      3????? 開啟關閉流

      啟用流的方式:使用alter table KS.TABLE with stream_enabled=true 語句啟用流,當前流支持新舊映像。

      關閉流:使用alter table KS.TABLE with stream_enabled=false 可以隨時禁用流。關閉流之后當前流表中的數據不會立即刪除,24之后刪除,原表中數據的變更不會再記錄到流表中。舉例:

      CREATE TABLE ks.table ? ( id int, name text, addr text,age int,email text,PRIMARY KEY (name, id));???? // 創建表

      Alter TABLE ks.table ? with stream_enabled=true;???? // 開啟流

      Desc ks.table;???? // 查看流表是否創建

      INSERT INTO ks.table ? (name , id , addr , age , email ) VALUES ('xiaoxin',31,'beijing1',33,'xiaoxin@163.com');?? // 向原表寫入數據

      select * from ks."table$streaming";?? // 查看流表是否有相應數據產生

      Alter TABLE ks.table ? with stream_enabled=false;?? // 關閉流表

      4????? 讀取和處理流記錄

      應用程序要讀取和處理流時,應用程序需要通過SDK連接到C*流表進行相應操作。

      流表中的每條流記錄均代表一個原表中數據的修改,每條流記錄都會有一個時間信息,標識這條流產生的時間信息。每條流記錄會在24小時后自動刪除。

      流表結構:

      CREATE TABLE ks.table$streaming (

      @shardID ? text,

      @eventID ? timeuuid,

      pk,

      ck,

      @newOldImage ? boolean,

      @eventName text,

      co1,

      co2,

      PRIMARY ? KEY (@shardID, @eventID, pk, ck, @newOldImage)? //pk,ck,為原表的pk,ck, co1,co2是原表的普通列

      );

      如上,流表中包含幾個特殊的字段:"@shardID"是分區鍵;"@eventID"是由插入時間生成的timeuuid,代表流數據產生的時間;"@newOldImage"代表新舊映像,0表示舊映像,1表示新映像;"@eventName"代表操作事件如"insert"、"update"、"delete"。

      迭代處理流表的數據時請使用流表的分區加上時間戳范圍訪問。查詢分區時使用,返回分區列表:

      select stream_shards from ? system_schema.tables? where ? keyspace_name='ks' and table_name='table';

      例如:

      cqlsh:ks> select stream_shards from system_schema.tables? where keyspace_name='ks' and ? table_name='table1';

      stream_shards

      --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

      ['-9223372036854775808', ? '-6148914691236517206', '-4611686018427387905', '-3843071682022823258', ? '-1537228672809129303', '-2', '1537228672809129299', '4611686018427387901', ? '5270498306774157603', '6148914691236517202', '7686143364045646492', ? '7686143364045646503']

      (1 rows)

      使用分區+時間遍歷流表數據,范圍查詢時使用"@eventID"時間查詢,每次默認返回的大小根據數據量大小決定,下次迭代使用時間繼續往后迭代。例如:

      select * From ks."table$streaming" ? where "@shardID" = '-9223372036854775808' and "@eventID" ? > a64a8340-e999-11e9-a7bd-cb5f001f61df limit 50;

      5????? 接口說明

      5.1????? GetShardIterator

      public static List ? GetShardIterator(Cluster cluster, String keySpace, String tableName)

      功能: 獲取流表的shard信息

      入參:

      Cluster cluster:集群的cluster信息,連接數據庫使用

      String keySpace:要查詢流數據的數據庫名稱

      String tableName:要查詢流數據的表名稱

      出參:

      返回List 一組shard集合,GetRecords接口中使用

      5.2????? GetRecords

      public static StreamInfo ? GetRecords(Cluster cluster, String keySpace, TableEvent tableEvent)

      功能: 獲取流表的具體數據

      入參:

      Cluster cluster:集群的cluster信息,連接數據庫使用

      String keySpace:要查詢流數據的數據庫名稱

      TableEvent tableEvent:要查詢流數據相關信息,TableEvent結構如下:

      String table:要查詢流數據的表名稱

      String shardID:要查詢流數據的shardID

      String eventID:要查詢流數據的時間戳信息

      int limitRow:要查詢流數據的限制條數,沒有指定情況下默認是100;

      出參:

      返回一組StreamInfo數據;具體結構如下:

      String shardID:流數據的shardID

      String table:流數據的原表名

      List columns:一組流數據的集合,RowInfo具體結構如下:

      String eventID:流數據的時間戳信息

      String operateType:操作類型,例如: INSERT、UPDATE、DELETE

      List Keys: 流數據對應的原表的主鍵信息

      List NewImage: 新映像的信息

      List OldImage: 舊映像的信息

      5.3????? GetShardIterator

      public static List ? GetShardIterator(Session session, String keySpace, String tableName)

      功能: 獲取流表的shard信息

      入參:

      Session session:數據庫集群的連接session,調用函數之后session需要調用者關閉

      String keySpace:要查詢流數據的數據庫名稱

      String tableName:要查詢流數據的表名稱

      出參:

      返回List 一組shard集合,GetRecords接口中使用

      5.4????? GetRecords

      public static StreamInfo GetRecords(Session ? session, String keySpace, TableEvent tableEvent)

      功能: 獲取流表的具體數據

      入參:

      Session session:數據庫集群的連接session,調用函數之后session需要調用者關閉;

      String keySpace:要查詢流數據的數據庫名稱;

      TableEvent tableEvent:要查詢流數據相關信息,TableEvent結構如下:

      String table:要查詢流數據的表名稱;

      String shardID:要查詢流數據的shardID;

      String eventID:要查詢流數據的時間戳信息;

      int limitRow:要查詢流數據的限制條數,沒有指定情況下默認是100;

      List primaryKey: 要查詢流數據的表的主鍵名稱類型信息

      出參:

      返回一組StreamInfo數據;具體結構如下:

      String shardID:流數據的shardID

      String table:流數據的原表名

      List columns:一組流數據的集合,RowInfo具體結構如下:

      String eventID:流數據的時間戳信息

      String operateType:操作類型,例如: INSERT、UPDATE、DELETE

      List Keys: 流數據對應的原表的主鍵信息

      List NewImage: 新映像的信息

      List OldImage: 舊映像的信息

      5.5????? GetRecords返回結果范例

      {

      "ShardID": ? "-4611686018427387905",

      "Table": ? "tb1",

      "Records": ? [{

      "EventID": ? "52236080-efb5-11e9-9c62-49626763b3dc",

      "OperateType": ? "INSERT",

      "Keys": ? [{

      "columnName": ? "name",

      "value": ? "zhoujielun",

      "type": ? "varchar"

      }, ? {

      "columnName": ? "id",

      "value": ? 31,

      "type": ? "int"

      }],

      "NewImage": ? [{

      "columnName": ? "name",

      "value": ? "zhoujielun",

      "type": ? "varchar"

      }, ? {

      "columnName": ? "id",

      "value": ? 31,

      "type": ? "int"

      }, ? {

      "columnName": ? "addr",

      "value": ? "宇宙中心",

      "type": ? "varchar"

      }, ? {

      "columnName": ? "age",

      "value": ? 33,

      "type": ? "int"

      }, ? {

      "columnName": ? "email",

      "value": ? "zhoujielun.com",

      "type": ? "varchar"

      }],

      "OldImage": ? []

      }, ? {

      "EventID": ? "52255c50-efb5-11e9-9c62-49626763b3dc",

      "OperateType": ? "UPDATE",

      "Keys": ? [{

      "columnName": ? "name",

      "value": ? "zhoujielun",

      "type": ? "varchar"

      }, ? {

      "columnName": ? "id",

      "value": ? 32,

      "type": ? "int"

      }],

      "NewImage": ? [{

      "columnName": ? "name",

      "value": ? "zhoujielun",

      "type": ? "varchar"

      }, ? {

      "columnName": ? "id",

      "value": ? 32,

      "type": ? "int"

      }, ? {

      "columnName": ? "addr",

      "value": ? "宇宙中心",

      "type": ? "varchar"

      }, ? {

      "columnName": ? "age",

      "value": ? 33,

      "type": ? "int"

      }, ? {

      "columnName": ? "email",

      "value": ? "zhoujielun.com",

      "type": ? "varchar"

      }],

      "OldImage": ? [{

      "columnName": ? "name",

      "value": ? "zhoujielun",

      "type": ? "varchar"

      }, ? {

      "columnName": ? "id",

      "value": ? 32,

      "type": ? "int"

      }, ? {

      "columnName": ? "addr",

      "value": ? "宇宙中心",

      "type": ? "varchar"

      }, ? {

      "columnName": ? "age",

      "value": ? 33,

      "type": ? "int"

      }, ? {

      "columnName": ? "email",

      "value": ? "zhoujielun.com",

      "type": ? "varchar"

      }]

      }, ? {

      "EventID": ? "52261fa0-efb5-11e9-9c62-49626763b3dc",

      "OperateType": ? "UPDATE",

      "Keys": ? [{

      "columnName": ? "name",

      "value": ? "zhoujielun",

      "type": ? "varchar"

      }, ? {

      "columnName": ? "id",

      "value": ? 33,

      "type": ? "int"

      }],

      "NewImage": ? [{

      "columnName": ? "name",

      "value": ? "zhoujielun",

      "type": ? "varchar"

      }, ? {

      "columnName": ? "id",

      "value": ? 33,

      "type": ? "int"

      }, ? {

      "columnName": ? "addr",

      "value": ? "宇宙中心",

      "type": ? "varchar"

      }, ? {

      "columnName": ? "age",

      "value": ? 33,

      "type": ? "int"

      }, ? {

      "columnName": ? "email",

      "value": ? "zhoujielun.com",

      "type": ? "varchar"

      }],

      "OldImage": ? [{

      "columnName": ? "name",

      "value": ? "zhoujielun",

      "type": ? "varchar"

      }, ? {

      "columnName": ? "id",

      "value": ? 33,

      "type": ? "int"

      }, ? {

      "columnName": ? "addr",

      "value": ? "宇宙中心",

      "type": ? "varchar"

      }, ? {

      "columnName": ? "age",

      "value": ? 33,

      "type": ? "int"

      }, ? {

      "columnName": ? "email",

      "value": ? "zhoujielun.com",

      "type": ? "varchar"

      }]

      }]

      }

      5.6????? 接口使用demo1

      package com.huawei.hwcloud.stream;

      import com.datastax.driver.core.Cluster;

      import com.datastax.driver.core.ColumnMetadata;

      import com.google.gson.Gson;

      import com.google.gson.GsonBuilder;

      import com.huawei.hwcloud.stream.req.RowInfo;

      import com.huawei.hwcloud.stream.req.StreamInfo;

      import com.huawei.hwcloud.stream.req.TableEvent;

      import java.util.List;

      public class Main {

      public static void main(String[] args) {

      Cluster cluster = Cluster.builder().addContactPoint("xxx.95.xxx.201").withPort(9042).build();

      //??????? Cluster cluster = Cluster.builder().addContactPoint(endpoint).withPort(port).withCredentials(username, password).build();

      List pm = cluster.getMetadata().getKeyspace("test").getTable("tb1").getPrimaryKey();

      System.out.println(pm);

      List streamShards = null;

      try {

      streamShards = StreamFetcher.GetShardIterator(cluster, "test", "tb1");

      }

      catch (Exception e) {

      e.printStackTrace();

      }

      System.out.println(streamShards);

      TableEvent tableEvent = new TableEvent();

      tableEvent.setEventID("43e0eeb0-ee80-11e9-9c62-49626763b3dc");

      tableEvent.setShardID("-4611686018427387905");

      tableEvent.setTable("tb1");

      tableEvent.setLimitRow(6);

      StreamInfo streamInfo = null;

      try {

      streamInfo = StreamFetcher.GetRecords(cluster, "test", tableEvent);

      }

      catch (Exception e) {

      e.printStackTrace();

      }

      Gson gson = new GsonBuilder().create();

      String line = gson.toJson(streamInfo);

      System.out.println(line);

      System.out.println(streamInfo.getColumns().size());

      for (RowInfo rowInfo: streamInfo.getColumns()) {

      System.out.println(rowInfo.toString());

      }

      System.exit(0);

      }

      }

      5.7????? 接口使用demo2

      package com.huawei.hwcloud.stream;

      import com.datastax.driver.core.Cluster;

      import com.datastax.driver.core.ColumnMetadata;

      import com.datastax.driver.core.Session;

      import com.google.gson.Gson;

      import com.google.gson.GsonBuilder;

      import com.huawei.hwcloud.stream.req.RowInfo;

      GeminiDB for Cassandra 流功能介紹

      import com.huawei.hwcloud.stream.req.StreamInfo;

      import com.huawei.hwcloud.stream.req.TableEvent;

      import com.huawei.hwcloud.stream.utils.WrapperCassandraSession;

      import java.util.List;

      public class Main2

      {

      public static void main(String[] args) {

      Cluster cluster = Cluster.builder().addContactPoint("XXX.95.XXX.201").withPort(9042).build();

      //??????? Cluster cluster = Cluster.builder().addContactPoint(endpoint).withPort(port).withCredentials(username, password).build();

      List pk = cluster.getMetadata().getKeyspace("test").getTable("tb1").getPrimaryKey();

      System.out.println(pk);

      Session session = cluster.connect();

      List streamShards = StreamFetcher.GetShardIterator(session, "test", "tb1");

      System.out.println(streamShards);

      TableEvent tableEvent = new TableEvent();

      tableEvent.setEventID("43e0eeb0-ee80-11e9-9c62-49626763b3dc");

      tableEvent.setShardID("-4611686018427387905");

      tableEvent.setTable("tb1");

      tableEvent.setLimitRow(6);

      tableEvent.setPrimaryKey(pk);

      StreamInfo streamInfo = StreamFetcher.GetRecords(session, "test", tableEvent);

      Gson gson = new GsonBuilder().create();

      String line = gson.toJson(streamInfo);

      System.out.println(line);

      System.out.println(streamInfo.getColumns().size());

      for (RowInfo rowInfo: streamInfo.getColumns()) {

      System.out.println(rowInfo.toString());

      }

      session.close();

      System.exit(0);

      }

      }

      6????? 功能約束

      1)流表中的數據保留24小時。

      2)流表中的數據會占用數據庫的磁盤空間。

      3)通過CQL語句不能創建帶有"$streaming"后綴的流表。

      4)流表可以通過drop MATERIALIZED VIEW ks."table$streaming";進行刪除,流表使用物化視圖實現,遵從物化視圖的限制要求。

      附件: GeminiDB for Cassandra流功能介紹.docx 90.42KB 下載次數:0次

      云數據庫 GaussDB(for Cassandra) 云數據庫 MySQL 數據庫

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:WPS表格怎樣改變表格線顏色(wps怎么修改表格線條顏色)
      下一篇:excel表格如何按順序拖動排列
      相關文章
      亚洲一本到无码av中文字幕| 亚洲成人黄色在线观看| 亚洲一区中文字幕在线电影网| 亚洲综合国产一区二区三区| 亚洲av午夜成人片精品电影| 亚洲爆乳AAA无码专区| 亚洲中文字幕久久精品无码A | 亚洲国产综合精品中文第一区| 亚洲精品无码专区久久久| 久久久久亚洲精品中文字幕| 亚洲欧洲日产国码高潮αv| 国产亚洲精品国产福利在线观看| 欧美激情综合亚洲一二区| 精品久久久久久亚洲中文字幕| 亚洲AV无码片一区二区三区| 亚洲欧美成人av在线观看| 亚洲国产精品无码久久98| 色五月五月丁香亚洲综合网| 亚洲国产精品专区在线观看| 亚洲精品国产精品乱码不卞| 亚洲一级Av无码毛片久久精品| 亚洲一区无码精品色| 日韩亚洲变态另类中文| 国产亚洲福利精品一区| 久久精品国产亚洲AV麻豆~| 久久精品7亚洲午夜a| 78成人精品电影在线播放日韩精品电影一区亚洲| 亚洲av无码成人黄网站在线观看| 久久久久亚洲精品美女| 亚洲精品视频久久| 亚洲一区中文字幕在线电影网| 在线aⅴ亚洲中文字幕| 亚洲αⅴ无码乱码在线观看性色| 久久无码av亚洲精品色午夜| 亚洲精品无码久久久| 亚洲人成中文字幕在线观看| 亚洲成年轻人电影网站www| 亚洲精品影院久久久久久| 亚洲AV无码一区二区三区牛牛| 国产精品亚洲专区无码WEB| 亚洲国产精品成人网址天堂|