Skip to content

Commit

Permalink
fix linter issues
Browse files Browse the repository at this point in the history
  • Loading branch information
bgunnar5 committed Oct 19, 2023
1 parent be1806e commit 4f715e6
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
12 changes: 7 additions & 5 deletions merlin/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
"""
import logging
import os
import subprocess
import time
from datetime import datetime
from typing import Dict, List, Optional
Expand Down Expand Up @@ -260,7 +259,7 @@ def get_active_queues(task_server: str) -> Dict[str, List[str]]:
return active_queues


def wait_for_workers(sleep: int, task_server: str, spec: "MerlinSpec"):
def wait_for_workers(sleep: int, task_server: str, spec: "MerlinSpec"): # noqa
"""
Wait on workers to start up. Check on worker start 10 times with `sleep` seconds between
each check. If no workers are started in time, raise an error to kill the monitor (there
Expand Down Expand Up @@ -305,14 +304,17 @@ def check_workers_processing(queues_in_spec: List[str], task_server: str, app_na
:param `app_name`: The name of the app we're querying
:returns: True if workers are still processing tasks, False otherwise
"""
result = False

if task_server == "celery":
return check_celery_workers_processing(queues_in_spec, app_name)
result = check_celery_workers_processing(queues_in_spec, app_name)
else:
LOG.error("Celery is not specified as the task server!")
return False

return result


def check_merlin_status(args: "Namespace", spec: "MerlinSpec", app_name: Optional[str] = "merlin") -> bool:
def check_merlin_status(args: "Namespace", spec: "MerlinSpec", app_name: Optional[str] = "merlin") -> bool: # noqa
"""
Function to check merlin workers and queues to keep the allocation alive
Expand Down
4 changes: 2 additions & 2 deletions merlin/study/celeryadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,8 @@ def query_celery_queues(queues: List[str]) -> Dict[str, List[str]]:
# Get a dict of active queues by querying the celery app
active_queues = app.control.inspect().active_queues()
if active_queues is not None:
# Loop through each worker in the output
for worker, active_queue_list in active_queues.items():
# Loop through each active queue that was found
for active_queue_list in active_queues.values():
# Loop through each queue that each worker is watching
for active_queue in active_queue_list:
# If this is a queue we're looking for, increment the consumer count
Expand Down

0 comments on commit 4f715e6

Please sign in to comment.