下载贵州省建设厅网站模板网站定制

当前位置: 首页 > news >正文

下载贵州省建设厅网站,模板网站定制,中国核工业二三建设有限公司怎么样,公司建设网站请示目录1.Work queues工作队列模式1.1 模式说明1.2 代码1.3 测试1.4 小结2.订阅模式类型3.Publish/Subscribe发布与订阅模式3.1 模式说明3.2 代码3.3 测试3.4 小结4.Routing路由模式4.1 模式说明4.2 代码4.3 测试4.4 小结5.Topics通配符模式5.1 模式说明5.2 代码5.3 测试5.4 小结6… 目录1.Work queues工作队列模式1.1 模式说明1.2 代码1.3 测试1.4 小结2.订阅模式类型3.Publish/Subscribe发布与订阅模式3.1 模式说明3.2 代码3.3 测试3.4 小结4.Routing路由模式4.1 模式说明4.2 代码4.3 测试4.4 小结5.Topics通配符模式5.1 模式说明5.2 代码5.3 测试5.4 小结6.模式总结1.Work queues工作队列模式 1.1 模式说明 Work Queues与入门程序的简单模式相比多了一个或一些消费端多个消费端共同消费同一个队列中的消息。 应用场景对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。 1.2 代码 Work Queues与入门程序的简单模式的代码是几乎一样的可以完全复制并复制多一个消费者进行多个消费者同时消费消息的测试。 1抽取工具类 package com.donglin.rabbitmq.util;import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory new ConnectionFactory();//设置服务地址factory.setHost(192.168.121.140);//端口factory.setPort(5672);//设置账号信息用户名、密码、vhostfactory.setVirtualHost(/);factory.setUsername(admin);factory.setPassword(admin);// 通过工程获取连接Connection connection factory.newConnection();return connection;}public static void main(String[] args) throws Exception {Connection con ConnectionUtil.getConnection();System.out.println(con);// amqp://admin192.168.6.100:5672/con.close();} }2生产者 package com.donglin.rabbitmq.work;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class Producer {static final String QUEUE_NAME work_queue;public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null);for (int i 1; i 10; i) {String body ihello rabbitmq~~~;channel.basicPublish(,QUEUE_NAME,null,body.getBytes());}channel.close();connection.close();} } 3消费者1 package com.donglin.rabbitmq.work;import com.rabbitmq.client.*; import java.io.IOException;public class Consumer1 {static final String QUEUE_NAME work_queue;public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null);DefaultConsumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(bodynew String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);} }4消费者2 Consumer2与Consumer1类一样。省略。 运行两个消费者
1.3 测试 启动两个消费者然后再启动生产者发送消息到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。 1.4 小结 1.在一个队列中如果有多个消费者那么消费者之间对于同一个消息的关系是竞争的关系。 2.Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。 2.订阅模式类型 订阅模式示例图 前面2个案例中只有3个角色 P生产者也就是要发送消息的程序C消费者消息的接受者会一直等待消息到来。queue消息队列图中红色部分 而在订阅模型中多了一个exchange角色而且过程略有变化 P生产者也就是要发送消息的程序但是不再发送到队列中而是发给X交换机C消费者消息的接受者会一直等待消息到来。Queue消息队列接收消息、缓存消息。Exchange交换机图中的X。一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。Exchange有常见以下3种类型 Fanout广播将消息交给所有绑定到交换机的队列Direct定向把消息交给符合指定routing key 的队列Topic通配符把消息交给符合routing pattern路由模式 的队列
Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列那么消息会丢失 3.Publish/Subscribe发布与订阅模式 3.1 模式说明 发布订阅模式 1、每个消费者监听自己的队列。 2、生产者将消息发给broker由交换机将消息转发到绑定此交换机的每个队列每个绑定交换机的队列都将接收 到消息 3.2 代码 1生产者 package com.donglin.rabbitmq.fanout;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class Producer {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();/exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, MapString, Object arguments)参数1. exchange交换机名称2. type交换机类型DIRECT(direct),定向FANOUT(fanout),扇形广播,发送消息到每一个与之绑定队列。TOPIC(topic),通配符的方式HEADERS(headers);参数匹配3. durable是否持久化4. autoDelete自动删除5. internal内部使用。 一般false6. arguments参数/String exchangeName test_fanout;//5. 创建交换机channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);//6. 创建队列String queue1Name test_fanout_queue1;String queue2Name test_fanout_queue2;channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);//7. 绑定队列和交换机/queueBind(String queue, String exchange, String routingKey)参数1. queue队列名称2. exchange交换机名称3. routingKey路由键,绑定规则如果交换机的类型为fanout ,routingKey设置为/channel.queueBind(queue1Name,exchangeName,);channel.queueBind(queue2Name,exchangeName,);String body 日志信息张三调用了findAll方法…日志级别info…;//8. 发送消息channel.basicPublish(exchangeName,,null,body.getBytes());//9. 释放资源channel.close();connection.close();} }运行
2消费者1 package com.donglin.rabbitmq.fanout;import com.rabbitmq.client.; import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();String queue1Name test_fanout_queue1;DefaultConsumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(bodynew String(body));System.out.println(将日志信息打印到控制台…..);}};channel.basicConsume(queue1Name,true,consumer);} }3消费者2 package com.donglin.rabbitmq.fanout;import com.rabbitmq.client.; import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();String queue2Name test_fanout_queue2;DefaultConsumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(bodynew String(body));System.out.println(将日志信息打印到控制台…..);}};channel.basicConsume(queue2Name,true,consumer);} }3.3 测试 启动所有消费者然后使用生产者发送消息在每个消费者对应的控制台可以查看到生产者发送的所有消息到达广播的效果。 在执行完测试代码后其实到RabbitMQ的管理后台找到Exchanges选项卡点击 fanout_exchange 的交换机可以查看到如下的绑定
3.4 小结 交换机需要与队列进行绑定绑定之后一个消息可以被多个消费者都收到。 发布订阅模式与工作队列模式的区别 1、工作队列模式不用定义交换机而发布/订阅模式需要定义交换机。 2、发布/订阅模式的生产方是面向交换机发送消息工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。 3、发布/订阅模式需要设置队列和交换机的绑定工作队列模式不需要设置实际上工作队列模式会将队列绑 定到默认的交换机 。 4.Routing路由模式 4.1 模式说明 路由模式特点 队列与交换机的绑定不能是任意绑定了而是要指定一个RoutingKey路由key消息的发送方在 向 Exchange发送消息时也必须指定消息的 RoutingKey。Exchange不再把消息交给每一个绑定的队列而是根据消息的Routing Key进行判断只有队列的Routingkey与消息的 Routing key完全一致才会接收到消息
图解 P生产者向Exchange发送消息发送消息时会指定一个routing key。XExchange交换机接收生产者的消息然后把消息递交给 与routing key完全匹配的队列C1消费者其所在队列指定了需要routing key 为 error 的消息C2消费者其所在队列指定了需要routing key 为 info、error、warning 的消息 4.2 代码 在编码上与 Publish/Subscribe发布与订阅模式 的区别是交换机的类型为Direct还有队列绑定交换机的时候需要指定routing key。 1生产者 package com.donglin.rabbitmq.routing;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class Producer {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();String exchangeName test_direct;// 创建交换机channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);// 创建队列String queue1Name test_direct_queue1;String queue2Name test_direct_queue2;// 声明创建队列channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);// 队列绑定交换机// 队列1绑定errorchannel.queueBind(queue1Name,exchangeName,error);// 队列2绑定info error warningchannel.queueBind(queue2Name,exchangeName,info);channel.queueBind(queue2Name,exchangeName,error);channel.queueBind(queue2Name,exchangeName,warning);String message 日志信息张三调用了delete方法.错误了,日志级别warning;// 发送消息channel.basicPublish(exchangeName,warning,null,message.getBytes());System.out.println(message);channel.close();connection.close();} }运行
2消费者1 package com.donglin.rabbitmq.routing;import com.rabbitmq.client.; import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();String queue1Name test_direct_queue1;DefaultConsumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(bodynew String(body));System.out.println(将日志信息打印到控制台…..);}};channel.basicConsume(queue1Name,true,consumer);} }3消费者2 package com.donglin.rabbitmq.routing;import com.rabbitmq.client.; import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();String queue2Name test_direct_queue2;Consumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(bodynew String(body));System.out.println(将日志信息存储到数据库…..);}};channel.basicConsume(queue2Name,true,consumer);} }4.3 测试 启动所有消费者然后使用生产者发送消息在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息到达按照需要接收的效果。 4.4 小结 Routing模式要求队列在绑定交换机时要指定routing key消息会转发到符合routing key的队列。 5.Topics通配符模式 5.1 模式说明 Topic类型与Direct相比都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符 Routingkey 一般都是有一个或多个单词组成多个单词之间以”.”分割例如 item.insert 通配符规则 #匹配零个或多个词 *匹配不多不少恰好1个词 举例 item.#能够匹配item.insert.abc 或者 item.insert item . 只能匹配item.insert 图解 红色Queue绑定的是usa.# 因此凡是以 usa.开头的routing key 都会被匹配到黄色Queue绑定的是#.news 因此凡是以 .news结尾的 routing key 都会被匹配 5.2 代码 1生产者 使用topic类型的Exchange发送消息的routing key有3种 order.info package com.donglin.rabbitmq.topic;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class Producer {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();String exchangeName test_topic;channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);String queue1Name test_topic_queue1;String queue2Name test_topic_queue2;channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);// 绑定队列和交换机/** 参数1. queue队列名称2. exchange交换机名称3. routingKey路由键,绑定规则如果交换机的类型为fanout ,routingKey设置为/// routing key 系统的名称.日志的级别。//需求 所有error级别的日志存入数据库,所有order系统的日志存入数据库channel.queueBind(queue1Name,exchangeName,#.error);channel.queueBind(queue1Name,exchangeName,order.);channel.queueBind(queue2Name,exchangeName,.);String body 日志信息张三调用了findAll方法…日志级别info…;//发送消息goods.info,goods.errorchannel.basicPublish(exchangeName,order.info,null,body.getBytes());channel.close();connection.close();} }运行程序
2消费者1 接收两种类型的消息更新商品和删除商品 package com.donglin.rabbitmq.topic;import com.rabbitmq.client.; import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();String queue1Name test_topic_queue1;DefaultConsumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(bodynew String(body));}};channel.basicConsume(queue1Name,true,consumer);} }3消费者2 接收所有类型的消息新增商品,更新商品和删除商品。 package com.donglin.rabbitmq.topic;import com.rabbitmq.client.; import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();String queue2Name test_topic_queue2;DefaultConsumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(bodynew String(body));}};channel.basicConsume(queue2Name,true,consumer);} }5.3 测试 启动所有消费者然后使用生产者发送消息在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息到达按照需要接收的效果并且这些routing key可以使用通配符。
5.4 小结 Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能只是Topic在配置routing key 的时候可以使用通配符显得更加灵活。 6.模式总结 RabbitMQ工作模式 1、简单模式 HelloWorld 一个生产者、一个消费者不需要设置交换机使用默认的交换机
2、工作队列模式 Work Queue 一个生产者、多个消费者竞争关系不需要设置交换机使用默认的交换机
3、发布订阅模式 Publish/subscribe 需要设置类型为fanout的交换机并且交换机和队列进行绑定当发送消息到交换机后交换机会将消息发送到绑定的队列
4、路由模式 Routing 需要设置类型为direct的交换机交换机和队列进行绑定并且指定routing key当发送消息到交换机后交换机会根据routing key将消息发送到对应的队列
5、通配符模式 Topic 需要设置类型为topic的交换机交换机和队列进行绑定并且指定通配符方式的routing key当发送消息到交换机后交换机会根据routing key将消息发送到对应的队列