个体工商网站备案wordpress所有外链本地化
- 作者: 五速梦信息网
- 时间: 2026年03月21日 11:10
当前位置: 首页 > news >正文
个体工商网站备案,wordpress所有外链本地化,手机网站如何建立,十大不收费看盘软件网站重试机制 当任务出现异常的时候#xff0c;会直接停止任务——解决方式#xff0c;重试机制 1、设置checkpoint后#xff0c;会给任务一个重启策略——无限重启 2、可以手动设置任务的重启策略 代码设置 //开启checkpoint后#xff0c;默认是无限重启#xff0c;可以…重试机制 当任务出现异常的时候会直接停止任务——解决方式重试机制 1、设置checkpoint后会给任务一个重启策略——无限重启 2、可以手动设置任务的重启策略 代码设置 //开启checkpoint后默认是无限重启可以设置该值 表示不重启 env.setRestartStrategy(RestartStrategies.noRestart());//作业失败flink中最多重启3次每次重启的最小间隔是10s env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//2分钟内最多重启3次每次重启的最小间隔是5秒 env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS)) );//无限重启 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, // 无限重启次数Time.of(10, TimeUnit.SECONDS) // 每次重启的延迟时间 )); 维表join 所谓的维表Join: 进入Flink的数据需要关联另外一些存储设备的数据才能计算出来结果 那么存储在外部设备上的表称之为维表可能存储在mysql也可能存储在hbase 等。 维表一般的特点是变化比较慢。——名词表维度表。 解决方式 解决维表join的方式方式一可以用一个静态代码块或者在open方法中对一个集合初始化用于存放想要相关联的数据。缺点数据不能动态改变了方式二在open中初始化连接在map中每拿到流中的一条数据就去mysql中查找一次缺点数据可以动态改变但是去mysql查找的次数太多了方式三创建一个缓存区用于存放数据若过期则再去mysql中查询数据。没有缺点可以动态获取数据了也减少了mysql的查询次数缓冲唯一的是若是多线程可能会去mysql查询多次 方式一 package com.bigdata.day06;import com.mchange.v2.c3p0.ComboPooledDataSource; import org.apache.commons.dbutils.QueryRunner; import org.apache.commons.dbutils.handlers.MapHandler; import org.apache.commons.dbutils.handlers.MapListHandler; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.List; import java.util.Map; import java.util.Properties;/*** 直接从mysql中拿出* 弊端 只能拿到一次 不能实现动态*/ public class _03_维表join_01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();Properties properties new Properties();properties.setProperty(bootstrap.servers, bigdata01:9092);properties.setProperty(group.id, g1);FlinkKafkaConsumerString consumer new FlinkKafkaConsumer(edu,new SimpleStringSchema(),properties);DataStreamSourceString source env.addSource(consumer);source.map(new RichMapFunctionString, String() {ComboPooledDataSource pool null;QueryRunner queryRunner null;ListMapString, Object list null;Overridepublic void open(Configuration parameters) throws Exception {// 在open中执行sqlpool new ComboPooledDataSource();queryRunner new QueryRunner(pool);String sql select * from city ;list queryRunner.query(sql, new MapListHandler());}Overridepublic void close() throws Exception {pool.close();}Overridepublic String map(String line) throws Exception {String[] split line.split(,);Object cityName 未知;for (MapString, Object map : list) {String cityId (String)map.get(city_id);if (cityId.equals(split[1])){cityName map.get(city_name);}}return line,cityName;}}).print();env.execute();} } 方式二 package com.bigdata.day06;import com.mchange.v2.c3p0.ComboPooledDataSource; import org.apache.commons.dbutils.QueryRunner; import org.apache.commons.dbutils.handlers.MapHandler; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Map; import java.util.Properties;/*** 每次从kafka中拿到一条数据就从mysql中查一遍* 弊端 对mysql的压力加大/ public class _03_维表join_02 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();Properties properties new Properties();properties.setProperty(bootstrap.servers, bigdata01:9092);properties.setProperty(group.id, g1);FlinkKafkaConsumerString consumer new FlinkKafkaConsumer(edu,new SimpleStringSchema(),properties);DataStreamSourceString source env.addSource(consumer);source.map(new RichMapFunctionString, String() {ComboPooledDataSource pool null;QueryRunner queryRunner null;Overridepublic void open(Configuration parameters) throws Exception {pool new ComboPooledDataSource();queryRunner new QueryRunner(pool);}Overridepublic void close() throws Exception {pool.close();}Overridepublic String map(String line) throws Exception {// 在处理逻辑中执行sqlString[] split line.split(,);String sql select city_name from city where city_id ?;MapString, Object rs queryRunner.query(sql, new MapHandler(), split[1]);String cityName未知;if (rs !null){cityName (String) rs.get(city_name);}return line,cityName;}}).print();env.execute();} } 方式三 package com.bigdata.day06;import com.mchange.v2.c3p0.ComboPooledDataSource; import org.apache.commons.dbutils.QueryRunner; import org.apache.commons.dbutils.handlers.MapHandler; import org.apache.commons.dbutils.handlers.MapListHandler; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.cache.; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit;/*** 最终 非常好的方式* 现在内存中查 查不到在去mysql中找* 唯一的问题是假如是多线程情况下可能会触发多次去mysql中查找的方法*/ public class _03_维表join_03_cache {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();Properties properties new Properties();properties.setProperty(bootstrap.servers, bigdata01:9092);properties.setProperty(group.id, g1);FlinkKafkaConsumerString consumer new FlinkKafkaConsumer(edu,new SimpleStringSchema(),properties);DataStreamSourceString source env.addSource(consumer);// 记得设置并行度env.setParallelism(1);source.map(new RichMapFunctionString, String() {ComboPooledDataSource pool null;QueryRunner queryRunner null;// 定义一个Cache// 第一个是传入的参数类型 第二个是存放的值的类型// 也就是传入一个参数根据这个值获取结果拿的时候通过传入的值 拿存放的值LoadingCacheString, String cache;Overridepublic void open(Configuration parameters) throws Exception {pool new ComboPooledDataSource();queryRunner new QueryRunner(pool);cache CacheBuilder.newBuilder()//最多缓存个数超过了就根据最近最少使用算法来移除缓存 LRU.maximumSize(1000)//在更新后的指定时间后就回收// 不会自动调用而是当过期后又用到了过期的key值数据才会触发的。.expireAfterWrite(50, TimeUnit.SECONDS)//指定移除通知.removalListener(new RemovalListenerString, String() {Overridepublic void onRemoval(RemovalNotificationString, String removalNotification) {System.out.println(removalNotification.getKey() 被移除了值为 removalNotification.getValue());}}).build(//指定加载缓存的逻辑new CacheLoaderString, String() {// 假如缓存中没有数据会触发该方法的执行并将结果自动保存到缓存中Overridepublic String load(String cityId) throws Exception {String sql select city_name from city where city_id ? ;MapString, Object rs queryRunner.query(sql, new MapHandler(), cityId);String cityName null;if (rs!null){cityName (String) rs.get(city_name);}System.out.println(进入数据库查询成功查询的值为cityId–cityName);return cityName;}});}Overridepublic void close() throws Exception {pool.close();}Overridepublic String map(String line) throws Exception {String[] arr line.split(,);// 使用这种方式取值String cityName cache.get(arr[1]);return line,cityName;}}).print();env.execute();} }
- 上一篇: 个人做信息分类网站需备案吗北京优化服务
- 下一篇: 个体户经营范围网站建设做网店有哪些拿货网站
相关文章
-
个人做信息分类网站需备案吗北京优化服务
个人做信息分类网站需备案吗北京优化服务
- 技术栈
- 2026年03月21日
-
个人做网站需要注意什么91助手
个人做网站需要注意什么91助手
- 技术栈
- 2026年03月21日
-
个人做网站能备案吗1小时赚8000元游戏
个人做网站能备案吗1小时赚8000元游戏
- 技术栈
- 2026年03月21日
-
个体户经营范围网站建设做网店有哪些拿货网站
个体户经营范围网站建设做网店有哪些拿货网站
- 技术栈
- 2026年03月21日
-
个体户可以注册网站建设服务微信公众号模板哪里找
个体户可以注册网站建设服务微信公众号模板哪里找
- 技术栈
- 2026年03月21日
-
个体户营业执照科研做企业网站吗果合gohe网站建设
个体户营业执照科研做企业网站吗果合gohe网站建设
- 技术栈
- 2026年03月21日
