项目背景
在公司内部运维自动化平台开发中,使用 ansible + celery 来实现添加主机之后,异步的更新主机的额外信息,比如CPU型号,序列号等等。celery delay执行对应的task任务,在接口 JsonResponse 返回时遇到 报错
1
| Object of type AsyncResult is not JSON serializable
|
部分代码如
1 2 3 4
| def update_host_extra_info(request, host_name): .... ... res = tasks.update_host_info.delay(host_name) return JsonResponse({"task_result": res})
|
问题分析
从上面的代码和报错信息,我们可以推断出 AsyncResult 是 celery 的执行结果的返回类型,查看celery相关代码我们发现 AsyncResult 位于 celery.result
中
** Demo实操 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| >>> from celery.result import AsyncResult >>> from demo_ansible import celery_app >>> >>> res = AsyncResult(id="053a9dad-4362-4e6f-8ab6-c4ef2a61e52c", app=celery_app) >>> res. res.TimeoutError( res.maybe_reraise( res.app res.maybe_throw( res.args res.name res.as_list( res.on_ready( res.as_tuple( res.parent res.backend res.queue res.build_graph( res.ready( res.children res.result res.collect( res.retries res.date_done res.revoke( res.failed( res.state res.forget( res.status res.get( res.successful( res.get_leaf( res.supports_native_join res.graph res.task_id res.id res.then( res.ignored res.throw( res.info res.traceback res.iterdeps( res.wait( res.kwargs res.worker >>> res.task_id '053a9dad-4362-4e6f-8ab6-c4ef2a61e52c' >>> res.status 'SUCCESS' >>> res.successful() True >>> type(res.info) <class 'dict'> >>> res.info {'host_name': 'global-nginx-01-vpc', 'host_ip': '172.16.xx.xx', ... ...}]} >>>
|
从上面我们看到 res.info
的类型是 dict, dict 是可以在 JsonResponse中直接使用的。
所以最开始的代码部分应该修改为:
1 2 3 4
| def update_host_extra_info(request, host_name): .... ... res = tasks.update_host_info.delay(host_name) return JsonResponse({"task_result": res.info})
|
如果此时去验证的话发现返回的记过是空,为什么呢?
因为任务是异步的,返回的时候任务还没有执行成功,这个时候获取任务结果当然是空的。 可以通过在 程序中 debug 输出 res.status(PENDING)
或者 res.successful() (False)
来验证。
问题解决
两种方式,
1、既然是异步执行,那们当前接口只负责调用执行task, 另外新增接口获取celery task执行结果
1 2 3 4 5 6 7 8 9 10
| def update_host_extra_info(request, host_name): print(host_name) res = update_host_info.delay(host_name) return JsonResponse({"task_id": res.task_id, "msg": "update successfully!"})
def check_task(request, task_id): task = AsyncResult(id=task_id, app=celery_app) return JsonResponse({'task_id': task_id, 'result': task.info})
|
2、如果一定要在一个接口中执行之后等待
然后获取结果,可以修改代码为如下:
1 2 3 4 5 6 7
| def update_host_extra_info(request, host_name): print(host_name) res = update_host_info.delay(host_name) print(res, res.status, res.successful()) if res.wait(): return JsonResponse({"task_id": res.task_id, "result": res.info})
|
上面两种方式,一般是建议第二种,不用等待,实际更新操作异步执行即可。因为不用立马
看到执行的结果