网站访问代理在线设计网页三大工具

当前位置: 首页 > news > 正文

/*
 * Spring Boot 3.x + RabbitMQ: dynamic configuration (exchanges, queues, bindings, listeners)
 *
 * I. Preface
 *   This article covers the dynamic-configuration side of integrating Spring Boot 3.x with
 *   RabbitMQ, e.g. creating RabbitMQ exchanges and queues at runtime.
 *
 * II. Dynamically creating and listening to exchanges/queues on the default RabbitMQ connection
 *
 * NOTE(review): the code below was reconstructed from a web extraction that had stripped
 * '@', '=', '+', '<...>' and all string quotes. Each "File:" section is a separate source
 * file in the original project; they are listed together here in article order.
 * String literals were rebuilt from the surviving text and may differ from the original in
 * punctuation.
 */

// ===================================================================================
// 1. RabbitMQ configuration                                  File: RabbitMQConfig.java
// ===================================================================================

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration class.
 *
 * @author kenny
 * @version 1.0.0
 * @since 2024-10-03
 */
@Configuration
@EnableRabbit
public class RabbitMQConfig {

    /**
     * Creates the {@link RabbitTemplate} used to send messages.
     *
     * <p>The template is built on the application's {@link ConnectionFactory}. The original
     * version used the no-arg constructor, which leaves the template without a connection
     * factory and makes {@link RabbitAdmin#RabbitAdmin(RabbitTemplate)} fail.
     *
     * @param connectionFactory the connection factory to send through
     * @return the template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    /**
     * Creates the {@link RabbitAdmin} used to declare exchanges, queues and bindings.
     *
     * @param rabbitTemplate the template whose connection factory the admin uses
     * @return the admin
     */
    @Bean
    public RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate) {
        return new RabbitAdmin(rabbitTemplate);
    }
}

// ===================================================================================
// 2. Dynamic operations component                File: RabbitDynamicConfigService.java
//    Creates/deletes the different exchange types, creates/deletes queues, and binds
//    queues to exchanges.
// ===================================================================================

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Map;

/**
 * Creates queues, exchanges and bindings at runtime.
 *
 * @author kenny
 * @version 1.0.0
 * @since 2024-10-03
 */
@Slf4j
@Service
public class RabbitDynamicConfigService {

    // Constructor injection; the original author notes this was chosen to resolve a
    // circular-dependency problem.
    private final RabbitAdmin rabbitAdmin;
    private final RabbitListenerService rabbitListenerService;

    @Autowired
    public RabbitDynamicConfigService(RabbitAdmin rabbitAdmin,
                                      RabbitListenerService rabbitListenerService) {
        this.rabbitAdmin = rabbitAdmin;
        this.rabbitListenerService = rabbitListenerService;
    }

    /**
     * Declares a durable queue.
     *
     * @param queueName queue name
     */
    public void createQueue(String queueName) {
        Queue queue = new Queue(queueName, true); // durable
        rabbitAdmin.declareQueue(queue);
        log.info("队列创建成功: {}", queueName);
    }

    /**
     * Declares a durable queue and optionally starts a listener on it.
     *
     * @param queueName  queue name
     * @param isListener start a listener only when this is {@code true}; {@code null} is
     *                   treated as {@code false} (the original unboxed it and threw NPE)
     */
    public void createQueue(String queueName, Boolean isListener) {
        createQueue(queueName);
        if (Boolean.TRUE.equals(isListener)) {
            rabbitListenerService.createListener(queueName);
        }
    }

    /**
     * Declares a durable, non-auto-delete direct exchange.
     *
     * @param exchangeName exchange name
     */
    public void createExchange(String exchangeName) {
        DirectExchange exchange = new DirectExchange(exchangeName, true, false);
        rabbitAdmin.declareExchange(exchange);
        log.info("交换机创建成功: {}", exchangeName);
    }

    /**
     * Declares a durable, non-auto-delete direct exchange.
     *
     * @param exchangeName exchange name
     */
    public void createDirectExchange(String exchangeName) {
        DirectExchange directExchange = new DirectExchange(exchangeName, true, false);
        rabbitAdmin.declareExchange(directExchange);
        log.info("Direct 交换机创建成功: {}", exchangeName);
    }

