type annotations. passes mypy
This commit is contained in:
parent
ff9a154f0b
commit
cf025c271d
3
.gitignore
vendored
3
.gitignore
vendored
@ -1 +1,2 @@
|
||||
.direnv/
|
||||
.direnv/
|
||||
.mypy_cache/
|
@ -7,11 +7,12 @@ from ipaddress import ip_address
|
||||
import socket
|
||||
import time
|
||||
from sys import argv
|
||||
from typing import Any
|
||||
|
||||
class LogEntry:
|
||||
"""Class describing a connection attempt"""
|
||||
|
||||
def __init__(self, log_entry: str):
|
||||
def __init__(self: 'LogEntry', log_entry: str) -> None:
|
||||
self.date = datetime.strptime(log_entry[0:19], '%Y-%m-%d %H:%M:%S')
|
||||
self.ip = log_entry.split('[')[1].split(']')[0]
|
||||
self.failed = 'FAIL' in log_entry
|
||||
@ -29,7 +30,7 @@ def isValidIP(ip: str) -> bool:
|
||||
|
||||
def isReachable(ip: str = '1.1.1.1', port: int = 80, timeout: float = 2) -> bool:
|
||||
"""Return wether an IP is reachable
|
||||
[modified from source: https://gist.github.com/betrcode/0248f0fda894013382d7?permalink_comment_id=4438869#gistcomment-4438869]
|
||||
modified from source: https://gist.github.com/betrcode/0248f0fda894013382d7?permalink_comment_id=4438869#gistcomment-4438869
|
||||
"""
|
||||
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
@ -47,7 +48,7 @@ def isReachable(ip: str = '1.1.1.1', port: int = 80, timeout: float = 2) -> bool
|
||||
return reachable
|
||||
|
||||
|
||||
def logConnectionStatus(log_path: str = 'log.txt', ips: list = ['1.1.1.1'], port: int = 80, timeout: float = 2, dry_run: bool = False) -> list:
|
||||
def logConnectionStatus(log_path: str = './log.txt', ips: list[str] = ['1.1.1.1'], port: int = 80, timeout: float = 2, dry_run: bool = False) -> list[str]:
|
||||
"""Log IP, time and result of a connection check
|
||||
|
||||
Output has this format:
|
||||
@ -61,13 +62,14 @@ def logConnectionStatus(log_path: str = 'log.txt', ips: list = ['1.1.1.1'], port
|
||||
f"{time.strftime('%Y-%m-%d %H:%M:%S')} [{ip}] [{'OK' if isReachable(ip) else 'FAIL'}]" for ip in ips]
|
||||
|
||||
if not dry_run:
|
||||
# TODO: move this into a log function which is passed as an argument
|
||||
with open(log_path, 'a') as file:
|
||||
file.writelines(log_line + '\n' for log_line in log_lines)
|
||||
|
||||
return log_lines
|
||||
|
||||
|
||||
def printOutages(log_path: str = 'log.txt'):
|
||||
def printOutages(log_path: str = './log.txt') -> None:
|
||||
"""Prints out the Outages with duration and ip sorted by starttime
|
||||
|
||||
Output has this format:
|
||||
@ -89,9 +91,9 @@ def printOutages(log_path: str = 'log.txt'):
|
||||
|
||||
# cluster fails to outages
|
||||
# we consider the time of subsequent failed connection attemts to the same ip an outage
|
||||
outages = []
|
||||
outages: list[list[LogEntry]] = []
|
||||
last_failed = False
|
||||
for _, entries in log_by_ip.items():
|
||||
for entries in log_by_ip.values():
|
||||
for entry in entries:
|
||||
if entry.failed and last_failed:
|
||||
outages[-1].append(entry)
|
||||
@ -118,18 +120,18 @@ def printOutages(log_path: str = 'log.txt'):
|
||||
def getSanitizedArguments() -> dict:
|
||||
"""Read and sanitize command line arguments"""
|
||||
|
||||
cli_options = {}
|
||||
cli_options: dict[str, Any] = {}
|
||||
|
||||
# get command-line parameters and arguments
|
||||
if len(argv) != 1:
|
||||
buffer = None
|
||||
if len(argv) > 1:
|
||||
buffer = ''
|
||||
|
||||
# loop through the arguments and create a dictionary where
|
||||
# elements starting with a '--' are considered keys
|
||||
# and the elements folowwing items
|
||||
for arg in argv[1::]: # remove the first argument (filepath)
|
||||
if arg[:2] == '--':
|
||||
buffer = arg[2:]
|
||||
for arg in argv[1:]: # remove the first argument (filepath)
|
||||
if arg.startswith('--'):
|
||||
buffer = arg.removeprefix('--')
|
||||
|
||||
# ensures that the same parameter can be passed twice and the arguments get joined
|
||||
if not buffer in cli_options.keys():
|
||||
@ -139,7 +141,7 @@ def getSanitizedArguments() -> dict:
|
||||
|
||||
# sanitize args
|
||||
# 'OPTION_NAME': number of permitted arguments
|
||||
# -1 means any number of arguments
|
||||
# -1 for any number of arguments
|
||||
permitted_options = {
|
||||
'help': 0, 'log_path': 1, 'ip': -1, 'port': 1, 'timeout': 1, 'log': 0, 'outages': 0, 'test': 0
|
||||
}
|
||||
@ -162,43 +164,45 @@ def getSanitizedArguments() -> dict:
|
||||
|
||||
### parameter-specific checks
|
||||
|
||||
# LOG_PATH has only one argument at this point so it's safe to make the list into a string
|
||||
# log_path has only one argument at this point so it's safe to make the list into a string
|
||||
if 'log_path' in cli_options:
|
||||
cli_options['LOG_PATH'] = cli_options['LOG_PATH'][0]
|
||||
cli_options['log_path'] = cli_options['log_path'][0]
|
||||
|
||||
# check if all IPs are of valid format
|
||||
if 'ip' in cli_options:
|
||||
for ip in cli_options['IP']:
|
||||
if not isValidIP(ip):
|
||||
for ip in cli_options['ip']:
|
||||
if not isValidIP(ip): # TODO: just use a regex for IPv4
|
||||
print(f"[ERROR] '{ip}' is not a IP address.")
|
||||
printHelp()
|
||||
quit()
|
||||
|
||||
# check if PORT is an integer in the valid port range [0-65535]
|
||||
# check if port is an integer in the valid port range [0-65535]
|
||||
if 'port' in cli_options:
|
||||
try:
|
||||
cli_options['PORT'] = int(cli_options['PORT'][0])
|
||||
cli_options['port'] = int(cli_options['port'][0])
|
||||
except ValueError:
|
||||
print(f"[ERROR] 'PORT' expects a value between 0 and 65535. Got '{cli_options['PORT'][0]}'")
|
||||
print(
|
||||
f"[ERROR] 'port' expects an integer between 0 and 65535. Got '{cli_options['port'][0]}'")
|
||||
printHelp()
|
||||
quit()
|
||||
|
||||
if not (0 <= cli_options['PORT'] and cli_options['PORT'] <= 65535):
|
||||
if not (0 <= cli_options['port'] and cli_options['port'] <= 65535):
|
||||
print(f"[ERROR] 'PORT' outside of valid range 0 and 65535")
|
||||
printHelp()
|
||||
quit()
|
||||
|
||||
# check if TIMEOUT is a valid float larger than 0
|
||||
# check if timeout is a valid float larger than 0
|
||||
if 'timeout' in cli_options:
|
||||
try:
|
||||
cli_options['TIMEOUT'] = float(cli_options['TIMEOUT'][0])
|
||||
cli_options['timeout'] = float(cli_options['timeout'][0])
|
||||
except ValueError:
|
||||
print(f"[ERROR] 'TIMEOUT' expects a value between float value. Got '{cli_options['TIMEOUT'][0]}'")
|
||||
print(
|
||||
f"[ERROR] 'timeout' expects a value between float value. Got '{cli_options['timeout'][0]}'")
|
||||
printHelp()
|
||||
quit()
|
||||
|
||||
if cli_options['TIMEOUT'] <= 0:
|
||||
print(f"[ERROR] 'TIMEOUT' needs to be larger than 0")
|
||||
if cli_options['timeout'] <= 0:
|
||||
print(f"[ERROR] 'timeout' needs to be larger than 0")
|
||||
printHelp()
|
||||
quit()
|
||||
|
||||
@ -209,7 +213,8 @@ def getSanitizedArguments() -> dict:
|
||||
|
||||
return cli_options
|
||||
|
||||
def printHelp():
|
||||
|
||||
def printHelp() -> None:
|
||||
print(
|
||||
"""
|
||||
Utility to determine the networks connection status by pinging IPs and logging the result.
|
||||
@ -243,7 +248,7 @@ def main():
|
||||
args = getSanitizedArguments()
|
||||
|
||||
# set variables based on user input or use the defaults
|
||||
log_path = args['log_path'] if 'log_path' in args else 'log.txt'
|
||||
log_path = args['log_path'] if 'log_path' in args else './log.txt'
|
||||
ips = args['ip'] if 'ip' in args else ['1.1.1.1']
|
||||
port = args['port'] if 'port' in args else 80
|
||||
timeout = args['timeout'] if 'timeout' in args else 2
|
||||
|
Loading…
Reference in New Issue
Block a user