博客
关于我
Flink1.11中的CDC Connectors操作实践
阅读量:798 次
发布时间:2023-04-02

本文共 5359 字,大约阅读时间需要 17 分钟。

Apache Flink 1.11 CDC Connector 使用指南

1. 引言

Apache Flink 1.11 引入了基于 CDC(Change Data Capture)的连接器,这使得捕获数据库中的变更数据变得更加简便。通过 Flink 的 CDC 连接器,开发者可以高效地处理增量数据的实时分析与物化。以下将详细介绍 Flink 1.11 的 CDC 连接器,包括其特点、使用场景以及操作实践。

2. CDC 简介

CDC(Change Data Capture)是一种数据库技术,用于实时捕获数据库中表的变更数据。Flink 的 CDC 连接器通过Debezium引擎实现数据变更的实时提取,支持多种数据库源和目标。

Flink 提供了多种表格式以适应不同场景,具体如下:

格式名称 支持连接器
CSV Apache Kafka, Filesystem
JSON Apache Kafka, Filesystem, Elasticsearch
Apache Avro Apache Kafka, Filesystem
Debezium CDC Apache Kafka
Canal CDC Apache Kafka
Apache Parquet Filesystem
Apache ORC Filesystem

4. 使用注意事项

4.1 MySQL CDC 注意事项

  • 依赖管理:在项目中添加以下依赖:

    com.alibaba.ververica
    flink-connector-mysql-cdc
    1.0.0
  • Flink SQL Client 配置:添加 flink-sql-connector-mysql-cdc-1.0.0.jar 到 Flink 安装目录的 lib 文件夹。

4.2 Canal JSON 注意事项

  • 依赖管理:添加以下依赖:

    org.apache.flink
    flink-connector-kafka_2.11
    1.11.0
  • Flink SQL Client 配置:添加 flink-sql-connector-kafka_2.11-1.11.0.jar 到 Flink 安装目录的 lib 文件夹。

4.3 Changelog JSON 注意事项

  • 依赖管理:添加以下依赖:

    com.alibaba.ververica
    flink-format-changelog-json
    1.0.0
  • Flink SQL Client 配置:添加 flink-format-changelog-json-1.0.0.jar 到 Flink 安装目录的 lib 文件夹。

5. 操作实践

5.1 MySQL CDC 操作示例

5.1.1 创建MySQL 数据源表

