initial commit

This commit is contained in:
Kristian Krsnik 2025-03-23 21:08:31 +01:00
commit 7517ee5534
Signed by: Kristian
GPG Key ID: FD1330AC9F909E85
17 changed files with 800 additions and 0 deletions

1
.envrc Normal file
View File

@ -0,0 +1 @@
use flake

26
.gitignore vendored Normal file
View File

@ -0,0 +1,26 @@
# Python #
# Virtual Environment
/.venv/
# Cache
__pycache__/
# Build
/dist/
*.egg-info/
/build/
# Tools
/.pytest_cache/
/.mypy_cache/
# Nix #
/result
# Direnv #
/.direnv/
# Project specific files #
config.json
*.key
*.jsonl

98
README.md Normal file
View File

@ -0,0 +1,98 @@
# Cloudns - A Cloudflare Dynamic DNS Solution
A DNS record updater for [Cloudflare's DNS](https://developers.cloudflare.com/api/resources/dns/) API.
## How it works
This script determines the the current IP address by querying the resolvers defined in the config file.
It then queries the subdomains' A records off of Cloudflare and compares their IP addresses to the current IP address.
Should the IP address of a subdomain's A record not match your current IP address it will be updated. The subdomain's A record will be created should it not already exist.
## Notes
Every invocation of the script causes at least 1 request to a resolver specified and 1 API call to Cloudflare per domain.
Updating a subdomain's A record is 1 API request per subdomain, even if they share the same domain.
Resolvers are queried in the order specified until one returns a valid IP address.
It is also possible to define a path to a file with the API key written in it.
This is good for environments where the config file has to be shared like in a nix project.
## Usage
First, get your User API Token from https://dash.cloudflare.com/profile/api-tokens.
The token need the following permissions:
* **Edit DNS**
```json
{
"api": {
"API Key": {
"example.com": [ "@", "www", "sub1" ],
"example.org": [ "@", "www", "sub1", "sub2" ]
},
"/path/to/a/file/containing/api_key": {
"example.at": [ "sub1" ],
"example.au": [ "sub1", "sub2" ]
}
},
"resolvers": [
"https://ifconfig.me/ip",
"https://me.gandi.net"
],
"log-path": "./log.jsonl"
}
```
## Nix
Add this to the modules.
```nix
inputs = {
cloudns.url = "git+https://git.krsnik.at/Kristian/cloudns";
};
outputs = {
self,
nixpkgs,
cloudns
}: {
...
modules = [
cloudns.nixosModules.default
{
cloudns.enable = true;
cloudns.timer = 300;
cloudns.settings = {
api = {
"/path/to/a/file/containing/api_key" = {
"example.com" = ["@" "www"];
};
};
resolvers = [
"https://ifconfig.me/ip"
"https://me.gandi.net"
];
log_path = "/path/to/log/file.jsonl";
};
}
...
];
...
}
```
Use `cloudns.nixosModules.default` for a NixOs module.
`cloudns.timer` specifies a timer in seconds when the script should be repeated.
## Features
* Support for arbitrarily many domains and subdomains through a nested data structure.
* Small codebase
* Logging
* NixOS and home-manager modules
## Limitations
* Only IPv4 addresses are supported

27
flake.lock generated Normal file
View File

@ -0,0 +1,27 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1742422364,
"narHash": "sha256-mNqIplmEohk5jRkqYqG19GA8MbQ/D4gQSK0Mu4LvfRQ=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "a84ebe20c6bc2ecbcfb000a50776219f48d134cc",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

105
flake.nix Normal file
View File

@ -0,0 +1,105 @@
{
description = "A Python Project Template.";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
};
outputs = {
self,
nixpkgs,
...
} @ inputs: let
supportedSystems = ["x86_64-linux" "x86_64-darwin" "aarch64-linux" "aarch64-darwin"];
forAllSystems = nixpkgs.lib.genAttrs supportedSystems;
pkgs = forAllSystems (system: nixpkgs.legacyPackages.${system}.extend overlay);
overlay = final: prev: rec {
python3Packages = prev.python3Packages.overrideScope (pfinal: pprev: {
packageNameToDrv = x: builtins.getAttr (cleanPythonPackageName x) final.python3Packages;
});
cleanPythonPackageName = x: let
cleanName = builtins.match "([a-z,A-Z,0-9,_,-]+).*" x;
in
if cleanName != null
then builtins.elemAt cleanName 0
else builtins.warn "Could not determine package name from '${x}'" null;
};
pyproject = builtins.fromTOML (builtins.readFile ./pyproject.toml);
buildDependencies = forAllSystems (system: builtins.map pkgs.${system}.python3Packages.packageNameToDrv pyproject.build-system.requires);
runtimeDependencies = forAllSystems (system: builtins.map pkgs.${system}.python3Packages.packageNameToDrv pyproject.project.dependencies);
optionalDependencies = forAllSystems (system: builtins.mapAttrs (name: value: builtins.map pkgs.${system}.python3Packages.packageNameToDrv value) pyproject.project.optional-dependencies);
in {
# `nix build`
packages = forAllSystems (system: let
buildProject = {skipCheck ? false}:
pkgs.${system}.python3Packages.buildPythonPackage {
pname = pyproject.project.name;
version = pyproject.project.version;
src = ./.;
pyproject = true;
build-system = buildDependencies.${system};
dependencies = runtimeDependencies.${system};
optional-dependencies = optionalDependencies.${system};
nativeCheckInputs = optionalDependencies.${system}.dev;
checkPhase = let
dev = builtins.map (x: x.pname) optionalDependencies.${system}.dev;
in ''
${
if builtins.elem "pytest" dev && !skipCheck
then "pytest tests"
else ""
}
${
if builtins.elem "mypy" dev && !skipCheck
then "mypy src"
else ""
}
${
if builtins.elem "pylint" dev && !skipCheck
then "pylint src"
else ""
}
'';
};
in {
default = self.packages.${system}."${pyproject.project.name}";
"${pyproject.project.name}" = buildProject {skipCheck = false;};
quick = buildProject {skipCheck = true;};
});
# `nix fmt`
formatter = forAllSystems (system: pkgs.${system}.alejandra);
# `nix develop`
devShells = forAllSystems (system: rec {
default = venv;
venv = pkgs.${system}.mkShell {
shellHook = ''
if [ ! -d .venv/ ]; then
echo "Creating Virtual Environment..."
${pkgs.${system}.python3}/bin/python3 -m venv .venv
fi
alias activate='source .venv/bin/activate'
echo "Entering Virtual Environment..."
source .venv/bin/activate
'';
};
});
# NixOS Module
nixosModules.default = import ./nix/module.nix inputs;
};
}

