白日依山尽,黄河入海流。欲穷千里目,更上一层楼。 -- 唐·王之涣

python ansible celery 实现任务异步执行(ArsyncResult问题)

项目背景

在公司内部运维自动化平台开发中,使用 ansible + celery 来实现添加主机之后,异步的更新主机的额外信息,比如CPU型号,序列号等等。celery delay执行对应的task任务,在接口 JsonResponse 返回时遇到 报错

1
Object of type AsyncResult is not JSON serializable

部分代码如

1
2
3
4
def update_host_extra_info(request, host_name):
.... ...
res = tasks.update_host_info.delay(host_name)
return JsonResponse({"task_result": res})

问题分析

从上面的代码和报错信息,我们可以推断出 AsyncResult 是 celery 的执行结果的返回类型,查看celery相关代码我们发现 AsyncResult 位于 celery.result

** Demo实操 **

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
>>> from celery.result import AsyncResult
>>> from demo_ansible import celery_app
>>>
>>> res = AsyncResult(id="053a9dad-4362-4e6f-8ab6-c4ef2a61e52c", app=celery_app)
>>> res.
res.TimeoutError( res.maybe_reraise(
res.app res.maybe_throw(
res.args res.name
res.as_list( res.on_ready(
res.as_tuple( res.parent
res.backend res.queue
res.build_graph( res.ready(
res.children res.result
res.collect( res.retries
res.date_done res.revoke(
res.failed( res.state
res.forget( res.status
res.get( res.successful(
res.get_leaf( res.supports_native_join
res.graph res.task_id
res.id res.then(
res.ignored res.throw(
res.info res.traceback
res.iterdeps( res.wait(
res.kwargs res.worker
>>> res.task_id
'053a9dad-4362-4e6f-8ab6-c4ef2a61e52c'
>>> res.status
'SUCCESS'
>>> res.successful()
True
>>> type(res.info)
<class 'dict'>
>>> res.info
{'host_name': 'global-nginx-01-vpc', 'host_ip': '172.16.xx.xx', ... ...}]}
>>>

从上面我们看到 res.info 的类型是 dict, dict 是可以在 JsonResponse中直接使用的。

所以最开始的代码部分应该修改为:

1
2
3
4
def update_host_extra_info(request, host_name):
.... ...
res = tasks.update_host_info.delay(host_name)
return JsonResponse({"task_result": res.info})

如果此时去验证的话发现返回的记过是空,为什么呢?

因为任务是异步的,返回的时候任务还没有执行成功,这个时候获取任务结果当然是空的。 可以通过在 程序中 debug 输出 res.status(PENDING) 或者 res.successful() (False) 来验证。

问题解决

两种方式,

1、既然是异步执行,那们当前接口只负责调用执行task, 另外新增接口获取celery task执行结果

1
2
3
4
5
6
7
8
9
10
def update_host_extra_info(request, host_name):
print(host_name)
res = update_host_info.delay(host_name)
return JsonResponse({"task_id": res.task_id, "msg": "update successfully!"})


def check_task(request, task_id):
# 单独接口获取task执行结果
task = AsyncResult(id=task_id, app=celery_app)
return JsonResponse({'task_id': task_id, 'result': task.info})

2、如果一定要在一个接口中执行之后等待然后获取结果,可以修改代码为如下:

1
2
3
4
5
6
7
def update_host_extra_info(request, host_name):
print(host_name)
res = update_host_info.delay(host_name)
print(res, res.status, res.successful())
# 这里执行等待,成功的时候再返回
if res.wait():
return JsonResponse({"task_id": res.task_id, "result": res.info})

上面两种方式,一般是建议第二种,不用等待,实际更新操作异步执行即可。因为不用立马看到执行的结果

python ansible celery 实现任务异步执行(ArsyncResult问题)

http://blog.colinspace.com/2022/07/29/20220729-python-celery-ArsyncResult问题/

作者

Colin

发布于

2022-07-29

许可协议