免费注册域名网站推荐电子贺卡app
- 作者: 五速梦信息网
- 时间: 2026年03月21日 10:22
当前位置: 首页 > news >正文
免费注册域名网站推荐,电子贺卡app,江西seo推广软件,Wordpress找不到外观选项系列文章目录
物流实时数仓：采集通道搭建 物流实时数仓：数仓搭建 物流实时数仓：数仓搭建（DIM） 物流实时数仓：数仓搭建（DWD）一 文章目录 系列文章目录前言一、文件编写1.目录创建2.b…系列文章目录
物流实时数仓：采集通道搭建
物流实时数仓：数仓搭建
物流实时数仓：数仓搭建（DIM）
物流实时数仓：数仓搭建（DWD）一

文章目录
系列文章目录
前言
一、文件编写
1.目录创建
2.bean文件
1.DwdOrderDetailOriginBean
2.DwdOrderInfoOriginBean
3.DwdTradeCancelDetailBean
4.DwdTradeOrderDetailBean
5.DwdTradePaySucDetailBean
6.DwdTransBoundFinishDetailBean
7.DwdTransDeliverSucDetailBean
8.DwdTransDispatchDetailBean
9.DwdTransReceiveDetailBean
10.DwdTransSignDetailBean
3.DwdOrderRelevantApp
二、代码测试
1.环境启动
2.kafka消费者
3.修改配置
4.测试结果
总结

前言
这次博客我们进行DWD层的搭建，内容比较多，一次可能写不完。 以上就是本次博客需要完成的内容，简单来说就是从kafka读取数据，然后根据不同的关键字将其从主流中进行分离，然后再写入各自的kafka中，以便后续的操作。
一、文件编写
1.目录创建
我们先在beans中创建后边需要的bean，然后在dwd目录中创建此次需要的app。
2.bean文件
1.DwdOrderDetailOriginBean
package com.atguigu.tms.realtime.beans;

import lombok.Data;

import java.math.BigDecimal;

/**
 * Raw order cargo-detail record.
 *
 * <p>Fields are package-private: the {@code mergeBean} methods of the DWD
 * detail beans in this package read them directly. Lombok's {@code @Data}
 * generates the accessors.
 *
 * <p>The {@code volumn*} spelling is kept as in the original listing.
 */
@Data
public class DwdOrderDetailOriginBean {
    // Primary key.
    String id;
    // Waybill (order) id.
    String orderId;
    // Cargo type.
    String cargoType;
    // Length in cm.
    Integer volumnLength;
    // Width in cm.
    Integer volumnWidth;
    // Height in cm.
    Integer volumnHeight;
    // Weight in kg.
    BigDecimal weight;
    // Creation time.
    String createTime;
    // Last update time.
    String updateTime;
    // Deletion flag.
    String isDeleted;
}
2.DwdOrderInfoOriginBean
package com.atguigu.tms.realtime.beans;

import lombok.Data;

import java.math.BigDecimal;

/**
 * Raw order (waybill) record.
 *
 * <p>Fields are package-private: the {@code mergeBean} methods of the DWD
 * detail beans in this package read them directly. Lombok's {@code @Data}
 * generates the accessors.
 */