63
nix/module.nix Normal file
View File

@ -0,0 +1,63 @@
inputs: {
config,
lib,
pkgs,
...
}: let
cfg = config.cloudns;
package = inputs.self.packages.${pkgs.stdenv.hostPlatform.system}.default;
inherit (lib) mkIf mkEnableOption mkOption types;
format = pkgs.formats.json {};
configFile = format.generate "config.json" cfg.settings;
in {
options.cloudns = {
enable = mkEnableOption "cloudns";
timer = lib.mkOption {
type = types.nullOr types.int;
default = null;
description = lib.mdDoc ''
The time interval in seconds the script should be repeated.
'';
};
settings = mkOption {
type = with types; let
valueType = nullOr (oneOf [
# TODO: restrict type to actual config file structure
bool
int
float
str
path
(attrsOf valueType)
(listOf valueType)
]);
in
valueType;
default = throw "Please specify cloudns.settings";
};
};
config = mkIf cfg.enable {
environment.systemPackages = [package];
systemd.services.cloudns = mkIf (cfg.timer != null) {
script = "${package}/bin/cloudns --config ${configFile}";
requires = ["network-online.target"];
serviceConfig = {
Type = "oneshot";
};
};
systemd.timers.cloudns = mkIf (cfg.timer != null) {
wantedBy = ["timers.target"];
timerConfig = {
OnBootSec = "0s";
OnUnitActiveSec = "${toString cfg.timer}s";
Unit = "cloudns.service";
};
};
};
}

40
pyproject.toml Normal file
View File

@ -0,0 +1,40 @@
[project]
name = "cloudns"
version = "0.1.0"
requires-python = "~=3.12, <4"
dependencies = ["requests~=2.32.3", "pydantic~=2.10.5"]
[project.optional-dependencies]
dev = [
"pytest~=8.3",
"mypy~=1.13",
"pylint~=3.3",
"types-requests~=2.32.0.20241016",
]
[project.scripts]
cloudns = "cloudns.main:main"
[build-system]
requires = ["setuptools~=75.1"]
build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["src"]
[tool.setuptools.package-data]
cloudns = ["py.typed"]
[tool.autopep8]
max_line_length = 150
[tool.pylint.'MESSAGES CONTROL']
disable = [
"line-too-long",
"missing-module-docstring",
"missing-class-docstring",
"missing-function-docstring",
"too-few-public-methods",
"broad-exception-caught",
"logging-fstring-interpolation",
]