-- 创建 `order_info` 表DROP TABLE IF EXISTS `order_info`;CREATE TABLE `order_info` (  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',  `consignee` varchar(100) DEFAULT NULL COMMENT '收货人',  `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',  `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',  `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态,1表示下单,2表示支付',  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',  `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',  `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',  `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',  `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',  `trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)',  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  `operate_time` datetime DEFAULT NULL COMMENT '操作时间',  `expire_time` datetime DEFAULT NULL COMMENT '失效时间',  `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',  `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',  `img_url` varchar(200) DEFAULT NULL COMMENT '图片路径',  `province_id` int(20) DEFAULT NULL COMMENT '地区',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';-- 插入数据INSERT INTO `order_info` VALUES (476, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', NULL, NULL, NULL, NULL, NULL, NULL, 9, NULL, NULL, NULL, NULL, NULL, '2020-06-18 02:21:38');INSERT INTO `order_info` VALUES (477, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', NULL, NULL, NULL, NULL, NULL, NULL, 3, NULL, NULL, NULL, NULL, NULL, '2020-06-18 09:12:25');INSERT INTO `order_info` VALUES (478, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', NULL, NULL, NULL, NULL, NULL, NULL, 7, NULL, NULL, NULL, NULL, NULL, '2020-06-18 15:56:34');
-- 启动 Flink 集群$ ./flink.sh -Dconf/spark.rpc.port=6060-- 启动 SQL CLI$ ./flink-sql-cli.sh-- 执行创建命令CREATE TABLE order_info (  `id` bigint,  `user_id` bigint,  `create_time` timestamp(0),  `operate_time` timestamp(0),  `province_id` int,  `order_status` string,  `total_amount` decimal(10,5)) WITH (  'connector' = 'mysql-cdc',  'hostname' = 'kms-1',  'port' = '3306',  'username' = 'root',  'password' = '123qwe',  'database-name' = 'mydw',  'table-name' = 'order_info');

5.2 Canal JSON 操作示例

5.2.1 创建 Canal JSON 格式表

CREATE TABLE region (  `id` bigint,  `region_name` string) WITH (  'connector' = 'kafka',  'topic' = 'mydw.base_region',  'properties.bootstrap.servers' = 'kms-3:9092',  'properties.group.id' = 'testGroup',  'format' = 'canal-json',  'scan.startup.mode' = 'earliest-offset');

5.2.2 插入数据

{  "data": [    { "id": "1", "region_name": "华北" },    { "id": "2", "region_name": "华东" },    { "id": "3", "region_name": "东北" },    { "id": "4", "region_name": "华中" },    { "id": "5", "region_name": "华南" },    { "id": "6", "region_name": "西南" },    { "id": "7", "region_name": "西北" }  ],  "database": "mydw",  "es": 1597128441000,  "id": 102,  "isDdl": false,  "mysqlType": {    "id": "varchar(20)",    "region_name": "varchar(20)"  },  "old": null,  "pkNames": null,  "sql": "",  "sqlType": {    "id": 12,    "region_name": 12  },  "table": "base_region",  "ts": 1597128441424,  "type": "INSERT"}

5.3 Changelog JSON 操作示例

5.3.1 创建 Changelog JSON 格式表

CREATE TABLE order_gmv (  `day_str` string,  `gmv` decimal(10,5)) WITH (  'connector' = 'kafka',  'topic' = 'order_gmv_kafka',  'scan.startup.mode' = 'earliest-offset',  'properties.bootstrap.servers' = 'kms-3:9092',  'format' = 'changelog-json');-- 插入数据INSERT INTO order_gmv SELECT DATE_FORMAT(create_time, 'yyyy-MM-dd') as day_str, SUM(total_amount) as gmv FROM order_info WHERE order_status = '2' GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd');

6. 总结

本文详细介绍了 Apache Flink 1.11 的 CDC Connector,涵盖了其特点、使用场景以及操作实践。通过本文,读者可以轻松理解 Flink 的 CDC 连接器如何简化数据处理流程,并通过具体的代码示例掌握其应用。

转载地址:http://fqefk.baihongyu.com/

你可能感兴趣的文章
Oracle 返回结果集的 存储过程
查看>>
Oracle 递归
查看>>
Oracle 递归函数与拼接
查看>>
oracle 逻辑优化,提升高度,综合SQL上下文进行逻辑优化
查看>>
oracle 闪回关闭,关闭闪回即disable flashback的操作步骤
查看>>
oracle 限制用户并行,insert /*parallel */ 到不同用户,并行起不来的问题
查看>>
oracle--用户,权限,角色的管理
查看>>
Oracle-定时任务-JOB
查看>>
oracle.dataaccess 连接池,asp.net使用Oracle.DataAccess.dll连接Oracle
查看>>
oracle00205报错,Oracle控制文件损坏报错场景
查看>>
Oracle10g EM乱码之快速解决
查看>>
Oracle10g下载地址--多平台下的32位和64位
查看>>
Oracle10g安装了11g的ODAC后,PL/SQL连接提示TNS:无法解析指定的连接标识符
查看>>
oracle11g dataguard物理备库搭建(关闭主库cp数据文件到备库)
查看>>
Oracle11G基本操作
查看>>
Oracle11g服务详细介绍及哪些服务是必须开启的?
查看>>
Oracle11g静默安装dbca,netca报错处理--直接跟换操作系统
查看>>
oracle12安装软件后安装数据库,然后需要自己配置监听
查看>>
Oracle——08PL/SQL简介,基本程序结构和语句
查看>>
Oracle——distinct的用法
查看>>