import os import re import json import argparse from datetime import datetime from typing import Callable, Any import requests def loadSettings(path: str | None) -> dict: ''' Load the settings file. `path` Path to a settings file. By default the program will look for `$HOME/.config/dyn-gandi/config.json` and then `/etc/dyn-gandi.json`. ''' if path is None: path = f"{os.path.expanduser('~')}/.config/dyn-gandi/config.json" if not os.path.exists(path): path = '/etc/dyn-gandi.json' if not os.path.exists(path): print( f"[ERROR] No config file found at '~/.config/dyn-gandi/config.json' or '/etc/dyn-gandi.json'") quit() if not os.path.exists(path): print(f"[ERROR] '{path}' does not exist.") quit() # TODO check integrity of the config file with open(path, 'r') as file: # Load as dictionary config = json.load(file) # API Key entries keys = config['api'].keys() # This will build up a copy of the API section with the API keys instead of filenames. mask = dict() # If the API key is a file the use the contents as a new key. for key in keys: api_key = key if os.path.exists(key): with open(key, 'r') as file: api_key = file.readline().strip() mask[key] = api_key config['api'] = {mask[key]: value for key, value in config['api'].items()} return config def log(logline: str, path: str = './log.txt', stdout: bool = False) -> None: ''' Appends `logline` at the end of the file file with a timestamp. `logline` The line to log. 'path' Path to file where to log. `stdout` Wether to print to stdout instead of a file. ''' line = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]{logline}" if stdout: print(line) else: with open(path, 'a') as file: file.write(line + '\n') def getCurrentIP(resolvers: list[str], log: Callable[[str], Any]) -> str | None: ''' Go through resolvers until one returns an IP. `resolvers` A list of IP resolvers. `log` A function which takes a string and logs it. ''' for resolver in resolvers: response = requests.get(resolver) if response.ok: current_ip = response.text.strip() # It suffices to check whether the search is not None since the regex matches from beginning to end. is_ipv4 = re.search( '^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\\b){4}$', current_ip) is not None if is_ipv4: log(f"[OK][{resolver}] Current IP: '{current_ip}'") return current_ip # Break if we have found our IP else: log(f"[ERROR][{resolver}] '{current_ip}' is not IPv4") else: log(f"[ERROR][{resolver}][{response.status_code}] {response.text}") return None def checkRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any]) -> dict[str, dict[str, list[str]]]: ''' Return records that need to be renewed. `api` The API section of the settings file. `current_ip` The current IP address. `log` A function that takes in a string and logs it. ''' # TODO: Detect TTL change records_to_renew = api for key, domains in api.items(): headers = {'authorization': f"Apikey {key}"} for domain, _ in domains.items(): response = requests.get( f"https://api.gandi.net/v5/livedns/domains/{domain}/records?rrset_type=A", headers=headers) if response.status_code == 200: records_to_renew[key][domain] = list(set(records_to_renew[key][domain]) - set(record['rrset_name'] for record in ( filter(lambda x: current_ip in x['rrset_values'], response.json())))) if (records := records_to_renew[key][domain]) != []: log(f"[INFO][{','.join(records)}] Records need to be renewed") else: log(f"[ERROR][{domain}][{response.status_code}] {response.json()}") return records_to_renew def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any], ttl: int = 300) -> None: ''' Update records and create them, if they don't exist. `api` The API section of the settings file. `current_ip` The current IP address. `log` A function that takes a string and logs it. `ttl` How long a record should be cached. ''' payload = json.dumps({ 'rrset_values': [current_ip], 'rrset_ttl': ttl }) for api_key, domains in api.items(): headers = { 'authorization': f"Apikey {api_key}", 'content-type': 'application/json', } for domain, records in domains.items(): for record in records: response = requests.put( f"https://api.gandi.net/v5/livedns/domains/{domain}/records/{record}/A", data=payload, headers=headers) if response.status_code == 201: log(f"[OK][{domain}][{record}] Renewed with IP '{current_ip}'") else: log(f"[ERROR][{domain}][{record}][{response.status_code}] {response.json()}") def parseArgs(args: list[str]) -> argparse.Namespace: ''' Parse command-line arguments. `args` A list of user-supplied command line arguments. ''' parser = argparse.ArgumentParser( description='A program that renews A records via the Gandi API v5.') parser.add_argument('--config', type=str, default=None, help='Path to a config file.') parser.add_argument('--dry-run', action='store_true', help='Just perform a dry-run.') return parser.parse_args(args) def main(): # Parse user-supplied command-line arguments. args = parseArgs(os.sys.argv[1:]) # Read in the settings file. config = loadSettings(args.config) def logger(x: str) -> Any: return log(x, config['log_path'], args.dry_run) current_ip = getCurrentIP(config['resolvers'], logger) if current_ip is not None: records_to_renew = checkRecords(config['api'], current_ip, logger) # Do not perform an actual record change during a dry-run. if not args.dry_run: renewRecords(records_to_renew, current_ip, logger, config['ttl']) if __name__ == '__main__': main()