网站建设供需专业的移动客户端网站建设

当前位置: 首页 > news >正文

网站建设供需,专业的移动客户端网站建设,网站开发以图片为背景,网站开发技术语言RabbitMQ路由模式前言RabbitMQ模式的基本概念为什么要使用Rabbitmq 路由模式RabbitMQ路由模式组成元素路由模式完整代码Pom文件引入RabbtiMQ依赖RabbitMQ工具类生产者消费者1消费者2运行结果截图前言 通过本篇博客能够简单使用RabbitMQ的路由模式。 本篇博客主要是博主通过官网… RabbitMQ路由模式前言RabbitMQ模式的基本概念为什么要使用Rabbitmq 路由模式RabbitMQ路由模式组成元素路由模式完整代码Pom文件引入RabbtiMQ依赖RabbitMQ工具类生产者消费者1消费者2运行结果截图前言 通过本篇博客能够简单使用RabbitMQ的路由模式。 本篇博客主要是博主通过官网以及学习他人的博客总结出的RabbitMQ发布订阅模式。其中如果有误欢迎大家及时指正。 RabbitMQ模式的基本概念 路由模式是根据Routing Key有条件的将消息筛选后发送给消费者消费者只接受筛选之后的消息 路由模式的核心是 配置一个类型为direct的交换机并且需要指定不同的路由键(routing key)把对应的消息从交换机路由到不同的消息队列进行存储再由对应的消费者进行消费 为什么要使用Rabbitmq 路由模式 由于发布订阅模式是无条件将所有消息分发给所有消费者路由模式可以根据条件Routing Key将消息筛选之后发送给消费者。 应用场景 例如有一个股票分析机构每天都会有一些独家的股票分析报告。对于其他一些应用平台想要每天都到这家股票分析机构提供的百度的独家股票分析报告对于另外一些应用平台想要收到谷歌的独家股票分析报告就可以使用路由模式。 RabbitMQ路由模式组成元素 P生产者向交换机发送消息的是否需要指定routing key X交换机接收生产者发送的消息需要指定交换机的类型为direct并且将消息发送给与routing key匹配的队列 C1消费者1它所在队列指定了需要routing key为error的信息 C2消费者2其所在队列指定了需要routing key 为 info、error、warning 的消息 路由模式完整代码 业务场景生产者为日志分发平台分发info、warning、error级别的日志消费者1只接受日志级别为error的日志消费者2接收全部日志。 Pom文件引入RabbtiMQ依赖 dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.10.0/version/dependencyRabbitMQ工具类 import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** author : [WangWei]* version : [v1.0]* className : RabbitMQUtils* description : [rabbitmq工具类]* createTime : [2023/1/17 8:49]* updateUser : [WangWei]* updateTime : [2023/1/17 8:49]* updateRemark : [描述说明本次修改内容]/ public class RabbitMQUtils {/** version V1.0 Title: getConnection* author Wangwei* description 创建rabbitmq连接* createTime 2023/1/17 8:52* param []* return com.rabbitmq.client.Connection/public static Connection getConnection() throws IOException, TimeoutException {ConnectionFactory factory new ConnectionFactory();factory.setHost(ip);factory.setPort(5672);factory.setVirtualHost(虚拟主机);factory.setUsername(用户名);factory.setPassword(密码);//创建连接Connection connectionfactory.newConnection();return connection;}/** version V1.0 Title: getChannel* author Wangwei* description 创建信道* createTime 2023/1/17 8:55* param []* return com.rabbitmq.client.Channel/public static Channel getChannel() throws IOException, TimeoutException {Connection connectiongetConnection();Channel channelconnection.createChannel();return channel;} } 生产者 import com.rabbitmq.client.Channel;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;/** author : [WangWei]* version : [v1.0]* className : Producer* description : [生产者]* createTime : [2023/2/1 9:38]* updateUser : [WangWei]* updateTime : [2023/2/1 9:38]* updateRemark : [描述说明本次修改内容]*/ public class Producer {private static final String EXCHANGE_NAME direct_logs;public static void main(String[] args) throws IOException, TimeoutException {//建立连接RabbitMQUtils.getConnection();//声明通道Channel channel RabbitMQUtils.getChannel();//创建fanout类型交换机并命名为logschannel.exchangeDeclare(EXCHANGE_NAME,direct);//声明routingKeyString severityInfoinfo;String severityErrorerror;String severityWarningwarning;//循环发送2条消息for (int i 0; i 2 ; i) {String msg路由模式infoi;/*推送消息交换机命名不填写使用默认的交换机 routingKey -路由键-* props:消息的其他属性-路由头等正文* msg消息正文*/channel.basicPublish(EXCHANGE_NAME,severityInfo,null,msg.getBytes(StandardCharsets.UTF_8));System.out.println(msg);}//循环发送2条消息for (int i 0; i 2 ; i) {String msg路由模式errori;/*推送消息交换机命名不填写使用默认的交换机 routingKey -路由键-* props:消息的其他属性-路由头等正文* msg消息正文*/channel.basicPublish(EXCHANGE_NAME,severityError,null,msg.getBytes(StandardCharsets.UTF_8));System.out.println(msg);}//循环发送2条消息for (int i 0; i 2 ; i) {String msg路由模式warningi;/*推送消息交换机命名不填写使用默认的交换机 routingKey -路由键-* props:消息的其他属性-路由头等正文* msg消息正文/channel.basicPublish(EXCHANGE_NAME,severityWarning,null,msg.getBytes(StandardCharsets.UTF_8));System.out.println(msg);}} } 消费者1 import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;import java.io.IOException; import java.util.concurrent.TimeoutException;/** author : [WangWei]* version : [v1.0]* className : ConsumerOne* description : [消费者1]* createTime : [2023/2/1 9:39]* updateUser : [WangWei]* updateTime : [2023/2/1 9:39]* updateRemark : [描述说明本次修改内容]/ public class ConsumerOne {private static final String EXCHANGE_NAME direct_logs;public static void main(String[] args) throws IOException, TimeoutException {RabbitMQUtils.getConnection();Channel channel RabbitMQUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,direct);String queueName channel.queueDeclare().getQueue();//声明routingKey error)String severityErrorerror;//交换机与队列进行绑定-如果没有队列与交换机进行绑定那么消费者接受不到生产者的消息消息会丢失//queueName绑定了direct_logs交换机并且绑定了routingKeychannel.queueBind(queueName, EXCHANGE_NAME,severityError );//因为Rabbitmq服务器将异步地向我们推送消息所以我们以对象的形式提供了一个回调该回调将缓冲消息直到我们准备好使用它们。DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });} } 消费者2 import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;import java.io.IOException; import java.util.concurrent.TimeoutException;/** author : [WangWei]* version : [v1.0]* className : ConsumerTwo* description : [消费者2]* createTime : [2023/2/1 9:38]* updateUser : [WangWei]* updateTime : [2023/2/1 9:38]* updateRemark : [描述说明本次修改内容]*/ public class ConsumerTwo {private static final String EXCHANGE_NAME direct_logs;public static void main(String[] args) throws IOException, TimeoutException {RabbitMQUtils.getConnection();Channel channel RabbitMQUtils.getChannel();//创建fanout类型交换机并命名为logschannel.exchangeDeclare(EXCHANGE_NAME,direct);//创建了一个非持久的、排他的、自动删除的队列并生成了一个名称String queueName channel.queueDeclare().getQueue();//声明routingKey info,error,warning)String severityInfoinfo;String severityErrorerror;String severityWarningwarning;//交换机与队列进行绑定-如果没有队列与交换机进行绑定那么消费者接受不到生产者的消息消息会丢失//queueName绑定了direct_logs交换机并且绑定了3个routingKeychannel.queueBind(queueName, EXCHANGE_NAME,severityInfo );channel.queueBind(queueName, EXCHANGE_NAME,severityError );channel.queueBind(queueName, EXCHANGE_NAME,severityWarning );//因为Rabbitmq服务器将异步地向我们推送消息所以我们以对象的形式提供了一个回调该回调将缓冲消息直到我们准备好使用它们。DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });}} 运行结果截图