贵阳企业网站排名优化小说网站搭建教程
- 作者: 五速梦信息网
- 时间: 2026年04月20日 11:05
当前位置: 首页 > news >正文
贵阳企业网站排名优化,小说网站搭建教程,shanxi建设银行网站首页,市政工程建设规范免费下载网站业务数据_增量表数据同步 1#xff09;Flume配置概述2#xff09;Flume配置实操3#xff09;通道测试4#xff09;编写Flume启停脚本 1#xff09;Flume配置概述 Flume需要将Kafka中topic_db主题的数据传输到HDFS#xff0c;故其需选用KafkaSource以及HDFSSink#xff… 业务数据_增量表数据同步 1Flume配置概述2Flume配置实操3通道测试4编写Flume启停脚本 1Flume配置概述 Flume需要将Kafka中topic_db主题的数据传输到HDFS故其需选用KafkaSource以及HDFSSinkChannel选用FileChannel。 需要注意的是 HDFSSink需要将不同mysql业务表的数据写到不同的路径并且路径中应当包含一层日期用于区分每天的数据。关键配置如下 2Flume配置实操 1创建Flume配置文件 在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_db.conf [atguiguhadoop104 flume]\( mkdir job [atguiguhadoop104 flume]\) vim job/kafka_to_hdfs_db.conf 2配置文件内容如下 a1.sources r1 a1.channels c1 a1.sinks k1a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize 5000 a1.sources.r1.batchDurationMillis 2000 a1.sources.r1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092 a1.sources.r1.kafka.topics topic_db a1.sources.r1.kafka.consumer.group.id flume a1.sources.r1.setTopicHeader true a1.sources.r1.topicHeader topic a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type com.atguigu.gmall.flume.interceptor.TimestampAndTableNameInterceptor\(Buildera1.channels.c1.type file a1.channels.c1.checkpointDir /opt/module/flume/checkpoint/behavior2 a1.channels.c1.dataDirs /opt/module/flume/data/behavior2/ a1.channels.c1.maxFileSize 2146435071 a1.channels.c1.capacity 1000000 a1.channels.c1.keep-alive 6## sink1 a1.sinks.k1.type hdfs a1.sinks.k1.hdfs.path /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix db a1.sinks.k1.hdfs.round falsea1.sinks.k1.hdfs.rollInterval 10 a1.sinks.k1.hdfs.rollSize 134217728 a1.sinks.k1.hdfs.rollCount 0a1.sinks.k1.hdfs.fileType CompressedStream a1.sinks.k1.hdfs.codeC gzip## 拼装 a1.sources.r1.channels c1 a1.sinks.k1.channel c13编写Flume拦截器 新建一个Maven项目并在pom.xml文件中加入如下配置 dependenciesdependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/versionscopeprovided/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.62/version/dependency /dependenciesbuildpluginspluginartifactIdmaven-compiler-plugin/artifactIdversion2.3.2/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/pluginpluginartifactIdmaven-assembly-plugin/artifactIdconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins /build在com.atguigu.gmall.flume.interceptor包下创建TimestampAndTableNameInterceptor类 package com.atguigu.gmall.flume.interceptor; import com.alibaba.fastjson.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; public class TimestampAndTableNameInterceptor implements Interceptor {Overridepublic void initialize() {}Overridepublic Event intercept(Event event) {MapString, String headers event.getHeaders(); String log new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject JSONObject.parseObject(log);Long ts jsonObject.getLong(ts);//Maxwell输出的数据中的ts字段时间戳单位为秒Flume HDFSSink要求单位为毫秒String timeMills String.valueOf(ts * 1000);String tableName jsonObject.getString(table);headers.put(timestamp, timeMills);headers.put(tableName, tableName);return event;}Overridepublic ListEvent intercept(ListEvent events) {for (Event event : events) {intercept(event);}return events;}Overridepublic void close() {}public static class Builder implements Interceptor.Builder {Overridepublic Interceptor build() {return new TimestampAndTableNameInterceptor ();}Overridepublic void configure(Context context) {}} }重新打包 将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下 [atguiguhadoop102 lib]\) ls | grep interceptor flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar 3通道测试 1启动Zookeeper、Kafka集群 2启动hadoop104的Flume [atguiguhadoop104 flume]\( bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf -Dflume.root.loggerinfo,console 3生成模拟数据 [atguiguhadoop102 bin]\) cd /opt/module/db_log/ [atguiguhadoop102 db_log]\( java -jar gmall2020-mock-db-2021-11-14.jar 4观察HDFS上的目标路径是否有数据出现 若HDFS上的目标路径已有增量表的数据出现了就证明数据通道已经打通。 5数据目标路径的日期说明 仔细观察会发现目标路径中的日期并非模拟数据的业务日期而是当前日期。这是由于Maxwell输出的JSON字符串中的ts字段的值是数据的变动日期。而真实场景下数据的业务日期与变动日期应当是一致的。 4编写Flume启停脚本 为方便使用此处编写一个Flume的启停脚本 1在hadoop102节点的/home/atguigu/bin目录下创建脚本f3.sh [atguiguhadoop102 bin]\) vim f3.sh在脚本中填写如下内容 #!/bin/bashcase \(1 in start)echo --------启动 hadoop104 业务数据flume-------ssh hadoop104 nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf /dev/null 21 ;; stop)echo --------停止 hadoop104 业务数据flume-------ssh hadoop104 ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk {print \\)2} | xargs -n1 kill ;; esac 2增加脚本执行权限 [atguiguhadoop102 bin]\( chmod 777 f3.sh 3f3启动 [atguiguhadoop102 module]\) f3.sh start 4f3停止 [atguiguhadoop102 module]\( f3.sh stop 2.2.6.3 Maxwell配置 1Maxwell时间戳问题此处为了模拟真实环境对Maxwell源码进行了改动增加了一个参数mock_date该参数的作用就是指定Maxwell输出JSON字符串的ts时间戳的日期接下来进行测试。 修改Maxwell配置文件config.properties增加mock_date参数如下 log_levelinfoproducerkafka kafka.bootstrap.servershadoop102:9092,hadoop103:9092#kafka topic配置 kafka_topictopic_db#注该参数仅在maxwell教学版中存在修改该参数后重启Maxwell才可生效 mock_date2020-06-14# mysql login info hosthadoop102 usermaxwell passwordmaxwell jdbc_optionsuseSSLfalseserverTimezoneAsia/Shanghai 注该参数仅供学习使用修改该参数后重启Maxwell才可生效。 重启Maxwell [atguiguhadoop102 bin]\) mxw.sh restart 重新生成模拟数据 [atguiguhadoop102 bin]\( cd /opt/module/db_log/ [atguiguhadoop102 db_log]\) java -jar gmall2020-mock-db-2021-11-14.jar 观察HDFS目标路径日期是否正常 2.2.6.4 增量表首日全量同步 通常情况下增量表需要在首日进行一次全量同步后续每日再进行增量同步首日全量同步可以使用Maxwell的bootstrap功能方便起见下面编写一个增量表首日全量同步脚本。 1在~/bin目录创建mysql_to_kafka_inc_init.sh [atguiguhadoop102 bin]\( vim mysql_to_kafka_inc_init.sh 脚本内容如下 #!/bin/bash# 该脚本的作用是初始化所有的增量表只需执行一次MAXWELL_HOME/opt/module/maxwellimport_data() {\)MAXWELL_HOME/bin/maxwell-bootstrap –database gmall –table \(1 --config \)MAXWELL_HOME/config.properties }case \(1 in cart_info)import_data cart_info;; comment_info)import_data comment_info;; coupon_use)import_data coupon_use;; favor_info)import_data favor_info;; order_detail)import_data order_detail;; order_detail_activity)import_data order_detail_activity;; order_detail_coupon)import_data order_detail_coupon;; order_info)import_data order_info;; order_refund_info)import_data order_refund_info;; order_status_log)import_data order_status_log;; payment_info)import_data payment_info;; refund_payment)import_data refund_payment;; user_info)import_data user_info;; all)import_data cart_infoimport_data comment_infoimport_data coupon_useimport_data favor_infoimport_data order_detailimport_data order_detail_activityimport_data order_detail_couponimport_data order_infoimport_data order_refund_infoimport_data order_status_logimport_data payment_infoimport_data refund_paymentimport_data user_info;; esac 2为mysql_to_kafka_inc_init.sh增加执行权限 [atguiguhadoop102 bin]\) chmod 777 ~/bin/mysql_to_kafka_inc_init.sh 3测试同步脚本 1清理历史数据 为方便查看结果现将HDFS上之前同步的增量表数据删除 [atguiguhadoop102 ~]\( hadoop fs -ls /origin_data/gmall/db | grep _inc | awk {print KaTeX parse error: Expected EOF, got } at position 2: 8}̲ | xargs hadoo… mysql_to_kafka_inc_init.sh all 4检查同步结果 观察HDFS上是否重新出现增量表数据。 2.3 采集通道启动/停止脚本 1在/home/atguigu/bin目录下创建脚本cluster.sh [atguiguhadoop102 bin]\) vim cluster.sh在脚本中填写如下内容 #!/bin/bashcase \(1 in start){echo 启动 集群 #启动 Zookeeper集群zk.sh start#启动 Hadoop集群hdp.sh start#启动 Kafka采集集群kf.sh start#启动采集 Flumef1.sh start#启动日志消费 Flumef2.sh start#启动业务消费 Flumef3.sh start#启动 maxwellmxw.sh start};; stop){echo 停止 集群 #停止 Maxwellmxw.sh stop#停止 业务消费Flumef3.sh stop#停止 日志消费Flumef2.sh stop#停止 日志采集Flumef1.sh stop#停止 Kafka采集集群kf.sh stop#停止 Hadoop集群hdp.sh stop#停止 Zookeeper集群zk.sh stop};; esac2增加脚本执行权限 [atguiguhadoop102 bin]\) chmod 777 cluster.sh 3cluster集群启动脚本 [atguiguhadoop102 module]\( cluster.sh start 4cluster集群停止脚本 [atguiguhadoop102 module]\) cluster.sh stop
- 上一篇: 贵阳企业网站模板个体工商注册查询平台
- 下一篇: 贵阳企业自助建站系统电话交换机ip地址
相关文章
-
贵阳企业网站模板个体工商注册查询平台
贵阳企业网站模板个体工商注册查询平台
- 技术栈
- 2026年04月20日
-
贵阳企业建站系统模板个人 服务器 linux 建网站
贵阳企业建站系统模板个人 服务器 linux 建网站
- 技术栈
- 2026年04月20日
-
贵阳模板建站定制seo从入门到精通
贵阳模板建站定制seo从入门到精通
- 技术栈
- 2026年04月20日
-
贵阳企业自助建站系统电话交换机ip地址
贵阳企业自助建站系统电话交换机ip地址
- 技术栈
- 2026年04月20日
-
贵阳商城网站开发凡客官网旗舰店
贵阳商城网站开发凡客官网旗舰店
- 技术栈
- 2026年04月20日
-
贵阳网站备案核验点照相西安在线网站
贵阳网站备案核验点照相西安在线网站
- 技术栈
- 2026年04月20日
