整个网站建设中的关键步骤是建设官方网站e路护航

当前位置: 首页 > news >正文

整个网站建设中的关键步骤是,建设官方网站e路护航,宿迁网站搭建,陶瓷类网站建设1. 写在前面 在软件开发中#xff0c;有时候需要通过 Python 去监听指定区域文件或目录的创建、修改#xff0c;或者删除#xff0c;从而引发特定的事件处理。本篇博客为你介绍第三方模块 Watchdog 实现对文件事件的监控。 公众号#xff1a; 滑翔的纸飞机 2. Watchdog 2…1. 写在前面 在软件开发中有时候需要通过 Python 去监听指定区域文件或目录的创建、修改或者删除从而引发特定的事件处理。本篇博客为你介绍第三方模块 Watchdog 实现对文件事件的监控。 公众号 滑翔的纸飞机 2. Watchdog 2.1 什么是 Watchdog? 用于监视文件系统事件的 Python API 和 shell 实用程序。 项目地址https://pypi.org/project/watchdog/ 最新版本Watchdog 3.0.0 适用于 Python 3.7 安装需要运行以下命令进行安装确保使用的是 Python 3.7 pip install watchdog2.2 官方快速入门示例 以下示例程序将以递归方式监视当前目录文件系统变更并简单地将它们输出到控制台 import sys import logging from watchdog.observers import Observer from watchdog.events import LoggingEventHandlerif name main:# 设置日志信息格式logging.basicConfig(levellogging.INFO,format%(asctime)s - %(message)s,datefmt%Y-%m-%d %H:%M:%S)# 要监控的目录路径path sys.argv[1] if len(sys.argv) 1 else .# 创建一个日志事件处理程序event_handler LoggingEventHandler()# 创建一个观察者对象observer Observer()# 声明一个定时任务observer.schedule(event_handler, path, recursiveTrue)# 启动定时任务observer.start()try:while observer.is_alive():observer.join(1)finally:observer.stop()observer.join()输出: 跟踪目录变更事件通过日志输出变更记录。 例如: 创建 test 1.txt 控制台输出 2023-10-19 00:56:18 - Created directory: /Users/demo/2023/10/watchdog/test 2023-10-19 00:56:18 - Modified directory: /Users/demo/2023/10/watchdog 2023-10-19 00:56:27 - Created file: /Users/demo/2023/10/watchdog/test/1.txt 2023-10-19 00:56:27 - Modified directory: /Users/demo/2023/10/watchdog/test2.3 Event Handler 和 Observer Watchdog 的主要实现或者可以说 Watchdog 的构件是基于以下类 Observer观察者用于监视目录并将调用分派给事件处理程序Event handler: 文件系统事件和事件处理程序 说白了Observer 监控目录触发 Event handler 针对事件做出响应 导入方式 from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler 2.3.1 Event handler 以下是Watchdog中默认提供的4个事件处理类 FileSystemEventHandler文件事件处理器的基类用于处理事件PatternMatchingEventHandler模式匹配文件RegexMatchingEventHandler正则匹配文件LoggingEventHandler记录日志。 有关处理程序的更多详情请参阅此链接 通过扩展 Watchdog 提供的默认事件处理程序类实现自定义的函数来处理修改、创建、删除和移动事件。还可以覆盖 FileSystemEventHandler 中的函数以下函数因为其他事件处理类都继承自该类。 on_any_event(event)捕获所有事件处理程序 on_created(event)在创建文件或目录时调用 on_deleted(event)删除文件或目录时调用 on_modified(event)当文件或目录被修改时调用 on_moved(event)在移动或重命名文件或目录时调用 on_closed(event)文件已关闭时调用 on_opened(event)打开文件时调用 每个函数都有一个名为 event 的输入参数其中包含以下变量 event_type字符串形式的事件类型“moved”、“deleted”、“created”、“modified”、“closed”、“opened”默认为 “无”is_directoryTrue表示事件针对目录src_path触发此事件的文件系统对象的源路径 2.3.2 Observer 如果大家熟悉设计模式那么 Watchdog 就遵循观察设计模式。因此每个观察者都会有事件如果文件或目录有任何变化它就会查看并显示变化。 Observer观察目录针对事件调用处理程序也可以直接导入特定平台的类并用它代替 Observer 2.3.3 回顾上文简单示例 通过上述介绍对 Event handler 和 Observer有一个简单的理解现在我们回过头继续来看官方示例 import sys import logging from watchdog.observers import Observer from watchdog.events import LoggingEventHandlerif name main:# 设置日志信息格式logging.basicConfig(levellogging.INFO,format%(asctime)s - %(message)s,datefmt%Y-%m-%d %H:%M:%S)# 要监控的目录路径path sys.argv[1] if len(sys.argv) 1 else .# 创建一个日志事件处理程序event_handler LoggingEventHandler()# 创建一个观察者对象observer Observer()# 声明一个定时任务observer.schedule(event_handler, path, recursiveTrue)# 启动定时任务observer.start()try:while observer.is_alive():observer.join(1)finally:observer.stop()observer.join()1event_handler LoggingEventHandler() 创建一个日志事件处理程序 2observer Observer()创建一个观察者对象 3observer.schedule(event_handler, path, recursiveTrue)声明一个定时任务传入事件处理程序、监控路径、以及是否递归子目录 4observer.start()启动定时任务 进一步分析下 schedule(self, event_handler, path, recursiveFalse):该方法用于监视 path 路径并调用给定的事情 event_handler 。 参数 recursive 表示是否递归子目录即监听子目录默认为 False。 start():启动线程这里开启了新的守护线程主程序如果结束 该线程也会停止。 每个线程对象只能调用1次它安排对象的 run() 方法在单独的控制线程中调用如果在同一线程对象上多次调用此方法将引发 RuntimeError。 2.4. 理解和使用 基于上述关键概念介绍以及官方示例自己实现一个文件事件监听 在本例中使用 FileSystemEventHandler 事件类。对一个文件夹设置监视并在有文件产生时触发另一个函数。处理完成后将把文件移到另一个文件夹。 1首先你需要创建一个继承自 FileSystemEventHandler 事件处理类并创建一个观察者和自定义的事件处理程序实例。 from watchdog.observers import Observer from watchdog.events import FileSystemEventHandlerclass MyHandler(FileSystemEventHandler):passobserver Observer() event_handler MyHandler()2创建一个定时任务传入以下参数 event_handler刚创建的处理程序对象path要跟踪的文件夹路径recursive是否递归子目录 observer.schedule(event_handler, path./input_files, recursiveTrue)(3) observer.start() - 启动任务等待目录产生事件触发事件处理程序中的代码。 (4) observer.stop() - 该函数将清理资源。 (5) 最后用 observer.join() 结束因为我们在这里使用的是多线程概念。join() 将连接多个线程直到调用 join 方法的线程终止。 observer.start() try:while True:time.sleep(300) except KeyboardInterrupt:observer.stop() observer.join()接下去自定义事件处理类MyHandler 在这个示例中我将检查是否有文件上传到所跟踪的文件夹中。为此我可以使用 on_created(event) def create_directory(file_pathNone):# 以年-月-日的格式获取当前日期current_date datetime.now().strftime(%Y-%m-%d)# 创建一个包含当前日期的文件夹folder_path f{file_path}/{current_date}if not os.path.exists(folder_path):os.makedirs(folder_path)return folder_pathelse:return folder_pathclass MyHandler(FileSystemEventHandler):def on_created(self, event):dir_path event.src_path.split(/input_files)processed_files f{dir_path[0]}/processed_fileschild_processed_dir create_directory(file_pathprocessed_files)if event:print(file created:{}.format(event.src_path))# 这里调用其他处理函数main(file_nameevent.src_path)file_name event.src_path.split(/)[-1]destination_path f{child_processed_dir}/{file_name}# 将文件移动到其他目录shutil.move(event.src_path, destination_path)print(file moved:{} to {}.format(event.src_path, destination_path))在上面的示例中我使用函数 create_directory() 来检查目标路径中是否有当前日期的文件夹否则就创建相同的文件夹。 然后在其他 python 脚本函数 main() 中做了一些处理后使用相同的路径作为目标路径来移动文件 下面是最终代码my_event_handler.py from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler import shutil import time import os from datetime import datetime from watchdog_fileobserver import maindef create_directory(file_pathNone):# 以年-月-日的格式获取当前日期current_date datetime.now().strftime(%Y-%m-%d)# 创建一个包含当前日期的文件夹folder_path f{file_path}/{current_date}if not os.path.exists(folder_path):os.makedirs(folder_path)return folder_pathelse:return folder_pathclass MyHandler(FileSystemEventHandler):def on_created(self, event):dir_path event.src_path.split(/input_files)processed_files f{dir_path[0]}/processed_fileschild_processed_dir create_directory(file_pathprocessed_files)if event:print(file created:{}.format(event.src_path))# 这里调用其他处理函数main(file_nameevent.src_path)file_name event.src_path.split(/)[-1]destination_path f{child_processed_dir}/{file_name}shutil.move(event.src_path, destination_path)print(file moved:{} to {}.format(event.src_path, destination_path))if name main:observer Observer()event_handler MyHandler()observer.schedule(event_handler, path./input_files, recursiveTrue)observer.start()try:while True:time.sleep(300)except KeyboardInterrupt:observer.stop()observer.join()watchdog_fileobserver.py: import csvdef read_csv_file(file_name):try:with open(f{file_name}, r) as file:csvreader csv.DictReader(file)for row in csvreader:print(row)return csvreaderexcept Exception as e:passdef main(file_nameNone):if file_name:dict_data read_csv_file(file_name)print(Process completed)else:print(Invalid file path)在这种情况下需要等待文件上传然后执行所需的操作。为此你可以在事件函数中添加以下代码 def on_created(self, event):file_size -1while file_size ! os.path.getsize(event.src_path):file_size os.path.getsize(event.src_path)print(file_size)time.sleep(1)### OR ###def on_created(self, event):file Nonewhile file is None:try:file open(event.src_path)except OSError:logger.info(Waiting for file transfer….)time.sleep(1)continue验证 脚本所在路径下创建用于监听目录 input_files 在该目录下创建一个文件控制台输出 file created:/Users/demo/2023/10/watchdog/input_files/text.txt Process completed file moved:/Users/demo/2023/10/watchdog/input_files/text.txt to /Users/demo/2023/10/watchdog/processed_files/2023-10-20/text.txt目录如下 . ├── input_files ├── my_event_handler.py ├── processed_files │ └── 2023-10-20 │ └── text.txt └── watchdog_fileobserver.py2.5. Watchdog 使用案例 2.5.1 忽略子目录或只包含模式匹配文件的情况 如果要忽略某个目录中的某些文件可以使用最简单的方法之一即使用 PatternMatchingEventHandler 在文件 my_event_handler.py 中修改 MyHandler 中的继承类PatternMatchingEventHandler如下所示 class MyHandler(PatternMatchingEventHandler):……..if name main:event_handler MyHandler(patterns[*.csv, .pdf],ignore_patterns[],ignore_directoriesTrue)……..2.5.2 使用 Celery 来启动/停止 Watchdog 可以使用下面的示例来实现 Watchdog。不过这个示例只是一个关于如何将 celery 集成到Watchdog 中的想法。 from celery import Celery from watchdog.observers import Observer from watchdog.events import PatternMatchingEventHandler import os import timeapp Celery(celery_ex.celery_apptask_ex, brokerredis://localhost:63790)app.task def process_file(file_path):# do something with the filewith open(file_path, r) as f:print(f.read())class MyHandler(PatternMatchingEventHandler):def on_created(self, event):file_size -1while file_size ! os.path.getsize(event.src_path):file_size os.path.getsize(event.src_path)print(file_size)time.sleep(1)if event:print(file created:{}.format(event.src_path))# call function hereprocess_file.apply_async(args(event.src_path,))if name main:observer Observer()event_handler MyHandler(patterns[.csv, .pdf],ignore_patterns[],ignore_directoriesTrue)observer.schedule(event_handler, path./input_files, recursiveTrue)observer.start()try:while True:time.sleep(1)except KeyboardInterrupt:observer.stop()observer.join()此示例需要 redis、celery 支持 2.5.3 监控目录变化 观察者observer可以设置指定目录及其所有子目录在文件或目录创建、删除或修改时调用相应的方法on_created、on_deleted 或 on_modified观察者以无限循环的方式运行可以被键盘中断打断。 import time from watchdog.observers import Observer from watchdog.events import FileSystemEventHandlerclass EventHandler(FileSystemEventHandler):def on_created(self, event):if event.is_directory:print(Directory created:, event.src_path)else:print(File created:, event.src_path)def on_deleted(self, event):if event.is_directory:print(Directory deleted:, event.src_path)else:print(File deleted:, event.src_path)def on_modified(self, event):if event.is_directory:print(Directory modified:, event.src_path)else:print(File modified:, event.src_path)event_handler EventHandler() observer Observer() observer.schedule(event_handler, /path/to/dir, recursiveTrue) observer.start()try:while True:time.sleep(1) except KeyboardInterrupt:observer.stop() observer.join()2.5.4 使用线程和多进程执行 Watchdog 来启动独立进程 可以运行 Watchdog使用线程和多进程并行处理多个文件。下面是一个相同的示例: from watchdog.observers import Observer from watchdog.events import PatternMatchingEventHandler import os import ntpath import time import optparse import multiprocessing import threading from collections import OrderedDictlock threading.RLock()def process_function(get_event, event_dict):print(fProcess started for event: {get_event})dir_path ntpath.abspath(get_event)file_name ntpath.basename(get_event)if len(get_event) 0:your_own_function()do something….class Handler(PatternMatchingEventHandler):def init(self, queue):PatternMatchingEventHandler.init(self, patterns[.csv],ignore_patterns[],ignore_directoriesTrue)self.queue queuedef on_created(self, event):# logger.info(fWait while the transfer of the file is finished before processing it)# file_size -1# while file_size ! os.path.getsize(event.src_path):# file_size os.path. getsize(event.src_path)# time.sleep(1)file Nonewhile file is None:try:file open(event.src_path)except OSError:logger.info(Waiting for file transfer)time.sleep(5)continueself.queue.put(event.src_path)def on_modified(self, event):passdef start_watchdog(watchdog_queue, dir_path):logger.info(fStarting Watchdog Observer\n)event_handler Handler(watchdog_queue)observer Observer()observer.schedule(event_handler, dir_path, recursiveFalse)observer.start()try:while True:time.sleep(1)except Exception as error:observer.stop()logger.error(fError: {str(error)})observer.join()if name main:dir_path r//file_path/watchdog_queue Queue()logger.info(fStarting Worker Thread)worker threading.Thread(targetstart_watchdog, nameWatchdog,args(watchdog_queue, dir_path), daemonTrue)worker.start()mp Manager()event_dict mp.dict()while True:if not watchdog_queue.empty():logger.info(fIs Queue empty: {watchdog_queue.empty()})pool Pool()pool.apply_async(process_function, (watchdog_queue.get(), event_dict))else:time.sleep(1)2.5.5 在 Watchdog 中进行日志记录 要记录事件可以创建一个继承自 FileSystemEventHandler 类的自定义事件处理程序类并重写与要记录的事件相对应的方法。 下面举例说明如何使用 Watchdog 库记录文件创建和修改事件 import logging from watchdog.observers import Observer from watchdog.events import FileSystemEventHandlerclass LogEventHandler(FileSystemEventHandler):def on_created(self, event):if not event.is_directory:logging.info(fFile created: {event.src_path})def on_modified(self, event):if not event.is_directory:logging.info(fFile modified: {event.src_path})logging.basicConfig(filenamewatchdog.log, levellogging.INFO, format%(asctime)s - %(message)s) event_handler LogEventHandler() observer Observer() observer.schedule(event_handler, /path/to/, recursiveTrue) observer.start()try:while True:time.sleep(1) except KeyboardInterrupt:observer.stop() observer.join()3. 最后 无论你是在一个需要跟踪多个文件的大型项目中工作还是只想关注单个文件的任务Watchdog 库都能满足你的需求。 感谢您花时间阅读文章 关注公众号不迷路