电脑公司网站建设模板清远东莞网站建设
- 作者: 五速梦信息网
- 时间: 2026年03月21日 11:26
当前位置: 首页 > news >正文
电脑公司网站建设模板,清远东莞网站建设,欢迎访问中国建设银行,专业的建设网站哪个好前言 ConsumerFilterManager 继承了ConfigManager配置管理组件#xff0c;拥有将内存数据持久化到磁盘文件consumerFilter.json的能力。它主要负责#xff0c;对在消费者拉取消息时#xff0c;进行消息数据过滤#xff0c;且只针对使用表达式过滤的消费者有效。 源码版本拥有将内存数据持久化到磁盘文件consumerFilter.json的能力。它主要负责对在消费者拉取消息时进行消息数据过滤且只针对使用表达式过滤的消费者有效。 源码版本4.9.3 源码架构图 核心数据结构 可以看到内存中维护了 topic - consumer group - ConsumerFilterData 映射关系的数据结构。 /*** Consumer filter data manager.Just manage the consumers use expression filter.* 消费者过滤数据管理组件。只管理使用表达式过滤的消费者。*/ public class ConsumerFilterManager extends ConfigManager {// 核心数据结构topic - consumer group - ConsumerFilterDataprivate ConcurrentMapString/Topic/, FilterDataMapByTopicfilterDataByTopic new ConcurrentHashMapString/Topic/, FilterDataMapByTopic(256);private transient BrokerController brokerController;// 布隆过滤器private transient BloomFilter bloomFilter; } 深入看下 FilterDataMapByTopic 类是上面数据结构的一个子集维护了 消费组 - 消费组过滤数据映射关系。 public static class FilterDataMapByTopic {// 核心数据结构consumer group - ConsumerFilterDataprivate ConcurrentMapString/consumer group/, ConsumerFilterDatagroupFilterData new ConcurrentHashMapString, ConsumerFilterData();private String topic;} 在深入一步看下 ConsumerFilterData 的数据结构包含了全部与消费者过滤有关的关键信息。 /*** Filter data of consumer./ public class ConsumerFilterData {// 消费组private String consumerGroup;// 主题private String topic;// 过滤器表达式private String expression;// 过滤器类型private String expressionType;// 过滤器编译后的表达式private transient Expression compiledExpression;// 过滤器创建时间private long bornTime;// 过滤器过期时间private long deadTime 0;// 过滤器版本private long version;// 布隆过滤器数据private BloomFilterData bloomFilterData;// 客户端版本private long clientVersion; } 核心数据行为 从下面代码可以看到ConsumerFilterManager的行为主要是注册订阅、取消订阅、清理过期订阅、序列化、反序列化等维护内存元数据的行为。过滤行为不在这个组件里体现在其他调用方法中会有具体使用方式。 /** Consumer filter data manager.Just manage the consumers use expression filter.* 消费者过滤数据管理组件。只管理使用表达式过滤的消费者。*/ public class ConsumerFilterManager extends ConfigManager {private static final InternalLogger log InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);private static final long MS_24_HOUR 24 * 3600 * 1000;// 核心数据结构topic - consumer group - ConsumerFilterDataprivate ConcurrentMapString/Topic/, FilterDataMapByTopicfilterDataByTopic new ConcurrentHashMapString/Topic/, FilterDataMapByTopic(256);private transient BrokerController brokerController;// 布隆过滤器private transient BloomFilter bloomFilter;public ConsumerFilterManager() {// just for testthis.bloomFilter BloomFilter.createByFn(20, 64);}public ConsumerFilterManager(BrokerController brokerController) {this.brokerController brokerController;this.bloomFilter BloomFilter.createByFn(brokerController.getBrokerConfig().getMaxErrorRateOfBloomFilter(),brokerController.getBrokerConfig().getExpectConsumerNumUseFilter());// then set bit map length of store config.brokerController.getMessageStoreConfig().setBitMapLengthConsumeQueueExt(this.bloomFilter.getM());}/*** Build consumer filter data.Be care, bloom filter data is not included.** return maybe null/public static ConsumerFilterData build(final String topic, final String consumerGroup,final String expression, final String type,final long clientVersion) {if (ExpressionType.isTagType(type)) {return null;}ConsumerFilterData consumerFilterData new ConsumerFilterData();consumerFilterData.setTopic(topic);consumerFilterData.setConsumerGroup(consumerGroup);consumerFilterData.setBornTime(System.currentTimeMillis());consumerFilterData.setDeadTime(0);consumerFilterData.setExpression(expression);consumerFilterData.setExpressionType(type);consumerFilterData.setClientVersion(clientVersion);try {consumerFilterData.setCompiledExpression(FilterFactory.INSTANCE.get(type).compile(expression));} catch (Throwable e) {log.error(parse error: expr{}, topic{}, group{}, error{}, expression, topic, consumerGroup, e.getMessage());return null;}return consumerFilterData;}/** 在指定消费组注册消费者过滤数据* param consumerGroup* param subList*/public void register(final String consumerGroup, final CollectionSubscriptionData subList) {for (SubscriptionData subscriptionData : subList) {register(subscriptionData.getTopic(),consumerGroup,subscriptionData.getSubString(),subscriptionData.getExpressionType(),subscriptionData.getSubVersion());}// make illegal topic dead.CollectionConsumerFilterData groupFilterData getByGroup(consumerGroup);IteratorConsumerFilterData iterator groupFilterData.iterator();while (iterator.hasNext()) {ConsumerFilterData filterData iterator.next();boolean exist false;for (SubscriptionData subscriptionData : subList) {if (subscriptionData.getTopic().equals(filterData.getTopic())) {exist true;break;}}if (!exist !filterData.isDead()) {filterData.setDeadTime(System.currentTimeMillis());log.info(Consumer filter changed: {}, make illegal topic dead:{}, consumerGroup, filterData);}}}public boolean register(final String topic, final String consumerGroup, final String expression,final String type, final long clientVersion) {// 不支持tag类型if (ExpressionType.isTagType(type)) {return false;}if (expression null || expression.length() 0) {return false;}// 获取topic对应的消费者过滤数据FilterDataMapByTopic filterDataMapByTopic this.filterDataByTopic.get(topic);if (filterDataMapByTopic null) {FilterDataMapByTopic temp new FilterDataMapByTopic(topic);FilterDataMapByTopic prev this.filterDataByTopic.putIfAbsent(topic, temp);filterDataMapByTopic prev ! null ? prev : temp;}// 创建布隆过滤器数据BloomFilterData bloomFilterData bloomFilter.generate(consumerGroup # topic);// 注册过滤数据到topicreturn filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);}// 取消注册消费者过滤数据public void unRegister(final String consumerGroup) {for (EntryString, FilterDataMapByTopic entry : filterDataByTopic.entrySet()) {entry.getValue().unRegister(consumerGroup);}}public ConsumerFilterData get(final String topic, final String consumerGroup) {if (!this.filterDataByTopic.containsKey(topic)) {return null;}if (this.filterDataByTopic.get(topic).getGroupFilterData().isEmpty()) {return null;}return this.filterDataByTopic.get(topic).getGroupFilterData().get(consumerGroup);}// 获取消费组下所有过滤数据public CollectionConsumerFilterData getByGroup(final String consumerGroup) {CollectionConsumerFilterData ret new HashSetConsumerFilterData();IteratorFilterDataMapByTopic topicIterator this.filterDataByTopic.values().iterator();while (topicIterator.hasNext()) {FilterDataMapByTopic filterDataMapByTopic topicIterator.next();IteratorConsumerFilterData filterDataIterator filterDataMapByTopic.getGroupFilterData().values().iterator();while (filterDataIterator.hasNext()) {ConsumerFilterData filterData filterDataIterator.next();if (filterData.getConsumerGroup().equals(consumerGroup)) {ret.add(filterData);}}}return ret;}// 获取topic下所有过滤数据public final CollectionConsumerFilterData get(final String topic) {if (!this.filterDataByTopic.containsKey(topic)) {return null;}if (this.filterDataByTopic.get(topic).getGroupFilterData().isEmpty()) {return null;}return this.filterDataByTopic.get(topic).getGroupFilterData().values();}public BloomFilter getBloomFilter() {return bloomFilter;}Overridepublic String encode() {return encode(false);}Overridepublic String configFilePath() {if (this.brokerController ! null) {// 配置存储路径 config/consumerFilter.jsonreturn BrokerPathConfigHelper.getConsumerFilterPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());}return BrokerPathConfigHelper.getConsumerFilterPath(./unit_test);}// 将json字符串反序列化为ConsumerFilterManager对象Overridepublic void decode(final String jsonString) {ConsumerFilterManager load RemotingSerializable.fromJson(jsonString, ConsumerFilterManager.class);if (load ! null load.filterDataByTopic ! null) {boolean bloomChanged false;for (EntryString, FilterDataMapByTopic entry : load.filterDataByTopic.entrySet()) {FilterDataMapByTopic dataMapByTopic entry.getValue();if (dataMapByTopic null) {continue;}for (EntryString, ConsumerFilterData groupEntry : dataMapByTopic.getGroupFilterData().entrySet()) {ConsumerFilterData filterData groupEntry.getValue();if (filterData null) {continue;}try {filterData.setCompiledExpression(FilterFactory.INSTANCE.get(filterData.getExpressionType()).compile(filterData.getExpression()));} catch (Exception e) {log.error(load filter data error, filterData, e);}// check whether bloom filter is changed// if changed, ignore the bit map calculated before.if (!this.bloomFilter.isValid(filterData.getBloomFilterData())) {bloomChanged true;log.info(Bloom filter is changed!So ignore all filter data persisted! {}, {}, this.bloomFilter, filterData.getBloomFilterData());break;}log.info(load exist consumer filter data: {}, filterData);if (filterData.getDeadTime() 0) {// we think all consumers are dead when loadlong deadTime System.currentTimeMillis() - 30 * 1000;filterData.setDeadTime(deadTime filterData.getBornTime() ? filterData.getBornTime() : deadTime);}}}if (!bloomChanged) {this.filterDataByTopic load.filterDataByTopic;}}}// 将ConsumerFilterManager对象序列化为json字符串Overridepublic String encode(final boolean prettyFormat) {// clean{clean();}return RemotingSerializable.toJson(this, prettyFormat);}// 清理过期的过滤数据public void clean() {IteratorMap.EntryString, FilterDataMapByTopic topicIterator this.filterDataByTopic.entrySet().iterator();while (topicIterator.hasNext()) {Map.EntryString, FilterDataMapByTopic filterDataMapByTopic topicIterator.next();IteratorMap.EntryString, ConsumerFilterData filterDataIterator filterDataMapByTopic.getValue().getGroupFilterData().entrySet().iterator();while (filterDataIterator.hasNext()) {Map.EntryString, ConsumerFilterData filterDataByGroup filterDataIterator.next();ConsumerFilterData filterData filterDataByGroup.getValue();if (filterData.howLongAfterDeath() (this.brokerController null ? MS_24_HOUR : this.brokerController.getBrokerConfig().getFilterDataCleanTimeSpan())) {log.info(Remove filter consumer {}, died too long!, filterDataByGroup.getValue());filterDataIterator.remove();}}if (filterDataMapByTopic.getValue().getGroupFilterData().isEmpty()) {log.info(Topic has no consumer, remove it! {}, filterDataMapByTopic.getKey());topicIterator.remove();}}}public ConcurrentMapString, FilterDataMapByTopic getFilterDataByTopic() {return filterDataByTopic;}public void setFilterDataByTopic(final ConcurrentHashMapString, FilterDataMapByTopic filterDataByTopic) {this.filterDataByTopic filterDataByTopic;}public static class FilterDataMapByTopic {// 核心数据结构consumer group - ConsumerFilterDataprivate ConcurrentMapString/consumer group/, ConsumerFilterDatagroupFilterData new ConcurrentHashMapString, ConsumerFilterData();private String topic;public FilterDataMapByTopic() {}public FilterDataMapByTopic(String topic) {this.topic topic;}// 取消注册某个消费组的过滤器public void unRegister(String consumerGroup) {if (!this.groupFilterData.containsKey(consumerGroup)) {return;}ConsumerFilterData data this.groupFilterData.get(consumerGroup);if (data null || data.isDead()) {return;}long now System.currentTimeMillis();log.info(Unregister consumer filter: {}, deadTime: {}, data, now);data.setDeadTime(now);}public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData,long clientVersion) {ConsumerFilterData old this.groupFilterData.get(consumerGroup);if (old null) {// 构建过滤器数据ConsumerFilterData consumerFilterData build(topic, consumerGroup, expression, type, clientVersion);if (consumerFilterData null) {return false;}// 设置布隆过滤器consumerFilterData.setBloomFilterData(bloomFilterData);// 放入内存数据结构old this.groupFilterData.putIfAbsent(consumerGroup, consumerFilterData);if (old null) {log.info(New consumer filter registered: {}, consumerFilterData);return true;} else {if (clientVersion old.getClientVersion()) {if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {log.warn(Ignore consumer({} : {}) filter(concurrent), because of version {} {}, but maybe info changed!old{}:{}, ignored{}:{},consumerGroup, topic,clientVersion, old.getClientVersion(),old.getExpressionType(), old.getExpression(),type, expression);}if (clientVersion old.getClientVersion() old.isDead()) {reAlive(old);return true;}return false;} else {this.groupFilterData.put(consumerGroup, consumerFilterData);log.info(New consumer filter registered(concurrent): {}, old: {}, consumerFilterData, old);return true;}}} else {// 当前版本号小于旧的版本号if (clientVersion old.getClientVersion()) {if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {log.info(Ignore consumer({}:{}) filter, because of version {} {}, but maybe info changed!old{}:{}, ignored{}:{},consumerGroup, topic,clientVersion, old.getClientVersion(),old.getExpressionType(), old.getExpression(),type, expression);}if (clientVersion old.getClientVersion() old.isDead()) {reAlive(old);return true;}return false;}// 新版本号大于旧的版本号boolean change !old.getExpression().equals(expression) || !old.getExpressionType().equals(type);if (old.getBloomFilterData() null bloomFilterData ! null) {change true;}if (old.getBloomFilterData() ! null !old.getBloomFilterData().equals(bloomFilterData)) {change true;}// if subscribe data is changed, or consumer is died too long.if (change) {// 构建过滤器数据ConsumerFilterData consumerFilterData build(topic, consumerGroup, expression, type, clientVersion);if (consumerFilterData null) {// new expression compile error, remove old, let client report error.this.groupFilterData.remove(consumerGroup);return false;}consumerFilterData.setBloomFilterData(bloomFilterData);// 设置过滤器数据this.groupFilterData.put(consumerGroup, consumerFilterData);log.info(Consumer filter info change, old: {}, new: {}, change: {},old, consumerFilterData, change);return true;} else {// 版本号一致更新过滤器数据old.setClientVersion(clientVersion);if (old.isDead()) {reAlive(old);}return true;}}}protected void reAlive(ConsumerFilterData filterData) {long oldDeadTime filterData.getDeadTime();filterData.setDeadTime(0);log.info(Re alive consumer filter: {}, oldDeadTime: {}, filterData, oldDeadTime);}public final ConsumerFilterData get(String consumerGroup) {return this.groupFilterData.get(consumerGroup);}public final ConcurrentMapString, ConsumerFilterData getGroupFilterData() {return this.groupFilterData;}public void setGroupFilterData(final ConcurrentHashMapString, ConsumerFilterData groupFilterData) {this.groupFilterData groupFilterData;}public String getTopic() {return topic;}public void setTopic(final String topic) {this.topic topic;}} }
- 上一篇: 电脑访问手机网站跳转赤峰网站建设
- 下一篇: 电脑公司网站源码php怎么找电商平台合作
相关文章
-
电脑访问手机网站跳转赤峰网站建设
电脑访问手机网站跳转赤峰网站建设
- 技术栈
- 2026年03月21日
-
电脑搭建网站需要空间洛阳网站设计开发
电脑搭建网站需要空间洛阳网站设计开发
- 技术栈
- 2026年03月21日
-
电脑本地网站建设四川住房建设厅网站首页
电脑本地网站建设四川住房建设厅网站首页
- 技术栈
- 2026年03月21日
-
电脑公司网站源码php怎么找电商平台合作
电脑公司网站源码php怎么找电商平台合作
- 技术栈
- 2026年03月21日
-
电脑经销部开具网站建设费网站备案许可证号查询
电脑经销部开具网站建设费网站备案许可证号查询
- 技术栈
- 2026年03月21日
-
电脑上建设银行网站打不开c 做的网站又哪些
电脑上建设银行网站打不开c 做的网站又哪些
- 技术栈
- 2026年03月21日