    /**
     * Declares a durable, non-auto-delete fanout exchange.
     *
     * @param exchangeName exchange name
     */
    public void createFanoutExchange(String exchangeName) {
        FanoutExchange fanoutExchange = new FanoutExchange(exchangeName, true, false);
        rabbitAdmin.declareExchange(fanoutExchange);
        log.info("Fanout 交换机创建成功: {}", exchangeName);
    }

    /**
     * Declares a durable, non-auto-delete topic exchange.
     *
     * @param exchangeName exchange name
     */
    public void createTopicExchange(String exchangeName) {
        TopicExchange topicExchange = new TopicExchange(exchangeName, true, false);
        rabbitAdmin.declareExchange(topicExchange);
        log.info("Topic 交换机创建成功: {}", exchangeName);
    }

    /**
     * Declares a durable, non-auto-delete headers exchange.
     *
     * @param exchangeName exchange name
     */
    public void createHeadersExchange(String exchangeName) {
        HeadersExchange headersExchange = new HeadersExchange(exchangeName, true, false);
        rabbitAdmin.declareExchange(headersExchange);
        log.info("Headers 交换机创建成功: {}", exchangeName);
    }

    /**
     * Binds a queue to a direct exchange with the given routing key.
     *
     * <p>Kept for existing callers; it is the same operation as
     * {@link #bindQueueToDirectExchange(String, String, String)}.
     *
     * @param queueName    queue name
     * @param exchangeName exchange name
     * @param routingKey   routing key
     */
    public void bindQueueToExchange(String queueName, String exchangeName, String routingKey) {
        bindQueueToDirectExchange(queueName, exchangeName, routingKey);
    }

    /**
     * Binds a queue to an exchange, choosing the binding style from the exchange type.
     *
     * @param queueName    queue name
     * @param exchangeType one of {@code fanout}, {@code direct}, {@code topic}, {@code headers}
     * @param exchangeName exchange name
     * @param routingKey   routing key (used by {@code direct} and {@code topic})
     * @param headers      header match rules (used by {@code headers})
     * @throws IllegalArgumentException if {@code exchangeType} is {@code null} or unsupported
     */
    public void moreExchangeTypeBindQueueToExchange(String queueName, String exchangeType,
                                                    String exchangeName, String routingKey,
                                                    Map<String, Object> headers) {
        if (exchangeType == null) {
            // switch on a null String would throw a bare NullPointerException
            throw new IllegalArgumentException("不支持的交换机类型: null");
        }
        switch (exchangeType) {
            // The original dispatched "fanout" to the direct-exchange binder.
            case "fanout" -> bindQueueToFanoutExchange(queueName, exchangeName);
            case "direct" -> bindQueueToDirectExchange(queueName, exchangeName, routingKey);
            case "topic" -> bindQueueToTopicExchange(queueName, exchangeName, routingKey);
            case "headers" -> bindQueueToHeadersExchange(queueName, exchangeName, headers);
            default -> throw new IllegalArgumentException("不支持的交换机类型: " + exchangeType);
        }
    }

    /**
     * Binds a queue to a fanout exchange (no routing key).
     *
     * @param queueName    queue name
     * @param exchangeName exchange name
     */
    public void bindQueueToFanoutExchange(String queueName, String exchangeName) {
        Queue queue = new Queue(queueName);
        FanoutExchange exchange = new FanoutExchange(exchangeName);
        Binding binding = BindingBuilder.bind(queue).to(exchange);
        rabbitAdmin.declareBinding(binding);
        // Placeholders belong in the format string; the original put them in the arguments,
        // so the exchange name was never logged.
        log.info("绑定创建成功: {} -> {}", queueName, exchangeName);
    }

