回音壁


一切爆发都有片刻的宁静/一切死亡都有冗长的回声


Celery


http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html
http://python.jobbole.com/87238/
https://www.cnblogs.com/forward-wang/p/5970806.html
https://www.cnblogs.com/jonathan1314/p/7649249.html
https://www.cnblogs.com/alex3714/p/6351797.html

安装

1
2
3
4
5
apt-get install redis-server
pip install redis
pip install celery
sudo apt install redis
sudo pip install -U "celery[redis]"

Demo

1.新建tasks.py

1
2
3
4
5
6
7
8
9
#!/usr/bin/python
#-*- coding:utf-8 -*-

from celery import Celery

app = Celery('tasks', #任务名称
broker='redis://localhost:6379/0', #消息队列
             backend='redis://localhost') #任务结果存储
``

#app = Celery()

#app.conf.broker_url = ‘redis://localhost:6379/0’ #app.conf.result_backend = ‘redis://localhost:6379/0’

1
2
3
4

@app.task #普通函数装饰为 celery task
def add(x,y):
    return x+y

2.启动celery worker角色 来开始监听并执行任务

1
2
$ celery worker -A tasks -l info
$ celery worker --help

-a ,–app APP
-b BROKER, –broker BROKER
-D, –detach Start worker as a background process.
-l LOGLEVEL, –loglevel LOGLEVEL
Logging level, choose between DEBUG, INFO, WARNING,
ERROR, CRITICAL, or FATAL.

3.另起终端执行任务

1
2
from tasks import add
add.delay(1,1)

-celery常用接口
tasks.add(4,6) —> 本地执行
tasks.add.delay(3,4) –> worker执行
t=tasks.add.delay(3,4) –> t.get() 获取结果,或卡住,阻塞
t.ready()—> False:未执行完,True:已执行完
t.get(propagate=False) 抛出简单异常,但程序不会停止
t.traceback 追踪完整异常
t.id
t.state

后台启动

celery multi start w1 -A project -l info
celery multi start w2 -A project -l info
celery multi start w3 -A project -l info

celery multi restart w1 -A project -l info
celery multi stop w1 w2 w3 # 任务立刻停止
celery multi stopwait w1 w2 w3 # 任务执行完,停止

项目结构

cron_task.py

1
2
#!/usr/bin/python
#-*- coding:utf-8 -*-

project
|– init.py
|– celery.py # 配置文档
|– cron_tasks.py # 任务函数
|– cron_tasks2.py # 任务函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from celery import Celery
from celery.schedules import crontab

app = Celery('project',
             broker='redis://localhost',
             backend='redis://localhost',
             include=['cron_task',])


app.conf.update(
    result_expires=3600,
)

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # 每10s调用 test('hello')
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # 每20s调用 test('world')
    sender.add_periodic_task(20.0, test.s('world'), expires=10)

    # 每周一早上7:30 执行 test('Happy Mondays!')
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1), # 可灵活修改
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print arg

启动角色 worker 执行任务

celery worker -A project -l info

启动角色 beat 将定时任务放到队列中

celery beat -A project.periodic_task -l debug