太原网站建设地图室内设计应届生简历

当前位置: 首页 > news >正文

太原网站建设地图,室内设计应届生简历,成都市温江建设局网站,站长百度RabbitMQ的入门及使用 一、什么是RabbitMQ#xff1f; MQ全称为Message Queue#xff0c;即消息队列。消息队列是在消息的传输过程中保存消息的容器。它是典型的#xff1a;生产者、消费者模型。生产者不断向消息队列中生产消息#xff0c;消费者不断的从队列中获取消息。…RabbitMQ的入门及使用 一、什么是RabbitMQ MQ全称为Message Queue即消息队列。消息队列是在消息的传输过程中保存消息的容器。它是典型的生产者、消费者模型。生产者不断向消息队列中生产消息消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的而且只关心消息的发送和接收没有业务逻辑的侵入这样就实现了生产者和消费者的解耦 二、RabbitMQ与Kafka的全面对比 对比项KafkaRabbitMQ开发语言scala、Javaerlang是否支持多租户2.x.x支持支持是否支持topic优先级不支持支持是否支持消息全局有序不支持支持是否支持消息分区有序支持支持是否有内置监控无有是否支持多个生产者支持支持是否支持多个消费者支持支持是否支持一个分区多个消费者不支持不支持是否支持JMX支持不支持是否支持加密支持支持消息队列协议支持仅支持自定义协议支持AMQP、MQTT、STOMP协议客户端语言支持支持支持是否支持消息追踪不支持支持是否支持消费者推模式不支持支持否支持消费者拉模式支持支持是否支持广播消息支持支持是否支持消息回溯支持消息回溯因为消息持久化消息被消费后会记录offset和timstamp不支持消息确认被消费后会被删除是否支持消息数据持久化支持支持是否支持流量控制支持支持是否支持事务性消息支持不支持元数据管理通过zookeeper进行管理支持默认服务端口90925672默认监控端口kafka web console 9000;kafka manager 9000;15672相对网络开销较小较大相对内存消耗较小较大相对cpu消耗较大较小 实际场景选择 Kafka 常作为消息传输的数据管道 优势主要体现在吞吐量上虽然可以通过策略实现数据不丢失严谨性上不如 RabbitMQ但 kafka保证每条消息最少送达一次有较小的概率会出现数据重复发送的情况 若消息吞吐量极大则Kafka RabbitMQ RabbitMQ金融场景中经常使用 常作为交易数据作为数据传输管道 具有较高的严谨性数据丢失的可能性更小具备更高的实时性和Spring是统一厂商开发后期支持比较好目前最流行的对容错性的处理比较完善 RabbitMQ 支持发布订阅、轮询分发、公平分发、重发、消息拉取 Kafka 不支持重发、事务 三、Linux上安装RabbitMQ 这次安装将RabbitMQ部署在Linux上可以在电脑本地安装一台Linux也可以购买云服务器若购买云服务器则需要在安全组内把后面要用到的端口给打开 1、使用Docker安装最简单的方法 拉取RabbitMQ 以下均为Linux指令 #拉取RabbitMQ docker pull RabbitMQ#启动rabbitmq docker run -d –hostname my-rabbit –name rabbit -p 15672:15672 -p 5672:5672 rabbitmq#查看docker目前在运行的容器是否有rabbitmq docker ps2、图形化安装插件 #进入运行中的容器 docker exec -it 镜像ID /bin/bash#rabbitmq图形化安装插件 rabbitmq-plugins enable rabbitmq_management3、WEB页面开启资源监控 #进入容器 docker exec -it rabbitmq /bin/bash#切到对应目录 cd /etc/rabbitmq/conf.d/#修改 management_agent.disable_metrics_collector false echo management_agent.disable_metrics_collector false management_agent.disable_metrics_collector.conf#退出容器 exit#重启容器 docker restart rabbitmq然后在浏览器内输入服务器地址端口15672即可进入WEB管理页面默认账号密码均为guest 四、Docker配置RabbitMQ集群 先停止在运行的MQ docker stop 运行容器ID#启动三个容器 docker run -d –hostname rabbitmq01 –name rabbitmqCluster01 -p 15672:15672 -p 5672:5672 -p 1883:1883 -e RABBITMQ_ERLANG_COOKIErabbitmqCookie rabbitmqdocker run -d –hostname rabbitmq02 –name rabbitmqCluster02 -p 15673:15672 -p 5673:5672 -p 1884:1883 -e RABBITMQ_ERLANG_COOKIErabbitmqCookie –link rabbitmqCluster01:rabbitmq01 rabbitmqdocker run -d –hostname rabbitmq03 –name rabbitmqCluster03 -p 15674:15672 -p 5674:5672 -p 1885:1883 -e RABBITMQ_ERLANG_COOKIErabbitmqCookie –link rabbitmqCluster01:rabbitmq01 –link rabbitmqCluster02:rabbitmq02 rabbitmq#Erlang Cookie 值必须相同也就是一个集群内 RABBITMQ_ERLANG_COOKIE 参数的值必须相同。因为 RabbitMQ 是用Erlang实现的Erlang Cookie 相当于不同节点之间通讯的密钥Erlang节点通过交换 Erlang Cookie 获得认证#进入第二个容器 docker exec -it rabbitmqCluster02 bash rabbitmqctl stop_app rabbitmqctl reset#加入集群 rabbitmqctl join_cluster –ram rabbitrabbitmq01 rabbitmqctl start_app exit#进入第三个容器 docker exec -it rabbitmqCluster03 bash rabbitmqctl stop_app rabbitmqctl reset#加入集群 rabbitmqctl join_cluster –ram rabbitrabbitmq01 rabbitmqctl start_app exit#主要参数 -p 15672:15672 management 界面管理访问端口 -p 5672:5672 amqp 访问端口 -p 1883:1883 mqtt 访问端口然后依次运行2、3的步骤安装插件和开启资源监控 上诉15672、15673、15674端口均需要在服务器安全组内开启 完成后即可的WEB页面看到MQ的集群 五、SpringBoot整合使用RabbitMQ 依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId /dependencyyml文件的配置 spring:rabbitmq:username: guestpassword: guestvirtual-host: /host: 通道运行的地址port: 5672生产者配置类Config 模拟美团外卖下单正常下单后若马上被接单则被这条信息直接消费若5秒内没人接单该订单消息将进入加急派单队列死信队列 所以我们需要两台交换机和两个通道 Configuration public class mtRabbitConfig {//正常通道的交换机Beanpublic FanoutExchange mtExchange(){return new FanoutExchange(mt_fanout_exchange,true,false);}//死信通道的交换机Beanpublic FanoutExchange mtDeadExchange(){return new FanoutExchange(mt_fanout_dead_exchange,true,false);}//正常通道给消息设计过期时间超过该时间未被消费则进入指定的mt_fanout_dead_exchangeBeanpublic Queue mtQueue(){MapString,Object args new HashMap();args.put(x-message-ttl,5000);args.put(x-dead-letter-exchange,mt_fanout_dead_exchange);return new Queue(mt_queue,true,false,false,args);}Beanpublic Queue mtDaedQueue(){return new Queue(mt_dead_queue,true,false,false);}//交换机与通道绑定Beanpublic Binding mtBinding(){return BindingBuilder.bind(mtQueue()).to(mtExchange());}Beanpublic Binding mtDeadBinding(){return BindingBuilder.bind(mtDaedQueue()).to(mtDeadExchange());} }生产者模拟用户下单 Autowiredprivate RabbitTemplate rabbitTemplate;//模拟美团订单下单若5秒不接单消费,则进入死信队列加急派单public void mtTakeOutOrder(String name, String food, String number) {UUID takeOutId UUID.randomUUID();String orderTime DateFormat.getDateTimeInstance().format(new Date());String exchangeName mt_fanout_exchange;String takeOutMes 美团订单编号 takeOutId orderTime name food number;String routingKey ;//将消息放入通道内Object result rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, takeOutMes);System.out.println(配送中心响应result);}测试类 Testvoid mtTakeOutOrder() throws InterruptedException {takeOutOrder.mtTakeOutOrder(小张i, 麻辣烫, 10086);}消费者外卖接单中心 正常接单消费者 RabbitListener(bindings QueueBinding(value Queue(value mt_queue, autoDelete false),exchange Exchange(value mt_fanout_exchange, type ExchangeTypes.FANOUT))) Component public class mtTakeOutDelivery {RabbitHandlerpublic String buyTrainTickets(String message) {System.out.println(正常美团外卖订单已接单 message);return 配送中心已接单;} }加急接单消费者死信队列内的消息 RabbitListener(bindings QueueBinding(value Queue(value mt_dead_queue, autoDelete false),exchange Exchange(value mt_fanout_dead_exchange, type ExchangeTypes.FANOUT))) Component public class mtDeadTakeOutDelivery {RabbitHandlerpublic String buyTrainTickets(String message) {System.out.println(加急饿了么外卖订单已接单 message);return 配送中心已接单;} }消费者运行后只要监听的两个通道内有消息就会被消费 六、RabbitMQ的手动ACK 为了确保消息不会丢失RabbitMQ支持消息应答。消费者发送一个消息应答告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。 自动应答就是上面的案例只要被消费者取出通道内就会删除这个消息万一这个消息在消费者那边处理异常因为通道里已经没用这条消息了就会出现消息丢失。所以在有些场景需要改为手动应答ACK就是消费者把这条消息确认处理完毕后再告诉通道删除消息若异常这条消息将返回通道内可以重新处理这就是手动应答。 还是一样来配置两个通道 生产者配置 Configuration public class TestQueueConfig {Beanpublic FanoutExchange TestExchange() {return new FanoutExchange(test_exchange, true, false);}Beanpublic FanoutExchange TestDeadExchange() {return new FanoutExchange(test_dead_exchange, true, false);}Beanpublic Queue TestDeadQueue() {return new Queue(test_dead_queue, true, false, false);}Beanpublic Queue TestQueue() {MapString, Object args new HashMap();//20秒钟未消费转到死信队列args.put(x-message-ttl, 5000);args.put(x-dead-letter-exchange, test_dead_exchange);return new Queue(test_queue, true, false, false, args);}Beanpublic Binding TestBinding() {return BindingBuilder.bind(TestQueue()).to(TestExchange());}Beanpublic Binding TestDeadBinding() {return BindingBuilder.bind(TestDeadQueue()).to(TestDeadExchange());} } 生产者业务代码不变与上个案例一致也行 消费者配置类 Configuration public class MyselfReceiverConfig {Autowiredprivate CachingConnectionFactory cachingConnectionFactory;Autowiredprivate MyselfReceiver myselfReceiver;Autowiredprivate MyselfDeadReceiver myselfDeadReceiver;Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(){SimpleMessageListenerContainer container new SimpleMessageListenerContainer(cachingConnectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(10);//手动确认container.setAcknowledgeMode(AcknowledgeMode.MANUAL);container.setQueueNames(test_queue);container.setMessageListener(myselfReceiver);return container;}Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer_Dead(){SimpleMessageListenerContainer container new SimpleMessageListenerContainer(cachingConnectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(10);//手动确认container.setAcknowledgeMode(AcknowledgeMode.MANUAL);container.setQueueNames(test_dead_queue);container.setMessageListener(myselfDeadReceiver);return container;} }正常消费者 Component public class MyselfReceiver implements ChannelAwareMessageListener {Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag message.getMessageProperties().getDeliveryTag();try {byte[] bytes message.getBody();String mes new String(bytes);String substring mes.replace(\\,);System.out.println(正常通道内消息substring);//业务主体//若业务处理无异常则回复通道删除消息channel.basicAck(deliveryTag,true);}catch (Exception e){channel.basicReject(deliveryTag,true);}} }死信通道消费者 Component public class MyselfDeadReceiver implements ChannelAwareMessageListener {Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag message.getMessageProperties().getDeliveryTag();try {byte[] bytes message.getBody();String mes new String(bytes);String result mes.replace(\\, );System.out.println(死信通道内消息 result);//业务主体//若业务处理无异常则回复通道删除消息channel.basicAck(deliveryTag, true);} catch (Exception e) {//有异常把消息返回通道channel.basicReject(deliveryTag, true);}} }这样就完成了手动ACK若消费者处理没有异常将使用channel.basicAck(deliveryTag, true); 若出现了异常将使用channel.basicReject(deliveryTag, true);此消息将重新进入通道内这也确保了未消费成功的消息不会出现丢失的情况。