云原生多模数据库 Lindorm RDS全增量同步

By | 2021年4月22日

功能列表

  • RDS数据全增量一体同步到lindorm宽表模型(兼容HBase、Cassandra访问)

  • RDS数据变换,详见配置说明

  • RDS多表同步

  • 自定义数据过滤和处理 (待上线)

  • DDL变更自动感知 (待上线)

  • 脏数据处理(待上线)

使用场景

  • RDS低成本历史库

  • RDS数据全量迁移至lindorm

使用限制

  • 支持源数据源RDS, DTS

  • 支持目标数据源lindorm宽表模型(兼容HBase、Cassandra访问),HBase,Phoenix

准备工作

  1. 购买LTS数据迁移同步服务,配置LTS操作页面账户密码,并登录LTS操作页面。具体请参见开通LTS

  2. 打通LTS和Lindorm/HBase迁移集群、RDS实例的网络(全部在相同vpc内可以跳过)。具体请参见网络打通

操作步骤

  1. 创建任务。


    创建任务

    1. 导入Lindorm/HBase > RDS全增量同步页面,单击创建任务

    2. 选择RDS数据源、DTS数据源以及目标数据源

    3. 选择要同步的表,点击生成配置

    4. 点击创建完成任务创建

    说明

    • RDS全增量同步先进行全量历史数据迁移,迁移完成后在进行增量数据迁移

    • 导入CQL默认生成配置同RDS表字段和类型一一对应,需要更改可以手动编辑修改字段名和对应关系,详见配置说明

    • 导入HBase/Lindorm默认生成列簇f,RDS中的字段会和f下列一一对应,同时rowkey为RDS主键字符串拼接

    • 默认生成配置会跳过RDS删除操作如果保留需要手动修改配置,详见配置说明

  2. 添加数据源

    1. Lindorm宽表数据源

    2. RDS数据源添加

    3. DTS订阅通道

配置说明

点击编辑可以查看默认配置,同时可以修改

CQL表同步配置说明

{
    "reader": {
        "querySql": [
            "select * from dts.cluster where id < 1000",//全量同步查询语句,一个语句对应一个读取线程
      "select * from dts.cluster where id >= 1000"//建议进行拆分提高速度和减小重试代价
        ]
    },
    "writer": {
        "columns": [
            {
                "name": "id", //目标表中字段名称
                "value": "id",//原表中字段名称
                "isPk": true , //是否是主键
        "type": "bigint" //CQL中字段类型,可以不填写默认同RDS表类型一致
            },
      {
                "name": "cluster_id",
                "value": "cluster_id",
                "isPk": false
            },
      {
                "name": "id_and_cluster",
                "value": "{{concat(id, cluster_id)}}",//支持Jtwig域名对数据进行变换
                "isPk": true
            },
        ],
        "config": {
            "skipDelete": true //跳过删除操作
        }
    }
}{
    "reader": {
        "querySql": [
            "select * from dts.cluster where id < 1000",//全量同步查询语句,一个语句对应一个读取线程
      "select * from dts.cluster where id >= 1000"//建议进行拆分提高速度和减小重试代价
        ]
    },
    "writer": {
        "columns": [
            {
                "name": "id", //目标表中字段名称
                "value": "id",//原表中字段名称
                "isPk": true , //是否是主键
        "type": "bigint" //CQL中字段类型,可以不填写默认同RDS表类型一致
            },
      {
                "name": "cluster_id",
                "value": "cluster_id",
                "isPk": false
            },
      {
                "name": "id_and_cluster",
                "value": "{{concat(id, cluster_id)}}",//支持Jtwig域名对数据进行变换
                "isPk": true
            },
        ],
        "config": {
            "skipDelete": true //跳过删除操作
        }
    }
                

Jtwig语法说明

HBase API访问同步配置说明

{
    "reader": {
        "querySql": [
            "select * from dts.cluster where id < 1000",//全量同步查询语句,一个语句对应一个读取线程
      "select * from dts.cluster where id >= 1000"//建议进行拆分提高速度和减小重试代价
        ]
    },
    "writer": {
        "columns": [
            {
                "name": "f:id",//目标表中字段名称
                "value": "id", //原表中字段名称
        "isPk": false //不影响同步忽略
            },
            {
                "name": "f:cluster_id",
                "value": "cluster_id",
                "isPk": false
            },
      {
        "name": "f:id_and_cluster",
                "value": "{{concat(id, cluster_id)}}",//支持Jtwig域名对数据进行变换
       }
        ],
        "rowkey": {
            "value": "id" //hbase模型中rowkey由RDS哪些字段组成,支持Jtwig语法
        },
        "config": {
            "skipDelete": true//跳过删除操作
        },
        "table": {
            "name": "dts:cluster",// Lindorm/HBase中表名
            "parameter": {
                "compression": "ZSTD",//Lindorm/HBase中,新建表压缩算法,推荐使用ZSTD
        "split":["1", "5", "9", "b"] //指定splitkey,对新建表进行预分区
            }
        },
        "sourceTable": "dts.cluster"
    }
}{
    "reader": {
        "querySql": [
            "select * from dts.cluster where id < 1000",//全量同步查询语句,一个语句对应一个读取线程
      "select * from dts.cluster where id >= 1000"//建议进行拆分提高速度和减小重试代价
        ]
    },
    "writer": {
        "columns": [
            {
                "name": "f:id",//目标表中字段名称
                "value": "id", //原表中字段名称
        "isPk": false //不影响同步忽略
            },
            {
                "name": "f:cluster_id",
                "value": "cluster_id",
                "isPk": false
            },
      {
        "name": "f:id_and_cluster",
                "value": "{{concat(id, cluster_id)}}",//支持Jtwig域名对数据进行变换
       }
        ],
        "rowkey": {
           //hbase模型中rowkey由RDS那些字段组成,支持Jtwig语法,用到的字段必须在columns中配置
            "value": "id" 
        },
        "config": {
            "skipDelete": true//跳过删除操作
        },
        "table": {
            "name": "dts:cluster",// Lindorm/HBase中表名
            "parameter": {
                "compression": "ZSTD",//Lindorm/HBase中,新建表压缩算法,推荐使用ZSTD
        "split":["1", "5", "9", "b"] //指定splitkey,对新建表进行预分区
            }
        },
        "sourceTable": "dts.cluster"
    }
                

RDS&CQL类型对应关系

RDS

CQL

TINYINT

boolean

SMALLINT

smallint

MEDIUMINT

int

INTEGER

int

BIGINT

bigint

FLOAT

float

DOUBLE

double

DECIMAL

decimal

DATE

date

TIME

time

YEAR

int

DATETIME

text

TIMESTAMP

timestamp

CHAR

varchar

VARCHAR

varchar

TINYBLOB

blob

TINYTEXT

text

BLOB

blob

TEXT

text

MEDIUMBLOB

blob

MEDIUMTEXT

text

LONGBLOB

blob

LONGTEXT

text

请关注公众号获取更多资料

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注