diff --git a/docs/content.zh/docs/get-started/quickstart/mysql-to-kafka.md b/docs/content.zh/docs/get-started/quickstart/mysql-to-kafka.md new file mode 100644 index 00000000000..cc943079881 --- /dev/null +++ b/docs/content.zh/docs/get-started/quickstart/mysql-to-kafka.md @@ -0,0 +1,588 @@ +--- +title: "MySQL 同步到 Kafka" +weight: 2 +type: docs +aliases: +- /try-flink-cdc/pipeline-connectors/mysql-Kafka-pipeline-tutorial.html +--- + + +# Streaming ELT 同步 MySQL 到 Kafka + +这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 到 Kafka 的 Streaming ELT 作业,包含整库同步、表结构变更同步和分库分表同步的功能。 +本教程的演示都将在 Flink CDC CLI 中进行,无需一行 Java/Scala 代码,也无需安装 IDE。 + +## 准备阶段 +准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。 + +### 准备 Flink Standalone 集群 +1. 下载 [Flink 1.20.1](https://archive.apache.org/dist/flink/flink-1.20.1/flink-1.20.1-bin-scala_2.12.tgz) ,解压后得到 flink-1.20.1 目录。 + 使用下面的命令跳转至 Flink 目录下,并且设置 FLINK_HOME 为 flink-1.20.1 所在目录。 + + ```shell + tar -zxvf flink-1.20.1-bin-scala_2.12.tgz + exprot FLINK_HOME=$(pwd)/flink-1.20.1 + cd flink-1.20.1 + ``` + +2. 通过在 conf/flink-conf.yaml 配置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint。 + + ```yaml + execution: + checkpointing: + interval: 3000 + ``` + +3. 使用下面的命令启动 Flink 集群。 + + ```shell + ./bin/start-cluster.sh + ``` + +启动成功的话,可以在 [http://localhost:8081/](http://localhost:8081/) 访问到 Flink Web UI,如下所示: + +{{< img src="/fig/mysql-Kafka-tutorial/flink-ui.png" alt="Flink UI" >}} + +多次执行 start-cluster.sh 可以拉起多个 TaskManager。 +注:如果你是云服务器,无法访问本地,需要将 conf/config.yaml 里面 rest.bind-address 和 rest.address的 localhost 改成0.0.0.0,然后使用 公网IP:8081 即可访问。 + +### 准备 Docker 环境 +使用下面的内容创建一个 `docker-compose.yml` 文件: + + ```yaml + version: '2.1' + services: + Zookeeper: + image: zookeeper:3.7.1 + ports: + - "2181:2181" + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + Kafka: + image: bitnami/kafka:2.8.1 + ports: + - "9092:9092" + - "9093:9093" + environment: + - ALLOW_PLAINTEXT_LISTENER=yes + - KAFKA_LISTENERS=PLAINTEXT://:9092 + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.67.2:9092 + - KAFKA_ZOOKEEPER_CONNECT=192.168.67.2:2181 + MySQL: + image: debezium/example-mysql:1.1 + ports: + - "3306:3306" + environment: + - MYSQL_ROOT_PASSWORD=123456 + - MYSQL_USER=mysqluser + - MYSQL_PASSWORD=mysqlpw + ``` +注意:文件里面的 192.168.67.2 为内网 IP,可通过 ifconfig 查找。 +该 Docker Compose 中包含的容器有: +- MySQL: 包含商品信息的数据库 `app_db` +- Kafka: 存储从 MySQL 中根据规则映射过来的结果表 +- Zookeeper:主要用于进行Kafka集群管理和协调 + +在 `docker-compose.yml` 所在目录下执行下面的命令来启动本教程需要的组件: + + ```shell + docker-compose up -d + ``` + +该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动了,如下所示。 +{{< img src="/fig/mysql-Kafka-tutorial/docker-ps.png" alt="Docker ps" >}} +#### 在 MySQL 数据库中准备数据 +1. 进入 MySQL 容器 + + ```shell + docker-compose exec MySQL mysql -uroot -p123456 + ``` + +2. 
创建数据库 `app_db` 和表 `orders`,`products`,`shipments`,并插入数据 + + ```sql + -- 创建数据库 + CREATE DATABASE app_db; + + USE app_db; + + -- 创建 orders 表 + CREATE TABLE `orders` ( + `id` INT NOT NULL, + `price` DECIMAL(10,2) NOT NULL, + PRIMARY KEY (`id`) + ); + + -- 插入数据 + INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00); + INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00); + + -- 创建 shipments 表 + CREATE TABLE `shipments` ( + `id` INT NOT NULL, + `city` VARCHAR(255) NOT NULL, + PRIMARY KEY (`id`) + ); + + -- 插入数据 + INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing'); + INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian'); + + -- 创建 products 表 + CREATE TABLE `products` ( + `id` INT NOT NULL, + `product` VARCHAR(255) NOT NULL, + PRIMARY KEY (`id`) + ); + + -- 插入数据 + INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer'); + INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap'); + INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut'); + ``` + +## 通过 Flink CDC CLI 提交任务 +1. 下载下面列出的二进制压缩包,并解压得到目录 `flink-cdc-{{< param Version >}}`; + [flink-cdc-{{< param Version >}}-bin.tar.gz](https://www.apache.org/dyn/closer.lua/flink/flink-cdc-{{< param Version >}}/flink-cdc-{{< param Version >}}-bin.tar.gz) + flink-cdc-{{< param Version >}} 下会包含 `bin`、`lib`、`log`、`conf` 四个目录。 + +2. 下载下面列出的 connector 包,并且移动到 lib 目录下; + **下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地基于 master 或 release- 分支编译。** + **请注意,您需要将 jar 移动到 Flink CDC Home 的 lib 目录,而非 Flink Home 的 lib 目录下。** + - [MySQL pipeline connector {{< param Version >}}](https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/{{< param Version >}}/flink-cdc-pipeline-connector-mysql-{{< param Version >}}.jar) + - [Kafka pipeline connector {{< param Version >}}](https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-kafka/{{< param Version >}}/flink-cdc-pipeline-connector-kafka-{{< param Version >}}.jar) + + 您还需要将下面的 Driver 包放在 Flink `lib` 目录下,或通过 `--jar` 参数将其传入 Flink CDC CLI,因为 CDC Connectors 不再包含这些 Drivers: + - [MySQL Connector Java](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar) + +3. 编写任务配置 yaml 文件。 + 下面给出了一个整库同步的示例文件 mysql-to-Kafka.yaml: + + ```yaml + ################################################################################ + # Description: Sync MySQL all tables to Kafka + ################################################################################ + source: + type: mysql + hostname: 0.0.0.0 + port: 3306 + username: root + password: 123456 + tables: app_db.\.* + server-id: 5400-5404 + server-time-zone: UTC + + sink: + type: kafka + name: Kafka Sink + properties.bootstrap.servers: 0.0.0.0:9092 + topic: yaml-mysql-kafka + + + pipeline: + name: MySQL to Kafka Pipeline + parallelism: 1 + + + ``` + +其中: +* source 中的 `tables: app_db.\.*` 通过正则匹配同步 `app_db` 下的所有表。 + +4. 最后,通过命令行提交任务到 Flink Standalone cluster + + ```shell + bash bin/flink-cdc.sh mysql-to-kafka.yaml + #参考,一些自定义路径的示例 主要用于多版本flink,mysql驱动不一致等情况 如下, + #bash /root/flink-cdc-3.4.0/bin/flink-cdc.sh /root/flink-cdc-3.4.0/bin/mysql-to-kafka.yaml --flink-home /root/flink-1.20.1 --jar /root/flink-cdc-3.4.0/lib/mysql-connector-java-8.0.27.jar + ``` + +提交成功后,返回信息如: + + ```shell + Pipeline has been submitted to cluster. 
+ Job ID: 04fd88ccb96c789dce2bf0b3a541d626 + Job Description: MySQL to Kafka Pipeline + ``` + +在 Flink Web UI,可以看到一个名为 `Sync MySQL Database to Kafka` 的任务正在运行。 + +{{< img src="/fig/mysql-Kafka-tutorial/mysql-to-Kafka.png" alt="MySQL-to-Kafka" >}} + +可以通过kafka自带的客户端查看Topic情况,得到debezium-json格式的内容: +```shell + docker-compose exec Kafka kafka-console-consumer.sh --bootstrap-server 192.168.67.2:9092 --topic yaml-mysql-kafka --from-beginning +``` +debezium-json 格式包含了 before,after,op,source 几个元素,展示示例如下: +```json +{ + "before": null, + "after": { + "id": 1, + "price": 4 + }, + "op": "c", + "source": { + "db": "app_db", + "table": "orders" + } +} +... +{ + "before": null, + "after": { + "id": 1, + "product": "Beer" + }, + "op": "c", + "source": { + "db": "app_db", + "table": "products" + } +} +... +{ + "before": null, + "after": { + "id": 2, + "city": "xian" + }, + "op": "c", + "source": { + "db": "app_db", + "table": "shipments" + } +} +``` +### 同步变更 +进入 MySQL 容器: + +```shell + docker-compose exec mysql mysql -uroot -p123456 + ``` + +接下来,修改 MySQL 数据库中表的数据,Kafka 中显示的订单数据也将实时更新: +1. 在 MySQL 的 `orders` 表中插入一条数据 + + ```sql + INSERT INTO app_db.orders (id, price) VALUES (3, 100.00); + ``` + +2. 在 MySQL 的 `orders` 表中增加一个字段 + + ```sql + ALTER TABLE app_db.orders ADD amount varchar(100) NULL; + ``` + +3. 在 MySQL 的 `orders` 表中更新一条数据 + + ```sql + UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1; + ``` + +4. 在 MySQL 的 `orders` 表中删除一条数据 + + ```sql + DELETE FROM app_db.orders WHERE id=2; + ``` + +通过消费者监控 topic,我们可以看到 Kafka 上也在实时发生着这些变更: +```json +{ + "before": { + "id": 1, + "price": 4, + "amount": null + }, + "after": { + "id": 1, + "price": 100, + "amount": "100.00" + }, + "op": "u", + "source": { + "db": "app_db", + "table": "orders" + } +} +``` +同样的,去修改 `shipments`, `products` 表,也能在 Kafka 中对应的topic实时看到同步变更的结果。 + +### 路由变更 +Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置,借助这种能力,我们能够实现表名库名替换,整库同步等功能。 +下面提供一个配置文件说明: + ```yaml + ################################################################################ + # Description: Sync MySQL all tables to Kafka + ################################################################################ + source: + type: mysql + hostname: localhost + port: 3306 + username: root + password: 123456 + tables: app_db.\.* + server-id: 5400-5404 + server-time-zone: UTC + + sink: + type: kafka + name: Kafka Sink + properties.bootstrap.servers: 0.0.0.0:9092 + pipeline: + name: MySQL to Kafka Pipeline + parallelism: 1 + route: + - source-table: app_db.orders + sink-table: kafka_ods_orders + - source-table: app_db.shipments + sink-table: kafka_ods_shipments + - source-table: app_db.products + sink-table: kafka_ods_products + ``` + +通过上面的 `route` 配置,会将 `app_db.orders` 表的结构和数据同步到 `kafka_ods_orders` 中。从而实现数据库迁移的功能。 +特别地,source-table 支持正则表达式匹配多表,从而实现分库分表同步的功能,例如下面的配置: + + ```yaml + route: + - source-table: app_db.order\.* + sink-table: kafka_ods_orders + ``` + +这样,就可以将诸如 app_db.order01、app_db.order02、app_db.order03 的表汇总到 kafka_ods_orders 中。利用kafka自带的工具,可查看对应Topic成功建立,数据详情可使用kafka-console-consumer.sh进行查询: +```shell + docker-compose exec Kafka kafka-topics.sh --bootstrap-server 192.168.67.2:9092 --list +``` +新创建的 Kafka Topic 信息如下: + +```shell + __consumer_offsets + kafka_ods_orders + kafka_ods_products + kafka_ods_shipments + yaml-mysql-kafka +``` + +选取 kafka_ods_orders 这个 Topic 进行查询,返回数据示例如下: +```json +{ + "before": null, + "after": { + "id": 1, + "price": 100, + "amount": "100.00" + }, + "op": "c", + "source": { + "db": null, + "table": "kafka_ods_orders" + } +} +``` + +### 写入多个分区 +使用 
partition.strategy 参数可以定义发送数据到 Kafka 分区的策略, 可以设置的选项有: + - `all-to-zero`(将所有数据发送到 0 号分区),默认值 + - `hash-by-key`(所有数据根据主键的哈希值分发) + +我们基于mysql-to-kafka.yaml在 sink下定义一行partition.strategy: hash-by-key +```yaml +source: + ... +sink: + ... + topic: yaml-mysql-kafka-hash-by-key + partition.strategy: hash-by-key +pipeline: + ... +``` +同时我们利用 Kafka 的脚本新建一个12分区的 kafka Topic: +```shell +docker-compose exec Kafka kafka-topics.sh --create --topic yaml-mysql-kafka-hash-by-key --bootstrap-server 192.168.67.2:9092 --partitions 1 +``` +提交yaml程序后,这个时候我们指定一下分区消费,查看一下各个分区里面所存储的数据。 +```shell +docker-compose exec Kafka kafka-console-consumer.sh --bootstrap-server=192.168.67.2:9092 --topic yaml-mysql-kafka-hash-by-key --partition 0 --from-beginning +``` +部分分区数据详情如下: +```json +# 分区0 +{ + "before": null, + "after": { + "id": 1, + "price": 100, + "amount": "100.00" + }, + "op": "c", + "source": { + "db": "app_db", + "table": "orders" + } +} +# 分区4 +{ + "before": null, + "after": { + "id": 2, + "product": "Cap" + }, + "op": "c", + "source": { + "db": "app_db", + "table": "products" + } +} +{ + "before": null, + "after": { + "id": 1, + "city": "beijing" + }, + "op": "c", + "source": { + "db": "app_db", + "table": "shipments" + } +} +``` + +### 输出格式 +value.format 参数用于序列化 Kafka 消息的值部分数据的格式。可选的填写值包括 [debezium-json](https://debezium.io/documentation/reference/stable/integrations/serdes.html) 和 [canal-json](https://github.com/alibaba/canal/wiki), 默认值为 `debezium-json`,目前还不支持用户自定义输出格式。 +- `debezium-json` 格式会包含 before(变更前的数据)/after(变更后的数据)/op(变更类型)/source(元数据) 几个元素,ts_ms 字段并不会默认包含在输出结构中(需要在 Source 中指定 metadata.list 配合)。 +- `canal-json` 格式会包含 old/data/type/database/table/pkNames 几个元素,但是 ts 并不会默认包含在其中(原因同上)。 + +可以在 YAML 文件的 sink 中定义 value.format: canal-json 来指定输出格式为 canal-json 类型: +```yaml +source: + ... + +sink: + ... + topic: yaml-mysql-kafka-canal + value.format: canal-json +pipeline: + ... +``` +查询对应 Topic 的数据,返回示例如下: +```json +{ + "old": null, + "data": [ + { + "id": 1, + "price": 100, + "amount": "100.00" + } + ], + "type": "INSERT", + "database": "app_db", + "table": "orders", + "pkNames": [ + "id" + ] +} +``` + + +### 上游表名到下游Topic名的映射关系 +使用 `sink.tableId-to-topic.mapping` 参数可以指定上游表名到下游 Kafka Topic 名的映射关系。无需使用 route 配置。与之前介绍的通过 route 实现的不同点在于,配置该参数可以在保留源表的表名信息的情况下设置写入的 Topic 名称。 +在前面的 YAML 文件中增加 `sink.tableId-to-topic.mapping` 配置指定映射关系,每个映射关系由 `;` 分割,上游表的 TableId 和下游 Kafka 的 Topic 名由` : `分割: +```yaml +source: + ... + +sink: + ... + sink.tableId-to-topic.mapping: app_db.orders:yaml-mysql-kafka-orders;app_db.shipments:yaml-mysql-kafka-shipments;app_db.products:yaml-mysql-kafka-products +pipeline: + ... +``` +运行后,Kafka 中将会生成如下的 Topic: +``` +... 
yaml-mysql-kafka-orders
yaml-mysql-kafka-products
yaml-mysql-kafka-shipments
```
Kafka 不同 Topic 中部分数据详情:
- yaml-mysql-kafka-orders

  ```json
  {
      "before": null,
      "after": {
          "id": 1,
          "price": 100,
          "amount": "100.00"
      },
      "op": "c",
      "source": {
          "db": "app_db",
          "table": "orders"
      }
  }
  ```
- yaml-mysql-kafka-products

  ```json
  {
      "before": null,
      "after": {
          "id": 2,
          "product": "Cap"
      },
      "op": "c",
      "source": {
          "db": "app_db",
          "table": "products"
      }
  }
  ```
- yaml-mysql-kafka-shipments

  ```json
  {
      "before": null,
      "after": {
          "id": 2,
          "city": "xian"
      },
      "op": "c",
      "source": {
          "db": "app_db",
          "table": "shipments"
      }
  }
  ```

## 环境清理
本教程结束后,在 `docker-compose.yml` 文件所在的目录下执行如下命令停止所有容器:

```shell
docker-compose down
```

在 Flink 所在目录 `flink-1.20.1` 下执行如下命令停止 Flink 集群:

```shell
./bin/stop-cluster.sh
```

{{< top >}}
\ No newline at end of file
diff --git a/docs/content/docs/get-started/quickstart/mysql-to-kafka.md b/docs/content/docs/get-started/quickstart/mysql-to-kafka.md
new file mode 100644
index 00000000000..4a8efca77d0
--- /dev/null
+++ b/docs/content/docs/get-started/quickstart/mysql-to-kafka.md
@@ -0,0 +1,591 @@
---
title: "MySQL to Kafka"
weight: 2
type: docs
aliases:
- /try-flink-cdc/pipeline-connectors/mysql-Kafka-pipeline-tutorial.html
---


# Streaming ELT from MySQL to Kafka

This tutorial shows how to quickly build a Streaming ELT job from MySQL to Kafka using Flink CDC, including whole-database synchronization, schema change evolution, and merging sharded tables into one table.
All exercises in this tutorial are performed in the Flink CDC CLI, and the whole pipeline is defined in a YAML file, without a single line of Java/Scala code or any IDE installation.

## Preparation
Prepare a Linux or macOS computer with Docker installed.


### Prepare Flink Standalone cluster
1. Download [Flink 1.20.1](https://archive.apache.org/dist/flink/flink-1.20.1/flink-1.20.1-bin-scala_2.12.tgz) and extract it to get the flink-1.20.1 directory.
   Use the following commands to navigate to the Flink directory and set FLINK_HOME to the directory where flink-1.20.1 is located.

   ```shell
   tar -zxvf flink-1.20.1-bin-scala_2.12.tgz
   export FLINK_HOME=$(pwd)/flink-1.20.1
   cd flink-1.20.1
   ```

2. Enable checkpointing by appending the following parameters to the conf/flink-conf.yaml configuration file to perform a checkpoint every 3 seconds.

   ```yaml
   execution:
     checkpointing:
       interval: 3000
   ```

3. Start the Flink cluster using the following command.

   ```shell
   ./bin/start-cluster.sh
   ```

If successfully started, you can access the Flink Web UI at [http://localhost:8081/](http://localhost:8081/), as shown below.

{{< img src="/fig/mysql-Kafka-tutorial/flink-ui.png" alt="Flink UI" >}}

Executing `start-cluster.sh` multiple times starts multiple TaskManagers.

Note: If you are on a cloud server and cannot access the page locally, change the localhost of rest.bind-address and rest.address in conf/config.yaml to 0.0.0.0, and then access the Web UI at public-IP:8081.
### Prepare docker compose
The following tutorial will prepare the required components using `docker-compose`.
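Before creating the file, it can be useful to check that the Compose CLI is available and to look up this machine's internal IP address, since the Kafka advertised listener in the file below (192.168.67.2) has to be replaced with your own address. A minimal sketch, assuming the standalone `docker-compose` v1 binary used throughout this tutorial:

```shell
# Confirm Docker and the standalone docker-compose CLI are installed
docker --version
docker-compose --version

# Look up the internal IP of this host; substitute it for 192.168.67.2 in
# KAFKA_ADVERTISED_LISTENERS and KAFKA_ZOOKEEPER_CONNECT below.
ifconfig | grep "inet "        # on Linux you can also use: hostname -I
```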
+Create a `docker-compose.yml` file using the content provided below: + + ```yaml + version: '2.1' + services: + Zookeeper: + image: zookeeper:3.7.1 + ports: + - "2181:2181" + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + Kafka: + image: bitnami/kafka:2.8.1 + ports: + - "9092:9092" + - "9093:9093" + environment: + - ALLOW_PLAINTEXT_LISTENER=yes + - KAFKA_LISTENERS=PLAINTEXT://:9092 + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.67.2:9092 + - KAFKA_ZOOKEEPER_CONNECT=192.168.67.2:2181 + MySQL: + image: debezium/example-mysql:1.1 + ports: + - "3306:3306" + environment: + - MYSQL_ROOT_PASSWORD=123456 + - MYSQL_USER=mysqluser + - MYSQL_PASSWORD=mysqlpw + ``` +Note: The 192.168.67.2 in the file is an internal network IP and can be found through ifconfig. +The Docker Compose should include the following services (containers): +- MySQL: include a database named `app_db` +- Kafka: Store the result table mapped from MySQL according to the rules +- Zookeeper:It is mainly used for Kafka cluster management and coordination + +To start all containers, run the following command in the directory that contains the `docker-compose.yml` file. + + ```shell + docker-compose up -d + ``` + +This command automatically starts all the containers defined in the Docker Compose configuration in a detached mode. Run docker ps to check whether these containers are running properly. +{{< img src="/fig/mysql-Kafka-tutorial/docker-ps.png" alt="Docker ps" >}} +#### Prepare records for MySQL +1. Enter MySQL container + + ```shell + docker-compose exec MySQL mysql -uroot -p123456 + ``` + +2. create `app_db` database and `orders`,`products`,`shipments` tables, then insert records + + ```sql + -- create database + CREATE DATABASE app_db; + + USE app_db; + + -- create orders table + CREATE TABLE `orders` ( + `id` INT NOT NULL, + `price` DECIMAL(10,2) NOT NULL, + PRIMARY KEY (`id`) + ); + + -- insert records + INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00); + INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00); + + -- create shipments table + CREATE TABLE `shipments` ( + `id` INT NOT NULL, + `city` VARCHAR(255) NOT NULL, + PRIMARY KEY (`id`) + ); + + -- insert records + INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing'); + INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian'); + + -- create products table + CREATE TABLE `products` ( + `id` INT NOT NULL, + `product` VARCHAR(255) NOT NULL, + PRIMARY KEY (`id`) + ); + + -- insert records + INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer'); + INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap'); + INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut'); + ``` + +## Submit job with Flink CDC CLI +1. Download the binary compressed packages listed below and extract them to the directory `flink cdc-{{< param Version >}}'`: + [flink-cdc-{{< param Version >}}-bin.tar.gz](https://www.apache.org/dyn/closer.lua/flink/flink-cdc-{{< param Version >}}/flink-cdc-{{< param Version >}}-bin.tar.gz) + flink-cdc-{{< param Version >}} directory will contain four directory: `bin`, `lib`, `log`, and `conf`. + +2. 
Download the connector package listed below and move it to the `lib` directory + **Download links are available only for stable releases, SNAPSHOT dependencies need to be built based on master or release branches by yourself.** + **Please note that you need to move the jar to the lib directory of Flink CDC Home, not to the lib directory of Flink Home.** + - [MySQL pipeline connector {{< param Version >}}](https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/{{< param Version >}}/flink-cdc-pipeline-connector-mysql-{{< param Version >}}.jar) + - [Kafka pipeline connector {{< param Version >}}](https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-kafka/{{< param Version >}}/flink-cdc-pipeline-connector-kafka-{{< param Version >}}.jar) + + You also need to place MySQL connector into Flink `lib` folder or pass it with `--jar` argument, since they're no longer packaged with CDC connectors: + - [MySQL Connector Java](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar) + +3. Write task configuration yaml file. + Here is an example file for synchronizing the entire database `mysql-to-kafka.yaml`: + + ```yaml + ################################################################################ + # Description: Sync MySQL all tables to Kafka + ################################################################################ + source: + type: mysql + hostname: 0.0.0.0 + port: 3306 + username: root + password: 123456 + tables: app_db.\.* + server-id: 5400-5404 + server-time-zone: UTC + + sink: + type: kafka + name: Kafka Sink + properties.bootstrap.servers: 0.0.0.0:9092 + topic: yaml-mysql-kafka + + pipeline: + name: MySQL to Kafka Pipeline + parallelism: 1 + ``` + +Notice that: +* `tables: app_db.\.*` in source synchronize all tables in `app_db` through Regular Matching. + +4. Finally, submit job to Flink Standalone cluster using Cli. + + ```shell + bash bin/flink-cdc.sh mysql-to-kafka.yaml + #For reference, some examples of custom paths are mainly used in situations such as multiple versions of flink and inconsistent mysql drivers, as follows + #bash /root/flink-cdc-3.4.0/bin/flink-cdc.sh /root/flink-cdc-3.4.0/bin/mysql-to-kafka.yaml --flink-home /root/flink-1.20.1 --jar /root/flink-cdc-3.4.0/lib/mysql-connector-java-8.0.27.jar + ``` + +After successful submission, the return information is as follows: + + ```shell + Pipeline has been submitted to cluster. + Job ID: 04fd88ccb96c789dce2bf0b3a541d626 + Job Description: MySQL to Kafka Pipeline + ``` + +We can find a job named `Sync MySQL Database to Kafka` is running through Flink Web UI. + +{{< img src="/fig/mysql-Kafka-tutorial/mysql-to-Kafka.png" alt="MySQL-to-Kafka" >}} + +The Topic situation can be viewed through the built-in client of kafka to obtain the content in debezium-json format: +```shell + docker-compose exec Kafka kafka-console-consumer.sh --bootstrap-server 192.168.67.2:9092 --topic yaml-mysql-kafka --from-beginning +``` +The debezium-json format contains several elements such as before,after,op, and source. The demonstration example is as follows: +```json +{ + "before": null, + "after": { + "id": 1, + "price": 4 + }, + "op": "c", + "source": { + "db": "app_db", + "table": "orders" + } +} +... +{ + "before": null, + "after": { + "id": 1, + "product": "Beer" + }, + "op": "c", + "source": { + "db": "app_db", + "table": "products" + } +} +... 
{
  "before": null,
  "after": {
    "id": 2,
    "city": "xian"
  },
  "op": "c",
  "source": {
    "db": "app_db",
    "table": "shipments"
  }
}
```
### Synchronize Schema and Data changes
Enter the MySQL container:

```shell
docker-compose exec MySQL mysql -uroot -p123456
```

Then, modify the schema and records in MySQL, and the corresponding Kafka topics will reflect the changes in real time:
1. Insert one record into `orders` in MySQL:

   ```sql
   INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
   ```

2. Add one column to `orders` in MySQL:

   ```sql
   ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
   ```

3. Update one record in `orders` in MySQL:

   ```sql
   UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
   ```
4. Delete one record from `orders` in MySQL:

   ```sql
   DELETE FROM app_db.orders WHERE id=2;
   ```
By monitoring the topic with a consumer, we can see that these changes are reflected on Kafka in real time:
```json
{
  "before": {
    "id": 1,
    "price": 4,
    "amount": null
  },
  "after": {
    "id": 1,
    "price": 100,
    "amount": "100.00"
  },
  "op": "u",
  "source": {
    "db": "app_db",
    "table": "orders"
  }
}
```
Similarly, by modifying the `shipments` and `products` tables, you can see the synchronized changes in real time in the corresponding Kafka topics.

### Route the changes
Flink CDC provides configuration to route the schema and data of a source table to other table names.
With this ability, we can implement table renaming, database renaming, and whole-database synchronization.
Here is an example file that uses the `route` feature:
 ```yaml
 ################################################################################
 # Description: Sync MySQL all tables to Kafka
 ################################################################################
 source:
   type: mysql
   hostname: localhost
   port: 3306
   username: root
   password: 123456
   tables: app_db.\.*
   server-id: 5400-5404
   server-time-zone: UTC

 sink:
   type: kafka
   name: Kafka Sink
   properties.bootstrap.servers: 0.0.0.0:9092
 pipeline:
   name: MySQL to Kafka Pipeline
   parallelism: 1
 route:
   - source-table: app_db.orders
     sink-table: kafka_ods_orders
   - source-table: app_db.shipments
     sink-table: kafka_ods_shipments
   - source-table: app_db.products
     sink-table: kafka_ods_products
 ```

With the `route` configuration above, the schema and data of `app_db.orders` are synchronized to `kafka_ods_orders`, which makes database migration possible.
In particular, `source-table` supports regular expressions that match multiple tables, so sharded databases and tables can be synchronized as well, like the following:

 ```yaml
 route:
   - source-table: app_db.order\.*
     sink-table: kafka_ods_orders
 ```

In this way, sharded tables such as `app_db.order01`, `app_db.order02`, and `app_db.order03` can be merged into a single `kafka_ods_orders` topic. Using the built-in tools of Kafka, you can verify that the corresponding topic has been created successfully.
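Note that sharded tables such as `app_db.order01` are not part of the sample dataset prepared earlier; if you want to exercise this route rule end to end, you could create a couple of them yourself before submitting the job. The sketch below is only illustrative; the table names and values are hypothetical:

```shell
# Create two illustrative sharded tables plus one sample row so that the
# app_db.order\.* route rule has something to match (not part of the sample data).
docker-compose exec MySQL mysql -uroot -p123456 -e "
CREATE TABLE app_db.order01 LIKE app_db.orders;
CREATE TABLE app_db.order02 LIKE app_db.orders;
INSERT INTO app_db.order01 (id, price) VALUES (10, 25.00);"
```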
Data details can be queried using kafka-console-consumer.sh. First, list the topics:

```shell
docker-compose exec Kafka kafka-topics.sh --bootstrap-server 192.168.67.2:9092 --list
```
The newly created Kafka topics are as follows:
```shell
__consumer_offsets
kafka_ods_orders
kafka_ods_products
kafka_ods_shipments
yaml-mysql-kafka
```

Query the `kafka_ods_orders` topic, and the returned data looks like the following:
```json
{
    "before": null,
    "after": {
        "id": 1,
        "price": 100,
        "amount": "100.00"
    },
    "op": "c",
    "source": {
        "db": null,
        "table": "kafka_ods_orders"
    }
}
```

### Write to multiple partitions
The partition.strategy parameter defines the strategy for distributing data across Kafka partitions. Two options can be set:
 - `all-to-zero` (send all data to partition 0), the default value
 - `hash-by-key` (distribute data according to the hash value of the primary key)

Based on mysql-to-kafka.yaml, we add a `partition.strategy: hash-by-key` entry under `sink`:
```yaml
source:
  ...
sink:
  ...
  topic: yaml-mysql-kafka-hash-by-key
  partition.strategy: hash-by-key
pipeline:
  ...
```
Meanwhile, we use Kafka's scripts to create a Kafka topic with 12 partitions:
```shell
docker-compose exec Kafka kafka-topics.sh --create --topic yaml-mysql-kafka-hash-by-key --bootstrap-server 192.168.67.2:9092 --partitions 12
```
After submitting the YAML job, we can consume from a specific partition and check the data stored in each partition.
```shell
docker-compose exec Kafka kafka-console-consumer.sh --bootstrap-server=192.168.67.2:9092 --topic yaml-mysql-kafka-hash-by-key --partition 0 --from-beginning
```
The details of some partition data are as follows:
```json
# partition 0
{
  "before": null,
  "after": {
    "id": 1,
    "price": 100,
    "amount": "100.00"
  },
  "op": "c",
  "source": {
    "db": "app_db",
    "table": "orders"
  }
}
# partition 4
{
  "before": null,
  "after": {
    "id": 2,
    "product": "Cap"
  },
  "op": "c",
  "source": {
    "db": "app_db",
    "table": "products"
  }
}
{
  "before": null,
  "after": {
    "id": 1,
    "city": "beijing"
  },
  "op": "c",
  "source": {
    "db": "app_db",
    "table": "shipments"
  }
}
```

### Output format
The value.format parameter specifies the serialization format of the value part of Kafka messages. Valid options are [debezium-json](https://debezium.io/documentation/reference/stable/integrations/serdes.html) and [canal-json](https://github.com/alibaba/canal/wiki), and the default value is `debezium-json`. User-defined output formats are not supported yet.
- `debezium-json` contains the before (data before the change) / after (data after the change) / op (change type) / source (metadata) elements. The ts_ms field is not included in the output by default (metadata.list needs to be specified in the source to include it).
- `canal-json` contains the old/data/type/database/table/pkNames elements, but ts is not included by default (for the same reason).

`value.format: canal-json` can be set in the `sink` section of the YAML file to switch the output format to canal-json:
```yaml
source:
  ...

sink:
  ...
  topic: yaml-mysql-kafka-canal
  value.format: canal-json
pipeline:
  ...
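# Note: value.format only changes how the value of each Kafka record is serialized for this sink;
# debezium-json is the default, and canal-json is currently the only other supported option.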
+``` +Query the data corresponding to the Topic, and the returned example is as follows: +```json +{ + "old": null, + "data": [ + { + "id": 1, + "price": 100, + "amount": "100.00" + } + ], + "type": "INSERT", + "database": "app_db", + "table": "orders", + "pkNames": [ + "id" + ] +} +``` + + +### The mapping relationship from the upstream table name to the downstream Topic name +The `sink.tableId-to-topic.mapping` parameter can be used to specify the mapping relationship from the upstream table name to the downstream Kafka Topic name. No route configuration is required. The difference from the previously introduced implementation through route lies in that configuring this parameter can set the Topic name to be written while retaining the table name information of the source table. + +In the previous YAML file, add `sink.tableId-to-topic.mapping` configuration to specify the mapping relationship, and each mapping relationship is composed of `;` The TableId of the upstream table and the Topic name of the downstream Kafka are separated by `:`. + +```yaml +source: + ... + +sink: + ... + sink.tableId-to-topic.mapping: app_db.orders:yaml-mysql-kafka-orders;app_db.shipments:yaml-mysql-kafka-shipments;app_db.products:yaml-mysql-kafka-products +pipeline: + ... +``` +After running, the following topics will be generated in Kafka: +``` +... +yaml-mysql-kafka-orders +yaml-mysql-kafka-products +yaml-mysql-kafka-shipments +``` +Details of some data in different topics of Kafka: +- yaml-mysql-kafka-orders + + ```json + { + "before": null, + "after": { + "id": 1, + "price": 100, + "amount": "100.00" + }, + "op": "c", + "source": { + "db": "app_db", + "table": "orders" + } + } + ``` +- yaml-mysql-kafka-products + ```json + { + "before": null, + "after": { + "id": 2, + "product": "Cap" + }, + "op": "c", + "source": { + "db": "app_db", + "table": "products" + } + } + ``` +- yaml-mysql-kafka-shipments + ```json + { + "before": null, + "after": { + "id": 2, + "city": "xian" + }, + "op": "c", + "source": { + "db": "app_db", + "table": "shipments" + } + } + ``` + +## Clean up +After finishing the tutorial, run the following command to stop all containers in the directory of `docker-compose.yml`: + + ```shell + docker-compose down + ``` + +Run the following command to stop the Flink cluster in the directory of Flink `flink-1.20.1`: + + ```shell + ./bin/stop-cluster.sh + ``` + +{{< top >}} \ No newline at end of file diff --git a/docs/static/fig/mysql-Kafka-tutorial/docker-ps.png b/docs/static/fig/mysql-Kafka-tutorial/docker-ps.png new file mode 100644 index 00000000000..441b4dc9a4a Binary files /dev/null and b/docs/static/fig/mysql-Kafka-tutorial/docker-ps.png differ diff --git a/docs/static/fig/mysql-Kafka-tutorial/flink-ui.png b/docs/static/fig/mysql-Kafka-tutorial/flink-ui.png new file mode 100644 index 00000000000..afa387e92ef Binary files /dev/null and b/docs/static/fig/mysql-Kafka-tutorial/flink-ui.png differ diff --git a/docs/static/fig/mysql-Kafka-tutorial/mysql-to-Kafka.png b/docs/static/fig/mysql-Kafka-tutorial/mysql-to-Kafka.png new file mode 100644 index 00000000000..83db514a921 Binary files /dev/null and b/docs/static/fig/mysql-Kafka-tutorial/mysql-to-Kafka.png differ