Compare commits

..

7 Commits

Author SHA1 Message Date
cfeb253cac
Documentation and fixed a bug with the log 2023-10-13 14:09:03 +02:00
23214ff78f
deleted unused file 2023-10-13 13:57:49 +02:00
a72ccd4b26
Better error message 2023-10-13 13:53:54 +02:00
f66c489fc6
changed gitignore 2023-10-13 13:51:12 +02:00
115aa579ba
changed binary name 2023-10-13 13:50:05 +02:00
7b4a7c6f6d
Log format 2023-10-13 13:39:16 +02:00
b56750c27e
Began refactoring
Changes
* Added documentation
* More types
* Argparse command-line interface
* A '--dry-run' option

Tests
* Passes mypy
* Build with nix
2023-10-05 14:46:06 +02:00
8 changed files with 148 additions and 139 deletions

15
.gitignore vendored
View File

@ -1,7 +1,10 @@
result /result
config.json
log.txt
__pycache__/ __pycache__/
.vscode/ /.vscode/
.direnv/ /.direnv/
.mypy_cache/ /.mypy_cache/
/config.json
/log.txt
/api.key

View File

@ -1,19 +1,23 @@
# Dyn-Gandi # Dyn-Gandi
A DNS record updater for [Gandi's LiveDNS](https://api.gandi.net/docs/livedns/) API. A DNS record updater for [Gandi's LiveDNS](https://api.gandi.net/docs/livedns/) API.
This script is heavily inspired by [dyn-gandi](https://github.com/Danamir/dyn-gandi). This script is heavily inspired by [dyn-gandi](https://github.com/Danamir/dyn-gandi).
## How it works ## How it works
This script determines the the current IP address by querying the resolvers defined in the config file. This script determines the the current IP address by querying the resolvers defined in the config file.
It then queries the subdomains' A records off of Gandi and compares their IP addresses to the current IP address. It then queries the subdomains' A records off of Gandi and compares their IP addresses to the current IP address.
Should the IP address of a subdomain's A record not match your current IP address it will be updated. The subdomain's A record will be created should it not already exist. Should the IP address of a subdomain's A record not match your current IP address it will be updated. The subdomain's A record will be created should it not already exist.
## Notes ## Notes
Every invocation of the script causes at least 1 request to a resolver specified and 1 API call to Gandi per domain. Every invocation of the script causes at least 1 request to a resolver specified and 1 API call to Gandi per domain.
Updating a subdomain's A record is 1 API request per subdomain, even if they share the same domain. Updating a subdomain's A record is 1 API request per subdomain, even if they share the same domain.
Resolvers are queried in the order specified until one returns a valid IP address. Resolvers are queried in the order specified until one returns a valid IP address.
It is also possible to define a path to a file with the API key written in it. This is good for environments where the config file has to be shared like in a nix project. It is also possible to define a path to a file with the API key written in it. This is good for environments where the config file has to be shared like in a nix project.
## How to use ## Usage
First, get your API key from https://account.gandi.net/en/users/USER/security where `USER` is your Gandi username. First, get your API key from https://account.gandi.net/en/users/USER/security where `USER` is your Gandi username.
The script looks for a config file at `$HOME/.config/dyn-gandi/config.log` or `/etc/dyn-gandi.conf` in that order. So create a file at one of these locations according to the schema below. The script looks for a config file at `$HOME/.config/dyn-gandi/config.log` or `/etc/dyn-gandi.conf` in that order. So create a file at one of these locations according to the schema below.
@ -40,9 +44,10 @@ The script looks for a config file at `$HOME/.config/dyn-gandi/config.log` or `/
``` ```
## Nix ## Nix
Add this to the modules.
```nix
Add this to the modules.
```nix
inputs = { inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05"; nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05";
dyn-gandi.url = "git+https://git.krsnik.at/Kristian/dyn-gandi"; dyn-gandi.url = "git+https://git.krsnik.at/Kristian/dyn-gandi";
@ -78,6 +83,7 @@ outputs = {
... ...
} }
``` ```
Use `dyn-gandi.nixosModules.default` for a NixOs module and `dyn-gandi.homeManagerModules.default` for home-manager Use `dyn-gandi.nixosModules.default` for a NixOs module and `dyn-gandi.homeManagerModules.default` for home-manager
`dyn-gandi.timer` specifies a timer in seconds when the script should be repeated. `dyn-gandi.timer` specifies a timer in seconds when the script should be repeated.

View File

@ -1,27 +1,48 @@
import requests
import json
import re
import os import os
import re
import json
import argparse
from datetime import datetime from datetime import datetime
from typing import Callable, Any from typing import Callable, Any
import requests
def loadSettings(path: str | None) -> dict: def loadSettings(path: str | None) -> dict:
'''
Load the settings file.
`path` Path to a settings file.
By default the program will look for `$HOME/.config/dyn-gandi/config.json` and then `/etc/dyn-gandi.json`.
'''
if path is None: if path is None:
path = f"{os.path.expanduser('~')}/.config/dyn-gandi/config.json" path = f"{os.path.expanduser('~')}/.config/dyn-gandi/config.json"
if not os.path.exists(path): if not os.path.exists(path):
path = '/etc/dyn-gandi.json' path = '/etc/dyn-gandi.json'
if not os.path.exists(path):
print(
f"[ERROR] No config file found at '~/.config/dyn-gandi/config.json' or '/etc/dyn-gandi.json'")
quit()
if not os.path.exists(path): if not os.path.exists(path):
print(f"[ERROR] '{path}' does not exist.")
quit() quit()
# TODO: check integrity of the config file # TODO check integrity of the config file
with open(path, 'r') as file: with open(path, 'r') as file:
# Read if the API keys are path to files # Load as dictionary
config = json.load(file) config = json.load(file)
# API Key entries
keys = config['api'].keys() keys = config['api'].keys()
# This will build up a copy of the API section with the API keys instead of filenames.
mask = dict() mask = dict()
# If the API key is a file the use the contents as a new key.
for key in keys: for key in keys:
api_key = key api_key = key
if os.path.exists(key): if os.path.exists(key):
@ -30,25 +51,49 @@ def loadSettings(path: str | None) -> dict:
mask[key] = api_key mask[key] = api_key
config['api'] = {mask[key]: value for key, config['api'] = {mask[key]: value for key,
value in config['api'].items()} value in config['api'].items()}
return config return config
def log(logline: str, path: str = './log.txt') -> None: def log(logline: str, path: str = './log.txt', stdout: bool = False) -> None:
# Appends logline at the end of the file file with a timestamp '''
with open(path, 'a') as file: Appends `logline` at the end of the file file with a timestamp.
file.write(
f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]{logline}\n") `logline` The line to log.
'path' Path to file where to log.
`stdout` Wether to print to stdout instead of a file.
'''
line = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]{logline}"
if stdout:
print(line)
else:
with open(path, 'a') as file:
file.write(line + '\n')
def getCurrentIP(resolvers: list[str], log: Callable[[str], Any]) -> str | None: def getCurrentIP(resolvers: list[str], log: Callable[[str], Any]) -> str | None:
# Go through resolvers until one retuns an IP '''
Go through resolvers until one returns an IP.
`resolvers` A list of IP resolvers.
`log` A function which takes a string and logs it.
'''
for resolver in resolvers: for resolver in resolvers:
response = requests.get(resolver) response = requests.get(resolver)
if response.status_code == 200:
if response.ok:
current_ip = response.text.strip() current_ip = response.text.strip()
# it suffices to check whether the search is not None since the regex is matches from beginning to end
# It suffices to check whether the search is not None since the regex matches from beginning to end.
is_ipv4 = re.search( is_ipv4 = re.search(
'^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\\b){4}$', current_ip) is not None '^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\\b){4}$', current_ip) is not None
if is_ipv4: if is_ipv4:
log(f"[OK][{resolver}] Current IP: '{current_ip}'") log(f"[OK][{resolver}] Current IP: '{current_ip}'")
return current_ip # Break if we have found our IP return current_ip # Break if we have found our IP
@ -56,37 +101,65 @@ def getCurrentIP(resolvers: list[str], log: Callable[[str], Any]) -> str | None:
log(f"[ERROR][{resolver}] '{current_ip}' is not IPv4") log(f"[ERROR][{resolver}] '{current_ip}' is not IPv4")
else: else:
log(f"[ERROR][{resolver}][{response.status_code}] {response.text}") log(f"[ERROR][{resolver}][{response.status_code}] {response.text}")
return None return None
def checkRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any]) -> dict[str, dict[str, list[str]]]: def checkRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any]) -> dict[str, dict[str, list[str]]]:
# Returns the records that need to be renewed '''
# TODO: Also detect TTL change Return records that need to be renewed.
`api` The API section of the settings file.
`current_ip` The current IP address.
`log` A function that takes in a string and logs it.
'''
# TODO: Detect TTL change
records_to_renew = api records_to_renew = api
for key, domains in api.items(): for key, domains in api.items():
headers = {'authorization': f"Apikey {key}"} headers = {'authorization': f"Apikey {key}"}
for domain, _ in domains.items(): for domain, _ in domains.items():
response = requests.get( response = requests.get(
f"https://api.gandi.net/v5/livedns/domains/{domain}/records?rrset_type=A", headers=headers) f"https://api.gandi.net/v5/livedns/domains/{domain}/records?rrset_type=A", headers=headers)
if response.status_code == 200: if response.status_code == 200:
records_to_renew[key][domain] = list( records_to_renew[key][domain] = list(set(records_to_renew[key][domain]) - set(record['rrset_name'] for record in (
set(records_to_renew[key][domain]) - set(record['rrset_name'] for record in (filter(lambda x: current_ip in x['rrset_values'], response.json())))) filter(lambda x: current_ip in x['rrset_values'], response.json()))))
if (records := records_to_renew[key][domain]) != []:
log(f"[INFO][{','.join(records)}] Records need to be renewed")
else: else:
log(f"[ERROR][{domain}][{response.status_code}] {response.json()}") log(f"[ERROR][{domain}][{response.status_code}] {response.json()}")
return records_to_renew return records_to_renew
def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any], ttl) -> None: def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any], ttl: int = 300) -> None:
# Updates the records and reates them if they don't exist '''
Update records and create them, if they don't exist.
`api` The API section of the settings file.
`current_ip` The current IP address.
`log` A function that takes a string and logs it.
`ttl` How long a record should be cached.
'''
payload = json.dumps({ payload = json.dumps({
'rrset_values': [current_ip], 'rrset_values': [current_ip],
'rrset_ttl': ttl 'rrset_ttl': ttl
}) })
for key, domains in api.items(): for api_key, domains in api.items():
headers = { headers = {
'authorization': f"Apikey {key}", 'authorization': f"Apikey {api_key}",
'content-type': 'application/json', 'content-type': 'application/json',
} }
@ -100,36 +173,40 @@ def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Cal
log(f"[ERROR][{domain}][{record}][{response.status_code}] {response.json()}") log(f"[ERROR][{domain}][{record}][{response.status_code}] {response.json()}")
def parseArgv(argv: list[str]) -> dict[str, list[str]]: def parseArgs(args: list[str]) -> argparse.Namespace:
cli_options: dict = {} '''
buffer: str = '' Parse command-line arguments.
`args` A list of user-supplied command line arguments.
'''
parser = argparse.ArgumentParser(
description='A program that renews A records via the Gandi API v5.')
parser.add_argument('--config', type=str, default=None,
help='Path to a config file.')
parser.add_argument('--dry-run', action='store_true',
help='Just perform a dry-run.')
return parser.parse_args(args)
for arg in argv:
if arg.startswith('--'):
cli_options[arg.removeprefix('--')] = []
buffer = arg.removeprefix('--')
else:
cli_options[buffer].append(arg)
return cli_options
def main(): def main():
argv = os.sys.argv # Parse user-supplied command-line arguments.
options = {} args = parseArgs(os.sys.argv[1:])
if len(argv) > 1:
options = parseArgv(argv[1:])
config_path = None # Read in the settings file.
if 'config' in options: config = loadSettings(args.config)
config_path = options['config'][0]
settings = loadSettings(config_path) def logger(x: str) -> Any: return log(x, config['log_path'], args.dry_run)
log_path, ttl = settings['log_path'], settings['ttl']
def logger(x: str) -> Any: return log(x, log_path)
current_ip = getCurrentIP(settings['resolvers'], logger) current_ip = getCurrentIP(config['resolvers'], logger)
if current_ip is not None: if current_ip is not None:
records_to_renew = checkRecords(settings['api'], current_ip, logger) records_to_renew = checkRecords(config['api'], current_ip, logger)
renewRecords(records_to_renew, current_ip, logger, ttl=ttl)
# Do not perform an actual record change during a dry-run.
if not args.dry_run:
renewRecords(records_to_renew, current_ip, logger, config['ttl'])
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -1,5 +1,5 @@
{ {
description = "A very basic flake"; description = "A program to update an A record with the Gandi API v5.";
inputs = { inputs = {
nixpkgs.url = "github:nixos/nixpkgs/nixos-23.05"; nixpkgs.url = "github:nixos/nixpkgs/nixos-23.05";
@ -13,15 +13,15 @@
forAllSystems = nixpkgs.lib.genAttrs supportedSystems; forAllSystems = nixpkgs.lib.genAttrs supportedSystems;
pkgs = forAllSystems (system: nixpkgs.legacyPackages.${system}); pkgs = forAllSystems (system: nixpkgs.legacyPackages.${system});
in { in {
# Formatter # `nix fmt`
formatter = forAllSystems (system: nixpkgs.legacyPackages.${system}.alejandra); formatter = forAllSystems (system: nixpkgs.legacyPackages.${system}.alejandra);
# Packages # `nix build`
packages = forAllSystems (system: { packages = forAllSystems (system: {
default = pkgs.${system}.poetry2nix.mkPoetryApplication {projectDir = self;}; default = pkgs.${system}.poetry2nix.mkPoetryApplication {projectDir = self;};
}); });
# Dev Shell # `nix develop`
devShells = forAllSystems (system: { devShells = forAllSystems (system: {
default = pkgs.${system}.mkShellNoCC { default = pkgs.${system}.mkShellNoCC {
packages = with pkgs.${system}; [ packages = with pkgs.${system}; [
@ -33,16 +33,9 @@
}; };
}); });
# Run as apps # TODO Home Manager Module
apps = forAllSystems (system: { # homeManagerModules.default = import ./nix/hm-module.nix self;
default = {
program = "${self.packages.${system}.default}/bin/dyn_gandi";
type = "app";
};
});
# Home Manager Module
homeManagerModules.default = import ./nix/hm-module.nix self;
# NixOS Module # NixOS Module
nixosModules.default = import ./nix/module.nix inputs; nixosModules.default = import ./nix/module.nix inputs;
}; };

View File

@ -1,70 +0,0 @@
self: {
config,
lib,
pkgs,
...
}: let
cfg = config.dyn-gandi;
package = self.packages.${pkgs.stdenv.hostPlatform.system}.default;
inherit (lib) mkIf mkEnableOption mkOption types;
format = pkgs.formats.json {};
configFile = format.generate "config.json" cfg.settings;
in {
options.dyn-gandi = {
enable = mkEnableOption "dyn-gandi";
timer = lib.mkOption {
type = types.nullOr types.int;
default = null;
description = lib.mdDoc ''
The time intervall in seconds the script should be repeated.
'';
};
settings = mkOption {
type = with types; let
valueType = nullOr (oneOf [
# TODO: restrict type to actual config file structure
bool
int
float
str
path
(attrsOf valueType)
(listOf valueType)
]);
in
valueType;
default = throw "Please specify dyn-gandi.settings";
};
};
config = mkIf cfg.enable {
home.packages = [package]; # TODO: make this architecture independent
systemd.user.services.dyn-gandi = mkIf (cfg.timer
!= null) {
Unit = {
After = ["network.target"];
};
Install = {WantedBy = ["default.target"];};
Service = {
Type = "oneshot";
ExecStart = "${package}/bin/dyn_gandi --config ${configFile}";
};
};
systemd.user.timers.dyn-gandi = mkIf (cfg.timer
!= null) {
Install = {WantedBy = ["timers.target"];};
Timer = {
OnBootSec = "0s";
OnUnitActiveSec = "${toString cfg.timer}s";
Unit = "dyn-gandi.service";
};
};
};
}

View File

@ -46,7 +46,7 @@ in {
systemd.services.dyn-gandi = mkIf (cfg.timer systemd.services.dyn-gandi = mkIf (cfg.timer
!= null) { != null) {
script = "${package}/bin/dyn_gandi --config ${configFile}"; script = "${package}/bin/dyn-gandi --config ${configFile}";
serviceConfig = { serviceConfig = {
Type = "oneshot"; Type = "oneshot";

View File

@ -4,14 +4,14 @@ version = "0.1.0"
description = "A DNS updater" description = "A DNS updater"
authors = ["Kristian Krsnik <git@krsnik.at>"] authors = ["Kristian Krsnik <git@krsnik.at>"]
readme = "README.md" readme = "README.md"
packages = [{include = "dyn_gandi"}] packages = [{ include = "dyn_gandi" }]
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.10" python = "^3.10"
requests = "2.29.0" requests = "2.29.0"
[tool.poetry.scripts] [tool.poetry.scripts]
dyn_gandi = "dyn_gandi.main:main" dyn-gandi = "dyn_gandi.main:main"
[build-system] [build-system]
requires = ["poetry-core"] requires = ["poetry-core"]

View File