书店网站建设可行性分析徐州网站建站
- 作者: 五速梦信息网
- 时间: 2026年04月20日 08:28
当前位置: 首页 > news >正文
书店网站建设可行性分析,徐州网站建站,管理咨询的工作形式与特点包括了,荣耀手机官网网站文章目录 1、增量聚合之ReduceFunction2、增量聚合之AggregateFunction3、全窗口函数full window functions4、增量聚合函数搭配全窗口函数5、会话窗口动态获取间隔值6、触发器和移除器7、补充 //窗口操作 stream.keyBy(key selector).window(window assigner)… 文章目录 1、增量聚合之ReduceFunction2、增量聚合之AggregateFunction3、全窗口函数full window functions4、增量聚合函数搭配全窗口函数5、会话窗口动态获取间隔值6、触发器和移除器7、补充 //窗口操作 stream.keyBy(key selector).window(window assigner).aggregate(window function) 上一节的窗口分配器指明了窗口类型知道了数据属于哪个窗口并收集。而窗口函数则是定义如何对这些数据做计算操作。 增量聚合来一条数据计算一条数据窗口触发的时候输出计算结果全窗口函数数据来了不计算存起来窗口触发的时候计算并输出计算结果 1、增量聚合之ReduceFunction public class WindowReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream(node01, 9527).map(new WaterSensorMapFunction()).keyBy(r - r.getId())// 设置滚动事件时间窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(30))).reduce(new ReduceFunctionWaterSensor() {Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println(调用reduce方法value1:value1 ,value2:value2);return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc()value2.getVc());}}).print();env.execute();} } 运行输入数据查看控制台 2、增量聚合之AggregateFunction 上面使用ReduceFunction的限制是输入数据的类型、聚合中间状态的类型、输出结果的类型必须一致AggregateFunction则没有这个限制。AggregateFunction接口有四个方法 createAccumulator创建一个累加器这就是为聚合创建了一个初始状态每个聚合任务只会调用一次。add将输入的元素添加到累加器中。getResult从累加器中提取聚合的输出结果。merge合并两个累加器并将合并后的状态作为一个累加器返回 AggregateFunction的工作原理是首先调用createAccumulator()为任务初始化一个状态累加器而后每来一个数据就调用一次add()方法对数据进行聚合得到的结果保存在状态中等到了窗口需要输出时再调用getResult()方法得到计算结果 public class WindowAggregateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(node01, 9527).map(new WaterSensorMapFunction()); //自定义的实现类String转自定义对象WaterSensorKeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());// 1. 窗口分配器WindowedStreamWaterSensor, String, TimeWindow sensorWS sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperatorString aggregate sensorWS.aggregate(new AggregateFunctionWaterSensor, Integer, String() {Overridepublic Integer createAccumulator() {System.out.println(创建累加器);return 0;}//value即输入的数据accumulator即之前的计算结果Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println(调用add方法,valuevalue);return accumulator value.getVc();}Overridepublic String getResult(Integer accumulator) {System.out.println(调用getResult方法);return accumulator.toString();}Overridepublic Integer merge(Integer a, Integer b) {System.out.println(调用merge方法);return null;}});aggregate.print();env.execute();} } 运行输入数据查看控制台 3、全窗口函数full window functions 全窗口函数即数据来了不计算存起来窗口触发的时候计算并输出计算结果Flink全窗口函数有两种第一种为apply方法下的 stream.keyBy(key selector).window(window assigner).apply(new MyWindowFunction());传入一个WindowFunction的实现类该方法已被第二种ProcessWindowFunction全覆盖因而逐渐弃用。ProcessWindowFunction除了可以拿到窗口中的所有数据之外还可以获取到一个“上下文对象”Context通过这个上下文对象可以获取窗口对象、窗口处理时间、事件时间水位线 public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(node01, 9527).map(new WaterSensorMapFunction());KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());// 1. 窗口分配器WindowedStreamWaterSensor, String, TimeWindow sensorWS sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperatorString process sensorWS.process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {/*** 全窗口函数计算逻辑窗口结束时触发才调用一次* s 分组的key* context 上下文对象* elements 窗口内存的所有数据* out 采集器对象*/Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long count elements.spliterator().estimateSize();long windowStartTs context.window().getStart();long windowEndTs context.window().getEnd();String windowStart DateFormatUtils.format(windowStartTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(windowEndTs, yyyy-MM-dd HH:mm:ss.SSS);out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}});process.print();env.execute();} } 效果 4、增量聚合函数搭配全窗口函数 可以看出增量和全窗口各有好处 增量聚合下来一条计算一条只存储中间计算结果占用空间少全窗口函数则是可以通过上下文对象来实现灵活的功能 像同时拥有两者的优点可以调用aggregate方法的另一个重载方法 // ReduceFunction与WindowFunction结合 public R SingleOutputStreamOperatorR reduce(ReduceFunctionT reduceFunctionWindowFunctionTRKW function) // ReduceFunction与ProcessWindowFunction结合 public R SingleOutputStreamOperatorR reduce(ReduceFunctionT reduceFunctionProcessWindowFunctionTRKW function)// AggregateFunction与WindowFunction结合 public ACCVR SingleOutputStreamOperatorR aggregate(AggregateFunctionTACCV aggFunctionWindowFunctionVRKW windowFunction)// AggregateFunction与ProcessWindowFunction结合 public ACCVR SingleOutputStreamOperatorR aggregate(AggregateFunctionTACCV aggFunction,ProcessWindowFunctionVRKW windowFunction) 此时 基于第一个参数即增量聚合函数来处理数据来一条聚合一条窗口触发后调用第二个参数的处理逻辑此时把增量聚合的结果只有一条数据再传递给全窗口函数也就是说全窗口的Iterable elements长度为1注意全窗口不再缓存所有数据经过全窗口执行处理和包装再输出 public class WindowAggregateAndProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(node01, 9527).map(new WaterSensorMapFunction());KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());// 1. 窗口分配器WindowedStreamWaterSensor, String, TimeWindow sensorWS sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));//sensorWS.reduce() //也可以传两个SingleOutputStreamOperatorString result sensorWS.aggregate(new MyAgg(),new MyProcess());result.print();env.execute();}} public class MyAgg implements AggregateFunctionWaterSensor, Integer, String{Overridepublic Integer createAccumulator() {System.out.println(创建累加器);return 0;}Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println(调用add方法,valuevalue);return accumulator value.getVc();}Overridepublic String getResult(Integer accumulator) {System.out.println(调用getResult方法);return accumulator.toString();}Overridepublic Integer merge(Integer a, Integer b) {System.out.println(调用merge方法);return null;}}// 全窗口函数的输入类型 增量聚合函数的输出类型 public class MyProcess extends ProcessWindowFunctionString,String,String,TimeWindow{Overridepublic void process(String s, Context context, IterableString elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());} }注意二者搭配时根据前面分析可以知道必有增量聚合函数的输出类型 全窗口函数的输入类型 5、会话窗口动态获取间隔值 到此窗口API需要的窗口分配器见上一篇和窗口函数都已整理完。上面demo中用的窗口分配器都是滚动窗口但应该有以下这些 时间滚动窗口时间滑动窗口时间会话窗口计数滚动窗口计数滑动窗口 这里再记录下时间会话窗口动态获取会话间隔 public class WindowSessionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(node01, 9527).map(new WaterSensorMapFunction());KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());// 1. 窗口分配器WindowedStreamWaterSensor, String, TimeWindow sensorWS sensorKS.window(ProcessingTimeSessionWindows.withDynamicGap(t - t.getTs() * 1000L));SingleOutputStreamOperatorString process sensorWS.process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long count elements.spliterator().estimateSize();long windowStartTs context.window().getStart();long windowEndTs context.window().getEnd();String windowStart DateFormatUtils.format(windowStartTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(windowEndTs, yyyy-MM-dd HH:mm:ss.SSS);out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}});process.print();env.execute();} }来一条数据根据这条数据获取一个值做为会话间隔到达这个间隔前下条数据到来了则会话间隔又成了另一个值动态的。运行 可以看到会话间隔动态获取到达间隔时下条数据还没来则结束本窗户窗口口结束时触发才调用一次process和分析的一致。最后补充一点展开demo代码里的Lambda表达式其实是一个抓取会话间隔的方法定义了会话窗口间隔的获取逻辑。 再贴个计数滑动窗口 6、触发器和移除器 触发器主要是用来控制窗口什么时候触发计算即什么时候执行窗口函数 //基于WindowedStream调用.trigger()方法就可以传入一个自定义的窗口触发器Trigger stream.keyBy(…).window(…).trigger(new MyTrigger()) 移除器主要用来定义移除某些数据的逻辑 基于WindowedStream调用.evictor()方法就可以传入一个自定义的移除器Evictor。Evictor是一个接口不同的窗口类型都有各自预实现的移除器。 stream.keyBy(…).window(…).evictor(new MyEvictor()) Flink提供的几个窗口比如滑动、滚动等都有对触发器和移除器的默认实现不用自定义。 7、补充 窗口的划分 窗口开始时间start是窗口长度的整数倍向下取整 窗口结束时间是start窗口长度 窗口是左闭右开因为属于本窗口的最大时间戳为end-1 窗口的生命周期创建是属于本窗口的第一条数据来的时候现new的放入一个singleton单例的集合中窗口的销毁是时间的进展 窗口的最大时间戳end-1ms) 允许迟到的时间默认0窗口什么时候触发输出当时间进展 窗口的最大时间戳end -1ms)
- 上一篇: 书城网站开发安徽智能网站建设推荐
- 下一篇: 书法网站模板下载php网站修改主页内容
相关文章
-
书城网站开发安徽智能网站建设推荐
书城网站开发安徽智能网站建设推荐
- 技术栈
- 2026年04月20日
-
书吧网站设计论文wordpress部署文件夹
书吧网站设计论文wordpress部署文件夹
- 技术栈
- 2026年04月20日
-
售卖网站建设实验报告西安网页制作与设计
售卖网站建设实验报告西安网页制作与设计
- 技术栈
- 2026年04月20日
-
书法网站模板下载php网站修改主页内容
书法网站模板下载php网站修改主页内容
- 技术栈
- 2026年04月20日
-
书画艺术网站建设概况跨境电商最好卖的产品
书画艺术网站建设概况跨境电商最好卖的产品
- 技术栈
- 2026年04月20日
-
书画展示网站模板网络培训平台有哪些
书画展示网站模板网络培训平台有哪些
- 技术栈
- 2026年04月20日
