阜城网站建设代理常平网站建设
- 作者: 五速梦信息网
- 时间: 2026年03月21日 11:15
当前位置: 首页 > news >正文
阜城网站建设代理,常平网站建设,黄山市旅游攻略,设计个网页多少钱本文对 ClickHouse 物化视图的写入流程源码做个详细说明#xff0c;基于 v22.8.14.53-lts 版本。
StorageMaterializedView
首先来看物化视图的构造函数#xff1a;
StorageMaterializedView::StorageMaterializedView(const StorageID tableid,ContextPtr local_…本文对 ClickHouse 物化视图的写入流程源码做个详细说明基于 v22.8.14.53-lts 版本。
StorageMaterializedView
首先来看物化视图的构造函数
StorageMaterializedView::StorageMaterializedView(const StorageID tableid,ContextPtr localcontext,const ASTCreateQuery query,const ColumnsDescription columns,bool attach_,const String comment): IStorage(tableid), WithMutableContext(local_context-getGlobalContext())
{StorageInMemoryMetadata storage_metadata;storagemetadata.setColumns(columns);……if (!has_inner_table){target_table_id query.to_tableid;}else if (attach){/// If there is an ATTACH request, then the internal table must already be created.target_table_id StorageID(getStorageID().database_name, generateInnerTableName(getStorageID()), query.to_inner_uuid);}else{/// We will create a query to create an internal table.auto create_context Context::createCopy(local_context);auto manual_create_query std::make_sharedASTCreateQuery();manual_create_query-setDatabase(getStorageID().database_name);manual_create_query-setTable(generateInnerTableName(getStorageID()));manual_create_query-uuid query.to_inner_uuid;auto new_columns_list std::make_sharedASTColumns();new_columns_list-set(new_columns_list-columns, query.columns_list-columns-ptr());manual_create_query-set(manual_create_query-columns_list, new_columns_list);manual_create_query-set(manual_create_query-storage, query.storage-ptr());InterpreterCreateQuery create_interpreter(manual_create_query, create_context);create_interpreter.setInternal(true);create_interpreter.execute();target_table_id DatabaseCatalog::instance().getTable({manual_create_query-getDatabase(), manual_create_query-getTable()}, getContext())-getStorageID();}
}通过以上代码可以发现物化视图支持几种创建语法总的来说可以归为 3 类 指定了目的表的情况 create table src(id Int32) EngineMemory();
create table dest(id Int32) EngineMemory();create materialized view mv to dest as select * from src;使用以上形式时target_table_id 会选择 dest 表的 table_id。 不指定目的表的情况 create table src(id Int32) EngineMemory();create materialized view mv EngineMemory() as select * from src;使用以上形式时首先会根据源表的 table_id 生成一个以 .inner. 开头的目的表名如 .inner.5ef4ec2c-efb1-4918-bf6c-59de2edb54cf然后在生成一个随机的 uuid 作为目的表的 table_id 并同时作为 target_table_id 。 第 3 种其实不是创建语法而是在 ClickHouse 启动或者物化视图被 detach 掉后执行 attach 的实现。
StorageMaterializedView::read
void StorageMaterializedView::read(QueryPlan query_plan,const Names column_names,const StorageSnapshotPtr storage_snapshot,SelectQueryInfo query_info,ContextPtr local_context,QueryProcessingStage::Enum processed_stage,const size_t max_block_size,const size_t num_streams)
{/// 获取目的表实例auto storage getTargetTable();auto lock storage-lockForShare(local_context-getCurrentQueryId(), local_context-getSettingsRef().lock_acquire_timeout);auto target_metadata_snapshot storage-getInMemoryMetadataPtr();auto target_storage_snapshot storage-getStorageSnapshot(target_metadata_snapshot, local_context);if (query_info.order_optimizer)query_info.input_order_info query_info.order_optimizer-getInputOrder(target_metadata_snapshot, local_context);storage-read(query_plan, column_names, target_storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);if (query_plan.isInitialized()){/// 获取物化视图 stream 中对应的 block 结构auto mv_header getHeaderForProcessingStage(column_names, storage_snapshot, query_info, local_context, processed_stage);/// 获取查询语句中所需的列对应的 block 结构auto target_header query_plan.getCurrentDataStream().header;/// 从查询的列中去除那些mv不存在的列removeNonCommonColumns(mv_header, target_header);/// 分布式表引擎在查询处理到指定阶段header 中可能不包含物化视图中的所有列例如 group by/// 所以从 mv_header 中去除那些查询不需要的列removeNonCommonColumns(target_header, mv_header);/// 当查询中得到的 mv_header 和 target_header 有不同结构时会通过在 pipeline 中添加表达式计算来进行转换/// 比如 Decimal(38, 6) - Decimal(16, 6)或者一些聚合运算如 sum 等if (!blocksHaveEqualStructure(mv_header, target_header)){auto converting_actions ActionsDAG::makeConvertingActions(target_header.getColumnsWithTypeAndName(),mv_header.getColumnsWithTypeAndName(),ActionsDAG::MatchColumnsMode::Name);auto converting_step std::make_uniqueExpressionStep(query_plan.getCurrentDataStream(), converting_actions);converting_step-setStepDescription(Convert target table structure to MaterializedView structure);query_plan.addStep(std::move(converting_step));}query_plan.addStorageHolder(storage);query_plan.addTableLock(std::move(lock));}
}通过以上代码可以看出物化视图是一种逻辑描述数据都是存储在目的表中读取时实际操作的目的表并且在在查询过程中还会涉及到多阶段 block 的转换以及表达式的计算。
StorageMaterializedView::write
SinkToStoragePtr StorageMaterializedView::write(const ASTPtr query, const StorageMetadataPtr /metadata_snapshot/, ContextPtr local_context)
{auto storage getTargetTable();auto lock storage-lockForShare(local_context-getCurrentQueryId(), local_context-getSettingsRef().lock_acquire_timeout);auto metadata_snapshot storage-getInMemoryMetadataPtr();auto sink storage-write(query, metadata_snapshot, local_context);sink-addTableLock(lock);return sink;
}同样写也是将数据存入了目的表。 我们都知道数据写源表时会触发写物化视图从而将数据写入目的表下面就看一下是如何实现的。SQL 的执行都是通过 IInterpreter 到 InterpreterXxx 的这里就不再多说一个写入操作最中会调用 InterpreterInsertQuery所以从 InterpreterInsertQuery::execute() 开始跟踪。
InterpreterInsertQuery::execute()
BlockIO InterpreterInsertQuery::execute()
{……std::vectorChain out_chains;if (!distributed_pipeline || query.watch){size_t out_streams_size 1;……for (size_t i 0; i out_streams_size; i){auto out buildChainImpl(table, metadata_snapshot, query_sample_block, nullptr, nullptr);out_chains.emplace_back(std::move(out));}}……
}execute() 中通过 buildChainImpl() 来构建输出链 buildChainImpl() 会判断当前表是否有物化视图关联如果有就会调用 buildPushingToViewsChain() 。
buildPushingToViewsChain()
这个方法非常长这里只展示和本文想说明的问题相关的部分。
Chain buildPushingToViewsChain(const StoragePtr storage,const StorageMetadataPtr metadata_snapshot,ContextPtr context,const ASTPtr query_ptr,bool no_destination,ThreadStatusesHolderPtr thread_status_holder,std::atomic_uint64_t * elapsed_counter_ms,const Block live_view_header)
{……auto table_id storage-getStorageID();auto views DatabaseCatalog::instance().getDependentViews(table_id);……std::vectorChain chains;for (const auto view_id : views){auto view DatabaseCatalog::instance().tryGetTable(view_id, context);……if (auto * materialized_view dynamic_castStorageMaterializedView *(view.get())){……StoragePtr inner_table materialized_view-getTargetTable();auto inner_table_id inner_table-getStorageID();auto inner_metadata_snapshot inner_table-getInMemoryMetadataPtr();query view_metadata_snapshot-getSelectQuery().inner_query;target_name inner_table_id.getFullTableName();Block header;/// Get list of columns we get from select query.if (select_context-getSettingsRef().allow_experimental_analyzer)header InterpreterSelectQueryAnalyzer::getSampleBlock(query, select_context);elseheader InterpreterSelectQuery(query, select_context, SelectQueryOptions().analyze()).getSampleBlock();/// Insert only columns returned by select.Names insert_columns;const auto inner_table_columns inner_metadata_snapshot-getColumns();for (const auto column : header){/// But skip columns which storage doesnt have.if (inner_table_columns.hasPhysical(column.name))insert_columns.emplace_back(column.name);}InterpreterInsertQuery interpreter(nullptr, insert_context, false, false, false);out interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms);out.addStorageHolder(view);out.addStorageHolder(inner_table);}else if (auto * live_view dynamic_castStorageLiveView *(view.get())){runtime_stats-type QueryViewsLogElement::ViewType::LIVE;query live_view-getInnerQuery(); // Used only to log in system.query_views_logout buildPushingToViewsChain(view, view_metadata_snapshot, insert_context, ASTPtr(), true, thread_status_holder, view_counter_ms, storage_header);}else if (auto * window_view dynamic_castStorageWindowView *(view.get())){runtime_stats-type QueryViewsLogElement::ViewType::WINDOW;query window_view-getMergeableQuery(); // Used only to log in system.query_views_logout buildPushingToViewsChain(view, view_metadata_snapshot, insert_context, ASTPtr(), true, thread_status_holder, view_counter_ms);}elseout buildPushingToViewsChain(view, view_metadata_snapshot, insert_context, ASTPtr(), false, thread_status_holder, view_counter_ms);……
}buildPushingToViewsChain() 会检查当前表是否有视图依赖通过几个判断可以看出视图分为三种物化视图、实时视图和窗口视图最后的 else 是指当前表是个普通表。如果当前表是源表且有物化视图依赖就会调用 buildPushingToViewsChain() 来构建链这是个递归调用首次进入当前表是普通表其依赖的物化视图会再次调用该方法再次进入就会物化视图的 if 逻辑最终是通过 buildChain() 来构建链。
buildChainImpl
buildChain() 中是调用了 buildChainImpl() 这个实现类。
Chain InterpreterInsertQuery::buildChainImpl(const StoragePtr table,const StorageMetadataPtr metadata_snapshot,const Block query_sample_block,ThreadStatusesHolderPtr thread_status_holder,std::atomic_uint64_t * elapsed_counter_ms)
{……/// We create a pipeline of several streams, into which we will write data.Chain out;/// Keep a reference to the context to make sure it stays alive until the chain is executed and destroyedout.addInterpreterContext(context_ptr);/// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage./// Otherwise well get duplicates when MV reads same rows again from Kafka.if (table-noPushingToViews() !no_destination){auto sink table-write(query_ptr, metadata_snapshot, context_ptr);sink-setRuntimeData(thread_status, elapsed_counter_ms);out.addSource(std::move(sink));}else{out buildPushingToViewsChain(table, metadata_snapshot, context_ptr, query_ptr, no_destination, thread_status_holder, elapsed_counter_ms);}……
}buildChainImpl() 会根据当前表或视图是否有依赖的视图或目的表来做不同的操作这里就可以处理视图级连视图的情况会不断递归构造相应的链节点使之连接起来。
Chain InterpreterInsertQuery::buildChainImpl(const StoragePtr table,const StorageMetadataPtr metadata_snapshot,const Block query_sample_block,ThreadStatusesHolderPtr thread_status_holder,std::atomic_uint64_t * elapsed_counter_ms)
{…/// We create a pipeline of several streams, into which we will write data.Chain out;/// Keep a reference to the context to make sure it stays alive until the chain is executed and destroyedout.addInterpreterContext(context_ptr);/// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage./// Otherwise well get duplicates when MV reads same rows again from Kafka.if (table-noPushingToViews() !no_destination) // table-noPushingToViews() 用于禁止物化视图插入数据到 KafkaEngine{auto sink table-write(query_ptr, metadata_snapshot, context_ptr);sink-setRuntimeData(thread_status, elapsed_counter_ms);out.addSource(std::move(sink));}else // 构建物化视图插入 pushingToViewChain重点{out buildPushingToViewsChain(table, metadata_snapshot, context_ptr, query_ptr, no_destination, thread_status_holder, elapsed_counter_ms);}…return out;
}小结
所以源表和物化视图在写入时是构造了多个输出链数据也是只能对当前写入的数据做操作不会影响源表现有数据。而且写入源表和目的表的过程是一个 pipeline需要全部完成才算写入成功当然 pipeline 可以并行处理可以加快写入速度。 欢迎添加微信xiedeyantu讨论技术问题。
- 上一篇: 附近网站建设公司哪家好dw用表格做网站
- 下一篇: 阜宁网站制作哪家好wordpress网页移动
相关文章
-
附近网站建设公司哪家好dw用表格做网站
附近网站建设公司哪家好dw用表格做网站
- 技术栈
- 2026年03月21日
-
附近哪里有计算机培训班网站内容优化的重要性
附近哪里有计算机培训班网站内容优化的重要性
- 技术栈
- 2026年03月21日
-
附近旧模板出售市场seo网站优化方案
附近旧模板出售市场seo网站优化方案
- 技术栈
- 2026年03月21日
-
阜宁网站制作哪家好wordpress网页移动
阜宁网站制作哪家好wordpress网页移动
- 技术栈
- 2026年03月21日
-
阜宁做网站哪家公司好wordpress设置网页跳转
阜宁做网站哪家公司好wordpress设置网页跳转
- 技术栈
- 2026年03月21日
-
阜新门户网站建设浏览器怎么下载视频
阜新门户网站建设浏览器怎么下载视频
- 技术栈
- 2026年03月21日