    /**
     * Binds a queue to a direct exchange with the given routing key.
     *
     * @param queueName    queue name
     * @param exchangeName exchange name
     * @param routingKey   routing key
     */
    public void bindQueueToDirectExchange(String queueName, String exchangeName, String routingKey) {
        Queue queue = new Queue(queueName);
        DirectExchange exchange = new DirectExchange(exchangeName);
        Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
        rabbitAdmin.declareBinding(binding);
        log.info("绑定创建成功: {} -> {} 使用路由键: {}", queueName, exchangeName, routingKey);
    }

    /**
     * Binds a queue to a topic exchange with the given routing key.
     *
     * @param queueName    queue name
     * @param exchangeName exchange name
     * @param routingKey   routing key
     */
    public void bindQueueToTopicExchange(String queueName, String exchangeName, String routingKey) {
        Queue queue = new Queue(queueName);
        TopicExchange exchange = new TopicExchange(exchangeName);
        Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
        rabbitAdmin.declareBinding(binding);
        log.info("绑定创建成功: {} -> {} 使用路由键: {}", queueName, exchangeName, routingKey);
    }

    /**
     * Binds a queue to a headers exchange; a message matches when all given headers match.
     *
     * @param queueName    queue name
     * @param exchangeName exchange name
     * @param headers      header names and values that must all match
     */
    public void bindQueueToHeadersExchange(String queueName, String exchangeName,
                                           Map<String, Object> headers) {
        Queue queue = new Queue(queueName);
        HeadersExchange exchange = new HeadersExchange(exchangeName);
        Binding binding = BindingBuilder.bind(queue).to(exchange).whereAll(headers).match();
        rabbitAdmin.declareBinding(binding);
        log.info("队列 {} 已绑定到 Headers 交换机 {} 使用头部匹配规则: {}", queueName, exchangeName, headers);
    }

    /**
     * Deletes a queue, first stopping any listener this application started on it.
     *
     * @param queueName queue name
     */
    public void deleteQueue(String queueName) {
        // Stop our own consumer first so it does not keep retrying against a missing queue.
        rabbitListenerService.stopListener(queueName);
        // deleteQueue reports failure through its return value; do not log success blindly.
        if (rabbitAdmin.deleteQueue(queueName)) {
            log.info("队列删除成功: {}", queueName);
        } else {
            log.warn("队列删除失败: {}", queueName);
        }
    }

    /**
     * Deletes an exchange.
     *
     * @param exchangeName exchange name
     */
    public void deleteExchange(String exchangeName) {
        if (rabbitAdmin.deleteExchange(exchangeName)) {
            log.info("交换机删除成功: {}", exchangeName);
        } else {
            log.warn("交换机删除失败: {}", exchangeName);
        }
    }
}

// ===================================================================================
// 3. Dynamic queue listening                          File: RabbitListenerService.java
// ===================================================================================

import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Starts message listeners on queues at runtime.
 *
 * @author kenny
 * @version 1.0.0
 * @since 2024-10-04
 */
@Slf4j
@Service
public class RabbitListenerService {

    // Constructor injection; the original author notes this was chosen to resolve a
    // circular-dependency problem.
    private final SimpleRabbitListenerContainerFactory listenerContainerFactory;
    private final ConnectionFactory connectionFactory;

    // Containers started by this service, keyed by queue name. The original kept no
    // reference, so repeated calls stacked up duplicate consumers that could never be stopped.
    private final Map<String, SimpleMessageListenerContainer> containers = new ConcurrentHashMap<>();

    @Autowired
    public RabbitListenerService(SimpleRabbitListenerContainerFactory listenerContainerFactory,
                                 ConnectionFactory connectionFactory) {
        this.listenerContainerFactory = listenerContainerFactory;
        this.connectionFactory = connectionFactory;
    }

    /**
     * Creates and starts a listener container for the queue. Calling it again for a queue
     * that already has a listener started by this service is a no-op.
     *
     * @param queueName queue name
     */
    public void createListener(String queueName) {
        containers.computeIfAbsent(queueName, this::startContainer);
    }

    /**
     * Stops and forgets the listener started by this service for the queue, if any.
     *
     * @param queueName queue name
     */
    public void stopListener(String queueName) {
        SimpleMessageListenerContainer container = containers.remove(queueName);
        if (container != null) {
            container.stop();
            log.info("RabbitMQ队列监听器已停止: {}", queueName);
        }
    }

