做网站首选智投未来1免费商城app

当前位置: 首页 > news >正文

做网站首选智投未来1,免费商城app,wordpress双域名,能不能自己做视频网站1. 交换机概述 前面《RabbitMQ上篇》我们使用SpringAMQP来演示如何用Java代码操作RabbitMQ#xff0c;当时采用的是生产者直接将消息发布给队列#xff0c;但是实际开发中不建议这么做#xff0c;更加推荐生产者将消息发布到交换机(exchange)#xff0c;然后由exchange路由…1. 交换机概述 前面《RabbitMQ上篇》我们使用SpringAMQP来演示如何用Java代码操作RabbitMQ当时采用的是生产者直接将消息发布给队列但是实际开发中不建议这么做更加推荐生产者将消息发布到交换机(exchange)然后由exchange路由到队列其架构如下所示 可以看出在发布-订阅模型中新增一个交换机角色此后各个角色的任务如下 publisher不再是将message直接转发到queue而是将message转发给exchangeexchange一方面接收来自publisher生产的消息另一方面依据route key以及type将消息路由给绑定的不同的队列queue与以前一样暂存消息供消费者消费另外还需要同交换机建立绑定关系consumer与以前一样订阅queue中的消息并进行业务处理消费消息 注意由于我们的exchange不暂存消息只做消息的路由因此如果没有queue与exchange绑定或者routing key设置错误就会导致消息丢失 2. 交换机类型 RabbitMQ提供的交换机类型有如下四种 Fanout Exchange扇出交换机形象来说就是广播交换机会将消息路由给所有绑定的queueDirect Exchange定向交换机基于RoutingKey发给订阅的queueTopic Exchange 通配符订阅在Direct的基础上引入通配符Headers Exchange 头匹配基于MQ的消息头匹配使用场景较少此处不讲解 2.1 Fanout Exchange 下面是Fanout Exchange的工作流程图 特征Fanout Exchange将消息路由给全部跟它绑定的queue 操作步骤 在RabbitMQ控制台中新建两个队列fanout.queue1、fanout.queue2在RabbitMQ控制台中新建一个Fanout类型的Exchangefanout.exchange 将fanout.exchange与fanout.queue1、fanout.queue2分别建立binding关系 新建两个方法用于模拟consumer分别监听fanout.queue1以及fanout.queue2队列 /*** 订阅fanout.queue1队列* param msg 消息/ RabbitListener(queues fanout.queue1) public void listenFanoutQueue1(String msg) {log.info(listener1 从【fanout.queue1】接收到消息 msg); }/** 订阅fanout.queue2队列* param msg 消息/ RabbitListener(queues fanout.queue2) public void listenFanoutQueue2(String msg) {log.info(listener2 从【fanout.queue2】接收到消息 msg); }新建一个测试类方法模拟将消息发布给fanout.exchange /** 测试FanoutExchange交换机类型/ Test public void testFanoutExchange() {// 1. 定义exchange名称String exchangeName fanout.exchange;// 2. 定义消息体String msg 震惊某大学频频被曝出食堂安全问题;// 3. 发送消息rabbitTemplate.convertAndSend(exchangeName, , msg); }观察结果 结果如上图所示说明fanout.exchange雀氏将消息广播给了所有与之绑定的queue 2.2 Direct Exchange 特点Direct Exchange要求在与queue建立binding关系的时候定义一个BindingKey之后publisher生产者携带消息的同时也会指定RoutingKey只有RoutingKey与BindingKey一致的queue才会被路由消息 工作流程如上图所示其中queue1与exchange的Binding Key为blue以及redqueue2与exchange的Binding Key为yellow以及red此时当Routing Key为blueDirect Exchange只会将消息路由给queue1 操作步骤 在RabbitMQ控制台中新建两个队列direct.queue1、direct.queue2 在RabbitMQ控制台中新建一个Direct类型的Exchangedirect.exchange 将direct.exchange与direct.queue1、direct.queue2分别建立binding关系其中与queue1的binding key为blue与red与queue2的binding key为yellow与red 新建两个方法用于模拟consumer分别监听direct.queue1以及direct.queue2队列 /** 订阅direct.queue1队列* param msg 消息/ RabbitListener(queues direct.queue1) public void listenDirectQueue1(String msg) {log.info(listener1 从【direct.queue1】接收到消息 msg); }/** 订阅direct.queue2队列* param msg 消息/ RabbitListener(queues direct.queue2) public void listenDirectQueue2(String msg) {log.info(listener2 从【direct.queue2】接收到消息 msg); }新建一个测试类方法模拟将消息发布给direct.exchange并指定routing key为blue /** 测试DirectExchange交换机类型*/ Test public void testDirectExchange() {// 1. 定义交换机名称String exchangeName direct.exchange;// 2. 定义消息体String msg 今日份消息只交给幸运色为blue的哦~;// 3. 发送消息rabbitTemplate.convertAndSend(exchangeName, blue, msg); }观察结果 结果符合预期只有direct.queue1能够接受到消息 2.3 Topic Exchange Topic Exchange与Direct Exchange非常类似都可以依据BindingKey以及RoutingKey的匹配程度进而路由给特定符合条件的queue但是Topic Exchange定义Binding Key可以为一组词中间用.进行分隔并且支持使用通配符规则如下 #匹配0个或者多个词匹配1个单词 例如现在queue1的BindingKey为china.#“而queue2的BindingKey为”#.news而RoutingKey为china.reports此时可以路由给queue1但是无法路由给queue2如果RoutingKey为china.news则queue1、queue2均可以被路由 操作步骤 在RabbitMQ控制台中新建两个队列topic.queue1、topic.queue2 在RabbitMQ控制台中新建一个Topic类型的Exchangetopic.exchange 将topic.exchange与topic.queue1、topic.queue2分别建立binding关系其中与queue1的binding key为china.#“与queue2的binding key为”#.news 新建两个方法用于模拟consumer分别监听topic.queue1以及topic.queue2队列 /** 订阅topic.queue1队列* param msg 消息/ RabbitListener(queues topic.queue1) public void listenTopicQueue1(String msg) {log.info(listener1 从【topic.queue1】接收到消息 msg); }/** 订阅topic.queue2队列* param msg 消息/ RabbitListener(queues topic.queue2) public void listenTopicQueue2(String msg) {log.info(listener2 从【topic.queue2】接收到消息 msg); }新建一个测试类方法模拟将消息发布给topic.exchange并指定routing key为china.news /** 测试TopicExchange交换机类型*/ Test public void testTopicExchange() {// 1. 定义交换机名称String exchangeName topic.exchange;// 2. 定义消息体String msg 中国新闻报快来买呀;// 3. 发送消息rabbitTemplate.convertAndSend(exchangeName, china.news, msg); }观察结果 证明通配符生效

  1. 声明队列和交换机 前面我们收发消息的过程是使用Java代码实现的但是创建Queues以及Exchanges仍然需要我们在RabbitMQ提供的控制台实现那么如何使用Java代码来创建Queue以及Exchange呢 SpringAMQP API 声明队列使用new Queue(队列名称)创建声明交换机使用new FanoutExchange(交换机名称)以FanoutExchange为例声明绑定关系使用BindingBuilder.bind(队列对象).to(交换机对象)构建 3.1 Fanout声明 步骤 编写一个配置类使用Configuration 声明内部配置Queue、Exchange、Binding并使用Bean声明 Configuration public class FanoutConfig {/*** 声明FanoutExchange交换机* return 返回FanoutExchange对象/Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(code.fanout.exchange);}/** 声明FanoutQueue队列* return 返回FanoutQueue队列/Beanpublic Queue fanoutQueue() {return new Queue(code.fanout.queue);}/** 声明绑定关系* param fanoutExchange 交换机* param fanoutQueue 队列* return 绑定关系/Beanpublic Binding fanoutBinding(FanoutExchange fanoutExchange, Queue fanoutQueue) {return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);} }3.2 Direct声明 步骤 编写一个配置类使用Configuration 声明内部配置Queue、Exchange、Binding并使用Bean声明 Configuration public class DirectConfig {/** 声明一个DirectExchange交换机* return 返回一个DirectExchange类型对象/Beanpublic DirectExchange directExchange() {return new DirectExchange(code.direct.exchange);}/** 声明一个Queue队列* return 返回一个Queue类型对象/Beanpublic Queue directQueue() {return new Queue(code.direct.queue);}/** 声明一个绑定关系* return 返回Binding对象/Beanpublic Binding directBinding(DirectExchange directExchange, Queue directQueue) {return BindingBuilder.bind(directQueue).to(directExchange).with();} }3.3 基于注解声明 注解声明格式 Component Slf4j public class AnnotateRabbitListener {RabbitListener(bindings QueueBinding(value Queue(annotate.direct.queue),key {blue, red},exchange Exchange(name annotate.direct.exchange, type ExchangeTypes.DIRECT)))public void listenAnnotateDirect(String msg) {log.info(接收到消息 msg);} }4. 消息转换器 4.1 现象演示 前面我们都是将字符串类型的数据作为消息进行传输那么如果是对象类型的消息呢我们尝试发送一个自定义User类型作为消息传输 /** 自定义User类型* author 米饭好好吃/ Data AllArgsConstructor public class User implements Serializable {private String name;private Integer age; }Test public void testSendObject() {// 1. 声明队列名称String queueName work.queue;// 2. 定义消息体User user new User(jack, 22);// 3. 发送消息rabbitTemplate.convertAndSend(queueName, user); }从RabbitMQ控制台中查看消息内容如下
    4.2 追踪源码 我们发现实际调用了convertMessageIfNecessary(object)方法我们继续追踪进去 该方法判断object是否为Message类型如果不是就调用getRequiredMessageConverter()获取所需的消息转换器继续追踪进去 该方法返回了一个SimpleMessageConverter实例对象因此我们回到上一层获取到MessageConverter实例后又调用了toMessage方法我们继续追踪进去观察是如何转换消息的 在AbstruectMessageConverter中实现了toMessage方法而createMessage方法在子类 SimpleMessageConverter重写了该方法 可以看出调用了SerialzationUtils.serialize(object)进行了序列化继续追踪观察到底是如何序列化的 可以看出是借助ObjectOutputStream进行序列化的而这这个是JDK默认的序列化方式该方式有如下缺点 序列化过程不够安全可能存在注入风险序列化结果可读性较差序列化结果占用体积较大 因此我们需要重写消息转换器中的序列化机制 4.3 自定义JSON序列化器 因此JDK原生序列化器有诸多确定因此我们需要使用自定义的JSON序列化器此处需要引入jackson-databind相关依赖 dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactIdversion2.9.10/version /dependency/**
    消息转换器配置* author 米饭好好吃*/ Configuration public class MessageConvertConfig {Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();} }验证结果 在控制台中我们可以发现消息格式就是熟悉的JSON格式了