Skip to content

Commit 77ad0e8

Browse files
committed
add celery demo
1 parent 3fce85e commit 77ad0e8

14 files changed

Lines changed: 156 additions & 34 deletions

File tree

demo/celery_demo/celery.py

Lines changed: 0 additions & 15 deletions
This file was deleted.

demo/celery_demo/celery_demo1.py

Lines changed: 0 additions & 8 deletions
This file was deleted.

demo/celery_demo/demo1/README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
1. 启动 celery 进程,开始监听(角色为消费者,处理任务)
2+
3+
`celery -A task worker -l info`
4+
5+
-A 后面跟应用名称,即有 Celery 对象的文件,这里是 task.py
6+
7+
`worker -c <concurrency>` 指定并发的数量,默认是系统 cpu 数量
8+
9+
`worker -P [prefork|eventlet|gevent|solo|processes|threads]` 池的实现方式,默认是 prefork
10+
11+
2. 运行任务 exec_task.py(角色为生产者,产生任务)
12+
13+
`python exec_task.py`
14+
15+
运行之后,可以在 celery 进程中看到 worker 已经接受了一系列 task 并开始了处理,一般在运行任务时打印出任务 ID。
16+
17+
3. 运行结果可以通过 fetch_result.py 根据任务 ID 来获取。
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from task import send_msg, send_msg1
2+
import time
3+
4+
# for i in range(5):
5+
# send_msg1.delay(str(i))
6+
# send_msg.delay(str(i+5))
7+
8+
result = send_msg.delay("Hello")
9+
print(result.id)
10+
while not result.ready():
11+
print("正在执行中...")
12+
time.sleep(0.5)
13+
14+
print("执行完成\n" + result.get())
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from task import send_msg
2+
import time
3+
from datetime import datetime
4+
5+
6+
# 指定时间运行任务,默认是 utc 时区
7+
time_start = datetime(2022,3,25,16,44,00).timestamp()
8+
time_start = datetime.utcfromtimestamp(time_start)
9+
print(time_start)
10+
11+
result = send_msg.apply_async(args=["hello"], eta=time_start)
12+
print(result.id)
13+
14+
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from celery.result import AsyncResult
2+
from task import cel
3+
4+
async_result = AsyncResult(id="be35b203-3f5e-4da2-b3d9-e2d1f9b12dc3", app=cel)
5+
6+
STATUS = ['PENDING', 'STARTED', 'RETRY', 'FAILURE', 'SUCCESS']
7+
8+
if async_result.successful():
9+
result = async_result.get()
10+
print(result)
11+
elif async_result.failed():
12+
print('任务执行失败')

demo/celery_demo/demo1/task.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import time
2+
from celery import Celery
3+
from pytz import timezone
4+
5+
broker = "redis://localhost:6379/0"
6+
backend = "redis://localhost:6379/1"
7+
8+
cel = Celery('tasks', broker=broker,backend=backend)
9+
10+
# 相当于注册一个任务
11+
@cel.task
12+
def send_msg(user):
13+
print(f"MSG-1 Sending msg to {user}...")
14+
time.sleep(2)
15+
msg = f"MSG-1 Sent msg to {user} done."
16+
return msg
17+
18+
@cel.task
19+
def send_msg1(user):
20+
print(f"MSG-2 Sending msg1 to {user}...")
21+
time.sleep(5)
22+
msg = f"MSG-2 Sent msg1 to {user} done."
23+
return msg

demo/celery_demo/demo2/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
拆分任务模块,在初始化 Celery 对象的时候使用 `include` 来注册任务。
File renamed without changes.
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from datetime import timedelta
2+
from celery import Celery
3+
4+
broker = "redis://localhost:6379/0"
5+
backend = "redis://localhost:6379/1"
6+
7+
cel = Celery(
8+
"celery_demo",
9+
broker=broker,
10+
backend=backend,
11+
# 通过引入的方式来注册任务,可以将各个任务分散开
12+
include=["celery_tasks.send_msg", "celery_tasks.send_mail"],
13+
)
14+
15+
cel.conf.timezone = "Asia/Shanghai"
16+
17+
# 启动 worker: `celery -A celery_tasks worker -l info`
18+
19+
20+
# 创建定时任务
21+
cel.conf.beat_schedule = {
22+
"run_task1": {
23+
"task": "celery_tasks.send_msg.send_msg_1",
24+
"schedule": timedelta(seconds=5),
25+
"args": ("hello msg",)
26+
},
27+
"run_task2": {
28+
"task": "celery_tasks.send_mail.send_mail_1",
29+
"schedule": timedelta(seconds=5),
30+
"args": ("hello mail",)
31+
},
32+
}
33+
34+
# 启动定时任务,将任务周期性地添加到 worker 队列中执行
35+

0 commit comments

Comments
 (0)