Python调用ansible2.4
- 2020 年 1 月 10 日
- 筆記
代码如下:
#!/usr/bin/env python import json import shutil from collections import namedtuple from ansible.parsing.dataloader import DataLoader from ansible.vars.manager import VariableManager from ansible.inventory.manager import InventoryManager from ansible.playbook.play import Play from ansible.executor.task_queue_manager import TaskQueueManager from ansible.executor.playbook_executor import PlaybookExecutor from ansible.plugins.callback import CallbackBase import ansible.constants as C class ResultCallback(CallbackBase): def __init__(self, *args, **kwargs): # super(ResultsCollector, self).__init__(*args, **kwargs) self.host_ok = {} self.host_unreachable = {} self.host_failed = {} def v2_runner_on_unreachable(self, result): self.host_unreachable[result._host.get_name()] = result def v2_runner_on_ok(self, result, *args, **kwargs): self.host_ok[result._host.get_name()] = result def v2_runner_on_failed(self, result, *args, **kwargs): self.host_failed[result._host.get_name()] = result class AnsibleApi(object): def __init__(self): self.Options = namedtuple('Options', ['connection', 'remote_user', 'ask_sudo_pass', 'verbosity', 'ack_pass', 'module_path', 'forks', 'become', 'become_method', 'become_user', 'check', 'listhosts', 'listtasks', 'listtags', 'syntax', 'sudo_user', 'sudo', 'diff']) self.ops = self.Options(connection='smart', remote_user=None, ack_pass=None, sudo_user=None, forks=5, sudo=None, ask_sudo_pass=False, verbosity=5, module_path=None, become=None, become_method=None, become_user=None, check=False, diff=False, listhosts=None, listtasks=None, listtags=None, syntax=None) self.loader = DataLoader() self.passwords = dict() self.results_callback = ResultCallback() self.inventory = InventoryManager(loader=self.loader, sources=['/etc/ansible/inventory/hosts']) self.variable_manager = VariableManager(loader=self.loader, inventory=self.inventory) def runansible(self,host_list, task_list): play_source = dict( name="Ansible Play", hosts=host_list, gather_facts='no', 
tasks=task_list ) play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader) tqm = None try: tqm = TaskQueueManager( inventory=self.inventory, variable_manager=self.variable_manager, loader=self.loader, options=self.ops, passwords=self.passwords, stdout_callback=self.results_callback, run_additional_callbacks=C.DEFAULT_LOAD_CALLBACK_PLUGINS, run_tree=False, ) result = tqm.run(play) finally: if tqm is not None: tqm.cleanup() shutil.rmtree(C.DEFAULT_LOCAL_TMP, True) results_raw = {} results_raw['success'] = {} results_raw['failed'] = {} results_raw['unreachable'] = {} for host, result in self.results_callback.host_ok.items(): results_raw['success'][host] = json.dumps(result._result) for host, result in self.results_callback.host_failed.items(): results_raw['failed'][host] = result._result['msg'] for host, result in self.results_callback.host_unreachable.items(): results_raw['unreachable'][host] = result._result['msg'] print results_raw def playbookrun(self, playbook_path): self.variable_manager.extra_vars = {'customer': 'test', 'disabled': 'yes'} playbook = PlaybookExecutor(playbooks=playbook_path, inventory=self.inventory, variable_manager=self.variable_manager, loader=self.loader, options=self.ops, passwords=self.passwords) result = playbook.run() return result if __name__ == "__main__": a = AnsibleApi() host_list = ['100.101.23.153'] tasks_list = [ dict(action=dict(module='command', args='ls')), # dict(action=dict(module='shell', args='python sleep.py')), # dict(action=dict(module='synchronize', args='src=/home/op/test dest=/home/op/ delete=yes')), ] a.runansible(host_list,tasks_list) a.playbookrun(playbook_path=['/etc/ansible/test.yml'])
test.yml
---
- hosts: all
  remote_user: root
  tasks:
    - name: Test ansible api
      shell: touch a.txt
执行结果:
{'failed': {}, 'success': {u'100.101.23.153': '{"_ansible_parsed": true, "stderr_lines": [], "changed": true, "end": "2018-03-29 14:32:16.038750", "_ansible_no_log": false, "stdout": "Desktop\nDocuments\nDownloads\nMusic\nPictures\nPublic\nTemplates\nVideos\nbin\nhosts\ninst-sys\ntest.txt", "cmd": ["ls"], "start": "2018-03-29 14:32:15.863760", "delta": "0:00:00.174990", "stderr": "", "rc": 0, "invocation": {"module_args": {"creates": null, "executable": null, "_uses_shell": false, "_raw_params": "ls", "removes": null, "warn": true, "chdir": null, "stdin": null}}, "stdout_lines": ["Desktop", "Documents", "Downloads", "Music", "Pictures", "Public", "Templates", "Videos", "bin", "hosts", "inst-sys", "test.txt"]}'}, 'unreachable': {}} PLAY [all] ********************************************************************************************************************************************************************************************** TASK [Gathering Facts] ********************************************************************************************************************************************************************************** ok: [100.101.23.153] TASK [Test ansible api] ********************************************************************************************************************************************************************************* fatal: [100.101.23.153]: FAILED! 
=> {"changed": true, "cmd": "assdds", "delta": "0:00:00.173921", "end": "2018-03-29 14:32:24.685301", "msg": "non-zero return code", "rc": 127, "start": "2018-03-29 14:32:24.511380", "stderr": "/bin/sh: assdds: command not found", "stderr_lines": ["/bin/sh: assdds: command not found"], "stdout": "", "stdout_lines": []} to retry, use: --limit @/etc/ansible/main.retry PLAY RECAP ********************************************************************************************************************************************************************************************** 100.101.23.153 : ok=1 changed=0 unreachable=0 failed=1