1
src/cloudns/__init__.py Normal file
View File

@ -0,0 +1 @@
from .main import run

3
src/cloudns/__main__.py Normal file
View File

@ -0,0 +1,3 @@
from .main import main
main()

110
src/cloudns/cloudns.py Normal file
View File

@ -0,0 +1,110 @@
import re
from typing import Iterable, Literal
import requests
from .logger import getLogger
LOGGER = getLogger('cloudns')
class ZoneNotFoundError(Exception):
pass
class CouldNotGetIPError(Exception):
pass
class APIError(Exception):
pass
def get_current_ip(resolvers: Iterable[str]) -> str:
'''
Go through resolvers until one returns an IP.
`resolvers` A list of IP resolvers.
`log` A function which takes a string and logs it.
'''
for resolver in resolvers:
response = requests.get(resolver, timeout = 5)
if not response.ok:
LOGGER.warning(f"[ERROR][{resolver}][{response.status_code}] {response.text}", extra = { 'resolver': resolver, 'status_code': response.status_code, 'content': response.content })
current_ip = response.text.strip()
# It suffices to check whether the search is not None since the regex matches from beginning to end.
is_ipv4 = re.search(r'^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\b){4}$', current_ip) is not None
if is_ipv4:
LOGGER.debug(f"[OK][{resolver}] Current IP: '{current_ip}'", extra = { 'resolver': resolver, 'current_ip': current_ip })
return current_ip # Break if we have found our IP
LOGGER.warning(f"[WARNING][{resolver}] '{current_ip}' is not IPv4", extra = { 'resolver': resolver, 'ip': current_ip })
raise CouldNotGetIPError('Could not get IP.')
def _cloudflare_api_request(method: Literal['GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE'], endpoint: str, api_key: str, headers: dict | None = None, timeout: int | None = 5, **kwargs) -> dict:
response = requests.request(
method,
f'https://api.cloudflare.com/client/v4/{endpoint}',
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
} | (headers or {}),
timeout = timeout,
**kwargs
)
LOGGER.debug('Issued Cloudflare API request.', extra = { 'method': method, 'endpoint': endpoint, 'headers': headers, 'kwargs': kwargs, 'status_code': response.status_code, 'content': response.content })
if not response.ok:
LOGGER.error('', extra = { 'status_code': response.status_code, 'content': response.content })
raise APIError(response.content)
result = response.json()
if not result['success']:
LOGGER.error('API call was not successful.', extra = { 'status_code': response.status_code, 'content': response.content })
raise APIError('API call was not successful.')
return result['result']
def api_key_is_valid(api_key: str) -> bool:
result = _cloudflare_api_request('GET', 'user/tokens/verify', api_key)
return result['status'] == 'active'
def get_zone_id(api_key: str, domain: str) -> str:
result = _cloudflare_api_request('GET', 'zones', api_key, params = { 'name': domain })
if len(result) != 1:
message = f"Expected a list of length '1' but got length '{len(result)}'"
raise ZoneNotFoundError(message)
return result[0]['id']
def get_A_records(api_key: str, zone_id: str) -> dict: # pylint: disable=invalid-name
result = _cloudflare_api_request('GET', f'zones/{zone_id}/dns_records', api_key, params = { 'type': 'A' })
return result
def create_A_record(api_key: str, zone_id: str, name: str, value: str) -> None: # pylint: disable=invalid-name
_cloudflare_api_request(
'POST',
f'zones/{zone_id}/dns_records',
api_key,
json = {
'type': 'A',
'name': name,
'content': value
}
)
def update_A_record(api_key: str, zone_id: str, record_id: str, value: str) -> None: # pylint: disable=invalid-name
_cloudflare_api_request(
'PATCH',
f'/zones/{zone_id}/dns_records/{record_id}',
api_key,
json = { 'content': value }
)

View File

@ -0,0 +1,3 @@
from logging import Logger, getLogger
from .logger import setup_logging

View File

