专业的西安免费做网站产品推广活动策划方案

当前位置: 首页 > news >正文

专业的西安免费做网站,产品推广活动策划方案,做网站分层技术,电子商务网站规划与建设步骤一、Flink 时间语义类型 Event Time#xff1a;是事件创建的时间。它通常由事件中的时间戳描述#xff0c;例如采集的日志数据中#xff0c;每一条日志都会记录自己的生成时间#xff0c;Flink 通过时间戳分配器访问事件时间戳Ingestion Time #xff1a;是数据进入 Flink…一、Flink 时间语义类型 Event Time是事件创建的时间。它通常由事件中的时间戳描述例如采集的日志数据中每一条日志都会记录自己的生成时间Flink 通过时间戳分配器访问事件时间戳Ingestion Time 是数据进入 Flink 的时间Processing Time是每一个执行基于时间操作的算子的本地系统时间与机器相关默认的时间属性就是 Processing Time 二、EventTime 引入 Flink 默认是按照 ProcessingTime 来处理数据的 /**在 Flink 的流式处理中绝大部分情况推荐使用 eventTime一般只在 eventTime 无法使用时才会被迫使用 ProcessingTime 或者 Ing estionTime 。使用 EventTime 需要先引入 EventTime 的时间属性 */ public class EventTimeTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//引入 EvenetTime//TimeCharacteristic 是一个枚举类有 ProcessingTime、IngestionTime 和 EventTime 三个属性env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);} }三、Watermark

  1. 数据乱序情况 正常情况下Flink 接收到的事件应该要是按照事件的产生时间 (EventTime) 的先后顺序排列的实际情况下事件从产生到进入 source 再到触发 operator其中间是有一个过程和时间的而且由于网络、分布式等原因会造成 Flink 接收到的事件的先后顺序不是严格按照事件的 EventTime 顺序排列的即所谓的乱序数据乱序数据的问题会造成窗口触发关闭的时间混乱计算不准确Flink 处理乱序数据的机制Watermark allowedLateness sideOutputLateData
  2. Watermark 介绍 Watermark 是一种使用延迟触发 window 执行来处理乱序数据的机制原理当设置 Watermark t 时 (即延迟时长为 t)则 Flink 每一次都会获取已经到达的数据中的最大的 EventTime然后判断 maxEventTime - t 是否等于某一个窗口的触发时间如果相等则认为属于这个窗口的所有数据都已经到达这个窗口被触发执行关闭也可能存在数据丢失在数据有序的流中相当于 Watermark 0即已经到达的数据中的最大的 EventTime 等于某一个窗口的触发时间则这个窗口被触发执行关闭一般将 Watermark 设置为乱序数据流中最大的迟到时间差
  3. Watermark 特点和行为 水位线 (Watermark) 是作为一个特殊的数据插入到数据流中的一个标记水位线 (Watermark) 在 Flink 程序中是一个常量类有一个时间戳属性用来表示当前事件时间的进展水位线 (Watermark) 是基于数据的 EventTime 时间戳生成的水位线 (Watermark) 的时间戳必须单调递增以确保任务的事件时间时钟一直向前推进
  4. Watermark 在任务间的传递 任务并行度不为 1Watermark 设置的位置越靠近 Source 端越好 一个任务会接收上游多个并行任务的数据也会向下游多个并行任务发送数据从上游多个并行任务接收 Watermark使用 Partition WM 分别存储接收到的不同分区任务的 Watermark并以其中最小的 Watermark 作为自己当前的事件时间向下游多个并行任务发送 Watermark采取广播的分区策略向下游的每一个任务都发送一份 Watermark如果后续 Watermark 没有变更则不会重复发送
  5. Watermark 引入 5.1 核心代码 /方法签名DataStream.assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarksT)DataStream.assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarksT)参数1.AssignerWithPeriodicWatermarks继承 TimestampAssigner 接口周期性的生成 watermark常用实现类为BoundedOutOfOrdernessTimestampExtractor 和 AscendingTimestampExtractor2.AssignerWithPunctuatedWatermarks继承 TimestampAssigner 接口间断式地生成 watermark */ public class WatermarkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//引入 EvenetTime env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStreamString dataStream env.socketTextStream(localhost, 7777);DataStreamSensorReading inputStream dataStream.map(new MapFunctionSensorReading() {Overridepublic SensorReading map(String value) throws Exception {String[] fields value.split(,);return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//有序数据设置事件时间戳(毫秒数)和watermark//不需要传递watermark延迟时间默认是当前事件时间戳 - 1ms 作为watermarkinputStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractorSensorReading() {Overridepublic long extractAscendingTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});//乱序数据设置事件时间戳(毫秒数)和watermark//BoundedOutOfOrdernessTimestampExtractor 构造方法必须传入watermark延迟时间//生成的watermark时间戳 当前所有事件的最大时间戳 - 延迟时间inputStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorSensorReading(Time.seconds(2)) {Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});env.execute();} }5.2 AssignerWithPeriodicWatermarks 系统会周期性地生成 watermark 并插入到数据流中默认周期是 200 毫秒 /设置watermark生成周期env.getConfig.setAutoWatermarkInterval(milliseconds);产生watermark的逻辑每隔 0.2 秒钟Flink 会调用 AssignerWithPeriodicWatermarks 的 getCurrentWatermark() 方法获取一个时间戳如果大于之前水位的时间戳新的 watermark 会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳则不会产生新的 watermark自定义watermark周期生成器实现 AssignerWithPeriodicWatermarks 接口并重写 getCurrentWatermark 和 extractTimestamp 方法 */ public class MyPeriodicAssigner implements AssignerWithPeriodicWatermarksSensorReading {private Long bound 60 * 1000L; // watermark延迟时间private Long maxTs Long.MIN_VALUE; // 当前最大时间戳NullableOverridepublic Watermark getCurrentWatermark() {return new Watermark(maxTs - bound);}Overridepublic long extractTimestamp(SensorReading element, long previousElementTimestamp) {maxTs Math.max(maxTs, element.getTimestamp()); //获取当前最大的事件时间戳return element.getTimestamp();} }5.3 AssignerWithPunctuatedWatermarks 间断式地生成 watermark可以根据需要对每条数据进行条件判断筛选来确定是否生成 watermark public class MyPunctuatedAssigner implements AssignerWithPunctuatedWatermarksSensorReading {private Long bound 60 * 1000L; // 延迟时间NullableOverridepublic Watermark checkAndGetNextWatermark(SensorReading lastElement, long extractedTimestamp) {if(lastElement.getId().equals(sensor_1)) {return new Watermark(extractedTimestamp - bound);} else {return null;}}Overridepublic long extractTimestamp(SensorReading element, long previousElementTimestamp) {return element.getTimestamp();} }四、EventTime 的 window 操作
  6. 滚动时间窗口操作 /需求统计 15 秒内的最小温度值设置 2 秒的延迟 */ public class TumblingEventTimeWindowTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);/sensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1sensor_1,1547718207,36.3sensor_1,1547718209,32.8sensor_1,1547718212,37.1…/DataStreamString inputStream env.socketTextStream(localhost, 7777);DataStreamSensorReading dataStream inputStream.map(new MapFunctionSensorReading() {Overridepublic SensorReading map(String value) {String[] fields value.split(,);return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorSensorReading(Time.seconds(2)) {Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});//开窗聚合SingleOutputStreamOperatorSensorReading minTempStream dataStream.keyBy(id).timeWindow(Time.seconds(15)).minBy(temperature);minTempStream.print(minTemp);/输出的结果分析1.在接收到 sensor_1,1547718212,37.1 时触发了一个窗口关闭此时数据的 EventTime 为 1547718212由于 watermark 延迟时间设置为 2所以该窗口触发关闭的时间戳为 1547718212 - 2 1547718210该窗口的范围为 [1547718195,1547718210)2.当前第一个窗口是 [1547718195,1547718210)其起始点的确定规则为2.1 滚动时间窗口使用的窗口分配器为 TumblingEventTimeWindows 类2.2 TumblingEventTimeWindows 的 assignWindows 方法中调用 getWindowStartWithOffset 方法获取起始点2.3 getWindowStartWithOffset(timestamp, offset, windowSize)方法逻辑为 timestamp - (timestamp - offset windowSize) % windowSize默认 offset 为 0所以最终得到的起始点应该是 windowSize 的整数倍在本例中的起始点为 1547718199 - (1547718199-015)%15 15477181953.偏移量 offset一般是用来处理不同时区的数据*/env.execute();}
    }2. 迟到数据处理 /需求统计 15 秒内的最小温度值设置 2 秒的延迟并允许 1 分钟的迟到数据1 分钟后的数据写入侧输出流 */ public class TumblingEventTimeWindowDelayTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStreamString inputStream env.socketTextStream(localhost, 7777);DataStreamSensorReading dataStream inputStream.map(new MapFunctionSensorReading() {Overridepublic SensorReading map(String value) {String[] fields value.split(,);return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorSensorReading(Time.seconds(2)) {Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});OutputTagSensorReading outputTag new OutputTagSensorReading(late){};//开窗聚合SingleOutputStreamOperatorSensorReading minTempStream dataStream.keyBy(id).timeWindow(Time.seconds(15)).allowedLateness(Time.minutes(1));.sideOutputLateData(outputTag).minBy(temperature);minTempStream.print(minTemp);minTempStream.getSideOutput(outputTag).print(late);/依次输入数据sensor_1,1547718199,35.8sensor_1,1547718206,36.3sensor_1,1547718210,34.7sensor_1,1547718211,31sensor_1,1547718209,34.9sensor_1,1547718212,37.1sensor_1,1547718213,33sensor_1,1547718206,34.2sensor_1,1547718202,36…sensor_1,1547718272,34sensor_1,1547718203,30.6输出的结果分析1.在接收到 sensor_1,1547718212,37.1 时触发 [1547718195,1547718210) 窗口执行此时输出数据 sensor_1,1547718209,34.9此时 2 秒内的延迟数据能被处理 2.在接收到 sensor_1,1547718206,34.2 时由于设置了允许 1 分钟迟到所以 [1547718195,1547718210) 窗口仍然没有关闭此时会更新数据为 sensor_1,1547718206,34.2此时的系统时间戳为 1547718213 - 2 1547718211 - 1547718210 603.在接收到 sensor_1,1547718202,36 时[1547718195,1547718210) 窗口仍然会更新输出一次数据 sensor_1,1547718206,34.24.在接收到 sensor_1,1547718272,34 时属于 [1547718210,1547718225) 窗口的数据会输出 sensor_1,1547718211,31此时的系统时间戳为 1547718272 - 2 1547718270由于 1547718270 - 1547718210 60所以 [1547718195,1547718210) 窗口会真正的关闭5.在之后接收到 sensor_1,1547718203,30.6 时会把数据输出到侧输出流中*/env.execute();}
    }