贵州网站建设hsyunso哪些网站可以做移动端模板

当前位置: 首页 > news >正文

贵州网站建设hsyunso,哪些网站可以做移动端模板,免费模板网站都有什么区别,哈尔滨调整部分地区风险等级pulsar 基于3.x最新官方文档学习记录 概念与架构 典型的推送订阅模式。生产者发送消息#xff0c;消费者订阅topic消费信息并回应ACK。订阅创建后#xff0c;Pulsar会保留所有消息。仅消息被所有订阅 成功消费了才会丢弃#xff08;可以配置消息保留机制保留一定量#…pulsar 基于3.x最新官方文档学习记录 概念与架构 典型的推送订阅模式。生产者发送消息消费者订阅topic消费信息并回应ACK。订阅创建后Pulsar会保留所有消息。仅消息被所有订阅 成功消费了才会丢弃可以配置消息保留机制保留一定量 而且还可以重新给broker投递重新消费 消息 消息是pulsar传递的基本单元。有如下元素组成 组成成分描述value/data payload消息携带的数据key消息的键或分区键。消息可以用键标记对于topic压缩有用properties消息属性。key-value对的map组成producer name生产者名topic nametopic名称schema version生产消息所用的schema版本sequence ID每个消息在自己的topic里都有一个有序的序列id由生产者赋值。ID可以用于去重。如果brokerDeduplicationEnabled设置为true。那么每个消息的sequencee id在主题里是唯一的message idbookies赋值的持久化的消息id。在pulsar集群中是唯一的publish time消息发布时间戳生产者自动赋值的event time一个应用程序附加到消息的可选时间戳信息。业务一半也很少用自己在消息里用一个字段就完了 消息有默认大小5MB。可以通过配置修改 In the broker.conf file. # The max size of a message (in bytes). maxMessageSize5242880In the bookkeeper.conf file. # The max size of the netty frame (in bytes). Any messages received larger than this value are rejected. The default value is 5 MB. nettyMaxFrameSizeBytes5253120响应 当应用消费并处理完消息后应该回复一个ack代表改消息消费成功。消息会在所有订阅消费完后删除。当然可以配置消息保留策略保留一定量的消息。对于批量的信息也可以批量ACK ack方式 单独确认 ​ 仅确认选择的消息 consumer.acknowledge(msg);累计确认 确认后该消息之前的消息不会再消费到。但是注意 这个不能用再shared和key_shared 订阅类型。因为这种订阅类型下有多个消费者
consumer.acknowledgeCumulative(msg);NACK NACK机制消费者可以通知broker自己没有处理该消息需要重新消费。broker会将该消息重新投递给consumer消费 在独占和故障转移订阅下只能nack最近一条消息在share和key_shared下可以选择nack某一条消息 对有序订阅类型的订阅独占key_shared nack会导致乱序 可以设置重新投递次数以及投递延迟 例如配置延时在1s~60s Consumerbyte[] consumer pulsarClient.newConsumer().topic(topic).subscriptionName(sub-negative-ack).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(60 * 1000).multiplier(2) // 重试延迟乘数.build()).subscribe();消息重新投递和投递延迟如下 Redelivery countRedelivery delay11 seconds22 seconds34 seconds48 seconds516 seconds632 seconds760 seconds860 seconds 如果启用了批处理那么一批数据都会重新投递 ack timeout 默认关闭。超时机制设置客户端跟踪 消费的时间如果超过时间限制会给broker回nack触发重新 投递 可以配合重新投递的设置使用 consumer.ackTimeout(10, TimeUnit.SECOND).ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(60 * 1000).multiplier(2).build());The message redelivery behavior should be as follows. Redelivery countRedelivery delay110 1 seconds210 2 seconds310 4 seconds410 8 seconds510 16 seconds610 32 seconds710 60 seconds810 60 seconds 注意 如果启用批处理那么一批数据都会重新投递 自己单独NACK比超时NACK更可取。超时值不好设置容易重复消费不需要的消息
Retry letter topic 重试队列 用来存储消费失败的消息并且稍后重试消费。用户可以自定义重试延迟时间。重试队列重试次数超过阈值后消息就回到死信队列了。 重试队列消息有一些特殊proerties REAL_TOPIC persistent://public/default/my-topic # 源topic ORIGIN_MESSAGE_ID 1:0:-1:0 # 原始Message ID RECONSUMETIMES 6 # 重试次数 DELAY_TIME 3000 # 重试间隔重试队列与延迟投递目的是不一样的重试队列主要是为了处理失败的消息。延迟投递则是想要在指定时间消费消息 重试队列默认名字subscriptionname-RETRY,官方不推荐用。因为如果同一命名空间下的topicx消费有多相同订阅名那么不同topic他的重试队列名字会是一样的。会导致互相消费 重试队列操作 MapString, String customProperties new HashMapString, String(); customProperties.put(custom-key-1, custom-value-1); customProperties.put(custom-key-2, custom-value-2); consumer.reconsumeLater(msg, customProperties, 3, TimeUnit.SECONDS);比起nack。重试队列更适合大量重新消费的场景 。重试丢列的消费会持久化到bookie而nack只会缓存到客户端 dead letter topic 死信队列里一般存储了消费失败的消息。客户可以自定义如何处理这些失败的消息。例如存储到ES里供后面查询指标监控等 Consumerbyte[] consumer pulsarClient.newConsumer(Schema.BYTES).topic(my-topic).subscriptionName(my-subscription).subscriptionType(SubscriptionType.Shared).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).deadLetterTopic(my-dead-letter-topic-name).build()).subscribe();死信队列默认格式为subscriptionname-DLQ与重试队列一样建议是自定义topic名字 压缩 pulsar支持消息压缩。lz4 zlib zstd snappy都行。比较推荐启用 batching 批处理开启后生产者会累计发送 。此时backlog siez 代表该批次总数而不是消息总数。批次也会作为一个单元进行存储。 一般来说该批次所有消息被确认成功则该批次 确认成功否则认为失败进行重新投。但pulsar 2.6后就有批量确认。可以将当前索引之前的消息 都Ack了。 chunking 分块Pulsar可以通过分块来处理大消息。当消息分块启用对持久化topic才生效当消息超过指定大小 maxMessageSize消息将会被如下处理。消息结尾默认为OFF 将源消息拆分并将他们与分块元数据chunked metadata按顺序发送broker会将分块的消息存储在一个ledgere中。并用chunkedMessageRate参数来记录该主题分块消息速率。消费者会缓存所有消息的分块并聚合到接收队列中从接受队列消费消息 注意 分块和批处理不能同时开启 消费示例 单个消费者消费连续 分块的消息 单个消费者消费乱序分块的消息 Topic pulsar主题是将数据组织成流的单元。名称为一个特定结构的url {persistent|non-persistent}://tenant/namespace/topicTopic name componentDescriptionpersistent / non-persistent标识主题类型pulsar支持两种主题持久化的和非持久化的。默认是持久化的tenant租户名pulsar多租户概念namespace命名空间每个租户都有多个命名空间topictopic名字没什么别的含义 pulsar客户端如果生产或消费一个不存在的topic。pulsar会尝试建立。 但一般不推荐Pulsar会变得不好维护。最好是部署脚本啥的提前键好方便统计维护topic namespace 命名空间是租户下对topic的逻辑分组 Subscriptions 订阅名。每个消费者去订阅topic的时候都需要有个订阅名如果没有puslar也会默认创建一个 订阅类型 独占exclusive 默认的消费方式。消费者独占该订阅名的消费 共享shared 多个消费者可以使用一个订阅名消费该topic。适合需要动态扩容的消费业务 灾备failover 主消费者断开后消息的消费会交给队列中下一个消费者。对于分区的topic。新消费者可能会消费乱序 对于分区的topic.broker会按照优先级和消费者name字典序进行排序。代理尝试将分区平均分给具有最高优先级的消费者。 如果分区数小于消费者数有如下情况。例如2个分区四个消费者。那么每个分区都会有一个激活的消费者消费和三个待消费的消费者 如果分区数大于消费者数。例如九个分区三个消费者。如下图A负责P0 P3 P6 那么BC就是P0 P3 P6 候选消费的消费者。其余同理 对于未分区的topic。 topic比消费者少 topic比消费者多。例如 A消费 topic1和topic4 那么B就是topic1和topic4的候选 key共享key_shared
​ 几个消费者共享消费但是根据哈希key可以自己指定有几种分流。相同key或ordering key只会 投递给同一个消费者无论如何重新投递。那么当有新消费者加入时pulsar会记录当前已经读取的消息位置只有当该位置前的消息都被ack了才会给新消费者投递消息。那万一其他消费者卡住了怎么办可以开启allowOutOfOrderDelivery放松下条件只会有短暂的不符合key_shared的消费情况 ​ key_shared 形势下得批处理。如果启用key_shared要么关闭批处理要么shengchanzhe 需要启用 key-based batching .起始就是给批次打key。给单条消息打key没啥用默认得处理方法可能没办法将相同Key的数据打包到相同批次。而消费者消费时会将第一条数据的key当作该批数据的key从而导致上下文错误 大致消费形式总结如下 订阅模式 按游标是否是持久型来划分 Subscription modeDescriptionNoteDurable持久型游标订阅重启broker重启后会从最近消费的消息消费默认的订阅模式NonDurable非持久型游标broker重启后游标会丢失读者的订阅模式本质上是非持久的因为他无法阻止topic数据的删除 多topic订阅 正则形式 persistent://public/default/finance-.*列表形式 topic分区 单分区的topic只会被一个broker持有这限制了该topic的吞吐量。 分区可以将topic分布给多个broker。从而提高吞吐量 topic分区会均匀分布在集群broker上。分区topic只能由admin api创建。分区数需要在创建topic时指定和普通topic工作方式没啥区别。 路由方式 当推送消息给分区的topic时必须指定路由模式这决定了该消息推给哪个分区 当前有三种消息路由方式 ModeDescriptionRoundRobinPartition如果没有提供key那么生产者以轮询 的方式在分区上发布消息。但这个轮询不是指针对单个消息。而是设置成一个相同 的批处理延迟边界一批一批的发给不同分区确保批处理有效。如果消息有key那么就会按key散列给不同分区SinglePartition如果消息没key那么就随机一个分区发送。如果指定key那么就会散列到特定分区CustomPartition自定义路由分区通过重写客户端接口实现java的话是MessageRouter 顺序保证 消息的顺序和路由方式以及msg的key有关通常情况下用户希望每个分区保持顺序 如果msg有key那么 在使用RoundRobin和single模式时候消息 都会根据哈希JavaStringHash, murmur3_32Hash,推荐是用后者路由到特定分区 消息顺序 rdering guaranteeDescriptionRouting Mode and KeyPer-key-partition所有 相同key的消息 会在一个分区按顺序排列使用RoundRobin和single模式并提供keyPer-producer从同一个生产者出来的消息会按序排列需要使用single模式并且消息不附加key 非持久topic 默认情况下Pulsar会持久化所有没ack的消息。消息 会交给BookKeeper bookies存储。 但pulsar也提供非持久化的 topic。一旦broker挂了或者订阅挂了期间的消息都找不回来 格式 non-persistent://tenant/namespace/topic 非持久化topic一系列行为都只基于内存。非持久化的topic也不会持久化到zk意味着该元数据zk是不知道的。如果对应broker挂了非持久化的topic不会分给新的broker。解决方案是将allowAUtoTopicCreation设置为trueallowAutoTOpicCreationTYpe设置为non-partitioned 性能 因为持久化topic需要存磁盘存状态等操作总之会比非持久化的topic性能差一些 client api PulsarClient client PulsarClient.builder().serviceUrl(pulsar://localhost:6650).build(); String npTopic non-persistent://public/default/my-topic; String subscriptionName my-subscription-name;Consumerbyte[] consumer client.newConsumer().topic(npTopic).subscriptionName(subscriptionName).subscribe();system topic system topic 主要用于实现某些功能并消除对第三方组件的依赖。例如心跳检测事务主题策略资源组服务等。以心跳检测为例子 系统主题进行健康检查 可以让生产者消费者生产/读取heartberat ns下的消息来探测服务是否存活 NamespaceTopicNameDomainCountUsagepulsar/systemtransaction_coordinatorassign\({id}PersistentDefault 16事务协调pulsar/system__transaction_log_\){tc_id}PersistentDefault 16事务日志pulsar/systemresource-usageNon-persistentDefault 4资源组服务host/portheartbeatPersistent1心跳检测User-defined-nschange_eventsPersistentDefault 4topic相关事件User-defined-nstransaction_buffer_snapshotPersistentOne per namespace事务缓存快照User-defined-ns${topicName}__transaction_pending_ackPersistent每个订阅有事务的都会有一个事务ack管理 消息重新投递 看前面ack、nack、ack timeout 和重试队列 消息保留和过期 默认情况情况下会立即删除消费者已确认过的消息并将未确认的消息持久化在消息积压中。不过puslar可以设置更改这种行为。pulsar中的消息保留有两种定义一种用于非积压消息的持久化。一种用于积压的消息 在puslar中 可以设置两种 方式进行消息保留命名空间级 进行消息的持久化存储可以存储 已经ack的消息针对已ack数据保留X小时Y大小的数据设置消息TTL过期数据丢弃针对未ack的数据ACK 消息保留策略 看这两张图就知道了 。深蓝色是还在保留范围内的数据。浅蓝是不在保留范围内的数据可以删除的 。白色是 未ack积压的数据 开启消息保留 的时候必须设置默认的大小限制defaultRetentionSizeInMB和时间限制defaultRetentionTimeInMinutes) Time limitSize limitMessage retention-1-1无限制 保留-10在范围内保留0-1在范围内保留00不开启消息保留默认 00Invalid00Invalid00超过一个就丢 当然也可以手动设置 topic下消息的保留期限和大小 int retentionTime 10; // 10 minutes int retentionSize 500; // 500 megabytes RetentionPolicies policies new RetentionPolicies(retentionTime, retentionSize); admin.namespaces().setRetention(namespace, policies);积压配额 backlogs是用来存未ACK数据的。pulsar默认会存储所有 未ACK数据。你也可以控制存储数据的大小和时间。pulsar用配额来限制backlog 该图展示了pulsar如何限制backlog的数据。超过backlog配额的会启用一定消息保留策略。策略如下 PolicyActionproducer_request_hold超出配额时生产者会保留消息重试直到超出客户端配置的sendTimeoutMsproducer_exception发消息时抛异常告知已经超出配额consumer_backlog_eviction超出配额时broker开始丢弃积压数据 设置积压配额 admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().retentionPolicy(RetentionPolicy.producer_request_hold).limitSize(2147483648L).limitTime(60 * 60).build());TTL 默认情况下puslar会持久化所有未ack的数据。但可以设置TTL来决定什么时候丢弃未ack的数据 pulsar-admin namespaces set-message-ttl my-tenant/my-ns
–messageTTL 120 # TTL of 2 minutesadmin.namespaces().setNamespaceMessageTTL(namespace, ttlInSeconds);消息去重 当消息被持久化多次时会进行去重。确保消息在磁盘只存一次即使该消息被生产者重复发送。 前两个图是没开启去重后两个图是开启去重。消息去重有broker层级ns层级和topic层级 该功能默认是关闭的。如果启用建议生产者无限尝试发送消息。将sendTImeout配置为0 生产者幂等性 该方式也是消息去重的一种。意味着每条消息仅生成一次。缺点是会将重复数据删除的工作推到应用程序。 重复数据删除和effectively-once语义 消息 重复数据删除使pulsar成为理想的消息传递系统可以与流处理引擎和其他要提供一次有效处理语义的系统结合使用 消息延迟传递 消息延迟传递可以让消费者延迟一定时间 再消费到消息 。在这个机制中 消息存在bookkeeper中消息发到broker后DelayedDeliveryTracker在内存中维护时间索引。timemessageId 一旦指定延迟结束消息就会传递给消费者只能在shared可key-shared订阅使用。该机制默认启用 注意 在消息保留机制下pulsar会 删除 topic前面的ledger可以理解为一个存储单元, 但不会删除中间的ledgere。意味着如果你发了一条延迟很长的消息。即使该消息后面的消息都被消费了到期被消费。由于这条消息卡在中间那么后面的消息对应的ledger都不会被删除在启用积压配额的机制下如果超过配额依旧会触发对应的消息保留策略在启用TTL机制下消息到期时即使是延迟消息也会被删除