#!/usr/bin/python from __future__ import (absolute_import, division, print_function) __metaclass__ = type DOCUMENTATION = r''' --- module: cloudns_monitor short_description: Configure a ClouDNS monitoring check description: Configure a ClouDNS monitoring check options: name: description: The name of the ClouDNS monitoring check. required: true type: str author: - @irl ''' EXAMPLES = r''' TODO ''' RETURN = r''' TODO ''' import requests from ansible.module_utils.basic import AnsibleModule CHECK_TYPE = { 'ping': '17', 'web': '18', } def get_existing_check(auth_id: str, auth_password: str, name: str) -> dict | None: url = "https://api.cloudns.net/monitoring/get-records.json" params = { 'auth-id': auth_id, 'auth-password': auth_password, 'search': name, 'rows-per-page': 50, 'page': 1, } r = requests.get(url, params=params) checks = r.json() for check_id in checks: if checks[check_id]['name'] == name: return checks[check_id] return None def update_existing_check(auth_id: str, auth_password: str, check_id: str, check_type: str, host: str, ip: str, http_status_code: str, name: str, path: str, port: int) -> dict | None: url = "https://api.cloudns.net/monitoring/update.json" params = { 'auth-id': auth_id, 'auth-password': auth_password, 'id': check_id, 'check_type': CHECK_TYPE[check_type], 'host': host, 'http_status_code': http_status_code, 'ip': ip, 'name': name, 'port': port, 'path': path, 'status_change_checks': 5, 'check_period': 60, 'timeout': 5, } r = requests.get(url, params=params) return r.json() def create_check(auth_id: str, auth_password: str, check_type: str, host: str, ip: str, http_status_code: str, name: str, path: str, port: int) -> dict | None: url = "https://api.cloudns.net/monitoring/create.json" params = { 'auth-id': auth_id, 'auth-password': auth_password, 'check_type': CHECK_TYPE[check_type], 'host': host, 'http_status_code': http_status_code, 'ip': ip, 'name': name, 'port': port, 'path': path, 'status_change_checks': 5, 'check_period': 60, 'timeout': 5, } r = requests.get(url, params=params) return r.json() def get_existing_notifications(auth_id: str, auth_password: str, check_id: str) -> dict | None: url = "https://api.cloudns.net/monitoring/list-notifications.json" params = { 'auth-id': auth_id, 'auth-password': auth_password, 'id': check_id, 'rows-per-page': 50, 'page': 1, } r = requests.get(url, params=params) return r.json() def run_module(): module_args = dict( auth_id=dict(type='str', required=True), auth_password=dict(type='str', no_log=True, required=True), check_type=dict(type='str', default="web"), emails=dict(type='list', elements='str', required=True), host=dict(type='str', required=True), http_status_code=dict(type='str', default=200), ip=dict(type='str', required=True), name=dict(type='str', required=True), path=dict(type='str', default=''), port=dict(type='int', default=443), ) module = AnsibleModule( argument_spec=module_args, supports_check_mode=True ) result = dict( changed=False, existing_check=get_existing_check( module.params['auth_id'], module.params['auth_password'], module.params['name'] ), ) if module.check_mode: module.exit_json(**result) if not result['existing_check']: result['changed'] = True create_check(auth_id=module.params['auth_id'], auth_password=module.params['auth_password'], check_type=module.params['check_type'], host=module.params['host'], http_status_code=module.params['http_status_code'], ip=module.params['ip'], name=module.params['name'], path=module.params['path'], port=module.params['port'], ) else: if result['existing_check']['check_type'] != CHECK_TYPE[module.params['check_type']]: result['changed'] = True if result['existing_check']['host'] != module.params['host']: result['changed'] = True if result['existing_check']['http_status_code'] != module.params['http_status_code']: result['changed'] = True if result['existing_check']['ip'] != module.params['ip']: result['changed'] = True if result['existing_check']['path'] != module.params['path']: result['changed'] = True if int(result['existing_check']['port']) != module.params['port']: result['changed'] = True if result['changed']: result['update_response'] = update_existing_check( auth_id=module.params['auth_id'], auth_password=module.params['auth_password'], check_id=result['existing_check']['id'], check_type=module.params['check_type'], host=module.params['host'], http_status_code=module.params['http_status_code'], ip=module.params['ip'], name=module.params['name'], path=module.params['path'], port=module.params['port'], ) result['current_check'] = get_existing_check( module.params['auth_id'], module.params['auth_password'], module.params['name']) emails_to_add = set(module.params['emails']) notifications_to_remove = set() result['existing_notifications'] = get_existing_notifications( auth_id=module.params['auth_id'], auth_password=module.params['auth_password'], check_id=result['current_check']['id'], ) for notification in result['existing_notifications']: if notification['type'] == "mail": if notification['value'] in emails_to_add: emails_to_add.remove(notification['value']) else: notifications_to_remove.add(notification['notification_id']) if emails_to_add or notifications_to_remove: result['changed'] = True for email in emails_to_add: url = "https://api.cloudns.net/monitoring/add-notification.json" params = { 'auth-id': module.params['auth_id'], 'auth-password': module.params['auth_password'], 'id': result['current_check']['id'], 'type': 'mail', 'value': email, } r = requests.get(url, params=params) r.raise_for_status() for email in notifications_to_remove: url = "https://api.cloudns.net/monitoring/delete-notification.json" params = { 'auth-id': module.params['auth_id'], 'auth-password': module.params['auth_password'], 'id': result['current_check']['id'], 'notification-id': email, } r = requests.get(url, params=params) r.raise_for_status() result['current_notifications'] = get_existing_notifications( auth_id=module.params['auth_id'], auth_password=module.params['auth_password'], check_id=result['current_check']['id'], ) module.exit_json(**result) def main(): run_module() if __name__ == '__main__': main()