@ -0,0 +1,175 @@
import sys
import json
import logging
import logging.config
import logging.handlers
import atexit
from datetime import datetime, timezone
from typing_extensions import override
LOG_RECORD_BUILTIN_ATTRS = {
"args",
"asctime",
"created",
"exc_info",
"exc_text",
"filename",
"funcName",
"levelname",
"levelno",
"lineno",
"module",
"msecs",
"message",
"msg",
"name",
"pathname",
"process",
"processName",
"relativeCreated",
"stack_info",
"thread",
"threadName",
"taskName",
}
class JSONFormatter(logging.Formatter):
def __init__(self, *, fmt_keys: dict[str, str] | None = None):
super().__init__()
self.fmt_keys = fmt_keys if fmt_keys is not None else {}
@override
def format(self, record: logging.LogRecord) -> str:
message = self._prepare_log_dict(record)
return json.dumps(message, default=str)
def _prepare_log_dict(self, record: logging.LogRecord) -> dict:
always_fields = {
'message': record.getMessage(),
'timestamp': datetime.fromtimestamp(
record.created, tz=timezone.utc
).isoformat()
}
if record.exc_info is not None:
always_fields['exc_info'] = self.formatException(record.exc_info)
if record.stack_info is not None:
always_fields['stack_info'] = self.formatStack(record.stack_info)
message = {
key: msg_value
if (msg_value := always_fields.pop(value, None)) is not None
else getattr(record, value)
for key, value in self.fmt_keys.items()
}
message.update(always_fields)
for key, value in record.__dict__.items():
if key not in LOG_RECORD_BUILTIN_ATTRS:
message[key] = value
return message
class NonErrorFilter(logging.Filter):
@override
def filter(self, record: logging.LogRecord) -> bool | logging.LogRecord:
return record.levelno <= logging.INFO
def generate_log_config(log_path: str | None = None, backup_count: int = 3, max_bytes: int = 1024 * 1024 * 10) -> dict:
logger_config: dict = {
'version': 1,
'disable_existing_loggers': False,
'filters': {
'no_errors': {
"()": NonErrorFilter
}
},
'formatters': {
'simple': {
'format': '[%(asctime)s][%(levelname)s] %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S'
},
'detailed': {
'format': '[%(asctime)s][%(levelname)s] %(message)s',
'datefmt': '%Y-%m-%dT%H:%M:%S%z' # ISO-8601 Timestamp
},
'json': {
'()': JSONFormatter,
'fmt_keys': {
'timestamp': 'timestamp',
'level': 'levelname',
'message': 'message',
'logger': 'name',
'module': 'module',
'function': 'funcName',
'line': 'lineno',
'thread_name': 'threadName'
},
}
},
'handlers': {
'stdout': {
'class': logging.StreamHandler,
'level': 'DEBUG',
'filters': ['no_errors'],
'formatter': 'simple',
'stream': 'ext://sys.stdout'
},
'stderr': {
'class': logging.StreamHandler,
'level': 'WARNING',
'formatter': 'simple',
'stream': 'ext://sys.stderr'
}
} | ({'file': {
'class': logging.handlers.RotatingFileHandler,
'level': 'DEBUG',
'formatter': 'json',
'filename': log_path,
'maxBytes': max_bytes,
'backupCount': backup_count
}} if log_path is not None else {}),
'loggers': {
'root': {
'level': 'DEBUG',
'handlers': [
'stdout',
'stderr'
] + (['file'] if log_path is not None else []),
}
}
}
if sys.version_info >= (3, 12): # Python 3.12+
logger_config['handlers']['queue_handler'] = {
'class': logging.handlers.QueueHandler,
'respect_handler_level': True,
'handlers': [
'stdout',
'stderr'
] + (['file'] if log_path is not None else []),
}
logger_config['loggers']['root']['handlers'] = ['queue_handler']
return logger_config
def setup_logging(log_path: str | None = None, backup_count: int = 3, max_bytes: int = 1024 * 1024 * 10) -> None:
log_config = generate_log_config(
log_path if log_path != '-' else None, backup_count, max_bytes)
logging.config.dictConfig(log_config)
if sys.version_info >= (3, 12): # Python 3.12+
queue_handler = logging.getHandlerByName('queue_handler')
if queue_handler is not None:
queue_handler.listener.start() # type: ignore
atexit.register(queue_handler.listener.stop) # type: ignore

122
src/cloudns/main.py Normal file
View File

