Skip to content

Commit

Permalink
rebased to main
Browse files Browse the repository at this point in the history
  • Loading branch information
vlerkin committed Jan 28, 2025
1 parent 706f8bf commit 9175fec
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 27 deletions.
4 changes: 0 additions & 4 deletions scrapyd_k8s/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ def api_schedule():
_version = request.form.get('_version', 'latest') # TODO allow customizing latest tag
# any other parameter is passed as spider argument
args = { k: v for k, v in request.form.items() if k not in ('project', 'spider', 'setting', 'jobid', 'priority', '_version') }
# running_jobs = launcher.get_running_jobs_count()
# start_suspended = running_jobs >= k8s_scheduler.max_proc
# logger.info(
# f"Scheduling job {job_id} with start_suspended={start_suspended}. Running jobs: {running_jobs}, Max procs: {k8s_scheduler.max_proc}")
env_config, env_secret = project.env_config(), project.env_secret()
jobid = launcher.schedule(project, _version, spider, job_id, settings, args)
return { 'status': 'ok', 'jobid': job_id }
Expand Down
13 changes: 6 additions & 7 deletions scrapyd_k8s/joblogs/log_handler_k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,15 +266,14 @@ def stream_logs(self, job_id, pod_name):
self.concatenate_and_delete_files(log_file_path, temp_file_path)
else:
os.remove(temp_file_path)
logger.info(f"Removed temporary file '{temp_file_path}' after streaming logs for job '{job_name}'.")
logger.info(f"Removed temporary file '{temp_file_path}' after streaming logs for job '{job_id}'.")
except (IOError, OSError) as e:
logger.error(f"I/O error while streaming logs for job '{job_name}': {e}")
raise PodStreamingError(f"I/O error while streaming logs for job '{job_name}': {e}") from e
logger.error(f"I/O error while streaming logs for job '{job_id}': {e}")
raise PodStreamingError(f"I/O error while streaming logs for job '{job_id}': {e}") from e
except KubernetesJobLogHandlerError as e:
logger.error(f"Error processing logs for job '{job_name}': {e}")
raise PodStreamingError(f"Error processing logs for job '{job_name}': {e}") from e
finally:
w.stop()
logger.error(f"Error processing logs for job '{job_id}': {e}")
raise PodStreamingError(f"Error processing logs for job '{job_id}': {e}") from e

def handle_events(self, event):
"""
Watches Kubernetes pod events and handles actions such as starting log streaming or uploading logs.
Expand Down
4 changes: 3 additions & 1 deletion scrapyd_k8s/k8s_scheduler/k8s_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from kubernetes.client import ApiException

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

class KubernetesScheduler:
"""
Expand Down Expand Up @@ -35,6 +36,7 @@ def __init__(self, config, launcher, max_proc):
raise TypeError(f"max_proc must be an integer, got {type(max_proc).__name__}")
self.max_proc = max_proc
self.namespace = config.namespace()
logger.info("Scheduler feature is initialized")
except TypeError as e:
logger.exception(f"TypeError during KubernetesScheduler initialization: {e}")
raise
Expand Down Expand Up @@ -82,7 +84,7 @@ def handle_pod_event(self, event):
return

# If a pod has terminated (Succeeded or Failed), we may have capacity to unsuspend jobs
if pod_phase in ('Succeeded', 'Failed') and event_type in ('MODIFIED', 'DELETED'):
if pod_phase in ('Succeeded', 'Failed'):
logger.info(f"Pod {pod_name} has completed with phase {pod_phase}. Checking for suspended jobs.")
self.check_and_unsuspend_jobs()
else:
Expand Down
26 changes: 11 additions & 15 deletions scrapyd_k8s/launcher/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from scrapyd_k8s.joblogs import KubernetesJobLogHandler

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

class K8s:

Expand All @@ -27,6 +28,7 @@ class K8s:

def __init__(self, config):
self._namespace = config.scrapyd().get('namespace', 'default')
self.max_proc = config.scrapyd().get('max_proc')
self._pull_secret = config.scrapyd().get('pull_secret')
# TODO figure out where to put Kubernetes initialisation
try:
Expand All @@ -37,9 +39,8 @@ def __init__(self, config):
self._k8s = kubernetes.client.CoreV1Api()
self._k8s_batch = kubernetes.client.BatchV1Api()

self.scheduler = None
self.k8s_scheduler = None
self._init_resource_watcher(config)
self.max_proc = int(config.scrapyd().get('max_proc', 4))

def _init_resource_watcher(self, config):
self.resource_watcher = ResourceWatcher(self._namespace, config)
Expand All @@ -50,12 +51,7 @@ def _init_resource_watcher(self, config):
if self.max_proc is not None:
self.enable_k8s_scheduler(config)
else:
logger.debug("k8s scheduler not enabled; 'max_proc' configuration is missing in the scrapyd section.")

# Initialize KubernetesScheduler
self.max_proc = int(config.scrapyd().get('max_proc', 4))
self.k8s_scheduler = KubernetesScheduler(config, self, self.resource_watcher, self.max_proc)
logger.debug(f"KubernetesLauncher initialized with max_proc={self.max_proc}.")
logger.debug("k8s scheduler not enabled; jobs run directly after scheduling.")

def get_node_name(self):
deployment = os.getenv('MY_DEPLOYMENT_NAME', 'default')
Expand All @@ -71,11 +67,11 @@ def listjobs(self, project=None):
)

def schedule(self, project, version, spider, job_id, settings, args):
if self.scheduler:
if self.k8s_scheduler:
running_jobs = self.get_running_jobs_count()
start_suspended = running_jobs >= self.scheduler.max_proc
start_suspended = running_jobs >= self.k8s_scheduler.max_proc
logger.debug(
f"Scheduling job {job_id} with start_suspended={start_suspended}. Running jobs: {running_jobs}, Max procs: {self.scheduler.max_proc}")
f"Scheduling job {job_id} with start_suspended={start_suspended}. Running jobs: {running_jobs}, Max procs: {self.k8s_scheduler.max_proc}")
else:
start_suspended = False
logger.debug(f"Scheduling job {job_id} without suspension. Scheduler not enabled.")
Expand Down Expand Up @@ -175,16 +171,16 @@ def enable_joblogs(self, config):
def enable_k8s_scheduler(self, config):
try:
max_proc = int(self.max_proc)
self.scheduler = KubernetesScheduler(config, self, max_proc)
self.k8s_scheduler = KubernetesScheduler(config, self, max_proc)
logger.debug(f"KubernetesLauncher initialized with max_proc={max_proc}.")
self.resource_watcher.subscribe(self.scheduler.handle_pod_event)
self.resource_watcher.subscribe(self.k8s_scheduler.handle_pod_event)
logger.info("K8s scheduler started.")
except ValueError:
logger.error(f"Invalid max_proc value: {self.max_proc}. Scheduler not enabled.")
self.scheduler = None
self.k8s_scheduler = None

def unsuspend_job(self, job_id: str):
if not self.scheduler:
if not self.k8s_scheduler:
logger.error("Scheduler is not enabled. Cannot unsuspend jobs.")
return False
job_name = self._get_job_name(job_id)
Expand Down

0 comments on commit 9175fec

Please sign in to comment.