From b5ab25940a6946e96d01e7a149400a472916fc79 Mon Sep 17 00:00:00 2001 From: Kristian Krsnik Date: Sun, 13 Aug 2023 13:58:30 +0200 Subject: [PATCH] finalized code and added typing --- dyn_gandi/main.py | 69 ++++++++++++++++++++++++++--------------------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/dyn_gandi/main.py b/dyn_gandi/main.py index 664dcf7..eb6f7c9 100644 --- a/dyn_gandi/main.py +++ b/dyn_gandi/main.py @@ -3,27 +3,33 @@ import json import re import os from datetime import datetime +from typing import Callable, Any -def loadSettings(path: str = f"{os.path.expanduser('~')}/.config/dyn-gandi/config.json") -> dict: - # TODO: check integrity of the config file - try: - with open(path, 'r') as file: - # Read if the API keys are path to files - config = json.load(file) - keys = config['api'].keys() - mask = dict() - for key in keys: - api_key = key - if os.path.exists(key): - with open(key, 'r') as file: - api_key = file.readline().strip() - mask[key] = api_key - config['api'] = {mask[key]: value for key, - value in config['api'].items()} - except FileNotFoundError: +def loadSettings() -> dict: + path = f"{os.path.expanduser('~')}/.config/dyn-gandi/config.json" + + if not os.path.exists(path): + path = '/etc/dyn-gandi.json' + + if not os.path.exists(path): quit() - # TODO log error and remove code below (do not create an unwanted config) + + # TODO: check integrity of the config file + with open(path, 'r') as file: + # Read if the API keys are path to files + config = json.load(file) + keys = config['api'].keys() + mask = dict() + for key in keys: + api_key = key + if os.path.exists(key): + with open(key, 'r') as file: + api_key = file.readline().strip() + mask[key] = api_key + config['api'] = {mask[key]: value for key, + value in config['api'].items()} + return config def log(logline: str, path: str = './log.txt') -> None: @@ -33,12 +39,13 @@ def log(logline: str, path: str = './log.txt') -> None: f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]{logline}\n") -def getCurrentIP(resolvers: list[str], log: callable) -> None | str: +def getCurrentIP(resolvers: list[str], log: Callable[[str], Any]) -> str | None: # Go through resolvers until one retuns an IP for resolver in resolvers: response = requests.get(resolver) if response.status_code == 200: current_ip = response.text.strip() + # it suffices to check whether the search is not None since the regex is matches from beginning to end is_ipv4 = re.search( '^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\\b){4}$', current_ip) is not None if is_ipv4: @@ -51,25 +58,25 @@ def getCurrentIP(resolvers: list[str], log: callable) -> None | str: return None -def checkRecords(api: dict, current_ip: str, log: callable) -> dict: +def checkRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any]) -> dict[str, dict[str, list[str]]]: # Returns the records that need to be renewed # TODO: Also detect TTL change records_to_renew = api for key, domains in api.items(): headers = {'authorization': f"Apikey {key}"} - for domain, records in domains.items(): + for domain, _ in domains.items(): response = requests.get( f"https://api.gandi.net/v5/livedns/domains/{domain}/records?rrset_type=A", headers=headers) if response.status_code == 200: - records_to_renew[key][domain] = list(set(records_to_renew[key][domain]) - set(record['rrset_name'] for record in ( - filter(lambda x: current_ip in x['rrset_values'], json.loads(response.json()))))) + records_to_renew[key][domain] = list( + set(records_to_renew[key][domain]) - set(record['rrset_name'] for record in (filter(lambda x: current_ip in x['rrset_values'], response.json())))) else: log(f"[ERROR][{domain}][{response.status_code}] {response.json()}") return records_to_renew -def renewRecords(records, current_ip, log, ttl=3600): +def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any], ttl: int = 3600) -> None: # Updates the records and reates them if they don't exist payload = json.dumps({ 'items': [{ @@ -79,7 +86,7 @@ def renewRecords(records, current_ip, log, ttl=3600): }] }) - for key, domains in records.items(): + for key, domains in api.items(): headers = { 'authorization': f"Apikey {key}", 'content-type': 'application/json', @@ -97,13 +104,13 @@ def renewRecords(records, current_ip, log, ttl=3600): def main(): settings = loadSettings() - log_path = settings['log_path'] - ttl = settings['ttl'] - def myLog(x): return log(x, log_path) + log_path, ttl = settings['log_path'], settings['ttl'] + def logger(x: str) -> Any: return log(x, log_path) - current_ip = getCurrentIP(settings['resolvers'], myLog) - records_to_renew = checkRecords(settings['api'], current_ip, myLog) - renewRecords(records_to_renew, current_ip, myLog, ttl=ttl) + current_ip = getCurrentIP(settings['resolvers'], logger) + if current_ip is not None: + records_to_renew = checkRecords(settings['api'], current_ip, logger) + renewRecords(records_to_renew, current_ip, logger, ttl=ttl) if __name__ == '__main__':