会员充值网站怎么做地名网站安全建设方案

当前位置: 首页 > news >正文

会员充值网站怎么做,地名网站安全建设方案,网站尾部设计,无锡个人网站建设Flink任务如何跑起来之 2.算子 StreamOperator 前文介绍了Transformation创建过程#xff0c;大多数情况下通过UDF完成DataStream转换中#xff0c;生成的Transformation实例中#xff0c;核心逻辑是封装了SimpleOperatorFactory实例。 UDF场景下#xff0c;DataStream到…Flink任务如何跑起来之 2.算子 StreamOperator 前文介绍了Transformation创建过程大多数情况下通过UDF完成DataStream转换中生成的Transformation实例中核心逻辑是封装了SimpleOperatorFactory实例。 UDF场景下DataStream到Transformationg过程中SimpleOperatorFactory实例的创建过程大致如下伪代码所示。 // 具体的函数实例 Function function ; // 将函数实例封装到算子实例中 AbstractUdfStreamOperator operator new AbstractUdfStreamOperator(function); // 通过算子实例得到其SimpleOperatorFactory实例 SimpleOperatorFactory factory SimpleOperatorFactory.of(operator)这里的UDF可以简单理解为需要我们自己传入对应Function实现类的操作如map、filter等。 问题 StreamOperator是什么 为什么需要将Function封装到StreamOperator中

  1. Flink算子 在应用程序中通过各种各样的Function完成DataStream转换但是Function仅表示数据处理逻辑并不关心数据从哪里来到哪里去。 以MapFunction为例map方法中仅包含对每一条到来数据的具体处理逻辑并不清楚map方法何时被调用结果返回到哪。 一个完整的数据处理逻辑应该是获取数据-处理数据-输出数据在Flink中这个最小的完整逻辑通过算子表示顶层抽象接口为StreamOperator。 因此Function作为算子的一部分参与后续的数据加工。 算子包含生命周期、状态和容错管理、数据处理3个方面。设计时分为两条线 生命周期、状态和容错管理主要是AbstractStreamOperator抽象类及其子类实现以及未来的AbstractStreamOperatorV2抽象类。数据处理主要是OneInputStreamOperator、TwoInputStreamOperator和MultipleInputStreamOperator接口分别表示单流、双流和多流的数据处理。在接口中定义了数据的处理方法。 StreamOperator完整的顶层抽象如下。 AbstractStreamOperator所有流运算的基类。提供了生命周期和属性方法的默认实现。 包含UDF的算子需继承其AbstractUdfStreamOperator子类 对于其具体实现还必须实现OneInputStreamOperator或TwoInputStreamOperator其中一个。 将来将会使用AbstractStreamOperatorV2替换该基类OneInputStreamOperator支持单流输入的运算符接口如果要实现自定义运算符需要使用AbatractUdfStreamOperator作为基类TwoInputStreamOperator支持双流输入的运算符基类。同样需要和AbstractStreamOperator一起使用。AbstractStreamOperatorV2所有流运算符的新基类旨在取代AbatractUdfStreamOperator。 当前仅仅用于和MultipleInputStreamOperator一起配合使用。 OneInputStreamOperator、TwoInputStreamOperator和MultipleInputStreamOperator分别对应了Tranformation实现类的OneInputTransformation、TwoInputTransformation和AbstractMultipleInputTransformation。 MultipleInputStreamOperator和AbstractStreamOperatorV2是高版本中才加入的。因此flink中最初仅支持单流或双流的输入多流场景下需要拆分成单流或双流进行处理。在支持不同输入的流的实现中梳理数据的方法分别如下 // 单流输入 public interface OneInputStreamOperatorIN, OUT extends StreamOperatorOUT, InputIN {// 处理数据void processElement(StreamRecordIN element) throws Exception; }// 双流输入 public interface TwoInputStreamOperatorIN1, IN2, OUT extends StreamOperatorOUT {// 处理双流输入中第一个流上的元素void processElement1(StreamRecordIN1 element) throws Exception;// 处理双流输入中第二个流上的元素void processElement2(StreamRecordIN2 element) throws Exception; }// 多流输入这里的Input和单流输入继承的Input父类为同一个 public interface MultipleInputStreamOperatorOUT extends StreamOperatorOUT {ListInput getInputs(); }在AbstractStreamOperator众多子类中AbstractUdfStreamOperator抽象类中封装了Function接口并且其中open、close等算子生命周期等方法实际上就是调用Function实例的对应方法。 public abstract class AbstractUdfStreamOperatorOUT, F extends Functionextends AbstractStreamOperatorOUT implements OutputTypeConfigurableOUT {// 封装Functionprotected final F userFunction;// 通过Function实现进行算子的实例化public AbstractUdfStreamOperator(F userFunction) {this.userFunction requireNonNull(userFunction);checkUdfCheckpointingPreconditions();}// 算子生命周期的相关方法实际上调用Function的方法Overridepublic void open() throws Exception {super.open();FunctionUtils.openFunction(userFunction, new Configuration());}Overridepublic void finish() throws Exception {super.finish();if (userFunction instanceof SinkFunction) {((SinkFunction?) userFunction).finish();}}Overridepublic void close() throws Exception {super.close();FunctionUtils.closeFunction(userFunction);} }常用的实现类基本继承自AbstractUdfStreamOperator抽象类。 单流输入如map、fliter、source、sink等实现类 sink算子有两个实现类分别是SinkOperator和StreamSinkIN。二者的关系为SinkOperator是StreamSinkRowData的特例。 双流输入如concat、intervalJoin等实现类 本文开头提到通过SimpleOperatorFactory.of方式生成SimpleOperatorFactory实例该方法如下 public static OUT SimpleOperatorFactoryOUT of(StreamOperatorOUT operator) {if (operator null) {return null;} else if (operator instanceof StreamSource ((StreamSource) operator).getUserFunction() instanceof InputFormatSourceFunction) {// 通过addSoure方法添加的Source方式且SourceFunction为InputFormatSourceFunction的子类return new SimpleInputFormatOperatorFactoryOUT((StreamSource) operator);} else if (operator instanceof StreamSink ((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) {// 通过addSink方法添加的sink方式且SinkFunction为OutputFormatSinkFunction的子类return new SimpleOutputFormatOperatorFactory((StreamSink) operator);} else if (operator instanceof AbstractUdfStreamOperator) {return new SimpleUdfStreamOperatorFactoryOUT((AbstractUdfStreamOperator) operator);} else {return new SimpleOperatorFactory(operator);} }得到SimpleOperatorFactory实例后在实际执行时通过其createStreamOperator方法得到StreamOperator实例。 1.1. 算子生成示例 上述内容偏概念更多一些通过map为例实际观察Function-StreamOperator-StreamOperatorFactory-Transformation的过程 // 步骤1业务代码中使用map操作 DataStreamTuple2String, Integer counts text.map(row - Tuple2.of(row, 1))// 步骤2将业务代码中提供的MapFunction封装成StreamMap public R SingleOutputStreamOperatorR map(MapFunctionT, R mapper, TypeInformationR outputType) {// 将MapFunction封装成StreamMapStreamMap为AbstractUdfStreamOperator子类return transform(Map, outputType, new StreamMap(clean(mapper))); }// 步骤3根据StreamMap获取其对应的SimpleOperatorFactory工厂实例 public R SingleOutputStreamOperatorR transform(String operatorName,TypeInformationR outTypeInfo,OneInputStreamOperatorT, R operator) {// 获取StreamMap对应的StreamOperatorFactory工厂类return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator)); }// 步骤4将工厂实例传入到Transformation中 protected R SingleOutputStreamOperatorR doTransform(String operatorName,TypeInformationR outTypeInfo,StreamOperatorFactoryR operatorFactory) {OneInputTransformationT, R resultTransform new OneInputTransformation(this.transformation,operatorName,// 将StreamOperatorFactory工厂实例传入到Transformation中operatorFactory,outTypeInfo,environment.getParallelism());SuppressWarnings({unchecked, rawtypes})SingleOutputStreamOperatorR returnStream new SingleOutputStreamOperator(environment, resultTransform);getExecutionEnvironment().addOperator(resultTransform);return returnStream; }在步骤2中将MapFunction封装成StreamMapStreamMap是AbstractUdfStreamOperator的子类并且同时实现了OneInputStreamOperator进行数据处理逻辑。在处理数据时实际上是调用MapFunction的map方法完成即在业务代码中指定的row - Tuple2.of(row, 1)的逻辑。 public class StreamMapIN, OUT extends AbstractUdfStreamOperatorOUT, MapFunctionIN, OUTimplements OneInputStreamOperatorIN, OUT {// 以下3个属性从父类继承// 函数实例protected final F userFunction;// 结果输出protected transient OutputStreamRecordOUT output;// 默认算子链生成策略protected ChainingStrategy chainingStrategy ChainingStrategy.HEAD;public StreamMap(MapFunctionIN, OUT mapper) {super(mapper);// 实例化StreamMap时指定ALWAYS的算子链生成策略chainingStrategy ChainingStrategy.ALWAYS;}Overridepublic void processElement(StreamRecordIN element) throws Exception {// userFunction即MapFunction处理数据时实质调用MapFunction的map方法。output.collect(element.replace(userFunction.map(element.getValue())));} }要在Task中算子才会真正执行这里仅仅是在逻辑上完成算子的定义。
  2. 算子链 Flink中会将多个算子合并到一起组成算子链从而提高算子的运行效率。同一个算子链意味着将在同一个线程中运行。flink中算子链使用OperatorChain抽象类表示。 算子的合并策略在ChainingStrateg枚举类中定义详情如下 /*** StreamOperator 使用的默认值为 HEAD,这意味着算子不链接到其前身。大多数算子使用 ALWAYS 覆盖此操作,这意味着它们将尽可能链接到前身。 */ public enum ChainingStrategy {// 尽可能的将和上游算子链接到一起大多数算子的默认值ALWAYS,// 当前算子不会上下游算子链接到一起NEVER,// 不会上游算子连接到一起但是可以和下游算子链接到一起HEAD,// 此运算符将运行在链的头部(与 HEAD 类似,但它还会尝试在可能的情况下链接source。这允许将多输入运算符与多个源链接到一个任务中。HEAD_WITH_SOURCES;public static final ChainingStrategy DEFAULT_CHAINING_STRATEGY ALWAYS; }