@ -0,0 +1,122 @@
import os
import sys
import argparse
import shutil
from typing import Any
from typing_extensions import Sequence
from .logger import setup_logging, getLogger
from .types import Config
from . import cloudns
def parse_args(args: Sequence[str]):
class EnvDefault(argparse.Action):
# Custom action to enable environment variables
# https://stackoverflow.com/questions/10551117/setting-options-from-environment-variables-when-using-argparse/10551190#10551190
def __init__(self, envvar: str, const: Any | None = None, default: Any = None, required: bool = True, **kwargs):
if envvar in os.environ:
default = os.environ[envvar] if const is None else const
if const is not None:
required = False
kwargs['const'] = const
kwargs['nargs'] = 0 # no additional arguments are expected
# Make the argument optional if a default is determined
if default is not None:
required = False
super().__init__(default=default, required=required, metavar=envvar, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
# If const is defined (store_const behavior), set it directly
setattr(namespace, self.dest, self.const if self.const is not None else values)
def formatter(prog):
return argparse.ArgumentDefaultsHelpFormatter(prog, max_help_position=shutil.get_terminal_size().columns)
parser = argparse.ArgumentParser(formatter_class=formatter)
parser.add_argument(
'-c', '--config', type = argparse.FileType('r'),
action = EnvDefault, envvar = 'CLOUDNS_CONFIG',
default = './config.json',
help = 'Path to config file in JSON format.'
)
parser.add_argument(
'-d', '--dry-run', type = bool,
action = EnvDefault, envvar = 'CLOUDNS_DRY_RUN',
const = True, default = False,
help = 'Do not commit any changes'
)
return parser.parse_args(args)
def run(argv: Sequence[str]) -> None: # pylint: disable=too-many-locals
# Parse command-line parameters
args = parse_args(argv) # pylint: disable=unused-variable
config = Config.model_validate_json(args.config.read())
setup_logging(config.log_path)
logger = getLogger('cloudns')
logger.debug('Started program.', extra = { 'argv': argv })
try:
current_ip = cloudns.get_current_ip(config.resolvers)
for api_key, domains in config.api.items():
if not cloudns.api_key_is_valid(api_key):
logger.error(f"API key ending in '{api_key[-4:]}' is not active.")
continue
zones = {}
for domain in domains:
try:
zones[domain] = cloudns.get_zone_id(api_key, domain)
except cloudns.ZoneNotFoundError as err:
logger.exception(err)
records_to_update = {} # { ('sub_domain', 'domain'): ('zone_id', 'record_id' | None) }
for domain, zone_id in zones.items():
records = { record['name']: record for record in cloudns.get_A_records(api_key, zone_id) }
for sub_domain in domains[domain]:
full_domain = f'{sub_domain}.{domain}' if sub_domain != '@' else domain
if full_domain not in records:
logger.info(f"Could not find an A record for '{full_domain}'. Will be created.", extra = { 'sub': sub_domain, 'domain': domain })
records_to_update[(sub_domain, domain)] = (zone_id, None)
continue
if (record := records[full_domain])['content'] != current_ip:
logger.info(f"IP for '{full_domain}' does not match current IP. Will be updated.", extra = { 'sub': sub_domain, 'domain': domain, 'content': record['content'] })
records_to_update[(sub_domain, domain)] = (zone_id, record['id'])
# Update/Create A Records #
if args.dry_run:
continue
for (sub, domain), (zone_id, record_id) in records_to_update.items():
full_domain = f'{sub}.{domain}' if sub != '@' else domain
if record_id is None:
# Create record
cloudns.create_A_record(api_key, zone_id, full_domain, current_ip)
logger.info(f"A record for '{full_domain}' created.", extra = { 'sub': sub, 'domain': domain })
continue
# Update Record
cloudns.update_A_record(api_key, zone_id, record_id, current_ip)
logger.info(f"'A record for '{full_domain}' updated.", extra = { 'sub': sub, 'domain': domain })
except Exception as err:
logger.exception(err)
raise err from err
def main() -> None:
run(sys.argv[1:])

0
src/cloudns/py.typed Normal file
View File

19
src/cloudns/types.py Normal file
View File

@ -0,0 +1,19 @@
from os import path
from typing_extensions import Self
from pydantic import BaseModel, Field, model_validator
class Config(BaseModel):
api: dict[str, dict[str, set[str]]]
resolvers: list[str]
log_path: str | None = Field(alias = 'log-path', default = None)
@model_validator(mode = 'after')
def check_if_api_key_is_path(self) -> Self:
for key in set(self.api.keys()):
if path.exists(key):
with open(key, encoding = 'utf-8') as file:
api_key = file.read().strip()
self.api[api_key] = self.api.pop(key)
return self

5
tests/__main__.py Normal file
View File

@ -0,0 +1,5 @@
import sys
import pytest
retcode = pytest.main(sys.argv[1:])

2
tests/test_imports.py Normal file
View File

@ -0,0 +1,2 @@
def test_import():
import cloudns # pylint: disable=unused-import,import-outside-toplevel