中交建设集团网站建设明星网站的目的论文

当前位置: 首页 > news >正文

中交建设集团网站,建设明星网站的目的论文,要如何做才能拥有自己的网站呢,如何在百度发视频推广一、消费模式 1、pull(拉)模式(kafka采用这种方式) consumer采用从broker中主动拉取数据。 存在问题#xff1a;如果kafka中没有数据#xff0c;消费者可能会陷入循环中#xff0c;一直返回空数据 2、push(推)模式 由broker决定消息发送频率#xff0c;很难适应所有消费者…一、消费模式 1、pull(拉)模式(kafka采用这种方式) consumer采用从broker中主动拉取数据。 存在问题如果kafka中没有数据消费者可能会陷入循环中一直返回空数据 2、push(推)模式 由broker决定消息发送频率很难适应所有消费者的消费速率。 二、总体工作流程 案例一单独消费者并订阅主题 import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Properties;public class KafkaConsumerTest {public static void main(String[] args) {Properties properties new Properties();//集群地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092,hadoop103:9092);//反序列化方式properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//消费者组必须指定properties.put(ConsumerConfig.GROUP_ID_CONFIG, test_group);//创建消费者KafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);//订阅主题ListString topicList new ArrayList();topicList.add(first);kafkaConsumer.subscribe(topicList);//消费数据while (true){try {ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String record : consumerRecords) {System.out.println(record.key() ——— record.value());}}catch (Exception e){e.printStackTrace();}}} } 控制台输出
案例二单独消费者订阅主题分区 import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Properties;public class KafkaConsumerTest {public static void main(String[] args) {Properties properties new Properties();//集群地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092,hadoop103:9092);//反序列化方式properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//消费者组必须指定properties.put(ConsumerConfig.GROUP_ID_CONFIG, test_group);//创建消费者KafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);//订阅主题分区ListTopicPartition topicPartitionList new ArrayList();topicPartitionList.add(new TopicPartition(first, 0));kafkaConsumer.assign(topicPartitionList);//消费数据while (true){try {ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String record : consumerRecords) {System.out.println(record.key() ——— record.value());}}catch (Exception e){e.printStackTrace();}}} } 只消费了发往分区0的数据
案例三消费者组 启动多个消费案例一的消费者会自动指定消费的分区(partition) 启动3个消费者一个消费者消费一个分区 三、消费者组 由多个consumer组成(条件:groupid相同),是逻辑上的一个订阅者。 每个消费者负责消费不同分区的数据一个分区只能由一个组内消费者消费消费者组之间互不影响 1、初始化流程 coordinator辅助实现消费者组的初始化和分区的分配 coordinator节点选择groupid的hashCode值%50(consumer_offsets的分区数量) 例如groupid的hashCode11%501那么consumer_offsets主题的1号分区在哪个broker上就选择这个节点的coordinator作为这个消费者组的老大消费者组下所有的消费者提交offset的时候就往这个分区去提交offset 1.组内每个消费者向选中的coordinator节点发送joinGroup请求2.coordinator节点选择一个consumer作为leader3.coordinator节点把要消费的topic情况发送给消费者leader4.消费者leader负责制定消费方案5.把消费方案发送给coordinator节点6.coordinator节点把消费方案发送给各consumer7.每个消费者都会和coordinator节点保持心跳(默认3s)一旦超时(session.timeout.ms45s),该消费者会被移除并触发再平衡或者消费者处理的时间过长(max.poll.interval.ms5分钟)也会被移除并触发再平衡 2、分区分配以及再平衡 到底由哪个消费者来消费哪个partition的数据 分配策略Range、RoundRobin、Sticky、CooperativeStick配置参数partition.assignment.strategy(默认RangeCooperativeStick) import org.apache.kafka.clients.consumer.; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Properties;public class KafkaConsumerTest {public static void main(String[] args) {Properties properties new Properties();//集群地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092,hadoop103:9092);//反序列化方式properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//消费者组必须指定properties.put(ConsumerConfig.GROUP_ID_CONFIG, test_group);//设置分区分配策略,多个策略使用逗号拼接properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());//创建消费者KafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);//订阅主题ListString topicList new ArrayList();topicList.add(first);//再平衡的时候会触发ConsumerRebalanceListenerkafkaConsumer.subscribe(topicList, new ConsumerRebalanceListener() {// 重新分配完分区之前调用Overridepublic void onPartitionsRevoked(CollectionTopicPartition partitions) {System.out.println(回收的分区);for (TopicPartition partition : partitions) {System.out.println(partition partition);}}// 重新分配完分区后调用Overridepublic void onPartitionsAssigned(CollectionTopicPartition partitions) {System.out.println(重新得到的分区);for (TopicPartition partition : partitions) {System.out.println(partition partition);}}});//消费数据while (true){try {ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String record : consumerRecords) {System.out.println(record);}}catch (Exception e){e.printStackTrace();}}} } range 分配策略对同一个topic里面的分区序号排序对消费者按字母排序通过partition数量/consumer数量(如果除不尽那么前面几个消费者将会多消费1个分区) 这个只是针对一个topic而言C0消费者多消费一个分区影响不是很大但是如果这个消费者组消费多个topic容易产生数据倾斜 再平衡机制某一个消费者挂掉后45秒内产生的数据将会由某一个消费者代为消费45秒后产生的数据会重新分配 RoundRobin 分配策略对集群中所有的Topic而言把所有的partition和所有的consumer都列出来然后按照hashCode进行排序最后通过轮询算法来分配partition给各个消费者再平衡机制轮询分配(不是按数据是按分区) Sticky 分配策略分配带粘性执行一次新的分配时考虑原有的分配再平衡机制打散尽量均匀分配(不是按数据是按分区) 四、offset 1、默认维护位置 主题__consumer_offset keygroup.id topic 分区号 value当前offset的值 每隔一段时间kafka内部会对这个topic进行压缩(compact)也就是每一个group.id topic 分区号保留最新数据 2、自动提交offset 是否开启自动提交enable.auto.commit默认true 自动提交时间间隔auto.commit.interval.ms默认5s 基于时间的提交难以把握 3、手动提交offset 类别同步提交(commitSync)、异步提交(commitAsync) 相同点提交一批数据的最高偏移量 不同点同步阻塞当前现场失败会自动重试异步没有重试机制可以提交失败。 4.指定offset消费 如果没有初始偏移量(消费者第一次消费)或者服务器上不存在当前偏移量(被删除)如何指定offset进行消费 auto.offset.resetearliest(默认) | latest | none 在代码中设置方式为properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,earliest) earliest自动将偏移量重置为最早的偏移量(–from-beginning)latest自动将偏移量重置为最新的偏移量none没有偏移量抛出异常 除了这三中还可以自己来指定位置或者指定时间 指定位置开始消费案例 import org.apache.kafka.clients.consumer.; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.;public class KafkaConsumerTest {public static void main(String[] args) {Properties properties new Properties();//集群地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092,hadoop103:9092);//反序列化方式properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//消费者组必须指定properties.put(ConsumerConfig.GROUP_ID_CONFIG, test_group);//创建消费者KafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);//订阅主题ListString topicList new ArrayList();topicList.add(first);kafkaConsumer.subscribe(topicList);SetTopicPartition assignment new HashSet();while (assignment.size() 0){kafkaConsumer.poll(Duration.ofSeconds(1));//获取到消费者分区分配信息有了分区分配信息才能开始消费assignment kafkaConsumer.assignment();}//遍历所有分区并指定offset从100的位置开始消费for (TopicPartition partition : assignment) {kafkaConsumer.seek(partition, 100);}//消费数据while (true){try {ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String record : consumerRecords) {System.out.println(record);}}catch (Exception e){e.printStackTrace();}}} } 指定时间开始消费案例把指定的时间转为offset import org.apache.kafka.clients.consumer.; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.*;public class KafkaConsumerTest {public static void main(String[] args) {Properties properties new Properties();//集群地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092,hadoop103:9092);//反序列化方式properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//消费者组必须指定properties.put(ConsumerConfig.GROUP_ID_CONFIG, test_group);//创建消费者KafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);//订阅主题ListString topicList new ArrayList();topicList.add(first);kafkaConsumer.subscribe(topicList);SetTopicPartition assignment new HashSet();while (assignment.size() 0){kafkaConsumer.poll(Duration.ofSeconds(1));//获取到消费者分区分配信息有了分区分配信息才能开始消费assignment kafkaConsumer.assignment();}HashMapTopicPartition, Long timestampMap new HashMap();for (TopicPartition partition : assignment) {//一天前的毫秒数timestampMap.put(partition, System.currentTimeMillis() - 1*24*3600*1000);}//获取毫秒数对应的offset位置MapTopicPartition, OffsetAndTimestamp offsetAndTimestampMap kafkaConsumer.offsetsForTimes(timestampMap);OffsetAndTimestamp offsetAndTimestamp;//给每个patition设置offset位置for (TopicPartition partition : assignment) {offsetAndTimestamp offsetAndTimestampMap.get(partition);kafkaConsumer.seek(partition, offsetAndTimestamp.offset());}//消费数据while (true){try {ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String record : consumerRecords) {System.out.println(record);}}catch (Exception e){e.printStackTrace();}}} } 五、消费者事务 使用消费者事务进行精准一次消费将消费过程和提交offset过程做原子操作绑定。解决重复消费和漏消费问题 重复消费由自动提交offset引起。漏消费设置手动提交offset提交offset时数据还未落盘消费者进程被kill那么offset已经提交但是数据未处理导致这部分内存中数据丢失 六、数据挤压 消费能力不足增加分区数量同时提高消费者数量(注意分区数量≥消费者数量)处理不及时 拉去数据 / 处理时间 生产速度 拉去数据/处理时间生产速度 拉去数据/处理时间生产速度提高每批次拉去的数量。fetch.max.bytes(一次拉取得最大字节数默认524288050m)、max.poll.records(一次poll数据最大条数默认500条)