Compare commits

..

10 Commits

Author SHA1 Message Date
4089730bd4
updated 2024-08-11 17:25:47 +02:00
c10ff428f6
fixed type errors 2024-08-05 00:17:15 +02:00
9f2e496f4f
add setuptools to build fasthtml from source 2024-08-05 00:09:41 +02:00
fad0f6e5dd
Switched to FastHTML 2024-08-04 20:37:06 +02:00
dd0806fe89
Updated testcases 2024-08-04 19:56:53 +02:00
9e28973c40
removed unneeded import 2024-08-03 01:06:05 +02:00
3063bc0167
created a type model for the config file 2024-08-03 01:05:22 +02:00
3f8323d297
implemented some pydandic models and fixed type errors 2024-08-03 00:52:27 +02:00
93c51a00a0
added mypy 2024-08-02 22:48:38 +02:00
ff8820c80a
Made project structure more modular.
* Project can now be started as a package
* Added unit-tests
2024-08-02 22:23:20 +02:00
31 changed files with 1823 additions and 965 deletions

37
.gitignore vendored
View File

@ -1,31 +1,14 @@
# Python #
# Virtual Environment
/.venv/
# Cache
__pycache__/
# Build
/dist/
*.egg-info/
# Tools
/.pytest_cache/
/.mypy_cache/
# Nix #
# Build
/result
# MicroVM
/var.img
/control.socket
# Direnv #
/.direnv/
# Project specific files #
config.json
db.json
log.jsonl
# config and database
*.json
# python
__pycache__/
.pytest_cache/
.mypy_cache/
# FastHTML
/.sesskey

View File

@ -1,28 +0,0 @@
# Nix builder
FROM nixos/nix:latest AS builder
# Copy our source and setup our working dir.
COPY . /tmp/build
WORKDIR /tmp/build
# Build our Nix environment
RUN nix \
--extra-experimental-features "nix-command flakes" \
--option filter-syscalls false \
build
# Copy the Nix store closure into a directory. The Nix store closure is the
# entire set of Nix store values that we need for our build.
RUN mkdir /tmp/nix-store-closure
RUN cp -r $(nix-store -qR result/) /tmp/nix-store-closure
# Final image is based on scratch. We copy a bunch of Nix dependencies
# but they're fully self-contained so we don't need Nix anymore.
FROM scratch
WORKDIR /app
# Copy /nix/store
COPY --from=builder /tmp/nix-store-closure /nix/store
COPY --from=builder /tmp/build/result /app
CMD ["/app/bin/testdata"]

View File

@ -1,15 +1,19 @@
# Simple Testdata Generator
# Simple Test Data Generator
## Example Config
```json
{
"keys": ["TESTKEY1", "TESTKEY2", "TESTKEY3"],
"max-size": "1GiB",
"max-data": "1TiB",
"buffer-size": "12MiB",
"database": "./db.json",
"database-update-interval": 5.0,
"log": "./log.jsonl"
"binds": [
"127.0.0.1:9250"
],
"log": "-",
"buffer-size": "4KiB",
"max-size": "2GB",
"api-keys": [
"TESTKEY"
],
"max-data": "10GB",
"database": "database.json"
}
```

View File

@ -1,15 +0,0 @@
services:
testdata:
image: result/latest
build:
dockerfile: ./Dockerfile
environment:
TESTDATA_HOST: 0.0.0.0
TESTDATA_PORT: 1234
TESTDATA_CONFIG: ./config.json
volumes:
- ./config.json:/app/config.json
- ./db.json:/app/db.json
- ./log.jsonl:/app/log.jsonl

144
flake.lock generated
View File

@ -5,11 +5,29 @@
"systems": "systems"
},
"locked": {
"lastModified": 1731533236,
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
"lastModified": 1705309234,
"narHash": "sha256-uNRRNRKmJyCRC/8y1RqBkqWBLM034y4qN7EprSdmgyA=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
"rev": "1ef2e671c3b0c19053962c07dbda38332dcebf26",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"flake-utils_2": {
"inputs": {
"systems": "systems_2"
},
"locked": {
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
@ -27,11 +45,11 @@
"spectrum": "spectrum"
},
"locked": {
"lastModified": 1735074045,
"narHash": "sha256-CeYsC8J2dNiV2FCQOxK1oZ/jNpOF2io7aCEFHmfi95U=",
"lastModified": 1720034501,
"narHash": "sha256-fzZpuVnhw5uOtA4OuXw3a+Otpy8C+QV0Uu5XfhGEPSg=",
"owner": "astro",
"repo": "microvm.nix",
"rev": "2ae08de8e8068b00193b9cfbc0acc9dfdda03181",
"rev": "a808af7775f508a2afedd1e4940a382fe1194f21",
"type": "github"
},
"original": {
@ -40,36 +58,82 @@
"type": "github"
}
},
"nix-github-actions": {
"inputs": {
"nixpkgs": [
"poetry2nix-lib",
"nixpkgs"
]
},
"locked": {
"lastModified": 1703863825,
"narHash": "sha256-rXwqjtwiGKJheXB43ybM8NwWB8rO2dSRrEqes0S7F5Y=",
"owner": "nix-community",
"repo": "nix-github-actions",
"rev": "5163432afc817cf8bd1f031418d1869e4c9d5547",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "nix-github-actions",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1733759999,
"narHash": "sha256-463SNPWmz46iLzJKRzO3Q2b0Aurff3U1n0nYItxq7jU=",
"lastModified": 1720031269,
"narHash": "sha256-rwz8NJZV+387rnWpTYcXaRNvzUSnnF9aHONoJIYmiUQ=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "a73246e2eef4c6ed172979932bc80e1404ba2d56",
"rev": "9f4128e00b0ae8ec65918efeba59db998750ead6",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"rev": "a73246e2eef4c6ed172979932bc80e1404ba2d56",
"type": "github"
}
},
"poetry2nix-lib": {
"inputs": {
"flake-utils": "flake-utils_2",
"nix-github-actions": "nix-github-actions",
"nixpkgs": [
"nixpkgs"
],
"systems": "systems_3",
"treefmt-nix": "treefmt-nix"
},
"locked": {
"lastModified": 1719850884,
"narHash": "sha256-UU/lVTHFx0GpEkihoLJrMuM9DcuhZmNe3db45vshSyI=",
"owner": "nix-community",
"repo": "poetry2nix",
"rev": "42262f382c68afab1113ebd1911d0c93822d756e",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "poetry2nix",
"type": "github"
}
},
"root": {
"inputs": {
"microvm": "microvm",
"nixpkgs": "nixpkgs"
"nixpkgs": "nixpkgs",
"poetry2nix-lib": "poetry2nix-lib"
}
},
"spectrum": {
"flake": false,
"locked": {
"lastModified": 1733308308,
"narHash": "sha256-+RcbMAjSxV1wW5UpS9abIG1lFZC8bITPiFIKNnE7RLs=",
"lastModified": 1708358594,
"narHash": "sha256-e71YOotu2FYA67HoC/voJDTFsiPpZNRwmiQb4f94OxQ=",
"ref": "refs/heads/main",
"rev": "80c9e9830d460c944c8f730065f18bb733bc7ee2",
"revCount": 792,
"rev": "6d0e73864d28794cdbd26ab7b37259ab0e1e044c",
"revCount": 614,
"type": "git",
"url": "https://spectrum-os.org/git/spectrum"
},
@ -92,6 +156,56 @@
"repo": "default",
"type": "github"
}
},
"systems_2": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
},
"systems_3": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"id": "systems",
"type": "indirect"
}
},
"treefmt-nix": {
"inputs": {
"nixpkgs": [
"poetry2nix-lib",
"nixpkgs"
]
},
"locked": {
"lastModified": 1719749022,
"narHash": "sha256-ddPKHcqaKCIFSFc/cvxS14goUhCOAwsM1PbMr0ZtHMg=",
"owner": "numtide",
"repo": "treefmt-nix",
"rev": "8df5ff62195d4e67e2264df0b7f5e8c9995fd0bd",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "treefmt-nix",
"type": "github"
}
}
},
"root": "root",

