南开集团网站建设wordpress资源销售
- 作者: 五速梦信息网
- 时间: 2026年03月21日 10:17
当前位置: 首页 > news >正文
南开集团网站建设,wordpress资源销售,51星变网页游戏官网,.net 网站优化读、写队列 创建主题时#xff0c;可以指定 writeQueueNums#xff08;写队列的个数#xff09;、readQueueNums#xff08;读队列的个数#xff09;。生产者发送消息时#xff0c;使用写队列的个数返回路由信息#xff1b;消费者消费消息时#xff0c;使用读队列的个…读、写队列 创建主题时可以指定 writeQueueNums写队列的个数、readQueueNums读队列的个数。生产者发送消息时使用写队列的个数返回路由信息消费者消费消息时使用读队列的个数返回路由信息。在物理文件层面只有写队列才会创建文件。默认读、写队列的个数都是 16。 比如写队列的个数是 16则创建 16 个文件夹代表 0 - 15读队列的个数是 8则只会消费 0 - 7 这 8 个队列中的消息。 要求 readQueueNums writeQueueNums最佳方案是两者相等。RocketMQ 设置读、写队列的目的是方便队列的扩容、缩容。 比如在原来指定读、写队列都是 16 的基础上进行扩容到 8 个。在不需要重启应用程序的情况下先缩容写队列由 0 - 15 缩容至 0 - 7。等到 8 - 15 队列中的消息全部消费完之后再缩容读队列由 0 - 15 缩容至 0 - 7。 队列的选择 方式一、指定 queueId 来选择具体的队列 DefaultMQProducer 的 send / sendOneway 方法中可携带 MessageQueue 参数。而 MessageQueue 可以指定 topic、queueId、brokerName 三个参数。 public MessageQueue(String topic, String brokerName, int queueId) {this.topic topic;this.brokerName brokerName;this.queueId queueId; }方式二、根据 MessageQueueSelector 策略来选择队列 DefaultMQProducer 的 send / sendOneway 方法中可携带 MessageQueueSelector 参数。 public SendResult send(Message msg, MessageQueueSelector selector, Object arg); public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback);RocketMQ 内部定义了三种 MessageQueueSelector 策略。 SelectMessageQueueByHash基于方法参数arg的哈希值对队列总数取模选择对应下标的队列。SelectMessageQueueByRandom基于队列总数生成一个随机数选择对应下标的队列。SelectMessageQueueByMachineRoom返回空。 public class SelectMessageQueueByHash implements MessageQueueSelector {Overridepublic MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) {// 取arg方法参数的哈希值再对队列总数取模int value arg.hashCode() % mqs.size();if (value 0) {value Math.abs(value);}// 选择对应的队列return mqs.get(value);} }public class SelectMessageQueueByRandom implements MessageQueueSelector {private Random random new Random(System.currentTimeMillis());Overridepublic MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) {// 基于队列总数生成一个随机数int value random.nextInt(mqs.size());// 选择对应的队列return mqs.get(value);} }方式三、基于Broker的可用性采取轮询的策略选择队列 DefaultMQProducer 的 send / sendOneway 方法可以不携带 MessageQueue、MessageQueueSelector简单看下这种方式的队列是如何选择。 这种方式下的 send / sendOneway 方法中内部会调用如下方法 MessageQueue mqSelected this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);进入方法内部看一下处理逻辑。 public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); }MQFaultStrategy public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {// 如果开启了发送延迟规避机制默认falseif (this.sendLatencyFaultEnable) {try {int index tpInfo.getSendWhichQueue().incrementAndGet();for (int i 0; i tpInfo.getMessageQueueList().size(); i) {int pos Math.abs(index) % tpInfo.getMessageQueueList().size();if (pos 0)pos 0;// 获取指定下标的队列MessageQueue mq tpInfo.getMessageQueueList().get(pos);// 如果队列对应的Broker判定为可用则返回该队列否则基于轮询的策略选择下一个队列重复上述步骤进行判断if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}final String notBestBroker latencyFaultTolerance.pickOneAtLeast();// 根据BrokerName获取存储的写队列的总数int writeQueueNums tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums 0) {final MessageQueue mq tpInfo.selectOneMessageQueue();if (notBestBroker ! null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error(Error occurred when selecting message queue, e);}return tpInfo.selectOneMessageQueue();}return tpInfo.selectOneMessageQueue(lastBrokerName); }LatencyFaultToleranceImpl Override public boolean isAvailable(final String name) {// 从缓存中获取指定brokerName对应的FaultItem实例final FaultItem faultItem this.faultItemTable.get(name);// 如果缓存命中if (faultItem ! null) {// 判断是否可用即当前时间-startTimestamp是否0return faultItem.isAvailable();}return true; }Override public String pickOneAtLeast() {final EnumerationFaultItem elements this.faultItemTable.elements();ListFaultItem tmpList new LinkedListFaultItem();while (elements.hasMoreElements()) {final FaultItem faultItem elements.nextElement();tmpList.add(faultItem);}if (!tmpList.isEmpty()) {Collections.sort(tmpList);final int half tmpList.size() / 2;if (half 0) {return tmpList.get(0).getName();} else {final int i this.whichItemWorst.incrementAndGet() % half;return tmpList.get(i).getName();}}return null; }Override public void remove(final String name) {this.faultItemTable.remove(name); }TopicPublishInfo public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName null) {return selectOneMessageQueue();} else {for (int i 0; i this.messageQueueList.size(); i) {int index this.sendWhichQueue.incrementAndGet();int pos Math.abs(index) % this.messageQueueList.size();if (pos 0)pos 0;MessageQueue mq this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();} }public MessageQueue selectOneMessageQueue() {int index this.sendWhichQueue.incrementAndGet();int pos Math.abs(index) % this.messageQueueList.size();if (pos 0)pos 0;return this.messageQueueList.get(pos); }额外分析一下 DefaultMQProducerImpl 的 updateFaultItem 方法。 public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); }接着看下 MQFaultStrategy 的 updateFaultItem 方法。 private long[] latencyMax {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {// 如果开启了发送延迟规避机制if (this.sendLatencyFaultEnable) {// 根据延迟时间计算不可用的时间long duration computeNotAvailableDuration(isolation ? 30000 : currentLatency);// 更新faultItemTable缓存this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);} }private long computeNotAvailableDuration(final long currentLatency) {for (int i latencyMax.length - 1; i 0; i–) {// 根据延迟时间计算不可用的时间if (currentLatency latencyMax[i])return this.notAvailableDuration[i];}return 0; }接着分析 LatencyFaultToleranceImpl 的 updateFaultItem 方法的处理逻辑。 Override public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {// 从缓存中获取指定BrokerName对应的FaultItem实例FaultItem old this.faultItemTable.get(name);// 如果缓存未命中if (null old) {// 构造 FaultItem 实例final FaultItem faultItem new FaultItem(name);// 更新 currentLatecy、startTimestamp 属性faultItem.setCurrentLatency(currentLatency);faultItem.setStartTimestamp(System.currentTimeMillis() notAvailableDuration);// 更新缓存old this.faultItemTable.putIfAbsent(name, faultItem);if (old ! null) {// 更新 currentLatecy、startTimestamp 属性old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() notAvailableDuration);}// 如果缓存命中 } else {// 更新 currentLatecy、startTimestamp 属性old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() notAvailableDuration);} }
- 上一篇: 南京做网站南京乐识好都匀网站开发公司
- 下一篇: 南开网站建设公司谷歌外贸平台有哪些
相关文章
-
南京做网站南京乐识好都匀网站开发公司
南京做网站南京乐识好都匀网站开发公司
- 技术栈
- 2026年03月21日
-
南京做网站建设有哪些wordpress超详细教程视频教程
南京做网站建设有哪些wordpress超详细教程视频教程
- 技术栈
- 2026年03月21日
-
南京做网站建设的公司海南网站建设哪家不错
南京做网站建设的公司海南网站建设哪家不错
- 技术栈
- 2026年03月21日
-
南开网站建设公司谷歌外贸平台有哪些
南开网站建设公司谷歌外贸平台有哪些
- 技术栈
- 2026年03月21日
-
南开网站建设网业制作
南开网站建设网业制作
- 技术栈
- 2026年03月21日
-
南康网站建设公司wordpress主题添加授权
南康网站建设公司wordpress主题添加授权
- 技术栈
- 2026年03月21日
