如何把网站转网站长春网站开发培训
- 作者: 五速梦信息网
- 时间: 2026年03月21日 09:52
当前位置: 首页 > news >正文
如何把网站转网站,长春网站开发培训,物流软件开发工具,移动网站转码1.概叙
场景一#xff1a;数据增量实时同步
项目中业务数据量比较大#xff0c;每类业务表都达到千万级别#xff0c;虽然做了分库分表#xff0c;每张表数据控制在300W以下#xff0c;但是效率还是达不到要求#xff0c;为了提高查询效率#xff0c;打算使用ES进行数…1.概叙
场景一数据增量实时同步
项目中业务数据量比较大每类业务表都达到千万级别虽然做了分库分表每张表数据控制在300W以下但是效率还是达不到要求为了提高查询效率打算使用ES进行数据查询。
那么这个时候问题来了怎么把MySQL中增量数据同步到ES
场景二缓存一致性问题
Java web应用性能分析之【高并发之缓存-多级缓存】_java多级缓存-CSDN博客 在前面文章中有提到这个场景如何保证redis、EhCache、MySQL数据一致
我们都知道作为数据库写操作是不通过缓存的。假设商品服务实例 1 将 1 号商品价格调整为 80 元这会衍生一个新问题如何主动向应用程序推送数据变更的消息来保证它们也能同步更新缓存呢
针对上面两种场景可以看看canal的解决方案。
什么是Canal?
canal [kənæl]译意为水道/管道/沟渠主要用途是基于 MySQL 数据库增量日志解析提供增量数据订阅和消费。项目起源于阿里巴巴内部对于跨机房数据同步的需求通过解析MySQL的二进制日志Binary LogCanal能够捕获并推送数据库的变更事件满足了诸如数据库镜像、实时备份、索引实时维护等多种业务场景的需求。
GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅消费组件
Home · alibaba/canal Wiki · GitHub
支持范围
Canal当前支持MySQL数据库的多个版本包括但不限于5.1.x、5.5.x、5.6.x、5.7.x及8.0.x同时也兼容阿里云RDS等云数据库服务为用户提供了广泛的数据库兼容性保障。
部分支持MySQL体系数据库Mariadb 10.x、PolarDB-X 主要特性
高性能与低延迟Canal 1.1.x版本进行了深度优化性能提升高达150%。 Prometheus监控原生集成Prometheus监控便于系统健康状况的跟踪。 消息系统集成直接支持Kafka、RocketMQ消息投递便于与大数据平台对接。 云数据库支持无缝对接阿里云RDS解决了自动主备切换及离线Binlog解析问题。 Docker部署提供Docker镜像简化部署流程。 WebUI管理Canal-Admin工程引入WebUI实现动态配置、任务管理与日志查看等功能。
2.Canal原理 MySQL主备复制原理 MySQL master 将数据变更写入二进制日志binary log, 日志中的记录叫做二进制日志事件binary log events可以通过 show binlog events 进行查看 MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log) MySQL slave 重放 relay log 中事件将数据变更反映到它自己的数据 Canal工作原理
Canal巧妙地模拟了MySQL主从复制的机制。具体而言
伪装为MySQL Slavecanal 模拟 MySQL slave 的交互协议伪装自己为 MySQL slave 向 MySQL master 发送 dump 协议获取Binary LogMySQL Master接收到请求后开始推送Binary Log给Canal。解析日志事件Canal解析接收到的Binary Log将数据变更信息转换为易于处理的结构化数据。canal 解析 binary log 对象(原始为 byte 流)转换为json格式。数据同步Canal 客户端通过 TCP 协议或 MQ 形式监听 Canal 服务端同步数据到 ES、KFK、HBase、RocketMQ、Pulsar等。
优点 可以完全和业务代码解耦增量日志订阅。
缺点实时性不高订阅mysql日志DB中数据事务成功后开始同步至canal。
Canal总体架构 对应这些软件包
deployer包即canal server负责从master库同步binlog将数据通过tcp的方式同步给Adapter适配器经过Adapter适配同步给目标库通过MQ将数据同步给canal client或目标库canal client也可以再同步给目标库。 server包官方介绍https://github.com/alibaba/canal/wiki/DevGuideadapter包给目标库同步数据。目前支持Hbase、RDB、ES。 adapter包官方介绍https://github.com/alibaba/canal/wiki/ClientAdapteradmin包canal 1.1.4版本迎来最重要的WebUI能力引入canal-admin工程支持面向WebUI的canal动态管理能力支持配置、任务、日志等在线白屏运维能力具体文档Canal Admin Guide admin包官方介绍https://github.com/alibaba/canal/wiki/Canal-Admin-Guidecanal.client消费server数据 Canal设计了client-server架构支持多种语言客户端通过protobuf 3.0协议与之交互官方及社区提供了以下客户端Java客户端ClientExampleC#客户端CanalSharpGo客户端canal-goPython客户端canal-pythonPHP客户端canal-phpRust客户端canal-rsNode.js客户端canal-nodejs 除了基础功能Canal还支持丰富的进阶特性和周边生态工具如 Canal-Admin提供Web界面管理Canal实例实现配置、监控和运维的可视化操作。 canal2sql一个工具项目能根据Binlog生成SQL便于数据迁移或备份。 OtterCanal的消费端开源项目用于数据同步与数据集成。 Canal高可用架构 整个 HA 机制的控制主要是依赖了zookeeper的两个特性watcher、EPHEMERAL节点。canal的 HA 机制实现分为两部分canal server 和 canal client分别有对应的实现。
canal server实现流程如下
1、canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断 (实现创建 EPHEMERAL 节点谁创建成功就允许谁启动。2、创建 zookeeper 节点成功后对应的 canal server 就启动对应的 canal instance没有创建成功的 canal instance 就会处于 standby 状态。3、一旦 zookeeper 发现 canal server A 创建的节点消失后立即通知其他的 canal server 再次进行步骤1的操作重新选出一个 canal server 启动instance。4、canal client 每次进行connect时会首先向 zookeeper 询问当前是谁启动了canal instance然后和其建立链接一旦链接不可用会重新尝试connect。 为了减少对mysql dump的请求不同server上的instance要求同一时间只能有一个处于running其他的处于standby状态。 依赖
JDK1.8MySQL用于canal-admin存储配置和节点等相关数据Zookeeper
3.Canal实战
准备环境
名称版本MySQL5.7elasticsearch7.17.9canal1.1.7jdk1.8 对于自建 MySQL , 需要先开启 Binlog 写入功能配置 binlog-format 为 ROW 模式my.cnf 中配置如下 [mysqld]
log-binmysql-bin # 开启 binlog
binlog-formatROW # 选择 ROW 模式
server_id1 # 配置 MySQL replaction 需要定义不要和 canal 的 slaveId 重复注意针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant CREATE USER canal IDENTIFIED BY canal;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO canal%;
– GRANT ALL PRIVILEGES ON . TO canal% ;
FLUSH PRIVILEGES; 下载 canal 本次操作只需要下载admin、deployer两个包。
下载后解压即可用当然要运行起来的话需要配置jdk1.8这里就不再多说。 创建admin的数据库conf/canal_manager.sql 启动admin前先完成建库和见表包括admin的默认登录账号密码 admin/123456否则启动报错。主要是如下六张表。 修改配置文件
修改server的配置文件conf/canal.properties 和conf/example/instance.properties 切记数据库和配置文件中的密码要一致。 修改admin的配置文件conf/application.yml 分别启动admin和server
启动server:bin/startup.bat 启动adminbin/startup.bat 登录adminhttp://192.168.1.4:8089/#/login?redirect%2Fdashboard
默认账号密码admin/123456 配置java客户端
引入依赖 dependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.client/artifactIdversion1.1.4/version/dependency 客户端代码
package com.zxx.study.base.canal;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.SneakyThrows;import java.net.InetSocketAddress;
import java.util.List;/*** author zhouxx* create 2024-08-01 21:59/
public class CanalClient {SneakyThrowspublic static void main(String[] args) {try {// 创建canal客户端单链接模式CanalConnector canalConnector CanalConnectors.newSingleConnector(new InetSocketAddress(192.168.1.4,11111), example, , );// 创建连接canalConnector.connect();while (true) {// 订阅数据库// canalConnector.subscribe(mall);// 获取数据Message message canalConnector.get(100);// 获取Entry集合ListCanalEntry.Entry entries message.getEntries();// 判断集合是否为空,如果为空,则等待一会继续拉取数据if (entries.size() 0) {
// System.out.println(当次抓取没有数据休息一会。。。。。。);Thread.sleep(1000);} else {// 遍历entries单条解析for (CanalEntry.Entry entry : entries) {//1.获取表名String tableName entry.getHeader().getTableName();//2.获取类型CanalEntry.EntryType entryType entry.getEntryType();//3.获取序列化后的数据ByteString storeValue entry.getStoreValue();//4.判断当前entryType类型是否为ROWDATAif (CanalEntry.EntryType.ROWDATA.equals(entryType)) {//5.反序列化数据CanalEntry.RowChange rowChange CanalEntry.RowChange.parseFrom(storeValue);//6.获取当前事件的操作类型CanalEntry.EventType eventType rowChange.getEventType();//7.获取数据集ListCanalEntry.RowData rowDataList rowChange.getRowDatasList();//8.遍历rowDataList并打印数据集for (CanalEntry.RowData rowData : rowDataList) {JSONObject beforeData new JSONObject();ListCanalEntry.Column beforeColumnsList rowData.getBeforeColumnsList();for (CanalEntry.Column column : beforeColumnsList) {beforeData.put(column.getName(), column.getValue());}JSONObject afterData new JSONObject();ListCanalEntry.Column afterColumnsList rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {afterData.put(column.getName(), column.getValue());}//数据打印System.out.println(Table: tableName ,EventType: eventType ,Before: beforeData ,After: afterData);/** Table:test_user,EventType:UPDATE,Before:{name:1111221,id:1},After:{name:11tom,id:1}* Table:test_user,EventType:INSERT,Before:{},After:{name:zhouxx,id:17}* Table:test_user,EventType:INSERT,Before:{},After:{name:zhouxx1,id:18}* */}}}}}}catch (Exception e){e.printStackTrace();}}}运行效果在数据库里面忝删改查均可以在客户端中打印出来 cancel框架同步mysql数据到kafka
参考cancel框架同步mysql数据到kafka_mysql cancel-CSDN博客
利用canal进行MySQL到ES的数据实时同步
参考利用canal进行MySQL到ES的数据实时同步_canal es-CSDN博客 结合上图至此场景一和场景二中的问题均可以解决。
- 上一篇: 如何把网站上线网页制作工具知乎
- 下一篇: 如何把网站做成app手机百度下载安装
相关文章
-
如何把网站上线网页制作工具知乎
如何把网站上线网页制作工具知乎
- 技术栈
- 2026年03月21日
-
如何把网站上传到网上网站建设学习什么
如何把网站上传到网上网站建设学习什么
- 技术栈
- 2026年03月21日
-
如何把网站能搜到什么是网站的二级目录下
如何把网站能搜到什么是网站的二级目录下
- 技术栈
- 2026年03月21日
-
如何把网站做成app手机百度下载安装
如何把网站做成app手机百度下载安装
- 技术栈
- 2026年03月21日
-
如何把网站做成软件网站设计公司苏州
如何把网站做成软件网站设计公司苏州
- 技术栈
- 2026年03月21日
-
如何百度搜到网站微分销系统定制开发
如何百度搜到网站微分销系统定制开发
- 技术栈
- 2026年03月21日






