做交易网站需要用到的软件有哪些网站备案 二级域名

当前位置: 首页 > news >正文

做交易网站需要用到的软件有哪些,网站备案 二级域名,淘宝seo优化是什么,上海网站建设免费推荐在上面篇文章中已经对flink进行了简单的介绍以及了解了Flink API 层级划分#xff0c;这一章内容我们主要介绍DataStream API 流程图解#xff1a; 一、DataStream API Source Flink 在流处理和批处理上的 source 大概有 4 类#xff1a; #xff08;1#xff09;基于本…在上面篇文章中已经对flink进行了简单的介绍以及了解了Flink API 层级划分这一章内容我们主要介绍DataStream API 流程图解 一、DataStream API Source Flink 在流处理和批处理上的 source 大概有 4 类 1基于本地集合的 source 2基于文件的 source 3基于网络套接字的 source具体来说就是从远程服务器或本地端口上的套接字连接中接收数据比如上一篇文章中的入门案例就属于这一种。 4自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等当然你也可以定义自己的 source灵活度较高看个人需求。 下面就是纯代码演示了具体细节会在注释中说明 1、本地集合的source import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;public class Demo1ListSource {public static void main(String[] args) throws Exception{//创建flink执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//创建集合ArrayListString arrayList new ArrayList();arrayList.add(java);arrayList.add(java);arrayList.add(java);arrayList.add(java);arrayList.add(java);/基于集合的Source —– 属于有界流*/DataStreamString listDS env.fromCollection(arrayList);listDS.print();//启动Flink作业执行env.execute();} } 结果 在这解释一下结果图中的数字前缀这个前缀的主要目的是不同并行实例的输出。什么都不设置的话取决于你电脑的内存了比如我电脑是16G的内存那么当数据较多时默认分配给该作业分了16个task。 2、本地文件的source 注意同一个File数据源既能有界读取也能无界读取 2.1 有界读取 /流批统一:* 1、同一套算子代码既能作流处理也能做批处理* 2、同一个File数据源既能有界读取也能无界读取/ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.connector.file.src.FileSource; import org.apache.flink.connector.file.src.reader.TextLineInputFormat; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo2FileSource1 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();/**有界读取///老版本方式简单但不灵活DataStreamString lineDS env.readTextFile(flink/data/student.csv); // lineDS.print();//新版本方式复杂一点但更灵活使用这种既能有界读取也能无界读取//构建fileSourceFileSourceString fileSource FileSource.forRecordStreamFormat(//指定编码new TextLineInputFormat(UTF-8)//指定路径, new Path(flink/data/student.csv)).build();//使用fileSourceDataStreamString fileDS env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), fileSource);fileDS.print();env.execute();} } 2.1 无界读取 import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.connector.file.src.FileSource; import org.apache.flink.connector.file.src.reader.TextLineInputFormat; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo2FileSource2 {public static void main(String[] args) throws Exception {/使用无界流读取文件数很简单其实就是对上面的代码修改运行模式并加个参数就可以了*/StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//修改运行模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);//构建fileSourceFileSourceString fileSource FileSource.forRecordStreamFormat(new TextLineInputFormat(UTF-8),new Path(spark/data/student.csv)).build();//使用fileSourceDataStreamSourceString linesDS env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), fileSource);linesDS.print();env.execute();} } 3、本地端口的source 上一篇文章中的入门案例就属于这一种后面在代码中也会用到在此不在赘述了。 4、自定义的 source 举例使用自定义source读取mysql中的数据 /实现方式 1、实现SourceFunction或ParallelSourceFunction接口来创建自定义的数据源。* 2、然后使用env.addSource(new CustomSourceFunction())或DataStreamSource.fromSource添加你自定义的数据源。*/ import lombok.AllArgsConstructor; import lombok.Data; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet;public class Demo3MysqlSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//使用自定义的sourceDataStreamStudent studentDSSource env.addSource(new MysqlSource());//统计学生表每个班级的人数//取出每一行的班级列并加上人数后缀1DataStreamTuple2String, Integer clazzKvDS studentDSSource.map(line - Tuple2.of(line.getClazz(), 1), Types.TUPLE(Types.STRING, Types.INT));//分组将相同的键发送给同一个task中KeyedStreamTuple2String, Integer, String keyByDS clazzKvDS.keyBy(kv - kv.f0);//求和SingleOutputStreamOperatorTuple2String, Integer clazzSum keyByDS.sum(1);//输出clazzSum.print();env.execute();}}/* 自定义source读取mysql中的数据/ class MysqlSource implements SourceFunctionStudent {/** run()方法会在任务启动的时候执行一次*/Overridepublic void run(SourceContext ctx) throws Exception {//1、加载mysq驱动Class.forName(com.mysql.jdbc.Driver);//2、创建数据库连接//注意如果报连不上的错误将参数补全useUnicodetrueallowPublicKeyRetrievaltruecharacterEncodingutf8useSSLfalseConnection conn DriverManager.getConnection(jdbc:mysql://master:3306/bigdata29?useSSLfalse, root, 123456);//3、编写sql查询PreparedStatement sql conn.prepareStatement(select * from students);//4、执行查询ResultSet resultSet sql.executeQuery();//5、遍历查询出的数据while (resultSet.next()) {int id resultSet.getInt(id);String name resultSet.getString(name);int age resultSet.getInt(age);String gender resultSet.getString(gender);String clazz resultSet.getString(clazz);//将数据发送到下游/** collect():从 DataStream 收集所有的元素并将它们作为列表或其他集合类型返回给客户端/ctx.collect(new Student(id, name, age, gender, clazz));}//6、释放资源sql.close();conn.close();}Overridepublic void cancel() {/** cancel()它用于在任务完成后执行清理操作/} }/*** 这里使用了lombok插件小辣椒* 这个插件的作用可以在代码编译的时候增加方法相当于scala中的case class,就不用我们自己手动添加get、set、toString等方法了。* 使用方法加就行了/ Data AllArgsConstructor class Student {private int id;private String name;private int age;private String gender;private String clazz; } 二、DataStream API Transformation Transformation数据流转换。 常见算子有 Map / FlatMap / Filter /KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等操作很多可以将数据转换计算成你想要的数据形式。 其实这些算子在功能上与scala或spark中的基本相同只是形式和细节上会有些差别。 1、map DataStream → DataStream    输入一个元素同时输出一个元素 import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo1Map {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//使用nc -lk 8888 模拟实时数据的产生DataStreamSourceString source env.socketTextStream(master, 8888);//方式1匿名内部类形式/** 观察源码map发现 MapFunctionT, O 是一个函数接口用于对流中的每个元素的处理* 这个接口定义了一个 map 方法该方法接受一个输入元素类型为 T并返回一个输出元素类型为 O。/DataStreamString map1DS source.map(new MapFunctionString, String() {Overridepublic String map(String word) throws Exception {return word.toUpperCase();}}); // map1DS.print();//方式2lambda表达式形式更简洁常用source.map(String::toUpperCase).print(); //是对source.map(word - word.toUpperCase())的更简写env.execute();} } 结果 2、flatMap DataStream → DataStream 输入一个元素转换为一个或多个元素输出 /**flatMap 方法用于将输入流中的每个元素转换成一个或多个输出元素/import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class Demo2FaltMap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamString source env.socketTextStream(master, 8888);//方式1匿名内部类//看源码这个方法接受一个FlatMapFunctionT, R类型的参数其中T是输入元素的类型R是输出元素的类型DataStreamString out2DS source.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String line, CollectorString out) throws Exception {for (String word : line.split(,)) {//循环将数据发送到下游out.collect(word);}}});// out2DS.print();//方式2lambda表达式DataStreamString out1DS source.flatMap((line, out) - {for (String word : line.split(,)) {//循环将数据发送到下游out.collect(word);}}, Types.STRING);out1DS.print();env.execute();} }结果 3、filter DataStream → DataStream  为每个元素执行一个布尔 function并保留那些 function 输出值为 true 的元素 import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo3Filter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamString source env.readTextFile(spark/data/student.csv);//需求过滤出文科一班的学生的信息//方式一匿名内部类source.filter(new FilterFunctionString() {Overridepublic boolean filter(String lines) throws Exception {return 文科一班.equals(lines.split(,)[4]);}}); //.print();//方式2lambda表达式source.filter(lines-文科一班.equals(lines.split(,)[4])).print();env.execute();} }结果 4、keyBy 作用为分组 DataStream → KeyedStream 在逻辑层面将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部 keyBy() 是通过哈希分区实现的。有多种指定 key 的方式。 import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo4KeyBy {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString source env.socketTextStream(master, 8888);//方式1匿名内部类/** public K KeyedStreamT, K keyBy(KeySelectorT, K key)* 其中 T 是输入元素的类型K 是键的类型/source.map(word- Tuple2.of(word,1), Types.TUPLE(Types.STRING,Types.INT)).keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer kv) throws Exception {return kv.f0;}});//.print();//方式2lambda表达式source.map(word- Tuple2.of(word,1), Types.TUPLE(Types.STRING,Types.INT)).keyBy(kv-kv.f0).print();env.execute();} } 结果 可以看出的确作了分区 5、reduce 作用为聚合 KeyedStream → DataStream 在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值。 import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo5Reduce {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamString source env.socketTextStream(master, 8888);//方式1匿名内部类source.map(word - Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv - kv.f0).reduce(new ReduceFunctionTuple2String, Integer() {Overridepublic Tuple2String, Integer reduce(Tuple2String, Integer kv1,Tuple2String, Integer kv2) throws Exception {//kv1和kv2的key是一样的String word kv1.f0;int counts kv1.f1 kv2.f1;return Tuple2.of(word,counts);}}).print();env.execute();} } 结果从结果来看说明reduce是一个有状态算子。  6、Window KeyedStream → WindowedStream  可以在已经分区的 KeyedStreams 上定义 WindowWindow 根据某些特征例如最近 5 秒内到达的数据对每个 key Stream 中的数据进行分组。 窗口算子有很多以后会专门出一章具体说明下面写一个滑动窗口的案例。 import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class Demo6Window {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();/** 每隔5秒统计最近15秒每个单词的数量 — 滑动窗口/DataStreamString wordsDS env.socketTextStream(master, 8888);//转换成kvDataStreamTuple2String, Integer kvDS wordsDS.map(word - Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));//按照单词分组KeyedStreamTuple2String, Integer, String keyByDS kvDS.keyBy(kv - kv.f0);//划分窗口//SlidingEventTimeWindows:滑动的处理时间窗口//前一个参数为窗口大小window size后一个参数为滑动大小window slideWindowedStreamTuple2String, Integer, String, TimeWindow windowDS keyByDS.window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)));//统计单词的数量DataStreamTuple2String, Integer countDS windowDS.sum(1);countDS.print();env.execute();} } 7、Union DataStream→ DataStream 将两个或多个数据流联合来创建一个包含所有流中数据的新流。注意如果一个数据流和自身进行联合这个流中的每个数据将在合并后的流中出现两次。 import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo7Union {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamString source1 env.socketTextStream(master, 8888);DataStreamString source2 env.socketTextStream(master, 9999);/** 合并两个DataStream 注意在数据层面并没有合并只是在逻辑层面合并了/DataStreamString unionDS source1.union(source2);unionDS.print();env.execute();} } 结果 8、process DataStream→ DataStream process算子是flink的底层算子可以用来代替map、faltMap、filter等算子 import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector;public class Demo8Process {public static void main(String[] args) throws Exception {/** process算子是flink的底层算子可以用来代替map、faltMap、filter等算子** public R SingleOutputStreamOperatorR process(ProcessFunctionT, R processFunction) 其中 T 是输入数据的类型R 是输出数据的类型/StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamString source env.socketTextStream(master, 8888);DataStreamTuple2String, Integer processDS source.process(new ProcessFunctionString, Tuple2String, Integer() {/** processElement在当前代码中相当于flatMap每一条数据执行一次可以返回一条或多条数据 ctx:上下文对象代表flink执行环境* out:输出用于将数据发送到下游/Overridepublic void processElement(String line, ProcessFunctionString, Tuple2String, Integer.Context ctx,CollectorTuple2String, Integer out) throws Exception {//这里的逻辑与flatMap的逻辑相同for (String word : line.split(,)) {out.collect(Tuple2.of(word, 1));}}});env.execute();/** 注意该算子不能用lambda表达式改写,因为ProcessFunction它包含了一些生命周期方法和状态管理的方法 这些方法使得它不适合直接简化为lambda表达式的形式。** 在底层代码层面来说ProcessFunction是一个抽象类该类还有许多复杂的方法使得它无法直接用lambda表达式来改写* 因为 lambda 表达式只能表示简单的函数接口即那些只包含一个抽象方法的接口* public abstract class ProcessFunctionI, O extends AbstractRichFunction/} }三、DataStream API Sink Flink 将转换计算后的数据发送的地点 。 Flink 常见的 Sink 大概有如下几类 1打印在控制台、写入文件。 2写入 socket具体指的是将数据发送到网络套接字例如端口。 3自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等当然你也可以根据需求定义自己的 sink。 1、写入文件 对于写入文件是否要将所有数据写入同一个文件由于是流式写入该文件就一直处于正在写入的状态而且可能会造成文件过大的问题所以DataStream API提供了滚动策略的方式来解决这样的问题。 import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.configuration.MemorySize; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import java.time.Duration;public class Demo1FileSink {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamString source env.socketTextStream(master, 8888);//创建fileSink/**public static IN DefaultRowFormatBuilderIN forRowFormat( final Path basePath, final EncoderIN encoder)}IN : The type of the elements that are being written by the sink.*/FileSinkString fileSink FileSink.forRowFormat(new Path(flink/data/words), new SimpleStringEncoderString(UTF-8)).withRollingPolicy(DefaultRollingPolicy.builder()//每10秒进行一次滚动生成文件.withRolloverInterval(Duration.ofSeconds(10))//当延迟超过10秒进行一次滚动.withInactivityInterval(Duration.ofSeconds(5))//文件大小达到1MB进行一次滚动.withMaxPartSize(MemorySize.ofMebiBytes(1)).build()).build();//使用fileSink,将读取的数据写入另一到文件夹中source.sinkTo(fileSink);env.execute();} } 结果  2、自定义的 sink 举例使用自定义sink将数据存到mysql中 import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement;public class Demo3MySqlSInk {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamString wordsDS env.socketTextStream(master, 8888);//统计单词的数量DataStreamTuple2String, Integer countDS wordsDS.map(word - Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv - kv.f0).sum(1);//将统计结果保存到数据countDS.addSink(new MySQlSink());env.execute();} }/* 自定义sink将数据保存到mysql* RichSinkFunction:多了open和close方法用于打开和关闭连接* SinkFunction/ class MySQlSink extends RichSinkFunctionTuple2String, Integer {Connection con;PreparedStatement stat;/** invoke方法每一条数据执行一次/Overridepublic void invoke(Tuple2String, Integer kv, Context context) throws Exception {stat.setString(1, kv.f0);stat.setInt(2, kv.f1);//执行sqlstat.execute();}/** open方法会在任务启动的时候每一个task中执行一次/Overridepublic void open(Configuration parameters) throws Exception {System.out.println(创建数据库连接);//1、加载启动Class.forName(com.mysql.jdbc.Driver);//2、创建数据库连接con DriverManager.getConnection(jdbc:mysql://master:3306/bigdata29, root, 123456);//3、编写保存数据的sql//replace into 替换插入如果没有就插入如果有就更新表需要有主键stat con.prepareStatement(replace into word_count values(?,?));}/** close方法会在任务取消的时候每一个task中执行一次*/Overridepublic void close() throws Exception {//4、关闭数据库连接stat.close();con.close();} ——————————————————————————————————————————— 代码注意提示 如果在写flink代码的过程中出现了以下错误大概率就是有些算子使用没有写数据类型与spark不同spaark底层由scala编写scala提供了自动类型推断机制所以不写参数类型也不会报错但是flink底层是java编写的java没有这种机制。 基础的算子到这结束其他算子后续也会写以上内容具体详情皆参考apache flink官网官网详细说明了各种算子的使用网址贴在下面了 https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/operators/overview/ 个人感觉写的很详细了看不懂建议直接打死作者(^_^)