化妆品 网站建设案例小米的网站设计
- 作者: 五速梦信息网
- 时间: 2026年03月21日 10:51
当前位置: 首页 > news >正文
化妆品 网站建设案例,小米的网站设计,网页游戏开服表的排行榜,阜阳网站制作公司报价目录Celery 简介介绍安装基本使用Flask使用Celery异步任务定时任务Celery使用Flask上下文进阶使用参考停止Worker后台运行Celery 简介 介绍 Celery 是一个简单、灵活且可靠的#xff0c;处理大量消息的分布式系统#xff0c;并且提供维护这样一个系统的必需工具。 它是一个… 目录Celery 简介介绍安装基本使用Flask使用Celery异步任务定时任务Celery使用Flask上下文进阶使用参考停止Worker后台运行Celery 简介 介绍 Celery 是一个简单、灵活且可靠的处理大量消息的分布式系统并且提供维护这样一个系统的必需工具。 它是一个专注于实时处理的任务队列同时也支持任务调度。 Celery 通过消息机制进行通信通常需要中间人Broker和工作者Worker来进行调节。其中Broker就是消息中间件常用的rabbitmq和redis主要用来进行发送和接收消息Worker就是任务的执行单元通常是开发者来自己定义任务的内容。 Celery 特点 简单Celery 上手比较简单不需要配置文件就可以直接运行高可用如果出现丢失连接或连接失败职程Worker和客户端会自动重试并且中间人通过 主/主 主/从 的方式来进行提高可用性快速单个 Celery 进行每分钟可以处理数以百万的任务而且延迟仅为亚毫秒使用 RabbitMQ、 librabbitmq 在优化过后灵活Celery 的每个部分几乎都可以自定义扩展和单独使用例如自定义连接池、序列化方式、压缩方式、日志记录方式、任务调度、生产者、消费者、中间人Broker等。 Celery 主要用途 异步执行解决耗时任务,将耗时操作任务提交给Celery去异步执行比如发送短信/邮件、消息推送、音视频处理等等延迟执行解决延迟任务比如有些跑批接口的任务需要耗时比较长调度可以通过调度功能在一段时间内指定任务的执行时间 datetime也可以根据简单每隔一段时间进行执行重复的任务支持分钟、小时、星期几也支持某一天或某一年的Crontab表达式。 Celery 主要架构 celery 的5个主要角色 Task就是任务有异步任务和定时任务Broker中间人接收生产者发来的消息即Task将任务存入队列。任务的消费者是Worker。Celery本身不提供队列服务推荐用Redis或RabbitMQ实现队列服务Worker执行任务的单元它实时监控消息队列如果有任务就获取任务并执行它Beat定时任务调度器根据配置定时将任务发送给BrokerBackend用于存储任务的执行结果例如SQLAlchemy/Django] ORM、Memcached、Redis、 RPC RabbitMQ/AMQP)以及自定义的后端结果存储中间件。 安装 安装比较简单命令如下 pip install Celery另外Celery 自定义了一组用于安装 Celery 和特定功能的依赖。可以在中括号加入您需要依赖并可以通过逗号分割需要安装的多个依赖包例如安装rabbitmq和redis依赖如下 pip install celery[librabbitmq,redis]注意RabbitMQ 是默认的中间人Broker只需要配置连接的URL即可不需要安装额外的的配置以及初始化配置信息。但是使用redis作为中间件时或作为结果存储必须要安装 Celery 的依赖库按照上面的命令即可。 基本使用 选用rabbitmq作为Broker。 创建一个工程保存为一个 app 文件中。针对大型的项目可能需要创建 独立的模块。 首先创建 tasks.py from celery import Celeryapp Celery(tasks,brokeramqp://guest:guestlocalhost:5672,backendredis://localhost)app.task def add(x, y):return x yCelery参数 第一个参数为当前模块的名称只有在 main模块中定义任务时才会生产名称。 第二个参数为中间人Broker的链接 URL 实例中使用的 RabbitMQCelery默认使用的也是RabbitMQ。 第三个参数为后端结果链接实例中使用的redis。 这里创建了一个名称为 add 的任务返回的俩个数字的和。 然后启动celery服务 celery -A tasks worker –loglevelinfo成功运行后打印如下信息 ————– celerytestdeMacBook-Pro.local v5.2.7 (dawn-chorus) — ***** —– – ******* —- macOS-10.16-x86_64-i386-64bit 2023-01-03 21:39:48
*** — * —
** ———- [config]
** ———- . app: tasks:0x7fcbf8ce4e20
** ———- . transport: amqp://guest:**localhost:5672//
** ———- . results: redis://localhost/
*** — * — . concurrency: 8 (prefork) – ******* —- . task events: OFF (enable -E to monitor tasks in this worker) — ***** —– ————– [queues]. celery exchangecelery(direct) keycelery[tasks]. tasks.add[2023-01-03 21:39:48,906: INFO/MainProcess] Connected to amqp://guest:*127.0.0.1:5672// [2023-01-03 21:39:48,916: INFO/MainProcess] mingle: searching for neighbors [2023-01-03 21:39:49,957: INFO/MainProcess] mingle: all alone [2023-01-03 21:39:50,007: INFO/MainProcess] celerytestdeMacBook-Pro.local ready.其中可以看到配置信息和任务信息。 最后调用任务 需要调用我们创建的实例任务可以通过 delay() 进行调用。 delay() 是 apply_async() 的快捷方法可以更好的控制任务的执行 from tasks import addadd.delay(4, 4) AsyncResult: e4722dc5-1ccd-45b5-ba89-75ff3edf4255调用任务会返回一个 AsyncResult 的实例用于检测任务的状态等待任务完成获取返回值如果任务执行失败会抛出异常。默认这个功能是不开启的到那时我们配置了backend所以可以查看任务的状态如下 result.ready() # ready() 可以检测是否已经处理完毕 Trueresult.get(timeout1) # get获取任务执行结果 8如果任务出现异常可以通过以下命令进行回溯 result.traceback同时该任务已经有被celery Worker开始处理可以在启动celery的控制台输出的日志进行查看执行情况如下可以看到任务被成功执行 [2023-01-03 21:48:08,374: INFO/MainProcess] Task tasks.add[1b5f11de-c113-4f07-b2de-52c7f7594959] received [2023-01-03 21:48:08,390: INFO/ForkPoolWorker-8] Task tasks.add[1b5f11de-c113-4f07-b2de-52c7f7594959] succeeded in 0.013839624999999245s: 8Flask使用Celery 异步任务 下面来看下在实际flask项目中如何使用Celery。 1.创建flask项目 使用pycharm创建一个flask项目名称为flask_celery。 同时创建一个文件夹名称为celery_tasks专门用于管理celery相关的文件。 2.celery配置 针对较大型的项目建议使用专用配置模块进行针对 Celery 配置。不建议使用硬编码建议将所有的配置项集中化配置。 需要在celery_tasks文件夹下创建一个名为 config.py 的文件添加以下配置内容 broker_url amqp://guest:guestlocalhost:5672 result_backend redis://:123456localhost:6379/8更多配置后面会讲解。 创建完后可以通过 app.config_from_object() 进行加载配置模块。 3.创建Celery app实例 在celery_tasks文件夹下创建main.py文件代码如下 from celery import Celery from celery_tasks import configcelery_app Celery(name)# 加载配置文件 celery_app.config_from_object(config)# 自动搜寻异步任务 celery_app.autodiscover_tasks([celery_tasks.sms]) 注意autodiscover_tasks用来自动搜寻我们指定文件夹下创建的任务但是任务所在的文件必须叫tasks.py。 当然也可以在创建celery对象时使用参数include直接指定搜寻任务文件夹如下 celery_app Celery(tasks,brokerredis://127.0.0.1:6379⁄1,backendredis://127.0.0.1:6379⁄2,include[celery_tasks.sms,])4.创建任务 创建一个发送短信验证码的任务先在celery_tasks文件夹下创建sms文件夹然后创建一个tasks.py为了能被自动搜寻到文件编辑代码如下 import timefrom celery_tasks.celery_service import celery_appcelery_app.task def send_sms(phone, data):模拟发送短信time.sleep(3)print(发送短信成功)return True 5.创建业务接口 一般真实业务中我们的任务都是由flask项目中的业务去调用。这里创建一个名称为sms的接口来调用发送短信验证码的任务代码如下 from flask import Flaskfrom celery_tasks.sms_task import send_smsapp Flask(name)app.route(/) def hello_world():return Hello World!app.route(/sms) def sms():# 做一些业务操作# delay函数会把任务交给celery去执行然后立即返回所以是异步send_sms.delay()return send sms successif name main:app.run() 6.启动flask项目 启动flask项目即可 7.启动celery 注意启动时不要切换到celery_app所在文件夹下而是在项目根目录即可后面会说明为什么。 执行命令 celery -A celery_tasks.main worker -l info————– celerytestdembp v5.2.7 (dawn-chorus) — **** —– – ******* —- macOS-10.16-x86_64-i386-64bit 2023-01-05 00:14:03
*** — * —
** ———- [config]
** ———- . app: celery_tasks.celery_service:0x7f95f8a69f70
** ———- . transport: amqp://guest:**localhost:5672//
** ———- . results: redis://:**localhost:6379⁄8
*** — * — . concurrency: 8 (prefork) – ******* —- . task events: OFF (enable -E to monitor tasks in this worker) — ***** —– ————– [queues]. celery exchangecelery(direct) keycelery[tasks]. celery_tasks.sms.tasks.send_sms[2023-01-05 00:14:04,226: INFO/MainProcess] Connected to amqp://guest:**127.0.0.1:5672// [2023-01-05 00:14:04,235: INFO/MainProcess] mingle: searching for neighbors [2023-01-05 00:14:05,273: INFO/MainProcess] mingle: all alone [2023-01-05 00:14:05,298: INFO/MainProcess] celerytestdembp ready.可以看到tasks里已经有我们的celery_tasks.sms.tasks.send_sms任务了注意这里的任务名就是这个包含路径的全名而不只是send_sms这个函数名。 注意正式环境启动时可以使用如下命令启动celer worker nohup celery -A proj worker - P gevent -c 1000 celery.log 21 # 1.nohup 和末尾的:后台执行忽略所有挂断
2.- P gevent -c 1000 开1000个协程执行可根据业务调整8.测试
访问我们的sms接口http://127.0.0.1:5000/sms。然后可以看到接口是立即返回的没有延迟3秒。然后再celery任务控制台看到如下信息 [2023-01-05 00:16:54,391: INFO/MainProcess] Task celery_tasks.sms.tasks.send_sms[23b58998-ab44-4bc6-8aff-bc6cc12aa664] received [2023-01-05 00:16:57,394: WARNING/ForkPoolWorker-8] 发送短信成功 [2023-01-05 00:16:57,402: INFO/ForkPoolWorker-8] Task celery_tasks.sms.tasks.send_sms[23b58998-ab44-4bc6-8aff-bc6cc12aa664] succeeded in 3.0086944159999973s: True任务被成功执行且看到最后耗时3.0086944159999973s返回True。 看一下整个项目的结构如下 最后特别说明 整个项目其实由三部分组成客户端flask应用、Celery服务端、中间件broker。这三部分在实际成产中都可以分开单独去部署Celery本身也是一个单独的服务只不过为了方便写在了flask项目中实际也可以把celery_tasks文件夹单独拿出来单独去部署启动服务。但是文件分开后注意一点flask客户端去调用celery里的异步任务的时候是通过任务名就是上面我们看到的文件路径加函数名组成的放到Broker中celery服务从Broker获取任务名去执行。flask可以没有具体的异步任务逻辑代码只要一个任务名就行但是为了flask能顺利调用异步任务flask项目使用celery的相关文件的目录结构要与celery本身服务的目录结构相同这样保证了flask客户端和celery服务两边的任务名相同。 当celery异步任务中需要flask上下文时就不宜把flask项目和celery项目分开了。 还有一点是除非有特殊要求否则任务最好不要返回值能省去后端结果存储、提高性能。 定时任务 创建定时任务时使用celery的celery beat它是一个调度程序定期启动任务然后由集群中的可用节点执行任务。 默认情况下会从配置中的 beat_schedule 项中获取条目(entries也就是要执行的具体的定时任务)但是也可以使用自定义存储例如将entries存储在SQL数据库中。 时区设置默认情况下定期任务计划使用UTC时区但是可以使用时区设置更改使用的时区。 例如配置 Asia/Shanghai broker_url amqp://guest:guestlocalhost:5672 result_backend redis://:123456localhost:6379⁄8
时区设置
timezone Asia/Shanghai也可以你直接使用app.conf.timezone Asia/Shanghai来直接配置。 首先创建我们的任务在celery_tasks文件夹创建一个新文件夹scheduled然后创建tasks.py文件编辑 celery_tasks/scheduled/tasks.py代码如下 from celery_tasks.main import celery_appcelery_app.task def scheduled_task(name):模拟定时任务print(f{name}执行定时任务)return True 然后在周期调度列表中添加条目(entry)建议使用如下方式添加编辑celery_tasks/main.py from datetime import timedeltafrom celery import Celery# from app import app from celery_tasks import configcelery_app Celery(name)# 加载配置文件 celery_app.config_from_object(config)# 自动搜寻异步任务 celery_app.autodiscover_tasks([celery_tasks.sms, celery_tasks.scheduled])# 设置定时任务 celery_app.conf.beat_schedule {# 名字最好做到见名知意add-every-10-seconds: {# 执行scheduled.tasks下的scheduled_task函数task: celery_tasks.scheduled.tasks.scheduled_task,# 每隔3秒执行一次schedule: 3.0,# schedule: timedelta(seconds3),# 传递参数args: (张三,)}, }设置时间间隔时也可以使用timedelta不仅可以设置秒也可以设置天年等间隔这样更加灵活。 下面启动我们的celery worker服务 celery -A celery_tasks.main worker -l info# 显示如下启动成功————– celerytestdembp v5.2.7 (dawn-chorus) — ***** —– – ******* —- macOS-10.16-x86_64-i386-64bit 2023-01-07 12:14:20
*** — * —
** ———- [config]
** ———- . app: celery_tasks.main:0x7fbb2066c3d0
** ———- . transport: amqp://guest:**localhost:5672//
** ———- . results: redis://:**localhost:6379⁄8
*** — * — . concurrency: 8 (prefork) – ******* —- . task events: OFF (enable -E to monitor tasks in this worker) — ***** —– ————– [queues]. celery exchangecelery(direct) keycelery[tasks]. celery_tasks.scheduled.tasks.scheduled_task. celery_tasks.sms.tasks.send_sms[2023-01-07 12:14:21,070: INFO/MainProcess] Connected to amqp://guest:**127.0.0.1:5672// [2023-01-07 12:14:21,083: INFO/MainProcess] mingle: searching for neighbors [2023-01-07 12:14:22,119: INFO/MainProcess] mingle: all alone [2023-01-07 12:14:22,145: INFO/MainProcess] celerytestdembp ready.这时候是不会执行任何任务的。与一般的异步任务不同的是定时任务是依赖celery beat服务的需要单独再启动celery beat服务如下 celery -A celery_tasks.main beat
显示如下则启动成功
celery beat v5.2.7 (dawn-chorus) is starting. __ - … __ - _ LocalTime - 2023-01-07 12:07:10 Configuration -. broker - amqp://guest:**localhost:5672//. loader - celery.loaders.app.AppLoader. scheduler - celery.beat.PersistentScheduler. db - celerybeat-schedule. logfile - [stderr]%WARNING. maxinterval - 5.00 minutes (300s) beat进程会读取配置文件的内容周期性的将配置中到期需要执行的任务发送给任务队列然后worker进程从队列读取执行。 启动后在worker服务的终端马山可以看到任务已经执行了如下 [2023-01-07 12:14:28,999: INFO/MainProcess] Task celery_tasks.scheduled.tasks.scheduled_task[64510402-3a67-43f4-b1f3-60477f873e7b] received [2023-01-07 12:14:29,001: WARNING/ForkPoolWorker-8] 张三执行定时任务 [2023-01-07 12:14:29,008: INFO/ForkPoolWorker-8] Task celery_tasks.scheduled.tasks.scheduled_task[64510402-3a67-43f4-b1f3-60477f873e7b] succeeded in 0.007088375000000369s: True [2023-01-07 12:14:31,973: INFO/MainProcess] Task celery_tasks.scheduled.tasks.scheduled_task[d81cf939-c24d-4cdd-b111-03bfa4a315b8] received [2023-01-07 12:14:31,974: WARNING/ForkPoolWorker-8] 张三执行定时任务 [2023-01-07 12:14:31,976: INFO/ForkPoolWorker-8] Task celery_tasks.scheduled.tasks.scheduled_task[d81cf939-c24d-4cdd-b111-03bfa4a315b8] succeeded in 0.0017581670000001992s: True [2023-01-07 12:14:34,970: INFO/MainProcess] Task celery_tasks.scheduled.tasks.scheduled_task[385e75e0-e9c6-4510-993d-12a82799e5dc] received [2023-01-07 12:14:34,971: WARNING/ForkPoolWorker-8] 张三执行定时任务 [2023-01-07 12:14:34,972: INFO/ForkPoolWorker-8] Task celery_tasks.scheduled.tasks.scheduled_task[385e75e0-e9c6-4510-993d-12a82799e5dc] succeeded in 0.0008522909999992834s: True可以看到确实任务每3秒执行一次。 设置任务时还可以更加灵活的使用Crontab 调度器例如一天中的特定时间或一周中的某天执行任务。 如下设置任务每周一的上午7:30分执行 from celery.schedules import crontabapp.conf.beat_schedule {# Executes every Monday morning at 7:30 a.m.add-every-monday-morning: {task: celery_tasks.scheduled.tasks.scheduled_task,schedule: crontab(hour7, minute30, day_of_week1),args: (张三,),}, }Celery使用Flask上下文 参考https://flask.palletsprojects.com/en/2.2.x/patterns/celery/ 关于Flask的上下文原理参考文章https://blog.csdn.net/qq_43745578/article/details/129574039?spm1001.2014.3001.5501 根据Flask上下文原理介绍我们知道 在一个非请求的线程中想要使用应用上下文必须要 手动把上下文push到栈中文章中介绍了两种方法。 在Celery执行任务时也一样由于Celery是异步的和请求并不是一个线程所以Celery要想使用上下文也需要同样把上下文手动push到栈中。这里我们使用with app.app_context()。 同时我们需要使用工厂函数模式来创建app和celery_app。 修改我们的项目结构如下 主要变化如下 1.增加一个app文件夹用于创建Flaskapp和Celery app 2.celery_tasks文件夹下的init.py文件增加创建Celery app的工厂方法。 3.增加run.py取代之前的app.py用于启动flask应用。 具体代码如下 1celery_tasks/init.py 这个文件增加用来创建Celery app的工厂方法 from celery import Celery, Task from flask import Flaskdef celery_init_app(app: Flask) - Celery:# 重写celery的task类主要把任务放在上下文中执行class FlaskTask(Task):def call(self, *args: object, **kwargs: object) - object:with app.app_context():return self.run(*args, **kwargs)celery_app Celery(app.name, task_clsFlaskTask)# 把celery赋值到app的属性中便于后面取app.extensions[celery] celery_appreturn celery_app2app_factory/init.py 这个文件增加flask app的工厂方法用于创建flask app 和 celery app from flask import Flask from celery_tasks import celery_init_appdef create_app() - Flask:app Flask(name)celery_init_app(app)return appflask_app create_app() celery_app flask_app.extensions[celery]3celery_tasks/main.py 修改次文件不再创建celery app而是使用app_factory/init.py文件创建的 celery app。 from celery import Celeryfrom celery_tasks import config from app_factory import celery_app# celery_app Celery(name)# 加载配置文件 celery_app.config_from_object(config)# 自动搜寻异步任务 celery_app.autodiscover_tasks([celery_tasks.sms, celery_tasks.scheduled, celery_tasks.context_tasks])# 设置定时任务 celery_app.conf.beat_schedule {# 名字最好做到见名知意add-every-10-seconds: {# 执行scheduled.tasks下的scheduled_task函数task: celery_tasks.scheduled.tasks.scheduled_task,# 每隔3秒执行一次schedule: 3.0,# schedule: timedelta(seconds3),# 传递参数args: (张三,)}, }配置文件还和以前一样。 4创建celery任务 新建文件夹celery_tasks/context_tasks在里面创建文件tasks.py from app_factory import celery_app from flask import current_appcelery_app.task def context_task():celery中使用flask上下文print(context_task)with current_app.app_context():print(fcurrent_app mysql:{current_app.mysql})return str(current_app.config)这里主要创建一个task并且使用了flask中的应用上下文打印出来应用的name。 5run.py 创建视图函数启动app from app_factory import flask_app as app from celery_tasks.context_tasks.tasks import context_task from celery_tasks.sms.tasks import send_smsapp.route(/sms, methods[GET]) def sms():send_sms.delay()return send sms successapp.route(/app_context, methods[GET]) def test_flask_app_context():result context_task.delay()print(result)return success!if name main:app.run()这里的app使用我们工厂函数创建的app。 6启动celery celery -A celery_tasks.main worker -l info7测试 访问路由http://127.0.0.1:5000/app_context看到celery任务后台打印如下 [2023-03-21 19:04:37,353: INFO/MainProcess] Task celery_tasks.context_tasks.tasks.context_task[70891026-47f7-48b1-bb9c-41aa08be51c3] received [2023-03-21 19:04:37,356: WARNING/ForkPoolWorker-8] context_task [2023-03-21 19:04:37,359: WARNING/ForkPoolWorker-8] current_app mysql:mysql1 [2023-03-21 19:04:37,367: INFO/ForkPoolWorker-8] Task celery_tasks.context_tasks.tasks.context_task[70891026-47f7-48b1-bb9c-41aa08be51c3] succeeded in 0.011942791999999702s: Config {\ENV: \production\, \DEBUG: False, \TESTING: False, \PROPAGATE_EXCEPTIONS: None, \PRESERVE_CONTEXT_ON_EXCEPTION: None, \SECRET_KEY: None, \PERMANENT_SESSION_LIFETIME: datetime.timedelta(days31), \USE_X_SENDFILE: False, \SERVER_NAME: None, \APPLICATION_ROOT: \/\, \SESSION_COOKIE_NAME: \session\, \SESSION_COOKIE_DOMAIN: None, \SESSION_COOKIE_PATH: None, \SESSION_COOKIE_HTTPONLY: True, \SESSION_COOKIE_SECURE: False, \SESSION_COOKIE_SAMESITE: None, \SESSION_REFRESH_EACH_REQUEST: True, \MAX_CONTENT_LENGTH: None, \SEND_FILE_MAX_AGE_DEFAULT: None, \TRAP_BAD_REQUEST_ERRORS: None, \TRAP_HTTP_EXCEPTIONS: False, \EXPLAIN_TEMPLATE_LOADING: False, \PREFERRED_URL_SCHEME: \http\, \JSON_AS_ASCII: True, \JSON_SORT_KEYS: True, \JSONIFY_PRETTYPRINT_REGULAR: False, \JSONIFY_MIMETYPE: \application/json\, \TEMPLATES_AUTO_RELOAD: None, \MAX_COOKIE_SIZE: 4093} 进阶使用参考 停止Worker 使用 Control c 就可以停止职程Worker 后台运行 正式环境后台启动celery命令 nohup celery -A proj worker - P gevent -c 1000 celery.log 21 # 1.nohup 和末尾的:后台执行忽略所有挂断
2.- P gevent -c 1000 开1000个协程执行可根据业务调整参考
https://www.celerycn.io/ru-men/celery-jian-jie https://blog.csdn.net/u010339879/article/details/97691231
- 上一篇: 化妆培训网站开发建立个人网站有什么好处
- 下一篇: 化妆品公司的网站建设策划书全屏网站设计尺寸
相关文章
-
化妆培训网站开发建立个人网站有什么好处
化妆培训网站开发建立个人网站有什么好处
- 技术栈
- 2026年03月21日
-
化纤公司网站建设时事新闻热点事件
化纤公司网站建设时事新闻热点事件
- 技术栈
- 2026年03月21日
-
化工原料价格查询网站公司大厅设计效果图大全
化工原料价格查询网站公司大厅设计效果图大全
- 技术栈
- 2026年03月21日
-
化妆品公司的网站建设策划书全屏网站设计尺寸
化妆品公司的网站建设策划书全屏网站设计尺寸
- 技术栈
- 2026年03月21日
-
化妆品购物网站建设目的wordpress怎么设置主页
化妆品购物网站建设目的wordpress怎么设置主页
- 技术栈
- 2026年03月21日
-
化妆品网站的建设方案seo俱乐部
化妆品网站的建设方案seo俱乐部
- 技术栈
- 2026年03月21日
