网站部署 模板放网站的图片做多大分辨率
- 作者: 五速梦信息网
- 时间: 2026年04月20日 08:13
当前位置: 首页 > news >正文
网站部署 模板,放网站的图片做多大分辨率,中企动力做的网站山西太原,网址没封的来一个文章目录 前言1. Simple 简单模式2. Work Queue 工作队列模式3. Pubulish/Subscribe 发布/订阅模式Exchange 的类型 4. Routing 路由模式5. Topics 通配符模式6. RPC RPC通信7. Publisher Confirms 发布确认1. 单独确认2. 批量确认3. 异步确认 前言
前面我们学习了 RabbitMQ 的… 文章目录 前言1. Simple 简单模式2. Work Queue 工作队列模式3. Pubulish/Subscribe 发布/订阅模式Exchange 的类型 4. Routing 路由模式5. Topics 通配符模式6. RPC RPC通信7. Publisher Confirms 发布确认1. 单独确认2. 批量确认3. 异步确认 前言
前面我们学习了 RabbitMQ 的基本使用以及 RabbitMQ 的快速上手那么这篇文章我将为大家介绍 RabbitMQ 提供的 7 种工作模式我们上一篇快速入门实现的案例其实就是 7 种工作模式中的简单模式。
第一种Simple 简单模式 第二种Work Queue 工作队列模式 第三种Publish/Subscribe 发布/订阅模式 第四种Routing 路由模式 第五种Topic 通配符模式 第六种RPC 模式
第七种就是 Publisher Confirms 发布确认模式。
- Simple 简单模式 简单模式主要由一个 Producer、一个 Queue和一个 Consumer 组成。 简单模式的特点就是一个生产者 P一个消费者 C消息只能被消费一次也称为点对点Point-to-Point模式。 这里的具体实现上一篇文章我就写了这里就不写了大家可以去看看。
- Work Queue 工作队列模式 工作队列模式由一个生产者 P一个队列 Queue 和多个消费者 C1、C2…组成在这种模式下Work Queue 会将消息分派给不同的消费者每个消费者都会接收到不同的消息意思就是 Work Queue 接收到了 10 条消息那么 Work Queue 会将这 10 条消息分成两部分每个部分 5 条消息每条消息都不重复然后将这五条消息分别发送给 C1 和 C2。 特点消息不会重复分配给不同的消费者。 适用场景集群环境中做异步处理。比如我们平时在银行中办理业务取号的时候当要办理业务的人取号生产者了之后那么这些号码就会被存放在队列中银行中的每个窗口消费者会给不同号的人办理业务。 那么我们看看通过 Java 代码如何实现一个工作队列模式。 对于这些经常使用到的变量我们将其归到一个类中进行管理 public class Constants {public static final String IP ...;public static final Integer PORT 5672;public static final String VIRTUAL_HOST test;public static final String USER_NAME admin;public static final String PASSWORD xxx;public static final String WORK_QUEUE work.queue; }生产者代码 public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection factory.newConnection();//开启信道Channel channel connection.createChannel();//声明队列channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);//发送消息for (int i 0; i 10; i) {String msg work queue i;channel.basicPublish(,Constants.WORK_QUEUE,null,msg.getBytes());}System.out.println(消息发送成功);channel.close();connection.close();} }消费者1 和消费者 2 的代码是一样的 public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection factory.newConnection();//开启信道Channel channel connection.createChannel();//声明队列channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);//消费消息Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(接受到消息: new String(body));}};channel.basicConsume(Constants.WORK_QUEUE,consumer);//释放资源//channel.close(); 我们这里可以先不释放资源不然当我们先运行消费者的时候queue中没有消息consumer的连接就会直接关闭了//connection.close();} }先启动两个消费者然后再启动生产者 可以看到 Consumer1 和 Consumer2 拿到的消息都是不重复的消息。
- Pubulish/Subscribe 发布/订阅模式 Exchange 的类型 可以发现这个模式相较于前面两个模式多出了一个 X这个 X 指的是 Exchange 交换机。 交换机的作用是生产者将消息发送到 Exchange由交换机将消息按照一定的规则路由到一个或者多个队列中。 AMQP 协议中的交换机的类型有六种fanoutdirecttopicheadersSystem和自定义但是 RabbitMQ 中交换机的类型只有前四种。 Fanout广播将消息交给所有绑定到交换机的队列Publish/Subcribe模式Direct定向把消息交给符合指定 routing key 的队列Routing 模式Topic通配符把消息交给符合 routing pattern路由模式的队列Topic 模式headers类型的交换器不依赖于路由键的匹配规则来路由消息而是根据发送的消息内容中的 headers 属性进行匹配。headers 类型的交换机性能很差而且也不实用基本上不会看到它的存在 Exchaneg 只负责转发消息不具备存储消息的能力因此如果没有队列与 Exchange 绑定或者没有符合路由规则的队列那么消息就会丢失。 Routing Key路由键。生产者将消息发送给转换机的时候指定的一个字符串用来告诉交换机应该如何处理这个消息Binding Key绑定。RabbitMQ 中通过 Binding 将交换机和队列关联起来在绑定的时候一般会指定一个 Binding Key这样 RabbitMQ 就知道如何正确的将消息路由到队列了。 当 Exchange 拿到生产者发送来的消息之后会将消息中带的 Routing Key 和与该交换机绑定的队列的 Binding Key 进行匹配然后将这个消息发送给 Routing Key 和 Binding Key 匹配的队列。 当知道了交换机的几种类型之后我们来看看如何使用代码实现出来一个 Publish/Subscribe 模式。 生产者代码 首先还是与 RabbitMQ-Server 建立连接开启信道与前面不同的操作是前面我们声明交换机的时候因为使用的是默认的交换机所以就没有显式的声明交换机但是在涉及到交换机类型的时候我们就需要显式的声明交换机虽然 RabbitMQ 默认为我们提供了各个类型的交换机但是名字可能不好记所以不如我们自己实现一个 Java 中声明一个交换机的方法主要是 exchangeDeclare() 方法这个方法有很多个重载方法但是我们主要使用下面的这种方法 AMQP.Exchange.DeclareOk exchangeDeclare(String var1, BuiltinExchangeType var2, boolean var3) throws IOException;String var1: 这个参数是交换机的名称。它是必须的用于在RabbitMQ中唯一标识一个交换机。你可以根据需要为这个交换机命名。BuiltinExchangeType var2: 这个参数指定了交换机的类型。该类是一个枚举类内部枚举了交换器的类型boolean var3: 这个布尔值参数指定交换机是否应该被标记为持久的即在RabbitMQ重启后仍然存在。如果设置为true交换机将持久化如果设置为false交换机则不会持久化。 public enum BuiltinExchangeType {DIRECT(direct),FANOUT(fanout),TOPIC(topic),HEADERS(headers);private final String type;private BuiltinExchangeType(String type) {this.type type;}public String getType() {return this.type;} }声明完成交换器后就是声明队列声明队列之后就是需要绑定交换器和队列了 绑定交换器和队列使用的方法是 queueBind() 方法该方法也是有两个重载的方法 AMQP.Queue.BindOk queueBind(String var1, String var2, String var3) throws IOException;AMQP.Queue.BindOk queueBind(String var1, String var2, String var3, MapString, Object var4) throws IOException;String var1: 队列的名称。这是你想要绑定到交换机的队列的唯一标识符。String var2: 交换机的名称。这是你想要将队列绑定到的交换机的名称。String var3: 路由键。当消息发送到交换机时交换机将使用路由键来确定哪些队列应该接收这个消息。路由键可以是任何字符串其解释取决于交换机的类型。MapString, Object var4: 绑定参数。这是一个可选参数允许你为绑定指定额外的参数这些参数将根据交换机和队列的特定需求进行解释。例如对于某些交换机类型如headers交换机绑定参数可能用于定义消息头中的条件。对于大多数用途这个参数可能为空或未使用。 我们这里没有使用到额外的参数所以就使用三个参数的方法 channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,); channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,);public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection factory.newConnection();//开启信道Channel channel connection.createChannel();//声明交换机channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true);//声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//绑定交换机和队列channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,);channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,);//生产消息for (int i 0; i 10; i) {String msg fanout exchange i;channel.basicPublish(Constants.FANOUT_EXCHANGE,,null,msg.getBytes());}System.out.println(消息发送成功);channel.close();connection.close();} }消费者代码 public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection factory.newConnection();//开启信道Channel channel connection.createChannel();//声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);//消费消息Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(接收到消息: new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE1,consumer);} }两个消费者的代码基本上都一样就是声明队列的时候声明的是两个不同的队列。 先启动两个消费者然后再启动生产者 可以看到Exchange 将收到的生产者生产的消息复制为 N 份发送给了所有与其绑定的队列然后消费者拿到的消息就是一样的消息。这就是 Publish/Subscribe 模式。
- Routing 路由模式 路由模式其实和发布/订阅模式非常相似它是在发布订阅模式的基础上增加了路由 key其实也不算增加只不过发布/订阅模式交换器和队列的 Binding key 都是一样的然后生产者发送的消息中携带的 Routing Key 也是和这些 Binding Key 是相匹配的所有交换器会将收到的消息发送给所有与其绑定的队列。 而路由模式则是交换器和队列的 Binding Key 并不是完全相同的而是存在差异这样当交换器接收到 Routing Key 的时候就会将消息发送给与之匹配的队列。那么我们看看如何使用代码来实现 路由模式。 生产者代码 public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection factory.newConnection();//开启信道Channel channel connection.createChannel();//声明交换器channel.exchangeDeclare(Constants.ROUTING_EXCHANGE, BuiltinExchangeType.DIRECT,true);//声明队列channel.queueDeclare(Constants.ROUTING_QUEUE1,true,false,false,null);//绑定交换器和队列channel.queueBind(Constants.ROUTING_QUEUE1,Constants.ROUTING_EXCHANGE,a);channel.queueBind(Constants.ROUTING_QUEUE1,Constants.ROUTING_EXCHANGE,b);channel.queueBind(Constants.ROUTING_QUEUE2,Constants.ROUTING_EXCHANGE,a);//生产消息String msg routing exchange a;channel.basicPublish(Constants.ROUTING_EXCHANGE,a,null,msg.getBytes());msg routing exchange b;channel.basicPublish(Constants.ROUTING_EXCHANGE,b,null,msg.getBytes());System.out.println(消息发送成功);channel.close();connection.close();} }消费者代码 public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection factory.newConnection();//开启信道Channel channel connection.createChannel();//声明队列channel.queueDeclare(Constants.ROUTING_QUEUE1,true,false,false,null);//消费消息Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(接收到消息: new String(body));}};channel.basicConsume(Constants.ROUTING_QUEUE1,consumer);} }5. Topics 通配符模式 跟 Java、MySQL 一样我们的 RabbitMQ 也是支持 通配符的所以我们的 Routing Key 和 Binding Key 也是可以有通配符的。在 RabbitMQ 中的匹配字符有两个 * 和 #。 匹配一个单词#匹配0个或者多个单词 注意 RabbitMQ 中匹配字符匹配的不是字符而是单词RabbitMQ 中由 . 分隔一个单词。a.b.ca b c 都叫做一个单词aa.bb.ccaa bb cc 也就做一个单词。 生产者代码 public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection factory.newConnection();//开启信道Channel channel connection.createChannel();//声明交换器channel.exchangeDeclare(Constants.TOPICS_EXCHANGE, BuiltinExchangeType.TOPIC,true);//声明队列channel.queueDeclare(Constants.TOPICS_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.TOPICS_QUEUE2,true,false,false,null);//绑定交换器和队列channel.queueBind(Constants.TOPICS_QUEUE1,Constants.TOPICS_EXCHANGE,.a.);channel.queueBind(Constants.TOPICS_QUEUE1,Constants.TOPICS_EXCHANGE,c.#);channel.queueBind(Constants.TOPICS_QUEUE2,Constants.TOPICS_EXCHANGE,.a.*);//生产消息String msg topics exchange .a.;channel.basicPublish(Constants.TOPICS_EXCHANGE,hello.a.r,null,msg.getBytes());msg topics exchange c.#;channel.basicPublish(Constants.TOPICS_EXCHANGE,c.world,null,msg.getBytes());channel.close();connection.close();} }消费者代码 public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection factory.newConnection();//开启信道Channel channel connection.createChannel();//声明队列channel.queueDeclare(Constants.TOPICS_QUEUE1,true,false,false,null);//消费消息Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(接收到消息: new String(body));}};channel.basicConsume(Constants.TOPICS_QUEUE1,consumer);} }6. RPC RPC通信 在 RPC 通信的过程中没有生产者和消费者比较像咱们的 RPC 远程调用大概就是通过两个队列实现了一个可回调的过程。 RPC 通信的过程 客户端发送消息到一个指定的队列并在消息属性中设置 replyTo 字段这个字段指定了一个回调队列用于接收服务器的响应服务端收到请求后处理请求并发送响应消息到 replyTo 指定的回调队列客户端在回调队列上等待消息一旦收到响应客户端会检查消息的 replyTo 属性以确保它是所期望的响应 RPC 通信客户端代码 public class RpcClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//建立连接ConnectionFactory factory new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection factory.newConnection();//开启信道Channel channel connection.createChannel();//声明交换器 我们这是使用默认的交换器//生命队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);//发送请求String msg rpc…;//设置请求的唯一标识String correlationId UUID.randomUUID().toString();//设置请求的相关属性AMQP.BasicProperties properties new AMQP.BasicProperties().builder().correlationId(correlationId).replyTo(Constants.RPC_RESPONSE_QUEUE).build();channel.basicPublish(,Constants.RPC_REQUEST_QUEUE,properties,msg.getBytes());//接收响应//通过阻塞队列来接收响应BlockingQueueString blockingQueue new ArrayBlockingQueue(1);Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String response new String(body);System.out.println(接收到回调消息: response);if (correlationId.equals(properties.getCorrelationId())) {blockingQueue.offer(response);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,consumer);String result blockingQueue.take();System.out.println([RPC Client 响应结果]: result);} }RPC 服务端代码 public class RpcServer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection factory.newConnection();//开启信道Channel channel connection.createChannel();//声明队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);//接收请求channel.basicQos(1); //这个的作用后面再为大家介绍Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request new String(body);System.out.println(接收到请求: request);String response 针对request: request 相应成功;AMQP.BasicProperties properties1 new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish(,Constants.RPC_RESPONSE_QUEUE,properties1,response.getBytes());//envelope.getDeliveryTag() 每个消息都有一个唯一的deliveryTag//false表示是否批量确认消息channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Constants.RPC_REQUEST_QUEUE,consumer);} }7. Publisher Confirms 发布确认 RabbitMQ的Publisher Confirms发布确认机制是一种确保消息从生产者Publisher安全发送到RabbitMQ服务器的机制。当生产者向RabbitMQ发送消息时它可能希望知道消息是否已经被RabbitMQ服务器成功接收并存储起来以确保消息的可靠性。Publisher Confirms机制就是为了满足这一需求而设计的。 生产者将 Channel 设置为 confirm 模式 通过调用 channel.confirmSelect()完成后发布的每一条消息都会获得一个唯一的 ID生产者可以将这些序列号与消息关联起来以便跟踪消息的状态当消息被 RabbitMQ 服务器接收并处理之后服务器会异步的像生产者发送一个确认 ACK 给生产者包含消息的唯一ID表明消息已经送达 通过这个 Publisher Confirms 模式可以避免消息的丢失问题。 消息丢失大概分为三种情况 生产者问题因为程序故障网络抖动等原因生产者没有向 Broker 发送消息消息中间价问题也就是我们的 RabbitMQ Broker 出现问题生产者将消息成功的发送给了 Broker但是 Broker 没有将消息保存好导致消息丢失消费者问题Broker 将消息成功发送给了消费者但是消费者在消费的时候因为没有处理好导致消费者这里的消息丢失了并且 broker 也将消费者失败的消息从队列中删除了 RabbitMQ 对于上面可能出现的三种问题都给出了解决问题 2 可以通过持久化机制解决问题 3 可以通过消息应答机制解决。针对问题 1则可以通过 Publisher Confirms 机制解决。 RabbitMQ 发布确认是 0.9.1 协议的扩展默认情况下他不会被启用生产者可以通过 channel.confirmSelect() 将信道设置为 confirm 模式。 RabbitMQ 提供的发布确认有三种策略那么接下来我们来了解一下这三种策略的优劣。
- 单独确认 它要求生产者Publisher在发送每一个消息后都等待RabbitMQ服务器的确认Confirm确保消息已经被RabbitMQ成功接收并处理然后再继续发送下一个消息。
- 批量确认 在 RabbitMQ 的发布确认Publisher Confirms机制中批量确认Batch Acknowledgment是一个重要的特性它允许 RabbitMQ 在一次确认消息中同时确认多个消息。这对于提高性能和减少网络开销非常有帮助。
- 异步确认 RabbitMQ 发布确认中的异步确认策略是一种高效且可靠的机制用于在消息发送过程中异步地接收确认回调以提高生产者的吞吐量和性能。 public class PublisherConfirms {private static final Integer MESSAGE_COUNT 1000;static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);return factory.newConnection();}public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//Strategy #1: Publishing Messages Individually//单独确认publishingMessagesIndividually();//Strategy #2: Publishing Messages in Batches//批量确认publishingMessagesInBatches();//Strategy #3: Handling Publisher Confirms Asynchronously//异步确认handlingPublisherConfirmsAsynchronously();}/*** 单独确认* throws IOException* throws TimeoutException* throws InterruptedException/private static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {try (Connection connection createConnection()) {//1. 开启信道Channel channel connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);//4. 发送消息, 并等待确认long start System.currentTimeMillis();for (int i 0; i MESSAGE_COUNT; i) {String msg publisher confirmsi;channel.basicPublish(,Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());//等待确认channel.waitForConfirmsOrDie(5000);}long end System.currentTimeMillis();System.out.printf(单独确认策略, 消息条数: %d, 耗时: %d ms \n,MESSAGE_COUNT, end-start);}}/** 批量确认* throws IOException* throws TimeoutException* throws InterruptedException/private static void publishingMessagesInBatches() throws IOException, TimeoutException, InterruptedException {try(Connection connection createConnection()) {//1. 开启信道Channel channel connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);//4. 发送消息, 并进行确认long start System.currentTimeMillis();int batchSize 100;int outstandingMessageCount 0;for (int i 0; i MESSAGE_COUNT; i) {String msg hello publisher confirmsi;channel.basicPublish(,Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());outstandingMessageCount;if (outstandingMessageCountbatchSize){channel.waitForConfirmsOrDie(5000);outstandingMessageCount 0;}}if (outstandingMessageCount0){channel.waitForConfirmsOrDie(5000);}long end System.currentTimeMillis();System.out.printf(批量确认策略, 消息条数: %d, 耗时: %d ms \n,MESSAGE_COUNT, end-start);}}/** 异步确认* throws IOException* throws TimeoutException*/private static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException {try (Connection connection createConnection()) {//开启信道Channel channel connection.createChannel();//设置信道为confirm模式channel.confirmSelect();//声明转换器 这里我们使用默认的转换器//声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3,true,false,false,null);//监听confirm//记录开始时间long start System.currentTimeMillis();//该集合用来存放未确认的消息的IDSortedSetLong confirmSeqNo Collections.synchronizedSortedSet(new TreeSet());channel.addConfirmListener(new ConfirmListener() {Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {//deliveryTag 是消息的唯一ID multiple 表示是否批量确认if (multiple) {//如果是批量确认则将deliveryTag之前的消息ID都删除了confirmSeqNo.headSet(deliveryTag 1).clear();}else {confirmSeqNo.remove(deliveryTag);}}Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {//这里为了简单当RabbitMQ Broker无法正确处理消息的话我们也认为它处理了if (multiple) {confirmSeqNo.headSet(deliveryTag 1).clear();}else {confirmSeqNo.remove(deliveryTag);}}});//发送消息for (int i 0; i MESSAGE_COUNT; i) {String msg pulisher confirms i;long seqNo channel.getNextPublishSeqNo();channel.basicPublish(,Constants.PUBLISHER_CONFIRMS_QUEUE3,null,msg.getBytes());confirmSeqNo.add(seqNo);}while (!confirmSeqNo.isEmpty()) {try {Thread.sleep(10);} catch (InterruptedException e) {throw new RuntimeException(e);}}long end System.currentTimeMillis();System.out.printf(异步确认策略消息条数%d耗时%d ms \n,MESSAGE_COUNT,end - start);}} }可以看到单独确认策略所需要的时间是比较多的而异步策略则能够快速的处理这些。
- 上一篇: 网站部署 模板wordpress配置资源
- 下一篇: 网站部署环境蒙文网站建设情况汇报材料
相关文章
-
网站部署 模板wordpress配置资源
网站部署 模板wordpress配置资源
- 技术栈
- 2026年04月20日
-
网站步骤做网站里面内容编写
网站步骤做网站里面内容编写
- 技术栈
- 2026年04月20日
-
网站布局怎么用dw做网站集约化建设性能要求
网站布局怎么用dw做网站集约化建设性能要求
- 技术栈
- 2026年04月20日
-
网站部署环境蒙文网站建设情况汇报材料
网站部署环境蒙文网站建设情况汇报材料
- 技术栈
- 2026年04月20日
-
网站采集功能网页无法访问如何解决360浏览器
网站采集功能网页无法访问如何解决360浏览器
- 技术栈
- 2026年04月20日
-
网站采集功能网站制作公司官网首页
网站采集功能网站制作公司官网首页
- 技术栈
- 2026年04月20日