220
flake.nix
View File

@ -1,191 +1,89 @@
{
description = "A webserver to create files for testing purposes";
description = "A webserver to create files for tetsing purposes";
inputs = {
nixpkgs.url = "github:nixos/nixpkgs?rev=a73246e2eef4c6ed172979932bc80e1404ba2d56";
nixpkgs.url = "github:nixos/nixpkgs/nixos-unstable";
poetry2nix-lib = {
url = "github:nix-community/poetry2nix";
inputs.nixpkgs.follows = "nixpkgs";
};
microvm = {
url = "github:astro/microvm.nix";
inputs.nixpkgs.follows = "nixpkgs";
};
};
outputs = {
self,
nixpkgs,
...
} @ inputs: let
outputs = {self, ...} @ inputs: let
supportedSystems = ["x86_64-linux" "x86_64-darwin" "aarch64-linux" "aarch64-darwin"];
forAllSystems = nixpkgs.lib.genAttrs supportedSystems;
pkgs = forAllSystems (system: nixpkgs.legacyPackages.${system}.extend overlay);
forAllSystems = inputs.nixpkgs.lib.genAttrs supportedSystems;
pkgs = forAllSystems (system: inputs.nixpkgs.legacyPackages.${system});
poetry2nix = forAllSystems (system: inputs.poetry2nix-lib.lib.mkPoetry2Nix {pkgs = pkgs.${system};});
overlay = final: prev: rec {
python3Packages = prev.python3Packages.overrideScope (pfinal: pprev: {
packageNameToDrv = x: builtins.getAttr (cleanPythonPackageName x) final.python3Packages;
});
cleanPythonPackageName = x: let
cleanName = builtins.match "([a-z,A-Z,0-9,_,-]+).*" x;
in
if cleanName != null
then builtins.elemAt cleanName 0
else builtins.warn "Could not determine package name from '${x}'" null;
};
pyproject = builtins.fromTOML (builtins.readFile ./pyproject.toml);
buildDependencies = forAllSystems (system: builtins.map pkgs.${system}.python3Packages.packageNameToDrv pyproject.build-system.requires);
runtimeDependencies = forAllSystems (system: builtins.map pkgs.${system}.python3Packages.packageNameToDrv pyproject.project.dependencies);
optionalDependencies = forAllSystems (system: builtins.mapAttrs (name: value: builtins.map pkgs.${system}.python3Packages.packageNameToDrv value) pyproject.project.optional-dependencies);
addSetuptools = final: prev: list:
builtins.listToAttrs (builtins.map (package: {
name = "${package}";
value = prev."${package}".overridePythonAttrs (old: {
nativeBuildInputs = (old.nativeBuildInputs or []) ++ [final.setuptools];
});
})
list);
in {
# `nix build`
packages = forAllSystems (system: let
buildTestdata = {skipCheck ? false}:
pkgs.${system}.python3Packages.buildPythonPackage {
pname = pyproject.project.name;
version = pyproject.project.version;
src = ./.;
pyproject = true;
build-system = buildDependencies.${system};
dependencies = runtimeDependencies.${system};
optional-dependencies = optionalDependencies.${system};
nativeCheckInputs = optionalDependencies.${system}.dev;
checkPhase = let
dev = builtins.map (x: x.pname) optionalDependencies.${system}.dev;
in ''
${
if builtins.elem "pytest" dev && !skipCheck
then "pytest tests"
else ""
}
${
if builtins.elem "mypy" dev && !skipCheck
then "mypy src"
else ""
}
${
if builtins.elem "pylint" dev && !skipCheck
then "pylint src"
else ""
}
'';
mkPackage = {debug ? false}:
poetry2nix.${system}.mkPoetryApplication {
projectDir = ./.;
checkPhase =
if debug
then "pyright --warnings testdata && pytest"
else "";
# doCheck = debug;
preferWheels = false;
nativeBuildInputs = with pkgs.${system}; [pyright];
overrides =
poetry2nix.${system}.overrides.withDefaults (final: prev:
addSetuptools final prev ["sqlite-minutils" "fastlite" "python-fasthtml"]);
};
in rec {
default = testdata;
testdata = buildTestdata {skipCheck = false;};
quick = buildTestdata {skipCheck = true;};
in {
default = mkPackage {debug = false;};
debug = mkPackage {debug = true;};
vm = self.nixosConfigurations.vm.config.microvm.declaredRunner;
});
# `nix run`
apps = forAllSystems (system: {
default = {
program = "${self.packages.${system}.default}/bin/testdata";
type = "app";
};
});
# `nix fmt`
formatter = forAllSystems (system: pkgs.${system}.alejandra);
# `nix develop`
devShells = forAllSystems (system: rec {
default = venv;
devShells = forAllSystems (system: {
# Shell for app dependencies.
#
# nix develop
#
# Use this shell for developing your app.
default = pkgs.${system}.mkShellNoCC {
inputsFrom = [self.packages.${system}.default];
packages = [self.packages.${system}.default];
};
venv = pkgs.${system}.mkShell {
shellHook = ''
if [ ! -d .venv/ ]; then
echo "Creating Virtual Environment..."
${pkgs.${system}.python3}/bin/python3 -m venv .venv
fi
alias activate='source .venv/bin/activate'
echo "Entering Virtual Environment..."
source .venv/bin/activate
'';
# Shell for poetry.
#
# nix develop .#poetry
#
# Use this shell for changes to pyproject.toml and poetry.lock.
poetry = pkgs.${system}.mkShellNoCC {
packages = [pkgs.${system}.poetry];
};
});
# NixOS Module
nixosModules.default = import ./nix/module.nix inputs;
# nixos definition for a microvm to test nixosModules
nixosConfigurations = let
system = "x86_64-linux";
in {
vm = nixpkgs.lib.nixosSystem {
inherit system;
modules = [
inputs.microvm.nixosModules.microvm
({config, ...}: {
system.stateVersion = config.system.nixos.version;
networking.hostName = "vm";
users.users.root.password = "";
microvm = {
# volumes = [
# {
# mountPoint = "/var";
# image = "var.img";
# size = 256;
# }
# ];
shares = [
{
# use proto = "virtiofs" for MicroVMs that are started by systemd
proto = "9p";
tag = "ro-store";
# a host's /nix/store will be picked up so that no
# squashfs/erofs will be built for it.
source = "/nix/store";
mountPoint = "/nix/.ro-store";
}
];
interfaces = [
{
type = "user";
id = "qemu";
mac = "02:00:00:01:01:01";
}
];
forwardPorts = [
{
host.port = config.services.testdata.port;
guest.port = config.services.testdata.port;
}
];
# "qemu" has 9p built-in!
hypervisor = "qemu";
socket = "control.socket";
};
})
self.nixosModules.default
rec {
networking.firewall.allowedTCPPorts = [services.testdata.port];
services.testdata = {
enable = true;
host = "0.0.0.0";
port = 1234;
settings = {
keys = ["one" "two" "three"];
max-size = "1GB";
max-data = "100GB";
buffer-size = "12MiB";
database = "/root/testdata_state.json";
database-update-interval = 5.0;
log = "/root/log.jsonl";
};
};
}
];
};
};
};
}

View File

@ -2,16 +2,17 @@ inputs: {
config,
lib,
pkgs,
system,
...
}: let
cfg = config.services.testdata;
cfg = config.testdata;
package = inputs.self.packages.${pkgs.stdenv.hostPlatform.system}.default;
inherit (lib) mkIf mkEnableOption mkOption types;
format = pkgs.formats.json {};
configFile = format.generate "config.json" cfg.settings;
in {
options.services.testdata = {
options.testdata = {
enable = mkEnableOption "testdata";
settings = mkOption {
@ -28,17 +29,7 @@ in {
]);
in
valueType;
default = throw "Please specify services.testdata.settings";
};
host = mkOption {
type = types.str;
default = throw "Please specify a services.testdata.port";
};
port = mkOption {
type = types.int;
default = throw "Please specify a services.testdata.port";
default = throw "Please specify testdata.settings";
};
};
@ -50,7 +41,7 @@ in {
serviceConfig = {
Type = "simple";
ExecStart = "${package}/bin/testdata --config ${configFile} --listen ${cfg.host} --port ${builtins.toString cfg.port}";
ExecStart = "${package}/bin/testdata --config ${configFile}";
};
wantedBy = ["multi-user.target"];

1155
poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,44 +1,27 @@
[project]
[tool.poetry]
name = "testdata"
version = "1.2.1"
requires-python = "~=3.12, <4"
dependencies = [
"fastapi~=0.115",
"uvicorn~=0.32",
"pydantic~=2.9",
]
version = "0.1.0"
description = ""
authors = ["Kristian Krsnik <git@krsnik.at>"]
readme = "README.md"
packages = [{ include = "testdata" }]
[project.optional-dependencies]
dev = [
"pytest~=8.3",
"mypy~=1.13",
"pylint~=3.3",
"requests~=2.32",
"types-requests~=2.32"
]
[tool.poetry.dependencies]
python = "^3.11"
pydantic = "^2.6.4"
ipaddress = "^1.0.23"
uvicorn = "^0.30.3"
python-fasthtml = "^0.2.1"
[project.scripts]
[tool.poetry.scripts]
testdata = "testdata.main:main"
[tool.poetry.group.dev.dependencies]
pytest = "^8.3.2"
requests = "^2.32.3"
types-requests = "^2.32.0.20240712"
pylint = "^3.2.6"
[build-system]
requires = ["setuptools~=75.1"]
build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["src"]
[tool.setuptools.package-data]
testdata = ["py.typed"]
[tool.autopep8]
max_line_length = 150
[tool.pylint.'MESSAGES CONTROL']
disable = [
"line-too-long",
"missing-module-docstring",
"missing-class-docstring",
"missing-function-docstring",
"too-few-public-methods",
"broad-exception-caught"
]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@ -1,3 +0,0 @@
from .testdata import Testdata
from .utils import convert_to_bytes
from .main import run

View File

@ -1,3 +0,0 @@
from .main import main
main()

View File

@ -1,3 +0,0 @@
from logging import getLogger, Logger
from .logger import setup_logging

View File

@ -1,173 +0,0 @@
import sys
import json
import logging
import logging.handlers
import atexit
from datetime import datetime, timezone
from typing_extensions import override
LOG_RECORD_BUILTIN_ATTRS = {
"args",
"asctime",
"created",
"exc_info",
"exc_text",
"filename",
"funcName",
"levelname",
"levelno",
"lineno",
"module",
"msecs",
"message",
"msg",
"name",
"pathname",
"process",
"processName",
"relativeCreated",
"stack_info",
"thread",
"threadName",
"taskName",
}
class JSONFormatter(logging.Formatter):
def __init__(self, *, fmt_keys: dict[str, str] | None = None):
super().__init__()
self.fmt_keys = fmt_keys if fmt_keys is not None else {}
@override
def format(self, record: logging.LogRecord) -> str:
message = self._prepare_log_dict(record)
return json.dumps(message, default=str)
def _prepare_log_dict(self, record: logging.LogRecord) -> dict:
always_fields = {
'message': record.getMessage(),
'timestamp': datetime.fromtimestamp(
record.created, tz=timezone.utc
).isoformat()
}
if record.exc_info is not None:
always_fields['exc_info'] = self.formatException(record.exc_info)
if record.stack_info is not None:
always_fields['stack_info'] = self.formatStack(record.stack_info)
message = {
key: msg_value
if (msg_value := always_fields.pop(value, None)) is not None
else getattr(record, value)
for key, value in self.fmt_keys.items()
}
message.update(always_fields)
for key, value in record.__dict__.items():
if key not in LOG_RECORD_BUILTIN_ATTRS:
message[key] = value
return message
class NonErrorFilter(logging.Filter):
@override
def filter(self, record: logging.LogRecord) -> bool | logging.LogRecord:
return record.levelno <= logging.INFO
def generate_log_config(log_path: str | None = None) -> dict:
logger_config: dict = {
'version': 1,
'disable_existing_loggers': False,
'filters': {
'no_errors': {
"()": NonErrorFilter
}
},
'formatters': {
'simple': {
'format': '[%(asctime)s][%(levelname)s] %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S'
},
'detailed': {
'format': '[%(asctime)s][%(levelname)s] %(message)s',
'datefmt': '%Y-%m-%dT%H:%M:%S%z' # ISO-8601 Timestamp
},
'json': {
'()': JSONFormatter,
'fmt_keys': {
'timestamp': 'timestamp',
'level': 'levelname',
'message': 'message',
'logger': 'name',
'module': 'module',
'function': 'funcName',
'line': 'lineno',
'thread_name': 'threadName'
},
}
},
'handlers': {
'stdout': {
'class': logging.StreamHandler,
'level': 'INFO',
'filters': ['no_errors'],
'formatter': 'simple',
'stream': 'ext://sys.stdout'
},
'stderr': {
'class': logging.StreamHandler,
'level': 'WARNING',
'formatter': 'simple',
'stream': 'ext://sys.stderr'
}
} | ({'file': {
'class': logging.handlers.RotatingFileHandler,
'level': 'DEBUG',
'formatter': 'json',
'filename': log_path,
'maxBytes': 1024 * 1024 * 10, # 10 MiB
'backupCount': 3
}} if log_path is not None else {}),
'loggers': {
'root': {
'level': 'DEBUG',
'handlers': [
'stdout',
'stderr'
] + (['file'] if log_path is not None else []),
}
}
}
if sys.version_info >= (3, 12): # Python 3.12+
logger_config['handlers']['queue_handler'] = {
'class': logging.handlers.QueueHandler,
'respect_handler_level': True,
'handlers': [
'stdout',
'stderr'
] + (['file'] if log_path is not None else []),
}
logger_config['loggers']['root']['handlers'] = ['queue_handler']
return logger_config
def setup_logging(log_path: str | None = None) -> None:
log_config = generate_log_config(log_path if log_path != '-' else None)
logging.config.dictConfig(log_config)
if sys.version_info >= (3, 12): # Python 3.12+
queue_handler = logging.getHandlerByName('queue_handler')
if queue_handler is not None:
queue_handler.listener.start() # type: ignore
atexit.register(queue_handler.listener.stop) # type: ignore

46
src/testdata/main.py vendored
View File

@ -1,46 +0,0 @@
import os
import sys
import argparse
import asyncio
import shutil
from .testdata import Testdata
def parse_args(args: list[str]):
def formatter(prog):
return argparse.ArgumentDefaultsHelpFormatter(prog, max_help_position=shutil.get_terminal_size().columns)
parser = argparse.ArgumentParser(formatter_class=formatter)
parser.add_argument(
'-c', '--config', type=argparse.FileType('r'),
default=os.environ['TESTDATA_CONFIG'] if 'TESTDATA_CONFIG' in os.environ else './config.json',
help='Path to config file in JSON format.'
)
parser.add_argument(
'-l', '--listen', type=str,
default=os.environ['TESTDATA_HOST'] if 'TESTDATA_HOST' in os.environ else '0.0.0.0',
help='IP on which to listen.'
)
parser.add_argument(
'-p', '--port', type=int,
default=os.environ['TESTDATA_PORT'] if 'TESTDATA_PORT' in os.environ else 8080,
help='Port on which to serve the webserver.'
)
return parser.parse_args(args)
def run(argv: list[str]) -> None:
# Parse command-line parameters
args = parse_args(argv)
# Load Config
config = Testdata.Config.model_validate_json(args.config.read())
# Run webserver
asyncio.run(Testdata(config).run(args.listen, args.port))
def main() -> None:
run(sys.argv[1:])

View File

View File

@ -1,240 +0,0 @@
import os
import json
import asyncio
import inspect
import functools
import random
import importlib.metadata
from datetime import datetime
import uvicorn
from typing_extensions import Annotated
from fastapi import FastAPI, Request, Security, status, HTTPException
from fastapi.security import APIKeyHeader, APIKeyQuery
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, ConfigDict, Field, BeforeValidator, ValidationError
from . import logger
from .utils import convert_to_bytes, generate_data
class MaxSizePerRequestError(Exception):
pass
class MinSizePerRequestError(Exception):
pass
class Testdata:
class Config(BaseModel):
model_config = ConfigDict(extra='forbid')
@staticmethod
def to_bytes(value: int | str) -> int:
try:
return convert_to_bytes(value)
except Exception as err:
raise ValidationError from err
@staticmethod
def is_authorized_keys(value: set[str] | str) -> set[str]:
if isinstance(value, str):
with open(value, encoding='utf-8') as file:
return set(filter(lambda x: x.strip() != '', file.read().splitlines()))
return value
authorized_keys: Annotated[set[str], BeforeValidator(is_authorized_keys)] = Field(alias='keys')
max_size: Annotated[int, BeforeValidator(to_bytes)] = Field(alias='max-size')
max_data: Annotated[int, BeforeValidator(to_bytes)] = Field(alias='max-data')
buffer_size: Annotated[int, BeforeValidator(to_bytes)] = Field(alias='buffer-size')
database: str | None = None
log: str | None = Field(alias='log', default=None)
database_update_interval: float = Field(alias='database-update-interval', default=5)
_config: Config
_api: FastAPI
_state: dict
_logger: logger.Logger
def __init__(self, config: Config):
self._config = config
self._logger = logger.getLogger('testdata')
self._api = self._setup_api()
# Store internal state
self._state = {
'version': importlib.metadata.version('testdata'), # For future compatibility
'data-used': {f'{(today := datetime.today()).year}-{today.month:02}': 0} # math each months data usage
}
def _setup_api(self) -> FastAPI:
api = FastAPI(docs_url='/', redoc_url=None)
# Security
def get_api_key(
api_key_query: str = Security(APIKeyQuery(name="api_key", auto_error=False)),
api_key_header: str = Security(APIKeyHeader(name="x-api-key", auto_error=False))
) -> str:
# https://joshdimella.com/blog/adding-api-key-auth-to-fast-api
if api_key_query in self._config.authorized_keys:
return api_key_query
if api_key_header in self._config.authorized_keys:
return api_key_header
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Invalid or missing API Key'
)
# A wrapper to set the function signature to accept the api key dependency
def secure(func):
# Get old signature
positional_only, positional_or_keyword, variadic_positional, keyword_only, variadic_keyword = [], [], [], [], []
for value in inspect.signature(func).parameters.values():
if value.kind == inspect.Parameter.POSITIONAL_ONLY:
positional_only.append(value)
elif value.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD:
positional_or_keyword.append(value)
elif value.kind == inspect.Parameter.VAR_POSITIONAL:
variadic_positional.append(value)
elif value.kind == inspect.Parameter.KEYWORD_ONLY:
keyword_only.append(value)
elif value.kind == inspect.Parameter.VAR_KEYWORD:
variadic_keyword.append(value)
# Avoid passing an unrecognized keyword
if inspect.iscoroutinefunction(func):
async def wrapper(*args, **kwargs):
if len(variadic_keyword) == 0:
if 'api_key' in kwargs:
del kwargs['api_key']
return await func(*args, **kwargs)
else:
def wrapper(*args, **kwargs):
if len(variadic_keyword) == 0:
if 'api_key' in kwargs:
del kwargs['api_key']
return func(*args, **kwargs)
# Override signature
wrapper.__signature__ = inspect.signature(func).replace(
parameters=(
*positional_only,
*positional_or_keyword,
*variadic_positional,
*keyword_only,
inspect.Parameter('api_key', inspect.Parameter.POSITIONAL_OR_KEYWORD, default=Security(get_api_key)),
*variadic_keyword
)
)
return functools.wraps(func)(wrapper)
# Routes
api.get('/zeros')(secure(self._zeros))
return api
async def _zeros(self, size: int | str, request: Request, filename: str = 'zeros.bin') -> StreamingResponse:
try:
extra = {'id': f'{random.randint(0, 2 ** 32 - 1):08X}'}
self._logger.debug(
'Initiated request.',
extra=extra | {
'ip': request.client.host if request.client is not None else None,
'query-params': dict(request.query_params),
'headers': dict(request.headers)
}
)
try:
size = convert_to_bytes(size)
except ValueError as err:
self._logger.warning('Invalid format for size.', extra=extra)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Invalid format for size.'
) from err
if size < 0:
raise MinSizePerRequestError
if self._config.max_size < size:
raise MaxSizePerRequestError
# update internal state
current_date = f'{(today := datetime.today()).year}-{today.month:02}'
if current_date not in self._state['data-used']:
self._state['data-used'][current_date] = 0
if self._config.max_data < self._state['data-used'][current_date] + size:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='Service not available.'
)
self._state['data-used'][current_date] += size
self._logger.debug('Successfully processed request.', extra=extra)
return StreamingResponse(
status_code=status.HTTP_200_OK,
content=generate_data(size, self._config.buffer_size),
media_type='application/octet-stream',
headers={
'Content-Length': str(size),
'Content-Disposition': f'attachment; filename="{filename}"'
}
)
except MinSizePerRequestError as err:
self._logger.warning('Size if negative.', extra=extra)
raise HTTPException(
status_code=status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE,
detail='Size has to be non-negative.'
) from err
except MaxSizePerRequestError as err:
self._logger.warning('Exceeded max size per request.', extra=extra)
raise HTTPException(
status_code=status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE,
detail=f'Exceeded max size per request of {self._config.max_size} Bytes.'
) from err
except Exception as err:
self._logger.exception(err)
raise err
async def _update_state(self) -> None:
assert self._config.database is not None
mode = 'r+' if os.path.exists(self._config.database) else 'w+'
with open(self._config.database, mode, encoding='utf-8') as file:
try:
self._state = json.load(file)
except json.JSONDecodeError:
pass
while True:
file.seek(0)
json.dump(self._state, file, indent=2)
file.truncate()
await asyncio.sleep(self._config.database_update_interval)
async def run(self, host: str, port: int) -> None:
try:
if self._config.log is not None:
logger.setup_logging(self._config.log)
self._logger = logger.getLogger('testdata')
self._logger.info('Server started.')
coroutines = [uvicorn.Server(uvicorn.Config(self._api, host, port)).serve()]
if self._config.database is not None:
coroutines.append(self._update_state())
await asyncio.gather(*coroutines)
except asyncio.exceptions.CancelledError:
self._logger.info('Server stopped.')
except Exception as err:
self._logger.exception(err)

40
src/testdata/utils.py vendored
View File

@ -1,40 +0,0 @@
import asyncio
from typing import AsyncGenerator
def convert_to_bytes(size: int | str) -> int:
if isinstance(size, int):
return size
if isinstance(size, str):
try:
return int(size)
except ValueError as err:
units = {
'TB': 1000 ** 4, 'TiB': 1024 ** 4,
'GB': 1000 ** 3, 'GiB': 1024 ** 3,
'MB': 1000 ** 2, 'MiB': 1024 ** 2,
'KB': 1000, 'KiB': 1024,
'B': 1
}
for unit, value in units.items():
if size.endswith(unit):
return int(float(size.removesuffix(unit)) * value)
raise ValueError from err
else:
raise ValueError
async def generate_data(size: int, buffer_size: int = 4 * 1024) -> AsyncGenerator[bytes, None]:
# https://github.com/tiangolo/fastapi/issues/5183
# https://github.com/encode/starlette/discussions/1776#discussioncomment-3207518
size_left = size
while size_left > buffer_size:
size_left -= buffer_size
yield b'\0' * buffer_size
await asyncio.sleep(0)
yield b'\0' * size_left
await asyncio.sleep(0)

3
test/test_imports.py Normal file
View File

@ -0,0 +1,3 @@
def test_import():
import testdata
from testdata import run

96
test/test_run.py Normal file
View File

@ -0,0 +1,96 @@
from multiprocessing import Process
import os
import json
import tempfile
import random
import pytest
import requests
import testdata
PROTOCOL = 'http'
HOST = '127.0.0.1'
PORT = 8080
BUFFER_SIZE = 4 * 1024
MAX_SIZE = 2 * 1024 * 1024 * 1024
MAX_DATA = 10 * 1024 * 1024 * 1024
API_KEY = f'{random.randrange(16 ** 32):032x}'
API_KEYS = { API_KEY }
TIMEOUT = 5
@pytest.fixture(autouse = True)
def server():
# Create Temporary Databases File
database = tempfile.NamedTemporaryFile(delete = False).name
proc = Process(target = testdata.run, args = (HOST, PORT, API_KEYS, MAX_SIZE, MAX_DATA, database, BUFFER_SIZE))
proc.start()
# Wait until webserver becomes available
while True:
try:
requests.get(f'{PROTOCOL}://{HOST}:{PORT}', timeout = TIMEOUT)
except requests.ConnectionError:
continue
break
yield database
# Terminate webserver
proc.terminate()
proc.join()
# Delete Temporary File
os.unlink(database)
def test_get_file():
response = requests.get(
f'{PROTOCOL}://{HOST}:{PORT}/zeros/?api_key={API_KEY}&size=32', timeout=TIMEOUT)
assert response.content == b'\0' * 32
def test_get_file_B():
response = requests.get(
f'{PROTOCOL}://{HOST}:{PORT}/zeros/?api_key={API_KEY}&size=32B', timeout=TIMEOUT)
assert response.content == b'\0' * 32
def test_get_file_KB():
response = requests.get(
f'{PROTOCOL}://{HOST}:{PORT}/zeros/?api_key={API_KEY}&size=32KB', timeout=TIMEOUT)
assert response.status_code == 200
assert response.content == b'\0' * 32 * 1000
def test_get_file_KiB():
response = requests.get(
f'{PROTOCOL}://{HOST}:{PORT}/zeros/?api_key={API_KEY}&size=32KiB', timeout=TIMEOUT)
assert response.status_code == 200
assert response.content == b'\0' * 32 * 1024
def test_get_file_invalid_format():
response = requests.get(
f'{PROTOCOL}://{HOST}:{PORT}/zeros/?api_key={API_KEY}&size=32Invalid', timeout=TIMEOUT)
assert response.status_code == 400
assert response.text == 'Invalid Format.'
def test_database_data_used(server):
requests.get(
f'{PROTOCOL}://{HOST}:{PORT}/zeros/?api_key={API_KEY}&size=32', timeout=TIMEOUT)
with open(server) as file:
assert json.loads(file.read())['data-used'] == 32

23
test/test_utils.py Normal file
View File

@ -0,0 +1,23 @@
from pytest import raises
from testdata.utils import convert_to_bytes
def test_convert_to_bytes():
test_values = {
('0', 0),
('32', 32),
('10_000', 10000),
('999999999999999999', 999999999999999999),
}
for input, expected_output in test_values:
assert convert_to_bytes(input) == expected_output
test_exceptions = {
('9E3', ValueError)
}
for input, exception in test_exceptions:
with raises(exception):
convert_to_bytes(input)

1
testdata/__init__.py vendored Normal file
View File

@ -0,0 +1 @@
from .main import run

3
testdata/__main__.py vendored Normal file
View File

@ -0,0 +1,3 @@
from testdata.main import main
main()

124
testdata/app.py vendored Normal file
View File

@ -0,0 +1,124 @@
# pylint: disable=missing-module-docstring,missing-class-docstring,missing-function-docstring
# pylint: disable=line-too-long
from pathlib import Path
from fasthtml.fastapp import FastHTML, FastHTMLWithLiveReload, Request, StaticFiles, picolink
from fasthtml.common import Form, Label, Input, Br, Button, A, Html, serve, Body, Link, P, Body, Div, Span, Main, Title # pylint: disable=no-name-in-module
from starlette import status
from starlette.responses import Response, StreamingResponse
from starlette.exceptions import HTTPException
from pydantic import ValidationError
from .utils import load_database, save_database, generate_data
from .custom_types import TestDataBody, Config
class MaxSizePerRequestError(Exception):
pass
class MinSizePerRequestError(Exception):
pass
with open('./config.json') as file: # parse_cli_arguments(sys.argv[1:])
config = Config.model_validate_json(file.read())
app = FastHTMLWithLiveReload(hdrs=(
(Link(rel='stylesheet', type='text/css', href='static/style.css'),)
))
app.mount(
'/static',
StaticFiles(directory=Path(__file__).parent.absolute() / 'static'),
name='static'
)
@app.get('/')
async def homepage():
return Main(
P('Testdata Download'),
P('Download a file of exact size. The file will only contain zeros.'),
Div(
Div(
Form(
Div(
Label('API Key'), Br(),
Input(name='api_key'), Br()
),
Div(
Label('Size'), Br(),
Input(name='size', placeholder='2MiB, 1GB, etc.'), Br()
),
Button('Download'),
action='/zeros', hx_boost="true"
),
P(f'Max Download Size: {config.max_size / (1024 * 1024 * 1024):.02f} GiB'), Br(), # nopep8
cls='download'
),
P('Contact: ', A('contact.testdata@krsnik.at', href='mailto:contact.testdata@krsnik.at'), id='contact') # nopep8
)
)
@ app.get('/zeros') # type: ignore
async def get_zeros(api_key: str, size: str, request: Request) -> StreamingResponse:
# AJAX Requests cannot return a file, thus send a redirect.
if 'HX-Request' in request.headers:
return Response(
content='<p>Hello</p>',
headers={'HX-Redirect': f'/zeros/?api_key={api_key}&size={size}'})
try:
body = TestDataBody(api_key=api_key, size=size) # type: ignore
except ValidationError as err:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Invalid Format.'
) from err
try:
if body.api_key not in config.api_keys:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Invalid API Key.'
)
if body.size < 0:
raise MinSizePerRequestError
if config.max_size < body.size:
raise MaxSizePerRequestError
db = load_database(config.database)
if config.max_data <= db['data-used'] + body.size:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='Service not available.'
)
db['data-used'] += body.size
save_database(config.database, db)
return StreamingResponse(
status_code=status.HTTP_200_OK,
content=generate_data(body.size, config.buffer_size),
media_type='application/octet-stream',
headers={
'Content-Length': str(body.size),
}
)
except MinSizePerRequestError as err:
raise HTTPException(
status_code=status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE,
detail='Size has to be not-negative.'
) from err
except MaxSizePerRequestError as err:
raise HTTPException(
status_code=status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE,
detail=f'Exceeded max size per request of {config.max_size} Bytes.'
) from err

41
testdata/custom_types.py vendored Normal file
View File

@ -0,0 +1,41 @@
from pydantic import BaseModel, Field, ConfigDict, field_validator
from .utils import convert_to_bytes
class Config(BaseModel):
host: str
port: int
buffer_size: int = Field(alias='buffer-size', default=4 * 1024) # 4KB
max_size: int = Field(alias='max-size', default=2 * 1024 ** 3) # 2GB
max_data: int = Field(alias='max-data', default=0) # unlimited
api_keys: set[str] = Field(alias='api-keys')
database: str
log: str = '-'
model_config = ConfigDict(extra='forbid')
@field_validator('buffer_size', 'max_size', 'max_data', mode='before')
@classmethod
def convert_size(cls, value: int | str) -> int:
return convert_to_bytes(value)
@field_validator('api_keys', mode='before')
@classmethod
def check_if_key_in_file(cls, value: str | list[str]) -> set[str]:
if isinstance(value, str):
with open(value) as file:
return set(filter(lambda x: x.strip() != '', file.read().splitlines()))
return set(value)
class TestDataBody(BaseModel):
api_key: str
size: int
model_config = ConfigDict(extra='forbid')
@field_validator('size', mode='before')
@classmethod
def convert_size(cls, value: str) -> int:
return convert_to_bytes(value)

38
testdata/main.py vendored Normal file
View File

@ -0,0 +1,38 @@
import sys
import argparse
import os
from os.path import exists
import uvicorn
from .utils import save_database
from .custom_types import Config
# Setup Parser
def parse_cli_arguments(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
'-c',
'--config',
type = argparse.FileType('r'),
default = './config.json',
help = 'Path to config file in JSON format.'
)
return parser.parse_args(argv)
def run(host: str, port: int, api_keys: set[str], max_size: int, max_data: int, database: str, buffer_size: int, reload: bool = False):
if not exists(database) or os.stat(database).st_size == 0:
save_database(database, {'data-used': 0})
# app = create_api(api_keys, max_size, max_data, database, buffer_size)
uvicorn.run("testdata.app:app", host=host, port=port, reload=reload)
def main():
args = parse_cli_arguments(sys.argv[1:])
config = Config.model_validate_json(args.config.read())
run(**config.model_dump(exclude={'log'}), reload=True)

BIN
testdata/static/Roboto-Regular.ttf vendored Normal file

Binary file not shown.

48
testdata/static/style.css vendored Normal file
View File

@ -0,0 +1,48 @@
@font-face {
font-family: roboto;
src: url(Roboto-Regular.ttf);
}
:root {
--background-color: #ffffff;
--form-background-color: #e2ddff;
}
body {
background-color: var(--background-color);
font-family: roboto;
}
.download {
background-color: var(--form-background-color);
border-radius: 1em;
text-align: center;
width: 70%;
margin: 0 auto;
margin-top: 5em;
}
.download > form > div {
width: 70%;
margin: 0 auto;
}
.download > form > div > label {
font-size: 3em;
}
.download > form > div > input {
width: 100%;
font-size: 3em;
text-indent: 1em;
background-color: var(--background-color);
}
.download > form > button {
margin-top: 1em;
font-size: 3em;
}
#contact {
text-align: center;
}

