diff --git a/.gitignore b/.gitignore index 00c1844..8750dcf 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -.direnv/ \ No newline at end of file +.direnv/ +.mypy_cache/ \ No newline at end of file diff --git a/outage_detector/main.py b/outage_detector/main.py index 87d3ca4..8c2931b 100755 --- a/outage_detector/main.py +++ b/outage_detector/main.py @@ -7,11 +7,12 @@ from ipaddress import ip_address import socket import time from sys import argv +from typing import Any class LogEntry: """Class describing a connection attempt""" - def __init__(self, log_entry: str): + def __init__(self: 'LogEntry', log_entry: str) -> None: self.date = datetime.strptime(log_entry[0:19], '%Y-%m-%d %H:%M:%S') self.ip = log_entry.split('[')[1].split(']')[0] self.failed = 'FAIL' in log_entry @@ -29,7 +30,7 @@ def isValidIP(ip: str) -> bool: def isReachable(ip: str = '1.1.1.1', port: int = 80, timeout: float = 2) -> bool: """Return wether an IP is reachable - [modified from source: https://gist.github.com/betrcode/0248f0fda894013382d7?permalink_comment_id=4438869#gistcomment-4438869] + modified from source: https://gist.github.com/betrcode/0248f0fda894013382d7?permalink_comment_id=4438869#gistcomment-4438869 """ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -47,7 +48,7 @@ def isReachable(ip: str = '1.1.1.1', port: int = 80, timeout: float = 2) -> bool return reachable -def logConnectionStatus(log_path: str = 'log.txt', ips: list = ['1.1.1.1'], port: int = 80, timeout: float = 2, dry_run: bool = False) -> list: +def logConnectionStatus(log_path: str = './log.txt', ips: list[str] = ['1.1.1.1'], port: int = 80, timeout: float = 2, dry_run: bool = False) -> list[str]: """Log IP, time and result of a connection check Output has this format: @@ -61,13 +62,14 @@ def logConnectionStatus(log_path: str = 'log.txt', ips: list = ['1.1.1.1'], port f"{time.strftime('%Y-%m-%d %H:%M:%S')} [{ip}] [{'OK' if isReachable(ip) else 'FAIL'}]" for ip in ips] if not dry_run: + # TODO: move this into a log function which is passed as an argument with open(log_path, 'a') as file: file.writelines(log_line + '\n' for log_line in log_lines) return log_lines -def printOutages(log_path: str = 'log.txt'): +def printOutages(log_path: str = './log.txt') -> None: """Prints out the Outages with duration and ip sorted by starttime Output has this format: @@ -89,9 +91,9 @@ def printOutages(log_path: str = 'log.txt'): # cluster fails to outages # we consider the time of subsequent failed connection attemts to the same ip an outage - outages = [] + outages: list[list[LogEntry]] = [] last_failed = False - for _, entries in log_by_ip.items(): + for entries in log_by_ip.values(): for entry in entries: if entry.failed and last_failed: outages[-1].append(entry) @@ -118,18 +120,18 @@ def printOutages(log_path: str = 'log.txt'): def getSanitizedArguments() -> dict: """Read and sanitize command line arguments""" - cli_options = {} + cli_options: dict[str, Any] = {} # get command-line parameters and arguments - if len(argv) != 1: - buffer = None + if len(argv) > 1: + buffer = '' # loop through the arguments and create a dictionary where # elements starting with a '--' are considered keys # and the elements folowwing items - for arg in argv[1::]: # remove the first argument (filepath) - if arg[:2] == '--': - buffer = arg[2:] + for arg in argv[1:]: # remove the first argument (filepath) + if arg.startswith('--'): + buffer = arg.removeprefix('--') # ensures that the same parameter can be passed twice and the arguments get joined if not buffer in cli_options.keys(): @@ -139,7 +141,7 @@ def getSanitizedArguments() -> dict: # sanitize args # 'OPTION_NAME': number of permitted arguments - # -1 means any number of arguments + # -1 for any number of arguments permitted_options = { 'help': 0, 'log_path': 1, 'ip': -1, 'port': 1, 'timeout': 1, 'log': 0, 'outages': 0, 'test': 0 } @@ -162,43 +164,45 @@ def getSanitizedArguments() -> dict: ### parameter-specific checks - # LOG_PATH has only one argument at this point so it's safe to make the list into a string + # log_path has only one argument at this point so it's safe to make the list into a string if 'log_path' in cli_options: - cli_options['LOG_PATH'] = cli_options['LOG_PATH'][0] + cli_options['log_path'] = cli_options['log_path'][0] # check if all IPs are of valid format if 'ip' in cli_options: - for ip in cli_options['IP']: - if not isValidIP(ip): + for ip in cli_options['ip']: + if not isValidIP(ip): # TODO: just use a regex for IPv4 print(f"[ERROR] '{ip}' is not a IP address.") printHelp() quit() - # check if PORT is an integer in the valid port range [0-65535] + # check if port is an integer in the valid port range [0-65535] if 'port' in cli_options: try: - cli_options['PORT'] = int(cli_options['PORT'][0]) + cli_options['port'] = int(cli_options['port'][0]) except ValueError: - print(f"[ERROR] 'PORT' expects a value between 0 and 65535. Got '{cli_options['PORT'][0]}'") + print( + f"[ERROR] 'port' expects an integer between 0 and 65535. Got '{cli_options['port'][0]}'") printHelp() quit() - if not (0 <= cli_options['PORT'] and cli_options['PORT'] <= 65535): + if not (0 <= cli_options['port'] and cli_options['port'] <= 65535): print(f"[ERROR] 'PORT' outside of valid range 0 and 65535") printHelp() quit() - # check if TIMEOUT is a valid float larger than 0 + # check if timeout is a valid float larger than 0 if 'timeout' in cli_options: try: - cli_options['TIMEOUT'] = float(cli_options['TIMEOUT'][0]) + cli_options['timeout'] = float(cli_options['timeout'][0]) except ValueError: - print(f"[ERROR] 'TIMEOUT' expects a value between float value. Got '{cli_options['TIMEOUT'][0]}'") + print( + f"[ERROR] 'timeout' expects a value between float value. Got '{cli_options['timeout'][0]}'") printHelp() quit() - if cli_options['TIMEOUT'] <= 0: - print(f"[ERROR] 'TIMEOUT' needs to be larger than 0") + if cli_options['timeout'] <= 0: + print(f"[ERROR] 'timeout' needs to be larger than 0") printHelp() quit() @@ -209,7 +213,8 @@ def getSanitizedArguments() -> dict: return cli_options -def printHelp(): + +def printHelp() -> None: print( """ Utility to determine the networks connection status by pinging IPs and logging the result. @@ -243,7 +248,7 @@ def main(): args = getSanitizedArguments() # set variables based on user input or use the defaults - log_path = args['log_path'] if 'log_path' in args else 'log.txt' + log_path = args['log_path'] if 'log_path' in args else './log.txt' ips = args['ip'] if 'ip' in args else ['1.1.1.1'] port = args['port'] if 'port' in args else 80 timeout = args['timeout'] if 'timeout' in args else 2