网站建设标志头像图片公司签约网站
- 作者: 五速梦信息网
- 时间: 2026年04月20日 07:53
当前位置: 首页 > news >正文
网站建设标志头像图片,公司签约网站,单页成品网站,网上如何推广产品 背景 当项目有很多数据源的时候，通常会在启动的时候就把数据源连接加载缓存上，当数据源进行变更后如何自动实时将缓存的数据源进行更新呢？如果是单个项目直接调接口方法就行了，但是涉及到分布式多个系统呢？ 解决方案…背景 当项目有很多数据源的时候，通常会在启动的时候就把数据源连接加载缓存上，当数据源进行变更后如何自动实时将缓存的数据源进行更新呢？如果是单个项目直接调接口方法就行了，但是涉及到分布式多个系统呢？ 解决方案 使用Redis轻量级消息队列，它可以实现实时通知、实时状态更新等功能，配合AOP实现自动更新数据源状态。 下面结合代码写一个使用示例： 1.首先创建数据源对象 import cn.hutool.core.collection.CollectionUtil; import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Getter; import lombok.Setter; import lombok.ToString; import lombok.experimental.Accessors; import org.apache.commons.lang3.StringUtils; import java.io.Serializable; import java.util.ArrayList; import java.util.Date; import java.util.List;/**** author ws* since 2022-08-12/ Getter Setter ToString Accessors(chain true) TableName(ed_datasource_info) public class DatasourceInfo implements Serializable {private static final long serialVersionUID 1L;TableId(value id, type IdType.AUTO)private Integer id;/** 数据源编码/TableField(datasource_code)private String datasourceCode;/** 数据源名称/TableField(datasource_name)private String datasourceName;/** 数据源类型/TableField(datasource_type)private String datasourceType;/** 类型 0:数据库 1:Rest-api/TableField(type)private Integer type;/** 创建人/TableField(creator)private String creator;/** 模式/TableField(schema_name)private String schemaName;TableField(create_time)private Date createTime;TableField(update_time)private Date updateTime;/** 数据源连接信息/TableField(link_json)private String linkJson;}2.初始化启动加载数据源 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.sztech.common.constant.DataSourceTypeEnum; import com.sztech.entity.DatasourceInfo; import com.sztech.service.DatasourceInfoService; import lombok.extern.slf4j.Slf4j; import 
org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit;

/**
 * Loads data sources into {@link DataSourceContext} once this bean's properties are set.
 *
 * <p>Only rows whose {@code type} column equals {@code DataSourceTypeEnum.DB.getKey()} are
 * loaded. Each data source is loaded on its own thread, and the caller waits at most one
 * minute for all of them to finish.
 */
@Slf4j
@Component
public class DataSourceRecovery implements InitializingBean {

    @Resource
    private DatasourceInfoService datasourceInfoService;

    /** Triggers a full reload after dependency injection has completed. */
    @Override
    public void afterPropertiesSet() throws Exception {
        refresh();
    }

    /** Reloads every database-type data source. */
    private void refresh() throws Exception {
        this.refresh(null);
    }

