Kafka + Flink Streaming Demo: Local and Cluster Testing with a MySQL Sink


1. Prerequisites
(1) Set up the base Flink environment.
Reference: 利用docker-compose来搭建flink集群-CSDN博客. If your output matches the screenshot there, the setup succeeded.

(2) Install and configure Docker, docker-compose, and the Kafka cluster.
Reference: 利用docker搭建kafka集群并且进行相应的实践-CSDN博客 (that article links to the other two articles you will need).

(3) On Windows, create a MySQL database named mysql1 (if it does not already exist), then create a table min_table inside it:

```sql
create database if not exists mysql1;  -- note: the "--" comment marker needs a trailing space
use mysql1;

CREATE TABLE min_table (
    id INT AUTO_INCREMENT PRIMARY KEY,
    timestamp TIMESTAMP NOT NULL,
    quantity INT NOT NULL,
    amount DOUBLE NOT NULL,
    UNIQUE KEY unique_timestamp (timestamp)
);
```

(4) Install MySQL on the Linux VM where Flink is configured.
Reference: 黑马大数据学习笔记4-Hive部署和基本操作_黑马大数据 hive笔记-CSDN博客

(5) Likewise, in the MySQL instance on the Linux VM, create the database mysql1 (if it does not exist) and the table min_table, using the same SQL as in step (3).

(6) In IDEA, create a new Maven project named FlinkDemo and add the following to pom.xml:

```xml
<dependencies>
    <!-- Flink core libraries -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.18.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.18.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.18.0</version>
    </dependency>
    <!-- Flink Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.0.1-1.18</version>
    </dependency>
    <!-- Flink JDBC connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.1-1.17</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.33</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
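If you prefer the command line to IDEA's Maven panel, the same fat jar can be built directly with Maven. A minimal sketch; the exact file names are assumptions that follow from your project's artifactId and version:

```shell
# Build the project; maven-assembly-plugin is bound to the package phase
# in the pom.xml above, so this also produces the fat jar.
mvn clean package

# Expected in target/ (assuming artifactId FlinkDemo, version 1.0-SNAPSHOT):
#   FlinkDemo-1.0-SNAPSHOT.jar                        <- thin jar
#   FlinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar  <- the one to upload to Flink
ls target/
```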
(7) Under the project's com.example package, create three files. The directory layout:

```text
src/main/java/com/example/
├── DatabaseSink.java
├── LocalFlinkTest.java
└── FlinkTest.java
```

DatabaseSink.java:

```java
package com.example;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.sql.Timestamp;

/**
 * Wraps a Flink JDBC sink that upserts (timestamp, quantity, amount)
 * tuples into the min_table created earlier.
 */
public class DatabaseSink {
    private final String url;
    private final String username;
    private final String password;

    public DatabaseSink(String url, String username, String password) {
        this.url = url;
        this.username = username;
        this.password = password;
    }

    public void addSink(DataStream<Tuple3<Timestamp, Long, Double>> stream) {
        stream.addSink(JdbcSink.sink(
                // Accumulate on conflict: the unique_timestamp key makes rows
                // with the same timestamp update in place instead of failing.
                "INSERT INTO min_table (timestamp, quantity, amount) VALUES (?, ?, ?) "
                        + "ON DUPLICATE KEY UPDATE quantity = quantity + VALUES(quantity), "
                        + "amount = amount + VALUES(amount)",
                (ps, t) -> {
                    ps.setTimestamp(1, t.f0);
                    ps.setLong(2, t.f1);
                    ps.setDouble(3, t.f2);
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(5000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(this.url)
                        // "com.mysql.jdbc.Driver" is deprecated in Connector/J 8.x
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername(this.username)
                        .withPassword(this.password)
                        .build()
        ));
    }
}
```
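The sink above relies on MySQL's INSERT ... ON DUPLICATE KEY UPDATE together with the unique_timestamp key, so repeated flushes for the same minute accumulate instead of raising a duplicate-key error. A quick sketch you can run by hand to convince yourself (the values are made up):

```sql
INSERT INTO min_table (timestamp, quantity, amount)
VALUES ('2024-01-02 09:30:00', 100, 1000.0)
ON DUPLICATE KEY UPDATE quantity = quantity + VALUES(quantity),
                        amount   = amount   + VALUES(amount);

-- Same timestamp again: the UNIQUE KEY fires and the row is updated in place.
INSERT INTO min_table (timestamp, quantity, amount)
VALUES ('2024-01-02 09:30:00', 50, 500.0)
ON DUPLICATE KEY UPDATE quantity = quantity + VALUES(quantity),
                        amount   = amount   + VALUES(amount);

-- Expect a single row with quantity = 150 and amount = 1500.0.
SELECT * FROM min_table WHERE timestamp = '2024-01-02 09:30:00';
```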
LocalFlinkTest.java:

```java
package com.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class LocalFlinkTest {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
        SimpleDateFormat sdf_hour = new SimpleDateFormat("yyyy-MM-dd HH");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                333,                                                            // number of restart attempts
                org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)  // delay between attempts
        ));
        // Note: this second call overrides the strategy configured above.
        env.setRestartStrategy(RestartStrategies.noRestart());

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.88.101:19092,192.168.88.101:29092,192.168.88.101:39092") // your Kafka brokers
                .setGroupId("testGroup")  // your consumer group ID
                .setTopics("foo")         // your topic
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Start from the group's committed offsets, falling back to LATEST.
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // flatMap takes one input element and emits zero, one, or more elements.
        // Here the input is one CSV line read from Kafka and the output is a
        // (timestamp, quantity, amount) tuple; lines that fail to parse are dropped.
        DataStream<Tuple3<Timestamp, Long, Double>> transactionVolumes = stream
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        // Skip the header line if it shows up.
                        return !value.startsWith("time");
                    }
                })
                .flatMap(new FlatMapFunction<String, Tuple3<Timestamp, Long, Double>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple3<Timestamp, Long, Double>> out) {
                        try {
                            String[] fields = line.split(",");
                            // Parsing with "yyyy-MM-dd HH:mm" effectively zeroes
                            // the seconds field of the parsed date-time.
                            Date date = sdf.parse(fields[0]);
                            Timestamp sqlTimestamp = new Timestamp(date.getTime());
                            double price = Double.parseDouble(fields[3]);
                            long quantity = Long.parseLong(fields[4]);
                            double amount = price * quantity;
                            out.collect(Tuple3.of(sqlTimestamp, quantity, amount));
                        } catch (Exception e) {
                            System.out.println(line);  // log unparseable lines
                        }
                    }
                });

        // Aggregate over 10-second tumbling processing-time windows.
        // keyBy(t -> t.f0) keys by the minute timestamp; note that windowAll
        // still funnels all records through a single, non-parallel window.
        DataStream<Tuple3<Timestamp, Long, Double>> oneSecondAmounts = transactionVolumes
                .keyBy(t -> t.f0)
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce((value1, value2) ->
                        Tuple3.of(value1.f0, value1.f1 + value2.f1, value1.f2 + value2.f2));

        oneSecondAmounts.print();

        DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://localhost:3306/mysql1", "root", "123456");
        dbSink.addSink(oneSecondAmounts);

        env.execute("Kafka Flink Demo");
    }
}
```

In the line `DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://localhost:3306/mysql1", "root", "123456");`, change the password to your own (the author's happens to be 123456).
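For reference, the parser above assumes comma-separated stock records where field 0 is a minute-level time string, field 3 is the price, and field 4 is the quantity. The real stock-part10.csv is not reproduced in this post, so the rows below (including the column names) are made-up illustrations of the expected shape:

```text
time,code,name,price,quantity        <- header row, dropped by the filter
2024-01-02 09:30,600000,XX,10.5,200  <- becomes (2024-01-02 09:30:00.0, 200, 2100.0)
2024-01-02 09:30,600000,XX,10.6,100  <- becomes (2024-01-02 09:30:00.0, 100, 1060.0)
```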
FlinkTest.java: identical to LocalFlinkTest.java except for the class name (FlinkTest) and the JDBC connection line, which points at the MySQL instance on the Linux VM instead of localhost:

```java
DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://192.168.88.101:3306/mysql1", "root", "123456");
```

Change the password and the host 192.168.88.101 here to your own password and host.

2. Running the experiment: a local test and a Flink-cluster test

(1) Start node1, open FinalShell, and bring up Docker, the Kafka cluster, and the Flink cluster:

```shell
systemctl start docker
cd /export/server
docker-compose -f kafka.yml up -d
docker-compose -f flink.yml up -d
docker ps
```

docker ps should list all of the Kafka and Flink containers.

(2) Run the local test first; this only needs the Kafka cluster. Open two terminal windows on node1. In the second window, enter the kafka2 container and start a console consumer:

```shell
docker exec -it kafka2 /bin/bash
cd /opt/bitnami/kafka/bin
kafka-console-consumer.sh --bootstrap-server 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
```
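If the consumer complains that topic foo does not exist (whether it does depends on the cluster's auto-topic-creation setting, which this post does not specify), you can create it explicitly first. A sketch; the partition and replication counts are assumptions:

```shell
# Create the topic across the three brokers.
kafka-topics.sh --create --topic foo \
  --bootstrap-server 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 \
  --partitions 3 --replication-factor 3

# Confirm it exists and inspect its layout.
kafka-topics.sh --describe --topic foo --bootstrap-server 172.23.0.11:9092
```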
Run LocalFlinkTest.java in IDEA. Then, in the first window, enter the kafka1 container and send the first 5 lines of the data file to the topic:

```shell
docker exec -it kafka1 /bin/bash
cd /opt/bitnami/kafka/bin
head -n 5 /bitnami/kafka/stock-part10.csv | kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
```

Check the output in IDEA, then check the rows in MySQL. At this point the local test has succeeded.

(3) Now the Flink-cluster test. Run FlinkTest.java once in IDEA to check that it works against the VM's MySQL, then double-click package in IDEA's Maven panel and confirm that the two new jar files have appeared in the target directory.

Open node1:8081 in a browser and upload the jar with the longer name (the jar-with-dependencies one). It lives in the project's target directory, e.g. D:\JetBrains\idea-project\FlinkDemo\target.

After the upload succeeds, click the jar entry and fill in the entry class as com.example.FlinkTest (the fully qualified name of FlinkTest.java in the project), set the parallelism to 3, and click Submit. The job should show as running.

Go back to the first window on node1 (still at kafka1's /opt/bitnami/kafka/bin prompt) and send the whole file:

```shell
cat /bitnami/kafka/stock-part10.csv | kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
```

Click into any of the running tasks and select numRecordsInPerSecond among the monitoring metrics to watch the per-second processing rate.
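As an alternative to the web UI, the same jar can be submitted with Flink's command-line client. A sketch, assuming the fat jar name from the build step above and a JobManager container whose name you substitute for the placeholder:

```shell
# Copy the fat jar into the JobManager container (name is a placeholder).
docker cp FlinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar <jobmanager-container>:/tmp/

# -c names the entry class, -p sets the parallelism (3, as above).
docker exec -it <jobmanager-container> \
  flink run -c com.example.FlinkTest -p 3 /tmp/FlinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar
```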