网站域名使用代理进口外贸流程
- 作者: 五速梦信息网
- 时间: 2026年04月20日 07:23
当前位置: 首页 > news >正文
网站域名使用代理,进口外贸流程,北京网站建设网站建设,西安最新防疫信息本教程的演示都将在 Flink CDC CLI 中进行#xff0c;无需一行 Java/Scala 代码#xff0c;也无需安装 IDE。 这篇教程将展示如何基于 Flink CDC YAML 快速构建 MySQL 到 Kafka 的 Streaming ELT 作业#xff0c;包含整库同步、表结构变更同步演示和关键参数介绍。 准备阶段… 本教程的演示都将在 Flink CDC CLI 中进行无需一行 Java/Scala 代码也无需安装 IDE。 这篇教程将展示如何基于 Flink CDC YAML 快速构建 MySQL 到 Kafka 的 Streaming ELT 作业包含整库同步、表结构变更同步演示和关键参数介绍。 准备阶段 准备 Flink Standalone 集群 1. 下载 Flink 1.19.2[1]压缩包解压后并跳转至 Flink 目录下设置 FLINK_HOME 为 flink-1.19.2 所在目录。 tar -zxvf flink-1.19.2-bin-scala_2.12.tgz export FLINK_HOME$(pwd)/flink-1.19.2 2. 在 conf/config.yaml 文件追加下列参数开启 checkpoint每隔 3 秒做一次 checkpoint。 execution:checkpointing:interval: 3000 3. 使用下面的命令启动 Flink 集群。 ./bin/start-cluster.sh 启动成功后可以在 http://localhost:8081/访问到 Flink Web UI如下图所示 重复执行 start-cluster.sh 可以拉起多个 TaskManager。 注意如果是在云服务器上运行需要将 conf/config.yaml 里面 rest.bind-address 和 rest.address 的 localhost 改成0.0.0.0然后使用 公网IP:8081 即可访问。 准备 Docker 环境 使用下面的内容新建一个 docker-compose.yml 文件 services:Zookeeper:image: zookeeper:3.7.1ports:- 2181:2181environment:- ALLOW_ANONYMOUS_LOGINyesKafka:image: bitnami/kafka:2.8.1ports:- 9092:9092- 9093:9093environment:- ALLOW_PLAINTEXT_LISTENERyes- KAFKA_LISTENERSPLAINTEXT://:9092- KAFKA_ADVERTISED_LISTENERSPLAINTEXT://192.168.67.2:9092- KAFKA_ZOOKEEPER_CONNECT192.168.67.2:2181MySQL:image: debezium/example-mysql:1.1ports:- 3306:3306environment:- MYSQL_ROOT_PASSWORD123456- MYSQL_USERmysqluser- MYSQL_PASSWORDmysqlpw 注意文件里面的 192.168.67.2 为内网 IP可通过 ifconfig 查找。 该 Docker Compose 中包含的组件有 ● MySQL: 包含商品信息的数据库app_db ● Kafka: 存储从 MySQL 中根据规则映射过来的结果表 ● Zookeeper主要用于进行Kafka集群管理和协调 在 docker-compose.yml 所在目录下执行下面的命令来启动上述组件 docker-compose up -d 该命令以 detached 模式启动 Docker Compose 配置中定义的所有组件。可以通过 docker ps 来观察上述的组件是否正常启动。 在 MySQL 数据库中准备数据 通过执行如下命令可以进入 MySQL 容器 docker-compose exec MySQL mysql -uroot -p123456 创建数据库 app_db和表 orders,products,shipments 并插入数据 – 创建数据库CREATE DATABASE app_db;USE app_db;– 创建 orders 表CREATE TABLE orders (id INT NOT NULL,price DECIMAL(10,2) NOT NULL,PRIMARY KEY (id));– 插入数据INSERT INTO orders (id, price) VALUES (1, 4.00);INSERT INTO orders (id, price) VALUES (2, 100.00);– 创建 shipments 表CREATE TABLE shipments (id INT NOT NULL,city VARCHAR(255) NOT NULL,PRIMARY KEY (id));– 插入数据INSERT INTO shipments (id, city) VALUES (1, beijing);INSERT INTO shipments (id, city) VALUES (2, xian);– 创建 products 表CREATE TABLE products (id INT NOT NULL,product VARCHAR(255) NOT NULL,PRIMARY KEY (id));– 插入数据INSERT INTO products (id, product) VALUES (1, Beer);INSERT INTO products (id, product) VALUES (2, Cap);INSERT INTO products (id, product) VALUES (3, Peanut); 通过 Flink CDC CLI 提交任务 1. 下载压缩包并解压得到目录 flink-cdc-3.3.0 flink-cdc-3.3.0-bin.tar.gz[2]中包含 bin、lib、log、conf 四个目录。 2. 下载下面列出的 connector 包并且移动到 lib 目录下: ■ MySQL pipeline connector 3.3.0[3] ■ Kafka pipeline connector 3.3.0[4] 您还需要将下面的 Driver 包放在 Flink lib 目录下或通过 –jar 参数将其传入 Flink CDC CLI因为 CDC Connectors 不再打包这个依赖 ■ MySQL Connector Java[5] 3. 编写任务配置 yaml 文件 下面给出了一个整库同步的示例文件 mysql-to-kafka.yaml ################################################################################
Description: Sync MySQL all tables to Kafka
################################################################################ source:type: mysqlhostname: 0.0.0.0port: 3306username: rootpassword: 123456tables: app_db..server-id: 5400-5404server-time-zone: UTCsink:type: kafkaname: Kafka Sinkproperties.bootstrap.servers: 0.0.0.0:9092topic: yaml-mysql-kafkapipeline:name: MySQL to Kafka Pipelineparallelism: 1 其中 source 中的 tables: app_db.. 通过正则匹配同步 app_db 下的所有表。 4. 最后通过命令行提交任务到 Flink Standalone cluster bash bin/flink-cdc.sh mysql-to-kafka.yaml
参考一些自定义路径的示例 主要用于多版本flinkmysql驱动不一致等情况 如
bash /root/flink-cdc-3.3.0/bin/flink-cdc.sh /root/flink-cdc-3.3.0/bin/mysql-to-kafka.yaml –flink-home /root/flink-1.19. –jar /root/flink-cdc-3.3.0/lib/mysql-connector-java-8.0.27.jar 提交成功后返回信息如 Pipeline has been submitted to cluster.
Job ID: ba2afd0697524bd4857183936507b0bf Job Description: MySQL to Kafka Pipeline 在 Flink Web UI可以看到名为 MySQL to Kafka Pipeline 的任务正在运行。 通过 Kafka 客户端查看 Topic 数据可得到debezium-json格式的内容 docker-compose exec Kafka kafka-console-consumer.sh –bootstrap-server 192.168.31.229:9092 –topic yaml-mysql-kafka –from-beginning debezium-json 格式包含了 before/after/op/source 几个元素示例如下 {before: null,after: {id: 1,price: 4},op: c,source: {db: app_db,table: orders} } … {before: null,after: {id: 1,product: Beer},op: c,source: {db: app_db,table: products} } … {before: null,after: {id: 2,city: xian},op: c,source: {db: app_db,table: shipments} } 同步变更 进入 MySQL 容器: docker-compose exec MySQL mysql -uroot -p123456 接下来修改 MySQL 数据库中表的数据StarRocks 中显示的订单数据也将实时更新 1. 在 MySQL 的 orders 表中插入一条数据 INSERT INTO app_db.orders (id, price) VALUES (3, 100.00); 2. 在 MySQL 的 orders 表中增加一个字段 ALTER TABLE app_db.orders ADD amount varchar(100) NULL; 3. 在 MySQL 的 orders 表中更新一条数据 UPDATE app_db.orders SET price100.00, amount100.00 WHERE id1; 4. 在 MySQL 的 orders 表中删除一条数据 DELETE FROM app_db.orders WHERE id2; 通过 Kafka 消费者监控 Topic可以看到 Kafka 上也在实时接收到这些变更 {before: {id: 1,price: 4,amount: null},after: {id: 1,price: 100,amount: 100.00},op: u,source: {db: app_db,table: orders} } 同样地去修改 shipments, products 表也能在 Kafka对应的 Topic 中实时看到同步变更的结果。 特色功能 路由变更 Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置借助这种能力我们能够实现表名库名替换整库同步等功能。下面提供一个配置文件说明 ################################################################################
Description: Sync MySQL all tables to Kafka
################################################################################ source:type: mysqlhostname: 0.0.0.0port: 3306username: rootpassword: 123456tables: app_db..*server-id: 5400-5404server-time-zone: UTCsink:type: kafkaname: Kafka Sinkproperties.bootstrap.servers: 0.0.0.0:9092 pipeline:name: MySQL to Kafka Pipelineparallelism: 1 route:- source-table: app_db.orderssink-table: kafka_ods_orders- source-table: app_db.shipmentssink-table: kafka_ods_shipments- source-table: app_db.productssink-table: kafka_ods_products 通过上面的 route 配置会将 app_db.orders 表的结构和数据同步到 kafka_ods_orders 中。从而实现数据库迁移的功能。特别地source-table 支持正则表达式匹配多表从而实现分库分表同步的功能例如下面的配置 route:- source-table: app_db.order.*sink-table: kafka_ods_orders 这样就可以将诸如 app_db.order01、app_db.order02、app_db.order03 的表汇总到 kafka_ods_orders 中。利用 Kafka 自带的工具可查看对应 Topic 的建立详情可使用 kafka-console-consumer.sh 进行查询。 docker-compose exec Kafka kafka-topics.sh –bootstrap-server 192.168.67.2:9092 –list Kafka 中新建的 Topic 信息如下 __consumer_offsets kafka_ods_orders kafka_ods_products kafka_ods_shipments yaml-mysql-kafka 选取 kafka_ods_orders 这个 Topic 进行查询返回数据示例如下 {before: null,after: {id: 1,price: 100,amount: 100.00},op: c,source: {db: null,table: kafka_ods_orders} } 写入多个分区 使用 partition.strategy 参数可以定义发送数据到 Kafka 分区的策略 可以设置的选项有 ● all-to-zero将所有数据发送到 0 号分区默认值 ● hash-by-key所有数据根据主键的哈希值分发 我们基于 mysql-to-kafka.yaml 在sink下增加一行配置 partition.strategy: hash-by-key source:… sink:…topic: yaml-mysql-kafka-hash-by-keypartition.strategy: hash-by-key pipeline:… 同时我们利用 Kafka 的脚本新建一个 12 分区的 Kafka Topic: docker-compose exec Kafka kafka-topics.sh –create –topic yaml-mysql-kafka-hash-by-key –bootstrap-server 192.168.67.2:9092 –partitions 12 提交 YAML 作业后查看一下各个分区里面所存储的数据。 docker-compose exec Kafka kafka-console-consumer.sh –bootstrap-server192.168.67.2:9092 –topic yaml-mysql-kafka-hash-by-key –partition 0 –from-beginning 部分分区数据详情如下 • 分区0 {before: null,after: {id: 1,price: 100,amount: 100.00},op: c,source: {db: app_db,table: orders} } • 分区4 {before: null,after: {id: 2,product: Cap},op: c,source: {db: app_db,table: products} } {before: null,after: {id: 1,city: beijing},op: c,source: {db: app_db,table: shipments} } 输出格式 value.format 参数用于序列化 Kafka 消息的值部分数据的格式。可选的填写值包括 debezium-json和 canal-json,默认值为 debezium-json目前还不支持用户自定义输出格式。 debezium-json [6]格式会包含 before(变更前的数据)/after(变更后的数据)/op(变更类型)/source(元数据) 几个元素ts_ms 字段并不会默认包含在输出结构中需要在 Source 中指定 metadata.list 配合。canal-json [7]格式会包含 old/data/type/database/table/pkNames 几个元素但是 ts 并不会默认包含在其中原因同上。 可以在 YAML 文件的 sink 中定义 value.format: canal-json 来指定输出格式为 canal-json 类型 source:…sink:…topic: yaml-mysql-kafka-canalvalue.format: canal-json pipeline:… 查询对应 topic 的数据返回示例如下 {old: null,data: [{id: 1,price: 100,amount: 100.00}],type: INSERT,database: app_db,table: orders,pkNames: [id] } 自定义上下游映射关系 使用 sink.tableId-to-topic.mapping 参数可以指定上游表名到下游 Kafka Topic 名的映射关系。无需使用 route 配置。与之前介绍的通过 route 实现的不同点在于配置该参数可以在保留源表的表名信息的情况下设置写入的 Topic 名称。 在前面的 YAML 文件中增加 sink.tableId-to-topic.mapping 配置指定映射关系,每个映射关系由 ; 分割上游表的 TableId 和下游 Kafka 的 Topic 名由 : 分割 source:…sink:…sink.tableId-to-topic.mapping: app_db.orders:yaml-mysql-kafka-orders;app_db.shipments:yaml-mysql-kafka-shipments;app_db.products:yaml-mysql-kafka-products pipeline:… 运行后Kafka 中将会生成如下的 Topic … yaml-mysql-kafka-orders yaml-mysql-kafka-products yaml-mysql-kafka-shipments 上述 3 个 Topic 中的部分数据详情 {before: null,after: {id: 1,price: 100,amount: 100.00},op: c,source: {db: app_db,table: orders} } {before: null,after: {id: 2,product: Cap},op: c,source: {db: app_db,table: products} } {before: null,after: {id: 2,city: xian},op: c,source: {db: app_db,table: shipments} } 环境清理 本教程结束后在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器 docker-compose down 在 Flink 所在目录 flink-1.19.2 下执行如下命令停止 Flink 集群 ./bin/stop-cluster.sh 参考资料 [1] Flink 1.19.2: https://archive.apache.org/dist/flink/flink-1.19.2/flink-1.19.2-bin-scala_2.12.tgz [2] flink-cdc-3.3.0-bin.tar.gz: https://www.apache.org/dyn/closer.lua/flink/flink-cdc-3.3.0/flink-cdc-3.3.0-bin.tar.gz [3] MySQL pipeline connector 3.3.0: https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.3.0/flink-cdc-pipeline-connector-mysql-3.3.0.jar [4] Kafka pipeline connector 3.3.0: https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-kafka/3.3.0/flink-cdc-pipeline-connector-kafka-3.3.0.jar [5] MySQL Connector Java: https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar [6] debezium-json: https://debezium.io/documentation/reference/stable/integrations/serdes.html [7] canal-json: https://github.com/alibaba/canal/wiki 基于 Flink CDC 打造企业级实时数据同步方案 相比于传统数据集成流水线Flink CDC 提供了全量和增量一体化同步的解决方案。对于一个同步任务只需使用一个 Flink 作业即可将上游的全量数据和增量数据一致地同步到下游系统。此外 Flink CDC 使用了增量快照算法无需任何额外配置即可实现全量和增量数据的无缝切换。 现推出“Flink CDC 挑战任务”参与挑战不仅可快速体验《基于 Flink CDC 打造企业级实时数据同步方案》限时上传任务截图还可获得精美礼品。 点击即可跳转Flink CDC 挑战任务
- 上一篇: 网站域名申请已认证的微信公众号怎么改名
- 下一篇: 网站域名使用期四川招标采购交易信息网
相关文章
-
网站域名申请已认证的微信公众号怎么改名
网站域名申请已认证的微信公众号怎么改名
- 技术栈
- 2026年04月20日
-
网站域名申请企业做网站收费
网站域名申请企业做网站收费
- 技术栈
- 2026年04月20日
-
网站域名申请流程一级域名与二级域名玩法
网站域名申请流程一级域名与二级域名玩法
- 技术栈
- 2026年04月20日
-
网站域名使用期四川招标采购交易信息网
网站域名使用期四川招标采购交易信息网
- 技术栈
- 2026年04月20日
-
网站域名使用期制作网页可以有效控制什么的位置
网站域名使用期制作网页可以有效控制什么的位置
- 技术栈
- 2026年04月20日
-
网站域名跳转代码html如何做高校的网站版面设计
网站域名跳转代码html如何做高校的网站版面设计
- 技术栈
- 2026年04月20日
