分类网站作用长沙网站推广服务公司
- 作者: 五速梦信息网
- 时间: 2026年03月21日 11:18
当前位置: 首页 > news >正文
分类网站作用,长沙网站推广服务公司,活动策划网站,花生棒做网站作者#xff1a;来自 Elastic Andre Luiz 了解如何通过 Apache Airflow 将数据导入 Elasticsearch。 Apache Airflow Apache Airflow 是一个旨在创建、安排#xff08;schedule#xff09;和监控工作流的平台。它用于编排 ETL#xff08;Extract-Transform-Load#xff0…作者来自 Elastic Andre Luiz 了解如何通过 Apache Airflow 将数据导入 Elasticsearch。 Apache Airflow Apache Airflow 是一个旨在创建、安排schedule和监控工作流的平台。它用于编排 ETLExtract-Transform-Load 流程、数据管道和其他复杂工作流提供灵活性和可扩展性。它的可视化界面和实时监控功能使管道管理更易于访问和高效让你可以跟踪执行的进度和结果。以下是它的四个主要支柱 动态管道以 Python 定义允许动态灵活地生成工作流。可扩展Airflow 可以与各种环境集成可以创建自定义运算符并可以根据需要执行特定代码。优雅管道以干净明确的方式编写。可扩展其模块化架构使用消息队列来编排任意数量的工作器。 在实践中Airflow 可用于以下场景 数据导入编排将数据每日提取到 Elasticsearch 等数据库中。日志监控管理日志文件的收集和处理然后在 Elasticsearch 中进行分析以识别错误或异常。多种数据源集成将来自不同系统API、数据库、文件的信息合并到 Elasticsearch 中的单个层中简化搜索和报告。 DAGDirected Acyclic Graphs - 有向无环图 在 Airflow 中工作流由 DAG有向无环图表示。DAG 是一种定义任务执行顺序的结构。DAG 的主要特征是 由独立任务组成每个任务代表一个工作单元旨在独立执行。排序任务的执行顺序在 DAG 中明确定义。可重用性DAG 旨在重复执行促进流程自动化。 Airflow 的主要组件 Airflow 生态系统由多个组件组成它们共同协作以协调任务 调度程序 - scheduler负责调度 DAG 并发送任务以供工作人员执行。执行器 - Exectutor管理任务的执行将其委托给工作人员。Web 服务器 - Webserver提供与 DAG 和任务交互的图形界面。Dags 文件夹 - Dags folder我们存储用 Python 编写的 DAG 的文件夹。元数据 - Metadata作为工具存储库的数据库由调度程序和执行器用于存储执行状态。 Apache Airflow 和 Elasticsearch 我们将演示如何使用 Apache Airflow 和 Elasticsearch 来协调任务并在 Elasticsearch 中索引结果。此演示的目标是创建一个任务管道来更新 Elasticsearch 索引中的记录。此索引包含电影数据库用户可以在其中进行评分和分配评级。想象一个每天有数百个评级的场景有必要保持评级记录更新。为此将开发一个 DAG它将每天执行负责检索新的合并评级并更新索引中的记录。 在 DAG 流程中我们将有一个获取评级的任务然后是一个验证结果的任务。如果数据不存在DAG 将被定向到失败任务。否则数据将在 Elasticsearch 中编入索引。目标是通过一种带有负责计算分数的机制的方法检索评级以更新索引中电影的评级字段。 使用 Apache Airflow 和 Elasticsearch 以及 Docker 要创建容器化环境我们将使用 Apache Airflow 和 Docker。按照 “在 Docker 中运行 Airflow” 指南中的说明实际设置 Airflow。 至于 Elasticsearch我将使用 Elastic Cloud 上的集群但如果你愿意也可以使用 Docker 配置 Elasticsearch。已经创建了一个包含电影目录的索引其中电影数据已编入索引。这些电影的 “rating” 字段将被更新。 创建 DAG 通过 Docker 安装后将创建一个文件夹结构其中包括 dags 文件夹我们必须将 DAG 文件放在该文件夹中以便 Airflow 识别它们。 在此之前我们需要确保安装了必要的依赖项。以下是此项目的依赖项 pip install apache-airflow apache-airflow-providers-elasticsearch 我们将创建文件 update_ratings_movies.py 并开始编写任务代码。 现在让我们导入必要的库 from airflow import DAG from airflow.operators.python import PythonOperator, BranchPythonOperator from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook 我们将使用 ElasticsearchPythonHook这是一个通过抽象连接和使用外部 API 来简化 Airflow 和 Elasticsearch 集群之间集成的组件。 接下来我们定义 DAG并指定其主要参数 dag_idDAG 的名称。start_dateDAG 的启动时间。schedule定义周期在我们的例子中是每日。doc_md将导入并显示在 Airflow 界面中的文档。 定义任务 现在让我们定义 DAG 的任务。第一个任务将负责检索电影评级数据。我们将使用 PythonOperator并将 task_id 设置为“get_movie_ratings”。python_callable 参数将调用负责获取 ratings 的函数。 get_ratings_operator PythonOperator(task_idget_movie_ratings,python_callableget_movie_ratings_task ) 接下来我们需要验证结果是否有效。为此我们将使用带有 BranchPythonOperator 的条件。task_id 将为 “validate_result”python_callable 将调用验证函数。op_args 参数将用于将上一个任务 “get_movie_ratings” 的结果传递给验证函数。 validate_result BranchPythonOperator(task_idvalidate_result,python_callablevalidate_result,op_args[{{ task_instance.xcom_pull(task_idsget_movie_ratings) }}] ) 如果验证成功我们将从 “get_movie_ratings” 任务中获取数据并将其索引到 Elasticsearch 中。为此我们将创建一个新任务 “index_movie_ratings”它将使用 PythonOperator。op_args 参数将 “get_movie_ratings” 任务的结果传递给索引函数。 index_ratings_operator PythonOperator(task_idindex_movie_ratings,python_callableindex_movie_ratings_task,op_args[{{ task_instance.xcom_pull(task_idsget_movie_ratings) }}] ) 如果验证表明失败DAG 将继续执行失败通知任务。在此示例中我们只是打印一条消息但在实际场景中我们可以配置警报来通知失败。 failed_get_rating_operator PythonOperator(task_idfailed_get_rating_operator,python_callablelambda: print(Ratings were False, skipping indexing.) ) 最后我们定义任务依赖关系确保它们以正确的顺序执行 get_ratings_operator validate_result [index_ratings_operator, failed_get_rating_operator] 以下是我们 DAG 的完整代码 DAG update Rating Moviesimport ast import randomfrom airflow import DAG from datetime import datetimefrom airflow.operators.python import PythonOperator, BranchPythonOperator from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHookdef index_movie_ratings_task(movies):es_hook ElasticsearchPythonHook(hostsNone,es_conn_args{cloud_id: cloud_idapi_key: api-key})es_client es_hook.get_connactions []for movie in ast.literal_eval(movies):actions.append({update: {_id: movie[id],_index: movies}})actions.append({doc: {rating: movie[rating]},doc_as_upsert: True})result es_client.bulk(operationsactions)print(fIngestion completed.)print(result)return Truedef get_movie_ratings_task():movies [{id: i, rating: round(random.uniform(1, 10), 1)}for i in range(1, 100)]return moviesdef validate_result(result):if not result:return failed_get_rating_operatorelse:return index_movie_ratingswith DAG(dag_idupdate_ratings_movies_2024,start_datedatetime(2024, 12, 29),scheduledaily,doc_mddoc, ):get_ratings_operator PythonOperator(task_idget_movie_ratings,python_callableget_movie_ratings_task)validate_result BranchPythonOperator(task_idvalidate_result,python_callablevalidate_result,op_args[{{ task_instance.xcom_pull(task_idsget_movie_ratings) }}],provide_contextTrue)index_ratings_operator PythonOperator(task_idindex_movie_ratings,python_callableindex_movie_ratings_task,op_args[{{ task_instance.xcom_pull(task_idsget_movie_ratings) }}])failed_get_rating_operator PythonOperator(task_idfailed_get_rating_operator,python_callablelambda: print(Ratings were False, skipping indexing.))get_ratings_operator validate_result [index_ratings_operator, failed_get_rating_operator] 可视化 DAG 执行 在 Apache Airflow 界面中我们可以可视化 DAG 的执行。只需转到 “DAG” 选项卡并找到你创建的 DAG 即可。 下面我们可以直观地看到任务的执行情况及其各自的状态。通过选择特定日期的执行我们可以访问每个任务的日志。请注意在 index_movie_ratings 任务中我们可以在索引中看到索引结果并且它已成功完成。 在其他选项卡中可以访问有关任务和 DAG 的其他信息以协助分析和解决潜在问题。 结论 在本文中我们演示了如何将 Apache Airflow 与 Elasticsearch 集成以创建数据提取解决方案。我们展示了如何配置 DAG、定义负责检索、验证和索引电影数据的任务以及如何在 Airflow 界面中监控和可视化这些任务的执行。 这种方法可以轻松适应不同类型的数据和工作流使 Airflow 成为在各种场景中编排数据管道的有用工具。 参考资料 Apache AirFlow https://airflow.apache.org/ 使用 Docker 安装 Apache Airflow https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html Elasticsearch Python Hook https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/stable/hooks/elasticsearch_python_hook.html Python 运算符 https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html 想要获得 Elastic 认证了解下一期 Elasticsearch 工程师培训何时开始 Elasticsearch 包含许多新功能可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息开始免费云试用或立即在吗的本地机器上试用 Elastic。 原文How to ingest data to Elasticsearch through Apache Airflow - Elasticsearch Labs
相关文章
-
分类网站上怎么做锚文本漳州网站开发去博大钱少a
分类网站上怎么做锚文本漳州网站开发去博大钱少a
- 技术栈
- 2026年03月21日
-
分类网站模版广州网站建设高端
分类网站模版广州网站建设高端
- 技术栈
- 2026年03月21日
-
分局网站建设个人站长和企业网站
分局网站建设个人站长和企业网站
- 技术栈
- 2026年03月21日
-
分类信息导航网站模板wordpress与iis7欢迎
分类信息导航网站模板wordpress与iis7欢迎
- 技术栈
- 2026年03月21日
-
分类信息发布网站模板职业本科专业建设规划
分类信息发布网站模板职业本科专业建设规划
- 技术栈
- 2026年03月21日
-
分类信息网站成都搭建太仓智能网站开发
分类信息网站成都搭建太仓智能网站开发
- 技术栈
- 2026年03月21日






