finalized code and added typing

This commit is contained in:
Kristian Krsnik 2023-08-13 13:58:30 +02:00
parent 9754ea32a6
commit b5ab25940a

View File

@ -3,11 +3,19 @@ import json
import re import re
import os import os
from datetime import datetime from datetime import datetime
from typing import Callable, Any
def loadSettings(path: str = f"{os.path.expanduser('~')}/.config/dyn-gandi/config.json") -> dict: def loadSettings() -> dict:
path = f"{os.path.expanduser('~')}/.config/dyn-gandi/config.json"
if not os.path.exists(path):
path = '/etc/dyn-gandi.json'
if not os.path.exists(path):
quit()
# TODO: check integrity of the config file # TODO: check integrity of the config file
try:
with open(path, 'r') as file: with open(path, 'r') as file:
# Read if the API keys are path to files # Read if the API keys are path to files
config = json.load(file) config = json.load(file)
@ -21,9 +29,7 @@ def loadSettings(path: str = f"{os.path.expanduser('~')}/.config/dyn-gandi/confi
mask[key] = api_key mask[key] = api_key
config['api'] = {mask[key]: value for key, config['api'] = {mask[key]: value for key,
value in config['api'].items()} value in config['api'].items()}
except FileNotFoundError: return config
quit()
# TODO log error and remove code below (do not create an unwanted config)
def log(logline: str, path: str = './log.txt') -> None: def log(logline: str, path: str = './log.txt') -> None:
@ -33,12 +39,13 @@ def log(logline: str, path: str = './log.txt') -> None:
f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]{logline}\n") f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]{logline}\n")
def getCurrentIP(resolvers: list[str], log: callable) -> None | str: def getCurrentIP(resolvers: list[str], log: Callable[[str], Any]) -> str | None:
# Go through resolvers until one retuns an IP # Go through resolvers until one retuns an IP
for resolver in resolvers: for resolver in resolvers:
response = requests.get(resolver) response = requests.get(resolver)
if response.status_code == 200: if response.status_code == 200:
current_ip = response.text.strip() current_ip = response.text.strip()
# it suffices to check whether the search is not None since the regex is matches from beginning to end
is_ipv4 = re.search( is_ipv4 = re.search(
'^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\\b){4}$', current_ip) is not None '^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\\b){4}$', current_ip) is not None
if is_ipv4: if is_ipv4:
@ -51,25 +58,25 @@ def getCurrentIP(resolvers: list[str], log: callable) -> None | str:
return None return None
def checkRecords(api: dict, current_ip: str, log: callable) -> dict: def checkRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any]) -> dict[str, dict[str, list[str]]]:
# Returns the records that need to be renewed # Returns the records that need to be renewed
# TODO: Also detect TTL change # TODO: Also detect TTL change
records_to_renew = api records_to_renew = api
for key, domains in api.items(): for key, domains in api.items():
headers = {'authorization': f"Apikey {key}"} headers = {'authorization': f"Apikey {key}"}
for domain, records in domains.items(): for domain, _ in domains.items():
response = requests.get( response = requests.get(
f"https://api.gandi.net/v5/livedns/domains/{domain}/records?rrset_type=A", headers=headers) f"https://api.gandi.net/v5/livedns/domains/{domain}/records?rrset_type=A", headers=headers)
if response.status_code == 200: if response.status_code == 200:
records_to_renew[key][domain] = list(set(records_to_renew[key][domain]) - set(record['rrset_name'] for record in ( records_to_renew[key][domain] = list(
filter(lambda x: current_ip in x['rrset_values'], json.loads(response.json()))))) set(records_to_renew[key][domain]) - set(record['rrset_name'] for record in (filter(lambda x: current_ip in x['rrset_values'], response.json()))))
else: else:
log(f"[ERROR][{domain}][{response.status_code}] {response.json()}") log(f"[ERROR][{domain}][{response.status_code}] {response.json()}")
return records_to_renew return records_to_renew
def renewRecords(records, current_ip, log, ttl=3600): def renewRecords(api: dict[str, dict[str, list[str]]], current_ip: str, log: Callable[[str], Any], ttl: int = 3600) -> None:
# Updates the records and reates them if they don't exist # Updates the records and reates them if they don't exist
payload = json.dumps({ payload = json.dumps({
'items': [{ 'items': [{
@ -79,7 +86,7 @@ def renewRecords(records, current_ip, log, ttl=3600):
}] }]
}) })
for key, domains in records.items(): for key, domains in api.items():
headers = { headers = {
'authorization': f"Apikey {key}", 'authorization': f"Apikey {key}",
'content-type': 'application/json', 'content-type': 'application/json',
@ -97,13 +104,13 @@ def renewRecords(records, current_ip, log, ttl=3600):
def main(): def main():
settings = loadSettings() settings = loadSettings()
log_path = settings['log_path'] log_path, ttl = settings['log_path'], settings['ttl']
ttl = settings['ttl'] def logger(x: str) -> Any: return log(x, log_path)
def myLog(x): return log(x, log_path)
current_ip = getCurrentIP(settings['resolvers'], myLog) current_ip = getCurrentIP(settings['resolvers'], logger)
records_to_renew = checkRecords(settings['api'], current_ip, myLog) if current_ip is not None:
renewRecords(records_to_renew, current_ip, myLog, ttl=ttl) records_to_renew = checkRecords(settings['api'], current_ip, logger)
renewRecords(records_to_renew, current_ip, logger, ttl=ttl)
if __name__ == '__main__': if __name__ == '__main__':