55
testdata/utils.py vendored Normal file
View File

@ -0,0 +1,55 @@
import json
import asyncio
from typing import AsyncGenerator
def convert_to_bytes(size: int | str) -> int:
if isinstance(size, int):
return size
try:
return int(size)
except ValueError as err:
units = {
'TB': 1000 ** 4, 'TiB': 1024 ** 4,
'GB': 1000 ** 3, 'GiB': 1024 ** 3,
'MB': 1000 ** 2, 'MiB': 1024 ** 2,
'KB': 1000, 'KiB': 1024,
'B': 1
}
for unit, value in units.items():
if size.endswith(unit):
return int(float(size.removesuffix(unit)) * value)
raise ValueError(
'Invalid format. Expected integer or float ending with a data unit (B, KB, MiB,...).'
) from err
async def generate_data(size: int, buffer_size: int = 4 * 1024) -> AsyncGenerator[bytes, None]:
size_left = size
# https://github.com/tiangolo/fastapi/issues/5183
# https://github.com/encode/starlette/discussions/1776#discussioncomment-3207518
try:
while size_left > buffer_size:
size_left -= buffer_size
yield b'\0' * buffer_size
await asyncio.sleep(0)
yield b'\0' * size_left
await asyncio.sleep(0)
except asyncio.CancelledError as err:
raise GeneratorExit from err
def load_database(path: str) -> dict:
with open(path, 'r', encoding='utf-8') as file:
return json.load(file)
def save_database(path: str, database: dict) -> None:
with open(path, 'w', encoding='utf-8') as file:
json.dump(database, file, indent=2)

