"""Keep DNS A records pointed at the IP address reported by a resolver.

The script loads a JSON configuration, asks each configured resolver for the
current IPv4 address, compares it with the A records served by the DNS API and
rewrites the records that do not contain that address.
"""

import json
import os
import re
from collections.abc import Callable
from datetime import datetime

import requests

# Seconds to wait for any HTTP request; without a timeout `requests` can block
# forever on an unresponsive host.
REQUEST_TIMEOUT = 10

# Dotted-quad IPv4 address with every octet in 0-255.
_IPV4_PATTERN = re.compile(r'^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\b){4}$')


def _resolve_api_key(key: str) -> str:
    """Return the first line of the file at *key* if it is a file, else *key* itself."""
    if not os.path.isfile(key):
        return key
    with open(key, 'r', encoding='utf-8') as key_file:
        return key_file.readline().strip()


def _response_body(response: requests.Response) -> object:
    """Return the decoded JSON body of *response*, or its raw text when it is not JSON."""
    try:
        return response.json()
    except ValueError:
        return response.text


def loadSettings(path: str = f"{os.path.expanduser('~')}/config.json") -> dict:
    """Load the JSON configuration stored at *path*.

    Each key of the ``api`` mapping is either an API key or the path of a file
    whose first line holds the API key; paths are replaced by the key they
    contain. When *path* does not exist, a sample configuration is written
    there and returned instead.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON.
    """
    # TODO: check integrity
    try:
        with open(path, 'r', encoding='utf-8') as file:
            config = json.load(file)
    except FileNotFoundError:
        sample_config = {
            'api': {},
            'ttl': 3600,
            'resolvers': [''],
            'log_path': './log.txt',
        }
        # Write the sample where the configuration is looked up, so that the
        # next run actually finds it.
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(path, 'w', encoding='utf-8') as file:
            json.dump(sample_config, file, indent=2)
        return sample_config

    config['api'] = {
        _resolve_api_key(key): domains
        for key, domains in config.get('api', {}).items()
    }
    return config


def log(logline: str, path: str = './log.txt') -> None:
    """Append *logline*, prefixed with a local timestamp, to the file at *path*."""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    with open(path, 'a', encoding='utf-8') as file:
        file.write(f"[{timestamp}]{logline}\n")


def getCurrentIP(resolvers: list[str], log: Callable[[str], None]) -> None | str:
    """Return the IPv4 address reported by the first resolver that answers with one.

    Resolvers are queried in order. A resolver that cannot be reached, answers
    with a status other than 200, or returns something that is not an IPv4
    address is logged through *log* and skipped. Returns ``None`` when no
    resolver produced an address.
    """
    for resolver in resolvers:
        try:
            response = requests.get(resolver, timeout=REQUEST_TIMEOUT)
        except requests.RequestException as error:
            # A broken resolver must not prevent the remaining ones from being tried.
            log(f"[ERROR][{resolver}] {error}")
            continue

        if response.status_code != 200:
            log(f"[ERROR][{resolver}][{response.status_code}] {response.text}")
            continue

        current_ip = response.text.strip()
        if _IPV4_PATTERN.match(current_ip) is None:
            log(f"[ERROR][{resolver}] '{current_ip}' is not IPv4")
            continue

        log(f"[OK][{resolver}] Current IP: '{current_ip}'")
        return current_ip
    return None


# TODO: Also detect TTL change
def checkRecords(api: dict, current_ip: str, log: Callable[[str], None]) -> dict:
    """Return the records that need to be renewed.

    *api* maps an API key to a mapping of domain to record names. The result
    has the same shape but only lists the records whose A record values do not
    contain *current_ip*. When the records of a domain cannot be fetched, the
    error is logged through *log* and all of its records are kept for renewal.
    *api* itself is left untouched.
    """
    records_to_renew: dict = {}
    for key, domains in api.items():
        headers = {'authorization': f"Apikey {key}"}
        pending = records_to_renew[key] = {}
        for domain, records in domains.items():
            # Until the API proves otherwise, every configured record needs renewing.
            pending[domain] = list(records)
            # NOTE(review): the domain key is used verbatim as the URL prefix, so
            # it must already carry the scheme and API base path; confirm against
            # the configuration, a base URL may be missing from this literal.
            try:
                response = requests.get(
                    f"{domain}/records?rrset_type=A",
                    headers=headers,
                    timeout=REQUEST_TIMEOUT,
                )
            except requests.RequestException as error:
                log(f"[ERROR][{domain}] {error}")
                continue

            if response.status_code != 200:
                log(f"[ERROR][{domain}][{response.status_code}] {_response_body(response)}")
                continue

            # `Response.json()` already returns the decoded document.
            up_to_date = {
                rrset['rrset_name']
                for rrset in response.json()
                if current_ip in rrset['rrset_values']
            }
            # `dict.fromkeys` drops duplicates while keeping the configured order.
            pending[domain] = [
                name for name in dict.fromkeys(records) if name not in up_to_date
            ]
    return records_to_renew


def renewRecords(
    records: dict,
    current_ip: str,
    log: Callable[[str], None],
    ttl: int = 3600,
) -> None:
    """Update the given A records to *current_ip*, creating them if they don't exist.

    *records* maps an API key to a mapping of domain to record names. One PUT
    request is sent per record; a 201 response is logged as a success and
    anything else, including a failed request, as an error.
    """
    payload = json.dumps({
        'items': [{
            'rrset_type': 'A',
            'rrset_values': [current_ip],
            'rrset_ttl': ttl,
        }],
    })
    for key, domains in records.items():
        headers = {
            'authorization': f"Apikey {key}",
            'content-type': 'application/json',
        }
        for domain, names in domains.items():
            for name in names:
                # NOTE(review): as in checkRecords, the domain key is used
                # verbatim as the URL prefix; confirm it includes the base URL.
                try:
                    response = requests.put(
                        f"{domain}/records/{name}",
                        data=payload,
                        headers=headers,
                        timeout=REQUEST_TIMEOUT,
                    )
                except requests.RequestException as error:
                    log(f"[ERROR][{domain}][{name}] {error}")
                    continue

                if response.status_code == 201:
                    log(f"[OK][{domain}][{name}] Renewed with IP '{current_ip}'")
                else:
                    log(
                        f"[ERROR][{domain}][{name}][{response.status_code}] "
                        f"{_response_body(response)}"
                    )


def main() -> None:
    """Load the settings, find the current IP and renew the outdated records."""
    settings = loadSettings()
    log_path = settings.get('log_path', './log.txt')
    ttl = settings.get('ttl', 3600)

    def write_log(logline: str) -> None:
        log(logline, log_path)

    current_ip = getCurrentIP(settings.get('resolvers', []), write_log)
    if current_ip is None:
        # Without an address there is nothing valid to publish; renewing now
        # would push a null value to every record.
        write_log("[ERROR] No resolver returned a valid IPv4 address; nothing renewed")
        return

    records_to_renew = checkRecords(settings.get('api', {}), current_ip, write_log)
    renewRecords(records_to_renew, current_ip, write_log, ttl=ttl)


if __name__ == '__main__':
    main()