网站开发后乙方把源代码交给甲方网站建设财务计划与预测

当前位置: 首页 > news >正文

网站开发后乙方把源代码交给甲方,网站建设财务计划与预测,深圳网站建设qwyx100,iss服务器网站建设#x1f44f;作者简介#xff1a;大家好#xff0c;我是爱吃芝士的土豆倪#xff0c;24届校招生Java选手#xff0c;很高兴认识大家#x1f4d5;系列专栏#xff1a;Spring源码、JUC源码、Kafka原理#x1f525;如果感觉博主的文章还不错的话#xff0c;请#x1f44… 作者简介大家好我是爱吃芝士的土豆倪24届校招生Java选手很高兴认识大家系列专栏Spring源码、JUC源码、Kafka原理如果感觉博主的文章还不错的话请三连支持一下博主哦博主正在努力完成2023计划中源码溯源一探究竟联系方式nhs19990716加我进群大家一起学习一起进步一起对抗互联网寒冬 文章目录 API 开发producer 生产者生产者 api 示例必要的参数配置发送消息发后即忘( fire-and-forget同步发送sync 异步发送async API 开发consumer 消费subscribe 订阅主题消费者组再均衡分区分配策略Range StrategyRound-Robin StrategSticky StrategyCooperative Sticky Strategy 消费者组再均衡流程GroupCoordinator 介绍 eager 协议再均衡步骤细节定位 Group Coordinator加入组 Join The Group组信息同步 SYNC Group心跳联系 HEART BEAT再均衡流程 assign 订阅主题subscribe 与 assign 的区别取消订阅消息的消费模式指定位移消费自动提交消费者偏移量手动提交消费者偏移量调用 kafka api手动提交位移时机的选择消费者提交偏移量方式的总结 API 开发producer 生产者 生产者 api 示例 一个正常的生产逻辑需要具备以下几个步骤 1配置生产者参数及创建相应的生产者实例 2构建待发送的消息 3发送消息 4关闭生产者实例 首先引入 maven 依赖 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.3.1/version /dependency采用默认分区方式将消息散列的发送到各个分区当中 import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties; /kafka生产者api代码示例/ public class MyProducer {public static void main(String[] args) throws InterruptedException {Properties props new Properties();//设置 kafka 集群的地址 必选props.put(bootstrap.servers, doitedu01:9092,doitedu02:9092,doitedu03:9092);//ack 模式取值有 01-1all all 是最慢但最安全的 消息发送应答级别props.put(acks, all);//序列化器 因为业务数据有各种类型的但是kafka底层存储里面不可能有各种类型的只能是序列化的字节所以不管你要发什么东西给它都要提供一个序列化器帮你能够把key value序列化成二进制的字节// 因为kafka底层的存储是没有类型维护机制的用户所发的所有数据类型都必须 序列化成byte[]所以kafka的producer需要一个针对用户所发送的数据类型的序列化工具类且这个序列化工具类需要实现kafka所提供的序列工具接口。props.put(key.serializer,org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer,org.apache.kafka.common.serialization.StringSerializer);/需要额外的指定泛型key value/ProducerString, String producer new KafkaProducer(props);for (int i 0; i 100; i)// 其调用是异步的数据的发送动作在producer的底层是异步线程的producer.send(new ProducerRecordString, String(test,Integer.toString(i), dd:i));// 在这里面可以通过逻辑判断去指定发送到那个topic中//Thread.sleep(100);producer.close();} }消息对象 ProducerRecord除了包含业务数据外还包含了多个属性 public class ProducerRecordK, V {private final String topic;private final Integer partition;private final Headers headers;private final K key;private final V value;private final Long timestamp;其发送方法中根据参数的不同有不同的构造方法 其实这样也就意味着我们可以把消息发送到不同的topic。 必要的参数配置 Kafka 生产者客户端 KakaProducer 中有 3 个参数是必填的。 bootstrap.servers / key.serializer / value.serializer为了防止参数名字符串书写错误可以使用如下方式进行设置 pro.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); pro.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());发送消息 创建生产者实例和构建消息之后 就可以开始发送消息了。发送消息主要有 3 种模式 发后即忘( fire-and-forget 发后即忘它只管往 Kafka 发送并不关心消息是否正确到达。 在大多数情况下这种发送方式没有问题 不过在某些时候比如发生不可重试异常时会造成消息的丢失。 这种发送方式的性能最高可靠性最差。 FutureRecordMetadata send producer.send(rcd);同步发送sync
try {producer.send(rcd).get(); } catch (Exception e) {e.printStackTrace(); }因为Future的get方法是同步阻塞的。 异步发送async
回调函数会在 producer 收到 ack 时调用为异步调用该方法有两个参数分别是 RecordMetadata 和Exception如果 Exception 为 null说明消息发送成功如果 Exception 不为 null说明消息发送失败。 注意消息发送失败会自动重试不需要我们在回调函数中手动重试。 import org.apache.kafka.clients.producer.; import java.util.Properties; public class MyProducer {public static void main(String[] args) throws InterruptedException {Properties props new Properties();// Kafka 服务端的主机名和端口号props.put(bootstrap.servers, doitedu01:9092,doitedu02:9092,doitedu03:9092);// 等待所有副本节点的应答props.put(acks, all);// 消息发送最大尝试次数props.put(retries, 0);// 一批消息处理大小props.put(batch.size, 16384);// 增加服务端请求延时props.put(linger.ms, 1);// 发送缓存区内存大小props.put(buffer.memory, 33554432);// key 序列化props.put(key.serializer,org.apache.kafka.common.serialization.StringSerializer);// value 序列化props.put(value.serializer,org.apache.kafka.common.serialization.StringSerializer);KafkaProducerString, String kafkaProducer new KafkaProducer(props);for (int i 0; i 50; i) {kafkaProducer.send(new ProducerRecordString, String(test, hello i),new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (metadata ! null) {System.out.println(metadata.partition() - metadata.offset());}}});}kafkaProducer.close();} }API 开发consumer 消费 import org.apache.kafka.clients.consumer.; import java.util.Arrays; import java.util.Properties; public class MyConsumer {public static void main(String[] args) {Properties props new Properties();// 定义 kakfa 服务的地址不需要将所有 broker 指定上// 客户端只要知道一台服务器就能通过这一台服务器来获知整个集群的信息所有的服务器、主机名等// 如果你只填写一台万一你得客户端启动的时候宕机了不在线那就无法连接到集群了// 如果你填写了堕胎有一个好处就是万一连不上其中一个可以去连接其它的props.put(bootstrap.servers, doitedu01:9092);// 制定 consumer groupprops.put(group.id, g1);// 按照一个时间间隔自动去提交偏移量// 是否自动提交 offsetprops.put(enable.auto.commit, true);// 自动提交 offset 的时间间隔props.put(auto.commit.interval.ms, 1000);// key 的反序列化类props.put(key.deserializer,org.apache.kafka.common.serialization.StringDeserializer);// value 的反序列化类props.put(value.deserializer,org.apache.kafka.common.serialization.StringDeserializer);// kafka的消费者默认是从属组之前所记录的偏移量开始消费如果找不到之前记录的偏移量则从如下参数配置的策略确定消费起始偏移量// 如果没有消费偏移量记录则自动重设为起始 offsetlatest, earliest, none/earliest 自动重置到每个分区的最前一条消息latest 自动重置到每个分区的最新一条消息none 没有重置策略/props.put(auto.offset.reset,earliest);// 定义 consumerKafkaConsumerString, String consumer new KafkaConsumer(props);// 消费者订阅的 topic, 可同时订阅多个// subscribe订阅是需要参与消费者组的再均衡机制才能真正获得自己要消费的topic及其分区// 只要消费者组里的消费者 变化了 就要发生再均衡consumer.subscribe(Arrays.asList(first, test,test1));// 显式指定消费起始偏移量(如果同时设置了消费者 偏移策略的话以手动指定的为准)// 在设置消费分区起始偏移量这里存在一个点如果此时到这里了然后消费者组再均衡机制还没有做完那么就会报错因为可能这个消费者还没有被分配到这个分区 针对这个问题其实动态再分配是有一个过程 和 时间的谁也不知道要等多久所以最好想的sleep就不容易实现了。想要解决这个问题有两种办法1.在这个过程中 拉一次数据能拉到就代表再均衡机制完成了 consumer.poll(Long.MAX_VALVE);这里是无意义的拉一次数据主要是为了确保分区分配已完成然后就能够去定位偏移量了。但是这种方式不符合最初的设计初衷如果是使用subscribe来订阅主题那就意味着是应该参与这个组的均衡的参与了那就不要去指定组的偏移量了应该听从组的分配。2.既然要自己指定一个确定的起始消费位置那通常隐含之意就是不需要去参与消费者组的自动再均衡机制那么就不要使用subscribe来订阅主题consumer.assign(Arrays.asList(new TopicPartition(ddd,0))) 使用这个是不参与消费者的自动再均衡的。//TopicPartition first0 new TopicPartition(first,0);//TopicPartition first1 new TopicPartition(first,1);//consumer.seek(first0,10);//consumer.seek(first1,15);/kafka消费者的起始消费位置有两种决定机制1.手动指定了起始位置它肯定从你指定的位置开始2.如果没有手动指定位置它会在找消费组之前所记录的偏移量开始3.如果之前的位置也获取不到就看参数 auto.offset.reset 所指定的重置策略/while (true) {// 读取数据读取超时时间为 100msConsumerRecordsString, String records consumer.poll(100);for (ConsumerRecordString, String record : records)// ConsumerRecord中不光有用户的业务数据还有kafka塞入的元数据String key record.key();String value record.value();// 本条数据所属的topicString topic record.topic();// 本条数据所属的分区int partition record.partition// 本条数据的offsetlong offset record.offset();// 当前这条数据所在分区的leader的朝代纪年OptionalInteger leaderEpoch record.leaderEpoch();// 在kafka的数据底层存储中不光有用户的业务数据还有大量元数据timestamp就是其中之一记录本条数据的时间戳但是时间戳有两种类型本条数据的创建时间生产者、本条数据的追加时间broker写入log文件的时间TimestampType timestampType record.timestampType();long timestamp record.timestamp();// 数据头是生产者在写入数据时附加进去的相当于用户自己的元数据// 在生产者发送数据的时候有一个构造方法可以允许你自己携带自己的 headersHeaders headers record.headers();System.out.printf(offset %d, key %s, value %s%n, record.offset(),record.key(), record.value());}} }如果消息还没生产到指定的位置呢这是一个很有趣的问题到底是等还是报错 kafka-console-consumer.sh –bootstrap-server doit01:9092 –topic test –offset 100000 –partition 0 假设分区0 中并没有offset 100000 的消息执行之后并不会报错但是如果超标了就会自动重置到最新的lastest。 如果如果指定的offset大于最大可用的offset那么就会定义到最后一条消息。 subscribe 订阅主题 subscribe 有如下重载方法 public void subscribe(CollectionString topics,ConsumerRebalanceListener listener) public void subscribe(CollectionString topics) public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) public void subscribe(Pattern pattern)通过这几个构造函数来看其中有ConsumerRebalanceListener listener 其实就是 再均衡 的监听器再均衡的过程中会调用这个方法。 Properties props new Properties(); // 从配置文件中加载写好的参数 props.load(Consumer.class.getClassLoader.getResourceAsStream(consumer.properties)); // 手动set一些参数进去 props.setProperty(); ……KafkaConsumerString,String consumer new KafkaConsumer(props);// reb-1 主题 3个分区 // reb-2 主题 2个分区 consumer.subscribe(Arrays.asList(reb-1,reb-2),new ConsumerRebalanceListener(){// 再均衡分配过程中消费者会取消先前所分配的主题、分区// 取消了之后consumer会调用下面的方法public void onPartitionsRevoked(CollectionTopicPartition partitions){}// 再均衡过程中消费者会重新分配到新的主题、分区// 分配了新的主题 和 分区之后consumer底层会调用下面的方法public void onPartitionAssigned(CollectionTopicPartition partitions){} }); 但是以上的过程 懒加载只有消费者真正 开始 poll的时候才会实现再均衡分配的过程。 现有的再均衡原则就是每次有消费者增减 都会重新分配其实就是先全部取消然后又重新分配了呢这过程中肯定存在消耗得先把工作暂停把偏移量记好另外一个人接手的时候还需要另外去读偏移量重新从对应的位置开始。 而在kafka2.4.1中解决了这个重分配的问题。但是大多数使用的框架没有到这个版本或者所使用的如spark flink等底层所依赖的kafka没有2.4.1这个版本。 消费者组再均衡分区分配策略 消费者组的意义何在为了提高数据处理的并行度 会触发 rebalance 的事件可能是如下任意一种 有新的消费者加入消费组。有消费者宕机下线消费者并不一定需要真正下线例如遇到长时间的 GC 、网络延迟导致消费者长时间未向 GroupCoordinator 发送心跳等情况时GroupCoordinator 会认为消费者己下线。有消费者主动退出消费组发送 LeaveGroupRequest 请求比如客户端调用了 unsubscrible()方法取消对某些主题的订阅。消费组所对应的 GroupCoorinator 节点发生了变更。消费组内所订阅的任一主题或者主题的分区数量发生变化。 将分区的消费权从一个消费者移到另一个消费者称为再均衡rebalance如何 rebalance 也涉及到分区分配策略。 kafka 有两种的分区分配策略range默认 和 round robin新版本中又新增了另外 2 种 我们可以通过 partition.assignment.strategy 参数选择 range 或 roundrobin。 partition.assignment.strategy 参数默认的值是 range。 partition.assignment.strategyorg.apache.kafka.clients.consumer.RoundRobinAssignor partition.assignment.strategyorg.apache.kafka.clients.consumer.RangeAssignor 这个参数属于“消费者”参数 Range Strategy 先将消费者按照 client.id 字典排序然后按 topic 逐个处理针对一个 topic将其 partition 总数/消费者数得到 商 n 和 余数 m则每个 consumer 至少分到 n个分区且前 m 个 consumer 每人多分一个分区 举例说明 2假设有 TOPIC_A 有 5 个分区由 3 个 consumerC1,C2,C3来消费 53 得到商 1余 2则每个消费者至少分 1 个分区前两个消费者各多 1 个分区 C1: 2 个分区C2:2 个分区, C3:1 个分区 接下来就按照“区间”进行分配 C1: TOPIC_A-0 TOPIC_A-1 C2: TOPIC_A-2 TOPIC_A_3 C3: TOPIC_A-4举例说明 2假设 TOPIC_A 有 5 个分区TOPIC_B 有 3 个分区由 2 个 consumerC1,C2来消费 先分配 TOPIC_A 52 得到商 2余 1则 C1 有 3 个分区C2 有 2 个分区得到结果 C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 C2: TOPIC_A-3 TOPIC_A-4再分配 TOPIC_B 32 得到商 1余 1则 C1 有 2 个分区C2 有 1 个分区得到结果 C1: TOPIC_B-0 TOPIC_B-1 C2: TOPIC_B-2最终分配结果 C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-0 TOPIC_B-1 C2: TOPIC_A-3 TOPIC_A-4 TOPIC_B-2如果共同订阅的主题很多那也就意味着排在前面的消费者拿到的分区会明显多余排在后面的。 而消费者本身有一个id是根据id号去排序 以上就是该种模式的弊端其实就是一个topic一个topic去分的。这个问题尤其是在订阅多个topic的时候最明显分配单个topic的情况也就多一个分区。 Round-Robin Strateg 将所有主题分区组成 TopicAndPartition 列表并对 TopicAndPartition 列表按照其 hashCode 排序然后以轮询的方式分配给各消费者。 以上述问题来举例 先对 TopicPartition 的 hashCode 排序假如排序结果如下 TOPIC_A-0 TOPIC_B-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-1 TOPIC_A-3 TOPIC_A-4 TOPIC_B- 然后按轮询方式分配 C1: TOPIC_A-0 TOPIC_A-1 TOPIC_B-1 TOPIC_A-4 C2: TOPIC_B-0 TOPIC_A-2 TOPIC_A-3 TOPIC_B-2 Sticky Strategy 对应的类叫做 org.apache.kafka.clients.consumer.StickyAssignor sticky 策略的特点 要去打成最大化的均衡尽可能保留各消费者原来分配的分区 再均衡的过程中还是会让各消费者先取消自身的分区然后再重新分配只不过是分配过程中会尽量让原来属于谁的分区依然分配给谁 以一个例子来看 —开始 C1:A-P0 B-P1 B-P2 C2:B-P0 A-P1—加入C3后再分配 Range Strategy C1:A-P0 A-P1 C2:B-P0 B-P2 C3:B-P1Sticky Strategy C1:A-P0 B-P1 C2:B-P0 A-P1 C3:B-P2Cooperative Sticky Strategy 对应的类叫做 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor最新的一种 2.4.1 sticky 策略的特点 逻辑与 sticky 策略一致支持 cooperative 再均衡机制再均衡的过程中不会让所有消费者取消掉所有分区然后再进行重分配影响到谁就针对那个消费者进行即可 消费者组再均衡流程 消费组在消费数据的时候有两个角色进行组内的各事务的协调 角色 1 Group Coordinator 组协调器 位于服务端就是某个 broker角色 2 Group Leader 组长 位于消费端就是消费组中的某个消费者 GroupCoordinator 介绍 每个消费组在服务端对应一个 GroupCoordinator 其进行管理GroupCoordinator 是 Kafka 服务端中用于管理消费组的组件。 消费者客户端中由 ConsumerCoordinator 组件负责与 GroupCoordinator 行交互 ConsumerCoordinator 和 GroupCoordinator 最重要的职责就是负责执行消费者 rebalance 操作 eager 协议再均衡步骤细节 定位 Group Coordinator coordinator 在我们组记偏移量的__consumer_offsets 分区的 leader 所在 broker 上 查找 Group Coordinator 的方式 先根据消费组 groupid 的 hashcode 值计算它应该所在consumer_offsets 中的分区编号 Utils.abc(groupId.hashCode) % groupMetadataTopicPartitionCount groupMetadataTopicPartitionCount 为 __consumer_offsets 的 分 区 总 数 这 个 可 以 通 过 broker 端 参 数 offset.topic.num.partitions 来配置默认值是 50找到对应的分区号后再寻找此分区 leader 副本所在 broker 节点则此节点即为自己的 Grouping Coordinator 加入组 Join The Group 此阶段的重要操作之 1选举消费组的 leader private val members new mutable.HashMap[String, MemberMetadata]var leaderid members.keys.head set集合本身无序的取头部的一个自然也是无序的消费组 leader 的选举策略就是随机 此阶段的重要操作之 2选择分区分配策略 最终选举的分配策略基本上可以看作被各个消费者支持的最多的策略具体的选举过程如下 1收集各个消费者支持的所有分配策略组成候选集 candidates。 2每个消费者从候选集 candidates 找出第一个自身支持的策略为这个策略投上一票。 3计算候选集中各个策略的选票数选票数最多的策略即为当前消费组的分配策略如果得票一样那就以组长的为主。 其实此逻辑并不需要 consumer 来执行而是由 Group Coordinator 来执行。 组信息同步 SYNC Group 此阶段主要是由消费组 leader 将分区分配方案通过 Group Coordinator 来转发给组中各消费者 心跳联系 HEART BEAT 进入这个阶段之后消费组中的所有消费者就会处于正常工作状态。 各消费者在消费数据的同时保持与 Group Coordinator的心跳通信。 消费者的心跳间隔时间由参数 heartbeat.interval.ms 指定默认值为 3000 即这个参数必须比 session.timeout.ms 参 数 设 定 的 值 要 小 一 般 情 况 下 heartbeat.interval.ms 的 配 置 值 不 能 超 过 session.timeout.ms 配置值的 13 。这个参数可以调整得更低以控制正常重新平衡的预期时间 如果一个消费者发生崩溃并停止读取消息那么 GroupCoordinator 会等待一小段时间确认这个消费者死亡之后才会触发再均衡。在这一小段时间内死掉的消费者并不会读取分区里的消息。 这 个 一 小 段 时 间 由 session.timeout. ms 参 数 控 制 该 参 数 的 配 置 值 必 须 在 broker 端 参 数 group.min.session.timeout. ms 默认值为 6000 即 6 秒和 group.max.session. timeout. ms 默认值为 300000 即 5 分钟允许的范围内 再均衡流程 eager 协议的再均衡过程整体流程如下图 特点再均衡发生时所有消费者都会停止工作等待新方案的同步 Cooperative 协议的再均衡过程整体流程如下图 特点cooperative 把原来 eager 协议的一次性全局再均衡化解成了多次的小均衡并最终达到全局均衡的收敛状态 指定集合方式订阅主题 consumer.subscribe(Arrays.asList(topicl));consumer.subscribe(Arrays.asList(topic2))正则方式订阅主题 如果消费者采用的是正则表达式的方式subscribe(Pattern)订阅 在之后的过程中如果有人又创建了新的主题并且主题名字与正表达式相匹配那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题并且可以处理不同的类型那么这种订阅方式就很有效。 正则表达式的方式订阅的示例如下 consumer.subscribe(Pattern.compile (topic.* ));利用正则表达式订阅主题可实现动态订阅 assign 订阅主题 消费者不仅可以通过 KafkaConsumer.subscribe() 方法订阅主题还可直接订阅某些主题的指定分区 在 KafkaConsumer 中提供了 assign() 方法来实现这些功能此方法的具体定义如下 public void assign(CollectionTopicPartition partitions)这个方法只接受参数 partitions用来指定需要订阅的分区集合。示例如下 consumer.assign(Arrays.asList(new TopicPartition (tpc_1 , 0),new TopicPartition(“tpc_2”,1))) ;subscribe 与 assign 的区别 通过 subscribe()方法订阅主题具有消费者自动再均衡功能
在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组的消费者增加或减少时分区分配关系会自动调整以实现消费负载均衡及故障自动转移。 assign() 方法订阅分区时是不具备消费者自动均衡的功能的 其实这一点从 assign()方法参数可以看出端倪两种类型 subscribe()都有 ConsumerRebalanceListener类型参数的方法而 assign()方法却没有。 取消订阅 既然有订阅那么就有取消订阅 可以使用 KafkaConsumer 中的 unsubscribe()方法采取消主题的订阅这个方法既可以取消通过subscribe( Collection方式实现的订阅也可以取消通过 subscribe(Pattem方式实现的订阅还可以取消通过 assign( Collection方式实现的订阅。示例码如下 consumer.unsubscribe();如果将 subscribe(Collection )或 assign(Collection集合参数设置为空集合作用与 unsubscribe方法相同如下示例中三行代码的效果相同 consumer.unsubscribe();consumer.subscribe(new ArrayListString()) ;consumer.assign(new ArrayListTopicPartition());消息的消费模式 Kafka 中的消费是基于拉取模式的。 消息的消费一般有两种模式推送模式和拉取模式。推模式是服务端主动将消息推送给消费者而拉模式是消费者主动向服务端发起请求来拉取消息。 Kafka 中的消息消费是一个不断轮询的过程消费者所要做的就是重复地调用 poll( ) 方法 poll( )方法返回的是所订阅的主题分区上的一组消息。 对于 poll ( ) 方法而言如果某些分区中没有可供消费的消息那么此分区对应的消息拉取的结果就为空如果订阅的所有分区中都没有可供消费的消息那么 poll( )方法返回为空的消息集 poll ( ) 方法具体定义如下 public ConsumerRecordsK, V poll(final Duration timeout)超时时间参数 timeout 用来控制 poll( ) 方法的阻塞时间在消费者的缓冲区里没有可用数据时会发生阻塞。如果消费者程序只用来单纯拉取并消费数据则为了提高吞吐率可以把 timeout 设置为Long.MAX_VALUE 消费者消费到的每条消息的类型为 ConsumerRecord public class ConsumerRecordK, V {public static final long NO_TIMESTAMP RecordBatch.NO_TIMESTAMP;public static final int NULL_SIZE -1;public static final int NULL_CHECKSUM -1;private final String topic;private final int partition;private final long offset;private final long timestamp;private final TimestampType timestampType;private final int serializedKeySize;private final int serializedValueSize;private final Headers headers;private final K key;private final V value;private volatile Long checksum;topic partition 这两个字段分别代表消息所属主题的名称和所在分区的编号。 offset 表示消息在所属分区的偏移量。 timestamp 表示时间戳与此对应的 timestampType 表示时间戳的类型。 timestampType 有两种类型 CreateTime 和 LogAppendTime 分别代表消息创建的时间戳和消息追加 到日志的时间戳。 headers 表示消息的头部内容。 key value 分别表示消息的键和消息的值一般业务应用要读取的就是 value serializedKeySize、serializedValueSize 分别表示 key、value 经过序列化之后的大小如果 key 为空 则 serializedKeySize 值为 -1同样如果 value 为空则 serializedValueSize 的值也会为 -1 checksum 是 CRC32 的校验值。示例代码片段 /**

  • 订阅与消费方式 2 */ TopicPartition tp1 new TopicPartition(x, 0); TopicPartition tp2 new TopicPartition(y, 0); TopicPartition tp3 new TopicPartition(z, 0); ListTopicPartition tps Arrays.asList(tp1, tp2, tp3); consumer.assign(tps); while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (TopicPartition tp : tps) {ListConsumerRecordString, String rList records.records(tp);for (ConsumerRecordString, String r : rList) {r.topic();r.partition();r.offset();r.value();//do something to process record.}} }指定位移消费 有些时候我们需要一种更细粒度的掌控可以让我们从特定的位移处开始拉取消息而KafkaConsumer 中的 seek 方法正好提供了这个功能让我们可以追前消费或回溯消费。 seek方法的具体定义如下 public void seek(TopicPartiton partition,long offset)代码示例 public class ConsumerDemo3 指定偏移量消费 {public static void main(String[] args) {Properties props new Properties();props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,g002);props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,doit01:9092);props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,latest);// 是否自动提交消费位移props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);// 限制一次 poll 拉取到的数据量的最大值props.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,10240000);KafkaConsumerString, String consumer new KafkaConsumer(props);// assign 方式订阅 doit27-1 的两个分区TopicPartition tp0 new TopicPartition(doit27-1, 0);TopicPartition tp1 new TopicPartition(doit27-1, 1);consumer.assign(Arrays.asList(tp0,tp1));// 指定分区 0从 offset800 开始消费 分区 1从 offset650 开始消费consumer.seek(tp0,200);consumer.seek(tp1,250);// 开始拉取消息while(true){ConsumerRecordsString, String poll consumer.poll(Duration.ofMillis(3000));for (ConsumerRecordString, String rec : poll) {System.out.println(rec.partition(),rec.key(),rec.value(),rec.offset());}}} }自动提交消费者偏移量 Kafka 中默认的消费位移的提交方式是自动提交这个由消费者客户端参数 enable.auto.commit 配置默认值为 true 。当然这个默认的自动提交不是每消费一条消息就提交一次而是定期提交这个定期的周期时间由客户端参数 auto.commit.interval.ms 配置默认值为 5 秒此参数生效的前提是 enable. auto.commit 参数为 true。 在默认的方式下消费者每隔 5 秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll() 方法的逻辑里完成的在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交如果可以那么就会提交上一次轮询的位移。 Kafka 消费的编程逻辑中位移提交是一大难点自动提交消费位移的方式非常简便它免去了复杂的位移提交逻辑让编码更简洁。但随之而来的是重复消费和消息丢失的问题。 重复消费 假设刚刚提交完一次消费位移然后拉取一批消息进行消费在下一次自动提交消费位移之前消费者崩溃了那么又得从上一次位移提交的地方重新开始消费这样便发生了重复消费的现象对于再均衡的情况同样适用。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小但这样并不能避免重复消费的发送而且也会使位移提交更加频繁。 丢失消息 按照一般思维逻辑而言自动提交是延时提交重复消费可以理解那么消息丢失又是在什么情形下会发生的呢我们来看下图中的情形 拉取线程不断地拉取消息并存入本地缓存比如在 BlockingQueue 中另一个处理线程从缓存中读取消息并进行相应的逻辑处理。设目前进行到了第 yl 次拉取以及第 m 次位移提交的时候也就是x6 之前的位移己经确认提交了处理线程却还正在处理 x3 的消息此时如果处理线程发生了异常待其恢复之后会从第 m 次位移提交处也就是 x6 的位置开始拉取消息那么 x3 至 x6 之间的消息就没有得到相应的处理这样便发生消息丢失的现象。 手动提交消费者偏移量调用 kafka api 自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象但是在编程的世界里异常无可避免同时自动位移提交也无法做到精确的位移管理。在 Kafka 中还提供了手动位移提交的方式这样可以使得开发人员对消费位移的管理控制更加灵活。 很多时候并不是说拉取到消息就算消费完成而是需要将消息写入数据库、写入本地缓存或者是更加复杂的业务处理。在这些场景下所有的业务处理完成才能认为消息被成功消费 手动的提交方式可以让开发人员根据程序的逻辑在合适的时机进行位移提交。开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为 false 示例如下 props.put(ConsumerConf.ENABLE_AUTO_COMMIT_CONFIG, false);手动提交可以细分为同步提交和异步提交对应于 KafkaConsumer 中的 commitSync()和commitAsync()两种类型的方法。 同步提交的方式 commitSync()方法的定义如下 /**
  • 手动提交 offset */ while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String r : records) {//do something to process record.}consumer.commitSync(); }对于采用 commitSync()的无参方法它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的如果想寻求更细粒度的、更精准的提交那么就需要使用 commitSync()的另一个有参方法具体定义如下 public void commitSync(final MapTopicPartitionOffsetAndMetadata offsets示例代码如下 while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String r : records) {long offset r.offset();//do something to process record.TopicPartition topicPartition new TopicPartition(r.topic(), r.partition());consumer.commitSync(Collections.singletonMap(topicPartition,new OffsetAndMetadata(offset1)));} }提交的偏移量 消费完的 record 的偏移量 1 因为__consumer_offsets 中记录的消费偏移量代表的是消费者下一次要读取的位置 异步提交方式 异步提交的方式 commitAsync在执行的时候消费者线程不会被阻塞可能在提交消费位移的结果还未返回之前就开始了新一次的拉取。异步提交可以让消费者的性能得到一定的增强。commitAsync 方法有一个不同的重载方法具体定义如下 示例代码 /**
  • 异步提交 offset */ while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String r : records) {long offset r.offset();//do something to process record.TopicPartition topicPartition new TopicPartition(r.topic(), r.partition());consumer.commitSync(Collections.singletonMap(topicPartition,new OffsetAndMetadata(offset1)));consumer.commitAsync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset 1)), new OffsetCommitCallback() {Overridepublic void onComplete(MapTopicPartition, OffsetAndMetadata map, Exception e) {if(e null ){System.out.println(map);}else{System.out.println(error commit offset);}}});} }手动提交位移时机的选择 数据处理完成之前先提交偏移量 可能会发生漏处理的现象数据丢失 反过来说这种方式实现了 at most once 的数据处理传递语义 数据处理完成之后再提交偏移量 可能会发生重复处理的现象数据重复 反过来说这种方式实现了 at least once 的数据处理传递语义 当然数据处理传递的理想语义是 exactly once精确一次 Kafka 也能做到 exactly once基于 kafka 的事务机制 消费者提交偏移量方式的总结 consumer 的消费位移提交方式 全自动 auto.offset.commit true定时提交到 consumer_offsets 半自动 auto.offset.commit false然后手动触发提交 consumer.commitSync()提交到 consumer_offsets 全手动 auto.offset.commit false写自己的代码去把消费位移保存到你自己的地方 mysql/zk/redis提交到自己所涉及的存储初始化时也需要自己去从自定义存储中查询到消费位移