MongoDB數據庫中">46 - 將xml文檔保存在MongoDB數據庫中
736
2025-04-03
1????? 使用GeminiDB for Cassandra流捕獲表活動
1.1????? 功能介紹
當存儲在GeminiDB for Cassandra集群中某張表的某項目發生變更時,其他的程序能夠做出相應的變更,比如:一個應用程序更改了GeminiDB for Cassandra集群中的某行數據,另一個應用程序能夠讀取到這行數據變更并做出相應的動作。
GeminiDB for Cassandra支持這種類似的場景。流表捕獲原表的變動,并存儲24小時之后過期??蛻舳送ㄟ^SDK可訪問流表并獲取數據修改前后的項目數據。
GeminiDB for Cassandra流是一種有關表中的項目修改的有序信息流,當啟動某張表的流時,GeminiDB for Cassandra流表捕獲原表的數據項目的更改信息。當應用程序在表中插入、更新、或者刪除某條數據時,流表都會記錄一條修改數據的流記錄。流記錄包含對表中某條數據所做的數據修改的相關信息,包含修改前后的新舊數據記錄。原表中修改的每個項目,流表中的記錄將按照對應的實際修改的順序顯示。
流表會實時的監控原表的記錄,包括插入、刪除、更新操作,以便能夠在其他情況使用,但不包含DDL操作記錄;流表使用GeminiDB for Cassandra的物化視圖實現,遵循物化視圖的相關限制,比如有必須先刪除物化視圖表之后才能刪除原表,對應必須要先刪除流表才能刪除原表。
1.2????? 使用場景
主要用于Cassandra往大數據/ES(Elasticsearch)同步數據變化的場景,支撐客戶對應業務開展。
1.3????? 功能特色
華為GeminiDB for Cassandra專有能力,原生Cassandra不支持。
2????? GeminiDB for Cassandra流使用方法概述
GeminiDB for Cassandra原表與流表維護兩個獨立的表。在開啟原表的流開關之后,訪問原表并且有操作原表數據會記錄到對應的流表中,要讀取處理流表記錄,通過訪問流表,訪問方式與數據庫其他表的訪問方式相同,見第四、五章節訪問方法。
3????? 開啟關閉流
啟用流的方式:使用alter table KS.TABLE with stream_enabled=true 語句啟用流,當前流支持新舊映像。
關閉流:使用alter table KS.TABLE with stream_enabled=false 可以隨時禁用流。關閉流之后當前流表中的數據不會立即刪除,24之后刪除,原表中數據的變更不會再記錄到流表中。舉例:
CREATE TABLE ks.table ? ( id int, name text, addr text,age int,email text,PRIMARY KEY (name, id));???? // 創建表
Alter TABLE ks.table ? with stream_enabled=true;???? // 開啟流
Desc ks.table;???? // 查看流表是否創建
INSERT INTO ks.table ? (name , id , addr , age , email ) VALUES ('xiaoxin',31,'beijing1',33,'xiaoxin@163.com');?? // 向原表寫入數據
select * from ks."table$streaming";?? // 查看流表是否有相應數據產生
Alter TABLE ks.table ? with stream_enabled=false;?? // 關閉流表
4????? 讀取和處理流記錄
應用程序要讀取和處理流時,應用程序需要通過SDK連接到C*流表進行相應操作。
流表中的每條流記錄均代表一個原表中數據的修改,每條流記錄都會有一個時間信息,標識這條流產生的時間信息。每條流記錄會在24小時后自動刪除。
流表結構:
CREATE TABLE ks.table$streaming (
@shardID ? text,
@eventID ? timeuuid,
pk,
ck,
@newOldImage ? boolean,
@eventName text,
co1,
co2,
PRIMARY ? KEY (@shardID, @eventID, pk, ck, @newOldImage)? //pk,ck,為原表的pk,ck, co1,co2是原表的普通列
);
如上,流表中包含幾個特殊的字段:"@shardID"是分區鍵;"@eventID"是由插入時間生成的timeuuid,代表流數據產生的時間;"@newOldImage"代表新舊映像,0表示舊映像,1表示新映像;"@eventName"代表操作事件如"insert"、"update"、"delete"。
迭代處理流表的數據時請使用流表的分區加上時間戳范圍訪問。查詢分區時使用,返回分區列表:
select stream_shards from ? system_schema.tables? where ? keyspace_name='ks' and table_name='table';
例如:
cqlsh:ks> select stream_shards from system_schema.tables? where keyspace_name='ks' and ? table_name='table1';
stream_shards
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
['-9223372036854775808', ? '-6148914691236517206', '-4611686018427387905', '-3843071682022823258', ? '-1537228672809129303', '-2', '1537228672809129299', '4611686018427387901', ? '5270498306774157603', '6148914691236517202', '7686143364045646492', ? '7686143364045646503']
(1 rows)
使用分區+時間遍歷流表數據,范圍查詢時使用"@eventID"時間查詢,每次默認返回的大小根據數據量大小決定,下次迭代使用時間繼續往后迭代。例如:
select * From ks."table$streaming" ? where "@shardID" = '-9223372036854775808' and "@eventID" ? > a64a8340-e999-11e9-a7bd-cb5f001f61df limit 50;
5????? 接口說明
5.1????? GetShardIterator
public static List
功能: 獲取流表的shard信息
入參:
Cluster cluster:集群的cluster信息,連接數據庫使用
String keySpace:要查詢流數據的數據庫名稱
String tableName:要查詢流數據的表名稱
出參:
返回List
5.2????? GetRecords
public static StreamInfo ? GetRecords(Cluster cluster, String keySpace, TableEvent tableEvent)
功能: 獲取流表的具體數據
入參:
Cluster cluster:集群的cluster信息,連接數據庫使用
String keySpace:要查詢流數據的數據庫名稱
TableEvent tableEvent:要查詢流數據相關信息,TableEvent結構如下:
String table:要查詢流數據的表名稱
String shardID:要查詢流數據的shardID
String eventID:要查詢流數據的時間戳信息
int limitRow:要查詢流數據的限制條數,沒有指定情況下默認是100;
出參:
返回一組StreamInfo數據;具體結構如下:
String shardID:流數據的shardID
String table:流數據的原表名
List
String eventID:流數據的時間戳信息
String operateType:操作類型,例如: INSERT、UPDATE、DELETE
List
List
List
5.3????? GetShardIterator
public static List
功能: 獲取流表的shard信息
入參:
Session session:數據庫集群的連接session,調用函數之后session需要調用者關閉
String keySpace:要查詢流數據的數據庫名稱
String tableName:要查詢流數據的表名稱
出參:
返回List
5.4????? GetRecords
public static StreamInfo GetRecords(Session ? session, String keySpace, TableEvent tableEvent)
功能: 獲取流表的具體數據
入參:
Session session:數據庫集群的連接session,調用函數之后session需要調用者關閉;
String keySpace:要查詢流數據的數據庫名稱;
TableEvent tableEvent:要查詢流數據相關信息,TableEvent結構如下:
String table:要查詢流數據的表名稱;
String shardID:要查詢流數據的shardID;
String eventID:要查詢流數據的時間戳信息;
int limitRow:要查詢流數據的限制條數,沒有指定情況下默認是100;
List
出參:
返回一組StreamInfo數據;具體結構如下:
String shardID:流數據的shardID
String table:流數據的原表名
List
String eventID:流數據的時間戳信息
String operateType:操作類型,例如: INSERT、UPDATE、DELETE
List
List
List
5.5????? GetRecords返回結果范例
{
"ShardID": ? "-4611686018427387905",
"Table": ? "tb1",
"Records": ? [{
"EventID": ? "52236080-efb5-11e9-9c62-49626763b3dc",
"OperateType": ? "INSERT",
"Keys": ? [{
"columnName": ? "name",
"value": ? "zhoujielun",
"type": ? "varchar"
}, ? {
"columnName": ? "id",
"value": ? 31,
"type": ? "int"
}],
"NewImage": ? [{
"columnName": ? "name",
"value": ? "zhoujielun",
"type": ? "varchar"
}, ? {
"columnName": ? "id",
"value": ? 31,
"type": ? "int"
}, ? {
"columnName": ? "addr",
"value": ? "宇宙中心",
"type": ? "varchar"
}, ? {
"columnName": ? "age",
"value": ? 33,
"type": ? "int"
}, ? {
"columnName": ? "email",
"value": ? "zhoujielun.com",
"type": ? "varchar"
}],
"OldImage": ? []
}, ? {
"EventID": ? "52255c50-efb5-11e9-9c62-49626763b3dc",
"OperateType": ? "UPDATE",
"Keys": ? [{
"columnName": ? "name",
"value": ? "zhoujielun",
"type": ? "varchar"
}, ? {
"columnName": ? "id",
"value": ? 32,
"type": ? "int"
}],
"NewImage": ? [{
"columnName": ? "name",
"value": ? "zhoujielun",
"type": ? "varchar"
}, ? {
"columnName": ? "id",
"value": ? 32,
"type": ? "int"
}, ? {
"columnName": ? "addr",
"value": ? "宇宙中心",
"type": ? "varchar"
}, ? {
"columnName": ? "age",
"value": ? 33,
"type": ? "int"
}, ? {
"columnName": ? "email",
"value": ? "zhoujielun.com",
"type": ? "varchar"
}],
"OldImage": ? [{
"columnName": ? "name",
"value": ? "zhoujielun",
"type": ? "varchar"
}, ? {
"columnName": ? "id",
"value": ? 32,
"type": ? "int"
}, ? {
"columnName": ? "addr",
"value": ? "宇宙中心",
"type": ? "varchar"
}, ? {
"columnName": ? "age",
"value": ? 33,
"type": ? "int"
}, ? {
"columnName": ? "email",
"value": ? "zhoujielun.com",
"type": ? "varchar"
}]
}, ? {
"EventID": ? "52261fa0-efb5-11e9-9c62-49626763b3dc",
"OperateType": ? "UPDATE",
"Keys": ? [{
"columnName": ? "name",
"value": ? "zhoujielun",
"type": ? "varchar"
}, ? {
"columnName": ? "id",
"value": ? 33,
"type": ? "int"
}],
"NewImage": ? [{
"columnName": ? "name",
"value": ? "zhoujielun",
"type": ? "varchar"
}, ? {
"columnName": ? "id",
"value": ? 33,
"type": ? "int"
}, ? {
"columnName": ? "addr",
"value": ? "宇宙中心",
"type": ? "varchar"
}, ? {
"columnName": ? "age",
"value": ? 33,
"type": ? "int"
}, ? {
"columnName": ? "email",
"value": ? "zhoujielun.com",
"type": ? "varchar"
}],
"OldImage": ? [{
"columnName": ? "name",
"value": ? "zhoujielun",
"type": ? "varchar"
}, ? {
"columnName": ? "id",
"value": ? 33,
"type": ? "int"
}, ? {
"columnName": ? "addr",
"value": ? "宇宙中心",
"type": ? "varchar"
}, ? {
"columnName": ? "age",
"value": ? 33,
"type": ? "int"
}, ? {
"columnName": ? "email",
"value": ? "zhoujielun.com",
"type": ? "varchar"
}]
}]
}
5.6????? 接口使用demo1
package com.huawei.hwcloud.stream;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnMetadata;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.huawei.hwcloud.stream.req.RowInfo;
import com.huawei.hwcloud.stream.req.StreamInfo;
import com.huawei.hwcloud.stream.req.TableEvent;
import java.util.List;
public class Main {
public static void main(String[] args) {
Cluster cluster = Cluster.builder().addContactPoint("xxx.95.xxx.201").withPort(9042).build();
//??????? Cluster cluster = Cluster.builder().addContactPoint(endpoint).withPort(port).withCredentials(username, password).build();
List
System.out.println(pm);
List streamShards = null;
try {
streamShards = StreamFetcher.GetShardIterator(cluster, "test", "tb1");
}
catch (Exception e) {
e.printStackTrace();
}
System.out.println(streamShards);
TableEvent tableEvent = new TableEvent();
tableEvent.setEventID("43e0eeb0-ee80-11e9-9c62-49626763b3dc");
tableEvent.setShardID("-4611686018427387905");
tableEvent.setTable("tb1");
tableEvent.setLimitRow(6);
StreamInfo streamInfo = null;
try {
streamInfo = StreamFetcher.GetRecords(cluster, "test", tableEvent);
}
catch (Exception e) {
e.printStackTrace();
}
Gson gson = new GsonBuilder().create();
String line = gson.toJson(streamInfo);
System.out.println(line);
System.out.println(streamInfo.getColumns().size());
for (RowInfo rowInfo: streamInfo.getColumns()) {
System.out.println(rowInfo.toString());
}
System.exit(0);
}
}
5.7????? 接口使用demo2
package com.huawei.hwcloud.stream;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.Session;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.huawei.hwcloud.stream.req.RowInfo;
import com.huawei.hwcloud.stream.req.StreamInfo;
import com.huawei.hwcloud.stream.req.TableEvent;
import com.huawei.hwcloud.stream.utils.WrapperCassandraSession;
import java.util.List;
public class Main2
{
public static void main(String[] args) {
Cluster cluster = Cluster.builder().addContactPoint("XXX.95.XXX.201").withPort(9042).build();
//??????? Cluster cluster = Cluster.builder().addContactPoint(endpoint).withPort(port).withCredentials(username, password).build();
List
System.out.println(pk);
Session session = cluster.connect();
List
System.out.println(streamShards);
TableEvent tableEvent = new TableEvent();
tableEvent.setEventID("43e0eeb0-ee80-11e9-9c62-49626763b3dc");
tableEvent.setShardID("-4611686018427387905");
tableEvent.setTable("tb1");
tableEvent.setLimitRow(6);
tableEvent.setPrimaryKey(pk);
StreamInfo streamInfo = StreamFetcher.GetRecords(session, "test", tableEvent);
Gson gson = new GsonBuilder().create();
String line = gson.toJson(streamInfo);
System.out.println(line);
System.out.println(streamInfo.getColumns().size());
for (RowInfo rowInfo: streamInfo.getColumns()) {
System.out.println(rowInfo.toString());
}
session.close();
System.exit(0);
}
}
6????? 功能約束
1)流表中的數據保留24小時。
2)流表中的數據會占用數據庫的磁盤空間。
3)通過CQL語句不能創建帶有"$streaming"后綴的流表。
4)流表可以通過drop MATERIALIZED VIEW ks."table$streaming";進行刪除,流表使用物化視圖實現,遵從物化視圖的限制要求。
附件: GeminiDB for Cassandra流功能介紹.docx 90.42KB 下載次數:0次
云數據庫 GaussDB(for Cassandra) 云數據庫 MySQL 數據庫
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。