Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics #300

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/end_to_end_test_ci.disabled
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ jobs:
cache: "pip"

- name: Install dependencies
run: python -m pip install --upgrade nox 'pdm>=2.12,<3'
run: python -m pip install --upgrade nox 'pdm==2.19.3'

- name: Create wallet files
run: |
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/executor_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
python-version: ${{ env.PYTHON_DEFAULT_VERSION }}
cache: "pip"
- name: Install dependencies
run: python -m pip install --upgrade nox 'pdm>=2.12,<3'
run: python -m pip install --upgrade nox 'pdm==2.19.3'
- name: Create dotenv file
run: cp ./envs/dev/.env.template .env
- name: Run linters
Expand All @@ -45,7 +45,7 @@ jobs:
python-version: ${{ env.PYTHON_DEFAULT_VERSION }}
cache: "pip"
- name: Install dependencies
run: python -m pip install --upgrade nox 'pdm>=2.12,<3'
run: python -m pip install --upgrade nox 'pdm==2.19.3'
- name: Create dotenv file
run: cp ./envs/dev/.env.template .env
- name: Run mypy
Expand All @@ -66,7 +66,7 @@ jobs:
python-version: ${{ env.PYTHON_DEFAULT_VERSION }}
cache: "pip"
- name: Install dependencies
run: python -m pip install --upgrade nox 'pdm>=2.12,<3'
run: python -m pip install --upgrade nox 'pdm==2.19.3'
- name: Setup common virtualenv
# In order not to exhaust disk on GitHub runner, we use one single
# virtualenv for all pdm projects: miner, executor, validator.
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/integration_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
python-version: ${{ env.PYTHON_DEFAULT_VERSION }}
cache: "pip"
- name: Install dependencies
run: python -m pip install --upgrade nox 'pdm>=2.12,<3'
run: python -m pip install --upgrade nox 'pdm==2.19.3'
- name: Setup common virtualenv
# In order not to exhaust disk on GitHub runner, we use one single
# virtualenv for all pdm projects: miner, executor, validator.
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/library_cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
with:
python-version: ${{ env.PYTHON_DEFAULT_VERSION }}
- name: Install dependencies
run: python -m pip install --upgrade nox 'pdm>=2.12,<3'
run: python -m pip install --upgrade nox 'pdm==2.19.3'
- name: Get version from tag
id: get-version
run: echo "version=${GITHUB_REF#refs/tags/library-v}" >> "$GITHUB_OUTPUT"
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/library_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
python-version: ${{ env.PYTHON_DEFAULT_VERSION }}
cache: "pip"
- name: Install dependencies
run: python -m pip install --upgrade nox 'pdm>=2.12,<3'
run: python -m pip install --upgrade nox 'pdm==2.19.3'
- name: Run linters
run: nox -vs lint
- name: Check for missing migrations
Expand All @@ -45,7 +45,7 @@ jobs:
python-version: ${{ env.PYTHON_DEFAULT_VERSION }}
cache: "pip"
- name: Install dependencies
run: python -m pip install --upgrade nox 'pdm>=2.12,<3'
run: python -m pip install --upgrade nox 'pdm==2.19.3'
- name: Run mypy
run: nox -vs type_check
test:
Expand All @@ -64,6 +64,6 @@ jobs:
python-version: ${{ env.PYTHON_DEFAULT_VERSION }}
cache: "pip"
- name: Install dependencies
run: python -m pip install --upgrade nox 'pdm>=2.12,<3'
run: python -m pip install --upgrade nox 'pdm==2.19.3'
- name: Run unit tests
run: nox -vs test
6 changes: 3 additions & 3 deletions .github/workflows/miner_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
python-version: ${{ env.PYTHON_DEFAULT_VERSION }}
cache: "pip"
- name: Install dependencies
run: python -m pip install --upgrade nox 'pdm>=2.12,<3'
run: python -m pip install --upgrade nox 'pdm==2.19.3'
- name: Create dotenv file
run: cp ./envs/dev/.env.template .env
- name: Run linters check
Expand All @@ -45,7 +45,7 @@ jobs:
python-version: ${{ env.PYTHON_DEFAULT_VERSION }}
cache: "pip"
- name: Install dependencies
run: python -m pip install --upgrade nox 'pdm>=2.12,<3'
run: python -m pip install --upgrade nox 'pdm==2.19.3'
- name: Create dotenv file
run: cp ./envs/dev/.env.template .env
- name: Run mypy
Expand All @@ -66,7 +66,7 @@ jobs:
python-version: ${{ env.PYTHON_DEFAULT_VERSION }}
cache: "pip"
- name: Install dependencies
run: python -m pip install --upgrade nox 'pdm>=2.12,<3'
run: python -m pip install --upgrade nox 'pdm==2.19.3'
- name: Setup common virtualenv
# In order not to exhaust disk on GitHub runner, we use one single
# virtualenv for all pdm projects: miner, executor, validator.
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/validator_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
python-version: ${{ env.PYTHON_DEFAULT_VERSION }}
cache: "pip"
- name: Install dependencies
run: python -m pip install --upgrade nox 'pdm>=2.12,<3'
run: python -m pip install --upgrade nox 'pdm==2.19.3'
- name: Create dotenv file
run: cp ./envs/dev/.env.template .env
- name: Run linters
Expand All @@ -45,7 +45,7 @@ jobs:
python-version: ${{ env.PYTHON_DEFAULT_VERSION }}
cache: "pip"
- name: Install dependencies
run: python -m pip install --upgrade nox 'pdm>=2.12,<3'
run: python -m pip install --upgrade nox 'pdm==2.19.3'
- name: Create dotenv file
run: cp ./envs/dev/.env.template .env
- name: Run mypy
Expand All @@ -66,7 +66,7 @@ jobs:
python-version: ${{ env.PYTHON_DEFAULT_VERSION }}
cache: "pip"
- name: Install dependencies
run: python -m pip install --upgrade nox 'pdm>=2.12,<3'
run: python -m pip install --upgrade nox 'pdm==2.19.3'
- name: Setup common virtualenv
# In order not to exhaust disk on GitHub runner, we use one single
# virtualenv for all pdm projects: miner, executor, validator.
Expand Down
2 changes: 1 addition & 1 deletion compute_horde/compute_horde/executor_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ExecutorClassSpec:
description="always on, NVIDIA RTX A6000 GPU machine for LLM prompts solving",
has_gpu=True,
gpu_vram_gb=48,
spin_up_time=int(timedelta(minutes=1).total_seconds()),
spin_up_time=int(timedelta(minutes=4).total_seconds()),
),
# ExecutorClass.always_on__cpu_16c__ram_64gb: ExecutorClassSpec(
# cpu_cores=16,
Expand Down
2 changes: 1 addition & 1 deletion validator/app/envs/prod/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ LABEL builder=true

