initial commit
This commit is contained in:
commit
d222fda156
|
@ -0,0 +1 @@
|
||||||
|
.direnv/
|
|
@ -0,0 +1,27 @@
|
||||||
|
{
|
||||||
|
"nodes": {
|
||||||
|
"nixpkgs": {
|
||||||
|
"locked": {
|
||||||
|
"lastModified": 1692025715,
|
||||||
|
"narHash": "sha256-tsRiiopGT7HA8d/cuk5xYBRXgdnnvD+JhUGUe3x7vmY=",
|
||||||
|
"owner": "nixos",
|
||||||
|
"repo": "nixpkgs",
|
||||||
|
"rev": "09a137528c3aea3780720d19f99cd706f52c3823",
|
||||||
|
"type": "github"
|
||||||
|
},
|
||||||
|
"original": {
|
||||||
|
"owner": "nixos",
|
||||||
|
"ref": "nixos-23.05",
|
||||||
|
"repo": "nixpkgs",
|
||||||
|
"type": "github"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"root": {
|
||||||
|
"inputs": {
|
||||||
|
"nixpkgs": "nixpkgs"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"root": "root",
|
||||||
|
"version": 7
|
||||||
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
{
|
||||||
|
description = "A very basic flake";
|
||||||
|
|
||||||
|
inputs = {
|
||||||
|
nixpkgs.url = "github:nixos/nixpkgs/nixos-23.05";
|
||||||
|
};
|
||||||
|
|
||||||
|
outputs = {
|
||||||
|
self,
|
||||||
|
nixpkgs,
|
||||||
|
} @ inputs: let
|
||||||
|
supportedSystems = ["x86_64-linux" "x86_64-darwin" "aarch64-linux" "aarch64-darwin"];
|
||||||
|
forAllSystems = nixpkgs.lib.genAttrs supportedSystems;
|
||||||
|
pkgs = forAllSystems (system: nixpkgs.legacyPackages.${system});
|
||||||
|
in {
|
||||||
|
# Formatter
|
||||||
|
formatter = forAllSystems (system: nixpkgs.legacyPackages.${system}.alejandra);
|
||||||
|
|
||||||
|
# Packages
|
||||||
|
packages = forAllSystems (system: {
|
||||||
|
default = pkgs.${system}.poetry2nix.mkPoetryApplication {projectDir = self;};
|
||||||
|
});
|
||||||
|
|
||||||
|
# Dev Shell
|
||||||
|
devShells = forAllSystems (system: {
|
||||||
|
default = pkgs.${system}.mkShellNoCC {
|
||||||
|
packages = with pkgs.${system}; [
|
||||||
|
(poetry2nix.mkPoetryEnv {projectDir = self;})
|
||||||
|
poetry
|
||||||
|
mypy
|
||||||
|
python310Packages.types-requests
|
||||||
|
];
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
# Run as apps
|
||||||
|
apps = forAllSystems (system: {
|
||||||
|
default = {
|
||||||
|
program = "${self.packages.${system}.default}/bin/outage_detector";
|
||||||
|
type = "app";
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
# Home Manager Module
|
||||||
|
# homeManagerModules.default = import ./nix/hm-module.nix self;
|
||||||
|
# NixOS Module
|
||||||
|
# nixosModules.default = import ./nix/module.nix inputs;
|
||||||
|
};
|
||||||
|
}
|
|
@ -0,0 +1,259 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# Utility to determine the networks connection status by pinging IPs and logging the result
|
||||||
|
# It can also output a list of outages
|
||||||
|
|
||||||
|
class LogEntry:
|
||||||
|
"""Class describing a connection attempt"""
|
||||||
|
def __init__(self, log_entry: str):
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
self.date = datetime.strptime(log_entry[0:19], '%Y-%m-%d %H:%M:%S')
|
||||||
|
self.ip = log_entry.split('[')[1].split(']')[0]
|
||||||
|
self.failed = 'FAIL' in log_entry
|
||||||
|
|
||||||
|
def isValidIP(ip: str) -> bool:
|
||||||
|
"""Check if a string is a valid IP"""
|
||||||
|
|
||||||
|
from ipaddress import ip_address
|
||||||
|
|
||||||
|
try:
|
||||||
|
ip_address(ip)
|
||||||
|
return True
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def isReachable(ip: str = '1.1.1.1', port: int = 80, timeout: float = 2) -> bool:
|
||||||
|
"""Return wether an IP is reachable
|
||||||
|
[modified from source: https://gist.github.com/betrcode/0248f0fda894013382d7?permalink_comment_id=4438869#gistcomment-4438869]
|
||||||
|
"""
|
||||||
|
import socket
|
||||||
|
|
||||||
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
s.settimeout(timeout)
|
||||||
|
|
||||||
|
try:
|
||||||
|
reachable = s.connect_ex((ip, int(port))) == 0 # True if open, False if not
|
||||||
|
if reachable: s.shutdown(socket.SHUT_RDWR)
|
||||||
|
except Exception:
|
||||||
|
reachable = False
|
||||||
|
s.close()
|
||||||
|
|
||||||
|
return reachable
|
||||||
|
|
||||||
|
def logConnectionStatus(log_path: str = 'log.txt', ips: list = ['1.1.1.1'], port: int = 80, timeout: float = 2, dry_run: bool = False) -> list:
|
||||||
|
"""Log IP, time and result of a connection check
|
||||||
|
|
||||||
|
Output has this format:
|
||||||
|
2023-01-19 07:02:01 [1.0.0.1] [FAIL]
|
||||||
|
2023-01-19 07:03:01 [1.1.1.1] [FAIL]
|
||||||
|
2023-01-19 07:04:01 [1.1.1.1] [OK]
|
||||||
|
2023-01-19 07:05:02 [1.1.1.1] [OK]
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
log_lines = [ f"{time.strftime('%Y-%m-%d %H:%M:%S')} [{ip}] [{'OK' if isReachable(ip) else 'FAIL'}]" for ip in ips ]
|
||||||
|
|
||||||
|
if not dry_run:
|
||||||
|
with open(log_path, 'a') as file:
|
||||||
|
file.writelines(log_line + '\n' for log_line in log_lines)
|
||||||
|
|
||||||
|
return log_lines
|
||||||
|
|
||||||
|
def printOutages(log_path: str = 'log.txt'):
|
||||||
|
"""Prints out the Outages with duration and ip sorted by starttime
|
||||||
|
|
||||||
|
Output has this format:
|
||||||
|
[1.1.1.1] 2023-01-19 06:29:01 lasting for 15 Minutes
|
||||||
|
[1.0.0.1] 2023-01-19 06:45:01 lasting for 1 Hours and 19 Minutes
|
||||||
|
[1.1.1.1] 2023-01-19 07:05:01 lasting for 2 Hours and 3 Minutes
|
||||||
|
"""
|
||||||
|
|
||||||
|
with open(log_path, 'r') as file:
|
||||||
|
log = list(map(lambda x: LogEntry(x), file.readlines()))
|
||||||
|
|
||||||
|
# cluster log entries by IP
|
||||||
|
log_by_ip = {}
|
||||||
|
for entry in log:
|
||||||
|
if entry.ip not in log_by_ip:
|
||||||
|
log_by_ip[entry.ip] = [entry]
|
||||||
|
else:
|
||||||
|
log_by_ip[entry.ip].append(entry)
|
||||||
|
|
||||||
|
# cluster fails to outages
|
||||||
|
# we consider the time of subsequent failed connection attemts to the same ip an outage
|
||||||
|
outages = []; last_failed = False
|
||||||
|
for _, entries in log_by_ip.items():
|
||||||
|
for entry in entries:
|
||||||
|
if entry.failed and last_failed:
|
||||||
|
outages[-1].append(entry)
|
||||||
|
if entry.failed and not last_failed:
|
||||||
|
outages.append([entry])
|
||||||
|
last_failed = True
|
||||||
|
if not entry.failed and last_failed:
|
||||||
|
last_failed = False
|
||||||
|
|
||||||
|
# print the outages by date of first fail
|
||||||
|
for outage in sorted(outages, key = lambda x: x[0].date):
|
||||||
|
|
||||||
|
# get duration of outage in hours and minutes
|
||||||
|
outage_duration = round((outage[-1].date - outage[0].date).seconds / 60)
|
||||||
|
hours = outage_duration // 60
|
||||||
|
minutes = outage_duration - (hours * 60)
|
||||||
|
ip = outage[0].ip
|
||||||
|
|
||||||
|
# Outputs outages in the form of "Outage at: 2023-19-01 06:29:01 lasting for 2 Hours and 39 Minutes"
|
||||||
|
print(f"[{ip}] {outage[0].date} lasting for {'{} Hours and '.format(hours) if hours >= 1 else ''}{minutes} Minutes")
|
||||||
|
|
||||||
|
def getSanitizedArguments() -> dict:
|
||||||
|
"""Read and sanitize command line arguments"""
|
||||||
|
|
||||||
|
from sys import argv
|
||||||
|
|
||||||
|
cli_options = {}
|
||||||
|
|
||||||
|
# get command-line parameters and arguments
|
||||||
|
if len(argv) != 1:
|
||||||
|
buffer = None
|
||||||
|
|
||||||
|
# loop through the arguments and create a dictionary where
|
||||||
|
# elements starting with a '--' are considered keys
|
||||||
|
# and the elements folowwing items
|
||||||
|
for arg in argv[1::]: # remove the first argument (filepath)
|
||||||
|
if arg[:2] == '--':
|
||||||
|
buffer = arg[2:]
|
||||||
|
|
||||||
|
# ensures that the same parameter can be passed twice and the arguments get joined
|
||||||
|
if not buffer in cli_options.keys():
|
||||||
|
cli_options[buffer] = []
|
||||||
|
else:
|
||||||
|
cli_options[buffer].append(arg)
|
||||||
|
|
||||||
|
# sanitize args
|
||||||
|
# 'OPTION_NAME': number of permitted arguments
|
||||||
|
# -1 means any number of arguments
|
||||||
|
permitted_options = {
|
||||||
|
'HELP': 0, 'LOG_PATH': 1, 'IP': -1, 'PORT': 1, 'TIMEOUT': 1, 'LOG': 0, 'OUTAGES': 0, 'TEST': 0
|
||||||
|
}
|
||||||
|
|
||||||
|
# check if non supported parameters have been passed
|
||||||
|
unknown_parameters = cli_options.keys() - permitted_options.keys()
|
||||||
|
if unknown_parameters != set():
|
||||||
|
print(f"[ERROR] Unknown parameters: {unknown_parameters}")
|
||||||
|
printHelp()
|
||||||
|
quit()
|
||||||
|
|
||||||
|
# check if the right number of arguments has been passed
|
||||||
|
for key in cli_options:
|
||||||
|
if permitted_options[key] == -1:
|
||||||
|
continue
|
||||||
|
if len(cli_options[key]) != permitted_options[key]:
|
||||||
|
print(f"[ERROR] '{key}' expected {permitted_options[key]} arguments but got {len(cli_options[key])}")
|
||||||
|
printHelp()
|
||||||
|
quit()
|
||||||
|
|
||||||
|
### parameter-specific checks
|
||||||
|
|
||||||
|
# LOG_PATH has only one argument at this point so it's safe to make the list into a string
|
||||||
|
if 'LOG_PATH' in cli_options:
|
||||||
|
cli_options['LOG_PATH'] = cli_options['LOG_PATH'][0]
|
||||||
|
|
||||||
|
# check if all IPs are of valid format
|
||||||
|
if 'IP' in cli_options:
|
||||||
|
for ip in cli_options['IP']:
|
||||||
|
if not isValidIP(ip):
|
||||||
|
print(f"[ERROR] '{ip}' is not a IP address.")
|
||||||
|
printHelp()
|
||||||
|
quit()
|
||||||
|
|
||||||
|
# check if PORT is an integer in the valid port range [0-65535]
|
||||||
|
if 'PORT' in cli_options:
|
||||||
|
try:
|
||||||
|
cli_options['PORT'] = int(cli_options['PORT'][0])
|
||||||
|
except ValueError:
|
||||||
|
print(f"[ERROR] 'PORT' expects a value between 0 and 65535. Got '{cli_options['PORT'][0]}'")
|
||||||
|
printHelp()
|
||||||
|
quit()
|
||||||
|
|
||||||
|
if not (0 <= cli_options['PORT'] and cli_options['PORT'] <= 65535):
|
||||||
|
print(f"[ERROR] 'PORT' outside of valid range 0 and 65535")
|
||||||
|
printHelp()
|
||||||
|
quit()
|
||||||
|
|
||||||
|
# check if TIMEOUT is a valid float larger than 0
|
||||||
|
if 'TIMEOUT' in cli_options:
|
||||||
|
try:
|
||||||
|
cli_options['TIMEOUT'] = float(cli_options['TIMEOUT'][0])
|
||||||
|
except ValueError:
|
||||||
|
print(f"[ERROR] 'TIMEOUT' expects a value between float value. Got '{cli_options['TIMEOUT'][0]}'")
|
||||||
|
printHelp()
|
||||||
|
quit()
|
||||||
|
|
||||||
|
if cli_options['TIMEOUT'] <= 0:
|
||||||
|
print(f"[ERROR] 'TIMEOUT' needs to be larger than 0")
|
||||||
|
printHelp()
|
||||||
|
quit()
|
||||||
|
|
||||||
|
# check if HELP is has been passed
|
||||||
|
if 'HELP' in cli_options:
|
||||||
|
printHelp()
|
||||||
|
quit()
|
||||||
|
|
||||||
|
return cli_options
|
||||||
|
|
||||||
|
def printHelp():
|
||||||
|
print(
|
||||||
|
"""
|
||||||
|
Utility to determine the networks connection status by pinging IPs and logging the result.
|
||||||
|
|
||||||
|
--LOG_PATH path to a the file where test results should be logged
|
||||||
|
Default: log.txt
|
||||||
|
|
||||||
|
--IP space separated list of IPs to ping and test
|
||||||
|
Default: 1.1.1.1
|
||||||
|
|
||||||
|
--PORT set the port to use for the conneciton test
|
||||||
|
Default: 80
|
||||||
|
|
||||||
|
--TIMEOUT set the timeout in seconds to use for the connection test
|
||||||
|
Default: 2
|
||||||
|
|
||||||
|
--TEST test connection and return result to terminal
|
||||||
|
use with --LOG to also log the result in a file
|
||||||
|
|
||||||
|
--LOG test connection and log to --LOG_PATH
|
||||||
|
use with --TEST to also print the reult to the terminal
|
||||||
|
|
||||||
|
--OUTAGES print the dates and lenghts of outages
|
||||||
|
|
||||||
|
--HELP print this menu
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
def main():
|
||||||
|
|
||||||
|
args = getSanitizedArguments()
|
||||||
|
|
||||||
|
# set variables based on user input or use the defaults
|
||||||
|
log_path = args['LOG_PATH'] if 'LOG_PATH' in args else 'log.txt'
|
||||||
|
ips = args['IP'] if 'IP' in args else ['1.1.1.1']
|
||||||
|
port = args['PORT'] if 'PORT' in args else 80
|
||||||
|
timeout = args['TIMEOUT'] if 'TIMEOUT' in args else 2
|
||||||
|
|
||||||
|
# perform functionality based on user input
|
||||||
|
if 'OUTAGES' in args:
|
||||||
|
printOutages(log_path)
|
||||||
|
|
||||||
|
if 'LOG' in args and 'TEST' in args:
|
||||||
|
for log_line in logConnectionStatus(log_path = log_path, ips = ips, port = port, timeout = timeout): print(log_line)
|
||||||
|
|
||||||
|
if 'LOG' in args and 'TEST' not in args:
|
||||||
|
logConnectionStatus(log_path = log_path, ips = ips, port = port, timeout = timeout)
|
||||||
|
|
||||||
|
if 'TEST' in args and 'LOG' not in args:
|
||||||
|
for log_line in logConnectionStatus(ips = ips, port = port, timeout = timeout, dry_run = True): print(log_line)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
|
@ -0,0 +1,17 @@
|
||||||
|
[tool.poetry]
|
||||||
|
name = "outage-detector"
|
||||||
|
version = "0.1.0"
|
||||||
|
description = ""
|
||||||
|
authors = ["Kristian Krsnik <git@krsnik.at>"]
|
||||||
|
readme = "README.md"
|
||||||
|
packages = [{include = "outage_detector"}]
|
||||||
|
|
||||||
|
[tool.poetry.dependencies]
|
||||||
|
python = "^3.10"
|
||||||
|
|
||||||
|
[tool.poetry.scripts]
|
||||||
|
outage_detector = "outage_detector.main:main"
|
||||||
|
|
||||||
|
[build-system]
|
||||||
|
requires = ["poetry-core"]
|
||||||
|
build-backend = "poetry.core.masonry.api"
|
Loading…
Reference in New Issue