在Flask中集成使用Celery

最近在阅读《深入理解Flask》,很好的一本书。

这篇我们来看看如何在Flask项目中集成使用Celery。这里的逻辑原理暂时我还没完全吃透,不过我已经实现了使用,方式稍微和书籍中不同,以后我吃透之后会慢慢完善。

下面开始我们的学习吧。

我们在Flask核心目录下创建一个包tasks

在初始化模块中编写以下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import os
from celery import Celery

from app import create_app


def make_celery(app_ob):
_celery = Celery(app_ob.import_name, broker=app_ob.config['CELERY_BROKER_URL'])

_celery.conf.update(app_ob.config)
task_base = _celery.Task

class ContextTask(task_base):
abstract = True

def __call__(self, *args, **kwargs):
with app_ob.app_context():
return task_base.__call__(self, *args, **kwargs)

_celery.Task = ContextTask
return _celery


flask_app = create_app(os.getenv('FLASK_CONFIG') or 'default')
celery = make_celery(flask_app)

# 这里将我们编写任务的模块导入
from app.tasks import timing_sta

创建模块timing_sta.py进行一步任务编辑。

1
2
3
4
5
6
# -*- coding: utf-8 -*-
from app.tasks import celery

@celery.task
def log(message):
return message

这样就完了吗?

当然没有,我们需要在配置文件中加入一下配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from celery.schedules import crontab


CELERY_BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
CELERY_TASK_RESULT_EXPIRES = 86400
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYD_TASK_TIME_LIMIT = 120

CELERYBEAT_SCHEDULE = {
'test-beat-log': {
'task': 'app.tasks.timing_sta.log',
'schedule': crontab(minute='*/1'),
}
}

这样完了吗?

还没有,我们还需要开启Celery进程,这里会分为两类。

开启异步任务:

1
celery -A worker app.tasks --loglevel=info

开启定时任务:

1
celery -A app.tasks  beat

这样子就是完成了全部的配置,你的异步任务可以嗖嗖跑起来了。

具体例子和原理看书哦!

看下参考文章也不错:

初级原理:http://www.lhcx821.com/?p=267

普通文章:http://www.cnblogs.com/cwp-bg/p/9259974.html

稍深原理:https://blog.csdn.net/Jmilk/article/details/53677411

深入原理:https://jiayi.space/post/zai-celeryzhong-shi-yong-flaskde-shang-xia-wen

其他参考:

Flask 配置Celery

Python Celery使用

知识就是财富
如果您觉得文章对您有帮助, 欢迎请我喝杯水!