# Python source, 103 lines, 4.1 KiB
import requests, json, re, os
|
|
from datetime import datetime
|
|
|
|
def loadSettings(path = f"{os.path.expanduser('~')}/.config/dyn-gandi/config.json") -> dict:
    """Load the JSON configuration stored at `path`.

    Every key of the 'api' section is either a literal API key or the path
    of an existing file whose first line holds the key. File paths are
    replaced by the key they contain, so the returned 'api' mapping is
    always keyed by the actual API keys.

    If `path` does not exist, a sample configuration is written to
    'config.json' in the current working directory and returned instead.

    Raises:
        json.JSONDecodeError: if the file is not valid JSON.
        KeyError: if the configuration has no 'api' section.
    """
    # TODO: check integrity
    try:
        # Only opening/parsing the config may trigger the sample fallback
        with open(path, 'r') as file:
            config = json.load(file)
    except FileNotFoundError:
        sample_config = {
            'api': {},
            'ttl': 3600,
            'resolvers': [ 'https://ifconfig.me' ],
            'log_path': './log.txt'
        }

        # NOTE(review): the sample goes to ./config.json, not to `path`;
        # confirm this is the intended location.
        with open('config.json', 'w') as file:
            json.dump(sample_config, file, indent = 2)

        return sample_config

    # Swap every key that names an existing file for the key stored in it
    resolved_api = {}
    for key, domains in config['api'].items():
        api_key = key
        if os.path.exists(key):
            with open(key, 'r') as key_file:
                api_key = key_file.readline().strip()
        resolved_api[api_key] = domains

    config['api'] = resolved_api
    return config
|
|
|
|
# Appends logline at the end of the file with a timestamp
|
|
def log(logline: str, path: str = './log.txt') -> None:
    """Append `logline` to the file at `path`, prefixed with the current local time.

    The entry has the form '[YYYY-MM-DD HH:MM:SS]<logline>' followed by a
    newline; the file is opened in append mode.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    entry = f"[{timestamp}]{logline}\n"
    with open(path, 'a') as handle:
        handle.write(entry)
|
|
|
|
# Go through resolvers until one returns an IP
|
|
def getCurrentIP(resolvers: list[str], log: callable) -> None | str:
|
|
for resolver in resolvers:
|
|
response = requests.get(resolver)
|
|
if response.status_code == 200:
|
|
current_ip = response.text.strip()
|
|
is_ipv4 = re.search('^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\\b){4}$', current_ip) is not None
|
|
if is_ipv4:
|
|
log(f"[OK][{resolver}] Current IP: '{current_ip}'")
|
|
return current_ip # Break if we have found our IP
|
|
else:
|
|
log(f"[ERROR][{resolver}] '{current_ip}' is not IPv4")
|
|
else:
|
|
log(f"[ERROR][{resolver}][{response.status_code}] {response.text}")
|
|
return None
|
|
|
|
# Returns the records that need to be renewed
|
|
# TODO: Also detect TTL change
|
|
def checkRecords(api: dict, current_ip: str, log: callable) -> dict:
    """Return the records whose A rrset does not already contain `current_ip`.

    `api` maps an API key to a mapping of domain -> iterable of record
    names. For every domain the A rrsets are fetched from the Gandi LiveDNS
    API, and the records whose values already include `current_ip` are
    left out of the result. When a domain cannot be checked (network error
    or non-200 answer) the problem is reported through `log` and all of its
    configured records are kept in the result.

    The result is a new mapping with the same key -> domain -> list shape;
    `api` itself is not modified.
    """
    records_to_renew = {}

    for key, domains in api.items():
        headers = { 'authorization': f"Apikey {key}" }
        records_to_renew[key] = {}

        for domain, records in domains.items():
            # Until proven up to date, every configured record is a candidate
            records_to_renew[key][domain] = list(records)

            try:
                response = requests.get(
                    f"https://api.gandi.net/v5/livedns/domains/{domain}/records?rrset_type=A",
                    headers = headers,
                    timeout = 10
                )
            except requests.RequestException as error:
                log(f"[ERROR][{domain}] {error}")
                continue

            if response.status_code != 200:
                # The body of an error answer is not guaranteed to be JSON
                log(f"[ERROR][{domain}][{response.status_code}] {response.text}")
                continue

            # response.json() already yields the decoded list of rrsets
            up_to_date = {
                rrset['rrset_name']
                for rrset in response.json()
                if current_ip in rrset['rrset_values']
            }
            records_to_renew[key][domain] = [
                record for record in records if record not in up_to_date
            ]

    return records_to_renew
|
|
|
|
# Updates the records and creates them if they don't exist
|
|
def renewRecords(records: dict, current_ip: str, log: callable, ttl: int = 3600) -> None:
    """Point every listed record to `current_ip` through the Gandi LiveDNS API.

    `records` maps an API key to a mapping of domain -> iterable of record
    names. Each record is sent as a PUT carrying a single A rrset with
    `current_ip` as its only value and `ttl` as its TTL. A 201 answer is
    logged as a success; any other status, or a network failure, is logged
    as an error and processing continues with the next record.
    """
    # The same body is valid for every record, so it is serialised only once
    payload = json.dumps({
        'items': [{
            'rrset_type': 'A',
            'rrset_values': [ current_ip ],
            'rrset_ttl': ttl
        }]
    })

    for key, domains in records.items():
        headers = {
            'authorization': f"Apikey {key}",
            'content-type': 'application/json',
        }

        # `names` rather than `records`, so the parameter is not shadowed
        for domain, names in domains.items():
            for record in names:
                try:
                    response = requests.put(
                        f"https://api.gandi.net/v5/livedns/domains/{domain}/records/{record}",
                        data = payload,
                        headers = headers,
                        timeout = 10
                    )
                except requests.RequestException as error:
                    log(f"[ERROR][{domain}][{record}] {error}")
                    continue

                if response.status_code == 201:
                    log(f"[OK][{domain}][{record}] Renewed with IP '{current_ip}'")
                else:
                    # The body of an error answer is not guaranteed to be JSON
                    log(f"[ERROR][{domain}][{record}][{response.status_code}] {response.text}")
|
|
|
|
def main():
    """Run one update pass: load settings, find the current IP, renew stale records.

    Nothing is sent to the DNS API when no resolver returns an IPv4 address.
    """
    settings = loadSettings()
    log_path = settings['log_path']
    ttl = settings['ttl']

    def myLog(logline):
        # Bind the configured log file so the helpers only pass the message
        log(logline, log_path)

    current_ip = getCurrentIP(settings['resolvers'], myLog)
    if current_ip is None:
        # Without an address, every record would be rewritten with a null value
        myLog("[ERROR] No resolver returned an IPv4 address, records left untouched")
        return

    records_to_renew = checkRecords(settings['api'], current_ip, myLog)
    renewRecords(records_to_renew, current_ip, myLog, ttl = ttl)
|
|
|
|
# Run the updater only when executed as a script, not when imported
if __name__ == '__main__':
    main()