    /**
     * Reloads the cached client and configuration of one data source, or of all of them.
     *
     * @param sourceCode data source code to reload; when blank, every database-type data
     *                   source is reloaded
     */
    public void refresh(String sourceCode) {
        QueryWrapper<DatasourceInfo> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("type", DataSourceTypeEnum.DB.getKey());
        if (StringUtils.isNotBlank(sourceCode)) {
            queryWrapper.eq("datasource_code", sourceCode);
        }
        List<DatasourceInfo> list = datasourceInfoService.list(queryWrapper);
        if (CollectionUtils.isEmpty(list)) {
            return;
        }

        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        for (DatasourceInfo datasourceInfo : list) {
            new Thread(new ReadloadThread(datasourceInfo, countDownLatch)).start();
        }

        try {
            // await() returns false when the time limit elapses before the count reaches zero;
            // that is the real timeout case and must not pass silently.
            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
                log.error("数据源加载等待超时");
            }
        } catch (InterruptedException e) {
            // An interrupt is not a timeout: restore the flag so callers can observe it.
            Thread.currentThread().interrupt();
            log.error("数据源加载等待被中断", e);
        }
    }

    /**
     * Loads a single data source on its own thread so that startup is not serialized
     * behind each connection.
     */
    static class ReadloadThread implements Runnable {

        private DatasourceInfo datasourceInfo;
        private CountDownLatch countDownLatch;

        public ReadloadThread() {
        }

        public ReadloadThread(DatasourceInfo datasourceInfo, CountDownLatch countDownLatch) {
            this.datasourceInfo = datasourceInfo;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                DataSourceContext.setClientMap(datasourceInfo);
                DataSourceContext.setConfigMap(datasourceInfo.getDatasourceCode(), datasourceInfo);
            } catch (Exception e) {
                log.error("datasource:{},加载失败", datasourceInfo.getDatasourceCode(), e);
            } finally {
                // Always count down, even on failure, so the waiting thread is released.
                countDownLatch.countDown();
            }
        }
    }
}
3.创建DataSourceContext,用于数据源缓存数据源连接 import com.sztech.core.tool.DBTool; import com.sztech.entity.DatasourceInfo; import java.util.Map; import 
java.util.concurrent.ConcurrentHashMap;/** User: wangsheng* Date: 2022-02-11* Time: 14:05/ public class DataSourceContext {/** 客户端缓存/private final static MapString, IClient clientMap new ConcurrentHashMap();/** 数据源配置缓存/private final static MapString, DatasourceInfo configMap new ConcurrentHashMap();public static void setClientMap(DatasourceInfo datasourceInfo) {if(clientMap.containsKey(datasourceInfo.getDatasourceCode())){try {clientMap.get(datasourceInfo.getDatasourceCode()).close();}catch (Exception ignored){}}clientMap.put(datasourceInfo.getDatasourceCode(),DBTool.buildClient(datasourceInfo));}public static void setConfigMap(String key, DatasourceInfo datasourceInfo) {configMap.put(key, datasourceInfo);}public static void removeClientMap(String key) {if(clientMap.containsKey(key)){try {clientMap.get(key).close();}catch (Exception ignored){}}clientMap.remove(key);}public static void removeConfigMap(String key) {configMap.remove(key);}public static IClient getClientMap(String key) {IClient client clientMap.get(key);if(null client){throw new RuntimeException(String.format(数据源编码:[%s]不存在或被删除…, key));}return client;}public static DatasourceInfo getConfigMap(String key) {DatasourceInfo datasourceInfo configMap.get(key);if(null datasourceInfo){throw new RuntimeException(String.format(数据源编码:[%s]不存在或被删除…, key));}return datasourceInfo;} }package com.sztech.core.tool;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.aliyun.odps.Instance; import com.sztech.common.constant.ResultEnum; import com.sztech.common.exception.BizException; import com.sztech.common.utils.ReflectionUtils; import com.sztech.common.utils.SpringUtils; import com.sztech.common.utils.ThreadPoolUtil; import com.sztech.core.datasource.DataSourceContext; import com.sztech.core.datasource.IClient; import com.sztech.core.datasource.rdbms.RdbmsConfig; import com.sztech.entity.; import com.sztech.pojo.dto.ColumnDto; import com.sztech.pojo.dto.QueryTableDto; import 
com.sztech.pojo.dto.TableDto; import com.sztech.pojo.node.PartitionColumn; import com.sztech.pojo.vo.; import com.sztech.service.CreateTableLogService; import lombok.extern.slf4j.Slf4j; import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;import java.sql.; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadPoolExecutor;/*** Description:* User: wangsheng* Date: 2022-08-12* Time: 16:59/ Slf4j public class DBTool {/** 建立客户端/public static IClient buildClient(DatasourceInfo datasourceInfo) {IClient client ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), type, IClient.class);return client.open(datasourceInfo);}/** 测试数据源** return/public static boolean testSource(DatasourceInfo datasourceInfo) {IClient client ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), type, IClient.class);return client.testSource(datasourceInfo);}public static ListString getSchemas(DatasourceInfo datasourceInfo) {ListString schemas new ArrayList();Connection conn null;try {IClient client ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), type, IClient.class);Class.forName(client.driverName());String linkJson datasourceInfo.getLinkJson();RdbmsConfig rdbmsConfig JSONObject.parseObject(linkJson).toJavaObject(RdbmsConfig.class);conn DriverManager.getConnection(rdbmsConfig.getJdbcUrl(), rdbmsConfig.getUsername(), rdbmsConfig.getDecodePassword());DatabaseMetaData metadata conn.getMetaData();try (ResultSet resultSet metadata.getSchemas()) {while (resultSet.next()) {String schemaName resultSet.getString(TABLE_SCHEM);schemas.add(schemaName);}}} catch (SQLException e) {throw new RuntimeException(e);} catch (ClassNotFoundException e) {throw new RuntimeException(e);} finally {if (conn ! 
null) {try {conn.close();} catch (SQLException ex) {ex.printStackTrace();}}}return schemas;}/** 获取驱动名称/public static String getDriverName(String datasourceType) {IClient client ReflectionUtils.getInstanceFromCache(datasourceType, type, IClient.class);return client.driverName();}/** 获取表中列信息/public static ListColumnDto getColumns(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).getColumns(tableName);}/** 获取表中分区列信息/public static ListString getPartitionColumns(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).getPartitionColumns(tableName);}/** 获取表信息/public static ListString getTableNames(String datasourceCode, String tableNameLike) {return DataSourceContext.getClientMap(datasourceCode).getTableNames(tableNameLike);}/** 获取表信息/public static ListTableDto getTables(String datasourceCode) {return DataSourceContext.getClientMap(datasourceCode).getTables();}/** 获取单个表信息/public static TableDto getTableByName(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).getTableByName(tableName);}/** 获取单个表信息(创建时间字段数)/public static TableDto getTableField(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).getTableField(tableName);}/** 获取表信息获取创建时间** param dto* return/public static TableInfoVo getTableData(QueryTableDto dto) {IClient client DataSourceContext.getClientMap(dto.getDataSourceCode());return client.getTableInfo(dto.getTableName());}/** 根据字段type建表/public static void createTableByColumns(ListColumnDto columnDtos, String tableName, String datasourceCode) {IClient client DataSourceContext.getClientMap(datasourceCode);ListString sqls client.buildTableSql(columnDtos, tableName, true);log.info(执行建表语句为 JSON.toJSONString(sqls));sqls.forEach(s - client.executeCommandSyn(s, new HashMap()));}/** 根据字段type建表/public static void createTableByNotTransformedColumns(ListColumnDto columnDtos, String tableName, String 
datasourceCode) {IClient client DataSourceContext.getClientMap(datasourceCode);ListString sqls client.buildTableSql(columnDtos, tableName, false);log.info(执行建表语句为 JSON.toJSONString(sqls));sqls.forEach(s - client.executeCommandSyn(s, new HashMap()));}/** 创建索引* 注: oracle 索引名在整个库里必须唯一 否则建立失败** param datasourceCode 数据源编码* param tableName 表名* param filedNames filed1,filed2…* param unique 唯一/public static void createIndex(String datasourceCode, String tableName, String filedNames, Boolean unique) {DataSourceContext.getClientMap(datasourceCode).createIndex(tableName, filedNames, unique);}/** sql校验** param datasourceCode* param sql* param sourceType* return/public static MapString, Object checkSql(String datasourceCode, String sql, String sourceType) {IClient client DataSourceContext.getClientMap(datasourceCode);return client.checkSql(sql, sourceType);}/** 根据sql创建表** param datasourceCode* param sql/public static void createTableWithSql(String datasourceCode, String sql) {IClient client DataSourceContext.getClientMap(datasourceCode);log.info(执行建表语句为 JSON.toJSONString(sql));client.executeCommandSyn(sql, new HashMap()); // DataSourceContext.getClientMap(datasourceCode).createTableWithSql(sql);}/** 删除表** param datasourceCode* param tableName/public static void dropTable(String datasourceCode, String tableName) {DataSourceContext.getClientMap(datasourceCode).dropTable(tableName);}/** 单表查询数据/public static ListMapString, Object selectDataFromTable(String datasourceCode, ListDataTableColumn columns, String tableName, String search, Integer limit) {IClient client DataSourceContext.getClientMap(datasourceCode);// 获取查询语句String selectSql client.getSelectSql(columns, tableName, search, limit);log.info(执行语句 selectSql);return client.selectDataFromTable(selectSql, null);}/** 单表查询数据/public static ListMapString, Object selectFromTable(String datasourceCode, ListFormColumn columns, ListFormColumn searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, 
MapSqlParameterSource params) {IClient client DataSourceContext.getClientMap(datasourceCode);// 获取查询语句String selectSql client.getFormSelectSql(columns, searchColumns, tableName, search, pageNum, pageSize, params);log.info(执行语句 selectSql);return client.selectDataFromTable(selectSql, params);}/** 单表查询数据/public static ListMapString, Object selectFromForBackUp(String datasourceCode, ListFormColumn columns, ListFormColumn searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) {IClient client DataSourceContext.getClientMap(datasourceCode);// 获取查询语句String selectSql client.selectFromForBackUp(columns, searchColumns, tableName, search, pageNum, pageSize, params);log.info(执行语句 selectSql);return client.selectDataFromTable(selectSql, params);}/** 单表查询数据/public static ListMapString, Object selectFromFile(String datasourceCode, ListFormColumn columns, ListFormColumn searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) {IClient client DataSourceContext.getClientMap(datasourceCode);// 获取查询语句String selectSql client.getFormSelectSqlForFile(columns, searchColumns, tableName, search, pageNum, pageSize, params);log.info(执行语句 selectSql);return client.selectDataFromTable(selectSql, params);}/** 查询单表是否存在文件名/public static ListMapString, Object getExistOldName(String datasourceCode, String tableName, String search) {IClient client DataSourceContext.getClientMap(datasourceCode);// 获取查询语句String selectSql client.getExistOldName( tableName, search);log.info(执行语句 selectSql);return client.selectDataFromTable(selectSql, null);}/** 单表查询数据(查询归集表专门使用)/public static ListMapString, Object selectCollectTable(CollectConditionVo vo) {IClient client DataSourceContext.getClientMap(vo.getDatasourceCode());// 获取查询语句String selectSql client.getCollectTable(vo);log.info(执行语句 selectSql);return client.selectDataFromTable(selectSql, vo.getParams());}/** 单表查询数据量/public static MapString, Object 
getFormCount(String datasourceCode, ListFormColumn columns, ListFormColumn searchColumns, String tableName, String search, MapSqlParameterSource params) {IClient client DataSourceContext.getClientMap(datasourceCode);// 获取查询语句String selectSql client.getCountSql(columns, searchColumns, tableName, search, params);log.info(执行语句 selectSql);return client.getCount(selectSql, params);}/** 查询区县库表的数据量/public static MapString, Object getCountryCount(String datasourceCode, String tableName, MapSqlParameterSource params) {IClient client DataSourceContext.getClientMap(datasourceCode);// 获取查询语句String selectSql select count(1) as count from tableName;log.info(执行语句 selectSql);return client.getCount(selectSql, params);}public static MapString, Object getFormCountForFile(String datasourceCode, ListFormColumn columns, ListFormColumn searchColumns, String tableName, String search, MapSqlParameterSource params) {IClient client DataSourceContext.getClientMap(datasourceCode);// 获取查询语句String selectSql client.getCountSqlForFile(columns, searchColumns, tableName, search, params);log.info(执行语句 selectSql);return client.getCount(selectSql, params);}/** 查询表数据量/public static Long getTableRows(String datasourceCode, String tableName) {IClient client DataSourceContext.getClientMap(datasourceCode);return client.getTableRows(tableName);}/** 查询表对应分区数据量/public static Long getTablePartitionRows(String datasourceCode, String tableName, ListPartitionColumn partitionColumns) {IClient client DataSourceContext.getClientMap(datasourceCode);return client.getTablePartitionRows(tableName, partitionColumns);}/** 查询表数据量/public static Integer getTablePhysicalSize(String datasourceCode, String tableName) {IClient client DataSourceContext.getClientMap(datasourceCode);return client.getPhysicalSize(tableName);}/** 获取表最大值** param datasourceCode 数据源编码* param tableName 表名* param incColumnName 自增列名* return {link Integer}/public static Object getMaxValue(String datasourceCode, String tableName, String incColumnName, String 
condition) {return DataSourceContext.getClientMap(datasourceCode).getMaxValue(tableName, incColumnName, condition);}public static Object getMaxValue(String datasourceCode, String schema, String tableName, String incColumnName, String condition) {return DataSourceContext.getClientMap(datasourceCode).getMaxValue(schema, tableName, incColumnName, condition);}public static Object getMaxTime(String datasourceCode, String schema, String tableName, String incColumnName, String tongId,String condition) {return DataSourceContext.getClientMap(datasourceCode).getMaxTime(schema, tableName, incColumnName,tongId, condition);}/** 字段存在** param datasourceCode 数据源编码* param tableName 表名* param fieldName 字段名* return {link Boolean}/public static Boolean fieldExist(String datasourceCode, String tableName, String fieldName) {ListColumnDto columns getColumns(datasourceCode, tableName);return columns.stream().anyMatch(s - s.getName().equalsIgnoreCase(fieldName));}/** 数据预览 获取前十条** return/public static String dataView(String datasourceCode, String tableName, String condition) {return DataSourceContext.getClientMap(datasourceCode).dataView(tableName, condition);}/** 创建分区临时表* odps适用/public static void createPartitionedTableByColumns(ListColumnDto columnDtos, String tableName, String tableComment, String partitionedField, String datasourceCode) {DataSourceContext.getClientMap(datasourceCode).createPartitionedTableByColumns(columnDtos, tableName, tableComment, partitionedField);}/** 同步执行命令/public static void executeCommandSyn(String datasourceCode, String command, MapString, Object params) {DataSourceContext.getClientMap(datasourceCode).executeCommandSyn(command, params);}/** 异步执行命令* odps适用/public static Instance executeCommandASyn(String datasourceCode, String command, MapString, Object params) {return DataSourceContext.getClientMap(datasourceCode).executeCommandASyn(command, params);}/** 是否有导出权限* odps适用** param datasourceCode 数据源编码* param tableName 表名* return {link Boolean}/public static 
Boolean exportEnable(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).exportEnable(tableName);}/** 插入单条数据** param datasourceCode* param vo* return/public static Integer insert(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).insert(vo);}/** 批量插入数据** param datasourceCode* param vo* return/public static Integer[] betchInsert(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).betchInsert(vo);}/** 批量插入数据** param datasourceCode* param vo* return/public static Integer[] betchInsertByConnection(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).betchInsertByConnection(vo);}/** 这个方法不需要分装参数直接传字段名称list就好了* param datasourceCode* param vo* return/public static Integer[] betchInsertForCommom(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).betchInsertForCommom(vo);}/** 删除数据** param datasourceCode* param vo* return/public static Integer delete(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).delete(vo);}/** 这个删除方法可以自定义条件服号* param datasourceCode* param vo* return/public static Integer deleteForCommon(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).deleteForCommon(vo);}public static Integer deleteForFile(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).deleteForFile(vo);}public static String deleteForPre(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).deleteForPre(vo);}/** 修改数据** param datasourceCode* param vo* return/public static Integer update(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).update(vo);}/** 修改数据** param datasourceCode* param vo* return/public static Integer updateForFile(String datasourceCode, FormTableVo vo) {return 
DataSourceContext.getClientMap(datasourceCode).updateForFile(vo);}/** 获取表单基本信息** param vo* return/public static TableMetaDataVo getTableBasicInfo(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).getTableBasicInfo(vo);}/** 根据字段type建表/public static void createCollectTable(ListCatalogColumnInfo columnDtos, String tableName, String datasourceCode, String tableComment, Boolean ifPartition) {IClient client DataSourceContext.getClientMap(datasourceCode);ListString sqls client.buildTableSqlForCollect(columnDtos, tableName, tableComment, ifPartition);log.info(执行建表语句为 JSON.toJSONString(sqls));try {sqls.forEach(s - client.executeCommandSyn(s, new HashMap()));} catch (Exception e) {e.printStackTrace();String message e.getMessage();if (e instanceof BizException) {BizException exception (BizException) e;message exception.getMsg();}log.error(建表错误{}:, message);ThreadPoolExecutor instance ThreadPoolUtil.instance();String finalMessage message;instance.submit(() - {CreateTableLog createTableLog new CreateTableLog();createTableLog.setErrorLog(finalMessage);createTableLog.setParams(JSON.toJSONString(sqls));createTableLog.setCode(tableName);CreateTableLogService createTableLogService SpringUtils.getBean(CreateTableLogService.class);createTableLogService.save(createTableLog);});throw new BizException(ResultEnum.ERROR.getCode(), 建表失败请联系管理员);}}/** 根据字段type建表/public static void updateCollectTable(CreateCollectVo vo) {IClient client DataSourceContext.getClientMap(vo.getDatasourceCode());ListString sqls client.buildTableSqlForUpdate(vo);log.info(执行更新表语句为 JSON.toJSONString(sqls));try {sqls.forEach(s - client.executeCommandSyn(s, new HashMap()));} catch (Exception e) {e.printStackTrace();String message e.getMessage();if (e instanceof BizException) {BizException exception (BizException) e;message exception.getMsg();}log.error(建表错误{}:, message);ThreadPoolExecutor instance ThreadPoolUtil.instance();String finalMessage message;instance.submit(() - 
{CreateTableLog createTableLog new CreateTableLog();createTableLog.setErrorLog(finalMessage);createTableLog.setParams(JSON.toJSONString(sqls));createTableLog.setCode(vo.getTableName());CreateTableLogService createTableLogService SpringUtils.getBean(CreateTableLogService.class);createTableLogService.save(createTableLog);});log.info(建表失败了开始准备抛出了————————————–);throw new BizException(ResultEnum.ERROR.getCode(), 建表失败请联系管理员);}}/** 获取数据源下所有表信息包括表名表字段数表创建时间** param datasourceCode* param tableNameLike* return/public static ListTableDto getTablesDetail(String datasourceCode, String tableNameLike, Integer start, Integer pageSize, String specifyTableName) {return DataSourceContext.getClientMap(datasourceCode).getTablesDetail(tableNameLike, start, pageSize, specifyTableName);}/** 获取表数量* param datasourceCode* param tableName* return/public static Long getTableCountSchema(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).getTableCountSchema(tableName);}public static Integer getTableColumnCount(String dataSourceCode, String tableName) {return DataSourceContext.getClientMap(dataSourceCode).getTableColumnCount(tableName);}public static Integer getPreTableColumnCount(String dataSourceCode, String tableName) {return DataSourceContext.getClientMap(dataSourceCode).getPreTableColumnCount(tableName);}/** 获取符号* return/public static String getSymbol(String datasourceCode) {return DataSourceContext.getClientMap(datasourceCode).getSymbol();}}import lombok.extern.slf4j.Slf4j; import org.reflections.Reflections; import java.lang.reflect.Modifier; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock;Slf4j public class ReflectionUtils {private static final MapString, Set? 
clazzMap new ConcurrentHashMap();private static final ReentrantLock clazzLock new ReentrantLock();/** 通过反射获取接口/抽象类的所有实现类* 通过缓存类信息减少查找时间* 接口与抽象类必须放在实现类的同级目录或者父目录/SuppressWarnings(unchecked)public static T SetClass? extends T getReflections(ClassT clazz) {if (clazzMap.containsKey(clazz.getName())) {return (SetClass? extends T) clazzMap.get(clazz.getName());}try {clazzLock.lock();if (clazzMap.containsKey(clazz.getName())) {return (SetClass? extends T) clazzMap.get(clazz.getName());}Reflections reflections new Reflections(clazz.getPackage().getName());SetClass? extends T subTypesOf reflections.getSubTypesOf(clazz);clazzMap.put(clazz.getName(), subTypesOf);return subTypesOf;} catch (Exception e) {log.error(getReflections error, e);} finally {clazzLock.unlock();}return new HashSet();}/** 通过反射获取新对象* param type type* param methodName methodName* param clazz clazz* return T/public static T T getInstance(String type, String methodName, ClassT clazz) {SetClass? extends T set getReflections(clazz);for (Class? 
extends T t : set) {try {//排除抽象类if (Modifier.isAbstract(t.getModifiers())) {continue;}Object obj t.getMethod(methodName).invoke(t.newInstance());if (type.equalsIgnoreCase(obj.toString())) {return t.newInstance();}} catch (Exception e) {log.error(getInstance error, e);}}throw new RuntimeException(implement class not exist);}/** 通过反射获取新对象* param type type* param methodName methodName* param clazz clazz* return T/public static T T getInstanceFromCache(String type, String methodName, ClassT clazz) {return getInstance(type, methodName, clazz);}}client客户接口端适配多种数据源 import com.ws.websocket.entity.DatasourceInfo;/** Description:* User: wangsheng* Date: 2022-12-30* Time: 10:31/ public interface IClient {/** 连接数据源** param dataSourceInfo 数据源信息* return {link IClient}/IClient open(DatasourceInfo dataSourceInfo);/** 关闭数据源/void close();/** 驱动类型** return/String driverName();/** 数据源类型** return {link String}/String type();/** 测试数据源** param datasourceInfo* return/boolean testSource(DatasourceInfo datasourceInfo);} import com.ws.websocket.entity.DatasourceInfo; //公共查询 public abstract class AbsClient implements IClient {protected DatasourceInfo datasourceInfo; }package com.ws.websocket.util; import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.fastjson.JSONObject; import com.ws.websocket.entity.DatasourceInfo; import lombok.Data; import lombok.extern.slf4j.Slf4j; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.Properties;Slf4j public abstract class AbsRdbmsClient extends AbsClient {protected DruidDataSource druidDataSource;Overridepublic IClient open(DatasourceInfo datasourceInfo) {RdbmsConfig rdbmsConfig JSONObject.parseObject(datasourceInfo.getLinkJson()).toJavaObject(RdbmsConfig.class);DruidDataSource druidDataSource new 
DruidDataSource();druidDataSource.setInitialSize(5);druidDataSource.setMinIdle(30);druidDataSource.setMaxActive(300);druidDataSource.setMaxWait(10000);druidDataSource.setBreakAfterAcquireFailure(true);// 跳出重试循环druidDataSource.setConnectionErrorRetryAttempts(3);// 重试三次druidDataSource.setTimeBetweenConnectErrorMillis(3000);druidDataSource.setLoginTimeout(3);druidDataSource.setUrl(rdbmsConfig.getJdbcUrl());druidDataSource.setDriverClassName(driverName());druidDataSource.setUsername(rdbmsConfig.getUsername());//解密// druidDataSource.setPassword(RsaUtils.decode(rdbmsConfig.getPassword()));druidDataSource.setPassword(rdbmsConfig.getPassword());// 设置 MetaUtil 工具类所需参数Properties properties new Properties();properties.put(remarks, true);properties.put(useInformationSchema, true);druidDataSource.setConnectProperties(properties);this.druidDataSource druidDataSource;this.datasourceInfo datasourceInfo;return this;}Overridepublic void close() {druidDataSource.close();}Overridepublic boolean testSource(DatasourceInfo datasourceInfo) {Connection connection null;try {Class.forName(driverName());String linkJson datasourceInfo.getLinkJson();RdbmsConfig rdbmsConfig JSONObject.parseObject(linkJson).toJavaObject(RdbmsConfig.class);connection DriverManager.getConnection(rdbmsConfig.getJdbcUrl(), rdbmsConfig.getUsername(), rdbmsConfig.getPassword());// 有效if (connection.isValid(3)) {return true;} else {return false;}} catch (SQLException e) {log.error(数据源测试失败, e);return false;} catch (ClassNotFoundException e) {log.error(未找到驱动信息{}, driverName());return false;} finally {if (connection ! 
null) {try {connection.close();} catch (SQLException ex) {ex.printStackTrace();}}}}Dataclass RdbmsConfig {private String jdbcUrl;private String username;private String password;public void setSSL() {String lowerCase this.jdbcUrl.toLowerCase();if (!lowerCase.contains(usessl)) {if (this.jdbcUrl.contains(?)) {this.jdbcUrl this.jdbcUrl useSSLfalse;} else {this.jdbcUrl this.jdbcUrl ?useSSLfalse;}}}} }import com.alibaba.fastjson.JSONObject; import com.ws.websocket.entity.DatasourceInfo; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils;Slf4j public class DmClient extends AbsRdbmsClient {private String schema;Overridepublic String type() {return DMDB;}Overridepublic String driverName() {return dm.jdbc.driver.DmDriver;}Overridepublic IClient open(DatasourceInfo datasourceInfo) {RdbmsConfig commonLinkParams JSONObject.parseObject(datasourceInfo.getLinkJson()).toJavaObject(RdbmsConfig.class);this.schema StringUtils.isNotBlank(datasourceInfo.getSchemaName()) ? datasourceInfo.getSchemaName() : commonLinkParams.getUsername().toUpperCase();datasourceInfo.setSchemaName(schema);return super.open(datasourceInfo);}Overridepublic void close() {}Overridepublic boolean testSource(DatasourceInfo datasourceInfo) {return false;} }4.创建redis订阅数据源操作频道配置 import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer;/** Author: wangsheng* Data: 2022/8/16 16:40/ Slf4j Configuration public class RedisListenerConfig {/** 订阅数据源操作频道** param connectionFactory connectionFactory* param dataSourceMonitor 数据源监视器* return RedisMessageListenerContainer/BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,DataSourceMonitor 
dataSourceMonitor){RedisMessageListenerContainer container new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(dataSourceMonitor, new PatternTopic(DATASOURCE_CHANNEL));log.info(dataSourceMonitor.getClass().getName() 订阅频道 {}, DATASOURCE_CHANNEL);return container;} } 5.redis监听数据源操作 import com.alibaba.fastjson.JSONObject; import com.ws.websocket.entity.DatasourceInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets;/** Description: redis监听数据源操作* User: wangsheng* Date: 2022-08-12* Time: 17:07/ Slf4j Component public class DataSourceMonitor implements MessageListener {Overridepublic void onMessage(Message message, byte[] bytes) {JSONObject box JSONObject.parseObject(new String(message.getBody(), StandardCharsets.UTF_8));String operation box.getString(key);if (SAVE_OR_UPDATE.equals(operation)) {// 更新 DataSourceContextDatasourceInfo datasourceInfo box.getObject(value, DatasourceInfo.class);if (datasourceInfo.getType().equals(0)) {String datasourceCode datasourceInfo.getDatasourceCode();DataSourceContext.setConfigMap(datasourceCode, datasourceInfo);DataSourceContext.setClientMap(datasourceInfo);log.info(redis 监听到数据源 {} 新增或更新更新 DataSourceContext 完成, datasourceCode);}} else {String datasourceCode box.getString(value);// 更新 DataSourceContextDataSourceContext.removeConfigMap(datasourceCode);DataSourceContext.removeClientMap(datasourceCode);log.info(redis 监听到数据源 {} 删除更新 DataSourceContext 完成, datasourceCode);}}} 6.创建AOP自动监听数据源变化 import com.alibaba.fastjson.JSONObject; import com.ws.websocket.entity.DatasourceInfo; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.AfterReturning; import org.aspectj.lang.annotation.Aspect; import 
org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map;/** Author: wangsheng* Data: 2022/8/15 16:37/ Slf4j Aspect Component public class DatasourceAspect {Resourceprivate StringRedisTemplate stringRedisTemplate;/** 新增或编辑数据源时发布 Redis 消息/AfterReturning(value execution( com.ws.service.DatasourceInfoService.saveOrUpdateDatasourceInfo(..)), returning datasourceInfo)public void saveOrUpdate(JoinPoint joinPoint, DatasourceInfo datasourceInfo) {HashMapString, Object box new HashMap(4);box.put(key, SAVE_OR_UPDATE);box.put(value, datasourceInfo);// 发布 Redis 消息stringRedisTemplate.convertAndSend(DATASOURCE_CHANNEL,JSONObject.toJSONString(box));log.info(新增或更新数据源 {} 方法切面发布 Redis 消息完成, datasourceInfo.getDatasourceCode());}/*** 删除数据源时发布 Redis 消息/AfterReturning(value execution( com.ws.service.DatasourceInfoService.deleteDatasourceInfo(..)), returning datasourceCode)public void delete(JoinPoint joinPoint, String datasourceCode) {MapString, Object box new HashMap(4);box.put(key, DELETE);box.put(value, datasourceCode);// 发布 Redis 消息stringRedisTemplate.convertAndSend(DATASOURCE_CHANNEL, JSONObject.toJSONString(box));log.info(删除数据源 {} 方法切面发布Redis消息完成, datasourceCode);} } 这样就解决了数据源连接信息自动加载更新同步的问题但还是有个问题当数据源重启后缓存的连接信息会失效且AOP无法监听到数据源重启变动这个时候还需要一个定时任务对数据源进行连接测试如果失效则重新连接缓存上。 7.创建定时任务 import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.ws.websocket.entity.DatasourceInfo; import com.ws.websocket.service.DatasourceInfoService; import com.ws.websocket.util.DBTool; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import 
java.util.HashMap; import java.util.List;

/**
 * Scheduled job that tests every data source with {@code type = 0} and, for each one whose
 * test fails, publishes a {@code SAVE_OR_UPDATE} message on the data source channel.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class DataSourceRetryConnectSchedule {

    @Resource
    private DatasourceInfoService datasourceInfoService;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /** Runs every 2 hours. */
    @Scheduled(cron = "0 0 */2 * * ?")
    public void RetryConnect() {
        log.info("开始监测数据源连接");
        QueryWrapper<DatasourceInfo> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("type", 0);
        List<DatasourceInfo> list = datasourceInfoService.list(queryWrapper);
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        for (DatasourceInfo datasourceInfo : list) {
            boolean reachable;
            try {
                reachable = DBTool.testSource(datasourceInfo);
            } catch (Exception e) {
                // One broken data source must not abort the sweep for the remaining ones.
                log.error("数据源连接测试异常{}", datasourceInfo.getDatasourceName(), e);
                continue;
            }
            if (reachable) {
                continue;
            }
            // Pass the name as an argument so the {} placeholder is actually filled in.
            log.info("数据源重连{}", datasourceInfo.getDatasourceName());
            HashMap<String, Object> box = new HashMap<>(4);
            box.put("key", "SAVE_OR_UPDATE");
            box.put("value", datasourceInfo);
            // Publish the Redis message.
            stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL", JSONObject.toJSONString(box));
        }
    }
}
相关文章
-
网站建设编写代码出错Wordpress建站用什么系统
网站建设编写代码出错Wordpress建站用什么系统
- 技术栈
- 2026年04月20日
-
网站建设编辑叫什么岗位好的做网站的
网站建设编辑叫什么岗位好的做网站的
- 技术栈
- 2026年04月20日
-
网站建设编程怎么写余姚做网站的公司
网站建设编程怎么写余姚做网站的公司
- 技术栈
- 2026年04月20日
-
网站建设表格的属性建个网站需要投资多少钱
网站建设表格的属性建个网站需要投资多少钱
- 技术栈
- 2026年04月20日
-
网站建设财务分析苏州工业园区做政务网站的公司
网站建设财务分析苏州工业园区做政务网站的公司
- 技术栈
- 2026年04月20日
-
网站建设参考的文献网站开发学什么语音
网站建设参考的文献网站开发学什么语音
- 技术栈
- 2026年04月20日
