

# Flink CDC

## About CDC

### What is CDC?

CDC is short for Change Data Capture. The core idea is to monitor and capture database changes (inserts, updates, and deletes of rows or tables), record those changes in the order they occur, and write them to a message queue for other services to subscribe to and consume.

### CDC categories

CDC tools fall into two camps: query-based and binlog-based.

|  | Query-based | Binlog-based |
| --- | --- | --- |
| Open-source products | Sqoop, DataX | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures every data change | No | Yes |
| Latency | High | Low |
| Adds load on the database | Yes | No |

Query-based tools all work in batch mode (a pass runs only once enough data has accumulated or a time window elapses), so their latency is inherently high. Binlog-based streaming tools work at per-row granularity: every single change is picked up as it happens.

### Flink CDC

The Flink community develops the flink-cdc-connectors component, a source connector that can read both full snapshots and incremental changes directly from databases such as MySQL and PostgreSQL. It is open source: https://github.com/ververica/flink-cdc-connectors

## Integrating Flink CDC in Java

### MySQL setup

#### Run the initialization SQL

```sql
-- Create the whitebrocade database
DROP DATABASE IF EXISTS whitebrocade;
CREATE DATABASE whitebrocade;
USE whitebrocade;

-- Create the student table
CREATE TABLE student (
  id bigint(20) NOT NULL AUTO_INCREMENT,
  name varchar(255) DEFAULT NULL,
  description varchar(255) DEFAULT NULL,
  PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;

-- Insert seed data
INSERT INTO whitebrocade.student(id, name, description) VALUES (1, '小牛马', '我是小牛马');
INSERT INTO whitebrocade.student(id, name, description) VALUES (2, '中牛马', '我是中牛马');
```

#### Enable the binlog

On a default installation, MySQL's cnf file usually lives under /etc:

```shell
sudo vim /etc/my.cnf
```

Add the following settings to enable the binlog for the whitebrocade database:

```ini
# Server id
server-id=1

# Time zone; if the database time zone is not fixed, the Flink MySQL CDC job will not start
default-time-zone='+8:00'

# Enable the binlog; this value becomes the binlog file name prefix
log-bin=mysql-bin

# Binlog type; Maxwell requires row format
binlog_format=row

# Database(s) to write to the binlog; adjust to your setup
binlog-do-db=whitebrocade
```

#### Set the database time zone

Permanent change: edit my.cnf (the setting is already part of the configuration just shown; just remember to restart):

```ini
default-time-zone='+8:00'
```

Temporary change (lost on restart):

```sql
-- For MySQL 8, run this
SET PERSIST time_zone = '+8:00';

-- For MySQL 5.x, run this
SET time_zone = '+8:00';
```

#### Restart MySQL

Note: MySQL must be restarted after these settings!

```shell
service mysqld restart
```
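A quick sanity check from any MySQL client confirms the settings took effect (standard MySQL commands; expected values follow from the configuration above):

```sql
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW
SELECT @@global.time_zone;            -- expect +8:00
```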
## Code (handling changes directly with BaseLogHandler, or indirectly via Kafka)

### pom dependencies

```xml
<properties>
    <java.version>11</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.6.13</spring-boot.version>
    <!-- Keep these version properties (es, easy-es); the later examples use them -->
    <es.version>7.12.0</es.version>
    <easy-es.version>2.0.0</easy-es.version>
    <flink.version>1.19.0</flink.version>
    <kafka-clients.version>3.8.0</kafka-clients.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- hutool -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.32</version>
    </dependency>
    <!-- Flink CDC dependencies: start -->
    <!-- Flink core dependency, provides the core Flink APIs -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink streaming Java API. On choosing the Scala vs Java artifact, see:
         https://developer.aliyun.com/ask/526584 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink client tools: command-line interface and utility functions -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink connector base package: shared connector functionality -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Kafka connector, for integrating with Apache Kafka. Watch out for version
         mismatches between the Kafka broker and this dependency; for troubleshooting see:
         https://blog.csdn.net/qq_34526237/article/details/130968153
         https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/configuration/overview/
         https://blog.csdn.net/weixin_55787608/article/details/141436268
         https://www.cnblogs.com/qq1035807396/p/16227816.html
         https://blog.csdn.net/g5guj/article/details/137229597
         https://blog.csdn.net/x950913/article/details/108249507 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.2.0-1.19</version>
        <exclusions>
            <!-- Exclude the bundled kafka-clients and pin our own version; a Kafka
                 client that is too new can cause incompatibilities -->
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- kafka client -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka-clients.version}</version>
    </dependency>
    <!-- Flink Table Planner, for Table API / SQL plan generation -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Table API bridge, connects the DataStream API and the Table API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink JSON format support -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Enables the Web UI on port 8081; disabled by default -->
    <!--
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>1.19.1</version>
    </dependency>
    -->
    <!-- MySQL CDC dependency; the org.apache.flink artifact targets MySQL 8.0, see:
         https://blog.csdn.net/kakaweb/article/details/129441408
         https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/mysql-cdc/ -->
    <dependency>
        <!-- For MySQL 8.0:
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>3.1.0</version>
        -->
        <!-- For MySQL 5.7; both 2.3.0 and 3.0.1 work -->
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
        <!-- <version>3.0.1</version> -->
    </dependency>
    <!-- gson utilities -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.11.0</version>
    </dependency>
    <!-- OGNL expressions -->
    <dependency>
        <groupId>ognl</groupId>
        <artifactId>ognl</artifactId>
        <version>3.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.31</version>
    </dependency>
</dependencies>
```

### application.yml

```yaml
# Application web port
server:
  port: 9999

# Flink CDC settings
flink-cdc:
  cdcConfig:
    parallelism: 1
    enableCheckpointing: 5000
  mysqlConfig:
    sourceName: mysql-source
    jobName: mysql-stream-cdc
    hostname: 192.168.132.101
    port: 3306
    username: root
    password: 12345678
    databaseList: whitebrocade
    tableList: whitebrocade.student
    includeSchemaChanges: false
  kafkaConfig:
    sourceName: kafka-source
    jobName: kafka-stream-cdc
    bootstrapServers: localhost:9092
    groupId: test_group
    topics: test_topic
```

### FlinkCDCConfig

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Flink CDC configuration
 */
@Data
@Configuration
@ConfigurationProperties("flink-cdc")
public class FlinkCDCConfig {

    private CdcConfig cdcConfig;
    private MysqlConfig mysqlConfig;
    private KafkaConfig kafkaConfig;

    @Data
    public static class CdcConfig {
        /** Parallelism */
        private Integer parallelism;
        /** Checkpoint interval, in milliseconds */
        private Integer enableCheckpointing;
    }

    @Data
    public static class MysqlConfig {
        /** MySQL source name */
        private String sourceName;
        /** Job name */
        private String jobName;
        /** Database host */
        private String hostname;
        /** Database port */
        private Integer port;
        /** Database username */
        private String username;
        /** Database password */
        private String password;
        /** Database names */
        private String[] databaseList;
        /** Table names */
        private String[] tableList;
        /** Whether schema changes are included */
        private Boolean includeSchemaChanges;
    }

    @Data
    public static class KafkaConfig {
        /** Kafka source name */
        private String sourceName;
        /** Job name */
        private String jobName;
        /** Kafka address */
        private String bootstrapServers;
        /** Consumer group id */
        private String groupId;
        /** Kafka topic */
        private String topics;
    }
}
```

### Enums

#### OperatorTypeEnum

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Operation type enum
 */
@Getter
@AllArgsConstructor
public enum OperatorTypeEnum {

    /** Insert */
    INSERT(1),
    /** Update */
    UPDATE(2),
    /** Delete */
    DELETE(3),
    ;

    /** Type code */
    private final int type;

    /**
     * Look up the enum by type code
     *
     * @param type type code
     * @return OperatorTypeEnum
     */
    public static OperatorTypeEnum getEnumByType(int type) {
        for (OperatorTypeEnum operatorTypeEnum : OperatorTypeEnum.values()) {
            if (operatorTypeEnum.getType() == type) {
                return operatorTypeEnum;
            }
        }
        throw new RuntimeException(StrUtil.format("No OperatorTypeEnum found for type {}", type));
    }
}
```
#### MySqlStrategyEnum

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description MySQL handling strategy enum
 * todo add new entries here for additional tables
 */
@Getter
@AllArgsConstructor
public enum MySqlStrategyEnum {

    /** Strategy for the Student table */
    STUDENT(Student.class.getSimpleName(), Student.class,
            Introspector.decapitalize(StudentLogHandler.class.getSimpleName())),
    ;

    /** Table name */
    private final String tableName;

    /** Class object */
    private final Class<?> varClass;

    /** MySQL handler bean name */
    private final String mySqlHandlerName;

    /**
     * Strategy selector: based on the tableName carried by the incoming change object,
     * picks the matching strategy from the predefined enum entries and wraps it in a
     * MySqlStrategyHandleSelector.
     *
     * @param mySqlDataChangeInfo the change object
     * @return MySqlStrategyHandleSelector
     */
    public static MySqlStrategyHandleSelector getSelector(MySqlDataChangeInfo mySqlDataChangeInfo) {
        Assert.notNull(mySqlDataChangeInfo, "MySqlDataChangeInfo must not be null");
        String tableName = mySqlDataChangeInfo.getTableName();
        MySqlStrategyHandleSelector selector = new MySqlStrategyHandleSelector();
        // Walk all strategy entries looking for one matching the current table name
        for (MySqlStrategyEnum mySqlStrategyEnum : values()) {
            // On a match, populate and return the selector
            if (mySqlStrategyEnum.getTableName().equalsIgnoreCase(tableName)) {
                selector.setMySqlHandlerName(mySqlStrategyEnum.mySqlHandlerName);
                selector.setOperatorTime(mySqlDataChangeInfo.getOperatorTime());
                selector.setOperatorType(mySqlDataChangeInfo.getOperatorType());
                JSONObject jsonObject = JSONUtil.parseObj(mySqlDataChangeInfo.getData());
                selector.setData(BeanUtil.copyProperties(jsonObject, mySqlStrategyEnum.varClass));
                return selector;
            }
        }
        throw new RuntimeException(StrUtil.format("No StrategyHandleSelector bound to table {}", tableName));
    }
}
```

### Models

#### Student

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Student class, for demonstration
 */
@Data
public class Student {

    /** Id */
    private Long id;

    /** Name */
    private String name;

    /** Description */
    private String description;
}
```

#### MySqlDataChangeInfo

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description MySQL data change object
 */
@Data
public class MySqlDataChangeInfo implements Serializable {

    /** Data before the change */
    private String beforeData;

    /** Data after the change */
    private String afterData;

    /** The affected data */
    private String data;

    /** Change type: 1-insert 2-update 3-delete */
    private Integer operatorType;

    /** Binlog file name */
    private String fileName;

    /** Current binlog read position */
    private Integer filePos;

    /** Database name */
    private String database;

    /** Table name */
    private String tableName;

    /** Change time */
    private Long operatorTime;
}
```

#### MySqlStrategyHandleSelector

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Strategy handling selector
 */
@Data
public class MySqlStrategyHandleSelector {

    /** MySQL strategy handler name: when the MySQL binlog changes, the matching handler is invoked */
    private String mySqlHandlerName;

    /** Payload */
    private Object data;

    /** Operation time */
    private Long operatorTime;

    /** Operation type */
    private Integer operatorType;
}
```

### Custom sinks

#### LogSink

```java
/**
 * @author whiteBrocade
 * @description Logging sink operator
 */
@Slf4j
@Service
public class LogSink extends RichSinkFunction<MySqlDataChangeInfo> implements Serializable {

    @Override
    public void invoke(MySqlDataChangeInfo mySqlDataChangeInfo, Context context) throws Exception {
        log.info("MySQL change object: {}", JSONUtil.toJsonStr(mySqlDataChangeInfo));
    }
}
```

#### CustomMySqlSink

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Custom sink operator that distinguishes the DML type via OGNL
 * expressions; pairs with the built-in JsonDebeziumDeserializationSchema (String stream)
 */
@Slf4j
@Component
public class CustomMySqlSink extends RichSinkFunction<String> {

    @Override
    public void invoke(String json, Context context) throws Exception {
        // The op field has four values: c (create), u (update), d (delete), r (read)
        // For u operations, the payload carries both the before and after images.
        log.info("Received change: {}", json);
        String op = JSONUtil.getValue(json, "op", String.class);
        // Row id
        Integer id = null;
        // update statement
        if ("u".equals(op)) {
            id = JSONUtil.getValue(json, "after.id", Integer.class);
            log.info("Handling update");
            // run the update logic
        }
        // delete statement
        if ("d".equals(op)) {
            id = JSONUtil.getValue(json, "before.id", Integer.class);
            log.info("Handling delete");
            // run the delete logic
        }
        // insert statement
        if ("c".equals(op)) {
            log.info("Handling insert");
        }
    }

    // Pre-processing hook
    @Override
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
    }

    // Post-processing hook
    @Override
    public void close() throws Exception {
        super.close();
    }
}
```
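For orientation, the JSON that CustomMySqlSink receives from JsonDebeziumDeserializationSchema is a Debezium-style envelope. An insert on the student table looks roughly like the sketch below (trimmed, with made-up values; the exact envelope varies by connector version):

```json
{
  "before": null,
  "after": { "id": 3, "name": "小牛马", "description": "我是小牛马" },
  "source": { "db": "whitebrocade", "table": "student", "file": "mysql-bin.000001", "pos": 1234 },
  "op": "c",
  "ts_ms": 1700000000123
}
```

For updates, `op` is `"u"` and both `before` and `after` are populated, which is why the sink reads `after.id` for updates and `before.id` for deletes.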
#### MySqlDataChangeSink

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description MySQL change sink operator
 */
@Slf4j
@Component
@AllArgsConstructor
public class MySqlDataChangeSink extends RichSinkFunction<MySqlDataChangeInfo> implements Serializable {

    /**
     * Cache of BaseLogHandler beans.
     * Spring injects every BaseLogHandler bean into this local map, keyed by bean name.
     */
    private final Map<String, BaseLogHandler> strategyHandlerMap;

    /** Data handling logic */
    @Override
    @SneakyThrows
    public void invoke(MySqlDataChangeInfo mySqlDataChangeInfo, Context context) {
        log.info("Received raw change data: {}", mySqlDataChangeInfo);
        // Pick a strategy
        MySqlStrategyHandleSelector selector = MySqlStrategyEnum.getSelector(mySqlDataChangeInfo);
        Assert.notNull(selector, "MySqlStrategyHandleSelector must not be null");
        BaseLogHandler<Object> handler = strategyHandlerMap.get(selector.getMySqlHandlerName());
        Integer operatorType = selector.getOperatorType();
        OperatorTypeEnum operatorTypeEnum = OperatorTypeEnum.getEnumByType(operatorType);
        switch (operatorTypeEnum) {
            case INSERT:
                handler.handleInsertLog(selector.getData(), selector.getOperatorTime());
                break;
            case UPDATE:
                handler.handleUpdateLog(selector.getData(), selector.getOperatorTime());
                break;
            case DELETE:
                handler.handleDeleteLog(selector.getData(), selector.getOperatorTime());
                break;
            default:
                throw new RuntimeException("Unsupported operation type");
        }
    }

    /** Watermark hook */
    @Override
    @SneakyThrows
    public void writeWatermark(Watermark watermark) {
        log.info("writeWatermark triggered");
        super.writeWatermark(watermark);
    }

    /** Startup hook */
    @Override
    @SneakyThrows
    public void open(OpenContext openContext) {
        log.info("open triggered");
        super.open(openContext);
    }

    /** Shutdown hook */
    @Override
    @SneakyThrows
    public void finish() {
        log.info("finish triggered");
        super.finish();
    }
}
```

#### MySqlChangeInfoKafkaProducerSink

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Sink that publishes MySQL change messages to a Kafka queue
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class MySqlChangeInfoKafkaProducerSink {

    /** Flink configuration */
    private final FlinkCDCConfig flinkCDCConfig;

    /** Custom Kafka serializer */
    private final KafkaSerializer kafkaSerializer;

    /** Build the Kafka producer sink operator */
    public KafkaSink<MySqlDataChangeInfo> getKafkaProducerSink() {
        FlinkCDCConfig.KafkaConfig kafkaConfig = flinkCDCConfig.getKafkaConfig();
        kafkaSerializer.setTopic(kafkaConfig.getTopics());
        // Build the KafkaSink operator
        KafkaSink<MySqlDataChangeInfo> kafkaProducerSink = KafkaSink.<MySqlDataChangeInfo>builder()
                // Cluster address
                .setBootstrapServers(kafkaConfig.getBootstrapServers())
                // Transaction id prefix
                .setTransactionalIdPrefix("KafkaTransactional" + kafkaConfig.getTopics() + IdUtil.getSnowflakeNextIdStr())
                .setRecordSerializer(kafkaSerializer)
                // Delivery guarantee:
                // At Most Once: a message is delivered once or not at all; it may be lost but is never duplicated
                // At Least Once: a message is delivered at least once; never lost but possibly duplicated,
                //                so consumers must implement idempotence themselves
                // Exactly Once: a message is delivered exactly once, neither lost nor duplicated
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // Additional Kafka producer properties
                // .setKafkaProducerConfig(properties)
                /*
                 * The sink producer's transaction timeout defaults to 1 hour, while the Kafka
                 * broker's default is 15 minutes, and the broker rejects producers whose timeout
                 * exceeds its own. Two fixes:
                 * 1. lower the producer timeout
                 * 2. raise the broker timeout
                 * Option 1 is used here, aligning the producer timeout with the broker. See:
                 * https://blog.csdn.net/LangLang1111111/article/details/121395831
                 * https://blog.csdn.net/weixin_64261178/article/details/140298696
                 */
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(15 * 60 * 1000))
                .build();
        return kafkaProducerSink;
    }
}
```

#### MySqlChangeInfoKafkaConsumerSink

```java
/**
 * @author whiteBrocade
 * @description Custom Kafka consumer sink for MySqlDataChangeInfo records
 */
@Slf4j
@Service
public class MySqlChangeInfoKafkaConsumerSink extends RichSinkFunction<MySqlDataChangeInfo> implements Serializable {

    /** Data handling logic */
    @Override
    @SneakyThrows
    public void invoke(MySqlDataChangeInfo mySqlDataChangeInfo, Context context) {
        log.info("Consuming Kafka record: {}", JSONUtil.toJsonStr(mySqlDataChangeInfo));
    }
}
```
### Serializers and deserializers

#### KafkaDeserializer

```java
/**
 * @author whiteBrocade
 * @description Custom Kafka deserializer
 */
@Slf4j
@Service
public class KafkaDeserializer implements KafkaRecordDeserializationSchema<MySqlDataChangeInfo> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<MySqlDataChangeInfo> collector) {
        String valueJsonStr = new String(record.value(), StandardCharsets.UTF_8);
        // log.info("Kafka payload before deserialization: {}", valueJsonStr);
        MySqlDataChangeInfo mySqlDataChangeInfo = JSONUtil.toBean(valueJsonStr, MySqlDataChangeInfo.class);
        collector.collect(mySqlDataChangeInfo);
    }

    @Override
    public TypeInformation<MySqlDataChangeInfo> getProducedType() {
        return TypeInformation.of(MySqlDataChangeInfo.class);
    }
}
```

#### KafkaSerializer

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Custom Kafka message serializer
 */
@Slf4j
@Setter
@Service
public class KafkaSerializer implements KafkaRecordSerializationSchema<MySqlDataChangeInfo> {

    /** Topic name */
    private String topic;

    /** Serialize */
    @Nullable
    @Override
    public ProducerRecord<byte[], byte[]> serialize(MySqlDataChangeInfo mySqlDataChangeInfo, KafkaSinkContext context, Long timestamp) {
        Assert.notNull(topic, "A target topic must be set");
        String jsonStr = JSONUtil.toJsonStr(mySqlDataChangeInfo);
        log.info("Publishing to Kafka topic {}, value: {}", topic, jsonStr);
        return new ProducerRecord<>(topic, jsonStr.getBytes());
    }

    @Override
    public void open(SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext) throws Exception {
        KafkaRecordSerializationSchema.super.open(context, sinkContext);
    }
}
```

#### MySqlDeserializer

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Custom MySQL deserializer
 */
@Slf4j
@Service
public class MySqlDeserializer implements DebeziumDeserializationSchema<MySqlDataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BIN_FILE = "file";
    public static final String POS = "pos";
    public static final String CREATE = "CREATE";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String UPDATE = "UPDATE";

    /**
     * Deserialize the record into a change-info object
     *
     * @param sourceRecord SourceRecord
     * @param collector Collector<MySqlDataChangeInfo>
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<MySqlDataChangeInfo> collector) {
        try {
            // The topic format carries the database name and table name
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];
            Struct struct = (Struct) sourceRecord.value();
            final Struct source = struct.getStruct(SOURCE);
            MySqlDataChangeInfo mySqlDataChangeInfo = new MySqlDataChangeInfo();
            // Operation type: CREATE / UPDATE / DELETE
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toUpperCase();
            int eventType = type.equals(CREATE) ? OperatorTypeEnum.INSERT.getType()
                    : UPDATE.equals(type) ? OperatorTypeEnum.UPDATE.getType()
                    : OperatorTypeEnum.DELETE.getType();
            // Usually only the latest image matters; both are captured here for demonstration.
            // Extract the before and after images and set them on the change object
            mySqlDataChangeInfo.setBeforeData(this.getJsonObject(struct, BEFORE).toJSONString());
            mySqlDataChangeInfo.setAfterData(this.getJsonObject(struct, AFTER).toJSONString());
            if (eventType == OperatorTypeEnum.DELETE.getType()) {
                mySqlDataChangeInfo.setData(this.getJsonObject(struct, BEFORE).toJSONString());
            } else {
                mySqlDataChangeInfo.setData(this.getJsonObject(struct, AFTER).toJSONString());
            }
            mySqlDataChangeInfo.setOperatorType(eventType);
            mySqlDataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
            mySqlDataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));
            mySqlDataChangeInfo.setDatabase(database);
            mySqlDataChangeInfo.setTableName(tableName);
            mySqlDataChangeInfo.setOperatorTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));
            // Emit the record
            collector.collect(mySqlDataChangeInfo);
        } catch (Exception e) {
            log.error("Failed to deserialize binlog record", e);
        }
    }

    /**
     * Extract the before/after image from the source record
     *
     * @param value Struct
     * @param fieldElement field name
     * @return JSONObject
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<MySqlDataChangeInfo> getProducedType() {
        return TypeInformation.of(MySqlDataChangeInfo.class);
    }
}
```
### Log handlers

#### BaseLogHandler

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Log handler
 * todo To support another table, create a class implementing BaseLogHandler with the
 *      corresponding logic; see StudentLogHandler for a reference implementation
 */
public interface BaseLogHandler<T> extends Serializable {

    /**
     * Handle an insert log
     *
     * @param data model converted from the change data
     * @param operatorTime operation time
     */
    void handleInsertLog(T data, Long operatorTime);

    /**
     * Handle an update log
     *
     * @param data model converted from the change data
     * @param operatorTime operation time
     */
    void handleUpdateLog(T data, Long operatorTime);

    /**
     * Handle a delete log
     *
     * @param data model converted from the change data
     * @param operatorTime operation time
     */
    void handleDeleteLog(T data, Long operatorTime);
}
```

#### StudentLogHandler

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Handler for the Student table
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class StudentLogHandler implements BaseLogHandler<Student> {

    @Override
    public void handleInsertLog(Student student, Long operatorTime) {
        log.info("Handling insert log for Student: {}", student);
    }

    @Override
    public void handleUpdateLog(Student student, Long operatorTime) {
        log.info("Handling update log for Student: {}", student);
    }

    @Override
    public void handleDeleteLog(Student student, Long operatorTime) {
        log.info("Handling delete log for Student: {}", student);
    }
}
```
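The `todo` notes in BaseLogHandler and MySqlStrategyEnum mark the extension points. As a purely hypothetical sketch (there is no Teacher table in this example), supporting another table would take one new handler bean and one new enum entry:

```java
// Hypothetical: assumes a Teacher model class mirroring Student exists
@Slf4j
@Service
public class TeacherLogHandler implements BaseLogHandler<Teacher> {

    @Override
    public void handleInsertLog(Teacher teacher, Long operatorTime) {
        log.info("Handling insert log for Teacher: {}", teacher);
    }

    @Override
    public void handleUpdateLog(Teacher teacher, Long operatorTime) {
        log.info("Handling update log for Teacher: {}", teacher);
    }

    @Override
    public void handleDeleteLog(Teacher teacher, Long operatorTime) {
        log.info("Handling delete log for Teacher: {}", teacher);
    }
}

// Plus one new constant next to STUDENT in MySqlStrategyEnum:
// TEACHER(Teacher.class.getSimpleName(), Teacher.class,
//         Introspector.decapitalize(TeacherLogHandler.class.getSimpleName())),
```

Remember to also add the new table to `tableList` in application.yml so the CDC source actually captures it.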
### Jobs

#### MySqlDataChangeJob

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description MySQL data change job
 */
@Slf4j
@Component
@AllArgsConstructor
public class MySqlDataChangeJob {

    /** Flink CDC configuration */
    private final FlinkCDCConfig flinkCDCConfig;

    /**
     * Custom sink operators:
     * customMySqlSink: resolves the DML type via OGNL expressions
     * mySqlDataChangeSink: resolves the DML type from the Debezium Struct
     * kafkaProducerSink: forwards MySQL changes to Kafka
     * Normally you pick just one of them
     */
    private final CustomMySqlSink customMySqlSink;
    private final MySqlDataChangeSink mySqlDataChangeSink;
    private final MySqlChangeInfoKafkaProducerSink mysqlChangeInfoKafkaProducerSink;
    private final LogSink logSink;

    /** Custom MySQL deserializer */
    private final MySqlDeserializer mySqlDeserializer;

    /** Start the job */
    @SneakyThrows
    public void startJob() {
        log.info("---------------- MySqlDataChangeJob starting ----------------");
        FlinkCDCConfig.CdcConfig cdcConfig = flinkCDCConfig.getCdcConfig();
        FlinkCDCConfig.MysqlConfig mysqlConfig = flinkCDCConfig.getMysqlConfig();
        // DataStream API execution modes:
        // Streaming: for unbounded streams needing continuous real-time processing; the default
        // Batch: dedicated batch execution mode
        // Automatic: the program picks streaming or batch based on whether the source is bounded
        // The execution mode can also be set on the command line
        StreamExecutionEnvironment mySqlEnv = this.buildStreamExecutionEnvironment();
        // Automatic mode is used here
        mySqlEnv.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // todo pick one of the two MySqlSources below
        // Custom deserializer
        MySqlSource<MySqlDataChangeInfo> mySqlSource = this.buildBaseMySqlSource(MySqlDataChangeInfo.class)
                .deserializer(mySqlDeserializer)
                .build();
        // Flink CDC's built-in deserializer
        // MySqlSource<String> mySqlSource = this.buildBaseMySqlSource(String.class)
        //         .deserializer(new JsonDebeziumDeserializationSchema())
        //         .build();

        // Read from the MySQL source
        DataStreamSource<MySqlDataChangeInfo> mySqlDataStreamSource = mySqlEnv.fromSource(
                        mySqlSource,
                        WatermarkStrategy.noWatermarks(),
                        mysqlConfig.getSourceName())
                // Parallelism of this source
                .setParallelism(cdcConfig.getParallelism());

        // A logging sink for observation
        mySqlDataStreamSource.addSink(logSink);
        // Attach the sink operator
        mySqlDataStreamSource
                // todo pick the sink matching the source chosen above
                // .addSink(customMySqlSink)
                // .addSink(mySqlDataChangeSink); // pairs the custom mySqlDeserializer with MySqlDataChangeSink
                .sinkTo(mysqlChangeInfoKafkaProducerSink.getKafkaProducerSink()); // forward MySQL changes to Kafka

        // Start the job
        // execute vs executeAsync: https://blog.csdn.net/llg___/article/details/133798713
        mySqlEnv.executeAsync(mysqlConfig.getJobName());
        log.info("---------------- MySqlDataChangeJob started ----------------");
    }

    /**
     * Build the streaming execution environment
     *
     * @return StreamExecutionEnvironment
     */
    private StreamExecutionEnvironment buildStreamExecutionEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkCDCConfig.CdcConfig cdcConfig = flinkCDCConfig.getCdcConfig();
        // Default parallelism for the whole Flink program
        env.setParallelism(cdcConfig.getParallelism());
        // Checkpoint interval
        env.enableCheckpointing(cdcConfig.getEnableCheckpointing());
        // Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        return env;
    }

    /**
     * Build the base MySqlSourceBuilder
     *
     * @param clazz Class of the produced type
     * @param <T> type stored in the source
     * @return MySqlSourceBuilder
     */
    private <T> MySqlSourceBuilder<T> buildBaseMySqlSource(Class<T> clazz) {
        FlinkCDCConfig.MysqlConfig mysqlConfig = flinkCDCConfig.getMysqlConfig();
        return MySqlSource.<T>builder()
                .hostname(mysqlConfig.getHostname())
                .port(mysqlConfig.getPort())
                .username(mysqlConfig.getUsername())
                .password(mysqlConfig.getPassword())
                .databaseList(mysqlConfig.getDatabaseList())
                .tableList(mysqlConfig.getTableList())
                /* initial: take a full snapshot first, then read incremental changes
                 * latest: incremental changes only (no history)
                 * timestamp: import data from the given timestamp onwards (records with timestamp >= the given value)
                 */
                .startupOptions(StartupOptions.latest())
                .includeSchemaChanges(mysqlConfig.getIncludeSchemaChanges()) // include schema changes
                .serverTimeZone("GMT+8"); // time zone
    }
}
```

#### KafkaMySqlDataChangeJob

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Job that receives MySQL data changes from Kafka
 */
@Slf4j
@Component
@AllArgsConstructor
public class KafkaMySqlDataChangeJob {

    /** Flink CDC configuration */
    private final FlinkCDCConfig flinkCDCConfig;

    /** Custom Kafka serializer */
    private final KafkaSerializer kafkaSerializer;

    /** Custom Kafka deserializer */
    private final KafkaDeserializer kafkaDeserializer;

    /** Custom consumer sink for MySqlDataChangeInfo records */
    private final MySqlChangeInfoKafkaConsumerSink mySqlChangeInfoKafkaConsumerSink;

    @SneakyThrows
    public void startJob() {
        log.info("---------------- KafkaMySqlDataChangeJob starting ----------------");
        FlinkCDCConfig.KafkaConfig kafkaConfig = flinkCDCConfig.getKafkaConfig();
        StreamExecutionEnvironment kafkaEnv = this.buildStreamExecutionEnvironment();
        // Build the Kafka source
        KafkaSource<MySqlDataChangeInfo> kafkaSource = this.buildBaseKafkaSource(MySqlDataChangeInfo.class)
                // 1. custom deserializer
                .setDeserializer(kafkaDeserializer)
                // 2. or use Kafka's own value deserializer
                // .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                // 3. or deserialize only the Kafka value
                // .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<MySqlDataChangeInfo> kafkaDataStreamSource = kafkaEnv.fromSource(
                kafkaSource,
                WatermarkStrategy.noWatermarks(),
                kafkaConfig.getSourceName());
        // Attach the consumer sink to process the data
        kafkaDataStreamSource.addSink(mySqlChangeInfoKafkaConsumerSink);
        // Start the job
        // If startup fails with java.lang.NoSuchMethodError:
        // org.apache.kafka.clients.admin.DescribeTopicsResult.allTopicNames, see
        // https://www.cnblogs.com/yeyuzhuanjia/p/18254652
        kafkaEnv.executeAsync(kafkaConfig.getJobName());
        log.info("---------------- KafkaMySqlDataChangeJob started ----------------");
    }

    /**
     * Build the streaming execution environment
     *
     * @return StreamExecutionEnvironment
     */
    private StreamExecutionEnvironment buildStreamExecutionEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkCDCConfig.CdcConfig cdcConfig = flinkCDCConfig.getCdcConfig();
        // Default parallelism for the whole Flink program
        env.setParallelism(cdcConfig.getParallelism());
        // Checkpoint interval
        env.enableCheckpointing(cdcConfig.getEnableCheckpointing());
        // Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        return env;
    }

    /**
     * Build the base Kafka source. See:
     * https://cloud.tencent.com/developer/article/2393696
     * https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/datastream/kafka/
     */
    private <T> KafkaSourceBuilder<T> buildBaseKafkaSource(Class<T> clazz) {
        FlinkCDCConfig.KafkaConfig kafkaConfig = flinkCDCConfig.getKafkaConfig();
        return KafkaSource.<T>builder()
                // Kafka address
                .setBootstrapServers(kafkaConfig.getBootstrapServers())
                // Consumer group id
                .setGroupId(kafkaConfig.getGroupId())
                // Topics; several subscription styles are supported
                .setTopics(kafkaConfig.getTopics())
                // Starting offsets; several strategies are supported:
                /* OffsetsInitializer#committedOffsets: start from the group's committed offsets; without a
                 *   reset strategy this throws if no offsets were committed (no snapshot or auto-commit set)
                 * OffsetsInitializer#committedOffsets(OffsetResetStrategy.EARLIEST): committed offsets,
                 *   falling back to the earliest offset if none exist
                 * OffsetsInitializer#timestamp(1657256176000L): start from records whose timestamp (ms) is >= the given value
                 * OffsetsInitializer#earliest(): start from the earliest offset
                 * OffsetsInitializer#latest(): start from the latest offset, i.e. from subscription time
                 */
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                // Dynamically check for new partitions every 10 seconds
                .setProperty("partition.discovery.interval.ms", "10000");
    }
}
```

### Runner

```java
/**
 * @author whiteBrocade
 * @description Data sync runner
 */
@Slf4j
@Component
@AllArgsConstructor
public class DataSyncRunner implements ApplicationRunner {

    private final MySqlDataChangeJob mySqlDataChangeJob;
    private final KafkaMySqlDataChangeJob kafkaMySqlDataChangeJob;

    @Override
    @SneakyThrows
    public void run(ApplicationArguments args) {
        mySqlDataChangeJob.startJob();
        kafkaMySqlDataChangeJob.startJob();
    }
}
```

### Utilities

#### JSONUtil

```java
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import ognl.Ognl;
import ognl.OgnlContext;

import java.util.Map;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description JSON utilities
 */
public class JSONUtil {

    /**
     * Convert a JSON string into a Map whose keys are the JSON keys.
     * Values are mapped as follows:
     * 1. a string value stays a string, e.g. {"a":"b"}
     * 2. a nested JSON object becomes a Map, e.g. {"a":{"b":2}}
     * 3. an array becomes a List of Maps, e.g. {"a":[{"b":2},{"c":3}]}
     *
     * @param json the input JSON
     * @return dynamic map
     */
    public static Map<String, Object> transferToMap(String json) {
        Gson gson = new Gson();
        Map<String, Object> map = gson.fromJson(json, new TypeToken<Map<String, Object>>() {}.getType());
        return map;
    }

    /**
     * Read the value at the given path of a JSON string
     *
     * @param json raw JSON data
     * @param path OGNL path expression
     * @param clazz target class of the value
     * @return the value converted to clazz
     */
    public static <T> T getValue(String json, String path, Class<T> clazz) {
        try {
            Map<String, Object> map = JSONUtil.transferToMap(json);
            OgnlContext ognlContext = new OgnlContext();
            ognlContext.setRoot(map);
            T value = (T) Ognl.getValue(path, ognlContext, ognlContext.getRoot(), clazz);
            return value;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```
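JSONUtil.getValue resolves nested fields through OGNL path expressions; this is exactly how CustomMySqlSink pulls `op` and `after.id` out of the change JSON. A minimal usage sketch (the payload values are made up):

```java
public class JsonUtilDemo {
    public static void main(String[] args) {
        // Hand-written payload mimicking a trimmed Debezium change event
        String json = "{\"op\":\"u\",\"after\":{\"id\":3,\"name\":\"小牛马\"}}";
        String op = JSONUtil.getValue(json, "op", String.class);         // -> "u"
        Integer id = JSONUtil.getValue(json, "after.id", Integer.class); // -> 3
        System.out.println(op + ", " + id);
    }
}
```

Note that Gson parses JSON numbers into Double when targeting Map<String, Object>, which is why getValue hands the target class to Ognl.getValue for the final conversion.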
## Code (publishing to ActiveMQ)

### Add the ActiveMQ dependency

```xml
<!-- ActiveMQ, for receiving the Flink CDC change log -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
```

### Additions to the yaml file

ActiveMQ is introduced to decouple the log sync and to persist it, the same role Kafka plays above. Flink also ships a RabbitMQ connector, by the way.

```yaml
spring:
  activemq:
    # activemq url
    broker-url: tcp://localhost:61616
    # username and password
    user: admin
    password: admin
    # whether to use an in-memory ActiveMQ; in production use a standalone installation
    in-memory: true
    pool:
      # if set to true you must add the activemq-pool dependency, otherwise
      # auto-configuration fails and JmsMessagingTemplate cannot be injected
      enabled: false
  # Publish/subscribe messaging differs from point-to-point: topics support multiple
  # consumers. Spring Boot defaults to point-to-point (queues), so topics would not
  # work without the setting below.
  jms:
    # false means point-to-point messaging, the Spring Boot default.
    # Setting it to true makes topics work but then queue consumption breaks, so you
    # cannot have both this way. The better fix is a dedicated factory: @JmsListener
    # only receives queue messages by default and needs containerFactory set to
    # receive topic messages.
    pub-sub-domain: true
```

### Configuration class

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description ActiveMqConfig configuration
 */
@Configuration
public class ActiveMqConfig {

    /** Destinations for consuming student table change messages */
    public static final String TOPIC_NAME = "activemq:topic:student";
    public static final String QUEUE_NAME = "activemq:queue:student";

    @Bean
    public Topic topic() {
        return new ActiveMQTopic(TOPIC_NAME);
    }

    @Bean
    public Queue queue() {
        return new ActiveMQQueue(QUEUE_NAME);
    }

    /**
     * Receiving topic messages requires a dedicated containerFactory
     */
    @Bean
    public JmsListenerContainerFactory topicListenerContainer(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Equivalent to spring.jms.pub-sub-domain=true in application.yml
        factory.setPubSubDomain(true);
        return factory;
    }
}
```

### Producer

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description CustomProducer
 */
@Service
@RequiredArgsConstructor
public class CustomProducer {

    private final JmsMessagingTemplate jmsMessagingTemplate;

    @SneakyThrows
    public void sendQueueMessage(Queue queue, String msg) {
        String queueName = queue.getQueueName();
        jmsMessagingTemplate.convertAndSend(queueName, msg);
    }

    @SneakyThrows
    public void sendTopicMessage(Topic topic, String msg) {
        String topicName = topic.getTopicName();
        jmsMessagingTemplate.convertAndSend(topicName, msg);
    }
}
```

### Consumers

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description CustomQueueConsumer
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class CustomQueueConsumer {

    @JmsListener(destination = ActiveMqConfig.QUEUE_NAME)
    public void receiveQueueMsg(String msg) {
        log.info("Consumer 1111 received queue message: {}", msg);
        StudentMqDTO mqDTO = JSONUtil.toBean(msg, StudentMqDTO.class);
        Student student = mqDTO.getStudent();
        Integer operatorType = mqDTO.getOperatorType();
        OperatorTypeEnum operatorTypeEnum = OperatorTypeEnum.getEnumByType(operatorType);
        switch (operatorTypeEnum) {
            case INSERT:
                log.info("Insert Student");
                break;
            case UPDATE:
                log.info("Update Student");
                break;
            case DELETE:
                log.info("Delete Student");
                break;
        }
    }

    @JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")
    public void receiveTopicMsg(String msg) {
        log.info("Consumer 1111 received topic message: {}", msg);
    }
}

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Custom2QueueConsumer
 */
@Slf4j
@Service
public class Custom2QueueConsumer {

    @JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")
    public void receiveTopicMsg(String msg) {
        log.info("Consumer 2222 received topic message: {}", msg);
    }
}
```

### Model: DTO

```java
/**
 * @author whiteBrocade
 * @description Student MQ DTO
 */
@Data
@Builder
public class StudentMqDTO implements Serializable {

    private static final long serialVersionUID = 4308564438724519731L;

    /** Student payload */
    private Student student;

    /** Operation type in MySQL; see OperatorTypeEnum's type field */
    private Integer operatorType;
}
```

### Modify StudentLogHandler to publish to MQ

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Handler for the Student table
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class StudentLogHandler implements BaseLogHandler<Student> {

    private final Queue queue;

    @Override
    public void handleInsertLog(Student student, Long operatorTime) {
        log.info("Handling insert log for Student: {}", student);
        this.sendMq(student, OperatorTypeEnum.INSERT);
    }

    @Override
    public void handleUpdateLog(Student student, Long operatorTime) {
        log.info("Handling update log for Student: {}", student);
        this.sendMq(student, OperatorTypeEnum.UPDATE);
    }

    @Override
    public void handleDeleteLog(Student student, Long operatorTime) {
        log.info("Handling delete log for Student: {}", student);
        this.sendMq(student, OperatorTypeEnum.DELETE);
    }

    /**
     * Publish to MQ
     *
     * @param student Student
     * @param operatorTypeEnum operation type enum
     */
    private void sendMq(Student student, OperatorTypeEnum operatorTypeEnum) {
        StudentMqDTO mqDTO = StudentMqDTO.builder()
                .student(student)
                .operatorType(operatorTypeEnum.getType())
                .build();
        String jsonStr = JSONUtil.toJsonStr(mqDTO);
        CustomProducer customProducer = SpringUtil.getBean(CustomProducer.class);
        // Publish to MQ
        customProducer.sendQueueMessage(queue, jsonStr);
    }
}
```
### Controller

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description ActiveMqController, for testing the ActiveMQ publishing logic
 */
@Slf4j
@RestController
@RequestMapping("/activemq")
@RequiredArgsConstructor
public class ActiveMqController {

    private final CustomProducer customProducer;
    private final Queue queue;
    private final Topic topic;

    @PostMapping("/send/queue")
    public String sendQueueMessage() {
        log.info("Sending a point-to-point message -------------");
        Student student = new Student();
        student.setId(IdUtil.getSnowflakeNextId());
        student.setName("小牛马");
        student.setDescription("我是小牛马");
        StudentMqDTO mqDTO = StudentMqDTO.builder()
                .student(student)
                .operatorType(1)
                .build();
        String jsonStr = JSONUtil.toJsonStr(mqDTO);
        customProducer.sendQueueMessage(queue, jsonStr);
        return "success";
    }

    @PostMapping("/send/topic")
    public String sendTopicMessage() {
        log.info("Sending a publish/subscribe message");
        Student student = new Student();
        student.setId(IdUtil.getSnowflakeNextId());
        student.setName("小牛马");
        student.setDescription("我是小牛马");
        StudentMqDTO mqDTO = StudentMqDTO.builder()
                .student(student)
                .operatorType(1)
                .build();
        String jsonStr = JSONUtil.toJsonStr(mqDTO);
        customProducer.sendTopicMessage(topic, jsonStr);
        return "success";
    }
}
```
### Modify MySqlDataChangeJob: switch the sink to mySqlDataChangeSink

MySqlDataChangeJob is identical to the earlier version except for the sink wiring in `startJob()`: `.addSink(mySqlDataChangeSink)` becomes active and the Kafka sink is commented out.

```java
// Only the sink wiring changes; the fields, buildStreamExecutionEnvironment()
// and buildBaseMySqlSource() are the same as in the earlier listing
mySqlDataStreamSource
        // todo pick the sink matching the source chosen above
        // .addSink(customMySqlSink)
        .addSink(mySqlDataChangeSink); // pairs the custom mySqlDeserializer with MySqlDataChangeSink
        // .sinkTo(mysqlChangeInfoKafkaProducerSink.getKafkaProducerSink()); // forward MySQL changes to Kafka
```

## Code (syncing MySQL to ES through MQ)

The same idea applies if the MQ here is swapped for Kafka.

Official site: Easy-Es. It mainly simplifies the ES APIs so they feel as comfortable as MyBatis-Plus, so it is not covered in depth here; to get the example below running, see the author's separate post on using easy-es.
There are two sync approaches:

- Flink CDC listens to MySQL and writes straight into ES
- Flink CDC listens to MySQL and writes into ActiveMQ, and the MQ writes into ES (implemented here)

Introducing the MQ buys durability for the sync: even after a crash, processing picks up again once the service restarts.

### Add the ES and Easy-ES dependencies

```xml
<!-- ES dependencies -->
<!-- Exclude Spring Boot's built-in ES dependencies to avoid conflicts with easy-es -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${es.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${es.version}</version>
</dependency>
<!-- easy-es -->
<dependency>
    <groupId>org.dromara.easy-es</groupId>
    <artifactId>easy-es-boot-starter</artifactId>
    <version>${easy-es.version}</version>
</dependency>
```
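The consumer below relies on an ES document class and mapper that this post does not list. A minimal sketch under easy-es conventions might look like the following (index name and field layout are assumptions inferred from the consumer code; imports omitted, see the easy-es docs for the exact packages in your version):

```java
// Assumed ES document mirroring the student table; @IndexName/@IndexId are easy-es annotations
@Data
@IndexName("student")
public class StudentEsEntity {

    /** ES document id */
    @IndexId
    private String id;

    /** Primary key of the source MySQL row; used to find documents to delete */
    private Long mysqlId;

    private String name;

    private String description;
}

// easy-es mapper, used like a MyBatis-Plus BaseMapper
public interface StudentEsMapper extends BaseEsMapper<StudentEsEntity> {
}
```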
### Modify the consumer CustomQueueConsumer

```java
/**
 * @author whiteBrocade
 * @version 1.0
 * @description CustomQueueConsumer
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class CustomQueueConsumer {

    private final StudentEsMapper studentEsMapper;

    @JmsListener(destination = ActiveMqConfig.QUEUE_NAME)
    public void receiveQueueMsg(String msg) {
        log.info("Consumer 1111 received queue message: {}", msg);
        StudentMqDTO mqDTO = JSONUtil.toBean(msg, StudentMqDTO.class);
        Student student = mqDTO.getStudent();
        Integer operatorType = mqDTO.getOperatorType();
        OperatorTypeEnum operatorTypeEnum = OperatorTypeEnum.getEnumByType(operatorType);
        switch (operatorTypeEnum) {
            case INSERT:
                // Mirror the insert into ES
                StudentEsEntity studentEsEntity = new StudentEsEntity();
                BeanUtil.copyProperties(student, studentEsEntity);
                studentEsEntity.setMysqlId(student.getId());
                studentEsMapper.insert(studentEsEntity);
                break;
            case UPDATE:
            case DELETE:
                // On an update or delete in MySQL, remove the ES document
                LambdaEsQueryWrapper<StudentEsEntity> wrapper = new LambdaEsQueryWrapper<>();
                wrapper.eq(StudentEsEntity::getMysqlId, student.getId());
                studentEsMapper.delete(wrapper);
                break;
        }
    }

    @JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")
    public void receiveTopicMsg(String msg) {
        log.info("Consumer 1111 received topic message: {}", msg);
    }
}
```

Custom2QueueConsumer stays the same as in the ActiveMQ section.
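With the whole pipeline running (MySQL, the Spring Boot application, ActiveMQ, ES), a simple end-to-end smoke test is to mutate the student table and watch the logs and the ES index (the row values here are made up for the test):

```sql
-- Each statement should appear in the Flink CDC logs, travel through ActiveMQ,
-- and be written to or removed from the ES index by CustomQueueConsumer
INSERT INTO whitebrocade.student(id, name, description) VALUES (3, '大牛马', '我是大牛马');
UPDATE whitebrocade.student SET description = '更新描述' WHERE id = 3;
DELETE FROM whitebrocade.student WHERE id = 3;
```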