Began refactoring

Changes
* Added documentation
* More types
* Argparse command-line interface
* A '--dry-run' option

Tests
* Passes mypy
* Build with nix
This commit is contained in:
Kristian Krsnik 2023-10-05 14:43:46 +02:00
parent ea99231f03
commit b56750c27e
Signed by: Kristian
GPG Key ID: FD1330AC9F909E85
4 changed files with 139 additions and 60 deletions

View File

@ -1,19 +1,23 @@
# Dyn-Gandi # Dyn-Gandi
A DNS record updater for [Gandi's LiveDNS](https://api.gandi.net/docs/livedns/) API. A DNS record updater for [Gandi's LiveDNS](https://api.gandi.net/docs/livedns/) API.
This script is heavily inspired by [dyn-gandi](https://github.com/Danamir/dyn-gandi). This script is heavily inspired by [dyn-gandi](https://github.com/Danamir/dyn-gandi).
## How it works ## How it works
This script determines the the current IP address by querying the resolvers defined in the config file. This script determines the the current IP address by querying the resolvers defined in the config file.
It then queries the subdomains' A records off of Gandi and compares their IP addresses to the current IP address. It then queries the subdomains' A records off of Gandi and compares their IP addresses to the current IP address.
Should the IP address of a subdomain's A record not match your current IP address it will be updated. The subdomain's A record will be created should it not already exist. Should the IP address of a subdomain's A record not match your current IP address it will be updated. The subdomain's A record will be created should it not already exist.
## Notes ## Notes
Every invocation of the script causes at least 1 request to a resolver specified and 1 API call to Gandi per domain. Every invocation of the script causes at least 1 request to a resolver specified and 1 API call to Gandi per domain.
Updating a subdomain's A record is 1 API request per subdomain, even if they share the same domain. Updating a subdomain's A record is 1 API request per subdomain, even if they share the same domain.
Resolvers are queried in the order specified until one returns a valid IP address. Resolvers are queried in the order specified until one returns a valid IP address.
It is also possible to define a path to a file with the API key written in it. This is good for environments where the config file has to be shared like in a nix project. It is also possible to define a path to a file with the API key written in it. This is good for environments where the config file has to be shared like in a nix project.
## How to use ## Usage
First, get your API key from https://account.gandi.net/en/users/USER/security where `USER` is your Gandi username. First, get your API key from https://account.gandi.net/en/users/USER/security where `USER` is your Gandi username.
The script looks for a config file at `$HOME/.config/dyn-gandi/config.log` or `/etc/dyn-gandi.conf` in that order. So create a file at one of these locations according to the schema below. The script looks for a config file at `$HOME/.config/dyn-gandi/config.log` or `/etc/dyn-gandi.conf` in that order. So create a file at one of these locations according to the schema below.
@ -40,9 +44,10 @@ The script looks for a config file at `$HOME/.config/dyn-gandi/config.log` or `/
``` ```
## Nix ## Nix
Add this to the modules.
```nix
Add this to the modules.
```nix
inputs = { inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05"; nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05";
dyn-gandi.url = "git+https://git.krsnik.at/Kristian/dyn-gandi"; dyn-gandi.url = "git+https://git.krsnik.at/Kristian/dyn-gandi";
@ -78,6 +83,7 @@ outputs = {
... ...
} }
``` ```
Use `dyn-gandi.nixosModules.default` for a NixOs module and `dyn-gandi.homeManagerModules.default` for home-manager Use `dyn-gandi.nixosModules.default` for a NixOs module and `dyn-gandi.homeManagerModules.default` for home-manager
`dyn-gandi.timer` specifies a timer in seconds when the script should be repeated. `dyn-gandi.timer` specifies a timer in seconds when the script should be repeated.

View File

@ -1,12 +1,21 @@
import requests
import json
import re
import os import os
import re
import json
import argparse
from datetime import datetime from datetime import datetime
from typing import Callable, Any from typing import Callable, Any
import requests
def loadSettings(path: str | None) -> dict: def loadSettings(path: str | None) -> dict:
'''
Load the settings file.
`path` Path to a settings file.
By default the program will look for `$HOME/dyn-gandi/config.json` and then `/etc/dyn-gandi.json`.
'''
if path is None: if path is None:
path = f"{os.path.expanduser('~')}/.config/dyn-gandi/config.json" path = f"{os.path.expanduser('~')}/.config/dyn-gandi/config.json"
@ -14,14 +23,21 @@ def loadSettings(path: str | None) -> dict:
path = '/etc/dyn-gandi.json' path = '/etc/dyn-gandi.json'
if not os.path.exists(path): if not os.path.exists(path):
print(f"[ERROR] '{path}' does not exist.")
quit() quit()
# TODO: check integrity of the config file # TODO check integrity of the config file
with open(path, 'r') as file: with open(path, 'r') as file:
# Read if the API keys are path to files # Load as dictionary
config = json.load(file) config = json.load(file)
# API Key entries
keys = config['api'].keys() keys = config['api'].keys()
# This will build up a copy of the API section with the API keys instead of filenames.
mask = dict() mask = dict()
# If the API key is a file the use the contents as a new key.
for key in keys: for key in keys:
api_key = key api_key = key
if os.path.exists(key): if os.path.exists(key):
@ -30,25 +46,49 @@ def loadSettings(path: str | None) -> dict:
mask[key] = api_key mask[key] = api_key
config['api'] = {mask[key]: value for key, config['api'] = {mask[key]: value for key,
value in config['api'].items()} value in config['api'].items()}
return config return config
def log(logline: str, path: str = './log.txt') -> None: def log(logline: str, path: str = './log.txt', stdout: bool = False) -> None:
# Appends logline at the end of the file file with a timestamp '''
with open(path, 'a') as file: Appends logline at the end of the file file with a timestamp
file.write(
f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]{logline}\n") `logline` The line to log.
'path' Path to file where to log.
`stdout` Wether to print to stdout instead of a file.
'''
line = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]{logline}"
if stdout:
print(line)
else:
with open(path, 'a') as file:
file.write(line + '\n')
def getCurrentIP(resolvers: list[str], log: Callable[[str], Any]) -> str | None: def getCurrentIP(resolvers: list[str], log: Callable[[str], Any]) -> str | None:
# Go through resolvers until one retuns an IP '''
Go through resolvers until one returns an IP.
`resolvers` A list of IP resolvers.
`log` A function which takes a string and logs it.
'''
for resolver in resolvers: for resolver in resolvers:
response = requests.get(resolver) response = requests.get(resolver)
if response.status_code == 200:
if response.ok:
current_ip = response.text.strip() current_ip = response.text.strip()
# it suffices to check whether the search is not None since the regex is matches from beginning to end
# It suffices to check whether the search is not None since the regex matches from beginning to end.
is_ipv4 = re.search( is_ipv4 = re.search(
'^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\\b){4}$', current_ip) is not None '^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\\b){4}$', current_ip) is not None
if is_ipv4: if is_ipv4:
log(f"[OK][{resolver}] Current IP: '{current_ip}'") log(f"[OK][{resolver}] Current IP: '{current_ip}'")
return current_ip # Break if we have found our IP return current_ip # Break if we have found our IP
@ -56,37 +96,65 @@ def getCurrentIP(resolvers: list[str], log: Callable[[str], Any]) -> str | None:
log(f"[ERROR][{resolver}] '{current_ip}' is not IPv4") log(f"[ERROR][{resolver}] '{current_ip}' is not IPv4")
else: else:
log(f"[ERROR][{resolver}][{response.status_code}] {response.text}") log(f"[ERROR][{resolver}][{response.status_code}] {response.text}")
return None return None
def checkRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any]) -> dict[str, dict[str, list[str]]]: def checkRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any]) -> dict[str, dict[str, list[str]]]:
# Returns the records that need to be renewed '''
# TODO: Also detect TTL change Return records that need to be renewed.
`api` The API section of the settings file.
`current_ip` The current IP address.
`log` A function that takes in a string and logs it.
'''
# TODO: Detect TTL change
records_to_renew = api records_to_renew = api
for key, domains in api.items(): for key, domains in api.items():
headers = {'authorization': f"Apikey {key}"} headers = {'authorization': f"Apikey {key}"}
for domain, _ in domains.items(): for domain, _ in domains.items():
response = requests.get( response = requests.get(
f"https://api.gandi.net/v5/livedns/domains/{domain}/records?rrset_type=A", headers=headers) f"https://api.gandi.net/v5/livedns/domains/{domain}/records?rrset_type=A", headers=headers)
if response.status_code == 200: if response.status_code == 200:
records_to_renew[key][domain] = list( records_to_renew[key][domain] = list(set(records_to_renew[key][domain]) - set(record['rrset_name'] for record in (
set(records_to_renew[key][domain]) - set(record['rrset_name'] for record in (filter(lambda x: current_ip in x['rrset_values'], response.json())))) filter(lambda x: current_ip in x['rrset_values'], response.json()))))
if records_to_renew[key][domain] != []:
log(f"[INFO][{' '.join(records_to_renew)}]Record that need to be renewed")
else: else:
log(f"[ERROR][{domain}][{response.status_code}] {response.json()}") log(f"[ERROR][{domain}][{response.status_code}] {response.json()}")
return records_to_renew return records_to_renew
def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any], ttl) -> None: def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any], ttl: int = 300) -> None:
# Updates the records and reates them if they don't exist '''
Update records and create them, if they don't exist.
`api` The API section of the settings file.
`current_ip` The current IP address.
`log` A function that takes a string and logs it.
`ttl` How long a record should be cached.
'''
payload = json.dumps({ payload = json.dumps({
'rrset_values': [current_ip], 'rrset_values': [current_ip],
'rrset_ttl': ttl 'rrset_ttl': ttl
}) })
for key, domains in api.items(): for api_key, domains in api.items():
headers = { headers = {
'authorization': f"Apikey {key}", 'authorization': f"Apikey {api_key}",
'content-type': 'application/json', 'content-type': 'application/json',
} }
@ -100,36 +168,40 @@ def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Cal
log(f"[ERROR][{domain}][{record}][{response.status_code}] {response.json()}") log(f"[ERROR][{domain}][{record}][{response.status_code}] {response.json()}")
def parseArgv(argv: list[str]) -> dict[str, list[str]]: def parseArgs(args: list[str]) -> argparse.Namespace:
cli_options: dict = {} '''
buffer: str = '' Parse command-line arguments.
`args` A list of user-supplied command line arguments.
'''
parser = argparse.ArgumentParser(
description='A program that renews A records via the Gandi API v5.')
parser.add_argument('--config', type=str, default=None,
help='Path to a config file.')
parser.add_argument('--dry-run', action='store_true',
help='Just perform a dry-run.')
return parser.parse_args(args)
for arg in argv:
if arg.startswith('--'):
cli_options[arg.removeprefix('--')] = []
buffer = arg.removeprefix('--')
else:
cli_options[buffer].append(arg)
return cli_options
def main(): def main():
argv = os.sys.argv # Parse user-supplied command-line arguments.
options = {} args = parseArgs(os.sys.argv[1:])
if len(argv) > 1:
options = parseArgv(argv[1:])
config_path = None # Read in the settings file.
if 'config' in options: config = loadSettings(args.config)
config_path = options['config'][0]
settings = loadSettings(config_path) def logger(x: str) -> Any: return log(x, config['log_path'], args.dry_run)
log_path, ttl = settings['log_path'], settings['ttl']
def logger(x: str) -> Any: return log(x, log_path)
current_ip = getCurrentIP(settings['resolvers'], logger) current_ip = getCurrentIP(config['resolvers'], logger)
if current_ip is not None: if current_ip is not None:
records_to_renew = checkRecords(settings['api'], current_ip, logger) records_to_renew = checkRecords(config['api'], current_ip, logger)
renewRecords(records_to_renew, current_ip, logger, ttl=ttl)
# Do not perform an actual record change during a dry-run.
if not args.dry_run:
renewRecords(records_to_renew, current_ip, logger, config['ttl'])
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -1,5 +1,5 @@
{ {
description = "A very basic flake"; description = "A program to update an A record with the Gandi API v5.";
inputs = { inputs = {
nixpkgs.url = "github:nixos/nixpkgs/nixos-23.05"; nixpkgs.url = "github:nixos/nixpkgs/nixos-23.05";
@ -13,15 +13,23 @@
forAllSystems = nixpkgs.lib.genAttrs supportedSystems; forAllSystems = nixpkgs.lib.genAttrs supportedSystems;
pkgs = forAllSystems (system: nixpkgs.legacyPackages.${system}); pkgs = forAllSystems (system: nixpkgs.legacyPackages.${system});
in { in {
# Formatter # `nix fmt`
formatter = forAllSystems (system: nixpkgs.legacyPackages.${system}.alejandra); formatter = forAllSystems (system: nixpkgs.legacyPackages.${system}.alejandra);
# Packages # `nix build`
packages = forAllSystems (system: { packages = forAllSystems (system: {
default = pkgs.${system}.poetry2nix.mkPoetryApplication {projectDir = self;}; default = pkgs.${system}.poetry2nix.mkPoetryApplication {projectDir = self;};
}); });
# Dev Shell # `nix run`
apps = forAllSystems (system: {
default = {
program = "${self.packages.${system}.default}/bin/dyn_gandi";
type = "app";
};
});
# `nix develop`
devShells = forAllSystems (system: { devShells = forAllSystems (system: {
default = pkgs.${system}.mkShellNoCC { default = pkgs.${system}.mkShellNoCC {
packages = with pkgs.${system}; [ packages = with pkgs.${system}; [
@ -33,16 +41,9 @@
}; };
}); });
# Run as apps # TODO Home Manager Module
apps = forAllSystems (system: { # homeManagerModules.default = import ./nix/hm-module.nix self;
default = {
program = "${self.packages.${system}.default}/bin/dyn_gandi";
type = "app";
};
});
# Home Manager Module
homeManagerModules.default = import ./nix/hm-module.nix self;
# NixOS Module # NixOS Module
nixosModules.default = import ./nix/module.nix inputs; nixosModules.default = import ./nix/module.nix inputs;
}; };

View File