renamed source directory

This commit is contained in:
2023-10-13 15:20:28 +02:00
parent 994481e7fe
commit 113ec3130c
4 changed files with 5 additions and 3 deletions

0
src/__init__.py Normal file
View File

211
src/main.py Normal file
View File

@ -0,0 +1,211 @@
import sys
import argparse
from datetime import datetime, timedelta
import requests
class LogEntry:
'''Class describing a connection attempt.'''
def __init__(self: 'LogEntry', log_entry: str) -> None:
self.date = datetime.strptime(log_entry[1:20], '%Y-%m-%d %H:%M:%S')
self.host = log_entry.split('[')[2].split(']')[0]
self.failed = 'FAIL' in log_entry
def __repr__(self: 'LogEntry') -> str:
return f"[{self.date}][{self.host}][{'OK' if not self.failed else 'FAIL'}]"
class Outage:
'''
Class describing an outage.
`self.start` The start of the outage represented by a LogEntry object.
`self.end` The end of an outage represented by a LogEntry object.
Could be None to represent that the outage is still ongoing.
'''
def __init__(self, start: LogEntry, end: None | LogEntry = None) -> None:
'''
Initialize an outage.
`start` The start of an outage represented by a LogEntry object.
`end` The end of an outage represented by a LogEntry object.
By default it is none to represent an ongoing outage.
'''
self.start = start
self.end = end
def duration(self) -> timedelta:
'''
Return the duration of the outage as datetime object.
If the outage does not have an end, the time from the start to now will be returned.
'''
if self.end is None:
return datetime.now() - self.start.date
else:
return self.end.date - self.start.date
def printOutages(filepath: str, time: int) -> None:
'''
Print outages from a log file
`filepath` Path to a file with connection logs.
`time` Minimum duration for a connection loss to be considered an outage.
'''
# Get a sorted list of log entries.
with open(filepath, 'r') as file:
log = sorted(
map(lambda x: LogEntry(x), file.readlines()),
key=lambda x: x.date
)
# Cluster log entries by host.
log_by_host = {}
for entry in log:
if entry.host not in log_by_host:
log_by_host[entry.host] = [entry]
else:
log_by_host[entry.host].append(entry)
# Cluster fails to outages.
# We consider the time of subsequent failed connection attempts to the same host an outage.
outages: list = []
last_failed = False
for _, entries in sorted(log_by_host.items()): # sort by hostname
for entry in entries:
# Start a new outage.
if entry.failed and not last_failed:
outages.append(Outage(entry))
last_failed = True
# End an outage.
elif not entry.failed and last_failed:
outages[-1].end = entry
last_failed = False
# Print the outages.
for outage in outages:
# Get duration of outage in hours and minutes.
outage_duration = round(outage.duration().seconds / 60)
# Skip printing if outage is shorter than the minimum duration.
if outage_duration < time:
continue
# Get hours and minutes for printing.
hours = outage_duration // 60
minutes = outage_duration - (hours * 60)
host = outage.start.host
# Outputs outages in the form of "Outage at: 2023-19-01 06:29:01 lasting for 2 Hours and 39 Minutes".
print(f"[{outage.start.date}][{host}] lasting for {'{} Hours and '.format(hours) if hours >= 1 else ''}{minutes} Minutes")
def isOnline(host: str, timeout: int) -> bool:
'''
Check if connection to a host can be established.
`host` IP or hostname of a server to query.
`timeout` Set the timeout in seconds to use for the connection test.
'''
try:
requests.head(f"https://{host}", timeout=timeout)
return True
except requests.ConnectionError:
return False
def log(host: str, timeout: int, filepath: None | str = None) -> None:
'''
Log the connection status of a host.
`host` IP or hostname of a server to query.
`timeout` Set the timeout in seconds to use for the connection test.
`filepath` Path to a the file where results should be appended to. Creates the file if it does not exist. If not specified, results are printed to stdout.
'''
logline = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}][{host}][{'OK' if isOnline(host, timeout) else 'FAIL'}]"
if filepath is not None:
with open(filepath, 'a') as file:
file.write(logline + '\n')
else:
print(logline)
def parseArgs(args: list[str]) -> argparse.Namespace:
'''
Parse command line arguments.
`args` List of user-supplied command line arguments.
'''
parser = argparse.ArgumentParser(
prog='outage_detector', description='Log outages and print statistics.'
)
subparsers = parser.add_subparsers(dest='command')
# Arguments for the log command.
parser_log = subparsers.add_parser(
'log', help='Log the connection status.'
)
parser_log.add_argument(
'filename', type=str, default=None, nargs='?',
help='Path to a the file where results should be appended to. Creates the file if it does not exist. If not specified, results are printed to stdout.'
)
parser_log.add_argument(
'--host', type=str, default='1.1.1.1',
help='IP or hostname of a server to query. (default: %(default)s)'
)
parser_log.add_argument(
'--timeout', type=int, default=2,
help='Set the timeout in seconds to use for the connection test. (default: %(default)s)'
)
# Arguments for the outages command.
parser_outages = subparsers.add_parser(
'outages', help='Print/log outages'
)
parser_outages.add_argument(
'filename', type=str, nargs='?',
help='Path to a file with connection logs.'
)
parser_outages.add_argument(
'--time', type=int, default=3,
help='Minimum duration for a connection loss to be considered an outage. (default: %(default)s)'
)
return parser.parse_args(args)
def main() -> None:
args = parseArgs(sys.argv[1:])
if args.command == 'log':
log(args.host, args.timeout, args.filename)
elif args.command == 'outages':
printOutages(args.filename, args.time)
if __name__ == '__main__':
main()