Flink job restarts endlessly after a Kafka scale-in: Timeout expired while fetching topic metadata
- Author: 五速梦信息网
- Date: 2026-03-21 10:18
Background
Flink version 1.12.2, Kafka client 2.4.1. A streaming job that reads from Kafka and computes DAU was running on the company's Flink platform. A Kafka scale-in caused the job to restart continuously, and it still had not recovered after an hour of restarting (the scale-in took four Kafka brokers offline, while Flink was configured with twelve Kafka brokers). The scene at the time was as follows.
The JobManager log:
```
2023-10-07 10:02:52.975 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, ubt_start, watermark=[-(LOCALTIMESTAMP, 1000:INTERVAL SECOND)]]]) (34/64) (e33d9ad0196a71e8eb551c181eb779b5) switched from RUNNING to FAILED on container_e08_1690538387235_2599_01_000010 task-xxxx-shanghai.emr.aliyuncs.com (dataPort=xxxx).
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: null
	at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913)
	at java.lang.Thread.run(Thread.java:750)
```

The log on the corresponding TaskManager (task-xxxx-shanghai.emr.aliyuncs.com):

```
2023-10-07 10:02:24.604 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxxx] Connection to node 46129 (sh-bs-b1-303-i14-kafka-129-46.ximalaya.local/192.168.129.46:9092) could not be established. Broker may not be available.
2023-10-07 10:02:52.939 WARN  org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(t) (34/64)#0 (e33d9ad0196a71e8eb551c181eb779b5) switched from RUNNING to FAILED.
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: null
	at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913)
	at java.lang.Thread.run(Thread.java:750)
2023-10-07 10:04:58.205 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxx] Connection to node -4 (xxxx:909) could not be established. Broker may not be available.
2023-10-07 10:04:58.205 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxx] Bootstrap broker xxxx:909 (id: -4 rack: null) disconnected
2023-10-07 10:04:58.206 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxxu] Connection to node -5 (xxxx:9092) could not be established. Broker may not be available.
2023-10-07 10:04:58.206 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxxu] Bootstrap broker xxxx:9092 (id: -5 rack: null) disconnected
2023-10-07 10:08:15.541 WARN  org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(xxx) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
```

The Kafka source configuration in Flink at the time:
```
scan.topic-partition-discovery.interval = 300000
restart-strategy.type = fixed-delay
restart-strategy.fixed-delay.attempts = 50000000
jobmanager.execution.failover-strategy = region
```

Conclusion and fix
On the Kafka consumer side there are two parameters, default.api.timeout.ms (default 60000) and request.timeout.ms (default 30000), which control how client requests to the brokers time out: each individual request times out after 30 s and can then be retried, and if no response has arrived within 60 s overall, a TimeoutException is thrown (see the analysis below for details). We resolved the issue by setting the following options on the Flink Kafka connector:

```
properties.default.api.timeout.ms = 600000
properties.request.timeout.ms = 5000
// max.block.ms sets the timeout on the Kafka producer side
properties.max.block.ms = 600000
```
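In the DataStream API the same timeouts go directly into the consumer Properties (in Flink SQL they are passed through with the properties. prefix, as above). Below is a minimal sketch; the bootstrap servers, group id, topic name, and class name are placeholders, not the actual job:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaTimeoutConfigSketch {

    public static FlinkKafkaConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder
        props.setProperty("group.id", "dau-job");                            // placeholder

        // give up on an individual broker request quickly...
        props.setProperty("request.timeout.ms", "5000");
        // ...but keep retrying other brokers for up to 10 minutes overall
        props.setProperty("default.api.timeout.ms", "600000");

        return new FlinkKafkaConsumer<>("some-topic", new SimpleStringSchema(), props);
    }
}
```

The intent of this combination: after a scale-in, a request to a decommissioned broker costs only 5 s per attempt instead of 30 s, and the much larger overall budget gives the client time to land on one of the remaining live brokers.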
Analysis

In Flink, the DynamicTableSourceFactory for the Kafka connector is KafkaDynamicTableFactory (we only discuss Kafka as a source here). Its createDynamicTableSource method is eventually invoked; for the exact call chain, see 《Apache Hudi初探(四)(与flink的结合)--Flink Sql中hudi的createDynamicTableSource/createDynamicTableSink/是怎么被调用》 (the same chain with Sink replaced by Source). Execution eventually reaches the KafkaDynamicSource class:
```java
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    final DeserializationSchema<RowData> keyDeserialization =
            createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);

    final DeserializationSchema<RowData> valueDeserialization =
            createDeserialization(context, valueDecodingFormat, valueProjection, null);

    final TypeInformation<RowData> producedTypeInfo =
            context.createTypeInformation(producedDataType);

    final FlinkKafkaConsumer<RowData> kafkaConsumer =
            createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);

    return SourceFunctionProvider.of(kafkaConsumer, false);
}
```

This getScanRuntimeProvider method is invoked, and from there all Kafka-related operations can be traced into the FlinkKafkaConsumer class (which extends FlinkKafkaConsumerBase). The important methods of that class are:

```java
@Override
public final void initializeState(FunctionInitializationContext context) throws Exception {
    OperatorStateStore stateStore = context.getOperatorStateStore();

    this.unionOffsetStates =
            stateStore.getUnionListState(
                    new ListStateDescriptor<>(
                            OFFSETS_STATE_NAME,
                            createStateSerializer(getRuntimeContext().getExecutionConfig())));
    ...
}

@Override
public void open(Configuration configuration) throws Exception {
    // determine the offset commit mode
    this.offsetCommitMode =
            OffsetCommitModes.fromConfiguration(
                    getIsAutoCommitEnabled(),
                    enableCommitOnCheckpoints,
                    ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

    // create the partition discoverer
    this.partitionDiscoverer =
            createPartitionDiscoverer(
                    topicsDescriptor,
                    getRuntimeContext().getIndexOfThisSubtask(),
                    getRuntimeContext().getNumberOfParallelSubtasks());
    this.partitionDiscoverer.open();

    subscribedPartitionsToStartOffsets = new HashMap<>();
    final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
    if (restoredState != null) {
        ...
    } else {
        // use the partition discoverer to fetch the initial seed partitions,
        // and set their initial offsets depending on the startup mode.
        // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
        // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily
        // determined when the partition is actually read.
        switch (startupMode) {
            ...
            default:
                for (KafkaTopicPartition seedPartition : allPartitions) {
                    subscribedPartitionsToStartOffsets.put(
                            seedPartition, startupMode.getStateSentinel());
                }
        }

        if (!subscribedPartitionsToStartOffsets.isEmpty()) {
            switch (startupMode) {
                ...
                case GROUP_OFFSETS:
                    LOG.info(
                            "Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
            }
        } else {
            LOG.info(
                    "Consumer subtask {} initially has no partitions to read from.",
                    getRuntimeContext().getIndexOfThisSubtask());
        }
    }

    this.deserializer.open(
            RuntimeContextInitializationContextAdapters.deserializationAdapter(
                    getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
}

@Override
public void run(SourceContext<T> sourceContext) throws Exception {
    if (subscribedPartitionsToStartOffsets == null) {
        throw new Exception("The partitions were not set for the consumer");
    }

    // initialize commit metrics and default offset callback method
    this.successfulCommits =
            this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
    this.failedCommits =
            this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
    final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();

    this.offsetCommitCallback =
            new KafkaCommitCallback() {
                @Override
                public void onSuccess() {
                    successfulCommits.inc();
                }

                @Override
                public void onException(Throwable cause) {
                    LOG.warn(
                            String.format(
                                    "Consumer subtask %d failed async Kafka commit.",
                                    subtaskIndex),
                            cause);
                    failedCommits.inc();
                }
            };

    // mark the subtask as temporarily idle if there are no initial seed partitions;
    // once this subtask discovers some partitions and starts collecting records, the subtask's
    // status will automatically be triggered back to be active.
    if (subscribedPartitionsToStartOffsets.isEmpty()) {
        sourceContext.markAsTemporarilyIdle();
    }

    LOG.info(
            "Consumer subtask {} creating fetcher with offsets {}.",
            getRuntimeContext().getIndexOfThisSubtask(),
            subscribedPartitionsToStartOffsets);

    // from this point forward:
    //   - 'snapshotState' will draw offsets from the fetcher,
    //     instead of being built from `subscribedPartitionsToStartOffsets`
    //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
    //     Kafka through the fetcher, if configured to do so)
    this.kafkaFetcher =
            createFetcher(
                    sourceContext,
                    subscribedPartitionsToStartOffsets,
                    watermarkStrategy,
                    (StreamingRuntimeContext) getRuntimeContext(),
                    offsetCommitMode,
                    getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
                    useMetrics);

    if (!running) {
        return;
    }

    if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
        kafkaFetcher.runFetchLoop();
    } else {
        runWithPartitionDiscovery();
    }
}

@Override
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    ...
    HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

    if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
        // the map cannot be asynchronously updated, because only one checkpoint call
        // can happen on this function at a time: either snapshotState() or
        // notifyCheckpointComplete()
        pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
    }

    for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :
            currentOffsets.entrySet()) {
        unionOffsetStates.add(
                Tuple2.of(
                        kafkaTopicPartitionLongEntry.getKey(),
                        kafkaTopicPartitionLongEntry.getValue()));
    }
    ...
}

@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    ...
    fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
    ...
}
```

The key methods are initializeState, open, run, snapshotState, and notifyCheckpointComplete; the sections below walk through them one by one. Note: for the relative order of initializeState and open, see the StreamTask class and the following call chain:
```
invoke()
  ||
  \/
beforeInvoke()
  ||
  \/
operatorChain.initializeStateAndOpenOperators()
  ||
  \/
FlinkKafkaConsumerBase.initializeState()
  ||
  \/
FlinkKafkaConsumerBase.open()
```

This shows that initializeState is invoked before open.
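To see this ordering for yourself, here is a minimal sketch (the class name is hypothetical, not from the job above) of a source that implements CheckpointedFunction and logs both callbacks; when run, it prints initializeState before open:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Hypothetical probe, only to observe the operator lifecycle.
public class LifecycleProbeSource extends RichSourceFunction<String>
        implements CheckpointedFunction {

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // reached via operatorChain.initializeStateAndOpenOperators, before open()
        System.out.println("initializeState");
    }

    @Override
    public void open(Configuration parameters) {
        System.out.println("open");
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {}

    @Override
    public void run(SourceContext<String> ctx) {}

    @Override
    public void cancel() {}
}
```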
The initializeState method
This restores the Kafka topic/offset information from persisted state. For the rest of the walkthrough we assume a first start, i.e. there is no restored state.
The open method
- offsetCommitMode: obtained from OffsetCommitModes.fromConfiguration, which combines the enable.auto.commit setting (default true), enableCommitOnCheckpoints (default true), and whether checkpointing is enabled (set to true here; the default is false). Together these yield OffsetCommitMode.ON_CHECKPOINTS.
- partitionDiscoverer: performs Kafka topic partition discovery. The main entry point is partitionDiscoverer.discoverPartitions, with the following call chain:

```
AbstractPartitionDiscoverer.discoverPartitions
  ||
  \/
AbstractPartitionDiscoverer.getAllPartitionsForTopics
  ||
  \/
KafkaPartitionDiscoverer.kafkaConsumer.partitionsFor
  ||
  \/
KafkaConsumer.partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs))  // defaultApiTimeoutMs comes from default.api.timeout.ms
  ||
  \/
Fetcher.getTopicMetadata  // eventually throws new TimeoutException("Timeout expired while fetching topic metadata")
  ||
  \/
Fetcher.sendMetadataRequest => NetworkClient.leastLoadedNode  // picks one of the configured broker nodes by some strategy
  ||
  \/
client.poll(future, timer) => NetworkClient.poll => selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs))  // defaultRequestTimeoutMs comes from request.timeout.ms
```

  In summary, discoverPartitions picks a configured broker node by some strategy and sends it a request; when that request times out after request.timeout.ms, another node is picked and the request retried, until the total elapsed time reaches default.api.timeout.ms. The defaults are 60 s for default.api.timeout.ms and 30 s for request.timeout.ms (a standalone reproduction sketch follows this list).
- subscribedPartitionsToStartOffsets: depending on startupMode (default StartupMode.GROUP_OFFSETS, i.e. resume from the last committed offsets), the starting Kafka offsets are set; the kafkaFetcher uses these later.
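To make the two timeouts concrete, the standalone sketch below (broker addresses and topic are placeholders) drives the same KafkaConsumer.partitionsFor -> Fetcher.getTopicMetadata path outside Flink; with no reachable broker it fails with "Timeout expired while fetching topic metadata" once default.api.timeout.ms is exhausted:

```java
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MetadataTimeoutProbe {

    public static void main(String[] args) {
        Properties props = new Properties();
        // placeholders: point at unreachable/decommissioned brokers to reproduce
        props.setProperty("bootstrap.servers", "dead-broker-1:9092,dead-broker-2:9092");
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());
        props.setProperty("request.timeout.ms", "5000");      // per-request timeout
        props.setProperty("default.api.timeout.ms", "60000"); // overall budget per API call

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // blocks for at most the given Duration (overriding default.api.timeout.ms),
            // then throws org.apache.kafka.common.errors.TimeoutException
            consumer.partitionsFor("some-topic", Duration.ofMillis(60_000));
        }
    }
}
```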
The run method
This sets up the successfulCommits/failedCommits metrics. The KafkaFetcher then pulls data from Kafka and, if partition discovery is enabled, loops discovering new topic partitions at the interval configured by scan.topic-partition-discovery.interval (default 0; set to 300000 here, i.e. 5 minutes). The main flow is in runWithPartitionDiscovery:

```java
private void runWithPartitionDiscovery() throws Exception {
    final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
    createAndStartDiscoveryLoop(discoveryLoopErrorRef);

    kafkaFetcher.runFetchLoop();

    // make sure that the partition discoverer is waked up so that
    // the discoveryLoopThread exits
    partitionDiscoverer.wakeup();
    joinDiscoveryLoopThread();

    // rethrow any fetcher errors
    final Exception discoveryLoopError = discoveryLoopErrorRef.get();
    if (discoveryLoopError != null) {
        throw new RuntimeException(discoveryLoopError);
    }
}
```

createAndStartDiscoveryLoop starts a single thread that performs Kafka partition discovery in a while/sleep loop, with scan.topic-partition-discovery.interval as the interval. Note that it swallows WakeupException and ClosedException instead of propagating them:

```java
private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
    discoveryLoopThread =
            new Thread(
                    () -> {
                        try {
                            ...
                            while (running) {
                                ...
                                try {
                                    discoveredPartitions =
                                            partitionDiscoverer.discoverPartitions();
                                } catch (AbstractPartitionDiscoverer.WakeupException
                                        | AbstractPartitionDiscoverer.ClosedException e) {
                                    break;
                                }
                                if (running && !discoveredPartitions.isEmpty()) {
                                    kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                                }
                                if (running && discoveryIntervalMillis != 0) {
                                    try {
                                        Thread.sleep(discoveryIntervalMillis);
                                    } catch (InterruptedException iex) {
                                        break;
                                    }
                                }
                            }
                        } catch (Exception e) {
                            discoveryLoopErrorRef.set(e);
                        } finally {
                            // calling cancel will also let the fetcher loop escape
                            // (if not running, cancel() was already called)
                            if (running) {
                                cancel();
                            }
                        }
                    },
                    "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());

    discoveryLoopThread.start();
}
```

In kafkaFetcher.addDiscoveredPartitions(discoveredPartitions), the subscribedPartitionStates variable stores the discovered partition information; kafkaFetcher.runFetchLoop later records the committed offsets there, and snapshotState reads it. kafkaFetcher.runFetchLoop itself pulls data from Kafka and updates the Kafka offsets, along the following flow:

```
runFetchLoop
  ||
  \/
subscribedPartitionStates  // reads the subscribedPartitionStates variable
  ||
  \/
partitionConsumerRecordsHandler
  ||
  \/
emitRecordsWithTimestamps
  ||
  \/
partitionState.setOffset(offset)  // the offset comes from the Kafka records just consumed
```
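For completeness: in the DataStream API the same discovery loop is switched on through the consumer property flink.partition-discovery.interval-millis (the FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS constant in Flink 1.12); the SQL option scan.topic-partition-discovery.interval maps onto the same mechanism. A minimal sketch with placeholder names:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

public class DiscoverySketch {

    public static FlinkKafkaConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
        props.setProperty("group.id", "dau-job");               // placeholder

        // poll for new partitions every 5 minutes; this becomes discoveryIntervalMillis
        // in the discovery loop shown above
        props.setProperty(
                FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "300000");

        return new FlinkKafkaConsumer<>("some-topic", new SimpleStringSchema(), props);
    }
}
```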
The snapshotState method
This processes the information in subscribedPartitionStates, mainly adding it to the pendingOffsetsToCommit variable. As discussed above, offsetCommitMode here is OffsetCommitMode.ON_CHECKPOINTS. In that mode, the subscribedPartitionStates snapshot is obtained via fetcher.snapshotCurrentState, added to pendingOffsetsToCommit (keyed by the checkpoint id), and persisted into unionOffsetStates; the actual Kafka offset commit happens later, in notifyCheckpointComplete.
The notifyCheckpointComplete method
When a checkpoint completes, the pending Kafka offsets for that checkpoint are retrieved and committed (persisted) to Kafka.
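Putting the commit pieces together: with checkpointing enabled and enableCommitOnCheckpoints left at its default of true, offsets reach Kafka only when a checkpoint completes. A minimal sketch (all names are placeholders):

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CommitModeSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpointing enabled => offsetCommitMode resolves to ON_CHECKPOINTS, so offsets
        // are committed in notifyCheckpointComplete, once per completed checkpoint (60 s here)
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
        props.setProperty("group.id", "dau-job");               // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("some-topic", new SimpleStringSchema(), props);
        // default is already true; combined with checkpointing this selects ON_CHECKPOINTS,
        // and the consumer's enable.auto.commit is then ignored
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("commit-mode-sketch");
    }
}
```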
References
- The initialization order of open and initializeState
- A single failing Kafka broker may cause jobs to fail indefinitely with "TimeoutException: Timeout expired while fetching topic metadata"