dyn-gandi/dyn_gandi/main.py

137 lines
4.8 KiB
Python

import requests
import json
import re
import os
from datetime import datetime
from typing import Callable, Any
def loadSettings(path: str | None) -> dict:
if path is None:
path = f"{os.path.expanduser('~')}/.config/dyn-gandi/config.json"
if not os.path.exists(path):
path = '/etc/dyn-gandi.json'
if not os.path.exists(path):
quit()
# TODO: check integrity of the config file
with open(path, 'r') as file:
# Read if the API keys are path to files
config = json.load(file)
keys = config['api'].keys()
mask = dict()
for key in keys:
api_key = key
if os.path.exists(key):
with open(key, 'r') as file:
api_key = file.readline().strip()
mask[key] = api_key
config['api'] = {mask[key]: value for key,
value in config['api'].items()}
return config
def log(logline: str, path: str = './log.txt') -> None:
    """Append ``logline`` to the file at ``path``, prefixed by a timestamp.

    The entry has the form ``[YYYY-MM-DD HH:MM:SS]<logline>`` followed by a
    newline; the file is opened in append mode.
    """
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    entry = f"[{stamp}]{logline}\n"
    with open(path, 'a') as handle:
        handle.write(entry)
def getCurrentIP(resolvers: list[str], log: Callable[[str], Any]) -> str | None:
# Go through resolvers until one retuns an IP
for resolver in resolvers:
response = requests.get(resolver)
if response.status_code == 200:
current_ip = response.text.strip()
# it suffices to check whether the search is not None since the regex is matches from beginning to end
is_ipv4 = re.search(
'^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\\b){4}$', current_ip) is not None
if is_ipv4:
log(f"[OK][{resolver}] Current IP: '{current_ip}'")
return current_ip # Break if we have found our IP
else:
log(f"[ERROR][{resolver}] '{current_ip}' is not IPv4")
else:
log(f"[ERROR][{resolver}][{response.status_code}] {response.text}")
return None
def checkRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any], timeout: float = 10.0) -> dict[str, dict[str, list[str]]]:
    """Return the configured records that do not yet point at ``current_ip``.

    For every domain the existing ``A`` record sets are fetched from the
    Gandi LiveDNS API. A configured record is dropped from the result when a
    record set with the same ``rrset_name`` already lists ``current_ip``.
    If a domain cannot be queried, all of its records are kept so that a
    renewal is still attempted.

    The result is a new mapping; ``api`` itself is never modified.

    Args:
        api: Mapping ``api key -> domain -> record names``.
        current_ip: The IPv4 address the records should hold.
        log: Callable receiving one log line per failed lookup.
        timeout: Seconds to wait for each API request.

    Returns:
        A mapping with the same shape as ``api`` holding only the records
        that still need to be renewed.
    """
    # TODO: Also detect TTL change
    records_to_renew: dict[str, dict[str, list[str]]] = {}
    for key, domains in api.items():
        headers = {'authorization': f"Apikey {key}"}
        pending: dict[str, list[str]] = {}
        for domain, records in domains.items():
            # Default to renewing everything; narrowed below on success.
            pending[domain] = list(records)
            try:
                response = requests.get(
                    f"https://api.gandi.net/v5/livedns/domains/{domain}/records?rrset_type=A",
                    headers=headers,
                    timeout=timeout,
                )
            except requests.RequestException as error:
                log(f"[ERROR][{domain}] {error}")
                continue

            if response.status_code != 200:
                # The raw body is logged: an error response is not guaranteed
                # to be valid JSON.
                log(f"[ERROR][{domain}][{response.status_code}] {response.text}")
                continue

            up_to_date = {
                record['rrset_name']
                for record in response.json()
                if current_ip in record['rrset_values']
            }
            # dict.fromkeys de-duplicates while keeping the configured order.
            pending[domain] = list(dict.fromkeys(
                name for name in records if name not in up_to_date
            ))
        records_to_renew[key] = pending
    return records_to_renew
def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any], ttl: int, timeout: float = 10.0) -> None:
    """Point every listed ``A`` record at ``current_ip``.

    Each record is written with a ``PUT`` to the Gandi LiveDNS API. A 201
    response is logged as success; any other status, or a failed request,
    is logged as an error and processing continues with the next record.

    Args:
        api: Mapping ``api key -> domain -> record names`` to update.
        current_ip: The IPv4 address to store in each record.
        log: Callable receiving one log line per record.
        ttl: Value sent as ``rrset_ttl`` for every record.
        timeout: Seconds to wait for each API request.
    """
    # The body is identical for every record, so it is serialized once.
    payload = json.dumps({
        'rrset_values': [current_ip],
        'rrset_ttl': ttl,
    })
    for key, domains in api.items():
        headers = {
            'authorization': f"Apikey {key}",
            'content-type': 'application/json',
        }
        for domain, records in domains.items():
            for record in records:
                try:
                    response = requests.put(
                        f"https://api.gandi.net/v5/livedns/domains/{domain}/records/{record}/A",
                        data=payload,
                        headers=headers,
                        timeout=timeout,
                    )
                except requests.RequestException as error:
                    # One failing record must not prevent the others from
                    # being renewed.
                    log(f"[ERROR][{domain}][{record}] {error}")
                    continue

                if response.status_code == 201:
                    log(f"[OK][{domain}][{record}] Renewed with IP '{current_ip}'")
                else:
                    # The raw body is logged: an error response is not
                    # guaranteed to be valid JSON.
                    log(f"[ERROR][{domain}][{record}][{response.status_code}] {response.text}")
def parseArgv(argv: list[str]) -> dict[str, list[str]]:
    """Group command-line arguments by the ``--option`` that precedes them.

    Every argument starting with ``--`` opens a new option (stored without
    the prefix) and the following plain arguments are collected as its
    values. Repeating an option restarts its value list. Plain arguments
    that appear before the first option belong to no option and are ignored.

    Args:
        argv: Arguments without the program name.

    Returns:
        Mapping from option name to the list of its values.
    """
    cli_options: dict[str, list[str]] = {}
    current = None
    for arg in argv:
        if arg.startswith('--'):
            current = arg.removeprefix('--')
            cli_options[current] = []
        elif current is not None:
            cli_options[current].append(arg)
        # else: positional argument before any option; previously this
        # raised ``KeyError: None``.
    return cli_options
def main():
    """Run one update cycle: load settings, detect the IP, renew stale records.

    The optional ``--config <path>`` command-line option selects the
    configuration file. Nothing is renewed when no resolver yields an IP.

    Raises:
        SystemExit: If ``--config`` is given without a path.
    """
    # Imported here because the file does not import ``sys`` at module level;
    # the previous ``os.sys`` spelling relied on an undocumented attribute.
    import sys

    options = parseArgv(sys.argv[1:])

    config_path = None
    if 'config' in options:
        if not options['config']:
            # ``--config`` with no value used to fail with an IndexError.
            raise SystemExit("dyn-gandi: --config requires a path argument")
        config_path = options['config'][0]

    settings = loadSettings(config_path)
    log_path, ttl = settings['log_path'], settings['ttl']

    def logger(x: str) -> Any:
        # Bind the configured log file so callees only pass the message.
        return log(x, log_path)

    current_ip = getCurrentIP(settings['resolvers'], logger)
    if current_ip is None:
        return

    records_to_renew = checkRecords(settings['api'], current_ip, logger)
    renewRecords(records_to_renew, current_ip, logger, ttl=ttl)
# Run the updater only when executed as a script, not when imported.
if __name__ == "__main__":
    main()