本文共 5359 字,大约阅读时间需要 17 分钟。
Apache Flink 1.11 引入了基于 CDC(Change Data Capture)的连接器,这使得捕获数据库中的变更数据变得更加简便。通过 Flink 的 CDC 连接器,开发者可以高效地处理增量数据的实时分析与物化。以下将详细介绍 Flink 1.11 的 CDC 连接器,包括其特点、使用场景以及操作实践。
CDC(Change Data Capture)是一种数据库技术,用于实时捕获数据库中表的变更数据。Flink 的 CDC 连接器通过Debezium引擎实现数据变更的实时提取,支持多种数据库源和目标。
Flink 提供了多种表格式以适应不同场景,具体如下:
| 格式名称 | 支持连接器 |
|---|---|
| CSV | Apache Kafka, Filesystem |
| JSON | Apache Kafka, Filesystem, Elasticsearch |
| Apache Avro | Apache Kafka, Filesystem |
| Debezium CDC | Apache Kafka |
| Canal CDC | Apache Kafka |
| Apache Parquet | Filesystem |
| Apache ORC | Filesystem |
依赖管理:在项目中添加以下依赖:
com.alibaba.ververica flink-connector-mysql-cdc 1.0.0
Flink SQL Client 配置:添加 flink-sql-connector-mysql-cdc-1.0.0.jar 到 Flink 安装目录的 lib 文件夹。
依赖管理:添加以下依赖:
org.apache.flink flink-connector-kafka_2.11 1.11.0
Flink SQL Client 配置:添加 flink-sql-connector-kafka_2.11-1.11.0.jar 到 Flink 安装目录的 lib 文件夹。
依赖管理:添加以下依赖:
com.alibaba.ververica flink-format-changelog-json 1.0.0
Flink SQL Client 配置:添加 flink-format-changelog-json-1.0.0.jar 到 Flink 安装目录的 lib 文件夹。
-- 创建 `order_info` 表DROP TABLE IF EXISTS `order_info`;CREATE TABLE `order_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号', `consignee` varchar(100) DEFAULT NULL COMMENT '收货人', `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话', `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额', `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态,1表示下单,2表示支付', `user_id` bigint(20) DEFAULT NULL COMMENT '用户id', `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式', `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址', `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注', `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)', `trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `operate_time` datetime DEFAULT NULL COMMENT '操作时间', `expire_time` datetime DEFAULT NULL COMMENT '失效时间', `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号', `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号', `img_url` varchar(200) DEFAULT NULL COMMENT '图片路径', `province_id` int(20) DEFAULT NULL COMMENT '地区', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';-- 插入数据INSERT INTO `order_info` VALUES (476, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', NULL, NULL, NULL, NULL, NULL, NULL, 9, NULL, NULL, NULL, NULL, NULL, '2020-06-18 02:21:38');INSERT INTO `order_info` VALUES (477, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', NULL, NULL, NULL, NULL, NULL, NULL, 3, NULL, NULL, NULL, NULL, NULL, '2020-06-18 09:12:25');INSERT INTO `order_info` VALUES (478, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', NULL, NULL, NULL, NULL, NULL, NULL, 7, NULL, NULL, NULL, NULL, NULL, '2020-06-18 15:56:34');
-- 启动 Flink 集群$ ./flink.sh -Dconf/spark.rpc.port=6060-- 启动 SQL CLI$ ./flink-sql-cli.sh-- 执行创建命令CREATE TABLE order_info ( `id` bigint, `user_id` bigint, `create_time` timestamp(0), `operate_time` timestamp(0), `province_id` int, `order_status` string, `total_amount` decimal(10,5)) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'kms-1', 'port' = '3306', 'username' = 'root', 'password' = '123qwe', 'database-name' = 'mydw', 'table-name' = 'order_info');
CREATE TABLE region ( `id` bigint, `region_name` string) WITH ( 'connector' = 'kafka', 'topic' = 'mydw.base_region', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json', 'scan.startup.mode' = 'earliest-offset');
{ "data": [ { "id": "1", "region_name": "华北" }, { "id": "2", "region_name": "华东" }, { "id": "3", "region_name": "东北" }, { "id": "4", "region_name": "华中" }, { "id": "5", "region_name": "华南" }, { "id": "6", "region_name": "西南" }, { "id": "7", "region_name": "西北" } ], "database": "mydw", "es": 1597128441000, "id": 102, "isDdl": false, "mysqlType": { "id": "varchar(20)", "region_name": "varchar(20)" }, "old": null, "pkNames": null, "sql": "", "sqlType": { "id": 12, "region_name": 12 }, "table": "base_region", "ts": 1597128441424, "type": "INSERT"} CREATE TABLE order_gmv ( `day_str` string, `gmv` decimal(10,5)) WITH ( 'connector' = 'kafka', 'topic' = 'order_gmv_kafka', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = 'kms-3:9092', 'format' = 'changelog-json');-- 插入数据INSERT INTO order_gmv SELECT DATE_FORMAT(create_time, 'yyyy-MM-dd') as day_str, SUM(total_amount) as gmv FROM order_info WHERE order_status = '2' GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd');
本文详细介绍了 Apache Flink 1.11 的 CDC Connector,涵盖了其特点、使用场景以及操作实践。通过本文,读者可以轻松理解 Flink 的 CDC 连接器如何简化数据处理流程,并通过具体的代码示例掌握其应用。
转载地址:http://fqefk.baihongyu.com/