做php网站用什么软件好手机p2p网站开发

当前位置: 首页 > news >正文

做php网站用什么软件好,手机p2p网站开发,php网站建设思路,idzoom室内设计师网前言 使用Starrocks引擎中的窗口函数 row_number() over( )对10亿的数据集进行去重操作#xff0c;BE内存溢出问题频发#xff08;忘记当时指定的BE内存上限是多少了…..#xff09;#xff0c;此时才意识到#xff0c;开窗操作#xff0c;如果使用 不当#xff0c;反而…前言 使用Starrocks引擎中的窗口函数 row_number() over( )对10亿的数据集进行去重操作BE内存溢出问题频发忘记当时指定的BE内存上限是多少了…..此时才意识到开窗操作如果使用 不当反而更容易引发性能问题。 下文是对Hive中的窗口函数底层源码进行初步学习若有问题请指正~ 一、窗口函数的执行步骤 1将数据分割成多个分区 2在各个分区上调用窗口函数 由于窗口函数的返回结果不是一个聚合值而是另一张表的格式table-in, table-out因此Hive社区引入分区表函数  Partitioned Table FunctionPTF。 简略的代码流转图 hive会把QueryBlock翻译成执行操作数OperatorTree其中每个operator都会有三个重要的方法 initializeOp() 初始化算子process() 执行每一行数据forward() 把处理好的每一行数据发送到下个Operator 当遇到窗口函数时会生成PTFOperatorPTFOperator依赖PTFInvocation 读取已经排好序的数据创建相应的输入分区PTFPartition inputPart; WindowTableFunction 负责管理窗口帧、调用窗口函数UDAF、并将结果写入输出分区: PTFPartition outputPart。 二、源码分析 2.1 PTFOperator 类 是PartitionedTableFunction的运算符继承Operator抽象类Hive运算符基类 重写process(Object row, int tag) 方法该方法来处理一行数据Row Overridepublic void process(Object row, int tag) throws HiveException {if (!isMapOperator) {/** check if current row belongs to the current accumulated Partition:* - If not:* - process the current Partition* - reset input Partition* - set currentKey to the newKey if it is null or has changed./newKeys.getNewKey(row, inputObjInspectors[0]);//会判断当前row所属的KeynewKeys是否等于当前正在累积数据的partition所属的keycurrentKeysboolean keysAreEqual (currentKeys ! null newKeys ! null) ?newKeys.equals(currentKeys) : false;// 如果不相等就结束当前partition分区的数据累积触发窗口计算if (currentKeys ! null !keysAreEqual) {// 关闭正在积累的分区ptfInvocation.finishPartition();}// 如果currentKeys为空或者被改变就将newKeys赋值给currentKeysif (currentKeys null || !keysAreEqual) {// 开启一个新的分区partitionptfInvocation.startPartition();if (currentKeys null) {currentKeys newKeys.copyKey();} else {currentKeys.copyKey(newKeys);}}} else if (firstMapRow) { // 说明当前row是进入的第一行ptfInvocation.startPartition();firstMapRow false;}// 将数据row添加到分区中积累数据ptfInvocation.processRow(row);}上面的代码可以看出所有数据应该是按照分区排好了序排队进入process方法当遇到进入的row和当前分区不是同一个key时当前分区就可以关闭了然后在打开下一个分区。 2.2 PTFInvocation类 PTFInvocation是PTFOperator类 的内部类 在PTFOperator的初始化方法中创建了实例。 Overrideprotected void initializeOp(Configuration jobConf) throws HiveException {…ptfInvocation setupChain();ptfInvocation.initializeStreaming(jobConf, isMapOperator);…}它的主要作用是负责PTF 数据链中行 row的流动通过 ptfInvocation.processRow(row) 方法调用传递链中的每一行并且通过ptfInvocation.startPartition()、ptfInvocation.finishPartition()方法来通知分区何时开始何时结束。 该类中包含TableFunction用来处理分区数据。 PTFPartition inputPart; // inputPart理解为分区对象一直是在复用一个inputPart TableFunctionEvaluator tabFn; // tabFn理解为窗口函数的实例//向分区中添加一行数据 void processRow(Object row) throws HiveException {if (isStreaming()) {// tabFn是窗口函数的实例handleOutputRows(tabFn.processRow(row));} else {// inputPart就是当前正在累积数据的分区inputPart.append(row);} }// 开启一个分区 void startPartition() throws HiveException {if (isStreaming()) {tabFn.startPartition();} else {if (prev null || prev.isOutputIterator()) {if (inputPart null) {// 创建新分区对象PTFPartition对象createInputPartition();} else {// 重置分区inputPart.reset();}}}if (next ! null) {next.startPartition();} }// 关闭一个分区 void finishPartition() throws HiveException {if (isStreaming()) {handleOutputRows(tabFn.finishPartition());} else {if (tabFn.canIterateOutput()) {outputPartRowsItr inputPart null ? null :tabFn.iterator(inputPart.iterator());} else {// tabFn是窗口函数的实例execute方法执行窗口函数逻辑的计算返回outputPart依旧是一个分区对象outputPart inputPart null ? null : tabFn.execute(inputPart);outputPartRowsItr outputPart null ? null : outputPart.iterator();}if (next ! null) {if (!next.isStreaming() !isOutputIterator()) {next.inputPart outputPart;} else {if (outputPartRowsItr ! null) {while (outputPartRowsItr.hasNext()) {next.processRow(outputPartRowsItr.next());}}}}if (next ! null) {next.finishPartition();} else {if (!isStreaming()) {if (outputPartRowsItr ! null) {while (outputPartRowsItr.hasNext()) {// 将窗口函数计算结果逐条输出到下一个Operator中forward(outputPartRowsItr.next(), outputObjInspector);}}}} }2.3 PTFPartition类 该类表示由TableFunction或WindowFunction来处理的行集合使用PTFRowContainer来保存数据。 private final PTFRowContainerListObject elems; // 存放数据的容器public void append(Object o) throws HiveException {//在往PTFPartition中添加数据时如果当前累计条数超过了Int最大值(21亿)会抛异常。if (elems.rowCount() Integer.MAX_VALUE) {throw new HiveException(String.format(Cannot add more than %d elements to a PTFPartition,Integer.MAX_VALUE));}SuppressWarnings(unchecked)ListObject l (ListObject)ObjectInspectorUtils.copyToStandardObject(o, inputOI, ObjectInspectorCopyOption.WRITABLE);elems.addRow(l); }2.4 TableFunctionEvaluator类 该类负责对分区内的数据做实际的窗口计算 public abstract class TableFunctionEvaluator { transient protected PTFPartition outputPartition; // transient瞬态变量该属性可以不参与序列化// iPart理解为分区对象 public PTFPartition execute(PTFPartition iPart)throws HiveException {if (ptfDesc.isMapSide()) {return transformRawInput(iPart);}PTFPartitionIteratorObject pItr iPart.iterator();PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc.getLlInfo(), pItr);if (outputPartition null) {outputPartition PTFPartition.create(ptfDesc.getCfg(),tableDef.getOutputShape().getSerde(),OI, tableDef.getOutputShape().getOI());} else {outputPartition.reset();}// 入参1输入PTFPartition转换的迭代器入参2输出PTFPartitionexecute(pItr, outputPartition);return outputPartition; }protected abstract void execute(PTFPartitionIteratorObject pItr, PTFPartition oPart) throws HiveException; } 抽象方法 execute(PTFPartitionIterator pItr, PTFPartition oPart) 方法的具体实现在子类WindowingTableFunction中 public class WindowingTableFunction extends TableFunctionEvaluator {Override public void execute(PTFPartitionIteratorObject pItr, PTFPartition outP) throws HiveException {ArrayListList? oColumns new ArrayListList?();PTFPartition iPart pItr.getPartition();StructObjectInspector inputOI iPart.getOutputOI();WindowTableFunctionDef wTFnDef (WindowTableFunctionDef) getTableDef();for (WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) {// 这里是判断逻辑如果该窗口定义是一个从第一行到最后一行的全局无限窗口就返回false反之trueboolean processWindow processWindow(wFn.getWindowFrame());pItr.reset();if (!processWindow) {Object out evaluateFunctionOnPartition(wFn, iPart);if (!wFn.isPivotResult()) {out new SameList(iPart.size(), out);}oColumns.add((List?) out);} else {oColumns.add(executeFnwithWindow(wFn, iPart));}}/** Output Columns in the following order - the columns representing the output from Window Fns* - the input Rows columns*/for (int i 0; i iPart.size(); i) {ArrayList oRow new ArrayList();Object iRow iPart.getAt(i);for (int j 0; j oColumns.size(); j) {oRow.add(oColumns.get(j).get(i));}for (StructField f : inputOI.getAllStructFieldRefs()) {oRow.add(inputOI.getStructFieldData(iRow, f));}//最终将处理好的数据逐条添加到输出PTFPartition中outP.append(oRow);} }// Evaluate the function result for each row in the partition ArrayListObject executeFnwithWindow(WindowFunctionDef wFnDef,PTFPartition iPart)throws HiveException {ArrayListObject vals new ArrayListObject();for (int i 0; i iPart.size(); i) {// 入参1.窗口函数、2.当前行的行号、3.输入PTFPartition对象Object out evaluateWindowFunction(wFnDef, i, iPart);vals.add(out);}return vals; }// Evaluate the result given a partition and the row number to process private Object evaluateWindowFunction(WindowFunctionDef wFn, int rowToProcess, PTFPartition partition)throws HiveException {BasePartitionEvaluator partitionEval wFn.getWFnEval().getPartitionWindowingEvaluator(wFn.getWindowFrame(), partition, wFn.getArgs(), wFn.getOI(), nullsLast);// 给定当前行获取窗口的聚合return partitionEval.iterate(rowToProcess, ptfDesc.getLlInfo()); }} 注WindowingTableFunction类中的execute方法 没怎么理解清楚待补充~ 三、Hive SQL窗口函数实现原理 window Funtion的使用语法 select col1,col2,row_number() over (partition by col1 order by col2 窗口子句) as rnfrom tableA 上面的语句主要分两部分 window函数部分window_func 窗口定义部分
3.1 window函数部分 windows函数部分即是在窗口上执行的函数。主要有count 、sum、avg聚合类窗口函数、还有常用的row_number、rank这样的排序函数。 3.2  窗口定义部分 即为 over里面的三部分内容均可省略不写 partition by 分区 order by 排序 rows | range between … and …..  窗口子句 ps Hive 窗口函数的详细介绍 (07)Hive——窗口函数详解_hive 窗口函数-CSDN博客 3.3  window Function实现原理 窗口函数的实现主要借助 Partitioned Table Function 即PTF 1PTF的输入可以是表、子查询或另一个PTF函数输出 2PTF输出是一张表。 写一个相对复杂的sql来看一下执行窗口函数时数据的流转情况 select id,sq,cell_type,rank,row_number() over(partition by id order by rank ) as rn ,rank() over(partition by id order by rank) as r,dense_rank() over(partition by cell_type order by id) as dr from window_test_table group byid,sq,cell_type,rank; 数据流转如下图 以上代码实现主要有三个阶段 计算除窗口函数以外所有的其他运算如group byjoin having等。上面的代码的第一阶段即为
selectid, sq, cell_type, rank from window_test_table group byid, sq, cell_type, rank; 将第一步的输出作为第一个 PTF 的输入计算对应的窗口函数值。上面代码的第二阶段即为
select id,sq,cell_type,rank,rn,r from window(w,–将第一阶段输出记为wpartition by id, –分区order by rank, –窗口函数的order[rn:row_number(),r:rank()] –窗口函数调用) 由于row_number()rank() 两个函数对应的窗口是相同的partition by id  order by rank因此这两个函数可以在一次shuffle中完成。 将第二步的输出结果作为 第二个PTF 的输入计算对应的窗口函数值。上面代码的第三阶段即为
select id,sq,cell_type,rank,rn,r,dr from window(w1,–将第二阶段输出记为w1partition by cell_type, –分区order by id, –窗口函数的order[dr:dense_rank()] –窗口函数调用)由于dense_rank()的窗口与前两个函数不同因此需要再partition一次得到最终的输出结果。 总结上述代码显示需要shuffle三次才能得到最终的结果第一阶段的group by 第二阶段第三阶段的开窗操作。对应到MapReduce程序即需要经历三次 map-reduce组合对应到spark sql上需要Exchange三次再加上中间排序操作在数据量很大的情况下效率上确实会有较大的影响。 四、窗口函数的性能问题 在使用Hive进行数据处理时借助窗口函数可以对数据进行分组、排序等操作但是在使用row_number这类窗口函数时会遇到性能较慢的问题j即比普通的聚合函数 summinmax等运行成本更高为啥 4.1 性能问题产生原因 4.1.1 第一个版本 小破站一个up主给出的答案 原因 1开窗函数不能做预聚合 数据量很多shuffle慢计算慢并且会有 数据倾斜的风险 2开窗多一步order by 更耗时间 4.1.2 第二个版本 原因 1普通的聚合函数语句可以根据函数不同采用partial merge 的方式运行也即是map端预聚合但那是window 窗口语句只能在reduce 端一次性聚合即只有complete 执行模式。 2普通聚合函数的物理执行计划分为SortBased和HashBased的而window是SortBased。 3window语句作用于 对行并为每行返回一个聚合结果这决定了window在执行过程中需要更大的buffer 进行汇总。 4.2 性能问题的优化方法 4.2.1 用聚合函数替代 排序开窗函数 例如假设需要求出历史至今用户粒度末次交易的sku名称或者交易金额等这种情况下可以将 交易时间和sku名称拼接起来取max 之后再将sku名称拆解开即能达到预期效果。 在Hive 中row_number是一个常用的窗口函数用于为结果集中的每一行分配一个唯一的数字。通常会搭配over子句来指定窗口的范围和排序方式。例如: select col1,col2,row_number() over (partition by col1 order by col2 窗口子句) as rnfrom tableA 上述示例row_number 函数将根据col1进行分组并按照col2的值进行排序为每一组数据分配一个唯一的行号。然而在处理大规模数据时使用row_number可能会导致性能下降这是因为row_number 需要对数据进行排序和标记而这些操作在大数据量下会消耗较多的计算资源。 注 以下都是row_number() over () 开窗函数性能优化的几种方式 4.2.2 减少数据量 一种最直接的优化方法是减少需要进行row_number计算的数据量。可以通过在where子句中添加条件、对数据进行分区等方式来减小数据规模从而提升计算性能。 ps 这种方式在生产环境中用过。 4.2.3 避免多次排序 在使用row_number时尽量避免多次排序操作。可以将row_number 函数应用在子查询中然后再进行排序操作避免重复的排序过程。 selectcol1,col2,rn from ( select col1,col2,row_number() over (partition by col1 order by col2) as rnfrom tableA) tmp1 order by col1,col2; 参考文章 常用的SQL优化方式, 用聚合函数替代排序开窗求最值, sparksql, hivesql_哔哩哔哩_bilibili https://blog.51cto.com/u_162134359877979 Hive学习一窗口函数源码阅读_hive 源码阅读-CSDN博客 https://mp.weixin.qq.com/s/WBryrbpHGO9jmzMp0e7jhw