changed names and formatted
This commit is contained in:
parent
f96b809e78
commit
ff9a154f0b
@ -2,44 +2,51 @@
|
||||
# Utility to determine the networks connection status by pinging IPs and logging the result
|
||||
# It can also output a list of outages
|
||||
|
||||
from datetime import datetime
|
||||
from ipaddress import ip_address
|
||||
import socket
|
||||
import time
|
||||
from sys import argv
|
||||
|
||||
class LogEntry:
|
||||
"""Class describing a connection attempt"""
|
||||
def __init__(self, log_entry: str):
|
||||
from datetime import datetime
|
||||
|
||||
def __init__(self, log_entry: str):
|
||||
self.date = datetime.strptime(log_entry[0:19], '%Y-%m-%d %H:%M:%S')
|
||||
self.ip = log_entry.split('[')[1].split(']')[0]
|
||||
self.failed = 'FAIL' in log_entry
|
||||
|
||||
|
||||
def isValidIP(ip: str) -> bool:
|
||||
"""Check if a string is a valid IP"""
|
||||
|
||||
from ipaddress import ip_address
|
||||
|
||||
try:
|
||||
ip_address(ip)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def isReachable(ip: str = '1.1.1.1', port: int = 80, timeout: float = 2) -> bool:
|
||||
"""Return wether an IP is reachable
|
||||
[modified from source: https://gist.github.com/betrcode/0248f0fda894013382d7?permalink_comment_id=4438869#gistcomment-4438869]
|
||||
"""
|
||||
import socket
|
||||
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.settimeout(timeout)
|
||||
|
||||
try:
|
||||
reachable = s.connect_ex((ip, int(port))) == 0 # True if open, False if not
|
||||
if reachable: s.shutdown(socket.SHUT_RDWR)
|
||||
# True if open, False if not
|
||||
reachable = s.connect_ex((ip, int(port))) == 0
|
||||
if reachable:
|
||||
s.shutdown(socket.SHUT_RDWR)
|
||||
except Exception:
|
||||
reachable = False
|
||||
s.close()
|
||||
|
||||
return reachable
|
||||
|
||||
|
||||
def logConnectionStatus(log_path: str = 'log.txt', ips: list = ['1.1.1.1'], port: int = 80, timeout: float = 2, dry_run: bool = False) -> list:
|
||||
"""Log IP, time and result of a connection check
|
||||
|
||||
@ -50,9 +57,8 @@ def logConnectionStatus(log_path: str = 'log.txt', ips: list = ['1.1.1.1'], port
|
||||
2023-01-19 07:05:02 [1.1.1.1] [OK]
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
log_lines = [ f"{time.strftime('%Y-%m-%d %H:%M:%S')} [{ip}] [{'OK' if isReachable(ip) else 'FAIL'}]" for ip in ips ]
|
||||
log_lines = [
|
||||
f"{time.strftime('%Y-%m-%d %H:%M:%S')} [{ip}] [{'OK' if isReachable(ip) else 'FAIL'}]" for ip in ips]
|
||||
|
||||
if not dry_run:
|
||||
with open(log_path, 'a') as file:
|
||||
@ -60,6 +66,7 @@ def logConnectionStatus(log_path: str = 'log.txt', ips: list = ['1.1.1.1'], port
|
||||
|
||||
return log_lines
|
||||
|
||||
|
||||
def printOutages(log_path: str = 'log.txt'):
|
||||
"""Prints out the Outages with duration and ip sorted by starttime
|
||||
|
||||
@ -82,7 +89,8 @@ def printOutages(log_path: str = 'log.txt'):
|
||||
|
||||
# cluster fails to outages
|
||||
# we consider the time of subsequent failed connection attemts to the same ip an outage
|
||||
outages = []; last_failed = False
|
||||
outages = []
|
||||
last_failed = False
|
||||
for _, entries in log_by_ip.items():
|
||||
for entry in entries:
|
||||
if entry.failed and last_failed:
|
||||
@ -94,10 +102,11 @@ def printOutages(log_path: str = 'log.txt'):
|
||||
last_failed = False
|
||||
|
||||
# print the outages by date of first fail
|
||||
for outage in sorted(outages, key = lambda x: x[0].date):
|
||||
for outage in sorted(outages, key=lambda x: x[0].date):
|
||||
|
||||
# get duration of outage in hours and minutes
|
||||
outage_duration = round((outage[-1].date - outage[0].date).seconds / 60)
|
||||
outage_duration = round(
|
||||
(outage[-1].date - outage[0].date).seconds / 60)
|
||||
hours = outage_duration // 60
|
||||
minutes = outage_duration - (hours * 60)
|
||||
ip = outage[0].ip
|
||||
@ -105,11 +114,10 @@ def printOutages(log_path: str = 'log.txt'):
|
||||
# Outputs outages in the form of "Outage at: 2023-19-01 06:29:01 lasting for 2 Hours and 39 Minutes"
|
||||
print(f"[{ip}] {outage[0].date} lasting for {'{} Hours and '.format(hours) if hours >= 1 else ''}{minutes} Minutes")
|
||||
|
||||
|
||||
def getSanitizedArguments() -> dict:
|
||||
"""Read and sanitize command line arguments"""
|
||||
|
||||
from sys import argv
|
||||
|
||||
cli_options = {}
|
||||
|
||||
# get command-line parameters and arguments
|
||||
@ -133,7 +141,7 @@ def getSanitizedArguments() -> dict:
|
||||
# 'OPTION_NAME': number of permitted arguments
|
||||
# -1 means any number of arguments
|
||||
permitted_options = {
|
||||
'HELP': 0, 'LOG_PATH': 1, 'IP': -1, 'PORT': 1, 'TIMEOUT': 1, 'LOG': 0, 'OUTAGES': 0, 'TEST': 0
|
||||
'help': 0, 'log_path': 1, 'ip': -1, 'port': 1, 'timeout': 1, 'log': 0, 'outages': 0, 'test': 0
|
||||
}
|
||||
|
||||
# check if non supported parameters have been passed
|
||||
@ -155,11 +163,11 @@ def getSanitizedArguments() -> dict:
|
||||
### parameter-specific checks
|
||||
|
||||
# LOG_PATH has only one argument at this point so it's safe to make the list into a string
|
||||
if 'LOG_PATH' in cli_options:
|
||||
if 'log_path' in cli_options:
|
||||
cli_options['LOG_PATH'] = cli_options['LOG_PATH'][0]
|
||||
|
||||
# check if all IPs are of valid format
|
||||
if 'IP' in cli_options:
|
||||
if 'ip' in cli_options:
|
||||
for ip in cli_options['IP']:
|
||||
if not isValidIP(ip):
|
||||
print(f"[ERROR] '{ip}' is not a IP address.")
|
||||
@ -167,7 +175,7 @@ def getSanitizedArguments() -> dict:
|
||||
quit()
|
||||
|
||||
# check if PORT is an integer in the valid port range [0-65535]
|
||||
if 'PORT' in cli_options:
|
||||
if 'port' in cli_options:
|
||||
try:
|
||||
cli_options['PORT'] = int(cli_options['PORT'][0])
|
||||
except ValueError:
|
||||
@ -181,7 +189,7 @@ def getSanitizedArguments() -> dict:
|
||||
quit()
|
||||
|
||||
# check if TIMEOUT is a valid float larger than 0
|
||||
if 'TIMEOUT' in cli_options:
|
||||
if 'timeout' in cli_options:
|
||||
try:
|
||||
cli_options['TIMEOUT'] = float(cli_options['TIMEOUT'][0])
|
||||
except ValueError:
|
||||
@ -195,7 +203,7 @@ def getSanitizedArguments() -> dict:
|
||||
quit()
|
||||
|
||||
# check if HELP is has been passed
|
||||
if 'HELP' in cli_options:
|
||||
if 'help' in cli_options:
|
||||
printHelp()
|
||||
quit()
|
||||
|
||||
@ -206,27 +214,27 @@ def printHelp():
|
||||
"""
|
||||
Utility to determine the networks connection status by pinging IPs and logging the result.
|
||||
|
||||
--LOG_PATH path to a the file where test results should be logged
|
||||
--log_path path to a the file where test results should be logged
|
||||
Default: log.txt
|
||||
|
||||
--IP space separated list of IPs to ping and test
|
||||
--ip space separated list of IPs to ping and test
|
||||
Default: 1.1.1.1
|
||||
|
||||
--PORT set the port to use for the conneciton test
|
||||
--port set the port to use for the conneciton test
|
||||
Default: 80
|
||||
|
||||
--TIMEOUT set the timeout in seconds to use for the connection test
|
||||
--timeout set the timeout in seconds to use for the connection test
|
||||
Default: 2
|
||||
|
||||
--TEST test connection and return result to terminal
|
||||
--test test connection and return result to terminal
|
||||
use with --LOG to also log the result in a file
|
||||
|
||||
--LOG test connection and log to --LOG_PATH
|
||||
--log test connection and log to --LOG_PATH
|
||||
use with --TEST to also print the reult to the terminal
|
||||
|
||||
--OUTAGES print the dates and lenghts of outages
|
||||
--outages print the dates and lenghts of outages
|
||||
|
||||
--HELP print this menu
|
||||
--help print this menu
|
||||
"""
|
||||
)
|
||||
|
||||
@ -235,22 +243,22 @@ def main():
|
||||
args = getSanitizedArguments()
|
||||
|
||||
# set variables based on user input or use the defaults
|
||||
log_path = args['LOG_PATH'] if 'LOG_PATH' in args else 'log.txt'
|
||||
ips = args['IP'] if 'IP' in args else ['1.1.1.1']
|
||||
port = args['PORT'] if 'PORT' in args else 80
|
||||
timeout = args['TIMEOUT'] if 'TIMEOUT' in args else 2
|
||||
log_path = args['log_path'] if 'log_path' in args else 'log.txt'
|
||||
ips = args['ip'] if 'ip' in args else ['1.1.1.1']
|
||||
port = args['port'] if 'port' in args else 80
|
||||
timeout = args['timeout'] if 'timeout' in args else 2
|
||||
|
||||
# perform functionality based on user input
|
||||
if 'OUTAGES' in args:
|
||||
if 'outages' in args:
|
||||
printOutages(log_path)
|
||||
|
||||
if 'LOG' in args and 'TEST' in args:
|
||||
if 'log' in args and 'test' in args:
|
||||
for log_line in logConnectionStatus(log_path = log_path, ips = ips, port = port, timeout = timeout): print(log_line)
|
||||
|
||||
if 'LOG' in args and 'TEST' not in args:
|
||||
if 'log' in args and 'test' not in args:
|
||||
logConnectionStatus(log_path = log_path, ips = ips, port = port, timeout = timeout)
|
||||
|
||||
if 'TEST' in args and 'LOG' not in args:
|
||||
if 'test' in args and 'log' not in args:
|
||||
for log_line in logConnectionStatus(ips = ips, port = port, timeout = timeout, dry_run = True): print(log_line)
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user