From ff9a154f0b9f29da04c3abcea85b906734d9fc57 Mon Sep 17 00:00:00 2001 From: Kristian Krsnik Date: Tue, 15 Aug 2023 23:27:29 +0200 Subject: [PATCH] changed names and formatted --- outage_detector/main.py | 82 ++++++++++++++++++++++------------------- 1 file changed, 45 insertions(+), 37 deletions(-) diff --git a/outage_detector/main.py b/outage_detector/main.py index f3c093f..87d3ca4 100755 --- a/outage_detector/main.py +++ b/outage_detector/main.py @@ -2,44 +2,51 @@ # Utility to determine the networks connection status by pinging IPs and logging the result # It can also output a list of outages +from datetime import datetime +from ipaddress import ip_address +import socket +import time +from sys import argv + class LogEntry: """Class describing a connection attempt""" - def __init__(self, log_entry: str): - from datetime import datetime + def __init__(self, log_entry: str): self.date = datetime.strptime(log_entry[0:19], '%Y-%m-%d %H:%M:%S') self.ip = log_entry.split('[')[1].split(']')[0] self.failed = 'FAIL' in log_entry + def isValidIP(ip: str) -> bool: """Check if a string is a valid IP""" - from ipaddress import ip_address - try: ip_address(ip) return True except ValueError: return False + def isReachable(ip: str = '1.1.1.1', port: int = 80, timeout: float = 2) -> bool: """Return wether an IP is reachable [modified from source: https://gist.github.com/betrcode/0248f0fda894013382d7?permalink_comment_id=4438869#gistcomment-4438869] """ - import socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(timeout) try: - reachable = s.connect_ex((ip, int(port))) == 0 # True if open, False if not - if reachable: s.shutdown(socket.SHUT_RDWR) + # True if open, False if not + reachable = s.connect_ex((ip, int(port))) == 0 + if reachable: + s.shutdown(socket.SHUT_RDWR) except Exception: reachable = False s.close() return reachable + def logConnectionStatus(log_path: str = 'log.txt', ips: list = ['1.1.1.1'], port: int = 80, timeout: float = 2, dry_run: bool = False) -> list: """Log IP, time and result of a connection check @@ -50,9 +57,8 @@ def logConnectionStatus(log_path: str = 'log.txt', ips: list = ['1.1.1.1'], port 2023-01-19 07:05:02 [1.1.1.1] [OK] """ - import time - - log_lines = [ f"{time.strftime('%Y-%m-%d %H:%M:%S')} [{ip}] [{'OK' if isReachable(ip) else 'FAIL'}]" for ip in ips ] + log_lines = [ + f"{time.strftime('%Y-%m-%d %H:%M:%S')} [{ip}] [{'OK' if isReachable(ip) else 'FAIL'}]" for ip in ips] if not dry_run: with open(log_path, 'a') as file: @@ -60,6 +66,7 @@ def logConnectionStatus(log_path: str = 'log.txt', ips: list = ['1.1.1.1'], port return log_lines + def printOutages(log_path: str = 'log.txt'): """Prints out the Outages with duration and ip sorted by starttime @@ -82,7 +89,8 @@ def printOutages(log_path: str = 'log.txt'): # cluster fails to outages # we consider the time of subsequent failed connection attemts to the same ip an outage - outages = []; last_failed = False + outages = [] + last_failed = False for _, entries in log_by_ip.items(): for entry in entries: if entry.failed and last_failed: @@ -94,10 +102,11 @@ def printOutages(log_path: str = 'log.txt'): last_failed = False # print the outages by date of first fail - for outage in sorted(outages, key = lambda x: x[0].date): + for outage in sorted(outages, key=lambda x: x[0].date): # get duration of outage in hours and minutes - outage_duration = round((outage[-1].date - outage[0].date).seconds / 60) + outage_duration = round( + (outage[-1].date - outage[0].date).seconds / 60) hours = outage_duration // 60 minutes = outage_duration - (hours * 60) ip = outage[0].ip @@ -105,11 +114,10 @@ def printOutages(log_path: str = 'log.txt'): # Outputs outages in the form of "Outage at: 2023-19-01 06:29:01 lasting for 2 Hours and 39 Minutes" print(f"[{ip}] {outage[0].date} lasting for {'{} Hours and '.format(hours) if hours >= 1 else ''}{minutes} Minutes") + def getSanitizedArguments() -> dict: """Read and sanitize command line arguments""" - from sys import argv - cli_options = {} # get command-line parameters and arguments @@ -133,7 +141,7 @@ def getSanitizedArguments() -> dict: # 'OPTION_NAME': number of permitted arguments # -1 means any number of arguments permitted_options = { - 'HELP': 0, 'LOG_PATH': 1, 'IP': -1, 'PORT': 1, 'TIMEOUT': 1, 'LOG': 0, 'OUTAGES': 0, 'TEST': 0 + 'help': 0, 'log_path': 1, 'ip': -1, 'port': 1, 'timeout': 1, 'log': 0, 'outages': 0, 'test': 0 } # check if non supported parameters have been passed @@ -155,11 +163,11 @@ def getSanitizedArguments() -> dict: ### parameter-specific checks # LOG_PATH has only one argument at this point so it's safe to make the list into a string - if 'LOG_PATH' in cli_options: + if 'log_path' in cli_options: cli_options['LOG_PATH'] = cli_options['LOG_PATH'][0] # check if all IPs are of valid format - if 'IP' in cli_options: + if 'ip' in cli_options: for ip in cli_options['IP']: if not isValidIP(ip): print(f"[ERROR] '{ip}' is not a IP address.") @@ -167,7 +175,7 @@ def getSanitizedArguments() -> dict: quit() # check if PORT is an integer in the valid port range [0-65535] - if 'PORT' in cli_options: + if 'port' in cli_options: try: cli_options['PORT'] = int(cli_options['PORT'][0]) except ValueError: @@ -181,7 +189,7 @@ def getSanitizedArguments() -> dict: quit() # check if TIMEOUT is a valid float larger than 0 - if 'TIMEOUT' in cli_options: + if 'timeout' in cli_options: try: cli_options['TIMEOUT'] = float(cli_options['TIMEOUT'][0]) except ValueError: @@ -195,7 +203,7 @@ def getSanitizedArguments() -> dict: quit() # check if HELP is has been passed - if 'HELP' in cli_options: + if 'help' in cli_options: printHelp() quit() @@ -206,27 +214,27 @@ def printHelp(): """ Utility to determine the networks connection status by pinging IPs and logging the result. - --LOG_PATH path to a the file where test results should be logged + --log_path path to a the file where test results should be logged Default: log.txt - --IP space separated list of IPs to ping and test + --ip space separated list of IPs to ping and test Default: 1.1.1.1 - --PORT set the port to use for the conneciton test + --port set the port to use for the conneciton test Default: 80 - --TIMEOUT set the timeout in seconds to use for the connection test + --timeout set the timeout in seconds to use for the connection test Default: 2 - --TEST test connection and return result to terminal + --test test connection and return result to terminal use with --LOG to also log the result in a file - --LOG test connection and log to --LOG_PATH + --log test connection and log to --LOG_PATH use with --TEST to also print the reult to the terminal - --OUTAGES print the dates and lenghts of outages + --outages print the dates and lenghts of outages - --HELP print this menu + --help print this menu """ ) @@ -235,22 +243,22 @@ def main(): args = getSanitizedArguments() # set variables based on user input or use the defaults - log_path = args['LOG_PATH'] if 'LOG_PATH' in args else 'log.txt' - ips = args['IP'] if 'IP' in args else ['1.1.1.1'] - port = args['PORT'] if 'PORT' in args else 80 - timeout = args['TIMEOUT'] if 'TIMEOUT' in args else 2 + log_path = args['log_path'] if 'log_path' in args else 'log.txt' + ips = args['ip'] if 'ip' in args else ['1.1.1.1'] + port = args['port'] if 'port' in args else 80 + timeout = args['timeout'] if 'timeout' in args else 2 # perform functionality based on user input - if 'OUTAGES' in args: + if 'outages' in args: printOutages(log_path) - if 'LOG' in args and 'TEST' in args: + if 'log' in args and 'test' in args: for log_line in logConnectionStatus(log_path = log_path, ips = ips, port = port, timeout = timeout): print(log_line) - if 'LOG' in args and 'TEST' not in args: + if 'log' in args and 'test' not in args: logConnectionStatus(log_path = log_path, ips = ips, port = port, timeout = timeout) - if 'TEST' in args and 'LOG' not in args: + if 'test' in args and 'log' not in args: for log_line in logConnectionStatus(ips = ips, port = port, timeout = timeout, dry_run = True): print(log_line)