Compare commits

...

2 Commits

Author SHA1 Message Date
8d070a059c
reworked comments and formatting 2023-10-04 18:34:53 +02:00
115420d368
documentation 2023-10-04 17:56:44 +02:00

View File

@ -6,61 +6,91 @@ import requests
class LogEntry:
"""Class describing a connection attempt"""
'''Class describing a connection attempt.'''
def __init__(self: 'LogEntry', log_entry: str) -> None:
self.date = datetime.strptime(log_entry[1:20], '%Y-%m-%d %H:%M:%S')
self.host = log_entry.split('[')[2].split(']')[0]
self.failed = 'FAIL' in log_entry
def __repr__(self: 'LogEntry') -> str:
return f"[{self.date}][{self.host}][{'OK' if not self.failed else 'FAIL'}]"
def printOutages(filepath: str, time: int) -> None:
"""Print outages from a log file"""
'''
Print outages from a log file
`filepath` Path to a file with connection logs.
`time` Minimum duration for a connection loss to be considered an outage.
'''
# Get a sorted list of log entries.
with open(filepath, 'r') as file:
log = list(map(lambda x: LogEntry(x), file.readlines()))
log = sorted(
map(lambda x: LogEntry(x), file.readlines()),
key=lambda x: x.date
)
# cluster log entries by host
# Cluster log entries by host.
log_by_host = {}
for entry in log:
if entry.host not in log_by_host:
log_by_host[entry.host] = [entry]
else:
log_by_host[entry.host].append(entry)
# cluster fails to outages
# we consider the time of subsequent failed connection attempts to the same host an outage
# Cluster fails to outages.
# We consider the time of subsequent failed connection attempts to the same host an outage.
outages: list = []
last_failed = False
for entries in log_by_host.values():
for entry in entries:
# Continue an outage.
if entry.failed and last_failed:
outages[-1].append(entry)
if entry.failed and not last_failed:
# Start a new outage.
elif entry.failed and not last_failed:
outages.append([entry])
last_failed = True
if not entry.failed and last_failed:
# End an outage.
elif not entry.failed and last_failed:
last_failed = False
# print the outages by date of first fail
for outage in sorted(outages, key=lambda x: x[0].date):
# Print the outages.
for outage in outages:
# get duration of outage in hours and minutes
# Get duration of outage in hours and minutes.
outage_duration = round(
(outage[-1].date - outage[0].date).seconds / 60)
# continue if outage is shorter than the minimum duration
(outage[-1].date - outage[0].date).seconds / 60
)
# Skip printing if outage is shorter than the minimum duration.
if outage_duration < time:
continue
# Get hours and minutes for printing.
hours = outage_duration // 60
minutes = outage_duration - (hours * 60)
host = outage[0].host
# Outputs outages in the form of "Outage at: 2023-19-01 06:29:01 lasting for 2 Hours and 39 Minutes"
# Outputs outages in the form of "Outage at: 2023-19-01 06:29:01 lasting for 2 Hours and 39 Minutes".
print(f"[{outage[0].date}][{host}] lasting for {'{} Hours and '.format(hours) if hours >= 1 else ''}{minutes} Minutes")
def isOnline(host: str, timeout: int) -> bool:
"""Check if connection to a host can be established"""
'''
Check if connection to a host can be established.
`host` IP or hostname of a server to query.
`timeout` Set the timeout in seconds to use for the connection test.
'''
try:
requests.head(f"https://{host}", timeout=timeout)
@ -70,7 +100,15 @@ def isOnline(host: str, timeout: int) -> bool:
def log(host: str, timeout: int, filepath: None | str = None) -> None:
"""Log the connection status of a host"""
'''
Log the connection status of a host.
`host` IP or hostname of a server to query.
`timeout` Set the timeout in seconds to use for the connection test.
`filepath` Path to a the file where results should be appended to. Creates the file if it does not exist. If not specified, results are printed to stdout.
'''
logline = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}][{host}][{'OK' if isOnline(host, timeout) else 'FAIL'}]"
@ -82,6 +120,11 @@ def log(host: str, timeout: int, filepath: None | str = None) -> None:
def parseArgs(args: list[str]) -> argparse.Namespace:
'''
Parse command line arguments.
`args` List of user-supplied command line arguments.
'''
parser = argparse.ArgumentParser(
prog='outage_detector', description='Log outages and print statistics.'
@ -89,7 +132,7 @@ def parseArgs(args: list[str]) -> argparse.Namespace:
subparsers = parser.add_subparsers(dest='command')
# Arguments for the log command
# Arguments for the log command.
parser_log = subparsers.add_parser(
'log', help='Log the connection status.'
)
@ -106,7 +149,7 @@ def parseArgs(args: list[str]) -> argparse.Namespace:
help='Set the timeout in seconds to use for the connection test. (default: %(default)s)'
)
# Arguments for the outages command
# Arguments for the outages command.
parser_outages = subparsers.add_parser(
'outages', help='Print/log outages'
)
@ -122,7 +165,7 @@ def parseArgs(args: list[str]) -> argparse.Namespace:
return parser.parse_args(args)
def main():
def main() -> None:
args = parseArgs(sys.argv[1:])