@Data
public class DwdOrderInfoOriginBean {
    // Primary key.
    String id;
    // Waybill number.
    String orderNo;
    // Waybill status.
    String status;
    // Collection type: 1 = sender drops off at an outlet, 2 = door-to-door pickup.
    String collectType;
    // Customer id.
    String userId;
    // Recipient residential-complex id.
    String receiverComplexId;
    // Recipient province id.
    String receiverProvinceId;
    // Recipient city id.
    String receiverCityId;
    // Recipient district id.
    String receiverDistrictId;
    // Recipient name.
    String receiverName;
    // Sender residential-complex id.
    String senderComplexId;
    // Sender province id.
    String senderProvinceId;
    // Sender city id.
    String senderCityId;
    // Sender district id.
    String senderDistrictId;
    // Sender name.
    String senderName;
    // Payment method.
    String paymentType;
    // Number of cargo items.
    Integer cargoNum;
    // Amount.
    BigDecimal amount;
    // Estimated arrival time (numeric; the detail beans subtract a millisecond offset from it).
    Long estimateArriveTime;
    // Distance in km.
    BigDecimal distance;
    // Creation time.
    String createTime;
    // Last update time.
    String updateTime;
    // Deletion flag.
    String isDeleted;
}
3.DwdTradeCancelDetailBean
package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Trade domain: transactional fact record for a cancelled waybill.
 *
 * <p>Populated by {@link #mergeBean} from one raw cargo-detail record and the
 * raw order record it belongs to.
 */
@Data
public class DwdTradeCancelDetailBean {
    // 8 hours in milliseconds; presumably a UTC+8 time-zone correction — TODO confirm.
    private static final long EIGHT_HOURS_MS = 8 * 60 * 60 * 1000L;

    // Waybill detail id.
    String id;
    // Waybill (order) id.
    String orderId;
    // Cargo type.
    String cargoType;
    // Length in cm.
    Integer volumeLength;
    // Width in cm.
    Integer volumeWidth;
    // Height in cm.
    Integer volumeHeight;
    // Weight in kg.
    BigDecimal weight;
    // Cancellation time.
    String cancelTime;
    // Waybill number.
    String orderNo;
    // Waybill status.
    String status;
    // Collection type: 1 = sender drops off at an outlet, 2 = door-to-door pickup.
    String collectType;
    // Customer id.
    String userId;
    // Recipient residential-complex id.
    String receiverComplexId;
    // Recipient province id.
    String receiverProvinceId;
    // Recipient city id.
    String receiverCityId;
    // Recipient district id.
    String receiverDistrictId;
    // Recipient name.
    String receiverName;
    // Sender residential-complex id.
    String senderComplexId;
    // Sender province id.
    String senderProvinceId;
    // Sender city id.
    String senderCityId;
    // Sender district id.
    String senderDistrictId;
    // Sender name.
    String senderName;
    // Payment method.
    String paymentType;
    // Number of cargo items.
    Integer cargoNum;
    // Amount.
    BigDecimal amount;
    // Estimated arrival time.
    String estimateArriveTime;
    // Distance in km.
    BigDecimal distance;
    // Timestamp.
    Long ts;

    /**
     * Copies the cargo-detail and order fields into this bean and derives
     * {@code cancelTime} and {@code ts} from the order's {@code updateTime}.
     *
     * @param detailOriginBean raw cargo-detail record
     * @param infoOriginBean   raw order record
     * @throws NullPointerException if either argument is null, or if the order's
     *                              {@code estimateArriveTime} or {@code updateTime} is null
     */
    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // Cargo-detail fields.
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // Order fields.
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - EIGHT_HOURS_MS);
        this.distance = infoOriginBean.distance;

        // NOTE(review): the replaceAll arguments were reconstructed as ("T", " ") and ("Z", "")
        // from a listing that had lost its string quotes — confirm against the project source.
        String updateTime = infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", "");
        // Parse once and reuse for both the formatted time and the numeric timestamp.
        long cancelTs = DateFormatUtil.toTs(updateTime, true) + EIGHT_HOURS_MS;
        this.cancelTime = DateFormatUtil.toYmdHms(cancelTs);
        this.ts = cancelTs;
    }
}
4.DwdTradeOrderDetailBean
package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Trade domain: transactional fact record for a placed order.
 *
 * <p>Populated by {@link #mergeBean} from one raw cargo-detail record and the
 * raw order record it belongs to.
 */
@Data
public class DwdTradeOrderDetailBean {
    // 8 hours in milliseconds; presumably a UTC+8 time-zone correction — TODO confirm.
    private static final long EIGHT_HOURS_MS = 8 * 60 * 60 * 1000L;

    // Waybill detail id.
    String id;
    // Waybill (order) id.
    String orderId;
    // Cargo type.
    String cargoType;
    // Length in cm.
    Integer volumeLength;
    // Width in cm.
    Integer volumeWidth;
    // Height in cm.
    Integer volumeHeight;
    // Weight in kg.
    BigDecimal weight;
    // Order placement time.
    String orderTime;
    // Waybill number.
    String orderNo;
    // Waybill status.
    String status;
    // Collection type: 1 = sender drops off at an outlet, 2 = door-to-door pickup.
    String collectType;
    // Customer id.
    String userId;
    // Recipient residential-complex id.
    String receiverComplexId;
    // Recipient province id.
    String receiverProvinceId;
    // Recipient city id.
    String receiverCityId;
    // Recipient district id.
    String receiverDistrictId;
    // Recipient name.
    String receiverName;
    // Sender residential-complex id.
    String senderComplexId;
    // Sender province id.
    String senderProvinceId;
    // Sender city id.
    String senderCityId;
    // Sender district id.
    String senderDistrictId;
    // Sender name.
    String senderName;
    // Payment method.
    String paymentType;
    // Number of cargo items.
    Integer cargoNum;
    // Amount.
    BigDecimal amount;
    // Estimated arrival time.
    String estimateArriveTime;
    // Distance in km.
    BigDecimal distance;
    // Timestamp.
    Long ts;

    /**
     * Copies the cargo-detail and order fields into this bean and derives
     * {@code orderTime} and {@code ts} from the detail record's {@code createTime}.
     *
     * @param detailOriginBean raw cargo-detail record
     * @param infoOriginBean   raw order record
     * @throws NullPointerException if either argument is null, or if the detail's
     *                              {@code createTime} or the order's {@code estimateArriveTime} is null
     */
    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // Cargo-detail fields.
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // NOTE(review): the replaceAll arguments were reconstructed as ("T", " ") and ("Z", "")
        // from a listing that had lost its string quotes — confirm against the project source.
        String createTime = detailOriginBean.createTime.replaceAll("T", " ").replaceAll("Z", "");
        // Parse once and reuse for both the formatted time and the numeric timestamp.
        long orderTs = DateFormatUtil.toTs(createTime, true) + EIGHT_HOURS_MS;
        this.orderTime = DateFormatUtil.toYmdHms(orderTs);
        this.ts = orderTs;

        // Order fields.
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - EIGHT_HOURS_MS);
        this.distance = infoOriginBean.distance;
    }
}
5.DwdTradePaySucDetailBean
package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Trade domain: transactional fact record for a successful payment.
 *
 * <p>Populated by {@link #mergeBean} from one raw cargo-detail record and the
 * raw order record it belongs to.
 */
@Data
public class DwdTradePaySucDetailBean {
    // 8 hours in milliseconds; presumably a UTC+8 time-zone correction — TODO confirm.
    private static final long EIGHT_HOURS_MS = 8 * 60 * 60 * 1000L;

    // Waybill detail id.
    String id;
    // Waybill (order) id.
    String orderId;
    // Cargo type.
    String cargoType;
    // Length in cm.
    Integer volumeLength;
    // Width in cm.
    Integer volumeWidth;
    // Height in cm.
    Integer volumeHeight;
    // Weight in kg.
    BigDecimal weight;
    // Payment time.
    String payTime;
    // Waybill number.
    String orderNo;
    // Waybill status.
    String status;
    // Collection type: 1 = sender drops off at an outlet, 2 = door-to-door pickup.
    String collectType;
    // Customer id.
    String userId;
    // Recipient residential-complex id.
    String receiverComplexId;
    // Recipient province id.
    String receiverProvinceId;
    // Recipient city id.
    String receiverCityId;
    // Recipient district id.
    String receiverDistrictId;
    // Recipient name.
    String receiverName;
    // Sender residential-complex id.
    String senderComplexId;
    // Sender province id.
    String senderProvinceId;
    // Sender city id.
    String senderCityId;
    // Sender district id.
    String senderDistrictId;
    // Sender name.
    String senderName;
    // Payment method.
    String paymentType;
    // Number of cargo items.
    Integer cargoNum;
    // Amount.
    BigDecimal amount;
    // Estimated arrival time.
    String estimateArriveTime;
    // Distance in km.
    BigDecimal distance;
    // Timestamp.
    Long ts;

    /**
     * Copies the cargo-detail and order fields into this bean and derives
     * {@code payTime} and {@code ts} from the order's {@code updateTime}.
     *
     * @param detailOriginBean raw cargo-detail record
     * @param infoOriginBean   raw order record
     * @throws NullPointerException if either argument is null, or if the order's
     *                              {@code estimateArriveTime} or {@code updateTime} is null
     */
    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // Cargo-detail fields.
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // Order fields.
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - EIGHT_HOURS_MS);
        this.distance = infoOriginBean.distance;

        // NOTE(review): the replaceAll arguments were reconstructed as ("T", " ") and ("Z", "")
        // from a listing that had lost its string quotes — confirm against the project source.
        String updateTime = infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", "");
        // Parse once and reuse for both the formatted time and the numeric timestamp.
        long payTs = DateFormatUtil.toTs(updateTime, true) + EIGHT_HOURS_MS;
        this.payTime = DateFormatUtil.toYmdHms(payTs);
        this.ts = payTs;
    }
}
6.DwdTransBoundFinishDetailBean
package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Logistics domain: transactional fact record for a completed transfer.
 *
 * <p>Populated by {@link #mergeBean} from one raw cargo-detail record and the
 * raw order record it belongs to.
 */
@Data
public class DwdTransBoundFinishDetailBean {
    // 8 hours in milliseconds; presumably a UTC+8 time-zone correction — TODO confirm.
    private static final long EIGHT_HOURS_MS = 8 * 60 * 60 * 1000L;

    // Waybill detail id.
    String id;
    // Waybill (order) id.
    String orderId;
    // Cargo type.
    String cargoType;
    // Length in cm.
    Integer volumeLength;
    // Width in cm.
    Integer volumeWidth;
    // Height in cm.
    Integer volumeHeight;
    // Weight in kg.
    BigDecimal weight;
    // Transfer completion time (the original comment said "dispatch time", apparently a copy-paste slip).
    String boundFinishTime;
    // Waybill number.
    String orderNo;
    // Waybill status.
    String status;
    // Collection type: 1 = sender drops off at an outlet, 2 = door-to-door pickup.
    String collectType;
    // Customer id.
    String userId;
    // Recipient residential-complex id.
    String receiverComplexId;
    // Recipient province id.
    String receiverProvinceId;
    // Recipient city id.
    String receiverCityId;
    // Recipient district id.
    String receiverDistrictId;
    // Recipient name.
    String receiverName;
    // Sender residential-complex id.
    String senderComplexId;
    // Sender province id.
    String senderProvinceId;
    // Sender city id.
    String senderCityId;
    // Sender district id.
    String senderDistrictId;
    // Sender name.
    String senderName;
    // Payment method.
    String paymentType;
    // Number of cargo items.
    Integer cargoNum;
    // Amount.
    BigDecimal amount;
    // Estimated arrival time.
    String estimateArriveTime;
    // Distance in km.
    BigDecimal distance;
    // Timestamp.
    Long ts;

    /**
     * Copies the cargo-detail and order fields into this bean and derives
     * {@code boundFinishTime} and {@code ts} from the order's {@code updateTime}.
     *
     * @param detailOriginBean raw cargo-detail record
     * @param infoOriginBean   raw order record
     * @throws NullPointerException if either argument is null, or if the order's
     *                              {@code estimateArriveTime} or {@code updateTime} is null
     */
    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // Cargo-detail fields.
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // Order fields.
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - EIGHT_HOURS_MS);
        this.distance = infoOriginBean.distance;

        // NOTE(review): the replaceAll arguments were reconstructed as ("T", " ") and ("Z", "")
        // from a listing that had lost its string quotes — confirm against the project source.
        String updateTime = infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", "");
        // Parse once and reuse for both the formatted time and the numeric timestamp.
        long boundFinishTs = DateFormatUtil.toTs(updateTime, true) + EIGHT_HOURS_MS;
        this.boundFinishTime = DateFormatUtil.toYmdHms(boundFinishTs);
        this.ts = boundFinishTs;
    }
}
7.DwdTransDeliverSucDetailBean
package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Logistics domain: transactional fact record for a successful delivery.
 *
 * <p>Populated by {@link #mergeBean} from one raw cargo-detail record and the
 * raw order record it belongs to.
 */
@Data
public class DwdTransDeliverSucDetailBean {
    // 8 hours in milliseconds; presumably a UTC+8 time-zone correction — TODO confirm.
    private static final long EIGHT_HOURS_MS = 8 * 60 * 60 * 1000L;

    // Waybill detail id.
    String id;
    // Waybill (order) id.
    String orderId;
    // Cargo type.
    String cargoType;
    // Length in cm.
    Integer volumeLength;
    // Width in cm.
    Integer volumeWidth;
    // Height in cm.
    Integer volumeHeight;
    // Weight in kg.
    BigDecimal weight;
    // Delivery success time.
    String deliverTime;
    // Waybill number.
    String orderNo;
    // Waybill status.
    String status;
    // Collection type: 1 = sender drops off at an outlet, 2 = door-to-door pickup.
    String collectType;
    // Customer id.
    String userId;
    // Recipient residential-complex id.
    String receiverComplexId;
    // Recipient province id.
    String receiverProvinceId;
    // Recipient city id.
    String receiverCityId;
    // Recipient district id.
    String receiverDistrictId;
    // Recipient name.
    String receiverName;
    // Sender residential-complex id.
    String senderComplexId;
    // Sender province id.
    String senderProvinceId;
    // Sender city id.
    String senderCityId;
    // Sender district id.
    String senderDistrictId;
    // Sender name.
    String senderName;
    // Payment method.
    String paymentType;
    // Number of cargo items.
    Integer cargoNum;
    // Amount.
    BigDecimal amount;
    // Estimated arrival time.
    String estimateArriveTime;
    // Distance in km.
    BigDecimal distance;
    // Timestamp.
    Long ts;

    /**
     * Copies the cargo-detail and order fields into this bean and derives
     * {@code deliverTime} and {@code ts} from the order's {@code updateTime}.
     *
     * @param detailOriginBean raw cargo-detail record
     * @param infoOriginBean   raw order record
     * @throws NullPointerException if either argument is null, or if the order's
     *                              {@code estimateArriveTime} or {@code updateTime} is null
     */
    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // Cargo-detail fields.
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // Order fields.
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - EIGHT_HOURS_MS);
        this.distance = infoOriginBean.distance;

        // NOTE(review): the replaceAll arguments were reconstructed as ("T", " ") and ("Z", "")
        // from a listing that had lost its string quotes — confirm against the project source.
        String updateTime = infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", "");
        // Parse once and reuse for both the formatted time and the numeric timestamp.
        long deliverTs = DateFormatUtil.toTs(updateTime, true) + EIGHT_HOURS_MS;
        this.deliverTime = DateFormatUtil.toYmdHms(deliverTs);
        this.ts = deliverTs;
    }
}
8.DwdTransDispatchDetailBean
package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Logistics domain: transactional fact record for a dispatched waybill.
 *
 * <p>Populated by {@link #mergeBean} from one raw cargo-detail record and the
 * raw order record it belongs to.
 */
@Data
public class DwdTransDispatchDetailBean {
    // 8 hours in milliseconds; presumably a UTC+8 time-zone correction — TODO confirm.
    private static final long EIGHT_HOURS_MS = 8 * 60 * 60 * 1000L;

    // Waybill detail id.
    String id;
    // Waybill (order) id.
    String orderId;
    // Cargo type.
    String cargoType;
    // Length in cm.
    Integer volumeLength;
    // Width in cm.
    Integer volumeWidth;
    // Height in cm.
    Integer volumeHeight;
    // Weight in kg.
    BigDecimal weight;
    // Dispatch time.
    String dispatchTime;
    // Waybill number.
    String orderNo;
    // Waybill status.
    String status;
    // Collection type: 1 = sender drops off at an outlet, 2 = door-to-door pickup.
    String collectType;
    // Customer id.
    String userId;
    // Recipient residential-complex id.
    String receiverComplexId;
    // Recipient province id.
    String receiverProvinceId;
    // Recipient city id.
    String receiverCityId;
    // Recipient district id.
    String receiverDistrictId;
    // Recipient name.
    String receiverName;
    // Sender residential-complex id.
    String senderComplexId;
    // Sender province id.
    String senderProvinceId;
    // Sender city id.
    String senderCityId;
    // Sender district id.
    String senderDistrictId;
    // Sender name.
    String senderName;
    // Payment method.
    String paymentType;
    // Number of cargo items.
    Integer cargoNum;
    // Amount.
    BigDecimal amount;
    // Estimated arrival time.
    String estimateArriveTime;
    // Distance in km.
    BigDecimal distance;
    // Timestamp.
    Long ts;

    /**
     * Copies the cargo-detail and order fields into this bean and derives
     * {@code dispatchTime} and {@code ts} from the order's {@code updateTime}.
     *
     * @param detailOriginBean raw cargo-detail record
     * @param infoOriginBean   raw order record
     * @throws NullPointerException if either argument is null, or if the order's
     *                              {@code estimateArriveTime} or {@code updateTime} is null
     */
    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // Cargo-detail fields.
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // Order fields.
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - EIGHT_HOURS_MS);
        this.distance = infoOriginBean.distance;

        // NOTE(review): the replaceAll arguments were reconstructed as ("T", " ") and ("Z", "")
        // from a listing that had lost its string quotes — confirm against the project source.
        String updateTime = infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", "");
        // Parse once and reuse for both the formatted time and the numeric timestamp.
        long dispatchTs = DateFormatUtil.toTs(updateTime, true) + EIGHT_HOURS_MS;
        this.dispatchTime = DateFormatUtil.toYmdHms(dispatchTs);
        this.ts = dispatchTs;
    }
}
9.DwdTransReceiveDetailBean
package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Logistics domain: transactional fact record for a collected (received) waybill.
 *
 * <p>Populated by {@link #mergeBean} from one raw cargo-detail record and the
 * raw order record it belongs to.
 */
@Data
public class DwdTransReceiveDetailBean {
    // 8 hours in milliseconds; presumably a UTC+8 time-zone correction — TODO confirm.
    private static final long EIGHT_HOURS_MS = 8 * 60 * 60 * 1000L;

    // Waybill detail id.
    String id;
    // Waybill (order) id.
    String orderId;
    // Cargo type.
    String cargoType;
    // Length in cm.
    Integer volumeLength;
    // Width in cm.
    Integer volumeWidth;
    // Height in cm.
    Integer volumeHeight;
    // Weight in kg.
    BigDecimal weight;
    // Collection (receive) time.
    String receiveTime;
    // Waybill number.
    String orderNo;
    // Waybill status.
    String status;
    // Collection type: 1 = sender drops off at an outlet, 2 = door-to-door pickup.
    String collectType;
    // Customer id.
    String userId;
    // Recipient residential-complex id.
    String receiverComplexId;
    // Recipient province id.
    String receiverProvinceId;
    // Recipient city id.
    String receiverCityId;
    // Recipient district id.
    String receiverDistrictId;
    // Recipient name.
    String receiverName;
    // Sender residential-complex id.
    String senderComplexId;
    // Sender province id.
    String senderProvinceId;
    // Sender city id.
    String senderCityId;
    // Sender district id.
    String senderDistrictId;
    // Sender name.
    String senderName;
    // Payment method.
    String paymentType;
    // Number of cargo items.
    Integer cargoNum;
    // Amount.
    BigDecimal amount;
    // Estimated arrival time.
    String estimateArriveTime;
    // Distance in km.
    BigDecimal distance;
    // Timestamp.
    Long ts;

    /**
     * Copies the cargo-detail and order fields into this bean and derives
     * {@code receiveTime} and {@code ts} from the order's {@code updateTime}.
     *
     * @param detailOriginBean raw cargo-detail record
     * @param infoOriginBean   raw order record
     * @throws NullPointerException if either argument is null, or if the order's
     *                              {@code estimateArriveTime} or {@code updateTime} is null
     */
    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // Cargo-detail fields.
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // Order fields.
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - EIGHT_HOURS_MS);
        this.distance = infoOriginBean.distance;

        // NOTE(review): the replaceAll arguments were reconstructed as ("T", " ") and ("Z", "")
        // from a listing that had lost its string quotes — confirm against the project source.
        String updateTime = infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", "");
        // Parse once and reuse for both the formatted time and the numeric timestamp.
        long receiveTs = DateFormatUtil.toTs(updateTime, true) + EIGHT_HOURS_MS;
        this.receiveTime = DateFormatUtil.toYmdHms(receiveTs);
        this.ts = receiveTs;
    }
}
10.DwdTransSignDetailBean
package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Logistics domain: transactional fact record for a signed-for waybill.
 *
 * <p>Populated by {@link #mergeBean} from one raw cargo-detail record and the
 * raw order record it belongs to.
 */
@Data
public class DwdTransSignDetailBean {
    // 8 hours in milliseconds; presumably a UTC+8 time-zone correction — TODO confirm.
    private static final long EIGHT_HOURS_MS = 8 * 60 * 60 * 1000L;

    // Waybill detail id.
    String id;
    // Waybill (order) id.
    String orderId;
    // Cargo type.
    String cargoType;
    // Length in cm.
    Integer volumeLength;
    // Width in cm.
    Integer volumeWidth;
    // Height in cm.
    Integer volumeHeight;
    // Weight in kg.
    BigDecimal weight;
    // Sign-for time.
    String signTime;
    // Waybill number.
    String orderNo;
    // Waybill status.
    String status;
    // Collection type: 1 = sender drops off at an outlet, 2 = door-to-door pickup.
    String collectType;
    // Customer id.
    String userId;
    // Recipient residential-complex id.
    String receiverComplexId;
    // Recipient province id.
    String receiverProvinceId;
    // Recipient city id.
    String receiverCityId;
    // Recipient district id.
    String receiverDistrictId;
    // Recipient name.
    String receiverName;
    // Sender residential-complex id.
    String senderComplexId;
    // Sender province id.
    String senderProvinceId;
    // Sender city id.
    String senderCityId;
    // Sender district id.
    String senderDistrictId;
    // Sender name.
    String senderName;
    // Payment method.
    String paymentType;
    // Number of cargo items.
    Integer cargoNum;
    // Amount.
    BigDecimal amount;
    // Estimated arrival time.
    String estimateArriveTime;
    // Distance in km.
    BigDecimal distance;
    // Timestamp.
    Long ts;

    /**
     * Copies the cargo-detail and order fields into this bean and derives
     * {@code signTime} and {@code ts} from the order's {@code updateTime}.
     *
     * @param detailOriginBean raw cargo-detail record
     * @param infoOriginBean   raw order record
     * @throws NullPointerException if either argument is null, or if the order's
     *                              {@code estimateArriveTime} or {@code updateTime} is null
     */
    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // Cargo-detail fields.
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // Order fields.
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - EIGHT_HOURS_MS);
        this.distance = infoOriginBean.distance;

        // NOTE(review): the replaceAll arguments were reconstructed as ("T", " ") and ("Z", "")
        // from a listing that had lost its string quotes — confirm against the project source.
        String updateTime = infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", "");
        // Parse once and reuse for both the formatted time and the numeric timestamp.
        long signTs = DateFormatUtil.toTs(updateTime, true) + EIGHT_HOURS_MS;
        this.signTime = DateFormatUtil.toYmdHms(signTs);
        this.ts = signTs;
    }
}
3.DwdOrderRelevantApp
package com.atguigu.tms.realtime.app.dwd;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.*;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class DwdOrderRelevantApp {public static void main(String[] args) throws Exception {// 1.环境准备StreamExecutionEnvironment env CreateEnvUtil.getStreamEnv(args);env.setParallelism(4);// 2.从Kafka读数据String topic tms_ods;String groupId dwd_order_relevant_group;KafkaSourceString kafkaSource KafkaUtil.getKafkaSource(topic, groupId, args);SingleOutputStreamOperatorString kafkaStrDS env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), kafka_source).uid(kafka_source);// 3.筛选订单和订单明细数据SingleOutputStreamOperatorString filterDS kafkaStrDS.filter((FilterFunctionString) jsonStr - {JSONObject jsonObj JSON.parseObject(jsonStr);String tableName jsonObj.getJSONObject(source).getString(table);return order_info.equals(tableName) || order_cargo.equals(tableName);});
// filterDS.print();// 4.对流中的数据类型进行转换 jsonStr-jsonObjSingleOutputStreamOperatorJSONObject jsonObjDS filterDS.map((MapFunctionString, JSONObject) jsonStr - {JSONObject jsonObj JSON.parseObject(jsonStr);String tableName jsonObj.getJSONObject(source).getString(table);jsonObj.put(table, tableName);jsonObj.remove(source);jsonObj.remove(transaction);return jsonObj;});// jsonObjDS.print();// 5.按照order_id进行分组KeyedStreamJSONObject, String keyDS jsonObjDS.keyBy((KeySelectorJSONObject, String) jsonObj - {String table jsonObj.getString(table);if (order_info.equals(table)) {return jsonObj.getJSONObject(after).getString(id);}return jsonObj.getJSONObject(after).getString(order_id);});
// keyDS.print();// 6.定义侧输出流标签 下单放到主流支付成功、取消运单、揽收(接单)、发单 转运完成、派送成功、签收放到侧输出流// 支付成功明细流标签OutputTagString paySucTag new OutputTagString(dwd_trade_pay_suc_detail) {};// 取消运单明细流标签OutputTagString cancelDetailTag new OutputTagString(dwd_trade_cancel_detail) {};// 揽收明细流标签OutputTagString receiveDetailTag new OutputTagString(dwd_trans_receive_detail) {};// 发单明细流标签OutputTagString dispatchDetailTag new OutputTagString(dwd_trans_dispatch_detail) {};// 转运完成明细流标签OutputTagString boundFinishDetailTag new OutputTagString(dwd_trans_bound_finish_detail) {};// 派送成功明细流标签OutputTagString deliverSucDetailTag new OutputTagString(dwd_trans_deliver_detail) {};// 签收明细流标签OutputTagString signDetailTag new OutputTagString(dwd_trans_sign_detail) {};// 7.分流SingleOutputStreamOperatorString orderDetailDS keyDS.process(new KeyedProcessFunctionString, JSONObject, String() {private ValueStateDwdOrderInfoOriginBean infoBeanState;private ValueStateDwdOrderDetailOriginBean detailBeanState;Overridepublic void open(Configuration parameters) {ValueStateDescriptorDwdOrderInfoOriginBean InfoOriginBeanStateDescriptor new ValueStateDescriptor(infoBeanState, DwdOrderInfoOriginBean.class);InfoOriginBeanStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.seconds(5)).build());infoBeanState getRuntimeContext().getState(InfoOriginBeanStateDescriptor);ValueStateDescriptor detailBeanStateDescriptor new ValueStateDescriptor(detailBeanState, DwdOrderDetailOriginBean.class);detailBeanState getRuntimeContext().getState(detailBeanStateDescriptor);}Overridepublic void processElement(JSONObject jsonObj, KeyedProcessFunctionString, JSONObject, String.Context ctx, CollectorString out) throws Exception {String table jsonObj.getString(table);String op jsonObj.getString(op);JSONObject data jsonObj.getJSONObject(after);if (order_info.equals(table)) {//处理的是订单数据DwdOrderInfoOriginBean infoOriginBean data.toJavaObject(DwdOrderInfoOriginBean.class);// 脱敏String senderName infoOriginBean.getSenderName();String receiverName 
infoOriginBean.getReceiverName();senderName senderName.charAt(0) senderName.substring(1).replaceAll(., \*);receiverName receiverName.charAt(0) receiverName.substring(1).replaceAll(., \*);infoOriginBean.setSenderName(senderName);infoOriginBean.setReceiverName(receiverName);DwdOrderDetailOriginBean detailOriginBean detailBeanState.value();if (c.equals(op)) {// 下单操作if (detailOriginBean null) {// 订单数据 比明细数据先到将订单数据放到状态中infoBeanState.update(infoOriginBean);} else {// 说明订单数据来之前明细数据已经来到了直接关联DwdTradeOrderDetailBean dwdTradeOrderDetailBean new DwdTradeOrderDetailBean();dwdTradeOrderDetailBean.mergeBean(detailOriginBean, infoOriginBean);// 将下单业务过程数据 放到主流中out.collect(JSON.toJSONString(dwdTradeOrderDetailBean));}} else if (u.equals(op) detailOriginBean ! null) {// 其它操作// 获取修改前的数据JSONObject oldData jsonObj.getJSONObject(before);// 获取修改前的状态值String oldStatus oldData.getString(status);String status infoOriginBean.getStatus();if (!oldStatus.equals(status)) {// 说明修改的是status字段String changeLog oldStatus - status;switch (changeLog) {case 60010 - 60020:// 处理支付成功数据DwdTradePaySucDetailBean dwdTradePaySucDetailBean new DwdTradePaySucDetailBean();dwdTradePaySucDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(paySucTag, JSON.toJSONString(dwdTradePaySucDetailBean));break;case 60020 - 60030:// 处理揽收明细数据DwdTransReceiveDetailBean dwdTransReceiveDetailBean new DwdTransReceiveDetailBean();dwdTransReceiveDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(receiveDetailTag, JSON.toJSONString(dwdTransReceiveDetailBean));break;case 60040 - 60050:// 处理发单明细数据DwdTransDispatchDetailBean dispatchDetailBean new DwdTransDispatchDetailBean();dispatchDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(dispatchDetailTag, JSON.toJSONString(dispatchDetailBean));break;case 60050 - 60060:// 处理转运完成明细数据DwdTransBoundFinishDetailBean boundFinishDetailBean new DwdTransBoundFinishDetailBean();boundFinishDetailBean.mergeBean(detailOriginBean, 
infoOriginBean);ctx.output(boundFinishDetailTag, JSON.toJSONString(boundFinishDetailBean));break;case 60060 - 60070:// 处理派送成功数据DwdTransDeliverSucDetailBean dwdTransDeliverSucDetailBean new DwdTransDeliverSucDetailBean();dwdTransDeliverSucDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(deliverSucDetailTag, JSON.toJSONString(dwdTransDeliverSucDetailBean));break;case 60070 - 60080:// 处理签收明细数据DwdTransSignDetailBean dwdTransSignDetailBean new DwdTransSignDetailBean();dwdTransSignDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(signDetailTag, JSON.toJSONString(dwdTransSignDetailBean));// 签收后订单数据不会再发生变化状态可以清除detailBeanState.clear();break;default:if (status.equals(60999)) {DwdTradeCancelDetailBean dwdTradeCancelDetailBean new DwdTradeCancelDetailBean();dwdTradeCancelDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(cancelDetailTag, JSON.toJSONString(dwdTradeCancelDetailBean));// 取消后订单数据不会再发生变化状态可以清除detailBeanState.clear();}break;}}}} else {// 处理订单明细DwdOrderDetailOriginBean detailOriginBean data.toJavaObject(DwdOrderDetailOriginBean.class);if (c.equals(op)) {detailBeanState.update(detailOriginBean);// 获取状态中存放的订单数据 注意只有下单操作并且订单数据先到明细数据后到的情况才会从状态中拿到订单数据DwdOrderInfoOriginBean infoOriginBean infoBeanState.value();if (infoOriginBean ! 
null) {//属于下单业务过程DwdTradeOrderDetailBean dwdTradeOrderDetailBean new DwdTradeOrderDetailBean();dwdTradeOrderDetailBean.mergeBean(detailOriginBean, infoOriginBean);// 将下单业务过程数据 放到主流中out.collect(JSON.toJSONString(dwdTradeOrderDetailBean));}}}}}).uid(process_data);// 8.从主流中提取侧输出流// 支付成功明细流//8.1 支付成功明细流SideOutputDataStreamString paySucDS orderDetailDS.getSideOutput(paySucTag);// 8.2 取消运单明细流SideOutputDataStreamString cancelDetailDS orderDetailDS.getSideOutput(cancelDetailTag);// 8.3 揽收明细流SideOutputDataStreamString receiveDetailDS orderDetailDS.getSideOutput(receiveDetailTag);// 8.4 发单明细流SideOutputDataStreamString dispatchDetailDS orderDetailDS.getSideOutput(dispatchDetailTag);// 8.5 转运成功明细流SideOutputDataStreamString boundFinishDetailDS orderDetailDS.getSideOutput(boundFinishDetailTag);// 8.6 派送成功明细流SideOutputDataStreamString deliverSucDetailDS orderDetailDS.getSideOutput(deliverSucDetailTag);// 8.7 签收明细流SideOutputDataStreamString signDetailDS orderDetailDS.getSideOutput(signDetailTag);// 9.将不同流的数据写到kafka的不同主题中// 9.1.1 交易域下单明细主题String detailTopic tms_dwd_trade_order_detail;// 9.1.2 交易域支付成功明细主题String paySucDetailTopic tms_dwd_trade_pay_suc_detail;// 9.1.3 交易域取消运单明细主题String cancelDetailTopic tms_dwd_trade_cancel_detail;// 9.1.4 物流域接单揽收明细主题String receiveDetailTopic tms_dwd_trans_receive_detail;// 9.1.5 物流域发单明细主题String dispatchDetailTopic tms_dwd_trans_dispatch_detail;// 9.1.6 物流域转运完成明细主题String boundFinishDetailTopic tms_dwd_trans_bound_finish_detail;// 9.1.7 物流域派送成功明细主题String deliverSucDetailTopic tms_dwd_trans_deliver_detail;// 9.1.8 物流域签收明细主题String signDetailTopic tms_dwd_trans_sign_detail;// 9.2 发送数据到 Kafka// 9.2.1 运单明细数据KafkaSinkString kafkaProducer KafkaUtil.getKafkaSink(detailTopic, args);orderDetailDS.print(~~);orderDetailDS.sinkTo(kafkaProducer).uid(order_detail_sink);// 9.2.2 支付成功明细数据KafkaSinkString paySucKafkaProducer KafkaUtil.getKafkaSink(paySucDetailTopic, args);paySucDS.print(!!);paySucDS.sinkTo(paySucKafkaProducer).uid(pay_suc_detail_sink);// 9.2.3 
取消运单明细数据KafkaSinkString cancelKafkaProducer KafkaUtil.getKafkaSink(cancelDetailTopic, args);cancelDetailDS.print();cancelDetailDS.sinkTo(cancelKafkaProducer).uid(cancel_detail_sink);// 9.2.4 揽收明细数据KafkaSinkString receiveKafkaProducer KafkaUtil.getKafkaSink(receiveDetailTopic, args);receiveDetailDS.print(##);receiveDetailDS.sinkTo(receiveKafkaProducer).uid(reveive_detail_sink);// 9.2.5 发单明细数据KafkaSinkString dispatchKafkaProducer KafkaUtil.getKafkaSink(dispatchDetailTopic, args);dispatchDetailDS.print($$);dispatchDetailDS.sinkTo(dispatchKafkaProducer).uid(dispatch_detail_sink);// 9.2.6 转运完成明细主题KafkaSinkString boundFinishKafkaProducer KafkaUtil.getKafkaSink(boundFinishDetailTopic, args);boundFinishDetailDS.print(%%);boundFinishDetailDS.sinkTo(boundFinishKafkaProducer).uid(bound_finish_detail_sink);// 9.2.7 派送成功明细数据KafkaSinkString deliverSucKafkaProducer KafkaUtil.getKafkaSink(deliverSucDetailTopic, args);deliverSucDetailDS.print(^^);deliverSucDetailDS.sinkTo(deliverSucKafkaProducer).uid(deliver_suc_detail_sink);// 9.2.8 签收明细数据KafkaSinkString signKafkaProducer KafkaUtil.getKafkaSink(signDetailTopic, args);signDetailDS.print();signDetailDS.sinkTo(signKafkaProducer).uid(sign_detail_sink);env.execute();}
}二、代码测试
1.环境启动
Hadoop、ZooKeeper(zk)、Kafka(kf)全部启动。根据流程图可以看到,流程中没有使用到 DIM 层的内容,所以我们不需要启动 HBase。
2.kafka消费者
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_order_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_pay_suc_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_cancel_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_receive_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_dispatch_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_bound_finish_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_deliver_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_sign_detail
一共需要查看 8 个主题的消费者。你可以开 8 个窗口,也可以一个一个看;Kafka 在没有消费者时会先将数据保存、等待消费,所以不需要 8 个主题同时消费。
3.修改配置
4.测试结果
先启动 OdsApp 和 DwdOrderRelevantApp,然后生成模拟数据,之后查看 Kafka 消费者。有些数据可能要多生成几次才行。
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_order_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_pay_suc_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_cancel_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_receive_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_dispatch_detail
这个主题是特殊情况,正常可能没有输出。
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_bound_finish_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_deliver_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_sign_detail
总结
至此这篇博客的内容结束。
- 上一篇: 免费注册网站网址亚马逊做qa的网站
- 下一篇: 免费注册自助网站企业做网站须要注意些什么
相关文章
-
免费注册网站网址亚马逊做qa的网站
免费注册网站网址亚马逊做qa的网站
- 技术栈
- 2026年03月21日
-
免费注册网站空间h5特效网站欣赏
免费注册网站空间h5特效网站欣赏
- 技术栈
- 2026年03月21日
-
免费注册建网站只做一页的网站多少钱
免费注册建网站只做一页的网站多少钱
- 技术栈
- 2026年03月21日
-
免费注册自助网站企业做网站须要注意些什么
免费注册自助网站企业做网站须要注意些什么
- 技术栈
- 2026年03月21日
-
免费追剧网站网络营销策略有哪五种
免费追剧网站网络营销策略有哪五种
- 技术栈
- 2026年03月21日
-
免费自己建网站制作照片视频的软件
免费自己建网站制作照片视频的软件
- 技术栈
- 2026年03月21日
