9 Commits

SHA1 Message Date
c10ff428f6 fixed type errors 2024-08-05 00:17:15 +02:00
9f2e496f4f add setuptools to build fasthtml from source 2024-08-05 00:09:41 +02:00
fad0f6e5dd Switched to FastHTML 2024-08-04 20:37:06 +02:00
dd0806fe89 Updated testcases 2024-08-04 19:56:53 +02:00
9e28973c40 removed unneeded import 2024-08-03 01:06:05 +02:00
3063bc0167 created a type model for the config file 2024-08-03 01:05:22 +02:00
3f8323d297 implemented some pydantic models and fixed type errors 2024-08-03 00:52:27 +02:00
93c51a00a0 added mypy 2024-08-02 22:48:38 +02:00
ff8820c80a Made project structure more modular (project can now be started as a package; added unit tests) 2024-08-02 22:23:20 +02:00
29 changed files with 1717 additions and 965 deletions

37
.gitignore vendored

@@ -1,31 +1,14 @@
# Python #
# Virtual Environment
/.venv/
# Cache
__pycache__/
# Build
/dist/
*.egg-info/
# Tools
/.pytest_cache/
/.mypy_cache/
# Nix #
# Build
/result
# MicroVM
/var.img
/control.socket
# Direnv #
/.direnv/
# Project specific files #
config.json
db.json
log.jsonl
# config and database
*.json
# python
__pycache__/
.pytest_cache/
.mypy_cache/
# FastHTML
/.sesskey

Dockerfile

@@ -1,28 +0,0 @@
# Nix builder
FROM nixos/nix:latest AS builder
# Copy our source and setup our working dir.
COPY . /tmp/build
WORKDIR /tmp/build
# Build our Nix environment
RUN nix \
--extra-experimental-features "nix-command flakes" \
--option filter-syscalls false \
build
# Copy the Nix store closure into a directory. The Nix store closure is the
# entire set of Nix store values that we need for our build.
RUN mkdir /tmp/nix-store-closure
RUN cp -r $(nix-store -qR result/) /tmp/nix-store-closure
# Final image is based on scratch. We copy a bunch of Nix dependencies
# but they're fully self-contained so we don't need Nix anymore.
FROM scratch
WORKDIR /app
# Copy /nix/store
COPY --from=builder /tmp/nix-store-closure /nix/store
COPY --from=builder /tmp/build/result /app
CMD ["/app/bin/testdata"]

README.md

@@ -1,15 +1,19 @@
# Simple Testdata Generator
# Simple Test Data Generator
## Example Config
```json
{
"keys": ["TESTKEY1", "TESTKEY2", "TESTKEY3"],
"max-size": "1GiB",
"max-data": "1TiB",
"buffer-size": "12MiB",
"database": "./db.json",
"database-update-interval": 5.0,
"log": "./log.jsonl"
"binds": [
"127.0.0.1:9250"
],
"log": "-",
"buffer-size": "4KiB",
"max-size": "2GB",
"api-keys": [
"TESTKEY"
],
"max-data": "10GB",
"database": "database.json"
}
```
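
A hedged usage sketch against this example config (it assumes the server is reachable on the first bind address and that the `/` endpoint accepts `api_key` and `size` query parameters, as in `testdata/api.py` further down):

```python
import requests

# Hypothetical values taken from the example config above.
BASE_URL = 'http://127.0.0.1:9250'
API_KEY = 'TESTKEY'

# Stream 1 MiB of zero bytes; size strings use the same units as the config.
response = requests.get(
    BASE_URL,
    params={'api_key': API_KEY, 'size': '1MiB'},
    stream=True,
    timeout=5,
)
response.raise_for_status()
data = b''.join(response.iter_content(chunk_size=4096))
assert len(data) == 1024 ** 2
```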

docker-compose.yml

@@ -1,15 +0,0 @@
services:
testdata:
image: result/latest
build:
dockerfile: ./Dockerfile
environment:
TESTDATA_HOST: 0.0.0.0
TESTDATA_PORT: 1234
TESTDATA_CONFIG: ./config.json
volumes:
- ./config.json:/app/config.json
- ./db.json:/app/db.json
- ./log.jsonl:/app/log.jsonl

144
flake.lock generated

@@ -5,11 +5,29 @@
"systems": "systems"
},
"locked": {
"lastModified": 1731533236,
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
"lastModified": 1705309234,
"narHash": "sha256-uNRRNRKmJyCRC/8y1RqBkqWBLM034y4qN7EprSdmgyA=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
"rev": "1ef2e671c3b0c19053962c07dbda38332dcebf26",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"flake-utils_2": {
"inputs": {
"systems": "systems_2"
},
"locked": {
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
@@ -27,11 +45,11 @@
"spectrum": "spectrum"
},
"locked": {
"lastModified": 1735074045,
"narHash": "sha256-CeYsC8J2dNiV2FCQOxK1oZ/jNpOF2io7aCEFHmfi95U=",
"lastModified": 1720034501,
"narHash": "sha256-fzZpuVnhw5uOtA4OuXw3a+Otpy8C+QV0Uu5XfhGEPSg=",
"owner": "astro",
"repo": "microvm.nix",
"rev": "2ae08de8e8068b00193b9cfbc0acc9dfdda03181",
"rev": "a808af7775f508a2afedd1e4940a382fe1194f21",
"type": "github"
},
"original": {
@@ -40,36 +58,82 @@
"type": "github"
}
},
"nix-github-actions": {
"inputs": {
"nixpkgs": [
"poetry2nix-lib",
"nixpkgs"
]
},
"locked": {
"lastModified": 1703863825,
"narHash": "sha256-rXwqjtwiGKJheXB43ybM8NwWB8rO2dSRrEqes0S7F5Y=",
"owner": "nix-community",
"repo": "nix-github-actions",
"rev": "5163432afc817cf8bd1f031418d1869e4c9d5547",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "nix-github-actions",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1733759999,
"narHash": "sha256-463SNPWmz46iLzJKRzO3Q2b0Aurff3U1n0nYItxq7jU=",
"lastModified": 1720031269,
"narHash": "sha256-rwz8NJZV+387rnWpTYcXaRNvzUSnnF9aHONoJIYmiUQ=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "a73246e2eef4c6ed172979932bc80e1404ba2d56",
"rev": "9f4128e00b0ae8ec65918efeba59db998750ead6",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"rev": "a73246e2eef4c6ed172979932bc80e1404ba2d56",
"type": "github"
}
},
"poetry2nix-lib": {
"inputs": {
"flake-utils": "flake-utils_2",
"nix-github-actions": "nix-github-actions",
"nixpkgs": [
"nixpkgs"
],
"systems": "systems_3",
"treefmt-nix": "treefmt-nix"
},
"locked": {
"lastModified": 1719850884,
"narHash": "sha256-UU/lVTHFx0GpEkihoLJrMuM9DcuhZmNe3db45vshSyI=",
"owner": "nix-community",
"repo": "poetry2nix",
"rev": "42262f382c68afab1113ebd1911d0c93822d756e",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "poetry2nix",
"type": "github"
}
},
"root": {
"inputs": {
"microvm": "microvm",
"nixpkgs": "nixpkgs"
"nixpkgs": "nixpkgs",
"poetry2nix-lib": "poetry2nix-lib"
}
},
"spectrum": {
"flake": false,
"locked": {
"lastModified": 1733308308,
"narHash": "sha256-+RcbMAjSxV1wW5UpS9abIG1lFZC8bITPiFIKNnE7RLs=",
"lastModified": 1708358594,
"narHash": "sha256-e71YOotu2FYA67HoC/voJDTFsiPpZNRwmiQb4f94OxQ=",
"ref": "refs/heads/main",
"rev": "80c9e9830d460c944c8f730065f18bb733bc7ee2",
"revCount": 792,
"rev": "6d0e73864d28794cdbd26ab7b37259ab0e1e044c",
"revCount": 614,
"type": "git",
"url": "https://spectrum-os.org/git/spectrum"
},
@@ -92,6 +156,56 @@
"repo": "default",
"type": "github"
}
},
"systems_2": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
},
"systems_3": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"id": "systems",
"type": "indirect"
}
},
"treefmt-nix": {
"inputs": {
"nixpkgs": [
"poetry2nix-lib",
"nixpkgs"
]
},
"locked": {
"lastModified": 1719749022,
"narHash": "sha256-ddPKHcqaKCIFSFc/cvxS14goUhCOAwsM1PbMr0ZtHMg=",
"owner": "numtide",
"repo": "treefmt-nix",
"rev": "8df5ff62195d4e67e2264df0b7f5e8c9995fd0bd",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "treefmt-nix",
"type": "github"
}
}
},
"root": "root",

220
flake.nix

@@ -1,191 +1,89 @@
{
description = "A webserver to create files for testing purposes";
description = "A webserver to create files for tetsing purposes";
inputs = {
nixpkgs.url = "github:nixos/nixpkgs?rev=a73246e2eef4c6ed172979932bc80e1404ba2d56";
nixpkgs.url = "github:nixos/nixpkgs/nixos-unstable";
poetry2nix-lib = {
url = "github:nix-community/poetry2nix";
inputs.nixpkgs.follows = "nixpkgs";
};
microvm = {
url = "github:astro/microvm.nix";
inputs.nixpkgs.follows = "nixpkgs";
};
};
outputs = {
self,
nixpkgs,
...
} @ inputs: let
outputs = {self, ...} @ inputs: let
supportedSystems = ["x86_64-linux" "x86_64-darwin" "aarch64-linux" "aarch64-darwin"];
forAllSystems = nixpkgs.lib.genAttrs supportedSystems;
pkgs = forAllSystems (system: nixpkgs.legacyPackages.${system}.extend overlay);
forAllSystems = inputs.nixpkgs.lib.genAttrs supportedSystems;
pkgs = forAllSystems (system: inputs.nixpkgs.legacyPackages.${system});
poetry2nix = forAllSystems (system: inputs.poetry2nix-lib.lib.mkPoetry2Nix {pkgs = pkgs.${system};});
overlay = final: prev: rec {
python3Packages = prev.python3Packages.overrideScope (pfinal: pprev: {
packageNameToDrv = x: builtins.getAttr (cleanPythonPackageName x) final.python3Packages;
});
cleanPythonPackageName = x: let
cleanName = builtins.match "([a-z,A-Z,0-9,_,-]+).*" x;
in
if cleanName != null
then builtins.elemAt cleanName 0
else builtins.warn "Could not determine package name from '${x}'" null;
};
pyproject = builtins.fromTOML (builtins.readFile ./pyproject.toml);
buildDependencies = forAllSystems (system: builtins.map pkgs.${system}.python3Packages.packageNameToDrv pyproject.build-system.requires);
runtimeDependencies = forAllSystems (system: builtins.map pkgs.${system}.python3Packages.packageNameToDrv pyproject.project.dependencies);
optionalDependencies = forAllSystems (system: builtins.mapAttrs (name: value: builtins.map pkgs.${system}.python3Packages.packageNameToDrv value) pyproject.project.optional-dependencies);
addSetuptools = self: super: list:
builtins.listToAttrs (builtins.map (package: {
name = "${package}";
value = super."${package}".overridePythonAttrs (old: {
nativeBuildInputs = (old.nativeBuildInputs or []) ++ [self.setuptools];
});
})
list);
in {
# `nix build`
packages = forAllSystems (system: let
buildTestdata = {skipCheck ? false}:
pkgs.${system}.python3Packages.buildPythonPackage {
pname = pyproject.project.name;
version = pyproject.project.version;
src = ./.;
pyproject = true;
build-system = buildDependencies.${system};
dependencies = runtimeDependencies.${system};
optional-dependencies = optionalDependencies.${system};
nativeCheckInputs = optionalDependencies.${system}.dev;
checkPhase = let
dev = builtins.map (x: x.pname) optionalDependencies.${system}.dev;
in ''
${
if builtins.elem "pytest" dev && !skipCheck
then "pytest tests"
else ""
}
${
if builtins.elem "mypy" dev && !skipCheck
then "mypy src"
else ""
}
${
if builtins.elem "pylint" dev && !skipCheck
then "pylint src"
else ""
}
'';
mkPackage = {debug ? false}:
poetry2nix.${system}.mkPoetryApplication {
projectDir = self;
checkPhase =
if debug
then "pyright --warnings testdata && pytest"
else "";
# doCheck = debug;
preferWheels = false;
nativeBuildInputs = with pkgs.${system}; [pyright];
overrides =
poetry2nix.${system}.overrides.withDefaults (self: super:
addSetuptools self super ["sqlite-minutils" "fastlite" "python-fasthtml"]);
};
in rec {
default = testdata;
testdata = buildTestdata {skipCheck = false;};
quick = buildTestdata {skipCheck = true;};
in {
default = mkPackage {debug = false;};
debug = mkPackage {debug = true;};
vm = self.nixosConfigurations.vm.config.microvm.declaredRunner;
});
# `nix run`
apps = forAllSystems (system: {
default = {
program = "${self.packages.${system}.default}/bin/testdata";
type = "app";
};
});
# `nix fmt`
formatter = forAllSystems (system: pkgs.${system}.alejandra);
# `nix develop`
devShells = forAllSystems (system: rec {
default = venv;
devShells = forAllSystems (system: {
# Shell for app dependencies.
#
# nix develop
#
# Use this shell for developing your app.
default = pkgs.${system}.mkShellNoCC {
inputsFrom = [self.packages.${system}.default];
packages = [self.packages.${system}.default];
};
venv = pkgs.${system}.mkShell {
shellHook = ''
if [ ! -d .venv/ ]; then
echo "Creating Virtual Environment..."
${pkgs.${system}.python3}/bin/python3 -m venv .venv
fi
alias activate='source .venv/bin/activate'
echo "Entering Virtual Environment..."
source .venv/bin/activate
'';
# Shell for poetry.
#
# nix develop .#poetry
#
# Use this shell for changes to pyproject.toml and poetry.lock.
poetry = pkgs.${system}.mkShellNoCC {
packages = [pkgs.${system}.poetry];
};
});
# NixOS Module
nixosModules.default = import ./nix/module.nix inputs;
# nixos definition for a microvm to test nixosModules
nixosConfigurations = let
system = "x86_64-linux";
in {
vm = nixpkgs.lib.nixosSystem {
inherit system;
modules = [
inputs.microvm.nixosModules.microvm
({config, ...}: {
system.stateVersion = config.system.nixos.version;
networking.hostName = "vm";
users.users.root.password = "";
microvm = {
# volumes = [
# {
# mountPoint = "/var";
# image = "var.img";
# size = 256;
# }
# ];
shares = [
{
# use proto = "virtiofs" for MicroVMs that are started by systemd
proto = "9p";
tag = "ro-store";
# a host's /nix/store will be picked up so that no
# squashfs/erofs will be built for it.
source = "/nix/store";
mountPoint = "/nix/.ro-store";
}
];
interfaces = [
{
type = "user";
id = "qemu";
mac = "02:00:00:01:01:01";
}
];
forwardPorts = [
{
host.port = config.services.testdata.port;
guest.port = config.services.testdata.port;
}
];
# "qemu" has 9p built-in!
hypervisor = "qemu";
socket = "control.socket";
};
})
self.nixosModules.default
rec {
networking.firewall.allowedTCPPorts = [services.testdata.port];
services.testdata = {
enable = true;
host = "0.0.0.0";
port = 1234;
settings = {
keys = ["one" "two" "three"];
max-size = "1GB";
max-data = "100GB";
buffer-size = "12MiB";
database = "/root/testdata_state.json";
database-update-interval = 5.0;
log = "/root/log.jsonl";
};
};
}
];
};
};
};
}

nix/module.nix

@@ -2,16 +2,17 @@ inputs: {
config,
lib,
pkgs,
system,
...
}: let
cfg = config.services.testdata;
cfg = config.testdata;
package = inputs.self.packages.${pkgs.stdenv.hostPlatform.system}.default;
inherit (lib) mkIf mkEnableOption mkOption types;
format = pkgs.formats.json {};
configFile = format.generate "config.json" cfg.settings;
in {
options.services.testdata = {
options.testdata = {
enable = mkEnableOption "testdata";
settings = mkOption {
@@ -28,17 +29,7 @@ in {
]);
in
valueType;
default = throw "Please specify services.testdata.settings";
};
host = mkOption {
type = types.str;
default = throw "Please specify a services.testdata.port";
};
port = mkOption {
type = types.int;
default = throw "Please specify a services.testdata.port";
default = throw "Please specify testdata.settings";
};
};
@@ -50,7 +41,7 @@
serviceConfig = {
Type = "simple";
ExecStart = "${package}/bin/testdata --config ${configFile} --listen ${cfg.host} --port ${builtins.toString cfg.port}";
ExecStart = "${package}/bin/testdata --config ${configFile}";
};
wantedBy = ["multi-user.target"];

1155
poetry.lock generated Normal file

File diff suppressed because it is too large.

pyproject.toml

@@ -1,44 +1,27 @@
[project]
[tool.poetry]
name = "testdata"
version = "1.2.1"
requires-python = "~=3.12, <4"
dependencies = [
"fastapi~=0.115",
"uvicorn~=0.32",
"pydantic~=2.9",
]
version = "0.1.0"
description = ""
authors = ["Kristian Krsnik <git@krsnik.at>"]
readme = "README.md"
packages = [{ include = "testdata" }]
[project.optional-dependencies]
dev = [
"pytest~=8.3",
"mypy~=1.13",
"pylint~=3.3",
"requests~=2.32",
"types-requests~=2.32"
]
[tool.poetry.dependencies]
python = "^3.11"
pydantic = "^2.6.4"
ipaddress = "^1.0.23"
uvicorn = "^0.30.3"
python-fasthtml = "^0.2.1"
[project.scripts]
[tool.poetry.scripts]
testdata = "testdata.main:main"
[tool.poetry.group.dev.dependencies]
pytest = "^8.3.2"
requests = "^2.32.3"
types-requests = "^2.32.0.20240712"
pylint = "^3.2.6"
[build-system]
requires = ["setuptools~=75.1"]
build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["src"]
[tool.setuptools.package-data]
testdata = ["py.typed"]
[tool.autopep8]
max_line_length = 150
[tool.pylint.'MESSAGES CONTROL']
disable = [
"line-too-long",
"missing-module-docstring",
"missing-class-docstring",
"missing-function-docstring",
"too-few-public-methods",
"broad-exception-caught"
]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

src/testdata/__init__.py

@@ -1,3 +0,0 @@
from .testdata import Testdata
from .utils import convert_to_bytes
from .main import run

src/testdata/__main__.py

@@ -1,3 +0,0 @@
from .main import main
main()

src/testdata/logger/__init__.py

@@ -1,3 +0,0 @@
from logging import getLogger, Logger
from .logger import setup_logging

src/testdata/logger/logger.py

@@ -1,173 +0,0 @@
import sys
import json
import logging
import logging.config
import logging.handlers
import atexit
from datetime import datetime, timezone
from typing_extensions import override
LOG_RECORD_BUILTIN_ATTRS = {
"args",
"asctime",
"created",
"exc_info",
"exc_text",
"filename",
"funcName",
"levelname",
"levelno",
"lineno",
"module",
"msecs",
"message",
"msg",
"name",
"pathname",
"process",
"processName",
"relativeCreated",
"stack_info",
"thread",
"threadName",
"taskName",
}
class JSONFormatter(logging.Formatter):
def __init__(self, *, fmt_keys: dict[str, str] | None = None):
super().__init__()
self.fmt_keys = fmt_keys if fmt_keys is not None else {}
@override
def format(self, record: logging.LogRecord) -> str:
message = self._prepare_log_dict(record)
return json.dumps(message, default=str)
def _prepare_log_dict(self, record: logging.LogRecord) -> dict:
always_fields = {
'message': record.getMessage(),
'timestamp': datetime.fromtimestamp(
record.created, tz=timezone.utc
).isoformat()
}
if record.exc_info is not None:
always_fields['exc_info'] = self.formatException(record.exc_info)
if record.stack_info is not None:
always_fields['stack_info'] = self.formatStack(record.stack_info)
message = {
key: msg_value
if (msg_value := always_fields.pop(value, None)) is not None
else getattr(record, value)
for key, value in self.fmt_keys.items()
}
message.update(always_fields)
for key, value in record.__dict__.items():
if key not in LOG_RECORD_BUILTIN_ATTRS:
message[key] = value
return message
class NonErrorFilter(logging.Filter):
@override
def filter(self, record: logging.LogRecord) -> bool | logging.LogRecord:
return record.levelno <= logging.INFO
def generate_log_config(log_path: str | None = None) -> dict:
logger_config: dict = {
'version': 1,
'disable_existing_loggers': False,
'filters': {
'no_errors': {
"()": NonErrorFilter
}
},
'formatters': {
'simple': {
'format': '[%(asctime)s][%(levelname)s] %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S'
},
'detailed': {
'format': '[%(asctime)s][%(levelname)s] %(message)s',
'datefmt': '%Y-%m-%dT%H:%M:%S%z' # ISO-8601 Timestamp
},
'json': {
'()': JSONFormatter,
'fmt_keys': {
'timestamp': 'timestamp',
'level': 'levelname',
'message': 'message',
'logger': 'name',
'module': 'module',
'function': 'funcName',
'line': 'lineno',
'thread_name': 'threadName'
},
}
},
'handlers': {
'stdout': {
'class': 'logging.StreamHandler',
'level': 'INFO',
'filters': ['no_errors'],
'formatter': 'simple',
'stream': 'ext://sys.stdout'
},
'stderr': {
'class': 'logging.StreamHandler',
'level': 'WARNING',
'formatter': 'simple',
'stream': 'ext://sys.stderr'
}
} | ({'file': {
'class': 'logging.handlers.RotatingFileHandler',
'level': 'DEBUG',
'formatter': 'json',
'filename': log_path,
'maxBytes': 1024 * 1024 * 10, # 10 MiB
'backupCount': 3
}} if log_path is not None else {}),
'loggers': {
'root': {
'level': 'DEBUG',
'handlers': [
'stdout',
'stderr'
] + (['file'] if log_path is not None else []),
}
}
}
if sys.version_info >= (3, 12): # Python 3.12+
logger_config['handlers']['queue_handler'] = {
'class': 'logging.handlers.QueueHandler',
'respect_handler_level': True,
'handlers': [
'stdout',
'stderr'
] + (['file'] if log_path is not None else []),
}
logger_config['loggers']['root']['handlers'] = ['queue_handler']
return logger_config
def setup_logging(log_path: str | None = None) -> None:
log_config = generate_log_config(log_path if log_path != '-' else None)
logging.config.dictConfig(log_config)
if sys.version_info >= (3, 12): # Python 3.12+
queue_handler = logging.getHandlerByName('queue_handler')
if queue_handler is not None:
queue_handler.listener.start() # type: ignore
atexit.register(queue_handler.listener.stop) # type: ignore
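
For reference, a minimal sketch of how this (now removed) module was consumed, assuming the `src/testdata/logger/` package layout implied by the imports above:

```python
from testdata import logger

# Route DEBUG+ records to stdout/stderr and, because a path is given,
# to a rotating JSON-lines file via the dictConfig built above.
logger.setup_logging('log.jsonl')

log = logger.getLogger('testdata')
# Extra attributes outside LOG_RECORD_BUILTIN_ATTRS become JSON fields.
log.info('Server started.', extra={'id': 'DEADBEEF'})
```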

46
src/testdata/main.py vendored

@@ -1,46 +0,0 @@
import os
import sys
import argparse
import asyncio
import shutil
from .testdata import Testdata
def parse_args(args: list[str]):
def formatter(prog):
return argparse.ArgumentDefaultsHelpFormatter(prog, max_help_position=shutil.get_terminal_size().columns)
parser = argparse.ArgumentParser(formatter_class=formatter)
parser.add_argument(
'-c', '--config', type=argparse.FileType('r'),
default=os.environ['TESTDATA_CONFIG'] if 'TESTDATA_CONFIG' in os.environ else './config.json',
help='Path to config file in JSON format.'
)
parser.add_argument(
'-l', '--listen', type=str,
default=os.environ['TESTDATA_HOST'] if 'TESTDATA_HOST' in os.environ else '0.0.0.0',
help='IP on which to listen.'
)
parser.add_argument(
'-p', '--port', type=int,
default=os.environ['TESTDATA_PORT'] if 'TESTDATA_PORT' in os.environ else 8080,
help='Port on which to serve the webserver.'
)
return parser.parse_args(args)
def run(argv: list[str]) -> None:
# Parse command-line parameters
args = parse_args(argv)
# Load Config
config = Testdata.Config.model_validate_json(args.config.read())
# Run webserver
asyncio.run(Testdata(config).run(args.listen, args.port))
def main() -> None:
run(sys.argv[1:])
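
The defaults above fall back to environment variables; `os.environ.get` expresses the same fallback more compactly. A sketch with unchanged behavior (argparse applies `type` to string defaults, so an env value like `"8080"` is still converted):

```python
import argparse
import os

parser = argparse.ArgumentParser()
parser.add_argument(
    '-l', '--listen', type=str,
    default=os.environ.get('TESTDATA_HOST', '0.0.0.0'),
    help='IP on which to listen.'
)
parser.add_argument(
    '-p', '--port', type=int,
    # argparse runs `type` on string defaults, so env values are converted too.
    default=os.environ.get('TESTDATA_PORT', 8080),
    help='Port on which to serve the webserver.'
)
args = parser.parse_args([])
```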

src/testdata/py.typed

src/testdata/testdata.py

@@ -1,240 +0,0 @@
import os
import json
import asyncio
import inspect
import functools
import random
import importlib.metadata
from datetime import datetime
import uvicorn
from typing_extensions import Annotated
from fastapi import FastAPI, Request, Security, status, HTTPException
from fastapi.security import APIKeyHeader, APIKeyQuery
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, ConfigDict, Field, BeforeValidator, ValidationError
from . import logger
from .utils import convert_to_bytes, generate_data
class MaxSizePerRequestError(Exception):
pass
class MinSizePerRequestError(Exception):
pass
class Testdata:
class Config(BaseModel):
model_config = ConfigDict(extra='forbid')
@staticmethod
def to_bytes(value: int | str) -> int:
try:
return convert_to_bytes(value)
except Exception as err:
raise ValueError(str(err)) from err
@staticmethod
def is_authorized_keys(value: set[str] | str) -> set[str]:
if isinstance(value, str):
with open(value, encoding='utf-8') as file:
return set(filter(lambda x: x.strip() != '', file.read().splitlines()))
return value
authorized_keys: Annotated[set[str], BeforeValidator(is_authorized_keys)] = Field(alias='keys')
max_size: Annotated[int, BeforeValidator(to_bytes)] = Field(alias='max-size')
max_data: Annotated[int, BeforeValidator(to_bytes)] = Field(alias='max-data')
buffer_size: Annotated[int, BeforeValidator(to_bytes)] = Field(alias='buffer-size')
database: str | None = None
log: str | None = Field(alias='log', default=None)
database_update_interval: float = Field(alias='database-update-interval', default=5)
_config: Config
_api: FastAPI
_state: dict
_logger: logger.Logger
def __init__(self, config: Config):
self._config = config
self._logger = logger.getLogger('testdata')
self._api = self._setup_api()
# Store internal state
self._state = {
'version': importlib.metadata.version('testdata'), # For future compatibility
'data-used': {f'{(today := datetime.today()).year}-{today.month:02}': 0} # track each month's data usage
}
def _setup_api(self) -> FastAPI:
api = FastAPI(docs_url='/', redoc_url=None)
# Security
def get_api_key(
api_key_query: str = Security(APIKeyQuery(name="api_key", auto_error=False)),
api_key_header: str = Security(APIKeyHeader(name="x-api-key", auto_error=False))
) -> str:
# https://joshdimella.com/blog/adding-api-key-auth-to-fast-api
if api_key_query in self._config.authorized_keys:
return api_key_query
if api_key_header in self._config.authorized_keys:
return api_key_header
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Invalid or missing API Key'
)
# A wrapper to set the function signature to accept the api key dependency
def secure(func):
# Get old signature
positional_only, positional_or_keyword, variadic_positional, keyword_only, variadic_keyword = [], [], [], [], []
for value in inspect.signature(func).parameters.values():
if value.kind == inspect.Parameter.POSITIONAL_ONLY:
positional_only.append(value)
elif value.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD:
positional_or_keyword.append(value)
elif value.kind == inspect.Parameter.VAR_POSITIONAL:
variadic_positional.append(value)
elif value.kind == inspect.Parameter.KEYWORD_ONLY:
keyword_only.append(value)
elif value.kind == inspect.Parameter.VAR_KEYWORD:
variadic_keyword.append(value)
# Avoid passing an unrecognized keyword
if inspect.iscoroutinefunction(func):
async def wrapper(*args, **kwargs):
if len(variadic_keyword) == 0:
if 'api_key' in kwargs:
del kwargs['api_key']
return await func(*args, **kwargs)
else:
def wrapper(*args, **kwargs):
if len(variadic_keyword) == 0:
if 'api_key' in kwargs:
del kwargs['api_key']
return func(*args, **kwargs)
# Override signature
wrapper.__signature__ = inspect.signature(func).replace(
parameters=(
*positional_only,
*positional_or_keyword,
*variadic_positional,
*keyword_only,
inspect.Parameter('api_key', inspect.Parameter.POSITIONAL_OR_KEYWORD, default=Security(get_api_key)),
*variadic_keyword
)
)
return functools.wraps(func)(wrapper)
# Routes
api.get('/zeros')(secure(self._zeros))
return api
async def _zeros(self, size: int | str, request: Request, filename: str = 'zeros.bin') -> StreamingResponse:
try:
extra = {'id': f'{random.randint(0, 2 ** 32 - 1):08X}'}
self._logger.debug(
'Initiated request.',
extra=extra | {
'ip': request.client.host if request.client is not None else None,
'query-params': dict(request.query_params),
'headers': dict(request.headers)
}
)
try:
size = convert_to_bytes(size)
except ValueError as err:
self._logger.warning('Invalid format for size.', extra=extra)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Invalid format for size.'
) from err
if size < 0:
raise MinSizePerRequestError
if self._config.max_size < size:
raise MaxSizePerRequestError
# update internal state
current_date = f'{(today := datetime.today()).year}-{today.month:02}'
if current_date not in self._state['data-used']:
self._state['data-used'][current_date] = 0
if self._config.max_data < self._state['data-used'][current_date] + size:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='Service not available.'
)
self._state['data-used'][current_date] += size
self._logger.debug('Successfully processed request.', extra=extra)
return StreamingResponse(
status_code=status.HTTP_200_OK,
content=generate_data(size, self._config.buffer_size),
media_type='application/octet-stream',
headers={
'Content-Length': str(size),
'Content-Disposition': f'attachment; filename="{filename}"'
}
)
except MinSizePerRequestError as err:
self._logger.warning('Size is negative.', extra=extra)
raise HTTPException(
status_code=status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE,
detail='Size has to be non-negative.'
) from err
except MaxSizePerRequestError as err:
self._logger.warning('Exceeded max size per request.', extra=extra)
raise HTTPException(
status_code=status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE,
detail=f'Exceeded max size per request of {self._config.max_size} Bytes.'
) from err
except Exception as err:
self._logger.exception(err)
raise err
async def _update_state(self) -> None:
assert self._config.database is not None
mode = 'r+' if os.path.exists(self._config.database) else 'w+'
with open(self._config.database, mode, encoding='utf-8') as file:
try:
self._state = json.load(file)
except json.JSONDecodeError:
pass
while True:
file.seek(0)
json.dump(self._state, file, indent=2)
file.truncate()
await asyncio.sleep(self._config.database_update_interval)
async def run(self, host: str, port: int) -> None:
try:
if self._config.log is not None:
logger.setup_logging(self._config.log)
self._logger = logger.getLogger('testdata')
self._logger.info('Server started.')
coroutines = [uvicorn.Server(uvicorn.Config(self._api, host, port)).serve()]
if self._config.database is not None:
coroutines.append(self._update_state())
await asyncio.gather(*coroutines)
except asyncio.exceptions.CancelledError:
self._logger.info('Server stopped.')
except Exception as err:
self._logger.exception(err)
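
The `secure()` wrapper above exists so FastAPI injects the key check without each handler declaring it explicitly; a minimal standalone sketch of the underlying pattern (hypothetical app, not the project's actual entry point):

```python
from fastapi import FastAPI, HTTPException, Security, status
from fastapi.security import APIKeyHeader, APIKeyQuery

AUTHORIZED_KEYS = {"TESTKEY"}  # assumption: keys normally come from the config
app = FastAPI()

def get_api_key(
    api_key_query: str = Security(APIKeyQuery(name="api_key", auto_error=False)),
    api_key_header: str = Security(APIKeyHeader(name="x-api-key", auto_error=False)),
) -> str:
    # Accept the key from either the query string or the x-api-key header.
    if api_key_query in AUTHORIZED_KEYS:
        return api_key_query
    if api_key_header in AUTHORIZED_KEYS:
        return api_key_header
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Invalid or missing API Key',
    )

@app.get('/ping')
async def ping(api_key: str = Security(get_api_key)) -> dict:
    # Runs only if get_api_key returned a valid key.
    return {'ok': True}
```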

40
src/testdata/utils.py vendored

@@ -1,40 +0,0 @@
import asyncio
from typing import AsyncGenerator
def convert_to_bytes(size: int | str) -> int:
if isinstance(size, int):
return size
if isinstance(size, str):
try:
return int(size)
except ValueError as err:
units = {
'TB': 1000 ** 4, 'TiB': 1024 ** 4,
'GB': 1000 ** 3, 'GiB': 1024 ** 3,
'MB': 1000 ** 2, 'MiB': 1024 ** 2,
'KB': 1000, 'KiB': 1024,
'B': 1
}
for unit, value in units.items():
if size.endswith(unit):
return int(float(size.removesuffix(unit)) * value)
raise ValueError from err
else:
raise ValueError
async def generate_data(size: int, buffer_size: int = 4 * 1024) -> AsyncGenerator[bytes, None]:
# https://github.com/tiangolo/fastapi/issues/5183
# https://github.com/encode/starlette/discussions/1776#discussioncomment-3207518
size_left = size
while size_left > buffer_size:
size_left -= buffer_size
yield b'\0' * buffer_size
await asyncio.sleep(0)
yield b'\0' * size_left
await asyncio.sleep(0)

3
test/test_imports.py Normal file

@@ -0,0 +1,3 @@
def test_import():
import testdata
from testdata import run

94
test/test_run.py Normal file

@@ -0,0 +1,94 @@
from multiprocessing import Process
import os
import json
import tempfile
import random
import pytest
import requests
import testdata
PROTOCOL = 'http'
HOST = '127.0.0.1'
PORT = 8080
BUFFER_SIZE = 4 * 1024
MAX_SIZE = 2 * 1024 * 1024 * 1024
MAX_DATA = 10 * 1024 * 1024 * 1024
API_KEY = f'{random.randrange(16 ** 32):032x}'
API_KEYS = { API_KEY }
TIMEOUT = 5
@pytest.fixture(autouse = True)
def server():
# Create Temporary Databases File
database = tempfile.NamedTemporaryFile(delete = False).name
proc = Process(target = testdata.run, args = (HOST, PORT, API_KEYS, MAX_SIZE, MAX_DATA, database, BUFFER_SIZE))
proc.start()
# Wait until webserver becomes available
while True:
try:
requests.get(f'{PROTOCOL}://{HOST}:{PORT}', timeout = TIMEOUT)
except requests.ConnectionError:
continue
break
yield database
# Terminate webserver
proc.terminate()
proc.join()
# Delete Temporary File
os.unlink(database)
def test_get_file():
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/?api_key={API_KEY}&size=32', timeout = TIMEOUT)
assert response.content == b'\0' * 32
def test_get_file_B():
response = requests.get(
f'{PROTOCOL}://{HOST}:{PORT}/?api_key={API_KEY}&size=32B', timeout=TIMEOUT)
assert response.content == b'\0' * 32
def test_get_file_KB():
response = requests.get(
f'{PROTOCOL}://{HOST}:{PORT}/?api_key={API_KEY}&size=32KB', timeout=TIMEOUT)
assert response.status_code == 200
assert response.content == b'\0' * 32 * 1000
def test_get_file_KiB():
response = requests.get(
f'{PROTOCOL}://{HOST}:{PORT}/?api_key={API_KEY}&size=32KiB', timeout=TIMEOUT)
assert response.status_code == 200
assert response.content == b'\0' * 32 * 1024
def test_get_file_invalid_format():
response = requests.get(
f'{PROTOCOL}://{HOST}:{PORT}/?api_key={API_KEY}&size=32Invalid', timeout=TIMEOUT)
assert response.status_code == 400
assert response.text == 'Invalid Format.'
def test_database_data_used(server):
requests.get(f'{PROTOCOL}://{HOST}:{PORT}/?api_key={API_KEY}&size=32', timeout = TIMEOUT)
with open(server) as file:
assert json.loads(file.read())['data-used'] == 32

23
test/test_utils.py Normal file

@@ -0,0 +1,23 @@
from pytest import raises
from testdata.utils import convert_to_bytes
def test_convert_to_bytes():
test_values = {
('0', 0),
('32', 32),
('10_000', 10000),
('999999999999999999', 999999999999999999),
}
for input, expected_output in test_values:
assert convert_to_bytes(input) == expected_output
test_exceptions = {
('9E3', ValueError)
}
for input, exception in test_exceptions:
with raises(exception):
convert_to_bytes(input)

1
testdata/__init__.py vendored Normal file

@@ -0,0 +1 @@
from .main import run

3
testdata/__main__.py vendored Normal file

@@ -0,0 +1,3 @@
from testdata.main import main
main()

74
testdata/api.py vendored Normal file

@@ -0,0 +1,74 @@
from fasthtml.fastapp import FastHTML
from starlette import status
from starlette.responses import StreamingResponse
from starlette.exceptions import HTTPException
from pydantic import ValidationError
from .utils import load_database, save_database, generate_data
from .custom_types import TestDataBody
def create_api(api_keys: set[str], max_size: int, max_data: int, database: str, buffer_size: int) -> FastHTML:
api = FastHTML()
class MaxSizePerRequestError(Exception):
pass
class MinSizePerRequestError(Exception):
pass
@api.get('/') # type: ignore
async def test_data(api_key: str, size: str) -> StreamingResponse:
try:
body = TestDataBody(api_key=api_key, size=size) # type: ignore
except ValidationError as err:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Invalid Format.'
) from err
try:
if body.api_key not in api_keys:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Invalid API Key.'
)
if body.size < 0:
raise MinSizePerRequestError
if max_size < body.size:
raise MaxSizePerRequestError
db = load_database(database)
if max_data <= db['data-used'] + body.size:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='Service not available.'
)
db['data-used'] += body.size
save_database(database, db)
return StreamingResponse(
status_code=status.HTTP_200_OK,
content=generate_data(body.size, buffer_size),
media_type='application/octet-stream',
headers={
'Content-Length': str(body.size)
}
)
except MinSizePerRequestError as err:
raise HTTPException(
status_code=status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE,
detail='Size has to be non-negative.'
) from err
except MaxSizePerRequestError as err:
raise HTTPException(
status_code=status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE,
detail=f'Exceeded max size per request of {max_size} Bytes.'
) from err
return api

33
testdata/custom_types.py vendored Normal file

@@ -0,0 +1,33 @@
from pydantic import BaseModel, Field, ConfigDict, field_validator
from .utils import convert_to_bytes
class Config(BaseModel):
host: str
port: int
buffer_size: int = Field(alias='buffer-size', default=4 * 1024) # 4KB
max_size: int = Field(alias='max-size', default=2 * 1024 ** 3) # 2GB
max_data: int = Field(alias='max-data', default=0) # unlimited
api_keys: set[str] = Field(alias='api-keys')
database: str
log: str = '-'
model_config = ConfigDict(extra='forbid')
@field_validator('buffer_size', 'max_size', 'max_data', mode='before')
@classmethod
def convert_size(cls, value: int | str) -> int:
return convert_to_bytes(value)
class TestDataBody(BaseModel):
api_key: str
size: int
model_config = ConfigDict(extra='forbid')
@field_validator('size', mode='before')
@classmethod
def convert_size(cls, value: str) -> int:
return convert_to_bytes(value)
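
A short sketch of how these models behave (values are illustrative; `model_validate` resolves the dashed aliases and the `mode='before'` validators convert size strings):

```python
from testdata.custom_types import Config, TestDataBody

config = Config.model_validate({
    'host': '127.0.0.1',
    'port': 9250,
    'api-keys': {'TESTKEY'},
    'database': 'database.json',
    'max-size': '2GB',  # converted by the field validator
})
assert config.max_size == 2 * 1000 ** 3
assert config.buffer_size == 4 * 1024  # default

body = TestDataBody(api_key='TESTKEY', size='32KiB')
assert body.size == 32 * 1024
```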

40
testdata/main.py vendored Normal file

@@ -0,0 +1,40 @@
import sys
import argparse
import json
import os
from os.path import exists
import uvicorn
from .utils import save_database
from .api import create_api
from .custom_types import Config
# Setup Parser
def parse_cli_arguments(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
'-c',
'--config',
type = argparse.FileType('r'),
default = './config.json',
help = 'Path to config file in JSON format.'
)
return parser.parse_args(argv)
def run(host: str, port: int, api_keys: set[str], max_size: int, max_data: int, database: str, buffer_size: int):
if not exists(database) or os.stat(database).st_size == 0:
save_database(database, {'data-used': 0})
api = create_api(api_keys, max_size, max_data, database, buffer_size)
uvicorn.run(api, host = host, port = port)
def main():
args = parse_cli_arguments(sys.argv[1:])
config = Config.model_validate_json(args.config.read())
run(**config.model_dump(exclude={'log'}))

55
testdata/utils.py vendored Normal file

@@ -0,0 +1,55 @@
import json
import asyncio
from typing import AsyncGenerator
def convert_to_bytes(size: int | str) -> int:
if isinstance(size, int):
return size
try:
return int(size)
except ValueError as err:
units = {
'TB': 1000 ** 4, 'TiB': 1024 ** 4,
'GB': 1000 ** 3, 'GiB': 1024 ** 3,
'MB': 1000 ** 2, 'MiB': 1024 ** 2,
'KB': 1000, 'KiB': 1024,
'B': 1
}
for unit, value in units.items():
if size.endswith(unit):
return int(float(size.removesuffix(unit)) * value)
raise ValueError(
'Invalid format. Expected integer or float ending with a data unit (B, KB, MiB,...).'
) from err
async def generate_data(size: int, buffer_size: int = 4 * 1024) -> AsyncGenerator[bytes, None]:
size_left = size
# https://github.com/tiangolo/fastapi/issues/5183
# https://github.com/encode/starlette/discussions/1776#discussioncomment-3207518
try:
while size_left > buffer_size:
size_left -= buffer_size
yield b'\0' * buffer_size
await asyncio.sleep(0)
yield b'\0' * size_left
await asyncio.sleep(0)
except asyncio.CancelledError as err:
raise GeneratorExit from err
def load_database(path: str) -> dict:
with open(path, 'r', encoding='utf-8') as file:
return json.load(file)
def save_database(path: str, database: dict) -> None:
with open(path, 'w', encoding='utf-8') as file:
json.dump(database, file, indent=2)
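
A quick sketch of the conversion and streaming behavior (the assertion values follow from the unit table above):

```python
import asyncio
from testdata.utils import convert_to_bytes, generate_data

# Integers pass through; strings may carry SI or binary units.
assert convert_to_bytes(512) == 512
assert convert_to_bytes('32KiB') == 32 * 1024
assert convert_to_bytes('1.5GB') == int(1.5 * 1000 ** 3)

async def total(size: str) -> int:
    # Sum the chunk lengths produced by the zero-byte generator.
    return sum([len(chunk) async for chunk in
                generate_data(convert_to_bytes(size), buffer_size=4096)])

# 10 KiB streams as two full 4 KiB buffers plus a 2 KiB remainder.
assert asyncio.run(total('10KiB')) == 10 * 1024
```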


@@ -1,5 +0,0 @@
import sys
import pytest
retcode = pytest.main(sys.argv[1:])

tests/test_imports.py

@@ -1,2 +0,0 @@
def test_import_testdata():
import testdata # pylint: disable=unused-import,import-outside-toplevel

tests/test_testdata.py

@@ -1,144 +0,0 @@
import json
import time
import tempfile
import asyncio
from multiprocessing import Process
from typing import Generator
import pytest
import requests
import testdata
PROTOCOL = 'http'
HOST = 'localhost'
PORT = 1234
TIMEOUT = 1 # seconds
@pytest.fixture(scope='function')
def _server(request) -> Generator[str, None, None]:
with tempfile.NamedTemporaryFile() as tmpfile:
request.param['database'] = tmpfile.name
config = testdata.Testdata.Config.model_validate_json(json.dumps(request.param))
server = testdata.Testdata(config)
def run_server():
asyncio.run(server.run(HOST, PORT))
process = Process(target=run_server)
process.start()
# Wait until webserver becomes available
start = time.time()
while (time.time() - start) < TIMEOUT:
try:
requests.get(f'{PROTOCOL}://{HOST}:{PORT}', timeout=TIMEOUT)
break
except requests.exceptions.ConnectionError:
pass
yield tmpfile.name
process.terminate()
# Wait until webserver is completely shut down
start = time.time()
while (time.time() - start) < TIMEOUT:
try:
requests.get(f'{PROTOCOL}://{HOST}:{PORT}', timeout=TIMEOUT)
except requests.exceptions.ConnectionError:
break
@pytest.mark.parametrize('_server', [({
'keys': ['one', 'two', 'three'],
'max-size': '100',
'max-data': 1234,
'buffer-size': '12MiB',
})], indirect=['_server'])
def test_invalid_api_key(_server):
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/zeros?api_key=four&size=100', timeout=TIMEOUT)
assert response.status_code == 401
@pytest.mark.parametrize('_server', [({
'keys': ['one', 'two', 'three'],
'max-size': '100',
'max-data': 1234,
'buffer-size': '12MiB',
})], indirect=['_server'])
def test_request_size_lower_bound(_server):
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/zeros?api_key=one&size=-1', timeout=TIMEOUT)
assert response.status_code == 416
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/zeros?api_key=one&size=0', timeout=TIMEOUT)
assert response.status_code == 200
assert response.content == b''
@pytest.mark.parametrize('_server', [({
'keys': ['one', 'two', 'three'],
'max-size': '100',
'max-data': 1234,
'buffer-size': '12MiB',
})], indirect=['_server'])
def test_request_size_upper_bound(_server):
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/zeros?api_key=one&size=100', timeout=TIMEOUT)
assert response.status_code == 200
assert response.content == b'\0' * 100
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/zeros?api_key=one&size=101', timeout=TIMEOUT)
assert response.status_code == 416
@pytest.mark.parametrize('_server', [({
'keys': ['one', 'two', 'three'],
'max-size': '100KB',
'max-data': '100KB',
'buffer-size': '12MiB',
})], indirect=['_server'])
def test_request_max_data_used(_server):
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/zeros?api_key=one&size=100KB', timeout=TIMEOUT)
assert response.status_code == 200
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/zeros?api_key=one&size=1', timeout=TIMEOUT)
assert response.status_code == 500
@pytest.mark.parametrize('_server', [({
'keys': ['one', 'two', 'three'],
'max-size': '1KB',
'max-data': '1KB',
'buffer-size': '12MiB',
'database-update-interval': 0.1
})], indirect=['_server'])
def test_check_database_update(_server):
import importlib.metadata
from datetime import datetime
database = _server
with open(database, 'r', encoding='utf-8') as file:
file.seek(0)
today = datetime.today()
assert json.load(file) == {
'version': importlib.metadata.version('testdata'),
'data-used': {
f'{today.year}-{today.month:02}': 0
}
}
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/zeros?api_key=one&size=100', timeout=TIMEOUT)
assert response.status_code == 200
time.sleep(0.1)
file.seek(0)
assert json.load(file) == {
'version': importlib.metadata.version('testdata'),
'data-used': {
f'{today.year}-{today.month:02}': 100
}
}