From 7517ee5534d7de76f4e8b6ce7436e8dfa45ad15c Mon Sep 17 00:00:00 2001 From: Kristian Krsnik Date: Sun, 23 Mar 2025 21:08:31 +0100 Subject: [PATCH] initial commit --- .envrc | 1 + .gitignore | 26 +++++ README.md | 98 ++++++++++++++++++ flake.lock | 27 +++++ flake.nix | 105 ++++++++++++++++++++ nix/module.nix | 63 ++++++++++++ pyproject.toml | 40 ++++++++ src/cloudns/__init__.py | 1 + src/cloudns/__main__.py | 3 + src/cloudns/cloudns.py | 110 +++++++++++++++++++++ src/cloudns/logger/__init__.py | 3 + src/cloudns/logger/logger.py | 175 +++++++++++++++++++++++++++++++++ src/cloudns/main.py | 122 +++++++++++++++++++++++ src/cloudns/py.typed | 0 src/cloudns/types.py | 19 ++++ tests/__main__.py | 5 + tests/test_imports.py | 2 + 17 files changed, 800 insertions(+) create mode 100644 .envrc create mode 100644 .gitignore create mode 100644 README.md create mode 100644 flake.lock create mode 100644 flake.nix create mode 100644 nix/module.nix create mode 100644 pyproject.toml create mode 100644 src/cloudns/__init__.py create mode 100644 src/cloudns/__main__.py create mode 100644 src/cloudns/cloudns.py create mode 100644 src/cloudns/logger/__init__.py create mode 100644 src/cloudns/logger/logger.py create mode 100644 src/cloudns/main.py create mode 100644 src/cloudns/py.typed create mode 100644 src/cloudns/types.py create mode 100644 tests/__main__.py create mode 100644 tests/test_imports.py diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..3550a30 --- /dev/null +++ b/.envrc @@ -0,0 +1 @@ +use flake diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1e1cdb6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +# Python # +# Virtual Environment +/.venv/ + +# Cache +__pycache__/ + +# Build +/dist/ +*.egg-info/ +/build/ + +# Tools +/.pytest_cache/ +/.mypy_cache/ + +# Nix # +/result + +# Direnv # +/.direnv/ + +# Project specific files # +config.json +*.key +*.jsonl diff --git a/README.md b/README.md new file mode 100644 index 0000000..3d4f994 --- /dev/null +++ b/README.md @@ -0,0 +1,98 @@ +# Cloudns - A Cloudflare Dynamic DNS Solution + +A DNS record updater for [Cloudflare's DNS](https://developers.cloudflare.com/api/resources/dns/) API. + +## How it works + +This script determines the the current IP address by querying the resolvers defined in the config file. +It then queries the subdomains' A records off of Cloudflare and compares their IP addresses to the current IP address. +Should the IP address of a subdomain's A record not match your current IP address it will be updated. The subdomain's A record will be created should it not already exist. + +## Notes + +Every invocation of the script causes at least 1 request to a resolver specified and 1 API call to Cloudflare per domain. +Updating a subdomain's A record is 1 API request per subdomain, even if they share the same domain. +Resolvers are queried in the order specified until one returns a valid IP address. +It is also possible to define a path to a file with the API key written in it. +This is good for environments where the config file has to be shared like in a nix project. + +## Usage + +First, get your User API Token from https://dash.cloudflare.com/profile/api-tokens. +The token need the following permissions: + +* **Edit DNS** + +```json +{ + "api": { + "API Key": { + "example.com": [ "@", "www", "sub1" ], + "example.org": [ "@", "www", "sub1", "sub2" ] + }, + "/path/to/a/file/containing/api_key": { + "example.at": [ "sub1" ], + "example.au": [ "sub1", "sub2" ] + } + }, + "resolvers": [ + "https://ifconfig.me/ip", + "https://me.gandi.net" + ], + "log-path": "./log.jsonl" +} +``` + +## Nix + +Add this to the modules. + +```nix +inputs = { + cloudns.url = "git+https://git.krsnik.at/Kristian/cloudns"; +}; + +outputs = { + self, + nixpkgs, + cloudns +}: { + ... + modules = [ + cloudns.nixosModules.default + { + cloudns.enable = true; + cloudns.timer = 300; + cloudns.settings = { + api = { + "/path/to/a/file/containing/api_key" = { + "example.com" = ["@" "www"]; + }; + }; + resolvers = [ + "https://ifconfig.me/ip" + "https://me.gandi.net" + ]; + log_path = "/path/to/log/file.jsonl"; + }; + } + ... + ]; + ... +} +``` + +Use `cloudns.nixosModules.default` for a NixOs module. + +`cloudns.timer` specifies a timer in seconds when the script should be repeated. + +## Features + +* Support for arbitrarily many domains and subdomains through a nested data structure. +* Small codebase +* Logging +* NixOS and home-manager modules + +## Limitations + +* Only IPv4 addresses are supported diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..62e0614 --- /dev/null +++ b/flake.lock @@ -0,0 +1,27 @@ +{ + "nodes": { + "nixpkgs": { + "locked": { + "lastModified": 1742422364, + "narHash": "sha256-mNqIplmEohk5jRkqYqG19GA8MbQ/D4gQSK0Mu4LvfRQ=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "a84ebe20c6bc2ecbcfb000a50776219f48d134cc", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "nixpkgs": "nixpkgs" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..471424b --- /dev/null +++ b/flake.nix @@ -0,0 +1,105 @@ +{ + description = "A Python Project Template."; + + inputs = { + nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; + }; + + outputs = { + self, + nixpkgs, + ... + } @ inputs: let + supportedSystems = ["x86_64-linux" "x86_64-darwin" "aarch64-linux" "aarch64-darwin"]; + forAllSystems = nixpkgs.lib.genAttrs supportedSystems; + pkgs = forAllSystems (system: nixpkgs.legacyPackages.${system}.extend overlay); + + overlay = final: prev: rec { + python3Packages = prev.python3Packages.overrideScope (pfinal: pprev: { + packageNameToDrv = x: builtins.getAttr (cleanPythonPackageName x) final.python3Packages; + }); + + cleanPythonPackageName = x: let + cleanName = builtins.match "([a-z,A-Z,0-9,_,-]+).*" x; + in + if cleanName != null + then builtins.elemAt cleanName 0 + else builtins.warn "Could not determine package name from '${x}'" null; + }; + + pyproject = builtins.fromTOML (builtins.readFile ./pyproject.toml); + + buildDependencies = forAllSystems (system: builtins.map pkgs.${system}.python3Packages.packageNameToDrv pyproject.build-system.requires); + runtimeDependencies = forAllSystems (system: builtins.map pkgs.${system}.python3Packages.packageNameToDrv pyproject.project.dependencies); + optionalDependencies = forAllSystems (system: builtins.mapAttrs (name: value: builtins.map pkgs.${system}.python3Packages.packageNameToDrv value) pyproject.project.optional-dependencies); + in { + # `nix build` + packages = forAllSystems (system: let + buildProject = {skipCheck ? false}: + pkgs.${system}.python3Packages.buildPythonPackage { + pname = pyproject.project.name; + version = pyproject.project.version; + src = ./.; + + pyproject = true; + + build-system = buildDependencies.${system}; + + dependencies = runtimeDependencies.${system}; + + optional-dependencies = optionalDependencies.${system}; + + nativeCheckInputs = optionalDependencies.${system}.dev; + + checkPhase = let + dev = builtins.map (x: x.pname) optionalDependencies.${system}.dev; + in '' + ${ + if builtins.elem "pytest" dev && !skipCheck + then "pytest tests" + else "" + } + ${ + if builtins.elem "mypy" dev && !skipCheck + then "mypy src" + else "" + } + ${ + if builtins.elem "pylint" dev && !skipCheck + then "pylint src" + else "" + } + ''; + }; + in { + default = self.packages.${system}."${pyproject.project.name}"; + "${pyproject.project.name}" = buildProject {skipCheck = false;}; + quick = buildProject {skipCheck = true;}; + }); + + # `nix fmt` + formatter = forAllSystems (system: pkgs.${system}.alejandra); + + # `nix develop` + devShells = forAllSystems (system: rec { + default = venv; + + venv = pkgs.${system}.mkShell { + shellHook = '' + if [ ! -d .venv/ ]; then + echo "Creating Virtual Environment..." + ${pkgs.${system}.python3}/bin/python3 -m venv .venv + fi + + alias activate='source .venv/bin/activate' + + echo "Entering Virtual Environment..." + source .venv/bin/activate + ''; + }; + }); + + # NixOS Module + nixosModules.default = import ./nix/module.nix inputs; + }; +} diff --git a/nix/module.nix b/nix/module.nix new file mode 100644 index 0000000..bd54334 --- /dev/null +++ b/nix/module.nix @@ -0,0 +1,63 @@ +inputs: { + config, + lib, + pkgs, + ... +}: let + cfg = config.cloudns; + package = inputs.self.packages.${pkgs.stdenv.hostPlatform.system}.default; + inherit (lib) mkIf mkEnableOption mkOption types; + + format = pkgs.formats.json {}; + configFile = format.generate "config.json" cfg.settings; +in { + options.cloudns = { + enable = mkEnableOption "cloudns"; + + timer = lib.mkOption { + type = types.nullOr types.int; + default = null; + description = lib.mdDoc '' + The time interval in seconds the script should be repeated. + ''; + }; + + settings = mkOption { + type = with types; let + valueType = nullOr (oneOf [ + # TODO: restrict type to actual config file structure + bool + int + float + str + path + (attrsOf valueType) + (listOf valueType) + ]); + in + valueType; + default = throw "Please specify cloudns.settings"; + }; + }; + + config = mkIf cfg.enable { + environment.systemPackages = [package]; + + systemd.services.cloudns = mkIf (cfg.timer != null) { + script = "${package}/bin/cloudns --config ${configFile}"; + requires = ["network-online.target"]; + serviceConfig = { + Type = "oneshot"; + }; + }; + + systemd.timers.cloudns = mkIf (cfg.timer != null) { + wantedBy = ["timers.target"]; + timerConfig = { + OnBootSec = "0s"; + OnUnitActiveSec = "${toString cfg.timer}s"; + Unit = "cloudns.service"; + }; + }; + }; +} diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..b67a146 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,40 @@ +[project] +name = "cloudns" +version = "0.1.0" +requires-python = "~=3.12, <4" +dependencies = ["requests~=2.32.3", "pydantic~=2.10.5"] + +[project.optional-dependencies] +dev = [ + "pytest~=8.3", + "mypy~=1.13", + "pylint~=3.3", + "types-requests~=2.32.0.20241016", +] + +[project.scripts] +cloudns = "cloudns.main:main" + +[build-system] +requires = ["setuptools~=75.1"] +build-backend = "setuptools.build_meta" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.setuptools.package-data] +cloudns = ["py.typed"] + +[tool.autopep8] +max_line_length = 150 + +[tool.pylint.'MESSAGES CONTROL'] +disable = [ + "line-too-long", + "missing-module-docstring", + "missing-class-docstring", + "missing-function-docstring", + "too-few-public-methods", + "broad-exception-caught", + "logging-fstring-interpolation", +] diff --git a/src/cloudns/__init__.py b/src/cloudns/__init__.py new file mode 100644 index 0000000..86966d1 --- /dev/null +++ b/src/cloudns/__init__.py @@ -0,0 +1 @@ +from .main import run diff --git a/src/cloudns/__main__.py b/src/cloudns/__main__.py new file mode 100644 index 0000000..5d6a810 --- /dev/null +++ b/src/cloudns/__main__.py @@ -0,0 +1,3 @@ +from .main import main + +main() diff --git a/src/cloudns/cloudns.py b/src/cloudns/cloudns.py new file mode 100644 index 0000000..0b22552 --- /dev/null +++ b/src/cloudns/cloudns.py @@ -0,0 +1,110 @@ +import re +from typing import Iterable, Literal + +import requests + +from .logger import getLogger + + +LOGGER = getLogger('cloudns') + + +class ZoneNotFoundError(Exception): + pass + +class CouldNotGetIPError(Exception): + pass + +class APIError(Exception): + pass + +def get_current_ip(resolvers: Iterable[str]) -> str: + ''' + Go through resolvers until one returns an IP. + + `resolvers` A list of IP resolvers. + + `log` A function which takes a string and logs it. + ''' + + for resolver in resolvers: + response = requests.get(resolver, timeout = 5) + + if not response.ok: + LOGGER.warning(f"[ERROR][{resolver}][{response.status_code}] {response.text}", extra = { 'resolver': resolver, 'status_code': response.status_code, 'content': response.content }) + + current_ip = response.text.strip() + + # It suffices to check whether the search is not None since the regex matches from beginning to end. + is_ipv4 = re.search(r'^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\b){4}$', current_ip) is not None + + if is_ipv4: + LOGGER.debug(f"[OK][{resolver}] Current IP: '{current_ip}'", extra = { 'resolver': resolver, 'current_ip': current_ip }) + return current_ip # Break if we have found our IP + + LOGGER.warning(f"[WARNING][{resolver}] '{current_ip}' is not IPv4", extra = { 'resolver': resolver, 'ip': current_ip }) + + raise CouldNotGetIPError('Could not get IP.') + +def _cloudflare_api_request(method: Literal['GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE'], endpoint: str, api_key: str, headers: dict | None = None, timeout: int | None = 5, **kwargs) -> dict: + response = requests.request( + method, + f'https://api.cloudflare.com/client/v4/{endpoint}', + headers = { + 'Authorization': f'Bearer {api_key}', + 'Content-Type': 'application/json' + } | (headers or {}), + timeout = timeout, + **kwargs + ) + + LOGGER.debug('Issued Cloudflare API request.', extra = { 'method': method, 'endpoint': endpoint, 'headers': headers, 'kwargs': kwargs, 'status_code': response.status_code, 'content': response.content }) + + if not response.ok: + LOGGER.error('', extra = { 'status_code': response.status_code, 'content': response.content }) + raise APIError(response.content) + + result = response.json() + + if not result['success']: + LOGGER.error('API call was not successful.', extra = { 'status_code': response.status_code, 'content': response.content }) + raise APIError('API call was not successful.') + + return result['result'] + +def api_key_is_valid(api_key: str) -> bool: + result = _cloudflare_api_request('GET', 'user/tokens/verify', api_key) + return result['status'] == 'active' + +def get_zone_id(api_key: str, domain: str) -> str: + result = _cloudflare_api_request('GET', 'zones', api_key, params = { 'name': domain }) + + if len(result) != 1: + message = f"Expected a list of length '1' but got length '{len(result)}'" + raise ZoneNotFoundError(message) + + return result[0]['id'] + +def get_A_records(api_key: str, zone_id: str) -> dict: # pylint: disable=invalid-name + result = _cloudflare_api_request('GET', f'zones/{zone_id}/dns_records', api_key, params = { 'type': 'A' }) + return result + +def create_A_record(api_key: str, zone_id: str, name: str, value: str) -> None: # pylint: disable=invalid-name + _cloudflare_api_request( + 'POST', + f'zones/{zone_id}/dns_records', + api_key, + json = { + 'type': 'A', + 'name': name, + 'content': value + } + ) + +def update_A_record(api_key: str, zone_id: str, record_id: str, value: str) -> None: # pylint: disable=invalid-name + _cloudflare_api_request( + 'PATCH', + f'/zones/{zone_id}/dns_records/{record_id}', + api_key, + json = { 'content': value } + ) diff --git a/src/cloudns/logger/__init__.py b/src/cloudns/logger/__init__.py new file mode 100644 index 0000000..4857813 --- /dev/null +++ b/src/cloudns/logger/__init__.py @@ -0,0 +1,3 @@ +from logging import Logger, getLogger + +from .logger import setup_logging diff --git a/src/cloudns/logger/logger.py b/src/cloudns/logger/logger.py new file mode 100644 index 0000000..be84f1d --- /dev/null +++ b/src/cloudns/logger/logger.py @@ -0,0 +1,175 @@ +import sys +import json +import logging +import logging.config +import logging.handlers +import atexit +from datetime import datetime, timezone +from typing_extensions import override + + + +LOG_RECORD_BUILTIN_ATTRS = { + "args", + "asctime", + "created", + "exc_info", + "exc_text", + "filename", + "funcName", + "levelname", + "levelno", + "lineno", + "module", + "msecs", + "message", + "msg", + "name", + "pathname", + "process", + "processName", + "relativeCreated", + "stack_info", + "thread", + "threadName", + "taskName", +} + + +class JSONFormatter(logging.Formatter): + + def __init__(self, *, fmt_keys: dict[str, str] | None = None): + super().__init__() + self.fmt_keys = fmt_keys if fmt_keys is not None else {} + + @override + def format(self, record: logging.LogRecord) -> str: + message = self._prepare_log_dict(record) + return json.dumps(message, default=str) + + def _prepare_log_dict(self, record: logging.LogRecord) -> dict: + always_fields = { + 'message': record.getMessage(), + 'timestamp': datetime.fromtimestamp( + record.created, tz=timezone.utc + ).isoformat() + } + + if record.exc_info is not None: + always_fields['exc_info'] = self.formatException(record.exc_info) + + if record.stack_info is not None: + always_fields['stack_info'] = self.formatStack(record.stack_info) + + message = { + key: msg_value + if (msg_value := always_fields.pop(value, None)) is not None + else getattr(record, value) + for key, value in self.fmt_keys.items() + } + + message.update(always_fields) + + for key, value in record.__dict__.items(): + if key not in LOG_RECORD_BUILTIN_ATTRS: + message[key] = value + + return message + + +class NonErrorFilter(logging.Filter): + @override + def filter(self, record: logging.LogRecord) -> bool | logging.LogRecord: + return record.levelno <= logging.INFO + + +def generate_log_config(log_path: str | None = None, backup_count: int = 3, max_bytes: int = 1024 * 1024 * 10) -> dict: + logger_config: dict = { + 'version': 1, + 'disable_existing_loggers': False, + 'filters': { + 'no_errors': { + "()": NonErrorFilter + } + }, + 'formatters': { + 'simple': { + 'format': '[%(asctime)s][%(levelname)s] %(message)s', + 'datefmt': '%Y-%m-%d %H:%M:%S' + }, + 'detailed': { + 'format': '[%(asctime)s][%(levelname)s] %(message)s', + 'datefmt': '%Y-%m-%dT%H:%M:%S%z' # ISO-8601 Timestamp + }, + 'json': { + '()': JSONFormatter, + 'fmt_keys': { + 'timestamp': 'timestamp', + 'level': 'levelname', + 'message': 'message', + 'logger': 'name', + 'module': 'module', + 'function': 'funcName', + 'line': 'lineno', + 'thread_name': 'threadName' + }, + } + }, + 'handlers': { + 'stdout': { + 'class': logging.StreamHandler, + 'level': 'DEBUG', + 'filters': ['no_errors'], + 'formatter': 'simple', + 'stream': 'ext://sys.stdout' + }, + 'stderr': { + 'class': logging.StreamHandler, + 'level': 'WARNING', + 'formatter': 'simple', + 'stream': 'ext://sys.stderr' + } + } | ({'file': { + 'class': logging.handlers.RotatingFileHandler, + 'level': 'DEBUG', + 'formatter': 'json', + 'filename': log_path, + 'maxBytes': max_bytes, + 'backupCount': backup_count + }} if log_path is not None else {}), + 'loggers': { + 'root': { + 'level': 'DEBUG', + 'handlers': [ + 'stdout', + 'stderr' + ] + (['file'] if log_path is not None else []), + } + } + } + + if sys.version_info >= (3, 12): # Python 3.12+ + logger_config['handlers']['queue_handler'] = { + 'class': logging.handlers.QueueHandler, + 'respect_handler_level': True, + 'handlers': [ + 'stdout', + 'stderr' + ] + (['file'] if log_path is not None else []), + } + + logger_config['loggers']['root']['handlers'] = ['queue_handler'] + + return logger_config + + +def setup_logging(log_path: str | None = None, backup_count: int = 3, max_bytes: int = 1024 * 1024 * 10) -> None: + log_config = generate_log_config( + log_path if log_path != '-' else None, backup_count, max_bytes) + logging.config.dictConfig(log_config) + + if sys.version_info >= (3, 12): # Python 3.12+ + queue_handler = logging.getHandlerByName('queue_handler') + if queue_handler is not None: + queue_handler.listener.start() # type: ignore + atexit.register(queue_handler.listener.stop) # type: ignore diff --git a/src/cloudns/main.py b/src/cloudns/main.py new file mode 100644 index 0000000..f7cc119 --- /dev/null +++ b/src/cloudns/main.py @@ -0,0 +1,122 @@ +import os +import sys +import argparse +import shutil +from typing import Any + +from typing_extensions import Sequence + +from .logger import setup_logging, getLogger +from .types import Config +from . import cloudns + + +def parse_args(args: Sequence[str]): + + class EnvDefault(argparse.Action): + # Custom action to enable environment variables + # https://stackoverflow.com/questions/10551117/setting-options-from-environment-variables-when-using-argparse/10551190#10551190 + def __init__(self, envvar: str, const: Any | None = None, default: Any = None, required: bool = True, **kwargs): + if envvar in os.environ: + default = os.environ[envvar] if const is None else const + + if const is not None: + required = False + kwargs['const'] = const + kwargs['nargs'] = 0 # no additional arguments are expected + + # Make the argument optional if a default is determined + if default is not None: + required = False + + super().__init__(default=default, required=required, metavar=envvar, **kwargs) + + def __call__(self, parser, namespace, values, option_string=None): + # If const is defined (store_const behavior), set it directly + setattr(namespace, self.dest, self.const if self.const is not None else values) + + def formatter(prog): + return argparse.ArgumentDefaultsHelpFormatter(prog, max_help_position=shutil.get_terminal_size().columns) + + parser = argparse.ArgumentParser(formatter_class=formatter) + + parser.add_argument( + '-c', '--config', type = argparse.FileType('r'), + action = EnvDefault, envvar = 'CLOUDNS_CONFIG', + default = './config.json', + help = 'Path to config file in JSON format.' + ) + + parser.add_argument( + '-d', '--dry-run', type = bool, + action = EnvDefault, envvar = 'CLOUDNS_DRY_RUN', + const = True, default = False, + help = 'Do not commit any changes' + ) + + return parser.parse_args(args) + + +def run(argv: Sequence[str]) -> None: # pylint: disable=too-many-locals + # Parse command-line parameters + args = parse_args(argv) # pylint: disable=unused-variable + + config = Config.model_validate_json(args.config.read()) + + setup_logging(config.log_path) + logger = getLogger('cloudns') + logger.debug('Started program.', extra = { 'argv': argv }) + + try: + current_ip = cloudns.get_current_ip(config.resolvers) + + for api_key, domains in config.api.items(): + if not cloudns.api_key_is_valid(api_key): + logger.error(f"API key ending in '{api_key[-4:]}' is not active.") + continue + + zones = {} + for domain in domains: + try: + zones[domain] = cloudns.get_zone_id(api_key, domain) + except cloudns.ZoneNotFoundError as err: + logger.exception(err) + + records_to_update = {} # { ('sub_domain', 'domain'): ('zone_id', 'record_id' | None) } + for domain, zone_id in zones.items(): + records = { record['name']: record for record in cloudns.get_A_records(api_key, zone_id) } + for sub_domain in domains[domain]: + full_domain = f'{sub_domain}.{domain}' if sub_domain != '@' else domain + if full_domain not in records: + logger.info(f"Could not find an A record for '{full_domain}'. Will be created.", extra = { 'sub': sub_domain, 'domain': domain }) + records_to_update[(sub_domain, domain)] = (zone_id, None) + continue + + if (record := records[full_domain])['content'] != current_ip: + logger.info(f"IP for '{full_domain}' does not match current IP. Will be updated.", extra = { 'sub': sub_domain, 'domain': domain, 'content': record['content'] }) + records_to_update[(sub_domain, domain)] = (zone_id, record['id']) + + # Update/Create A Records # + if args.dry_run: + continue + + for (sub, domain), (zone_id, record_id) in records_to_update.items(): + full_domain = f'{sub}.{domain}' if sub != '@' else domain + + if record_id is None: + # Create record + cloudns.create_A_record(api_key, zone_id, full_domain, current_ip) + logger.info(f"A record for '{full_domain}' created.", extra = { 'sub': sub, 'domain': domain }) + continue + + # Update Record + cloudns.update_A_record(api_key, zone_id, record_id, current_ip) + logger.info(f"'A record for '{full_domain}' updated.", extra = { 'sub': sub, 'domain': domain }) + + except Exception as err: + logger.exception(err) + raise err from err + + +def main() -> None: + run(sys.argv[1:]) diff --git a/src/cloudns/py.typed b/src/cloudns/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/src/cloudns/types.py b/src/cloudns/types.py new file mode 100644 index 0000000..6447bd5 --- /dev/null +++ b/src/cloudns/types.py @@ -0,0 +1,19 @@ +from os import path + +from typing_extensions import Self +from pydantic import BaseModel, Field, model_validator + +class Config(BaseModel): + api: dict[str, dict[str, set[str]]] + resolvers: list[str] + log_path: str | None = Field(alias = 'log-path', default = None) + + @model_validator(mode = 'after') + def check_if_api_key_is_path(self) -> Self: + for key in set(self.api.keys()): + if path.exists(key): + with open(key, encoding = 'utf-8') as file: + api_key = file.read().strip() + self.api[api_key] = self.api.pop(key) + + return self diff --git a/tests/__main__.py b/tests/__main__.py new file mode 100644 index 0000000..15a901a --- /dev/null +++ b/tests/__main__.py @@ -0,0 +1,5 @@ +import sys + +import pytest + +retcode = pytest.main(sys.argv[1:]) diff --git a/tests/test_imports.py b/tests/test_imports.py new file mode 100644 index 0000000..edfa010 --- /dev/null +++ b/tests/test_imports.py @@ -0,0 +1,2 @@ +def test_import(): + import cloudns # pylint: disable=unused-import,import-outside-toplevel