    /** Stops every listener started by this service when the application shuts down. */
    @PreDestroy
    public void stopAllListeners() {
        containers.values().forEach(SimpleMessageListenerContainer::stop);
        containers.clear();
    }

    /** Builds, configures and starts a container consuming from the given queue. */
    private SimpleMessageListenerContainer startContainer(String queueName) {
        SimpleMessageListenerContainer container = listenerContainerFactory.createListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        // MessageListenerAdapter invokes the delegate's "handleMessage" method by default.
        container.setMessageListener(new MessageListenerAdapter(new Object() {
            @SuppressWarnings("unused") // called reflectively by MessageListenerAdapter
            public void handleMessage(String message) {
                log.info("收到来自RabbitMQ中队列 {} 队列的消息: {}", queueName, message);
            }
        }));
        container.start();
        log.info("RabbitMQ队列监听器已启动: {}", queueName);
        return container;
    }
}

// ===================================================================================
// 4. Exchange/queue dynamic operation API    File: RabbitDynamicChannelController.java
// ===================================================================================

import com.chain.air.rpp.exchange.rabbit.RabbitDynamicConfigService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

/**
 * HTTP endpoints for creating, binding and deleting RabbitMQ queues and exchanges.
 *
 * <p>NOTE(review): every endpoint changes broker state but is mapped as GET. The mappings
 * are left unchanged so existing callers keep working; consider POST/DELETE.
 *
 * @author kenny
 * @version 1.0.0
 * @since 2024-10-04
 */
@RestController
@RequestMapping("/rabbit/dynamic/channel")
public class RabbitDynamicChannelController {

    @Resource
    private RabbitDynamicConfigService rabbitDynamicConfigService;

    /**
     * Creates a queue.
     *
     * @param queueName queue name
     * @return result message
     */
    @GetMapping("/createQueue")
    public String createQueue(@RequestParam("queueName") String queueName) {
        rabbitDynamicConfigService.createQueue(queueName);
        return "队列已创建: " + queueName;
    }

    /**
     * Creates a (direct) exchange.
     *
     * @param exchangeName exchange name
     * @return result message
     */
    @GetMapping("/createExchange")
    public String createExchange(@RequestParam("exchangeName") String exchangeName) {
        rabbitDynamicConfigService.createExchange(exchangeName);
        return "交换机已创建: " + exchangeName;
    }

    /**
     * Binds a queue to an exchange with a routing key.
     *
     * @param queueName    queue name
     * @param exchangeName exchange name
     * @param routingKey   routing key
     * @return result message
     */
    @GetMapping("/bindQueue")
    public String bindQueueToExchange(@RequestParam("queueName") String queueName,
                                      @RequestParam("exchangeName") String exchangeName,
                                      @RequestParam("routingKey") String routingKey) {
        rabbitDynamicConfigService.bindQueueToExchange(queueName, exchangeName, routingKey);
        return "队列和交换机已绑定: " + queueName + " -> " + exchangeName;
    }

    /**
     * Deletes a queue.
     *
     * @param queueName queue name
     * @return result message
     */
    @GetMapping("/deleteQueue")
    public String deleteQueue(@RequestParam("queueName") String queueName) {
        rabbitDynamicConfigService.deleteQueue(queueName);
        return "队列已删除: " + queueName;
    }

    /**
     * Deletes an exchange.
     *
     * @param exchangeName exchange name
     * @return result message
     */
    @GetMapping("/deleteExchange")
    public String deleteExchange(@RequestParam("exchangeName") String exchangeName) {
        rabbitDynamicConfigService.deleteExchange(exchangeName);
        return "交换机已删除: " + exchangeName;
    }

    /**
     * Creates a direct exchange and binds the queue to it.
     *
     * @param exchangeName exchange name
     * @param queueName    queue name
     * @param routingKey   routing key
     * @return result message
     */
    @GetMapping("/createDirectExchange")
    public String createDirectExchange(@RequestParam String exchangeName,
                                       @RequestParam String queueName,
                                       @RequestParam String routingKey) {
        rabbitDynamicConfigService.createDirectExchange(exchangeName);
        rabbitDynamicConfigService.bindQueueToDirectExchange(queueName, exchangeName, routingKey);
        // The original message said "Fanout" here although a direct exchange is created.
        return "Direct Exchange and Queue Binding created: " + exchangeName + " -> " + queueName
                + " with routing key: " + routingKey;
    }

