大型科技网站我自己的网站 怎样做防火墙
- 作者: 五速梦信息网
- 时间: 2026年03月21日 11:28
当前位置: 首页 > news >正文
大型科技网站,我自己的网站 怎样做防火墙,宁波seo资源,wordpress域名地址设置目录 一、RabbitMQ 初相识 二、基础概念速览 #xff08;一#xff09;消息队列是什么 #xff08;二#xff09;RabbitMQ 核心组件 三、RabbitMQ 基本使用 #xff08;一#xff09;安装与环境搭建 #xff08;二#xff09;简单示例 #xff08;三#xff09;…目录 一、RabbitMQ 初相识 二、基础概念速览 一消息队列是什么 二RabbitMQ 核心组件 三、RabbitMQ 基本使用 一安装与环境搭建 二简单示例 三工作队列模式Work Queue 四交换机类型详解 四、RabbitMQ 高级用法 一消息可靠性投递 二死信队列Dead Letter Queue 三延迟队列Delay Queue 四优先级队列Priority Queue 一、RabbitMQ 初相识 在当今分布式系统大行其道的技术领域中RabbitMQ 宛如一颗璀璨的明星占据着举足轻重的地位。它是一款开源的消息代理软件犹如一座桥梁在不同的应用程序之间搭建起高效通信的通道。 RabbitMQ 基于高级消息队列协议AMQP实现了生产者与消费者之间的解耦让应用程序能够更加专注于自身的业务逻辑而无需过多担忧消息传递的复杂细节。它能够高效地处理大量的消息无论是高并发的互联网应用还是对数据一致性要求极高的金融系统RabbitMQ 都能凭借其出色的性能和可靠性为系统的稳定运行提供坚实保障。 对于咱们程序员来说掌握 RabbitMQ 的使用方法无疑是为自己的技术栈增添了一件强大的武器。它不仅能够帮助我们解决分布式系统中的消息传递难题还能极大地提升系统的性能和可扩展性。接下来就让我们一起深入探索 RabbitMQ 的奥秘从基础用法到高级技巧逐步揭开它神秘的面纱。 二、基础概念速览 一消息队列是什么 消息队列从字面意义理解就是一个存放消息的队列。在计算机系统中它是一种进程间通信或同一进程的不同线程间通信的方式用于在不同应用程序、服务或组件之间传递消息。其核心原理基于先进先出FIFO的顺序即先进入队列的消息会先被处理。 消息队列在系统中扮演着至关重要的角色有着多方面的作用。在应用间异步通信场景下比如电商系统中用户下单后订单信息可通过消息队列异步发送给库存系统、物流系统等进行后续处理此时生产者下单系统无需等待消费者库存、物流系统处理完成就能立即响应用户极大地提升了系统的响应速度 。 解耦方面以大型微服务架构为例各个微服务之间通过消息队列进行通信。当某个微服务进行升级或修改时只要消息格式不变就不会影响其他依赖它的微服务正常运行从而降低了系统间的耦合度提高了系统的可维护性和可扩展性。 在削峰填谷场景中在电商促销活动时短时间内会产生大量的订单请求。消息队列可以将这些请求暂存起来按照系统能够处理的速度逐步发送给后端服务进行处理避免因瞬间高并发流量压垮系统同时在低峰期又能处理之前积压的请求充分利用系统资源。 二RabbitMQ 核心组件 生产者Producer消息的发送方负责产生消息并将其发送到 RabbitMQ 服务器。在实际应用中比如一个订单生成系统当用户完成下单操作后该系统就作为生产者将订单相关的消息发送到 RabbitMQ这些消息可能包含订单编号、商品信息、用户信息等。 消费者Consumer消息的接收方从 RabbitMQ 服务器获取消息并进行相应的处理。接着上面订单的例子库存管理系统可以作为消费者从 RabbitMQ 中接收订单消息然后根据消息内容进行库存扣减等操作。 队列Queue消息的存储地它类似于一个缓冲区生产者发送的消息会被放入队列中等待处理。队列可以存储大量的消息并且支持持久化即使 RabbitMQ 服务器重启持久化队列中的消息也不会丢失。多个生产者可以向同一个队列发送消息同时多个消费者也可以从同一个队列中获取消息实现了消息的多对多传递。 交换机Exchange接收来自生产者的消息并根据路由规则将消息发送到一个或多个队列。RabbitMQ 提供了多种类型的交换机如直接交换机Direct Exchange、主题交换机Topic Exchange、扇出交换机Fanout Exchange和头交换机Headers Exchange 。 路由键Routing Key在消息发送过程中生产者会为每条消息指定一个路由键交换机根据这个路由键和自身的类型及绑定规则决定将消息发送到哪些队列。例如在直接交换机中如果路由键与队列绑定的键完全匹配消息就会被发送到对应的队列。 三、RabbitMQ 基本使用 一安装与环境搭建 RabbitMQ 的安装步骤会因操作系统的不同而有所差异。在 Windows 系统中 首先需前往 RabbitMQ 官网下载适合 Windows 的安装程序.exe 文件。鉴于 RabbitMQ 是基于 Erlang 开发的在安装 RabbitMQ 之前必须先安装对应的 Erlang 环境。安装完成后可通过开始菜单中的快捷方式启动 RabbitMQ 服务也能使用命令行工具在安装目录下的 sbin 文件夹中来启动和管理 RabbitMQ。 在 Linux 系统如 Ubuntu中打开终端执行 “sudo apt - get update” 命令更新系统软件包列表以确保系统的软件包索引是最新的进而正确安装 RabbitMQ 相关的软件包。接着执行 “sudo apt - get install erlang - base erlang - asn1 erlang - crypto erlang - ssl erlang - inets erlang - public - key erlang - syntax - tools” 命令安装 Erlang 环境。完成后执行 “sudo apt - get install rabbitmq - server” 命令安装 RabbitMQ Server安装过程中系统会自动下载并配置 RabbitMQ 服务 。 安装过程中有诸多注意事项。要特别留意 RabbitMQ 与 Erlang 版本的兼容性不同版本的 RabbitMQ 对 Erlang 版本有特定要求可在 RabbitMQ 官网查看版本对应关系。安装完成后建议修改默认的用户密码增强安全性并根据实际需求进行虚拟主机、用户权限等的配置。 二简单示例 以下通过代码示例展示生产者向队列发送消息消费者从队列接收消息的过程。以 Python 语言为例使用 pika 库来操作 RabbitMQ。 在生产者代码中首先建立到 RabbitMQ 服务器的连接代码如下 import pika# 建立到RabbitMQ服务器的连接connection pika.BlockingConnection(pika.ConnectionParameters(localhost))channel connection.channel() 接着声明一个队列若队列不存在则创建它。这里将队列命名为 hello并通过设置 durable True 来实现队列持久化这样在 RabbitMQ 服务器重启后队列和其中的消息不会丢失。代码如下
声明一个队列以便发送消息如果队列不存在则创建。这里的队列名是 hello。# 可以通过在queue_declare方法中设置durableTrue来实现队列持久化。这样在RabbitMQ服务器重启后队列和其中的消息不会丢失。channel.queue_declare(queuehello, durableTrue) 随后发布一条消息到名为 hello 的队列中代码如下
发布一条消息到名为 hello 的队列中。channel.basic_publish(exchange, routing_keyhello, bodyHello World!)
最后关闭连接代码如下
关闭连接connection.close() 在消费者代码中同样先建立到 RabbitMQ 服务器的连接代码如下
关闭连接connection.close() 在消费者代码中同样先建立到 RabbitMQ 服务器的连接代码如下
import pika# 建立到RabbitMQ服务器的连接connection pika.BlockingConnection(pika.ConnectionParameters(localhost))channel connection.channel() 然后声明一个队列以便从中接收消息这里的队列名同样为 hello代码如下
声明一个队列以便从中接收消息。channel.queue_declare(queuehello, durableTrue) 接着定义一个回调函数来处理接收到的消息在回调函数中将接收到的消息内容打印出来代码如下
定义一个回调函数来处理接收到的消息def callback(ch, method, properties, body):print(fReceived {body}) 最后告诉 RabbitMQ 使用上面定义的回调函数来接收来自 hello 队列的消息并开始接收消息进入永久循环等待消息并在需要时运行回调函数代码如下
告诉RabbitMQ使用上面定义的回调函数来接收来自 hello 队列的消息。# callback的参数都是由RabbitMQ自动提供的不需要手动传递它们。# 当消息到达队列并且basic_consume方法已经注册了回调函数时RabbitMQ会负责调用回调函数并传递相应的参数。channel.basic_consume(queuehello, on_message_callbackcallback, auto_ackTrue)# 开始接收消息并进入永久循环等待消息并在需要时运行回调函数。print(Waiting for messages. To exit press CTRLC)channel.start_consuming() 在上述代码中生产者和消费者都声明了名为 hello 的队列当生产者发送消息到该队列后消费者便能从队列中获取并处理消息。 三工作队列模式Work Queue
工作队列模式Work Queue是 RabbitMQ 中一种常用的模式。在这种模式下存在一个生产者和多个消费者生产者将消息发送到队列中多个消费者可以同时从队列中获取消息进行处理。但需要注意的是每条消息只会被一个消费者获取并处理 。
它的适用场景非常广泛。以电商订单处理系统为例在促销活动期间会产生大量的订单。此时可以将订单消息发送到工作队列中由多个订单处理服务实例消费者同时从队列中获取订单消息进行处理这样能大大提高订单处理的速度避免单个服务实例因处理大量订单而出现性能瓶颈。
在工作队列模式中RabbitMQ 默认采用轮询分发Round - robin的方式将消息分配给消费者。也就是说它会按照顺序依次将消息发送给每个消费者而不考虑消费者的处理能力。这种方式在某些情况下可能不太合理比如当某个消费者的处理速度较慢时会导致它积压大量的消息而其他处理速度快的消费者却处于空闲状态。
为了解决这个问题可以采用公平分发Fair dispatch的方式。实现公平分发需要在消费者端进行一些配置。在 Python 中使用 pika 库时可以通过设置 basic_qos 方法的参数来实现。例如设置 basic_qos (prefetch_count 1)表示每个消费者在处理完当前消息之前RabbitMQ 不会再给它发送新的消息从而确保每个消费者都能合理地获取消息避免出现消息分配不均的情况 。实现公平分发的示例代码如下 import pika# 建立到RabbitMQ服务器的连接connection pika.BlockingConnection(pika.ConnectionParameters(localhost))channel connection.channel()# 声明一个队列以便从中接收消息。channel.queue_declare(queuehello, durableTrue)# 设置每个消费者在处理完当前消息之前RabbitMQ不会再给它发送新的消息channel.basic_qos(prefetch_count 1)# 定义一个回调函数来处理接收到的消息def callback(ch, method, properties, body):print(fReceived {body})# 手动确认消息已被处理ch.basic_ack(delivery_tag method.delivery_tag)# 告诉RabbitMQ使用上面定义的回调函数来接收来自 hello 队列的消息。# 这里设置auto_ackFalse需要手动确认消息channel.basic_consume(queuehello, on_message_callbackcallback, auto_ackFalse)# 开始接收消息并进入永久循环等待消息并在需要时运行回调函数。print(Waiting for messages. To exit press CTRLC)channel.start_consuming() 在上述代码中通过设置 basic_qos (prefetch_count 1) 实现了公平分发并且将 basic_consume 的 auto_ack 参数设置为 False改为手动确认消息即消费者在处理完消息后通过调用 basic_ack 方法来告诉 RabbitMQ 消息已被处理这样 RabbitMQ 才会将该消息从队列中移除。 四交换机类型详解
直连交换机Direct Exchange直连交换机是 RabbitMQ 中最基础的交换机类型之一。它的工作机制相对简单会根据消息的路由键Routing Key将消息精确地发送到与之绑定的队列中。当一个队列与直连交换机绑定时会指定一个绑定键Binding Key。只有当消息的路由键与绑定键完全匹配时该消息才会被路由到对应的队列 。假设我们有一个订单处理系统其中有一个队列专门用于处理紧急订单我们可以将这个队列与直连交换机进行绑定并设置绑定键为 “urgent_order”。当生产者发送一条消息并且消息的路由键也设置为 “urgent_order” 时这条消息就会被直连交换机准确地发送到处理紧急订单的队列中。直连交换机适用于需要精确匹配路由键的场景能够确保消息被准确无误地投递到目标队列 。
扇形交换机Fanout Exchange扇形交换机的特点是不关心消息的路由键它会将接收到的所有消息广播到所有与它绑定的队列中。在实际应用中这种交换机常用于广播通知的场景。以一个新闻发布系统为例当有新的新闻发布时我们希望将这条新闻同时推送给多个不同的订阅者队列如手机端订阅者队列、PC 端订阅者队列、邮件订阅者队列等。此时我们可以使用扇形交换机将这些队列都与扇形交换机进行绑定。当生产者向扇形交换机发送新闻消息时无论消息的路由键是什么扇形交换机都会将该消息广播到所有绑定的队列中从而实现新闻的广泛传播 。
主题交换机Topic Exchange主题交换机是一种功能非常强大且灵活的交换机类型它支持通配符的路由规则。在主题交换机中路由键和绑定键都是由多个单词组成单词之间用点号.分隔。它支持两种通配符星号和井号#。星号表示匹配一个单词井号#表示匹配零个或多个单词。例如有一个日志处理系统我们可以创建多个队列分别用于处理不同级别的日志如 error 日志队列、warning 日志队列、info 日志队列等。然后将这些队列与主题交换机进行绑定并设置相应的绑定键。将 error 日志队列的绑定键设置为 “logs.error.*”表示匹配所有以 “logs.error.” 开头的路由键将 warning 日志队列的绑定键设置为 “logs.warning”将 info 日志队列的绑定键设置为 “logs.#”表示匹配所有以 “logs.” 开头的路由键。当生产者发送一条消息路由键为 “logs.error.database_connection_failed” 时根据主题交换机的通配符规则这条消息会被发送到 error 日志队列中若路由键为 “logs.warning.memory_low”则会被发送到 warning 日志队列中若路由键为 “logs.info.system_started”会被发送到 info 日志队列中 。主题交换机适用于需要根据消息的类别或特征进行灵活路由的复杂场景通过合理设置通配符可以实现非常精细的消息路由控制 。 四、RabbitMQ 高级用法 一消息可靠性投递
在分布式系统中消息的可靠投递至关重要。RabbitMQ 提供了多种机制来确保消息从生产者到消费者的可靠传输 。
确认模式Confirm 模式生产者可以通过开启 Confirm 模式来确认消息是否成功发送到交换机。在使用 RabbitMQ 的 Java 客户端时通过调用 channel.confirmSelect() 方法开启 Confirm 模式。当消息成功到达交换机后RabbitMQ 会发送一个确认信号给生产者。生产者可以通过 channel.waitForConfirms() 方法来同步等待确认也可以通过添加确认监听器 channel.addConfirmListener(ConfirmListener listener) 来异步处理确认消息。以异步处理为例示例代码如下
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);Connection connection factory.newConnection();Channel channel connection.createChannel();// 开启Confirm模式channel.confirmSelect();// 添加确认监听器channel.addConfirmListener(new ConfirmListener() {Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(消息发送成功deliveryTag: deliveryTag , multiple: multiple);}Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println(消息发送失败deliveryTag: deliveryTag , multiple: multiple);}});String exchangeName ;String routingKey test;String message Hello, RabbitMQ!;channel.basicPublish(exchangeName, routingKey, null, message.getBytes());// 关闭资源channel.close();connection.close();}
}
在上述代码中通过 channel.addConfirmListener 添加了确认监听器在 handleAck 方法中处理消息成功发送到交换机的情况在 handleNack 方法中处理消息发送失败的情况 。
退回模式Return 模式当交换机无法将消息路由到队列时默认情况下消息会被丢弃。但通过开启 Return 模式可以让交换机将无法路由的消息退回给生产者。在 Java 客户端中需要设置 channel.basicPublish 方法的 mandatory 参数为 true并添加 ReturnListener 监听器来处理退回的消息。示例代码如下 import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);Connection connection factory.newConnection();Channel channel connection.createChannel();// 开启Confirm模式channel.confirmSelect();// 添加确认监听器channel.addConfirmListener(new ConfirmListener() {Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(消息发送成功deliveryTag: deliveryTag , multiple: multiple);}Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println(消息发送失败deliveryTag: deliveryTag , multiple: multiple);}});String exchangeName ;String routingKey test;String message Hello, RabbitMQ!;channel.basicPublish(exchangeName, routingKey, null, message.getBytes());// 关闭资源channel.close();connection.close();}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);Connection connection factory.newConnection();Channel channel connection.createChannel();// 添加Return监听器channel.addReturnListener(new ReturnListener() {Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, byte[] body) throws IOException {System.out.println(消息被退回replyCode: replyCode , replyText: replyText , exchange: exchange , routingKey: routingKey);System.out.println(退回的消息内容: new String(body));}});String exchangeName testExchange;String routingKey nonexistentQueue; // 不存在的队列String message This message will be returned;// 设置mandatory为true开启Return模式channel.basicPublish(exchangeName, routingKey, true, null, message.getBytes());// 关闭资源channel.close();connection.close();}
} 在上述代码中channel.addReturnListener 添加了 ReturnListener 监听器在 handleReturn 方法中处理被退回的消息包括打印错误码、错误信息、交换机、路由键以及消息内容 。
消费者确认机制消费者确认机制用于确保消费者正确处理消息。RabbitMQ 提供了三种消费者确认消息的方式。
自动确认Auto Ack在这种方式下当消费者接收到消息后RabbitMQ 会立即将该消息从队列中删除无论消费者是否成功处理了消息。在 Java 客户端中通过 channel.basicConsume(queueName, true, consumer) 方法开启自动确认其中第二个参数 true 表示开启自动确认 。
手动确认Manual Ack消费者在处理完消息后需要手动调用 channel.basicAck(deliveryTag, multiple) 方法来确认消息。deliveryTag 是消息的唯一标识multiple 表示是否批量确认。如果 multiple 为 true则表示确认所有小于等于 deliveryTag 的消息。例如在处理完一条消息后调用 channel.basicAck(deliveryTag, false) 来确认该条消息 。
根据异常情况确认消费者可以在处理消息的过程中根据是否发生异常来决定如何确认消息。在 try - catch 块中处理消息若处理成功则调用 basicAck 确认消息若发生异常则调用 channel.basicNack(deliveryTag, multiple, requeue) 或 channel.basicReject(deliveryTag, requeue) 方法。basicNack 方法可以批量拒绝消息requeue 参数表示是否将消息重新放回队列basicReject 方法只能拒绝单条消息同样通过 requeue 参数决定是否将消息重新放回队列 。示例代码如下 import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);Connection connection factory.newConnection();Channel channel connection.createChannel();String queueName testQueue;// 手动确认模式channel.basicConsume(queueName, false, myConsumerTag, new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {String message new String(body, UTF - 8);System.out.println(Received message: message);// 处理消息// 处理成功后手动确认channel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {// 处理失败拒绝消息并重新放回队列channel.basicReject(envelope.getDeliveryTag(), true);}}});}
} 在上述代码中通过 channel.basicConsume 方法设置为手动确认模式第二个参数为 false在 handleDelivery 方法中处理消息根据处理结果调用 basicAck 或 basicReject 方法 。
二死信队列Dead Letter Queue
死信队列顾名思义是用于存放那些无法被正常消费的 “死信” 的队列。在实际应用中了解死信的产生原因以及如何配置死信队列对于保障系统的稳定性和可靠性至关重要 。
死信产生原因
消息被拒绝当消费者调用 basic.reject 或 basic.nack 方法拒绝消息并且设置 requeue 参数为 false 时该消息会成为死信。例如在处理订单消息时如果订单数据格式错误消费者无法处理就可以拒绝该消息并设置不重新入队 。
消息过期RabbitMQ 支持为消息或队列设置过期时间TTLTime To Live。当消息在队列中停留的时间超过了设置的过期时间该消息就会变成死信。比如在电商系统中设置订单支付消息的过期时间为 30 分钟如果 30 分钟内未支付该订单消息就会过期成为死信 。
队列达到最大长度如果队列设置了最大长度x - max - length当队列中的消息数量达到这个最大值后再添加的消息会被丢弃或成为死信具体取决于队列的配置 。
死信队列的作用与配置死信队列的主要作用是对无法正常消费的消息进行统一管理和后续处理避免消息丢失导致的数据不一致或业务异常。配置死信队列需要经过以下步骤
定义死信交换机和队列首先需要定义一个死信交换机一般为 Direct 或 Topic 类型和对应的死信队列。例如在使用 Spring Boot 和 Spring AMQP 进行配置时可以通过配置类来定义 import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class DeadLetterQueueConfig {Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(dlx.exchange);}Beanpublic Queue deadLetterQueue() {return new Queue(dlx.queue);}Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(dlx.routing.key);}
} import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class DeadLetterQueueConfig {Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(dlx.exchange);}Beanpublic Queue deadLetterQueue() {return new Queue(dlx.queue);}Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(dlx.routing.key);}
}
在上述代码中定义了名为 dlx.exchange 的死信交换机和名为 dlx.queue 的死信队列并通过 Binding 将它们绑定在一起路由键为 dlx.routing.key 。
关联正常队列与死信队列将正常队列与死信队列进行关联当正常队列中的消息成为死信时会被发送到死信队列。可以通过在正常队列的声明中设置 x - dead - letter - exchange 和 x - dead - letter - routing - key 参数来实现。例如
Bean
public Queue normalQueue() {return QueueBuilder.durable(normal.queue).withArgument(x - dead - letter - exchange, dlx.exchange).withArgument(x - dead - letter - routing - key, dlx.routing.key).build();
} 在上述代码中normal.queue 正常队列通过设置 x - dead - letter - exchange 和 x - dead - letter - routing - key 参数将死信交换机和路由键与之前定义的死信队列相关联 。 三延迟队列Delay Queue
延迟队列在实际业务中有着广泛的应用它能够让消息在指定的延迟时间后才被处理。RabbitMQ 本身并没有直接提供延迟队列的功能但可以通过一些方式来实现 。
实现方式
使用插件可以通过安装 rabbitmq_delayed_message_exchange 插件来实现延迟队列。安装插件后在 RabbitMQ 管理界面中可以创建延迟交换机类型为 x - delayed - message。在发送消息时通过设置消息的 x - delay 属性来指定延迟时间。例如在使用 Java 客户端发送消息时
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);Connection connection factory.newConnection();Channel channel connection.createChannel();String exchangeName delayed.exchange;String routingKey delayed.routing.key;String message Delayed message;// 设置延迟时间为5000毫秒MapString, Object headers new HashMap();headers.put(x - delay, 5000);AMQP.BasicProperties properties new AMQP.BasicProperties.Builder().headers(headers).build();channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());// 关闭资源channel.close();connection.close();}
}
在上述代码中通过设置消息的 headers 中的 x - delay 属性为 5000表示该消息将延迟 5000 毫秒后才被处理 。
利用死信队列实现利用死信队列和消息过期时间TTL来模拟延迟队列的功能。通过设置正常队列的消息过期时间当消息过期后会被发送到死信队列从而实现延迟处理的效果。例如在配置类中定义正常队列和死信队列 import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class DelayQueueConfig {Beanpublic DirectExchange normalExchange() {return new DirectExchange(normal.exchange);}Beanpublic Queue normalQueue() {return QueueBuilder.durable(normal.queue).withArgument(x - message - ttl, 5000) // 设置消息过期时间为5000毫秒.withArgument(x - dead - letter - exchange, dlx.exchange).withArgument(x - dead - letter - routing - key, dlx.routing.key).build();}Beanpublic Binding normalBinding() {return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(normal.routing.key);}Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(dlx.exchange);}Beanpublic Queue deadLetterQueue() {return new Queue(dlx.queue);}Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(dlx.routing.key);}
} 在上述代码中normal.queue 正常队列设置了消息过期时间为 5000 毫秒当消息在该队列中过期后会根据 x - dead - letter - exchange 和 x - dead - letter - routing - key 参数被发送到死信队列 dlx.queue从而实现了延迟 5000 毫秒处理消息的效果 。
应用场景
订单超时处理在电商系统中当用户下单后如果在规定时间内未支付系统需要自动取消订单。可以将订单消息发送到延迟队列设置延迟时间为支付超时时间当延迟时间到达后消息从延迟队列中被消费触发订单取消的业务逻辑 。
任务定时执行例如在一个任务调度系统中需要在某个特定时间点执行某项任务。可以将任务消息发送到延迟队列设置延迟时间为距离任务执行时间的间隔当延迟时间结束任务消息被消费执行相应的任务 。
四优先级队列Priority Queue
优先级队列允许为消息设置不同的优先级使得高优先级的消息能够优先被处理。在实际应用场景中这种特性能够有效地优化资源分配和任务调度提升系统的整体性能和响应速度 。
在 RabbitMQ 中设置消息优先级首先需要在队列声明时启用优先级功能。以 Python 语言使用 pika 库为例代码如下
import pika# 建立到RabbitMQ服务器的连接
connection pika.BlockingConnection(pika.ConnectionParameters(localhost))
channel connection.channel()# 声明一个支持优先级的队列这里设置最大优先级为10
channel.queue_declare(queuepriority_queue, arguments{x - max - priority: 10})
在上述代码中通过在 queue_declare 方法的 arguments 参数中设置 x - max - priority 为 10声明了一个名为 priority_queue 的队列并支持 0 - 10 共 11 个优先级 。
发送消息时可以指定消息的优先级。示例代码如下 # 发送高优先级消息 channel.basic_publish(exchange, routing_keypriority_queue, bodyHigh priority message, propertiespika.BasicProperties(priority 10)) # 发送低优先级消息 channel.basic_publish(exchange, routing_keypriority_queue, bodyLow priority message, propertiespika.BasicProperties(priority ## 五、实战案例剖析 ### 一电商场景下单流程优化 在电商系统中下单流程涉及多个系统的协同工作如订单系统、库存系统、支付系统、物流系统等。传统的下单流程中这些系统之间往往是紧密耦合的同步调用这会导致系统的响应速度慢、可扩展性差且一旦某个系统出现故障整个下单流程就会受到影响 。 引入RabbitMQ后下单流程得到了极大的优化。当用户下单时订单系统作为生产者将订单消息发送到RabbitMQ的订单队列中。库存系统、支付系统、物流系统等作为消费者从订单队列中获取订单消息进行异步处理 。 这样做带来了诸多好处。系统的响应速度大幅提升因为订单系统在发送消息后无需等待其他系统的处理结果就能立即返回给用户下单成功的响应。系统的可扩展性增强当业务量增加时可以通过增加消费者的实例数量来提高处理能力。系统的稳定性也得到了保障即使某个系统出现故障其他系统仍然可以继续处理已接收的消息不会影响整个下单流程的正常运转 。 以某知名电商平台为例在未使用RabbitMQ之前下单高峰期系统响应时间长达数秒且经常出现订单处理失败的情况。引入RabbitMQ后系统响应时间缩短至毫秒级订单处理成功率提升至99%以上大大提升了用户体验和业务的稳定性 。 ### 二日志处理系统搭建 在大型应用系统中日志的收集、存储和分析是非常重要的环节。传统的日志处理方式往往是同步写入文件或数据库这种方式在高并发场景下会严重影响系统的性能 。 利用RabbitMQ构建高效的日志处理系统可以很好地解决这些问题。应用程序作为生产者将日志消息发送到RabbitMQ的日志队列中。日志处理系统作为消费者从日志队列中获取日志消息并进行异步存储和分析 。 在实际应用中可以使用Elasticsearch来存储日志数据利用其强大的搜索和分析功能实现对日志的快速查询和统计。可以结合Kibana等可视化工具对日志数据进行直观的展示和分析 。 例如一个大型互联网公司的应用系统每天会产生海量的日志数据。通过使用RabbitMQ搭建日志处理系统将日志消息异步发送到Elasticsearch中存储再通过Kibana进行可视化分析运维人员可以实时监控系统的运行状态快速定位和解决问题大大提高了运维效率 。 ## 六、常见问题与解决方案
在使用RabbitMQ的过程中我们可能会遇到各种各样的问题。下面为大家列举一些常见问题并给出相应的解决方案 。 ### 一消息丢失 消息丢失是使用RabbitMQ时较为常见且严重的问题。它可能发生在消息生产、存储和消费的各个环节。在生产环节当网络出现波动时生产者可能会误以为消息已发送成功但实际上RabbitMQ服务器并未收到消息。在存储环节如果RabbitMQ服务器突然宕机或重启而队列和消息未进行持久化设置那么内存中的消息就会丢失。在消费环节若消费者在处理消息过程中出现异常而又采用了自动确认Auto Ack模式RabbitMQ会认为消息已被成功消费从而将消息从队列中删除导致消息丢失 。 为解决这一问题可从以下几个方面入手。在生产者端开启确认模式Confirm 模式并在发送消息后通过监听确认信号来判断消息是否成功发送到交换机。若未收到确认信号可进行消息重发。同时捕获发送过程中的异常以便及时处理。在存储环节将交换机、队列和消息都设置为持久化。在Java中声明交换机时设置 durable true声明队列时同样设置 durable true发送消息时通过 AMQP.BasicProperties.Builder().deliveryMode(2).build() 设置消息持久化 。在消费者端采用手动确认Manual Ack模式确保在消息处理完成后再向RabbitMQ发送确认信号 。 ### 二重复消费 消息重复消费也是一个需要关注的问题。其产生原因主要是网络波动或消费者服务异常。当消费者正常处理完消息但还没来得及向RabbitMQ发送确认时若出现网络抖动或者消费者服务挂掉的情况等网络恢复或者消费者服务重启后由于RabbitMQ之前未收到确认消息仍然在队列中并且因为有重试机制消费者就会重新消费这条消息 。 解决消息重复消费的问题可从业务层面保证幂等性。例如在电商系统中对于订单支付成功后的状态修改操作可将未支付状态作为修改语句的执行条件这样即使重复执行该操作也不会对结果产生影响。也可以通过设置消息唯一标识ID来解决。在消费者接收消息时对这个ID进行校验若该ID已被处理过则不再重复处理 。 ### 三性能瓶颈 随着业务量的增长RabbitMQ可能会出现性能瓶颈。例如在高并发场景下消息的处理速度跟不上消息的产生速度导致队列中消息堆积严重从而影响系统的整体性能。造成性能瓶颈的原因可能是多方面的如消费者处理逻辑复杂、系统资源如CPU、内存不足、队列配置不合理等 。 为优化性能可采取以下措施。优化消费者的处理逻辑减少不必要的计算和I/O操作提高处理速度。增加消费者的数量以并行处理更多的消息。合理配置队列参数如设置合适的预取计数prefetch count避免消费者一次性拉取过多消息导致内存溢出 。对系统资源进行监控和优化确保服务器有足够的资源来处理消息 。
- 上一篇: 大型集团网站主页网页设计
- 下一篇: 大型门户网站核心技术母婴的网站建设
相关文章
-
大型集团网站主页网页设计
大型集团网站主页网页设计
- 技术栈
- 2026年03月21日
-
大型机械网站建设公司连云港公司网站优化服务
大型机械网站建设公司连云港公司网站优化服务
- 技术栈
- 2026年03月21日
-
大型公司网站制作桂林两江四湖游船路线
大型公司网站制作桂林两江四湖游船路线
- 技术栈
- 2026年03月21日
-
大型门户网站核心技术母婴的网站建设
大型门户网站核心技术母婴的网站建设
- 技术栈
- 2026年03月21日
-
大型门户网站建设美丽wordpress 文章别名
大型门户网站建设美丽wordpress 文章别名
- 技术栈
- 2026年03月21日
-
大型门户网站建设需要哪些技术下载中国建设银行官网站
大型门户网站建设需要哪些技术下载中国建设银行官网站
- 技术栈
- 2026年03月21日






