Compare commits

...

7 Commits

Author SHA1 Message Date
cfeb253cac
Documentation and fixed a bug with the log 2023-10-13 14:09:03 +02:00
23214ff78f
deleted unused file 2023-10-13 13:57:49 +02:00
a72ccd4b26
Better error message 2023-10-13 13:53:54 +02:00
f66c489fc6
changed gitignore 2023-10-13 13:51:12 +02:00
115aa579ba
changed binary name 2023-10-13 13:50:05 +02:00
7b4a7c6f6d
Log format 2023-10-13 13:39:16 +02:00
b56750c27e
Began refactoring
Changes
* Added documentation
* More types
* Argparse command-line interface
* A '--dry-run' option

Tests
* Passes mypy
* Build with nix
2023-10-05 14:46:06 +02:00
8 changed files with 148 additions and 139 deletions

15
.gitignore vendored
View File

@ -1,7 +1,10 @@
result
config.json
log.txt
/result
__pycache__/
.vscode/
.direnv/
.mypy_cache/
/.vscode/
/.direnv/
/.mypy_cache/
/config.json
/log.txt
/api.key

View File

@ -1,19 +1,23 @@
# Dyn-Gandi
A DNS record updater for [Gandi's LiveDNS](https://api.gandi.net/docs/livedns/) API.
This script is heavily inspired by [dyn-gandi](https://github.com/Danamir/dyn-gandi).
## How it works
This script determines the the current IP address by querying the resolvers defined in the config file.
It then queries the subdomains' A records off of Gandi and compares their IP addresses to the current IP address.
Should the IP address of a subdomain's A record not match your current IP address it will be updated. The subdomain's A record will be created should it not already exist.
## Notes
Every invocation of the script causes at least 1 request to a resolver specified and 1 API call to Gandi per domain.
Updating a subdomain's A record is 1 API request per subdomain, even if they share the same domain.
Resolvers are queried in the order specified until one returns a valid IP address.
It is also possible to define a path to a file with the API key written in it. This is good for environments where the config file has to be shared like in a nix project.
## How to use
## Usage
First, get your API key from https://account.gandi.net/en/users/USER/security where `USER` is your Gandi username.
The script looks for a config file at `$HOME/.config/dyn-gandi/config.log` or `/etc/dyn-gandi.conf` in that order. So create a file at one of these locations according to the schema below.
@ -40,9 +44,10 @@ The script looks for a config file at `$HOME/.config/dyn-gandi/config.log` or `/
```
## Nix
Add this to the modules.
```nix
Add this to the modules.
```nix
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05";
dyn-gandi.url = "git+https://git.krsnik.at/Kristian/dyn-gandi";
@ -78,6 +83,7 @@ outputs = {
...
}
```
Use `dyn-gandi.nixosModules.default` for a NixOs module and `dyn-gandi.homeManagerModules.default` for home-manager
`dyn-gandi.timer` specifies a timer in seconds when the script should be repeated.

View File

@ -1,12 +1,21 @@
import requests
import json
import re
import os
import re
import json
import argparse
from datetime import datetime
from typing import Callable, Any
import requests
def loadSettings(path: str | None) -> dict:
'''
Load the settings file.
`path` Path to a settings file.
By default the program will look for `$HOME/.config/dyn-gandi/config.json` and then `/etc/dyn-gandi.json`.
'''
if path is None:
path = f"{os.path.expanduser('~')}/.config/dyn-gandi/config.json"
@ -14,14 +23,26 @@ def loadSettings(path: str | None) -> dict:
path = '/etc/dyn-gandi.json'
if not os.path.exists(path):
print(
f"[ERROR] No config file found at '~/.config/dyn-gandi/config.json' or '/etc/dyn-gandi.json'")
quit()
# TODO: check integrity of the config file
if not os.path.exists(path):
print(f"[ERROR] '{path}' does not exist.")
quit()
# TODO check integrity of the config file
with open(path, 'r') as file:
# Read if the API keys are path to files
# Load as dictionary
config = json.load(file)
# API Key entries
keys = config['api'].keys()
# This will build up a copy of the API section with the API keys instead of filenames.
mask = dict()
# If the API key is a file the use the contents as a new key.
for key in keys:
api_key = key
if os.path.exists(key):
@ -30,25 +51,49 @@ def loadSettings(path: str | None) -> dict:
mask[key] = api_key
config['api'] = {mask[key]: value for key,
value in config['api'].items()}
return config
def log(logline: str, path: str = './log.txt') -> None:
# Appends logline at the end of the file file with a timestamp
def log(logline: str, path: str = './log.txt', stdout: bool = False) -> None:
'''
Appends `logline` at the end of the file file with a timestamp.
`logline` The line to log.
'path' Path to file where to log.
`stdout` Wether to print to stdout instead of a file.
'''
line = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]{logline}"
if stdout:
print(line)
else:
with open(path, 'a') as file:
file.write(
f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]{logline}\n")
file.write(line + '\n')
def getCurrentIP(resolvers: list[str], log: Callable[[str], Any]) -> str | None:
# Go through resolvers until one retuns an IP
'''
Go through resolvers until one returns an IP.
`resolvers` A list of IP resolvers.
`log` A function which takes a string and logs it.
'''
for resolver in resolvers:
response = requests.get(resolver)
if response.status_code == 200:
if response.ok:
current_ip = response.text.strip()
# it suffices to check whether the search is not None since the regex is matches from beginning to end
# It suffices to check whether the search is not None since the regex matches from beginning to end.
is_ipv4 = re.search(
'^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\\b){4}$', current_ip) is not None
if is_ipv4:
log(f"[OK][{resolver}] Current IP: '{current_ip}'")
return current_ip # Break if we have found our IP
@ -56,37 +101,65 @@ def getCurrentIP(resolvers: list[str], log: Callable[[str], Any]) -> str | None:
log(f"[ERROR][{resolver}] '{current_ip}' is not IPv4")
else:
log(f"[ERROR][{resolver}][{response.status_code}] {response.text}")
return None
def checkRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any]) -> dict[str, dict[str, list[str]]]:
# Returns the records that need to be renewed
# TODO: Also detect TTL change
'''
Return records that need to be renewed.
`api` The API section of the settings file.
`current_ip` The current IP address.
`log` A function that takes in a string and logs it.
'''
# TODO: Detect TTL change
records_to_renew = api
for key, domains in api.items():
headers = {'authorization': f"Apikey {key}"}
for domain, _ in domains.items():
response = requests.get(
f"https://api.gandi.net/v5/livedns/domains/{domain}/records?rrset_type=A", headers=headers)
if response.status_code == 200:
records_to_renew[key][domain] = list(
set(records_to_renew[key][domain]) - set(record['rrset_name'] for record in (filter(lambda x: current_ip in x['rrset_values'], response.json()))))
records_to_renew[key][domain] = list(set(records_to_renew[key][domain]) - set(record['rrset_name'] for record in (
filter(lambda x: current_ip in x['rrset_values'], response.json()))))
if (records := records_to_renew[key][domain]) != []:
log(f"[INFO][{','.join(records)}] Records need to be renewed")
else:
log(f"[ERROR][{domain}][{response.status_code}] {response.json()}")
return records_to_renew
def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any], ttl) -> None:
# Updates the records and reates them if they don't exist
def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any], ttl: int = 300) -> None:
'''
Update records and create them, if they don't exist.
`api` The API section of the settings file.
`current_ip` The current IP address.
`log` A function that takes a string and logs it.
`ttl` How long a record should be cached.
'''
payload = json.dumps({
'rrset_values': [current_ip],
'rrset_ttl': ttl
})
for key, domains in api.items():
for api_key, domains in api.items():
headers = {
'authorization': f"Apikey {key}",
'authorization': f"Apikey {api_key}",
'content-type': 'application/json',
}
@ -100,36 +173,40 @@ def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Cal
log(f"[ERROR][{domain}][{record}][{response.status_code}] {response.json()}")
def parseArgv(argv: list[str]) -> dict[str, list[str]]:
cli_options: dict = {}
buffer: str = ''
def parseArgs(args: list[str]) -> argparse.Namespace:
'''
Parse command-line arguments.
`args` A list of user-supplied command line arguments.
'''
parser = argparse.ArgumentParser(
description='A program that renews A records via the Gandi API v5.')
parser.add_argument('--config', type=str, default=None,
help='Path to a config file.')
parser.add_argument('--dry-run', action='store_true',
help='Just perform a dry-run.')
return parser.parse_args(args)
for arg in argv:
if arg.startswith('--'):
cli_options[arg.removeprefix('--')] = []
buffer = arg.removeprefix('--')
else:
cli_options[buffer].append(arg)
return cli_options
def main():
argv = os.sys.argv
options = {}
if len(argv) > 1:
options = parseArgv(argv[1:])
# Parse user-supplied command-line arguments.
args = parseArgs(os.sys.argv[1:])
config_path = None
if 'config' in options:
config_path = options['config'][0]
# Read in the settings file.
config = loadSettings(args.config)
settings = loadSettings(config_path)
log_path, ttl = settings['log_path'], settings['ttl']
def logger(x: str) -> Any: return log(x, log_path)
def logger(x: str) -> Any: return log(x, config['log_path'], args.dry_run)
current_ip = getCurrentIP(settings['resolvers'], logger)
current_ip = getCurrentIP(config['resolvers'], logger)
if current_ip is not None:
records_to_renew = checkRecords(settings['api'], current_ip, logger)
renewRecords(records_to_renew, current_ip, logger, ttl=ttl)
records_to_renew = checkRecords(config['api'], current_ip, logger)
# Do not perform an actual record change during a dry-run.
if not args.dry_run:
renewRecords(records_to_renew, current_ip, logger, config['ttl'])
if __name__ == '__main__':

View File

@ -1,5 +1,5 @@
{
description = "A very basic flake";
description = "A program to update an A record with the Gandi API v5.";
inputs = {
nixpkgs.url = "github:nixos/nixpkgs/nixos-23.05";
@ -13,15 +13,15 @@
forAllSystems = nixpkgs.lib.genAttrs supportedSystems;
pkgs = forAllSystems (system: nixpkgs.legacyPackages.${system});
in {
# Formatter
# `nix fmt`
formatter = forAllSystems (system: nixpkgs.legacyPackages.${system}.alejandra);
# Packages
# `nix build`
packages = forAllSystems (system: {
default = pkgs.${system}.poetry2nix.mkPoetryApplication {projectDir = self;};
});
# Dev Shell
# `nix develop`
devShells = forAllSystems (system: {
default = pkgs.${system}.mkShellNoCC {
packages = with pkgs.${system}; [
@ -33,16 +33,9 @@
};
});
# Run as apps
apps = forAllSystems (system: {
default = {
program = "${self.packages.${system}.default}/bin/dyn_gandi";
type = "app";
};
});
# TODO Home Manager Module
# homeManagerModules.default = import ./nix/hm-module.nix self;
# Home Manager Module
homeManagerModules.default = import ./nix/hm-module.nix self;
# NixOS Module
nixosModules.default = import ./nix/module.nix inputs;
};

View File

@ -1,70 +0,0 @@
self: {
config,
lib,
pkgs,
...
}: let
cfg = config.dyn-gandi;
package = self.packages.${pkgs.stdenv.hostPlatform.system}.default;
inherit (lib) mkIf mkEnableOption mkOption types;
format = pkgs.formats.json {};
configFile = format.generate "config.json" cfg.settings;
in {
options.dyn-gandi = {
enable = mkEnableOption "dyn-gandi";
timer = lib.mkOption {
type = types.nullOr types.int;
default = null;
description = lib.mdDoc ''
The time intervall in seconds the script should be repeated.
'';
};
settings = mkOption {
type = with types; let
valueType = nullOr (oneOf [
# TODO: restrict type to actual config file structure
bool
int
float
str
path
(attrsOf valueType)
(listOf valueType)
]);
in
valueType;
default = throw "Please specify dyn-gandi.settings";
};
};
config = mkIf cfg.enable {
home.packages = [package]; # TODO: make this architecture independent
systemd.user.services.dyn-gandi = mkIf (cfg.timer
!= null) {
Unit = {
After = ["network.target"];
};
Install = {WantedBy = ["default.target"];};
Service = {
Type = "oneshot";
ExecStart = "${package}/bin/dyn_gandi --config ${configFile}";
};
};
systemd.user.timers.dyn-gandi = mkIf (cfg.timer
!= null) {
Install = {WantedBy = ["timers.target"];};
Timer = {
OnBootSec = "0s";
OnUnitActiveSec = "${toString cfg.timer}s";
Unit = "dyn-gandi.service";
};
};
};
}

View File

@ -46,7 +46,7 @@ in {
systemd.services.dyn-gandi = mkIf (cfg.timer
!= null) {
script = "${package}/bin/dyn_gandi --config ${configFile}";
script = "${package}/bin/dyn-gandi --config ${configFile}";
serviceConfig = {
Type = "oneshot";

View File

@ -4,14 +4,14 @@ version = "0.1.0"
description = "A DNS updater"
authors = ["Kristian Krsnik <git@krsnik.at>"]
readme = "README.md"
packages = [{include = "dyn_gandi"}]
packages = [{ include = "dyn_gandi" }]
[tool.poetry.dependencies]
python = "^3.10"
requests = "2.29.0"
[tool.poetry.scripts]
dyn_gandi = "dyn_gandi.main:main"
dyn-gandi = "dyn_gandi.main:main"
[build-system]
requires = ["poetry-core"]

View File