功能列表
-
RDS数据全增量一体同步到lindorm宽表模型(兼容HBase、Cassandra访问)
-
RDS数据变换,详见配置说明
-
RDS多表同步
-
自定义数据过滤和处理 (待上线)
-
DDL变更自动感知 (待上线)
-
脏数据处理(待上线)
使用场景
-
RDS低成本历史库
-
RDS数据全量迁移至lindorm
使用限制
-
支持源数据源RDS, DTS
-
支持目标数据源lindorm宽表模型(兼容HBase、Cassandra访问),HBase,Phoenix
准备工作
操作步骤
-
创建任务。
-
在
页面,单击创建任务 -
选择RDS数据源、DTS数据源以及目标数据源
-
选择要同步的表,点击生成配置
-
点击创建完成任务创建
说明-
RDS全增量同步先进行全量历史数据迁移,迁移完成后在进行增量数据迁移
-
导入CQL默认生成配置同RDS表字段和类型一一对应,需要更改可以手动编辑修改字段名和对应关系,详见配置说明
-
导入HBase/Lindorm默认生成列簇f,RDS中的字段会和f下列一一对应,同时rowkey为RDS主键字符串拼接
-
默认生成配置会跳过RDS删除操作如果保留需要手动修改配置,详见配置说明
-
-
添加数据源
配置说明
点击编辑可以查看默认配置,同时可以修改
CQL表同步配置说明
{
"reader": {
"querySql": [
"select * from dts.cluster where id < 1000",//全量同步查询语句,一个语句对应一个读取线程
"select * from dts.cluster where id >= 1000"//建议进行拆分提高速度和减小重试代价
]
},
"writer": {
"columns": [
{
"name": "id", //目标表中字段名称
"value": "id",//原表中字段名称
"isPk": true , //是否是主键
"type": "bigint" //CQL中字段类型,可以不填写默认同RDS表类型一致
},
{
"name": "cluster_id",
"value": "cluster_id",
"isPk": false
},
{
"name": "id_and_cluster",
"value": "{{concat(id, cluster_id)}}",//支持Jtwig域名对数据进行变换
"isPk": true
},
],
"config": {
"skipDelete": true //跳过删除操作
}
}
}{
"reader": {
"querySql": [
"select * from dts.cluster where id < 1000",//全量同步查询语句,一个语句对应一个读取线程
"select * from dts.cluster where id >= 1000"//建议进行拆分提高速度和减小重试代价
]
},
"writer": {
"columns": [
{
"name": "id", //目标表中字段名称
"value": "id",//原表中字段名称
"isPk": true , //是否是主键
"type": "bigint" //CQL中字段类型,可以不填写默认同RDS表类型一致
},
{
"name": "cluster_id",
"value": "cluster_id",
"isPk": false
},
{
"name": "id_and_cluster",
"value": "{{concat(id, cluster_id)}}",//支持Jtwig域名对数据进行变换
"isPk": true
},
],
"config": {
"skipDelete": true //跳过删除操作
}
}
HBase API访问同步配置说明
{
"reader": {
"querySql": [
"select * from dts.cluster where id < 1000",//全量同步查询语句,一个语句对应一个读取线程
"select * from dts.cluster where id >= 1000"//建议进行拆分提高速度和减小重试代价
]
},
"writer": {
"columns": [
{
"name": "f:id",//目标表中字段名称
"value": "id", //原表中字段名称
"isPk": false //不影响同步忽略
},
{
"name": "f:cluster_id",
"value": "cluster_id",
"isPk": false
},
{
"name": "f:id_and_cluster",
"value": "{{concat(id, cluster_id)}}",//支持Jtwig域名对数据进行变换
}
],
"rowkey": {
"value": "id" //hbase模型中rowkey由RDS哪些字段组成,支持Jtwig语法
},
"config": {
"skipDelete": true//跳过删除操作
},
"table": {
"name": "dts:cluster",// Lindorm/HBase中表名
"parameter": {
"compression": "ZSTD",//Lindorm/HBase中,新建表压缩算法,推荐使用ZSTD
"split":["1", "5", "9", "b"] //指定splitkey,对新建表进行预分区
}
},
"sourceTable": "dts.cluster"
}
}{
"reader": {
"querySql": [
"select * from dts.cluster where id < 1000",//全量同步查询语句,一个语句对应一个读取线程
"select * from dts.cluster where id >= 1000"//建议进行拆分提高速度和减小重试代价
]
},
"writer": {
"columns": [
{
"name": "f:id",//目标表中字段名称
"value": "id", //原表中字段名称
"isPk": false //不影响同步忽略
},
{
"name": "f:cluster_id",
"value": "cluster_id",
"isPk": false
},
{
"name": "f:id_and_cluster",
"value": "{{concat(id, cluster_id)}}",//支持Jtwig域名对数据进行变换
}
],
"rowkey": {
//hbase模型中rowkey由RDS那些字段组成,支持Jtwig语法,用到的字段必须在columns中配置
"value": "id"
},
"config": {
"skipDelete": true//跳过删除操作
},
"table": {
"name": "dts:cluster",// Lindorm/HBase中表名
"parameter": {
"compression": "ZSTD",//Lindorm/HBase中,新建表压缩算法,推荐使用ZSTD
"split":["1", "5", "9", "b"] //指定splitkey,对新建表进行预分区
}
},
"sourceTable": "dts.cluster"
}
RDS&CQL类型对应关系
RDS |
CQL |
---|---|
TINYINT |
boolean |
SMALLINT |
smallint |
MEDIUMINT |
int |
INTEGER |
int |
BIGINT |
bigint |
FLOAT |
float |
DOUBLE |
double |
DECIMAL |
decimal |
DATE |
date |
TIME |
time |
YEAR |
int |
DATETIME |
text |
TIMESTAMP |
timestamp |
CHAR |
varchar |
VARCHAR |
varchar |
TINYBLOB |
blob |
TINYTEXT |
text |
BLOB |
blob |
TEXT |
text |
MEDIUMBLOB |
blob |
MEDIUMTEXT |
text |
LONGBLOB |
blob |
LONGTEXT |
text |