209 lines
6.3 KiB
Python
209 lines
6.3 KiB
Python
import os
|
|
import re
|
|
import json
|
|
import argparse
|
|
from datetime import datetime
|
|
from typing import Callable, Any
|
|
|
|
import requests
|
|
|
|
|
|
def loadSettings(path: str | None) -> dict:
|
|
'''
|
|
Load the settings file.
|
|
|
|
`path` Path to a settings file.
|
|
By default the program will look for `$HOME/dyn-gandi/config.json` and then `/etc/dyn-gandi.json`.
|
|
'''
|
|
|
|
if path is None:
|
|
path = f"{os.path.expanduser('~')}/.config/dyn-gandi/config.json"
|
|
|
|
if not os.path.exists(path):
|
|
path = '/etc/dyn-gandi.json'
|
|
|
|
if not os.path.exists(path):
|
|
print(f"[ERROR] '{path}' does not exist.")
|
|
quit()
|
|
|
|
# TODO check integrity of the config file
|
|
with open(path, 'r') as file:
|
|
# Load as dictionary
|
|
config = json.load(file)
|
|
|
|
# API Key entries
|
|
keys = config['api'].keys()
|
|
|
|
# This will build up a copy of the API section with the API keys instead of filenames.
|
|
mask = dict()
|
|
|
|
# If the API key is a file the use the contents as a new key.
|
|
for key in keys:
|
|
api_key = key
|
|
if os.path.exists(key):
|
|
with open(key, 'r') as file:
|
|
api_key = file.readline().strip()
|
|
mask[key] = api_key
|
|
config['api'] = {mask[key]: value for key,
|
|
value in config['api'].items()}
|
|
|
|
return config
|
|
|
|
|
|
def log(logline: str, path: str = './log.txt', stdout: bool = False) -> None:
|
|
'''
|
|
Appends logline at the end of the file file with a timestamp
|
|
|
|
`logline` The line to log.
|
|
|
|
'path' Path to file where to log.
|
|
|
|
`stdout` Wether to print to stdout instead of a file.
|
|
'''
|
|
|
|
line = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]{logline}"
|
|
|
|
if stdout:
|
|
print(line)
|
|
else:
|
|
with open(path, 'a') as file:
|
|
file.write(line + '\n')
|
|
|
|
|
|
def getCurrentIP(resolvers: list[str], log: Callable[[str], Any]) -> str | None:
|
|
'''
|
|
Go through resolvers until one returns an IP.
|
|
|
|
`resolvers` A list of IP resolvers.
|
|
|
|
`log` A function which takes a string and logs it.
|
|
'''
|
|
|
|
for resolver in resolvers:
|
|
response = requests.get(resolver)
|
|
|
|
if response.ok:
|
|
current_ip = response.text.strip()
|
|
|
|
# It suffices to check whether the search is not None since the regex matches from beginning to end.
|
|
is_ipv4 = re.search(
|
|
'^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\\b){4}$', current_ip) is not None
|
|
|
|
if is_ipv4:
|
|
log(f"[OK][{resolver}] Current IP: '{current_ip}'")
|
|
return current_ip # Break if we have found our IP
|
|
else:
|
|
log(f"[ERROR][{resolver}] '{current_ip}' is not IPv4")
|
|
else:
|
|
log(f"[ERROR][{resolver}][{response.status_code}] {response.text}")
|
|
|
|
return None
|
|
|
|
|
|
def checkRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any]) -> dict[str, dict[str, list[str]]]:
|
|
'''
|
|
Return records that need to be renewed.
|
|
|
|
`api` The API section of the settings file.
|
|
|
|
`current_ip` The current IP address.
|
|
|
|
`log` A function that takes in a string and logs it.
|
|
'''
|
|
|
|
# TODO: Detect TTL change
|
|
|
|
records_to_renew = api
|
|
for key, domains in api.items():
|
|
headers = {'authorization': f"Apikey {key}"}
|
|
|
|
for domain, _ in domains.items():
|
|
response = requests.get(
|
|
f"https://api.gandi.net/v5/livedns/domains/{domain}/records?rrset_type=A", headers=headers)
|
|
|
|
if response.status_code == 200:
|
|
records_to_renew[key][domain] = list(set(records_to_renew[key][domain]) - set(record['rrset_name'] for record in (
|
|
filter(lambda x: current_ip in x['rrset_values'], response.json()))))
|
|
|
|
if records_to_renew[key][domain] != []:
|
|
log(f"[INFO][{','.join(records_to_renew)}] Record need to be renewed")
|
|
|
|
else:
|
|
log(f"[ERROR][{domain}][{response.status_code}] {response.json()}")
|
|
|
|
return records_to_renew
|
|
|
|
|
|
def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any], ttl: int = 300) -> None:
|
|
'''
|
|
Update records and create them, if they don't exist.
|
|
|
|
`api` The API section of the settings file.
|
|
|
|
`current_ip` The current IP address.
|
|
|
|
`log` A function that takes a string and logs it.
|
|
|
|
`ttl` How long a record should be cached.
|
|
'''
|
|
|
|
payload = json.dumps({
|
|
'rrset_values': [current_ip],
|
|
'rrset_ttl': ttl
|
|
})
|
|
|
|
for api_key, domains in api.items():
|
|
headers = {
|
|
'authorization': f"Apikey {api_key}",
|
|
'content-type': 'application/json',
|
|
}
|
|
|
|
for domain, records in domains.items():
|
|
for record in records:
|
|
response = requests.put(
|
|
f"https://api.gandi.net/v5/livedns/domains/{domain}/records/{record}/A", data=payload, headers=headers)
|
|
if response.status_code == 201:
|
|
log(f"[OK][{domain}][{record}] Renewed with IP '{current_ip}'")
|
|
else:
|
|
log(f"[ERROR][{domain}][{record}][{response.status_code}] {response.json()}")
|
|
|
|
|
|
def parseArgs(args: list[str]) -> argparse.Namespace:
|
|
'''
|
|
Parse command-line arguments.
|
|
|
|
`args` A list of user-supplied command line arguments.
|
|
'''
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description='A program that renews A records via the Gandi API v5.')
|
|
|
|
parser.add_argument('--config', type=str, default=None,
|
|
help='Path to a config file.')
|
|
parser.add_argument('--dry-run', action='store_true',
|
|
help='Just perform a dry-run.')
|
|
|
|
return parser.parse_args(args)
|
|
|
|
|
|
def main():
|
|
# Parse user-supplied command-line arguments.
|
|
args = parseArgs(os.sys.argv[1:])
|
|
|
|
# Read in the settings file.
|
|
config = loadSettings(args.config)
|
|
|
|
def logger(x: str) -> Any: return log(x, config['log_path'], args.dry_run)
|
|
|
|
current_ip = getCurrentIP(config['resolvers'], logger)
|
|
if current_ip is not None:
|
|
records_to_renew = checkRecords(config['api'], current_ip, logger)
|
|
|
|
# Do not perform an actual record change during a dry-run.
|
|
if not args.dry_run:
|
|
renewRecords(records_to_renew, current_ip, logger, config['ttl'])
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|