Flink DataStream: Data Sources, Transformations, and Sinks (Scala)

0. Preliminaries: data sources

Before any transformation can be applied, the data first has to be read in. Flink's data sources fall into four categories:

(1) Built-in sources, which come in three types: file sources, socket sources, and collection sources (a minimal sketch of all three follows below).
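As a quick illustration, here is a minimal sketch of the three built-in source types. The file path, host, and port are placeholder values, not ones from the original post:

```scala
import org.apache.flink.streaming.api.scala._

object BuiltInSourcesSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // File source: read a text file line by line (placeholder path)
    val fileStream = env.readTextFile("file:///home/hadoop/input.txt")
    // Socket source: read newline-delimited text from a TCP socket (placeholder host/port)
    val socketStream = env.socketTextStream("localhost", 9999)
    // Collection source: turn an in-memory collection into a stream
    val collectionStream = env.fromCollection(List("hadoop", "spark", "flink"))
    collectionStream.print()
    env.execute("BuiltInSourcesSketch")
  }
}
```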
(2) Kafka sources. The SimpleStringSchema object passed as the second constructor argument is a built-in DeserializationSchema that deserializes the raw message bytes into a String. In addition, when a FlinkKafkaConsumer starts reading from Kafka, its read start position can be configured, with four options.
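The post does not enumerate the four options; in the FlinkKafkaConsumer API they correspond to the following calls (a sketch, not from the original; `kafkaSource` is the consumer created in the example below):

```scala
kafkaSource.setStartFromEarliest()      // read from the earliest available offset
kafkaSource.setStartFromLatest()        // read only messages that arrive from now on
kafkaSource.setStartFromGroupOffsets()  // resume from the group's committed offsets (the default)
// start each partition from an explicitly given offset, e.g.:
// val specificOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
// specificOffsets.put(new KafkaTopicPartition("wordsendertest", 0), 23L)
kafkaSource.setStartFromSpecificOffsets(specificOffsets)
```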
The complete Kafka word-count example:

```scala
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    // Kafka connection properties
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092")
    // The consumer group this consumer belongs to
    kafkaProps.setProperty("group.id", "group1")
    // Get the current execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Create the Kafka consumer; "wordsendertest" is the topic to consume
    val kafkaSource = new FlinkKafkaConsumer[String]("wordsendertest", new SimpleStringSchema(), kafkaProps)
    // Start consuming from the latest offset
    kafkaSource.setStartFromLatest()
    // Commit offsets when checkpoints complete
    kafkaSource.setCommitOffsetsOnCheckpoints(true)
    // Bind the data source
    val stream = env.addSource(kafkaSource)
    // Define the transformation logic
    val text = stream.flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
      .map { word => (word, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
    // Print the result
    text.print()
    // Trigger program execution
    env.execute("Kafka Word Count")
  }
}
```

(3) HDFS sources (Flink can read files on HDFS through the same file-source API by using an hdfs:// path).

(4) Custom sources. An example:

```scala
import java.util.Calendar
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import scala.util.Random

case class StockPrice(stockId: String, timeStamp: Long, price: Double)

object StockPriceStreaming {
  def main(args: Array[String]): Unit = {
    // Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the program parallelism
    env.setParallelism(1)
    // The stock price data stream, randomly generated by the StockPriceSource class
    val stockPriceStream: DataStream[StockPrice] = env.addSource(new StockPriceSource)
    // Print the result
    stockPriceStream.print()
    // Trigger program execution
    env.execute("stock price streaming")
  }

  class StockPriceSource extends RichSourceFunction[StockPrice] {
    var isRunning: Boolean = true
    val rand = new Random()
    // Initialize the stock prices
    var priceList: List[Double] = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)
    var stockId = 0
    var curPrice = 0.0d

    override def run(srcCtx: SourceContext[StockPrice]): Unit = {
      while (isRunning) {
        // Pick one stock at random on every iteration
        stockId = rand.nextInt(priceList.size)
        val curPrice = priceList(stockId) + rand.nextGaussian() * 0.05
        priceList = priceList.updated(stockId, curPrice)
        val curTime = Calendar.getInstance.getTimeInMillis
        // Emit the record through the SourceContext
        srcCtx.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))
        Thread.sleep(rand.nextInt(10))
      }
    }

    override def cancel(): Unit = {
      isRunning = false
    }
  }
}
```

1. Transformations: the map operation

1. The four kinds of transformation operators:

- operating on individual records: filter, map
- operating on windows: window
- merging multiple streams: union, join, connect
- splitting a stream: split

2. map(func) passes every element of a DataStream through the function func and returns the results as a new DataStream. The output stream's type DataStream[OUT] may differ from the input stream's type DataStream[IN].

Think of it as a one-to-one relationship: each x produces exactly one y.

```scala
val dataStream = env.fromElements(1, 2, 3, 4, 5)
val mapStream = dataStream.map(x => x + 10)
```

3. Demo code:

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala._

object MapFunctionTest {
  def main(args: Array[String]): Unit = {
    // Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the program parallelism
    env.setParallelism(1)
    // Create the data source
    val dataStream: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6, 7)
    // Apply the transformation logic
    val richFunctionDataStream = dataStream.map(new MyMapFunction())
    // Print the result
    richFunctionDataStream.print()
    // Trigger program execution
    env.execute("MapFunctionTest")
  }

  // A custom function extending RichMapFunction
  class MyMapFunction extends RichMapFunction[Int, String] {
    override def map(input: Int): String =
      "Input : " + input.toString + ", Output : " + (input * 3).toString
  }
}
```

2. Transformations: the flatMap operation

1. flatMap is similar to map, except that each input element can map to zero or more output elements.

```scala
val dataStream = env.fromElements("Hadoop is good", "Flink is fast", "Flink is better")
val flatMapStream = dataStream.flatMap(line => line.split(" "))
```

You can think of flatMap as map plus a flatten step: map turns each input element into a collection, and flatten merges those collections into a stream of individual elements, so one input element can become many output elements.

2. Demo code:

```scala
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object FlatMapFunctionTest {
  def main(args: Array[String]): Unit = {
    // Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the program parallelism
    env.setParallelism(1)
    // Create the data source
    val dataStream: DataStream[String] = env.fromElements("Hello Spark", "Flink is excellent")
    // Apply the transformation logic
    val result = dataStream.flatMap(new WordSplitFlatMap(15))
    // Print the result
    result.print()
    // Trigger program execution
    env.execute("FlatMapFunctionTest")
  }

  // A FlatMapFunction that splits a string into words only when it is longer than threshold
  class WordSplitFlatMap(threshold: Int) extends FlatMapFunction[String, String] {
    override def flatMap(value: String, out: Collector[String]): Unit = {
      if (value.size > threshold) {
        value.split(" ").foreach(out.collect)
      }
    }
  }
}
```

Expected output: the words "Flink", "is", and "excellent", one per line. Only strings longer than 15 characters are split; threshold is the cutoff, and strings at or below it ("Hello Spark" has 11 characters) produce no output at all.

3. Transformations: the filter and keyBy operations

1. filter(func) keeps only the elements that satisfy the predicate func and returns them as a new stream.

2. Example:

```scala
val dataStream = env.fromElements("Hadoop is good", "Flink is fast", "Flink is better")
val filterStream = dataStream.filter(line => line.contains("Flink"))
```

Only the two lines containing "Flink" pass through, as the runnable version below shows.
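Here is that snippet completed into a self-contained program (a sketch; the object name FilterFunctionTest is mine), together with the output it prints at parallelism 1:

```scala
import org.apache.flink.streaming.api.scala._

object FilterFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val dataStream = env.fromElements("Hadoop is good", "Flink is fast", "Flink is better")
    // Keep only the lines that contain the substring "Flink"
    val filterStream = dataStream.filter(line => line.contains("Flink"))
    filterStream.print()
    env.execute("FilterFunctionTest")
    // prints:
    //   Flink is fast
    //   Flink is better
  }
}
```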
3. keyBy (note the lowercase k and uppercase B) places all records with the same key in the same partition. The keyBy operator groups the elements of a stream by key; elements with the same key end up together, where downstream operators can process them uniformly.

For example, take word counting over the lines "hello flink", "hello hadoop", "hello zhangsan". Once the word counts (hello,1), (hello,1), (hello,1) have been produced, keyBy gathers them into the same partition so they can be aggregated in one place.

Applying an aggregation function then turns the KeyedStream back into a DataStream.

4. The keyBy operator needs an argument that identifies the key. One option is to specify the key by field position; in the word count just mentioned, keyBy(0) keys by the first field, i.e., the word "hello".

```scala
val dataStream: DataStream[(Int, Double)] = env.fromElements((1, 2.0), (2, 1.7), (1, 4.9), (3, 8.5), (3, 11.2))
// Define the key by position: group by the first field
val keyedStream = dataStream.keyBy(0)
```

Here keyBy groups the records by the first field, whose value is 1, 2, or 3.

5. keyBy example:

```scala
import org.apache.flink.streaming.api.scala._

// A case class with three fields: stock ID, trade time, and trade price
case class StockPrice(stockId: String, timeStamp: Long, price: Double)

object KeyByTest {
  def main(args: Array[String]): Unit = {
    // Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the program parallelism
    env.setParallelism(1)
    // Create the data source
    val stockList = List(
      StockPrice("stock_4", 1602031562148L, 43.4d),
      StockPrice("stock_1", 1602031562148L, 22.9d),
      StockPrice("stock_0", 1602031562153L, 8.2d),
      StockPrice("stock_3", 1602031562153L, 42.1d),
      StockPrice("stock_2", 1602031562153L, 29.2d),
      StockPrice("stock_0", 1602031562159L, 8.1d),
      StockPrice("stock_4", 1602031562159L, 43.7d),
      StockPrice("stock_4", 1602031562169L, 43.5d))
    val dataStream = env.fromCollection(stockList)
    // Apply the transformation logic
    val keyedStream = dataStream.keyBy("stockId")
    // Print the result
    keyedStream.print()
    // Trigger program execution
    env.execute("KeyByTest")
  }
}
```

The output looks unchanged here: since no aggregation is performed, the records are printed exactly as they came in. Adding an aggregation function makes the difference visible:

```scala
// Shortened version of the code above, with an aggregation added
val keyedStream = dataStream.keyBy("stockId")
val aggre = keyedStream.sum(2) // sums the price, the third field
// keyedStream.print()
aggre.print() // print the aggregated result
```

So what changed compared to the plain keyBy output? The stockId order is 4-1-0-3-2-0-...; when stock_0 appears the second time, its price is added to the earlier stock_0 price (producing 16.299...), and the later stock_4 records likewise keep accumulating the earlier stock_4 prices.

4. Transformations: the reduce operation and aggregation operators

1. reduce: the reduce operator takes the input KeyedStream and aggregates its records in a rolling fashion with a user-defined function, producing a new DataStream, as in the following example:

```scala
import org.apache.flink.streaming.api.scala._

// A case class with three fields: stock ID, trade time, and trade price
case class StockPrice(stockId: String, timeStamp: Long, price: Double)

object ReduceTest {
  def main(args: Array[String]): Unit = {
    // Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the program parallelism
    env.setParallelism(1)
    // Create the data source
    val stockList = List(
      StockPrice("stock_4", 1602031562148L, 43.4d),
      StockPrice("stock_1", 1602031562148L, 22.9d),
      StockPrice("stock_0", 1602031562153L, 8.2d),
      StockPrice("stock_3", 1602031562153L, 42.1d),
      StockPrice("stock_2", 1602031562153L, 29.2d),
      StockPrice("stock_0", 1602031562159L, 8.1d),
      StockPrice("stock_4", 1602031562159L, 43.7d),
      StockPrice("stock_4", 1602031562169L, 43.5d))
    val dataStream = env.fromCollection(stockList)
    // Apply the transformation logic
    val keyedStream = dataStream.keyBy("stockId")
    val reduceStream = keyedStream.reduce(
      (t1, t2) => StockPrice(t1.stockId, t1.timeStamp, t1.price + t2.price))
    // Print the result
    reduceStream.print()
    // Trigger program execution
    env.execute("ReduceTest")
  }
}
```

The reduce result is the same as above: a rolling accumulation.
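To make the rolling accumulation concrete, here is the per-key trace over the sample data (my own worked example; the long decimals come from double-precision arithmetic, which is why the text above quotes 16.299...):

- stock_0: 8.2, then 8.2 + 8.1 = 16.3 (printed as 16.299999999999997)
- stock_4: 43.4, then 43.4 + 43.7 = 87.1, then 87.1 + 43.5 = 130.6
- stock_1, stock_2, and stock_3 each appear once, so their prices are emitted unchanged.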
2. Flink also supports custom reduce functions:

```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._

// A case class with three fields: stock ID, trade time, and trade price
case class StockPrice(stockId: String, timeStamp: Long, price: Double)

object MyReduceFunctionTest {
  def main(args: Array[String]): Unit = {
    // Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the program parallelism
    env.setParallelism(1)
    // Create the data source
    val stockList = List(
      StockPrice("stock_4", 1602031562148L, 43.4d),
      StockPrice("stock_1", 1602031562148L, 22.9d),
      StockPrice("stock_0", 1602031562153L, 8.2d),
      StockPrice("stock_3", 1602031562153L, 42.1d),
      StockPrice("stock_2", 1602031562153L, 29.2d),
      StockPrice("stock_0", 1602031562159L, 8.1d),
      StockPrice("stock_4", 1602031562159L, 43.7d),
      StockPrice("stock_4", 1602031562169L, 43.5d))
    val dataStream = env.fromCollection(stockList)
    // Apply the transformation logic
    val keyedStream = dataStream.keyBy("stockId")
    val reduceStream = keyedStream.reduce(new MyReduceFunction)
    // Print the result
    reduceStream.print()
    // Trigger program execution
    env.execute("MyReduceFunctionTest")
  }

  class MyReduceFunction extends ReduceFunction[StockPrice] {
    override def reduce(t1: StockPrice, t2: StockPrice): StockPrice =
      StockPrice(t1.stockId, t1.timeStamp, t1.price + t2.price)
  }
}
```

The only real difference from the previous example is that the reduce logic now lives in the MyReduceFunction class.

3. Aggregation operators. These work like the familiar aggregation functions in Excel. Example:

```scala
import org.apache.flink.streaming.api.scala._

// A case class with three fields: stock ID, trade time, and trade price
case class StockPrice(stockId: String, timeStamp: Long, price: Double)

object AggregationTest {
  def main(args: Array[String]): Unit = {
    // Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the program parallelism
    env.setParallelism(1)
    // Create the data source
    val stockList = List(
      StockPrice("stock_4", 1602031562148L, 43.4d),
      StockPrice("stock_1", 1602031562148L, 22.9d),
      StockPrice("stock_0", 1602031562153L, 8.2d),
      StockPrice("stock_3", 1602031562153L, 42.1d),
      StockPrice("stock_2", 1602031562153L, 29.2d),
      StockPrice("stock_0", 1602031562159L, 8.1d),
      StockPrice("stock_4", 1602031562159L, 43.7d),
      StockPrice("stock_4", 1602031562169L, 43.5d))
    val dataStream = env.fromCollection(stockList)
    // Apply the transformation logic
    val keyedStream = dataStream.keyBy("stockId")
    val aggregationStream = keyedStream.sum(2) // the difference is here: sum(2) aggregates the third field
    // Print the result
    aggregationStream.print()
    // Trigger program execution
    env.execute("AggregationTest")
  }
}
```

The run result is the same rolling per-key accumulation shown in the keyBy section above.
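Besides sum, the KeyedStream API offers a few sibling aggregations. This quick tour is not from the original post; it reuses the keyedStream from the example above, and note that min/max update only the aggregated field, while minBy/maxBy return the record holding the extreme value:

```scala
keyedStream.min(2)    // rolling minimum of the price field
keyedStream.max(2)    // rolling maximum of the price field
keyedStream.minBy(2)  // the record whose price is the rolling minimum
keyedStream.maxBy(2)  // the record whose price is the rolling maximum
```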
5. Data output

1. Basic outputs include writing to files, printing to the client, and writing to a network socket port.

File output code (outputHost and outputPort in the last line stand for the target host and port of the socket):

```scala
val dataStream = env.fromElements("hadoop", "spark", "flink")
// File output
dataStream.writeAsText("file:///home/hadoop/output.txt")
// HDFS output: write the data to HDFS
dataStream.writeAsText("hdfs://localhost:9000/output.txt")
// Use writeToSocket to send the DataStream to a given socket port
dataStream.writeToSocket(outputHost, outputPort, new SimpleStringSchema())
```

2. Output to Kafka. Example code (the broker address and topic name in the producer constructor are placeholder values):

```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object SinkKafkaTest {
  def main(args: Array[String]): Unit = {
    // Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Load or create the data source
    val dataStream = env.fromElements("hadoop", "spark", "flink")
    // Write the data to Kafka (placeholder broker list and topic name)
    dataStream.addSink(new FlinkKafkaProducer[String]("localhost:9092", "sinktest", new SimpleStringSchema()))
    // Trigger program execution
    env.execute()
  }
}
```