Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set up wireguard network for task worker from manager container #21

Merged
merged 15 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/worker-task-Publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
- name: Build and publish Docker Image
uses: openzim/docker-publish-action@v10
with:
image-name: kiwix/mirrors-qa-worker-task
image-name: kiwix/mirrors-qa-task-worker
latest-on-tag: true
tag-pattern: /^v([0-9.]+)$/
restrict-to: kiwix/mirrors-qa
Expand Down
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -165,5 +165,8 @@ cython_debug/

# project files
*.pem
data
*.conf
dev/data/**
!dev/data/README.md
!dev/.env
id_rsa
7 changes: 3 additions & 4 deletions backend/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
FROM python:3.11-slim-bookworm
LABEL org.opencontainers.image.source=https://github.com/kiwix/mirrors-qa
LABEL org.opencontainers.image.source=https://github.com/kiwix/mirrors-qa/backend

RUN apt-get update && apt-get install -y curl
# Copy code

COPY src /src/src
# Copy pyproject.toml and its dependencies

COPY pyproject.toml README.md /src/

# Install + cleanup
RUN pip install --no-cache-dir /src \
&& rm -rf /src

Expand Down
1 change: 1 addition & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies = [
"cryptography==42.0.8",
"PyJWT==2.8.0",
"paramiko==3.4.0",
"humanfriendly==10.0",
]
license = {text = "GPL-3.0-or-later"}
classifiers = [
Expand Down
3 changes: 1 addition & 2 deletions backend/src/mirrors_qa_backend/cli/mirrors.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import sys
from argparse import Namespace

from mirrors_qa_backend import logger
from mirrors_qa_backend.db import Session
Expand All @@ -8,7 +7,7 @@
from mirrors_qa_backend.extract import get_current_mirrors


def update_mirrors(args: Namespace) -> None: # noqa: ARG001
def update_mirrors() -> None:
logger.info("Updating mirrors list.")
try:
with Session.begin() as session:
Expand Down
19 changes: 9 additions & 10 deletions backend/src/mirrors_qa_backend/cli/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import datetime
import time
from argparse import Namespace

from mirrors_qa_backend import logger
from mirrors_qa_backend.db import Session
Expand All @@ -10,15 +9,17 @@
from mirrors_qa_backend.settings.scheduler import SchedulerSettings


def main(args: Namespace): # noqa: ARG001
def main(
sleep_seconds: float = SchedulerSettings.SLEEP_SECONDS,
expire_tests_since: float = SchedulerSettings.EXPIRE_TEST_SECONDS,
workers_since: float = SchedulerSettings.IDLE_WORKER_SECONDS,
):
while True:
with Session.begin() as session:
# expire tests whose results have not been reported
expired_tests = expire_tests(
session,
interval=datetime.timedelta(
seconds=SchedulerSettings.EXPIRE_TEST_SECONDS
),
interval=datetime.timedelta(seconds=expire_tests_since),
)
for expired_test in expired_tests:
logger.info(
Expand All @@ -30,7 +31,7 @@ def main(args: Namespace): # noqa: ARG001
idle_workers = get_idle_workers(
session,
interval=datetime.timedelta(
seconds=SchedulerSettings.IDLE_WORKER_SECONDS
seconds=workers_since,
),
)
if not idle_workers:
Expand Down Expand Up @@ -74,7 +75,5 @@ def main(args: Namespace): # noqa: ARG001
f"{idle_worker.id} in country {country.name}"
)

logger.info(
f"Sleeping for {SchedulerSettings.SCHEDULER_SLEEP_SECONDS} seconds."
)
time.sleep(SchedulerSettings.SCHEDULER_SLEEP_SECONDS)
logger.info(f"Sleeping for {sleep_seconds} seconds.")
time.sleep(sleep_seconds)
15 changes: 6 additions & 9 deletions backend/src/mirrors_qa_backend/cli/worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import sys
from argparse import Namespace

import pycountry
from cryptography.hazmat.primitives import serialization
Expand All @@ -9,9 +8,8 @@
from mirrors_qa_backend.db.worker import create_worker as create_db_worker


def create_worker(args: Namespace):
def create_worker(worker_id: str, private_key_data: bytes, country_codes: list[str]):
# Ensure all the countries are valid country codes
country_codes: list[str] = args.countries if args.countries else []
for country_code in country_codes:
if len(country_code) != 2: # noqa: PLR2004
logger.info(f"Country code '{country_code}' must be two characters long")
Expand All @@ -22,10 +20,9 @@ def create_worker(args: Namespace):
sys.exit(1)

try:
with args.private_key_file as fp:
private_key = serialization.load_pem_private_key(
bytes(fp.read(), encoding="ascii"), password=None
) # pyright: ignore[reportReturnType]
private_key = serialization.load_pem_private_key(
private_key_data, password=None
) # pyright: ignore[reportReturnType]
except Exception as exc:
logger.info(f"Unable to load private key: {exc}")
sys.exit(1)
Expand All @@ -34,12 +31,12 @@ def create_worker(args: Namespace):
with Session.begin() as session:
create_db_worker(
session,
args.worker_id,
worker_id,
country_codes,
private_key, # pyright: ignore [reportGeneralTypeIssues, reportArgumentType]
)
except Exception as exc:
logger.info(f"error while creating worker: {exc}")
sys.exit(1)

logger.info(f"Created worker {args.worker_id} successfully")
logger.info(f"Created worker {worker_id} successfully")
99 changes: 73 additions & 26 deletions backend/src/mirrors_qa_backend/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,61 +2,108 @@
import logging
import sys

from humanfriendly import parse_timespan

from mirrors_qa_backend import logger
from mirrors_qa_backend.__about__ import __version__
from mirrors_qa_backend.cli.mirrors import update_mirrors
from mirrors_qa_backend.cli.scheduler import main as start_scheduler
from mirrors_qa_backend.cli.worker import create_worker
from mirrors_qa_backend.settings.scheduler import SchedulerSettings

UPDATE_MIRRORS_CLI = "update-mirrors"
CREATE_WORKER_CLI = "create-worker"
SCHEDULER_CLI = "scheduler"


def main():
# The program is split into a number of sub-commands which each sbu-command
# performing different function and requring different different kinds of
# command line arguments
parser = argparse.ArgumentParser()
# Define the command-line arguments that are used by all the sub-commands
parser.add_argument(
"--verbose", "-v", help="Show verbose output", action="store_true"
"--verbose", "-v", help="Show verbose output.", action="store_true"
)
subparsers = parser.add_subparsers(required=True)
# Sub-command for updating mirrors
update_mirrors_cmd = subparsers.add_parser(
"update-mirrors", help="Update the list of mirrors"
parser.add_argument(
"--version",
help="Show version and exit.",
action="version",
version="%(prog)s: " + __version__,
)
# Register the function to handle the sub-command
update_mirrors_cmd.set_defaults(func=update_mirrors)

# Sub-command for starting the scheduler
scheduler_cmd = subparsers.add_parser(
"scheduler",
help="Start the scheduler",
subparsers = parser.add_subparsers(required=True, dest="cli_name")

subparsers.add_parser(UPDATE_MIRRORS_CLI, help="Update the list of mirrors")

scheduler_cli = subparsers.add_parser(
SCHEDULER_CLI,
help="Start the scheduler.",
)
scheduler_cli.add_argument(
"--sleep",
help="Duration to sleep after creating tests",
type=parse_timespan,
dest="scheduler_sleep_seconds",
default=SchedulerSettings.SLEEP_SECONDS,
metavar="duration",
)
scheduler_cli.add_argument(
"--workers-since",
help="Create tests for workers last seen in duration.",
type=parse_timespan,
default=SchedulerSettings.IDLE_WORKER_SECONDS,
metavar="duration",
elfkuzco marked this conversation as resolved.
Show resolved Hide resolved
)
scheduler_cli.add_argument(
"--expire-tests-since",
help="Expire tests whose results have not arrived since duration",
type=parse_timespan,
default=SchedulerSettings.EXPIRE_TEST_SECONDS,
metavar="duration",
)
# Register the function to handle the sub-command
scheduler_cmd.set_defaults(func=start_scheduler)

# Sub-command for creating a worker
create_worker_cmd = subparsers.add_parser(
"create-worker", help="Create a new worker"
create_worker_cli = subparsers.add_parser(
CREATE_WORKER_CLI, help="Create a new worker."
)
create_worker_cmd.add_argument(
"worker_id", help="ID of the worker", metavar="worker-id"
create_worker_cli.add_argument(
"worker_id", help="ID of the worker.", metavar="worker-id"
)
create_worker_cmd.add_argument(
create_worker_cli.add_argument(
"--countries",
help="Comma-seperated country codes each in ISO 3166-1 alpha-2 format",
type=lambda s: s.split(","),
help="Comma-seperated country codes each in ISO 3166-1 alpha-2 format.",
type=lambda countries: countries.split(","),
metavar="codes",
)
create_worker_cmd.add_argument(
create_worker_cli.add_argument(
"private_key_file",
metavar="private-key-file",
type=argparse.FileType("r", encoding="ascii"),
rgaudin marked this conversation as resolved.
Show resolved Hide resolved
nargs="?",
default=sys.stdin,
help="RSA private key file (default: stdin)",
help="RSA private key file (default: stdin).",
)
# Register the function to handle the sub-command
create_worker_cmd.set_defaults(func=create_worker)

args = parser.parse_args()
if args.verbose:
logger.setLevel(logging.DEBUG)

args.func(args)
if args.cli_name == UPDATE_MIRRORS_CLI:
return update_mirrors()
elif args.cli_name == SCHEDULER_CLI:
return start_scheduler(
args.scheduler_sleep_seconds,
args.expire_tests_since,
args.workers_since,
)
elif args.cli_name == CREATE_WORKER_CLI:
return create_worker(
args.worker_id,
bytes(args.private_key_file.read(), encoding="ascii"),
args.countries if args.countries else [],
)
else:
args.print_help()


if __name__ == "__main__":
Expand Down
6 changes: 3 additions & 3 deletions backend/src/mirrors_qa_backend/routes/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ def authenticate_worker(
# verify timestamp is less than MESSAGE_VALIDITY
if (
datetime.datetime.now(datetime.UTC) - timestamp
).total_seconds() > APISettings.MESSAGE_VALIDITY:
).total_seconds() > APISettings.MESSAGE_VALIDITY_SECONDS:
raise UnauthorizedError(
"Difference betweeen message time and server time is "
f"greater than {APISettings.MESSAGE_VALIDITY}s"
f"greater than {APISettings.MESSAGE_VALIDITY_SECONDS}s"
)

# verify worker with worker_id exists in database
Expand All @@ -77,5 +77,5 @@ def authenticate_worker(
return Token(
access_token=access_token,
token_type="bearer",
expires_in=datetime.timedelta(hours=APISettings.TOKEN_EXPIRY).total_seconds(),
expires_in=APISettings.TOKEN_EXPIRY_SECONDS,
)
6 changes: 5 additions & 1 deletion backend/src/mirrors_qa_backend/settings/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
from typing import Any

from humanfriendly import parse_timespan


def getenv(key: str, *, mandatory: bool = False, default: Any = None) -> Any:
value = os.getenv(key, default=default)
Expand All @@ -17,7 +19,9 @@ class Settings:
DATABASE_URL: str = getenv("POSTGRES_URI", mandatory=True)
DEBUG = bool(getenv("DEBUG", default=False))
# number of seconds before requests time out
REQUESTS_TIMEOUT_SECONDS = int(getenv("REQUESTS_TIMEOUT_SECONDS", default=5))
REQUESTS_TIMEOUT_SECONDS = parse_timespan(
getenv("REQUESTS_TIMEOUT_DURATION", default="10s")
)
# maximum number of items to return from a request/query
MAX_PAGE_SIZE = int(getenv("PAGE_SIZE", default=20))
# url to fetch the list of mirrors
Expand Down
8 changes: 6 additions & 2 deletions backend/src/mirrors_qa_backend/settings/api.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from humanfriendly import parse_timespan

from mirrors_qa_backend.settings import Settings, getenv


Expand All @@ -6,6 +8,8 @@ class APISettings(Settings):

JWT_SECRET: str = getenv("JWT_SECRET", mandatory=True)
# number of seconds before a message expire
MESSAGE_VALIDITY = int(getenv("MESSAGE_VALIDITY", default=60))
MESSAGE_VALIDITY_SECONDS = parse_timespan(
getenv("MESSAGE_VALIDITY_DURATION", default="1m")
)
# number of hours before access tokens expire
TOKEN_EXPIRY = int(getenv("TOKEN_EXPIRY", default=24))
TOKEN_EXPIRY_SECONDS = parse_timespan(getenv("TOKEN_EXPIRY_DURATION", default="6h"))
10 changes: 5 additions & 5 deletions backend/src/mirrors_qa_backend/settings/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from humanfriendly import parse_timespan

from mirrors_qa_backend.settings import Settings, getenv


class SchedulerSettings(Settings):
"""Scheduler settings"""

# number of seconds the scheduler sleeps before attempting to create tests
SCHEDULER_SLEEP_SECONDS = int(
getenv("SCHEDULER_SLEEP_SECONDS", default=60 * 60 * 3)
)
SLEEP_SECONDS = parse_timespan(getenv("SCHEDULER_SLEEP_DURATION", default="3h"))
# number of seconds into the past to determine if a worker is idle
IDLE_WORKER_SECONDS = int(getenv("IDLE_WORKER_SECONDS", default=60 * 60))
IDLE_WORKER_SECONDS = parse_timespan(getenv("IDLE_WORKER_DURATION", default="1h"))
# number of seconds to wait before expiring a test whose data never arrived
EXPIRE_TEST_SECONDS = int(getenv("EXPIRE_TEST_SECONDS", default=60 * 60 * 24))
EXPIRE_TEST_SECONDS = parse_timespan(getenv("EXPIRE_TEST_DURATION", default="1d"))
4 changes: 3 additions & 1 deletion backend/src/mirrors_qa_backend/tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

def generate_access_token(worker_id: str) -> str:
issue_time = datetime.datetime.now(datetime.UTC)
expire_time = issue_time + datetime.timedelta(hours=APISettings.TOKEN_EXPIRY)
expire_time = issue_time + datetime.timedelta(
seconds=APISettings.TOKEN_EXPIRY_SECONDS
)
payload = {
"iss": "mirrors-qa-backend", # issuer
"exp": expire_time.timestamp(), # expiration time
Expand Down
1 change: 1 addition & 0 deletions dev/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
WORKER_ID=""
Loading
Loading