白日依山尽,黄河入海流。欲穷千里目,更上一层楼。 -- 唐·王之涣

Python Ansible API封装

Python 调用 Ansible API 实现自动化管理,为后续运维平台自动化管理提供帮助,也是学习Jumpserver源码做实战演练。

代码已经做过测试,效果如下:

playbook 测试
python-ansible-playbook.png

adhoc 测试
python-ansible-adhoc.png

demo代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python
# encoding: utf-8
# Author: ColinSpace.com
# Date:2021-09
# Desc: 利用Python实现ansible的Adhoc 和 playbook 两种任务方式
#


import json
import shutil

from ansible import context
from ansible.module_utils.common.collections import ImmutableDict

# 注意: 使用namedtuple实现Options的方式,官网已经不推荐使用
# 网上很多例子都是按照Options的方式,会有报错如下:
# error_msg: "msg": "the connection plugin '<class 'ansible.utils.sentinel.Sentinel'>' was not found",
# 网上提供解决方案: https://blog.csdn.net/FMT21/article/details/103468284
# 但是目前验证报错,options 最终的tuple类型,在初始化的时候用到了 vars 系统函数,但是该函数报错
# TypeError: vars() argument must have __dict__ attribute
# from collections import namedtuple
# Options = namedtuple('Options', ['connection', 'module_path', 'forks', 'become', 'become_method', 'become_user', 'check', 'diff'])
# options = Options(connection='ssh', module_path=['/to/mymodules'], forks=10, become=None, become_method=None, become_user=None, check=False, diff=False)

from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase
from ansible.executor.playbook_executor import PlaybookExecutor

import ansible.constants as C

# 注意: 该配置是为了避免没有第一次公钥访问写入 know_hosts 文件问题,
# 关于如何避免该问题,详见
C.HOST_KEY_CHECKING = False

class ResultCallback(CallbackBase):

def v2_runner_on_ok(self, result, **kwargs):
host = result._host
print(json.dumps({host.name: result._result}, indent=4))

def v2_runner_on_failed(self, result, **kwargs):
host = result._host
print(json.dumps({host.name: result._result}, indent=4))

def v2_runner_on_unreachable(self, result, **kwargs):
host = result._host
print(json.dumps({host.name: result._result}, indent=4))


class AnsibleApi:
"""
Description:
"""

context.CLIARGS = ImmutableDict(
connection='ssh', remote_user=None, listtags=None, listhosts=None, listtasks=None,
module_path=None, verbosity=5, ask_sudo_pass=False, private_key_file=None,
# become=None, become_method=None, become_user=None,
become=True, become_method='sudo', become_user='root',
forks=10, check=False, diff=False, syntax=None,start_at_task=None,
)

"""

Options = namedtuple('Options', ['connection', 'remote_user', 'listtags', 'listhosts', 'listtasks',
'module_path', 'verbosity', 'ask_sudo_pass', 'private_key_file',
'become', 'become_method', 'become_user',
'forks', 'check', 'diff', 'syntax', 'start_at_task'])
options = Options(
connection='ssh', remote_user=None, listtags=None, listhosts=None, listtasks=None,
module_path=None, verbosity=5, ask_sudo_pass=False, private_key_file=None,
# become=None, become_method=None, become_user=None,
become=True, become_method='sudo', become_user='root',
forks=10, check=False, diff=False, syntax=None,start_at_task=None,
)
# 报错: TypeError: vars() argument must have __dict__ attribute
context._init_global_context(options)
"""


# 一般使用ansible的时候都会通过免密的方式,所以这里直接初始化passwords变为空
def __init__(self):
# self.name = name
# self.host_list = host_list
# self.task_list = task_list
self.loader = DataLoader()
self.result_callback = ResultCallback()
# self.passwords = dict(vault_pass=passwords)
self.passwords = dict()
self.inventory = InventoryManager(loader=self.loader, sources=['/etc/ansible/inventory/hosts', '/etc/ansible/hosts'])
self.variable_manager = VariableManager(loader=self.loader, inventory=self.inventory)

def run_adhoc(self, name, hosts, tasks):
play_source = dict(
name=name,
hosts=hosts,
gather_facts='no',
tasks=tasks
)
play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader)
tqm = None
try:
tqm = TaskQueueManager(
inventory=self.inventory,
variable_manager=self.variable_manager,
loader=self.loader,
passwords=self.passwords,
stdout_callback=self.result_callback, # Use our custom callback instead of the ``default`` callback plugin, which prints to stdout
run_additional_callbacks=C.DEFAULT_LOAD_CALLBACK_PLUGINS,
run_tree=False,
)
result = tqm.run(play) # most interesting data for a play is actually sent to the callback's methods
finally:
# we always need to cleanup child procs and the structres we use to communicate with them
if tqm is not None:
tqm.cleanup()

# Remove ansible tmpdir
shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)

def run_playhook(self, playbook):
# self.variable_manager.extra_vars = {
# 'customer': 'test',
# 'disabled': 'yes'
# }
playbook = PlaybookExecutor(
playbooks=playbook,
inventory=self.inventory,
variable_manager = self.variable_manager,
loader=self.loader,
passwords=self.passwords,
)
result = playbook.run()
return result

if __name__ == '__main__':
a = AnsibleApi()
host_list = ['192.168.3.4']
task_list = [
dict(action=dict(module='shell', args="ls -l "))
]
# a.run_adhoc(name="checkConnection", hosts=host_list, tasks=task_list)
a.run_playhook(playbook=["/home/james.liu/test.yml"])

作者

James

发布于

2021-10-09

许可协议