网络水果有哪些网站可以做沈阳网站建设专业公司

当前位置: 首页 > news >正文

网络水果有哪些网站可以做,沈阳网站建设专业公司,广西网站,深圳积分商城网站建设kafka 案例 目录概述需求#xff1a; 设计思路实现思路分析1.kafka案例_API 带回调函数的生产者2.kafka案例_API生产者分区策略测试3.kafka案例_自定义分区的生产者4.kafka案例_API同步发送生产者5.kafka案例_API简单消费者5.kafka案例_API消费者重置offset 参考资料和推荐阅读… kafka 案例 目录概述需求 设计思路实现思路分析1.kafka案例_API 带回调函数的生产者2.kafka案例_API生产者分区策略测试3.kafka案例_自定义分区的生产者4.kafka案例_API同步发送生产者5.kafka案例_API简单消费者5.kafka案例_API消费者重置offset 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busyskip hardness,make a better result,wait for change,challenge Survive. happy for hardess to solve denpendies. 目录 概述 需求 设计思路 实现思路分析 1.kafka案例_API 带回调函数的生产者 以下是一个使用 Kafka 的 Java 生产者带回调函数的案例 import org.apache.kafka.clients.producer.;import java.util.Properties;public class ProducerExample {public static void main(String[] args) {Properties properties new Properties();properties.put(bootstrap.servers, localhost:9092);properties.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);properties.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);ProducerString, String producer new KafkaProducer(properties);ProducerRecordString, String record new ProducerRecord(topic-name, key, value);producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception null) {System.out.println(消息发送成功);} else {System.out.println(消息发送失败 exception.getMessage());}}});producer.close();} }请注意替换 bootstrap.servers 的值为 Kafka 服务器的地址和端口并替换 topic-name、key 和 value 分别为实际使用的主题、键和值。 在这个案例中我们使用 KafkaProducer 类创建一个 Kafka 生产者。然后通过创建一个 ProducerRecord 对象我们指定了要发送到的主题、键和值。 然后我们调用 producer.send() 方法来发送消息。此方法接受一个 Callback 参数该参数用于在消息发送完成后执行回调函数。在回调函数中我们可以检查发送结果并采取相应的操作。 最后我们调用 producer.close() 方法来关闭生产者。 2.kafka案例_API生产者分区策略测试 在Kafka中可以使用API生产者分区策略来决定将消息发送到哪个分区。以下是一个展示如何使用API生产者分区策略的示例代码。 首先创建一个新的Java类例如ProducerPartitionStrategyTest.java并导入Kafka相关的依赖项。 import org.apache.kafka.clients.producer.;import java.util.Properties;接下来定义一个自定义的Partitioner类用于实现分区策略。在这个例子中我们将根据消息的键来决定分区。如果键为偶数则将消息发送到分区0如果键为奇数则将消息发送到分区1。 class CustomPartitioner implements Partitioner {Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {int numPartitions cluster.partitionCountForTopic(topic);int partition 0;try {int keyValue Integer.parseInt(key.toString());if (keyValue % 2 0) {partition 0;} else {partition 1;}} catch (NumberFormatException e) {partition Math.abs(key.hashCode() % numPartitions);}return partition;}Overridepublic void close() {// 不做任何操作}Overridepublic void configure(MapString, ? configs) {// 不做任何操作} }然后在主方法中创建一个Producer并设置自定义分区策略。 public class ProducerPartitionStrategyTest {public static void main(String[] args) {// 配置Kafka生产者Properties properties new Properties();properties.put(bootstrap.servers, localhost:9092);properties.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);properties.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);properties.put(partitioner.class, com.example.CustomPartitioner);// 创建Kafka生产者ProducerString, String producer new KafkaProducer(properties);// 发送消息for (int i 0; i 10; i) {String key String.valueOf(i);String value Message i;ProducerRecordString, String record new ProducerRecord(topic, key, value);producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception ! null) {System.err.println(Error producing message: exception.getMessage());} else {System.out.println(Message sent to partition metadata.partition() , offset metadata.offset());}}});}// 关闭生产者producer.close();} }在以上示例中我们首先配置了Kafka生产者的一些属性例如Kafka服务器的地址、键和值的序列化程序以及分区策略。然后我们创建了一个Kafka生产者并使用自定义分区策略将消息发送到Kafka集群中。 在发送消息的循环中我们创建了一个ProducerRecord对象它包含要发送的消息的主题、键和值。然后我们使用send()方法将消息发送到Kafka集群并使用回调函数处理发送结果。 最后我们关闭了生产者。 运行以上代码后你将会看到消息被发送到正确的分区并打印出消息的分区和偏移量的信息。 这就是一个简单的演示如何使用API生产者分区策略的例子。你可以根据自己的需求来实现自定义的分区策略并将消息发送到合适的分区中。 3.kafka案例_自定义分区的生产者 自定义分区的生产者示例代码 import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.serialization.StringSerializer;import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Random;public class CustomPartitionProducer {public static void main(String[] args) {// Kafka 服务器地址String bootstrapServers localhost:9092;// 主题名称String topic custom-partition-topic;// 创建生产者配置Properties props new Properties();props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 创建自定义分区器props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());// 创建生产者KafkaProducerString, String producer new KafkaProducer(props);// 发送消息到自定义分区Random random new Random();for (int i 0; i 10; i) {String key key i;String value value i;producer.send(new ProducerRecord(topic, key, value));System.out.println(Sent message: key key , value value);}// 关闭生产者producer.close();}public static class CustomPartitioner implements org.apache.kafka.clients.producer.Partitioner {Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {ListPartitionInfo partitions cluster.partitionsForTopic(topic);int numPartitions partitions.size();// 这里自定义分区逻辑// 根据key的尾数来决定消息被发送到哪个分区int partition Integer.parseInt(key.toString().substring(key.toString().length() - 1)) % numPartitions;System.out.println(Custom partitioner: topic topic , key key , partition partition);return partition;}Overridepublic void close() {}Overridepublic void configure(MapString, ? configs) {}} }上面的代码示例首先创建一个自定义分区器CustomPartitioner然后在生产者配置中指定该分区器类 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());在自定义分区器中根据key的尾数来决定消息被发送到哪个分区 int partition Integer.parseInt(key.toString().substring(key.toString().length() - 1)) % numPartitions;然后启动生产者发送消息到指定的主题。每条消息的key都是以数字结尾的字符串根据key的尾数来选择分区。输出中会打印出消息的详细信息包括主题、key和分区信息。 4.kafka案例_API同步发送生产者 下面是使用Kafka客户端库进行API同步发送的一个示例 import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {String bootstrapServers localhost:9092;String topic test-topic;String key key1;String value value1;// 配置Kafka生产者Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);// 创建Kafka生产者实例ProducerString, String producer new KafkaProducer(props);// 创建消息实例ProducerRecordString, String record new ProducerRecord(topic, key, value);try {// 发送消息并等待返回结果RecordMetadata metadata producer.send(record).get();System.out.println(消息发送成功Topic: metadata.topic() , Partition: metadata.partition() , Offset: metadata.offset());} catch (Exception e) {System.out.println(消息发送失败 e.getMessage());} finally {// 关闭Kafka生产者producer.close();}} }这个示例使用了Kafka的Java客户端库并创建了一个Kafka生产者实例。代码中设置了Kafka服务器的地址、要发送的主题、消息的键和值。还配置了键和值的序列化器为StringSerializer。 然后创建了一个ProducerRecord实例来包装要发送的消息。通过调用producer.send(record).get()方法发送消息并等待返回结果。发送成功后通过返回的RecordMetadata对象获取到消息的元数据包括发送到的Topic、Partition和Offset。 最后关闭Kafka生产者。 5.kafka案例_API简单消费者 以下是一个简单的Kafka案例使用Kafka的Java API实现一个简单的消费者。 首先需要安装Kafka并启动Kafka服务。然后创建一个Kafka消费者来消费指定的主题。 import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Collections; import java.util.Properties;public class SimpleConsumer {public static void main(String[] args) {// Kafka broker的地址String bootstrapServers localhost:9092;// 消费者组的IDString groupId test-group;// 要消费的主题String topic test-topic;// 创建Kafka消费者的配置Properties props new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 创建Kafka消费者ConsumerString, String consumer new KafkaConsumer(props);// 订阅主题consumer.subscribe(Collections.singletonList(topic));// 消费消息while (true) {ConsumerRecordsString, String records consumer.poll(1000);// 处理每条消息records.forEach(record - {System.out.println(Received message: record.value());});}} }在这个例子中首先定义了Kafka broker的地址、消费者组的ID和要消费的主题。然后创建了一个Kafka消费者的配置并设置了必要的属性。然后创建了一个Kafka消费者并使用subscribe()方法订阅了指定的主题。最后使用poll()方法从Kafka集群拉取消息并使用forEach()方法对每条消息进行处理。 运行这个消费者应用程序它将开始消费指定主题的消息并打印出来。 注意这只是一个简单的Kafka消费者示例没有处理异常或实现自动提交偏移量。在实际应用中需要根据具体需求添加更多的处理逻辑。 5.kafka案例_API消费者重置offset 要在Java中重置Kafka消费者的偏移量(offset)您可以使用以下代码片段 import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition;import java.util.Arrays; import java.util.Properties;public class ConsumerOffsetResetExample {public static void main(String[] args) {String topic your-topic;String bootstrapServers localhost:9092;String groupId your-group-id;// 创建Kafka消费者配置Properties properties new Properties();properties.setProperty(bootstrap.servers, bootstrapServers);properties.setProperty(group.id, groupId);properties.setProperty(enable.auto.commit, false); // 禁用自动提交偏移量// 创建Kafka消费者KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);// 订阅主题consumer.subscribe(Arrays.asList(topic));// 将偏移量重置到最早的可用位置consumer.seekToBeginning(consumer.assignment());// 或者将偏移量重置到最新的可用位置// consumer.seekToEnd(consumer.assignment());// 处理消息try {while (true) {// 拉取消息// ConsumerRecordString, String record consumer.poll(Duration.ofMillis(100)).iterator().next();// 处理消息// …// 手动提交偏移量// consumer.commitSync();}} finally {// 关闭Kafka消费者consumer.close();}} }在上面的代码中我们使用seekToBeginning方法将偏移量重置为最早的可用位置。您还可以使用seekToEnd方法将偏移量重置为最新的可用位置。请根据您的需求选择适当的方法。 参考资料和推荐阅读 参考资料 官方文档 开源社区 博客文章 书籍推荐 暂无 欢迎阅读各位老铁如果对你有帮助点个赞加个关注呗同时期望各位大佬的批评指正~如果有兴趣可以加文末的交流群大家一起进步哈