WORKDIR /root/src/

RUN pip3 install --no-cache-dir 'pdm>=2.12,<3'
RUN pip3 install --no-cache-dir 'pdm==2.19.3'

RUN apt-get update && apt-get install -y git
COPY pyproject.toml pdm.lock ./
Expand Down
7 changes: 7 additions & 0 deletions validator/app/src/compute_horde_validator/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
"compute_horde_validator.validator.tasks.fetch_dynamic_config": DEFAULT_QUEUE,
}

CELERY_TASK_QUEUES = list(set(TASK_QUEUE_MAP.values()))


def route_task(name, args, kwargs, options, task=None, **kw):
if name not in TASK_QUEUE_MAP:
Expand All @@ -60,3 +62,8 @@ def apply_startup_hook(*args, **kwargs):
importlib.import_module(hook_script_file)
else:
print("Not loading any startup hook")


def get_num_tasks_in_queue(queue_name: str) -> int:
    """Return the number of pending tasks currently sitting in a Celery queue.

    NOTE(review): this reads the queue length with the Redis ``LLEN`` command
    through the broker connection, so it assumes a Redis broker where each
    queue is backed by a list — confirm before using with other brokers.
    """
    # Borrow a broker connection from the Celery app's pool, blocking until
    # one is available, and release it when the context exits.
    with app.pool.acquire(block=True) as conn:
        return conn.default_channel.client.llen(queue_name)