    /**
     * Creates a fanout exchange and binds the queue to it.
     *
     * @param exchangeName exchange name
     * @param queueName    queue name
     * @return result message
     */
    @GetMapping("/createFanoutExchange")
    public String createFanoutExchange(@RequestParam String exchangeName,
                                       @RequestParam String queueName) {
        rabbitDynamicConfigService.createFanoutExchange(exchangeName);
        rabbitDynamicConfigService.bindQueueToFanoutExchange(queueName, exchangeName);
        return "Fanout Exchange and Queue Binding created: " + exchangeName + " -> " + queueName;
    }

    /**
     * Creates a topic exchange and binds the queue to it.
     *
     * @param exchangeName exchange name
     * @param queueName    queue name
     * @param routingKey   routing key
     * @return result message
     */
    @GetMapping("/createTopicExchange")
    public String createTopicExchange(@RequestParam String exchangeName,
                                      @RequestParam String queueName,
                                      @RequestParam String routingKey) {
        rabbitDynamicConfigService.createTopicExchange(exchangeName);
        rabbitDynamicConfigService.bindQueueToTopicExchange(queueName, exchangeName, routingKey);
        return "Topic Exchange and Queue Binding created: " + exchangeName + " -> " + queueName
                + " with routing key: " + routingKey;
    }

    /**
     * Creates a headers exchange and binds the queue to it using the remaining request
     * parameters as header match rules.
     *
     * @param exchangeName exchange name
     * @param queueName    queue name
     * @param headersMap   all request parameters; every entry other than
     *                     {@code exchangeName} and {@code queueName} becomes a header rule
     * @return result message
     */
    @GetMapping("/createHeadersExchange")
    public String createHeadersExchange(@RequestParam String exchangeName,
                                        @RequestParam String queueName,
                                        @RequestParam Map<String, String> headersMap) {
        Map<String, Object> headers = new HashMap<>(headersMap);
        // An unnamed @RequestParam Map receives ALL query parameters, so the two control
        // parameters must be removed or they become required message headers.
        headers.remove("exchangeName");
        headers.remove("queueName");
        rabbitDynamicConfigService.createHeadersExchange(exchangeName);
        rabbitDynamicConfigService.bindQueueToHeadersExchange(queueName, exchangeName, headers);
        return "Headers Exchange and Queue Binding created: " + exchangeName + " -> " + queueName
                + " with headers: " + headers;
    }
}

// ===================================================================================
// 5. Queue listener dynamic operation API   File: RabbitChannelListenerController.java
// ===================================================================================

import com.chain.air.rpp.exchange.rabbit.RabbitDynamicConfigService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * HTTP endpoint for starting a listener on a queue.
 *
 * @author kenny
 * @version 1.0.0
 * @since 2024-10-04
 */
@RestController
@RequestMapping("/rabbit/channel/listener")
public class RabbitChannelListenerController {

    @Resource
    private RabbitDynamicConfigService rabbitDynamicConfigService;

    /**
     * Declares the queue (if needed) and starts listening on it.
     *
     * @param queueName queue name
     * @return result message
     */
    @GetMapping("/queue")
    public String listenQueue(@RequestParam("queueName") String queueName) {
        rabbitDynamicConfigService.createQueue(queueName, true);
        return "开始监听队列: " + queueName;
    }
}

/*
 * III. Testing dynamic exchanges and queues
 *   1. Test dynamic creation and deletion of exchanges and queues
 *   2. Test dynamic binding of exchanges and queues
 *   3. Send and receive messages to test the dynamically created exchange and queue
 *   4. Test the dynamic queue-listener endpoint
 *
 * Next article: 7. Spring Boot 3.x integration with RabbitMQ — dynamic instances and related operations
 */