网站正在建设中 免费初次建设网站的技巧
- 作者: 五速梦信息网
- 时间: 2026年03月21日 07:19
当前位置: 首页 > news >正文
网站正在建设中 免费,初次建设网站的技巧,建设个招聘网站,如何建立起个人网站Storm集成Kafka 一、整合说明二、写入数据到Kafka三、从Kafka中读取数据一、整合说明 Storm 官方对 Kafka 的整合分为两个版本#xff0c;官方说明文档分别如下#xff1a; Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持#xff1b;Storm Kafka …Storm集成Kafka 一、整合说明 二、写入数据到Kafka 三、从Kafka中读取数据 一、整合说明 Storm 官方对 Kafka 的整合分为两个版本官方说明文档分别如下 Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持Storm Kafka Integration (0.10.x) : 包含 Kafka 新版本的 consumer API主要对 Kafka 0.10.x 提供整合支持。 这里我服务端安装的 Kafka 版本为 2.2.0(Released Mar 22, 2019) 按照官方 0.10.x 的整合文档进行整合不适用于 0.8.x 版本的 Kafka。 二、写入数据到Kafka 2.1 项目结构 2.2 项目主要依赖 propertiesstorm.version1.2.2/storm.versionkafka.version2.2.0/kafka.version /propertiesdependenciesdependencygroupIdorg.apache.storm/groupIdartifactIdstorm-core/artifactIdversion\({storm.version}/version/dependencydependencygroupIdorg.apache.storm/groupIdartifactIdstorm-kafka-client/artifactIdversion\){storm.version}/version/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion${kafka.version}/version/dependency /dependencies2.3 DataSourceSpout /*** 产生词频样本的数据源/ public class DataSourceSpout extends BaseRichSpout {private ListString list Arrays.asList(Spark, Hadoop, HBase, Storm, Flink, Hive);private SpoutOutputCollector spoutOutputCollector;Overridepublic void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.spoutOutputCollector spoutOutputCollector;}Overridepublic void nextTuple() {// 模拟产生数据String lineData productData();spoutOutputCollector.emit(new Values(lineData));Utils.sleep(1000);}Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields(line));}/** 模拟数据/private String productData() {Collections.shuffle(list);Random random new Random();int endIndex random.nextInt(list.size()) % (list.size()) 1;return StringUtils.join(list.toArray(), \t, 0, endIndex);}}产生的模拟数据格式如下 Spark HBase Hive Flink Storm Hadoop HBase Spark Flink HBase Storm HBase Hadoop Hive Flink HBase Flink Hive Storm Hive Flink Hadoop HBase Hive Hadoop Spark HBase Storm2.4 WritingToKafkaApp /** 写入数据到 Kafka 中/ public class WritingToKafkaApp {private static final String BOOTSTRAP_SERVERS hadoop001:9092;private static final String TOPIC_NAME storm-topic;public static void main(String[] args) {TopologyBuilder builder new TopologyBuilder();// 定义 Kafka 生产者属性Properties props new Properties();/** 指定 broker 的地址清单清单里不需要包含所有的 broker 地址生产者会从给定的 broker 里查找其他 broker 的信息。 不过建议至少要提供两个 broker 的信息作为容错。/props.put(bootstrap.servers, BOOTSTRAP_SERVERS);/** acks 参数指定了必须要有多少个分区副本收到消息生产者才会认为消息写入是成功的。 acks0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。* acks1 : 只要集群的首领节点收到消息生产者就会收到一个来自服务器成功响应。* acksall : 只有当所有参与复制的节点全部收到消息时生产者才会收到一个来自服务器的成功响应。*/props.put(acks, 1);props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);KafkaBolt bolt new KafkaBoltString, String().withProducerProperties(props).withTopicSelector(new DefaultTopicSelector(TOPIC_NAME)).withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());builder.setSpout(sourceSpout, new DataSourceSpout(), 1);builder.setBolt(kafkaBolt, bolt, 1).shuffleGrouping(sourceSpout);if (args.length 0 args[0].equals(cluster)) {try {StormSubmitter.submitTopology(ClusterWritingToKafkaApp, new Config(), builder.createTopology());} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {e.printStackTrace();}} else {LocalCluster cluster new LocalCluster();cluster.submitTopology(LocalWritingToKafkaApp,new Config(), builder.createTopology());}} }2.5 测试准备工作 进行测试前需要启动 Kakfa
启动Kakfa Kafka 的运行依赖于 zookeeper需要预先启动可以启动 Kafka 内置的 zookeeper,也可以启动自己安装的
zookeeper启动命令
bin/zkServer.sh start# 内置zookeeper启动命令 bin/zookeeper-server-start.sh config/zookeeper.properties启动单节点 kafka 用于测试
bin/kafka-server-start.sh config/server.properties2. 创建topic
创建用于测试主题
bin/kafka-topics.sh –create –bootstrap-server hadoop001:9092 –replication-factor 1 –partitions 1 –topic storm-topic# 查看所有主题bin/kafka-topics.sh –list –bootstrap-server hadoop001:90923. 启动消费者 启动一个消费者用于观察写入情况启动命令如下
bin/kafka-console-consumer.sh –bootstrap-server hadoop001:9092 –topic storm-topic –from-beginning2.6 测试
可以用直接使用本地模式运行也可以打包后提交到服务器集群运行。本仓库提供的源码默认采用 maven-shade-plugin 进行打包打包命令如下
mvn clean package -D maven.test.skiptrue启动后消费者监听情况如下 三、从Kafka中读取数据
3.1 项目结构 3.2 ReadingFromKafkaApp /*** 从 Kafka 中读取数据/ public class ReadingFromKafkaApp {private static final String BOOTSTRAP_SERVERS hadoop001:9092;private static final String TOPIC_NAME storm-topic;public static void main(String[] args) {final TopologyBuilder builder new TopologyBuilder();builder.setSpout(kafka_spout, new KafkaSpout(getKafkaSpoutConfig(BOOTSTRAP_SERVERS, TOPIC_NAME)), 1);builder.setBolt(bolt, new LogConsoleBolt()).shuffleGrouping(kafka_spout);// 如果外部传参 cluster 则代表线上环境启动,否则代表本地启动if (args.length 0 args[0].equals(cluster)) {try {StormSubmitter.submitTopology(ClusterReadingFromKafkaApp, new Config(), builder.createTopology());} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {e.printStackTrace();}} else {LocalCluster cluster new LocalCluster();cluster.submitTopology(LocalReadingFromKafkaApp,new Config(), builder.createTopology());}}private static KafkaSpoutConfigString, String getKafkaSpoutConfig(String bootstrapServers, String topic) {return KafkaSpoutConfig.builder(bootstrapServers, topic)// 除了分组 ID,以下配置都是可选的。分组 ID 必须指定,否则会抛出 InvalidGroupIdException 异常.setProp(ConsumerConfig.GROUP_ID_CONFIG, kafkaSpoutTestGroup)// 定义重试策略.setRetry(getRetryService())// 定时提交偏移量的时间间隔,默认是 15s.setOffsetCommitPeriodMs(10_000).build();}// 定义重试策略private static KafkaSpoutRetryService getRetryService() {return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));} } 3.3 LogConsoleBolt /** 打印从 Kafka 中获取的数据*/ public class LogConsoleBolt extends BaseRichBolt {private OutputCollector collector;public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collectorcollector;}public void execute(Tuple input) {try {String value input.getStringByField(value);System.out.println(received from kafka : value);// 必须 ack,否则会重复消费 kafka 中的消息collector.ack(input);}catch (Exception e){e.printStackTrace();collector.fail(input);}}public void declareOutputFields(OutputFieldsDeclarer declarer) {} }这里从 value 字段中获取 kafka 输出的值数据。 在开发中我们可以通过继承 RecordTranslator 接口定义了 Kafka 中 Record 与输出流之间的映射关系可以在构建 KafkaSpoutConfig 的时候通过构造器或者 setRecordTranslator() 方法传入并最后传递给具体的 KafkaSpout。 默认情况下使用内置的 DefaultRecordTranslator其源码如下FIELDS 中 定义了 tuple 中所有可用的字段主题分区偏移量消息键值。 public class DefaultRecordTranslatorK, V implements RecordTranslatorK, V {private static final long serialVersionUID -5782462870112305750L;public static final Fields FIELDS new Fields(topic, partition, offset, key, value);Overridepublic ListObject apply(ConsumerRecordK, V record) {return new Values(record.topic(),record.partition(),record.offset(),record.key(),record.value());}Overridepublic Fields getFieldsFor(String stream) {return FIELDS;}Overridepublic ListString streams() {return DEFAULT_STREAM;} }3.4 启动测试 这里启动一个生产者用于发送测试数据启动命令如下
bin/kafka-console-producer.sh –broker-list hadoop001:9092 –topic storm-topic本地运行的项目接收到从 Kafka 发送过来的数据 用例源码下载地址storm-kafka-integration 参考资料
Storm Kafka Integration (0.10.x)
相关文章
-
网站正在建设 英文翻译成都最好的编程培训机构
网站正在建设 英文翻译成都最好的编程培训机构
- 技术栈
- 2026年03月21日
-
网站正在建设 敬请期待百姓网二手车买卖
网站正在建设 敬请期待百姓网二手车买卖
- 技术栈
- 2026年03月21日
-
网站正能量免费下载网站模板 div
网站正能量免费下载网站模板 div
- 技术栈
- 2026年03月21日
-
网站正在建设中 模板 下载小型办公室装修效果图
网站正在建设中 模板 下载小型办公室装修效果图
- 技术栈
- 2026年03月21日
-
网站正在建设中 英文可以让网友帮做任务的网站
网站正在建设中 英文可以让网友帮做任务的网站
- 技术栈
- 2026年03月21日
-
网站正在建设中 源码下载百度深圳网站开发搜索
网站正在建设中 源码下载百度深圳网站开发搜索
- 技术栈
- 2026年03月21日