2 changes: 2 additions & 0 deletions validator/app/src/compute_horde_validator/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,8 @@ def wrapped(*args, **kwargs):
CELERY_TASK_ROUTES = ["compute_horde_validator.celery.route_task"]
CELERY_TASK_TIME_LIMIT = int(timedelta(hours=2, minutes=5).total_seconds())
CELERY_TASK_ALWAYS_EAGER = env.bool("CELERY_TASK_ALWAYS_EAGER", default=False)
CELERY_WORKER_SEND_TASK_EVENTS = True
CELERY_TASK_SEND_SENT_EVENT = True
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
Expand Down
21 changes: 21 additions & 0 deletions validator/app/src/compute_horde_validator/validator/metrics.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import glob
import os
from collections.abc import Iterator

import prometheus_client
from django.http import HttpResponse
from django_prometheus.exports import ExportToDjangoView
from prometheus_client import multiprocess
from prometheus_client.core import REGISTRY, GaugeMetricFamily, Metric
from prometheus_client.registry import Collector

from ..celery import get_num_tasks_in_queue, CELERY_TASK_QUEUES


class RecursiveMultiProcessCollector(multiprocess.MultiProcessCollector):
Expand All @@ -23,9 +28,25 @@ def metrics_view(request):
if os.environ.get(ENV_VAR_NAME):
registry = prometheus_client.CollectorRegistry()
RecursiveMultiProcessCollector(registry)
registry.register(CustomCeleryCollector())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why register twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've copied it from the cookiecutter template, but I see now that it is redundant.

return HttpResponse(
prometheus_client.generate_latest(registry),
content_type=prometheus_client.CONTENT_TYPE_LATEST,
)
else:
return ExportToDjangoView(request)


class CustomCeleryCollector(Collector):
    """Prometheus collector that reports the backlog size of each Celery queue."""

    def collect(self) -> Iterator[Metric]:
        """Yield a single gauge family with one sample per configured queue."""
        queue_len_gauge = GaugeMetricFamily(
            "celery_queue_len",
            "How many tasks are there in a queue",
            labels=("queue",),
        )
        # Sample every queue the task router can dispatch to.
        for queue_name in CELERY_TASK_QUEUES:
            queue_len_gauge.add_metric([queue_name], get_num_tasks_in_queue(queue_name))
        yield queue_len_gauge


REGISTRY.register(CustomCeleryCollector())
Original file line number Diff line number Diff line change
Expand Up @@ -1466,7 +1466,7 @@ def _db_persist_system_events(ctx: BatchContext) -> None:

# sync_to_async is needed since we use the sync Django ORM
@sync_to_async
def _db_persist(ctx: BatchContext) -> None:
def _db_persist_critical(ctx: BatchContext) -> None:
start_time = time.time()

# persist the batch and the jobs in the same transaction, to
Expand Down Expand Up @@ -1505,6 +1505,19 @@ def _db_persist(ctx: BatchContext) -> None:
)
synthetic_jobs.append(synthetic_job)
synthetic_jobs = SyntheticJob.objects.bulk_create(synthetic_jobs)
duration = time.time() - start_time
logger.info("Persisted to database in %.2f seconds", duration)


# sync_to_async is needed since we use the sync Django ORM
@sync_to_async
def _db_persist(ctx: BatchContext) -> None:
start_time = time.time()

if ctx.batch_id is not None:
batch = SyntheticJobBatch.objects.get(id=ctx.batch_id)
else:
batch = SyntheticJobBatch.objects.get(started_at=ctx.stage_start_time["BATCH_BEGIN"])

miner_manifests: list[MinerManifest] = []
for miner in ctx.miners.values():
Expand All @@ -1523,7 +1536,7 @@ def _db_persist(ctx: BatchContext) -> None:

# TODO: refactor into nicer abstraction
synthetic_jobs_map: dict[str, SyntheticJob] = {
str(synthetic_job.job_uuid): synthetic_job for synthetic_job in synthetic_jobs
str(synthetic_job.job_uuid): synthetic_job for synthetic_job in batch.synthetic_jobs.all()
}
prompt_samples: list[PromptSample] = []

Expand Down Expand Up @@ -1700,6 +1713,9 @@ async def execute_synthetic_batch_run(
func="_multi_close_client",
)

await ctx.checkpoint_system_event("_db_persist_critical")
await _db_persist_critical(ctx)

await ctx.checkpoint_system_event("_emit_telemetry_events")
try:
_emit_telemetry_events(ctx)
Expand Down
87 changes: 0 additions & 87 deletions validator/envs/prod/.env.template

This file was deleted.

Loading
Loading