网站推广互联网推广跨境电子商务主要学什么
- 作者: 五速梦信息网
- 时间: 2026年03月21日 07:24
当前位置: 首页 > news >正文
网站推广互联网推广,跨境电子商务主要学什么,源码下载网站,wordpress前台显示异常后台进不去66 消息队列 基础概念 参考资料#xff1a;消息队列MQ快速入门#xff08;概念、RPC、MQ实质思路、队列介绍、队列对比、应用场景#xff09; 消息队列就是一个使用队列来通信的组件#xff1b;为什么需要消息队列#xff1f; 在实际的商业项目中#xff0c;它这么做肯…66 消息队列 基础概念 参考资料消息队列MQ快速入门概念、RPC、MQ实质思路、队列介绍、队列对比、应用场景 消息队列就是一个使用队列来通信的组件为什么需要消息队列 在实际的商业项目中它这么做肯定是有道理的。那么没有引入消息队列之前服务存在哪些问题呢就拿支付服务来说你提交了一个支付订单后后台需要进行扣库存、扣款、短信通知等等你需要等待后台把所有该做的做完了才能知道自己有没有购买成功用户等等时间过长后台请求链太多很多业务不需要马上做完比如短信通知等等这些响应速度对于业务来说无关紧要所以就提出了异步处理。 异步处理就是指我现在不做这个工作我把这个工作丢给箱子里有人会来这个箱子里找属于它的工作我丢完我的工作就做完了就可以给用户响应了解耦异步提高性能。 应用在服务解耦、流量控制有好处也有坏处坏处就是服务的稳定性降低人多就不好控制系统也一样。 消息队列具有两种模型队列模型和发布/订阅模型。这两个模型简单的来说就是队列模型即一条消息只能被一个消费者消费、发布订阅模型即一条消息可以被多个消费者消费。 其设计模式就是一发一存一消费生产者——消费者模型。 五种队列 简单队列 一言以蔽之简单队列——一个消息对应一个消费者 工作队列 一个生产者对应多个消费者但是只能有一个消费者获得消息 竞争消费者模式。 这条消息具体会被哪个消费者消费事先并不知。 如何分发消息使之最大限度的发挥每一个消费者的效率——负载均衡。 发布/订阅模型 一个消费者将消息首先发送到交换器交换器绑定到多个队列然后被监听该队列的消费者所接收并消费。 ps:X表示交换器在RabbitMQ中交换器主要有四种类型:direct、fanout、topic、headers这里的交换器是 fanout。下面我们会详细介绍这几种交换器。 路由模式 生产者将消息发送到direct交换器在绑定队列和交换器的时候有一个路由key生产者发送的消息会指定一个路由key那么消息只会发送到相应key相同的队列接着监听该队列的消费者消费消息。 也就是让消费者有选择性的接收消息。 主题模式 上面的路由模式是根据路由key进行完整的匹配完全相等才发送消息这里的通配符模式通俗的来讲就是模糊匹配。 符号“#”表示匹配一个或多个词符号“”表示匹配一个词。 交换机 前面五种队列模式介绍完了但是实际上只有三种第一种简单队列第二种工作模式剩下的三种都是和交换器绑定的合起来称为一种这小节我们就来详细介绍交换器。 交换器分为四种分别是direct、fanout、topic和 headers。 前面三种分别对应路由模式、发布订阅模式和通配符模式headers 交换器允许匹配 AMQP 消息的 header 而非路由键除此之外header 交换器和 direct 交换器完全一致但是性能却差很多因此基本上不会用到该交换器这里也不详细介绍。 ①、direct 如果路由键完全匹配的话消息才会被投放到相应的队列。 ②、fanout 当发送一条消息到fanout交换器上时它会把消息投放到所有附加在此交换器上的队列。 ③、topic 设置模糊的绑定方式“”操作符将“.”视为分隔符匹配单个字符“#”操作符没有分块的概念它将任意“.”均视为关键字的匹配部分能够匹配多个字符。 常用6种消息队列介绍和对比 RabbitMQ RabbitMQ是流行的开源消息队列系统用erlang语言开发。RabbitMQ是AMQP高级消息队列协议的标准实现。支持多种客户端如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等支持AJAX持久化。用于在分布式系统中存储转发消息在易用性、扩展性、高可用性等方面表现不俗。 ActiveMQ ActiveMQ 是Apache出品最流行的能力强劲的开源消息总线。对Spring的支持ActiveMQ可以很容易内嵌到使用Spring的系统里面去而且也支持Spring2.0的特性 ZeroMQ 号称史上最快的消息队列它实际类似于Socket的一系列接口他跟Socket的区别是普通的socket是端到端的1:1的关系而ZMQ却是可以NM 的关系人们对BSD套接字的了解较多的是点对点的连接点对点连接需要显式地建立连接、销毁连接、选择协议TCP/UDP和处理错误等而ZMQ屏蔽了这些细节让你的网络编程更为简单。ZMQ用于node与node间的通信node可以是主机或者是进程。 Kafka 特征 分布式消息发布订阅系统其分区特性、可复制和可容错都是其不错的特性快速持久化可在O(1)的系统开销下进行消息持久化高吞吐在一台普通的服务器上就可以达到10W/s的吞吐率完全的分布式系统Broker、Producer、Consumer都原生自动支持分布式、自动实现负载均衡支持同步和异步复制两种支持数据批量发送和拉取zero-copy减少IO操作步骤数据迁移、扩容对用户透明无需停机即可扩展机器其他特性严格的消息顺序、丰富的消息拉取机制、高效订阅者水平扩展、实时的消息订阅、亿级的消息堆积能力、定期删除机制 优点 客户端语言丰富性能卓越、单机写入TPS约在百万级/秒消息大小为10个字节提供完全分布式架构并有replica机制拥有较高的可用性和可靠性理论上支持消息无限堆积支持批量操作消费者采用Pull方式获取消息消息有序通过控制能够保证所有消息被消费且仅被消费一次有优秀的第三方Kafka Web管理界面kafka-Manager在日志领域比较成熟被多家公司和多个开源项目使用 缺点 Kafka单机超过64个队列/分区Load会发生明显的飙高现象队列越多load越高发送消息响应时间变长使用短轮询方式实时性取决于轮询间隔时间消费失败不支持重试支持消息顺序但是一台代理宕机后就会产生消息乱序 RocketMQ 特征 具有高性能、高可靠、高实时、分布式特点Producer、Consumer、队列都可以分布式Producer向一些队列轮流发送消息队列集合称为TopicConsumer如果做广播消费则一个consumer实例消费这个Topic对应的所有队列如果做集群消费则多个Consumer实例平均消费这个topic对应的队列集合能够保证严格的消息顺序提供丰富的消息拉取模式高效的订阅者水平扩展能力实时的消息订阅机制亿级消息堆积能力较少的依赖可以运行在Java语言所支持的平台之上 优点 单机支持1万以上持久化队列所有消息都是持久化的先写入系统PageCache然后刷盘可以保证内存和磁盘都有一份数据访问时直接从内存取模型简单接口易用性能优越可以大量堆积消息在broker中支持多种消费包括集群消费广播消费等各个环节分布式扩展设计开发都较活跃版本更新快 缺点 没有web管理界面提供了一个CLI管理工具来查询、管理和诊断各种问题没有在MQ核心去实现JMS等接口 选择 ActiveMQ最早的时候大家都用但现在用的不是很多了没经过大规模吞吐量场景的验证社区不是很活跃主流上不选择这个RabbitMQ较为成熟一些在可用性、稳定性、可靠性上RabbitMQ都要超过kafka综合性能不错但是erlang语言阻止了大量java工程师深入研究且不支持事务消息吞吐能力有限Kafka的性能是比RabbitMQ要更强的RabbitMQ在有大量的消息堆积时性能会下降而Kafka不会但是Kafka的设计初衷是处理日志的可以看做一个日志系统针对性非常强没有具备一个成熟MQ应该具备的特性它还是个孩子啊RocketMQ的思路起源于Kafka,但它对消息的可靠传输及事务性做了优化适合一些大规模的分布式系统应用但是生态不够成熟会有黄掉的风险ZeroMQ只是一个网络编程的Pattern库将常见的网络请求形式分组管理、链接管理、发布订阅等进行模式化、组件化。简单来说就是在socket之上、MQ之下。使用ZeroMQ的话需要对自己的业务代码进行改造不利于服务解耦 基于Netty实现huiMQ自定义消息队列 Netty服务端将会作为huiMQ这里先搭一个Netty服务端与Netty客户端具体的代码请参考68 Netty 具体项目代码请查看点击查看 生产者 发送消息队列 所有需要发送的消息都会被统一存在一个队列中然后由一个线程来对这个队列中的消息发送到Netty服务端中。但是这里会存在消息生产者发送到消息队列时失败从而导致消息丢失所以为了保证消息不丢失在该线程发送后会把这条消息暂时存放在一个Map中等到消息队列发送确认响应后才会把这条消息消除如果超过2秒还未回复就会把这个消息重新放回待发送消息队列中同时把重传次数加一如果重传次数大于5次就会写入日志。 我们待发送消息队列选择BlockingQueue该队列是线程安全的阻塞队列BlockingQueue 是一个支持两个附加操作的队列。这两个附加的操作是在队列为空时获取元素的线程会等待队列变为非空。当队列满时存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景生产者是往队列里添加元素的线程消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器而消费者也只从容器里拿元素。 // 采用阻塞队列保证线程的安全 以便免多线程的情况下导致数据丢失 同时阻塞队列也可以当队列满的时候阻塞线程让其之后再重新添加运行private static final BlockingQueueMessageBase.Message blockingQueue new ArrayBlockingQueueMessageBase.Message(1024);// 等待确认的消息Mapprivate static final ConcurrentHashMapString, MessageBase.Message waitAckMessageMap new ConcurrentHashMap();// 等待确认消息超时Mapprivate static final ConcurrentHashMapString, LocalDateTime messageTimeOutMap new ConcurrentHashMap();// 超时private static final Long timeout 2000L;// 最多超时重传次数private static final Integer maxRetries 5;private static SocketChannel socketChannel;发送消息线程 /**** Description 构建一个线程来一直对队列中的消息发送到Netty服务端* return {link }* Author yaoHui* Date 2024/10/11/private void sendMessageByThread(){new Thread(() - {while(true){try{log.info(sendMessageByThread ready);MessageBase.Message message blockingQueue.take();log.info(sendMessageByThread working);sendMessage(message);} catch (Exception e) {log.error(sendMessageByThread is interrupt 发送消息线程被中断);Thread.currentThread().interrupt();break;}}}).start();}/**** Description 线程调用 让消息发送到Netty服务端的具体实现逻辑 param message* return {link }* Author yaoHui* Date 2024/10/11/private void sendMessage(MessageBase.Message message){// 超过最大重传次数if(message.getRetryCount() maxRetries){log.error(消息传输失败次数超过5次 message.toString());}else{if (socketChannel.isActive()){socketChannel.writeAndFlush(message);waitAckMessageMap.put(message.getRequestId(),message);messageTimeOutMap.put(message.getRequestId(),LocalDateTime.now().plusSeconds(timeout));}else{log.info(Netty连接失败请重试);addMessageBlockingQueue(message);}}}确认消息定时任务 /**** Description 将超时的消息重新加入队列中重新进行发送 return {link }* Author yaoHui* Date 2024/10/11/private void messageRetryThread(){ScheduledExecutorService scheduledExecutorService new ScheduledThreadPoolExecutor(1);Runnable task () - {log.info(messageRetryThread working);for (Map.EntryString, MessageBase.Message entry : waitAckMessageMap.entrySet()){MessageBase.Message message entry.getValue();String key entry.getKey();LocalDateTime time messageTimeOutMap.get(key);if(time.isBefore(LocalDateTime.now())){MessageBase.Message newMessage MessageBase.Message.newBuilder().setRequestId(message.getRequestId()).setCmd(message.getCmd()).setContent(message.getContent()).setRetryCount(message.getRetryCount()1).setUrlPath(message.getUrlPath()).build();addMessageBlockingQueue(newMessage);messageTimeOutMap.remove(key);waitAckMessageMap.remove(key);}}};// 安排任务在延迟2秒后开始执行之后每隔3秒执行一次scheduledExecutorService.scheduleAtFixedRate(task,2,3,TimeUnit.SECONDS);}接收Netty服务端ACK消息 Slf4j public class NettyClientHandler extends ChannelInboundHandlerAdapter {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof MessageBase.Message) {MessageBase.Message message (MessageBase.Message) msg;if(message.getCmd() MessageBase.Message.CommandType.ACK){SendMessageThread.getAckAndRemoveMessage(message.getRequestId());log.info(收到ACK message.getRequestId());}else{System.out.println(Received response from server:);System.out.println(ID: message.getRequestId());System.out.println(Content: message.getContent());}} else {System.err.println(Received an unknown message type: msg.getClass().getName());}}Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info(客户端连接成功);}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error(客户端发生异常, cause);ctx.close();} }/**** Description 根据RequestId来消除待确认中的消息 param key* return {link }* Author yaoHui* Date 2024/10/11/public static void getAckAndRemoveMessage(String key){messageTimeOutMap.remove(key);waitAckMessageMap.remove(key);}全部代码 更详细的代码请查看点击查看 NettyClient Component Slf4j public class NettyClient {private static final EventLoopGroup group new NioEventLoopGroup();private static final Integer port 54021;private static final String host localhost;private static SocketChannel socketChannel;private static SendMessageThread sendMessageThread new SendMessageThread();/**** Description 添加消息到阻塞队列中 为消息生产者调用 param message* return {link }* Author yaoHui* Date 2024/10/11/public void addMessageBlockingQueue(MessageBase.Message message){if(!socketChannel.isActive()){this.start();}sendMessageThread.addMessageBlockingQueue(message);}/**** Description 该方法提供获取SocketChannel 暂时无用 return {link SocketChannel }* Author yaoHui* Date 2024/10/11/public static SocketChannel getSocketChannel() throws Exception {if (!socketChannel.isActive()) {return socketChannel; // socketChannel socketChannel1;}return socketChannel;}/**** Description 连接断开重新连接 return {link SocketChannel }* Author yaoHui* Date 2024/10/11/private static SocketChannel retryConnect(){Bootstrap bootstrap new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).remoteAddress(host, port).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline()// 空闲检测.addLast(new IdleStateHandler(60, 60, 60)) // 60秒写空闲30秒读空闲.addLast(new HeartbeatHandler()).addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance())).addLast(new ProtobufVarint32LengthFieldPrepender()).addLast(new ProtobufEncoder()).addLast(new NettyClientHandler()); // 自定义处理器}});ChannelFuture future bootstrap.connect();if (future.isSuccess()){return (SocketChannel) future.channel();}else{return null;}}/**** Description Netty客户端启动函数 调用Start可以启动对Netty服务端的连接 return {link }* Author yaoHui* Date 2024/10/11/PostConstructprivate void start(){Bootstrap bootstrap new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).remoteAddress(host, port).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline()// 空闲检测.addLast(new IdleStateHandler(60, 60, 60)) // 60秒写空闲30秒读空闲.addLast(new HeartbeatHandler()).addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance())).addLast(new ProtobufVarint32LengthFieldPrepender()).addLast(new ProtobufEncoder()).addLast(new NettyClientHandler()); // 自定义处理器}});ChannelFuture future bootstrap.connect();//客户端断线重连逻辑future.addListener((ChannelFutureListener) future1 - {if (future1.isSuccess()) {log.info(连接Netty服务端成功);} else {log.info(连接失败进行断线重连);future1.channel().eventLoop().schedule(this::start, 10, TimeUnit.SECONDS);}});socketChannel (SocketChannel) future.channel();sendMessageThread.setSocketChannel(socketChannel);}} SendMessageThread package com.fang.screw.client.Thread;import com.fang.screw.client.component.NettyClient; import com.fang.screw.client.protocol.MessageBase; import io.netty.channel.socket.SocketChannel; import lombok.extern.slf4j.Slf4j;import java.time.LocalDateTime; import java.util.HashMap; import java.util.Map; import java.util.concurrent.;/*** FileName SendMessageThread* Description* Author yaoHui* date 2024-10-11/ Slf4j public class SendMessageThread {// 采用阻塞队列保证线程的安全 以便免多线程的情况下导致数据丢失 同时阻塞队列也可以当队列满的时候阻塞线程让其之后再重新添加运行private static final BlockingQueueMessageBase.Message blockingQueue new ArrayBlockingQueueMessageBase.Message(1024);// 等待确认的消息Mapprivate static final ConcurrentHashMapString, MessageBase.Message waitAckMessageMap new ConcurrentHashMap();// 等待确认消息超时Mapprivate static final ConcurrentHashMapString, LocalDateTime messageTimeOutMap new ConcurrentHashMap();// 超时private static final Long timeout 2000L;// 最多超时重传次数private static final Integer maxRetries 5;private static SocketChannel socketChannel;public SendMessageThread(){sendMessageByThread();messageRetryThread();}public SendMessageThread(SocketChannel socketChannel1){socketChannel socketChannel1;}public void setSocketChannel(SocketChannel socketChannel1){socketChannel socketChannel1;}public void addMessageBlockingQueue(MessageBase.Message message){blockingQueue.add(message);}/** Description 构建一个线程来一直对队列中的消息发送到Netty服务端* return {link }* Author yaoHui* Date 2024/10/11/private void sendMessageByThread(){new Thread(() - {while(true){try{log.info(sendMessageByThread ready);MessageBase.Message message blockingQueue.take();log.info(sendMessageByThread working);sendMessage(message);} catch (Exception e) {log.error(sendMessageByThread is interrupt 发送消息线程被中断);Thread.currentThread().interrupt();break;}}}).start();}/**** Description 线程调用 让消息发送到Netty服务端的具体实现逻辑 param message* return {link }* Author yaoHui* Date 2024/10/11/private void sendMessage(MessageBase.Message message){// 超过最大重传次数if(message.getRetryCount() maxRetries){log.error(消息传输失败次数超过5次 message.toString());}else{if (socketChannel.isActive()){socketChannel.writeAndFlush(message);}else{log.info(Netty连接失败请重试); // socketChannel NettyClient.getSocketChannel();addMessageBlockingQueue(message);}}}/**** Description 将超时的消息重新加入队列中重新进行发送 return {link }* Author yaoHui* Date 2024/10/11/private void messageRetryThread(){ScheduledExecutorService scheduledExecutorService new ScheduledThreadPoolExecutor(1);Runnable task () - {log.info(messageRetryThread working);for (Map.EntryString, MessageBase.Message entry : waitAckMessageMap.entrySet()){MessageBase.Message message entry.getValue();String key entry.getKey();LocalDateTime time messageTimeOutMap.get(key);if(time.isBefore(LocalDateTime.now())){MessageBase.Message newMessage MessageBase.Message.newBuilder().setRequestId(message.getRequestId()).setCmd(message.getCmd()).setContent(message.getContent()).setRetryCount(message.getRetryCount()1).setUrlPath(message.getUrlPath()).build();addMessageBlockingQueue(newMessage);messageTimeOutMap.remove(key);waitAckMessageMap.remove(key);}}};// 安排任务在延迟2秒后开始执行之后每隔3秒执行一次scheduledExecutorService.scheduleAtFixedRate(task,2,3,TimeUnit.SECONDS);}/**** Description 根据RequestId来消除待确认中的消息 param key* return {link }* Author yaoHui* Date 2024/10/11/public static void getAckAndRemoveMessage(String key){messageTimeOutMap.remove(key);waitAckMessageMap.remove(key);}} huiMQ 保存消息 /**** Description 保存消息到消息队列中 并且保存到MySQL数据库持久化 注意这里是一个事务 要么都成功要么都别成功 param message* return {link boolean }* Author yaoHui* Date 2024/10/12/Transactional(rollbackFor Exception.class)public boolean saveMessage(MessageBase.Message message){try{// 保存到消息队列中if(!messageQueueMap.containsKey(message.getChannel())){BlockingQueueMessageBase.Message messageBlockingQueue new ArrayBlockingQueueMessageBase.Message(1024);messageQueueMap.put(message.getChannel(),messageBlockingQueue);}messageQueueMap.get(message.getChannel()).add(message);// 保存到MySQL数据库中if(!messageQueueMapper.exists(Wrappers.MessageQueuePOlambdaQuery().eq(MessageQueuePO::getRequestId,message.getRequestId()).eq(MessageQueuePO::getDelFlag,0))){MessageQueuePO messageQueuePO new MessageQueuePO();messageQueuePO.setRequestId(message.getRequestId());messageQueuePO.setCmd(message.getCmdValue());messageQueuePO.setContent(message.getContent());messageQueuePO.setUrlPath(message.getUrlPath());messageQueueMapper.insert(messageQueuePO);log.info(HuiMQ保存消息 messageQueuePO.toString());}}catch (Exception e){log.info(保存消息至消息队列错误);e.printStackTrace();return false;}return true;}收到消费端发来请求消息 // 消费端请求发送消息// 检查是否有超时的消息 如果有则将其重新放置于待重传的消息队列中HuiMessageQueue.checkTimeOutMessage();if(HuiMessageQueue.messageQueueMap.containsKey(message.getChannel())){BlockingQueueMessageBase.Message queue HuiMessageQueue.messageQueueMap.get(message.getChannel());while(!queue.isEmpty()){MessageBase.Message sendMessage queue.take();log.info(HuiMQ向消费者发送消息 sendMessage.toString());ctx.writeAndFlush(sendMessage);// 将消费列为带确认消息HuiMessageQueue.waitAckMessageMap.put(sendMessage.getRequestId(),sendMessage);HuiMessageQueue.messageTimeOutMap.put(sendMessage.getRequestId(), LocalDateTime.now().plusSeconds(2L));}}/**** Description 检查是否有超时没有收到确认消息的消息 将其重新放置在待发送消息队列中 return {link }* Author yaoHui* Date 2024/10/13/public static void checkTimeOutMessage(){for(Map.EntryString,LocalDateTime mapEntry : messageTimeOutMap.entrySet()){LocalDateTime time mapEntry.getValue();if(time.isBefore(LocalDateTime.now())){String s mapEntry.getKey();MessageBase.Message message waitAckMessageMap.get(s);waitAckMessageMap.remove(s);messageTimeOutMap.remove(s);// 保存到消息队列中if(!messageQueueMap.containsKey(message.getChannel())){BlockingQueueMessageBase.Message messageBlockingQueue new ArrayBlockingQueueMessageBase.Message(1024);messageQueueMap.put(message.getChannel(),messageBlockingQueue);}messageQueueMap.get(message.getChannel()).add(message);}}}收到消费端ACK消息 log.info(收到消费端发来的ACK报文 message.getRequestId()); HuiMessageQueue.setMessageAck(message.getRequestId());/**** Description param requestId* return {link }* Author yaoHui* Date 2024/10/13/public static void setMessageAck(String requestId){messageTimeOutMap.remove(requestId);waitAckMessageMap.remove(requestId);}消费者 标记需要监听消息方法 /** FileName HuiListener* Description HuiMQ监听注解* Author yaoHui* date 2024-10-12/ Target(value ElementType.METHOD) Retention(RetentionPolicy.RUNTIME) public interface HuiListener {String queueName(); }获取所有被HuiListener注解标记的方法 通过BeanPostProcessor来获取所有被指定注解标记的方法BeanPostProcessor会在每个Bean初始化前后调用分别为postProcessBeforeInitialization和postProcessAfterInitialization。 这里会将所有被HuiListener标记的方法和Bean注册到huiListenerRegistry中为了方便之后通过反射的方式来直接运行指定方法。 /* FileName HuiListenerAnnotationBeanPostProcessor* Description* Author yaoHui* date 2024-10-12/ Component public class HuiListenerAnnotationBeanPostProcessor implements BeanPostProcessor, InitializingBean {private static final HuiListenerRegistry huiListenerRegistry new HuiListenerRegistry();public static boolean huiListenerFlag false;/** Description 在 bean 的初始化方法如 PostConstruct 注解的方法或 init-method 指定的方法之前调用。* param bean* param beanName* return {link Object }* Author yaoHui* Date 2024/10/12/Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {return BeanPostProcessor.super.postProcessBeforeInitialization(bean, beanName);}/**** Description 在 bean 的初始化方法之后调用。查看当前的bean是否存在被HuiListener注解过的方法 param bean* param beanName* return {link Object }* Author yaoHui* Date 2024/10/12/Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Method[] methods bean.getClass().getMethods();for(Method method : methods){if(method.isAnnotationPresent(HuiListener.class)){processHuiListener(method,bean);huiListenerFlag true;}}return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);}private void processHuiListener(Method method,Object bean){HuiListener huiListener method.getAnnotation(HuiListener.class);HuiListenerEndpoint huiListenerEndpoint new HuiListenerEndpoint();huiListenerEndpoint.setBean(bean);huiListenerEndpoint.setMethod(method);huiListenerRegistry.registerListenerEndpoint(huiListener.queueName(),huiListenerEndpoint);}Overridepublic void afterPropertiesSet() throws Exception {} }/** FileName HuiListenerEndpoint* Description* Author yaoHui* date 2024-10-12/ Data public class HuiListenerEndpoint {private Method method;private Object bean; }/* FileName HuiListenerRegistry* Description* Author yaoHui* date 2024-10-12/ Slf4j public class HuiListenerRegistry {public static final ConcurrentHashMapString,HuiListenerEndpoint huiListenerEndpointConcurrentHashMap new ConcurrentHashMap();/** Description 添加HuiListener监听的方法* param queueName* param huiListenerEndpoint* return {link }* Author yaoHui* Date 2024/10/12/public void registerListenerEndpoint(String queueName,HuiListenerEndpoint huiListenerEndpoint){huiListenerEndpointConcurrentHashMap.put(queueName,huiListenerEndpoint);}/**** Description 消费者处理收到消息的主要逻辑 param message* return {link boolean }* Author yaoHui* Date 2024/10/12/public boolean handleMessage(MessageBase.Message message){HuiListenerEndpoint huiListenerEndpoint huiListenerEndpointConcurrentHashMap.get(message.getChannel());if(ObjectUtils.isEmpty(huiListenerEndpoint)){log.info(消息无对应Channel消费 message.getChannel());return true;}Method method huiListenerEndpoint.getMethod();Object bean huiListenerEndpoint.getBean();try{Class?[] classes method.getParameterTypes();method.invoke(bean,JSON.parseObject(message.getContent(),classes[0]));}catch (Exception e){log.info(消息消费异常);e.printStackTrace();return false;}return true;} }定期发送请求消息 /** FileName getMessageThread* Description* Author yaoHui* date 2024-10-12/ Slf4j public class GetMessageThread {private NettyClient nettyClient;// private static HuiListenerRegistry huiListenerRegistry new HuiListenerRegistry();public GetMessageThread(NettyClient nettyClient){this.nettyClient nettyClient;regularGetMessage();}/** Description 定时发送是否存在消息* return {link }* Author yaoHui* Date 2024/10/12/private void regularGetMessage(){ScheduledExecutorService scheduledExecutorService new ScheduledThreadPoolExecutor(1);Runnable task () - { // log.info(regularGetMessage is running);SetString channel HuiListenerRegistry.huiListenerEndpointConcurrentHashMap.keySet();for(String s : channel){MessageBase.Message message MessageBase.Message.newBuilder().setCmd(MessageBase.Message.CommandType.SEND_MESSAGE).setChannel(s).build();nettyClient.sendMessage(message);}};scheduledExecutorService.scheduleAtFixedRate(task,2,3, TimeUnit.SECONDS);}} 消费端启动定期请求消息 如果这个服务中有被HuiListener注解标记的方法就会启用这个方法SmartInitializingSingleton会在所有的Bean初始化之后运行。 /** FileName MyBeanInjector* Description* Author yaoHui* date 2024-10-12/ Component Slf4j public class MyBeanInjector implements SmartInitializingSingleton {private final NettyClient nettyClient;public MyBeanInjector(NettyClient nettyClient){this.nettyClient nettyClient;}Overridepublic void afterSingletonsInstantiated() {log.info(GetMessageThread is ready);if (HuiListenerAnnotationBeanPostProcessor.huiListenerFlag){GetMessageThread getMessageThread new GetMessageThread(nettyClient);log.info(GetMessageThread is running);}} }使用HuiMQ /* FileName ReceiveHuiMessage* Description* Author yaoHui* date 2024-10-12**/ Component Slf4j public class ReceiveHuiMessage {HuiListener(queueName queue)public void noticeUserHaveComment(CommentVO commentVO){log.info(Chat模块接收到消息 commentVO.toString());}}进阶 如何保证消息不丢失 对于生产者来说需要做到失败重传采用确认机制可以有效避免消息丢失 对于Broker来说需要控制给生产者确认的时机在Broker保存消息到MySQL后再进行返回可以有效避免消息丢失 对于消费者来说需要在消息真正消费之后再给Broker进行确认可以避免消息丢失 如何保证消息不会被重复消费 对于正常的业务消息在消息队列中是不可避免会存在重复消费问题所以我们只能在业务层面进行消除该影响。 这种问题就是典型的幂等性问题即对于同样的一种操作所带来的结果是一致的比如这种update t1 set money 150 where id 1 and money 100; 执行多少遍money都是150这就叫幂等。 所以我们一般的解决方法是添加版本号version在执行SQL时判断version是否一致不一致则不执行。或者记录关键的key像订单号这种执行过的就不需要再执行。 如何保证消息的有序性 如何处理消息堆积 消息队列根本原因是消费者消费跟不上生产者我们可以优化下消费逻辑比如之前是一条一条消息消费处理的这次我们批量处理比如数据库的插入一条一条插和批量插效率是不一样的。 假如逻辑我们已经都优化了但还是慢那就得考虑水平扩容了增加Topic的队列数和消费者数量注意队列数一定要增加不然新增加的消费者是没东西消费的。一个Topic中一个队列只会分配给一个消费者。
- 上一篇: 网站推广和宣传的方法百度新闻首页新闻全文
- 下一篇: 网站推广技巧有哪些?受欢迎自适应网站建设地址
相关文章
-
网站推广和宣传的方法百度新闻首页新闻全文
网站推广和宣传的方法百度新闻首页新闻全文
- 技术栈
- 2026年03月21日
-
网站推广合同欢迎访问陕西省交通建设集团公司网站
网站推广合同欢迎访问陕西省交通建设集团公司网站
- 技术栈
- 2026年03月21日
-
网站推广广告营销方案深圳律师网站建设
网站推广广告营销方案深圳律师网站建设
- 技术栈
- 2026年03月21日
-
网站推广技巧有哪些?受欢迎自适应网站建设地址
网站推广技巧有哪些?受欢迎自适应网站建设地址
- 技术栈
- 2026年03月21日
-
网站推广技术哪家好推广网app下载
网站推广技术哪家好推广网app下载
- 技术栈
- 2026年03月21日
-
网站推广建设阶段邵阳邵东网站建设
网站推广建设阶段邵阳邵东网站建设
- 技术栈
- 2026年03月21日
