杭州自助建站软件wordpress 移动端菜单
- 作者: 五速梦信息网
- 时间: 2026年03月21日 10:56
当前位置: 首页 > news >正文
杭州自助建站软件,wordpress 移动端菜单,网站建设公司哪家好要选磐石网络,百度 网站地图怎么做目录 1、消费者、消费组 2、心跳机制 3、消费者常见参数配置 4、订阅 5、反序列化 基本概念 自定义反序列化器 6、位移提交 6.1、自动提交 6.2、手动提交 同步提交 异步提交 7、再均衡 7.1、定义与基本概念 7.2、缺陷 7.3、如何避免再均衡 7.4、如何进行组内分…目录 1、消费者、消费组 2、心跳机制 3、消费者常见参数配置 4、订阅 5、反序列化 基本概念 自定义反序列化器 6、位移提交 6.1、自动提交 6.2、手动提交 同步提交 异步提交 7、再均衡 7.1、定义与基本概念 7.2、缺陷 7.3、如何避免再均衡 7.4、如何进行组内分区分配 7.5、谁来执行再均衡和消费组管理 8、消费者拦截器 作用 自定义消费者拦截器 1、消费者、消费组 消费者从订阅的主题消费消息消费消息的偏移量保存在kafka中的__consumer_offsets的主题中。多个消费同一个主题的消费者可以通过group.id配置加入到同一个消费组中。消费组均衡地给消费者分配分区每个分区只由消费组中的一个消费者消费防止重复消费。同一个消费组里一个分区只会对应一个消费者但一个消费者可以消费多个分区。group_id一半设置为应用或者业务的逻辑名称。 2、心跳机制 消费者4宕机重新分配分区3的消费者 分区3所在broker宕机重选分区3的leader分区 消费者宕机退出消费组触发再平衡重新给消费组中的消费者分配分区broker宕机分区3重选leader副本出发再平衡重新分配分区3消息。 心跳机制就是consumer和broker之间的健康检查。consumer和broker之间保持长连接通过心跳机制检测对方是否健康。心跳检测相关参数如下所示 在broker端可配置sessionTimeoutMs参数如果consumer心跳超期broker会把消费者从消费组中移除并触发再平衡重新分配分区 在consumer端可配置sessionTimeoutMs和rebalanceTimeoutMs参数如果broker心跳超期consumer则会告知broker主动退出消费组并触发再平衡。 3、消费者常见参数配置 4、订阅 主题、分区(leader和follower分区)、消费者、消费组、订阅。 主题topic用于分类管理消息的逻辑单元可以用于区分业务类型分区partition同一个topic的消息会被分散到多个分区中不同分区通常在不同broker上方便水平扩展。分区可分为leader分区和follower分区leader分区用于与生产者/消费者通信follower分区用于备份leader分区的数据消费者与分区长连接用于消费分区中的消息消费组消费组中可能会有多个消费者保证一个消费组获取到特定主题的全部消息。消费组可以保证一个主题的分区只会被消费组中的一个消费者消费订阅消费者订阅主题并将消费者加入到消费组中采用pull模式从broker分区中读取消息。kafka的消费者只有pull模式该模式下消费者可以自主控制消费消息的速率。 5、反序列化 基本概念 在Kafka中保存的数据都是字节数组。消息者接收消息后需要将消息反序列化为指定的数据格式进行处理。消费者通过key.deserializer和value.deserializer指定key和value的序列化器。Kafka使用org.apache.kafka.common.serialization.DeserializerT接口定义序列化器。Kafka已实现的序列化器有ByteArrayDeserializer、ByteBufferDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、StringDeserializer、LongDeserializer、ShortDeserializer。 自定义反序列化器 实现org.apache.kafka.common.serialization.DeserializerT接口并实现其中的deserializer方法。 public class UserDeserializer implements DeserializerUser {Overridepublic void configure(MapString, ? configs, boolean isKey) {}Overridepublic User deserialize(String topic, byte[] data) {ByteBuffer allocate ByteBuffer.allocate(data.length);allocate.put(data);allocate.flip();int userId allocate.getInt();int length allocate.getInt();System.out.println(length);String username new String(data, 8, length);return new User(userId, username);}Overridepublic void close() {} } 6、位移提交 位移 kafka分区消息的偏移量。kafka中有一个主题专门用于保存消费者的偏移量。消费者与分区一一对应消费者在消费分区消息时需要向kafka提交自己的位移(偏移量)信息kafka只记录该消费者在对应分区的偏移量信息。消费者向kafka提交偏移量的过程叫做位移提交。位移提交分为自动提交和手动提交也分为同步提交和异步提交。 6.1、自动提交 开启⾃动提交 enable.auto.committruekafka默认为自动提交。配置⾃动提交间隔Consumer端 auto.commit.interval.ms 默认 5s。 自动提交模式下Kafka会保证在开始调⽤ poll ⽅法时提交上次 poll 返回的所有消息因此⾃动提交不会出现消息丢失但会重复消费比如 Consumer 每 5s 提交一次offset 假设提交 offset 后的 3s 发⽣了 Rebalance Rebalance 之后的所有 Consumer 从上⼀次提交的 offset 处继续消费 因此 Rebalance 发⽣前 3s 的消息会被重复消费 6.2、手动提交 同步提交 while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息try {consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常} } 使⽤ KafkaConsumer#commitSync()会提交 KafkaConsumer#poll() 返回的最新 offset ⼿动同步提交可以控制offset提交的时机和频率 调⽤ commitSync 时Consumer 处于阻塞状态直到 Broker 返回结果 会影响 TPS 如果提交间隔过长consumer重启后会有更多的消息被重复消费。 异步提交 while (true) {ConsumerRecordsString, String records consumer.poll(3_000);process(records); // 处理消息consumer.commitAsync((offsets, exception) - {if (exception ! null) {handle(exception);}}); } 使⽤ KafkaConsumer#commitAsync()会提交 KafkaConsumer#poll() 返回的最新 offset commitAsync出现问题不会⾃动重试可通过异步提交与同步提交相结合的方式解决。 7、再均衡 7.1、定义与基本概念 也叫做重平衡主要是为了让消费组下的消费者来重新分配主题下的每一个分区。再均衡的触发条件有如下三个 消费组内成员变更增加和减少消费者⽐如消费者宕机退出消费组或者新增一个消费者。主题的分区数发⽣变更kafka⽬前只⽀持增加分区当增加的时候就会触发再均衡。订阅的主题发⽣变化比如消费者组使⽤正则表达式订阅主题⽽恰好⼜新建了对应的主题就会触发再均衡。 7.2、缺陷 再均衡过程中消费者无法从kafka消费消息。如果kafka节点过多再均衡过程会及其耗时(数分钟甚至小时)过程中kafka基本处于不可用状态。 7.3、如何避免再均衡 完全避免那不可能因为你无法保证消费者不会故障。但是我们可以通过避免增加分区、增加订阅的主题、增加消费者这几种情况减少再均衡的触发。 但有时候kafka会错误地认为一个正常的消费者已经挂掉从而触发再均衡。我们要做的就是避免这种情况。 消费者和kafka之间通过心跳机制来做健康检查。当消费者宕机、网络阻塞或是消费者因负载过重没来得及发送心跳时kafka都会认为消费者挂掉了。所以设置合理的健康检查参数可以有效减少再均衡的发生。比较重要的参数如下 session.timout.ms控制⼼跳超时时间推荐设置为6sheartbeat.interval.ms控制⼼跳发送频率频率越高越不容易误判但也会消耗更多资源推荐设置为2smax.poll.interval.ms控制poll的间隔消费者poll数据后需要⼀些处理再进⾏拉取。如果两次拉取时间间隔超过这个参数设置的值那么消费者就会被踢出消费者组。推荐为消费者处理消息最长耗时 1分钟。 7.4、如何进行组内分区分配 有三种分配策略RangeAssignor和RoundRobinAssignor以及StickyAssignor。 7.5、谁来执行再均衡和消费组管理 kafka里有一个角色叫做Group Coordinator用于执行消费组的管理。 Group Coordinator——每个消费组分配一个消费组协调器⽤于组管理和位移管理。当消费组的第一个消费者启动的时候它会去和Kafka Broker确定谁是它们组的组协调器。之后该消费组内所有消费者和该组协调器协调通信。 8、消费者拦截器 作用 消费者在拉取了分区消息后会先通过反序列化对key和value进行处理然后可通过设置消费者拦截器对消息进行处理允许更改消费者接收到的消息或者做一些监控、日志处理应用程序处理消费者拉取的分区消息 自定义消费者拦截器 ConsumerInterceptor方法抛出的异常会被捕获、记录但是不会向下传播。如果用户配置了错误的key或value类型参数消费者不会抛出异常而仅仅是记录下来。 自定义消费者拦截器需要实现org.apache.kafka.clients.consumer.ConsumerInterceptorK, V 接口并实现其中的configure()、onConsume()、onCommit()、close()方法其中 onConsume()该方法在poll方法返回之前调用调用结束后poll方法就返回消息了。可通过该方法修改消费者消息返回新的消息。onCommit()当消费者提交偏移量时调用该方法。close()用于关闭该拦截器用到的资源如打开的文件、连接的数据库等。configure()用于获取消费者的参数配置。 public class MyInterceptor implements ConsumerInterceptorString, String {Overridepublic ConsumerRecordsString, String onConsume(ConsumerRecordsString, String records) {// poll方法返回结果之前最后要调用的方法System.out.println(MyInterceptor – 开始);// 消息不做处理直接返回return records;}Overridepublic void onCommit(MapTopicPartition, OffsetAndMetadata offsets) {// 消费者提交偏移量的时候经过该方法System.out.println(MyInterceptor – 结束);}Overridepublic void close() {// 用于关闭该拦截器用到的资源如打开的文件连接的数据库等}Overridepublic void configure(MapString, ? configs) {// 用于获取消费者的设置参数configs.forEach((k, v) - {System.out.println(k \t v);});} } 以上内容为个人学习理解如有问题欢迎在评论区指出。 部分内容截取自网络如有侵权联系作者删除。
- 上一篇: 杭州知名网站制作公司网站广告条效果
- 下一篇: 杭州做购物网站电商网页设计理念
相关文章
-
杭州知名网站制作公司网站广告条效果
杭州知名网站制作公司网站广告条效果
- 技术栈
- 2026年03月21日
-
杭州知名网站制作公司太原在线制作网站
杭州知名网站制作公司太原在线制作网站
- 技术栈
- 2026年03月21日
-
杭州知名的互联网公司商城网站的seo优化改怎么做
杭州知名的互联网公司商城网站的seo优化改怎么做
- 技术栈
- 2026年03月21日
-
杭州做购物网站电商网页设计理念
杭州做购物网站电商网页设计理念
- 技术栈
- 2026年03月21日
-
杭州做企业网站计算网站制作教程
杭州做企业网站计算网站制作教程
- 技术栈
- 2026年03月21日
-
杭州做网站多少钱wordpress分享QQ插件
杭州做网站多少钱wordpress分享QQ插件
- 技术栈
- 2026年03月21日