View File

@ -1,5 +0,0 @@
import sys
import pytest
retcode = pytest.main(sys.argv[1:])

View File

@ -1,2 +0,0 @@
def test_import_testdata():
import testdata # pylint: disable=unused-import,import-outside-toplevel

View File

@ -1,144 +0,0 @@
import json
import time
import tempfile
import asyncio
from multiprocessing import Process
from typing import Generator
import pytest
import requests
import testdata
PROTOCOL = 'http'
HOST = 'localhost'
PORT = 1234
TIMEOUT = 1 # seconds
@pytest.fixture(scope='function')
def _server(request) -> Generator[str, None, None]:
with tempfile.NamedTemporaryFile() as tmpfile:
request.param['database'] = tmpfile.name
config = testdata.Testdata.Config.model_validate_json(json.dumps(request.param))
server = testdata.Testdata(config)
def run_server():
asyncio.run(server.run(HOST, PORT))
process = Process(target=run_server)
process.start()
# Wait until webserver becomes available
start = time.time()
while (time.time() - start) < TIMEOUT:
try:
requests.get(f'{PROTOCOL}://{HOST}:{PORT}', timeout=TIMEOUT)
break
except requests.exceptions.ConnectionError:
pass
yield tmpfile.name
process.terminate()
# Wait until webserver is completely shut down
start = time.time()
while (time.time() - start) < TIMEOUT:
try:
requests.get(f'{PROTOCOL}://{HOST}:{PORT}', timeout=TIMEOUT)
except requests.exceptions.ConnectionError:
break
@pytest.mark.parametrize('_server', [({
'keys': ['one', 'two', 'three'],
'max-size': '100',
'max-data': 1234,
'buffer-size': '12MiB',
})], indirect=['_server'])
def test_invalid_api_key(_server):
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/zeros?api_key=four&size=100', timeout=TIMEOUT)
assert response.status_code == 401
@pytest.mark.parametrize('_server', [({
'keys': ['one', 'two', 'three'],
'max-size': '100',
'max-data': 1234,
'buffer-size': '12MiB',
})], indirect=['_server'])
def test_request_size_lower_bound(_server):
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/zeros?api_key=one&size=-1', timeout=TIMEOUT)
assert response.status_code == 416
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/zeros?api_key=one&size=0', timeout=TIMEOUT)
assert response.status_code == 200
assert response.content == b''
@pytest.mark.parametrize('_server', [({
'keys': ['one', 'two', 'three'],
'max-size': '100',
'max-data': 1234,
'buffer-size': '12MiB',
})], indirect=['_server'])
def test_request_size_upper_bound(_server):
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/zeros?api_key=one&size=100', timeout=TIMEOUT)
assert response.status_code == 200
assert response.content == b'\0' * 100
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/zeros?api_key=one&size=101', timeout=TIMEOUT)
assert response.status_code == 416
@pytest.mark.parametrize('_server', [({
'keys': ['one', 'two', 'three'],
'max-size': '100KB',
'max-data': '100KB',
'buffer-size': '12MiB',
})], indirect=['_server'])
def test_request_max_data_used(_server):
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/zeros?api_key=one&size=100KB', timeout=TIMEOUT)
assert response.status_code == 200
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/zeros?api_key=one&size=1', timeout=TIMEOUT)
assert response.status_code == 500
@pytest.mark.parametrize('_server', [({
'keys': ['one', 'two', 'three'],
'max-size': '1KB',
'max-data': '1KB',
'buffer-size': '12MiB',
'database-update-interval': 0.1
})], indirect=['_server'])
def test_check_database_update(_server):
import importlib.metadata
from datetime import datetime
database = _server
with open(database, 'r', encoding='utf-8') as file:
file.seek(0)
today = datetime.today()
assert json.load(file) == {
'version': importlib.metadata.version('testdata'),
'data-used': {
f'{today.year}-{today.month:02}': 0
}
}
response = requests.get(f'{PROTOCOL}://{HOST}:{PORT}/zeros?api_key=one&size=100', timeout=TIMEOUT)
assert response.status_code == 200
time.sleep(0.1)
file.seek(0)
assert json.load(file) == {
'version': importlib.metadata.version('testdata'),
'data-used': {
f'{today.year}-{today.month:02}': 100
}
}