陕西网站建设价位多少wordpress星座
- 作者: 五速梦信息网
- 时间: 2026年03月21日 09:25
当前位置: 首页 > news >正文
陕西网站建设价位多少,wordpress星座,wordpress建设QQ登录,设计素材免费下载网站有哪些1. 消费者的结构 能够在kafka中拉取数据进行消费的组件或者程序都叫做消费者。 这里面要涉及到一个动作叫做拉取。 首先我们要知道kafka这个消息队列主要的功能就是起到缓冲的作用#xff0c;比如flume采集数据然后交给spark或者flink进行计算分析#xff0c;但是flume采用的…1. 消费者的结构 能够在kafka中拉取数据进行消费的组件或者程序都叫做消费者。 这里面要涉及到一个动作叫做拉取。 首先我们要知道kafka这个消息队列主要的功能就是起到缓冲的作用比如flume采集数据然后交给spark或者flink进行计算分析但是flume采用的就是消息的push方式这个方式不能够保证推送的数据消费者端一定会消费完毕会出现数据的反压问题这个问题很难解决所以才出现了消息队列kafka它可以起到一个缓冲的作用生产者部分将数据直接全部推送到kafka然后消费者从其中拉取数据这边如果也采用推送的方式那么也就在计算端会出现反压问题所以kafka的消费者一般都是采用拉的方式pull并不是push 1.1 消费者组 在一个topic中存在多个分区可以分摊压力实现负载均衡那么整体topic中的数据会很多如果消费者只有一个的话很难全部消费其中的数据压力也会集中在一个消费者中并且在大数据行业中几乎所有的计算架构都是分布式的集群模式那么这个集群模式中计算的节点也会存在多个这些节点都是可以从kafka中拉取数据的所有消费者不可能只有一个一般情况下都会有多个消费者。 正因为topic存在多个分区每个分区中的数据是独立的那么消费者最好也是一个一个和分区进行一一对应的所以有几个分区应该对应存在几个消费者是最好的。 这个和分蛋糕是一样的一个蛋糕分成几块那么有几个人吃应该是对应关系的 消费者组内每个消费者负责消费不同分区的数据一个分区只能由一个组内消费者消费。 消费者组之间互不影响。所有的消费者都属于某个消费者组即消费者组是逻辑上的一个订阅者。 2. 消费者实现 在实现消费者的时候我们需要知道几个消费者的配置重要参数 参数解释bootstrap.servers集群地址key.deserializerkey反序列化器value.deserializervalue反序列化器group.id消费者组id 首先创建消费者对象 消费者对象订阅相应的topic然后拉取其中的数据进行消费 整体代码如下 import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Properties;public class Consumer1 {public static void main(String[] args) {Properties pro new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,nn1:9092);pro.put(ConsumerConfig.GROUP_ID_CONFIG,hainiu_group);//设定组idpro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//设定key的反序列化器pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//设定value的反序列化器KafkaConsumerString, String consumer new KafkaConsumerString, String(pro);ListString topics Arrays.asList(topic_a,topic_b);//一个消费者可以消费多个分区的数据consumer.subscribe(topics);//订阅这个topicwhile (true){//死循环要一直消费数据ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));//间隔一秒钟消费一次数据拉取一批数据过来IteratorConsumerRecordString, String it records.iterator();while(it.hasNext()){ConsumerRecordString, String record it.next();System.out.println(record.topic()-record.partition()- record.offset()-record.key()-record.value());}}} } [hexuanhadoop106 datas]$ kafka-console-producer.sh –bootstrap-server hadoop106:9092 –topic topic_b1 2 3 4
消费者与分区之间的对应关系 一个消费者组中的消费者和分区是一一对应的关系一个分区应该对应一个消费者但是如果消费者多了那么有的消费者就没有分区消费如果消费者少了那么会出现一个消费者消费多个分区的情况。
首先创建topic_c 用于测试分区和消费者的对应关系
kafka-topics.sh –bootstrap-server hadoop106:9092 –create –topic topic_c –partitions 3 –replication-factor 2
启动两个消费者 刚才我们写的消费者main方法运行两次
然后分别在不同的分区使用生产者发送数据看数据在消费者中的打印情况
首先选择任务可以并行执行 选择任务修改配置 我们可以看到允许多实例并行执行 启动两次这个时候我们就有了两个消费者实例 生产者线程:分别向三个分区中发送1 2 3元素 package com.hainiu.kafka.consumer;/*** ClassName : test3_producer* Package : com.hainiu.kafka.consumer* Description** Author HeXua* Create 2024/11/3 23:40* Version 1.0*/import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class test3_producer {public static void main(String[] args) {Properties pro new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop106:9092);pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducerString, String producer new KafkaProducerString, String(pro);ProducerRecordString, String record1 new ProducerRecord(topic_d, 0,null,1);ProducerRecordString, String record2 new ProducerRecord(topic_d, 1,null,2);ProducerRecordString, String record3 new ProducerRecord(topic_d, 2,null,3);producer.send(record1);producer.send(record2); // producer.send(record3);producer.close();} } 可以看到有的消费者消费了两个分区的数据 如果启动三个消费者会发现每个人消费一个分区的数据 如果启动四个消费者 我们发现有一个消费者没有数据
1 消费多topic的数据 不同组消费不同的topic或者一个组可以消费多个topic都是可以的 3.2 多个组消费一个topic 同一个topic可以由多个消费者组进行消费数据并且相互之间是没有任何影响的 修改同一份代码的组标识不同。启动两个实例查看里面的消费信息 pro.put(ConsumerConfig.GROUP_ID_CONFIG,hainiu_group1);pro.put(ConsumerConfig.GROUP_ID_CONFIG,hainiu_group2);//分别修改消费者组的id不同 package com.hainiu.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Properties;public class Consumer1 {public static void main(String[] args) {Properties pro new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,nn1:9092);pro.put(ConsumerConfig.GROUP_ID_CONFIG,hainiu_group);pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumerString, String consumer new KafkaConsumerString, String(pro);ListString topics Arrays.asList(topic_c);//订阅多个topic的数据变化consumer.subscribe(topics);while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String it records.iterator();while(it.hasNext()){ConsumerRecordString, String record it.next();System.out.println(record.topic()-record.partition()- record.offset()-record.key()-record.value());}}} }
- 上一篇: 陕西网站建设多少钱建设门户网站都需要什么意思
- 下一篇: 陕西网站建设开发网址与网站的区别
相关文章
-
陕西网站建设多少钱建设门户网站都需要什么意思
陕西网站建设多少钱建设门户网站都需要什么意思
- 技术栈
- 2026年03月21日
-
陕西省住房城乡建设厅网站韩国男女直接做的视频网站
陕西省住房城乡建设厅网站韩国男女直接做的视频网站
- 技术栈
- 2026年03月21日
-
陕西省西安市建设局网站wordpress管理工具
陕西省西安市建设局网站wordpress管理工具
- 技术栈
- 2026年03月21日
-
陕西网站建设开发网址与网站的区别
陕西网站建设开发网址与网站的区别
- 技术栈
- 2026年03月21日
-
陕西网站建设哪家强青岛做网站公司
陕西网站建设哪家强青岛做网站公司
- 技术栈
- 2026年03月21日
-
陕西网站建设哪家强张家港做网站的
陕西网站建设哪家强张家港做网站的
- 技术栈
- 2026年03月21日
