From b56750c27e773b58253309a672f6556632252bd6 Mon Sep 17 00:00:00 2001 From: Kristian Krsnik Date: Thu, 5 Oct 2023 14:43:46 +0200 Subject: [PATCH] Began refactoring Changes * Added documentation * More types * Argparse command-line interface * A '--dry-run' option Tests * Passes mypy * Build with nix --- README.md | 12 +++- dyn_gandi/main.py | 160 +++++++++++++++++++++++++++++++++------------- flake.nix | 27 ++++---- tests/__init__.py | 0 4 files changed, 139 insertions(+), 60 deletions(-) delete mode 100644 tests/__init__.py diff --git a/README.md b/README.md index e337290..a31a574 100644 --- a/README.md +++ b/README.md @@ -1,19 +1,23 @@ # Dyn-Gandi + A DNS record updater for [Gandi's LiveDNS](https://api.gandi.net/docs/livedns/) API. This script is heavily inspired by [dyn-gandi](https://github.com/Danamir/dyn-gandi). ## How it works + This script determines the the current IP address by querying the resolvers defined in the config file. It then queries the subdomains' A records off of Gandi and compares their IP addresses to the current IP address. Should the IP address of a subdomain's A record not match your current IP address it will be updated. The subdomain's A record will be created should it not already exist. ## Notes + Every invocation of the script causes at least 1 request to a resolver specified and 1 API call to Gandi per domain. Updating a subdomain's A record is 1 API request per subdomain, even if they share the same domain. Resolvers are queried in the order specified until one returns a valid IP address. It is also possible to define a path to a file with the API key written in it. This is good for environments where the config file has to be shared like in a nix project. -## How to use +## Usage + First, get your API key from https://account.gandi.net/en/users/USER/security where `USER` is your Gandi username. The script looks for a config file at `$HOME/.config/dyn-gandi/config.log` or `/etc/dyn-gandi.conf` in that order. So create a file at one of these locations according to the schema below. @@ -40,9 +44,10 @@ The script looks for a config file at `$HOME/.config/dyn-gandi/config.log` or `/ ``` ## Nix -Add this to the modules. -```nix +Add this to the modules. + +```nix inputs = { nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05"; dyn-gandi.url = "git+https://git.krsnik.at/Kristian/dyn-gandi"; @@ -78,6 +83,7 @@ outputs = { ... } ``` + Use `dyn-gandi.nixosModules.default` for a NixOs module and `dyn-gandi.homeManagerModules.default` for home-manager `dyn-gandi.timer` specifies a timer in seconds when the script should be repeated. diff --git a/dyn_gandi/main.py b/dyn_gandi/main.py index 67aadc1..91edc60 100644 --- a/dyn_gandi/main.py +++ b/dyn_gandi/main.py @@ -1,12 +1,21 @@ -import requests -import json -import re import os +import re +import json +import argparse from datetime import datetime from typing import Callable, Any +import requests + def loadSettings(path: str | None) -> dict: + ''' + Load the settings file. + + `path` Path to a settings file. + By default the program will look for `$HOME/dyn-gandi/config.json` and then `/etc/dyn-gandi.json`. + ''' + if path is None: path = f"{os.path.expanduser('~')}/.config/dyn-gandi/config.json" @@ -14,14 +23,21 @@ def loadSettings(path: str | None) -> dict: path = '/etc/dyn-gandi.json' if not os.path.exists(path): + print(f"[ERROR] '{path}' does not exist.") quit() - # TODO: check integrity of the config file + # TODO check integrity of the config file with open(path, 'r') as file: - # Read if the API keys are path to files + # Load as dictionary config = json.load(file) + + # API Key entries keys = config['api'].keys() + + # This will build up a copy of the API section with the API keys instead of filenames. mask = dict() + + # If the API key is a file the use the contents as a new key. for key in keys: api_key = key if os.path.exists(key): @@ -30,25 +46,49 @@ def loadSettings(path: str | None) -> dict: mask[key] = api_key config['api'] = {mask[key]: value for key, value in config['api'].items()} + return config -def log(logline: str, path: str = './log.txt') -> None: - # Appends logline at the end of the file file with a timestamp - with open(path, 'a') as file: - file.write( - f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]{logline}\n") +def log(logline: str, path: str = './log.txt', stdout: bool = False) -> None: + ''' + Appends logline at the end of the file file with a timestamp + + `logline` The line to log. + + 'path' Path to file where to log. + + `stdout` Wether to print to stdout instead of a file. + ''' + + line = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]{logline}" + + if stdout: + print(line) + else: + with open(path, 'a') as file: + file.write(line + '\n') def getCurrentIP(resolvers: list[str], log: Callable[[str], Any]) -> str | None: - # Go through resolvers until one retuns an IP + ''' + Go through resolvers until one returns an IP. + + `resolvers` A list of IP resolvers. + + `log` A function which takes a string and logs it. + ''' + for resolver in resolvers: response = requests.get(resolver) - if response.status_code == 200: + + if response.ok: current_ip = response.text.strip() - # it suffices to check whether the search is not None since the regex is matches from beginning to end + + # It suffices to check whether the search is not None since the regex matches from beginning to end. is_ipv4 = re.search( '^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\\b){4}$', current_ip) is not None + if is_ipv4: log(f"[OK][{resolver}] Current IP: '{current_ip}'") return current_ip # Break if we have found our IP @@ -56,37 +96,65 @@ def getCurrentIP(resolvers: list[str], log: Callable[[str], Any]) -> str | None: log(f"[ERROR][{resolver}] '{current_ip}' is not IPv4") else: log(f"[ERROR][{resolver}][{response.status_code}] {response.text}") + return None def checkRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any]) -> dict[str, dict[str, list[str]]]: - # Returns the records that need to be renewed - # TODO: Also detect TTL change + ''' + Return records that need to be renewed. + + `api` The API section of the settings file. + + `current_ip` The current IP address. + + `log` A function that takes in a string and logs it. + ''' + + # TODO: Detect TTL change + records_to_renew = api for key, domains in api.items(): headers = {'authorization': f"Apikey {key}"} + for domain, _ in domains.items(): response = requests.get( f"https://api.gandi.net/v5/livedns/domains/{domain}/records?rrset_type=A", headers=headers) + if response.status_code == 200: - records_to_renew[key][domain] = list( - set(records_to_renew[key][domain]) - set(record['rrset_name'] for record in (filter(lambda x: current_ip in x['rrset_values'], response.json())))) + records_to_renew[key][domain] = list(set(records_to_renew[key][domain]) - set(record['rrset_name'] for record in ( + filter(lambda x: current_ip in x['rrset_values'], response.json())))) + + if records_to_renew[key][domain] != []: + log(f"[INFO][{' '.join(records_to_renew)}]Record that need to be renewed") + else: log(f"[ERROR][{domain}][{response.status_code}] {response.json()}") return records_to_renew -def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any], ttl) -> None: - # Updates the records and reates them if they don't exist +def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any], ttl: int = 300) -> None: + ''' + Update records and create them, if they don't exist. + + `api` The API section of the settings file. + + `current_ip` The current IP address. + + `log` A function that takes a string and logs it. + + `ttl` How long a record should be cached. + ''' + payload = json.dumps({ 'rrset_values': [current_ip], 'rrset_ttl': ttl }) - for key, domains in api.items(): + for api_key, domains in api.items(): headers = { - 'authorization': f"Apikey {key}", + 'authorization': f"Apikey {api_key}", 'content-type': 'application/json', } @@ -100,36 +168,40 @@ def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Cal log(f"[ERROR][{domain}][{record}][{response.status_code}] {response.json()}") -def parseArgv(argv: list[str]) -> dict[str, list[str]]: - cli_options: dict = {} - buffer: str = '' +def parseArgs(args: list[str]) -> argparse.Namespace: + ''' + Parse command-line arguments. + + `args` A list of user-supplied command line arguments. + ''' + + parser = argparse.ArgumentParser( + description='A program that renews A records via the Gandi API v5.') + + parser.add_argument('--config', type=str, default=None, + help='Path to a config file.') + parser.add_argument('--dry-run', action='store_true', + help='Just perform a dry-run.') + + return parser.parse_args(args) - for arg in argv: - if arg.startswith('--'): - cli_options[arg.removeprefix('--')] = [] - buffer = arg.removeprefix('--') - else: - cli_options[buffer].append(arg) - return cli_options def main(): - argv = os.sys.argv - options = {} - if len(argv) > 1: - options = parseArgv(argv[1:]) + # Parse user-supplied command-line arguments. + args = parseArgs(os.sys.argv[1:]) - config_path = None - if 'config' in options: - config_path = options['config'][0] + # Read in the settings file. + config = loadSettings(args.config) - settings = loadSettings(config_path) - log_path, ttl = settings['log_path'], settings['ttl'] - def logger(x: str) -> Any: return log(x, log_path) + def logger(x: str) -> Any: return log(x, config['log_path'], args.dry_run) - current_ip = getCurrentIP(settings['resolvers'], logger) + current_ip = getCurrentIP(config['resolvers'], logger) if current_ip is not None: - records_to_renew = checkRecords(settings['api'], current_ip, logger) - renewRecords(records_to_renew, current_ip, logger, ttl=ttl) + records_to_renew = checkRecords(config['api'], current_ip, logger) + + # Do not perform an actual record change during a dry-run. + if not args.dry_run: + renewRecords(records_to_renew, current_ip, logger, config['ttl']) if __name__ == '__main__': diff --git a/flake.nix b/flake.nix index 40eadf6..572c766 100644 --- a/flake.nix +++ b/flake.nix @@ -1,5 +1,5 @@ { - description = "A very basic flake"; + description = "A program to update an A record with the Gandi API v5."; inputs = { nixpkgs.url = "github:nixos/nixpkgs/nixos-23.05"; @@ -13,15 +13,23 @@ forAllSystems = nixpkgs.lib.genAttrs supportedSystems; pkgs = forAllSystems (system: nixpkgs.legacyPackages.${system}); in { - # Formatter + # `nix fmt` formatter = forAllSystems (system: nixpkgs.legacyPackages.${system}.alejandra); - # Packages + # `nix build` packages = forAllSystems (system: { default = pkgs.${system}.poetry2nix.mkPoetryApplication {projectDir = self;}; }); - # Dev Shell + # `nix run` + apps = forAllSystems (system: { + default = { + program = "${self.packages.${system}.default}/bin/dyn_gandi"; + type = "app"; + }; + }); + + # `nix develop` devShells = forAllSystems (system: { default = pkgs.${system}.mkShellNoCC { packages = with pkgs.${system}; [ @@ -33,16 +41,9 @@ }; }); - # Run as apps - apps = forAllSystems (system: { - default = { - program = "${self.packages.${system}.default}/bin/dyn_gandi"; - type = "app"; - }; - }); + # TODO Home Manager Module + # homeManagerModules.default = import ./nix/hm-module.nix self; - # Home Manager Module - homeManagerModules.default = import ./nix/hm-module.nix self; # NixOS Module nixosModules.default = import ./nix/module.nix inputs; }; diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000