Skip to content

Commit

Permalink
[IMP] queue_job: HA job runner using session level advisory lock
Browse files Browse the repository at this point in the history
  • Loading branch information
sbidoul committed Jul 2, 2024
1 parent e267d92 commit deecd27
Showing 1 changed file with 24 additions and 2 deletions.
26 changes: 24 additions & 2 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,17 @@

SELECT_TIMEOUT = 60
ERROR_RECOVERY_DELAY = 5
PG_ADVISORY_LOCK_ID = 2293787760715711918

_logger = logging.getLogger(__name__)

select = selectors.DefaultSelector


class MasterElectionLost(Exception):
pass


# Unfortunately, it is not possible to extend the Odoo
# server command line arguments, so we resort to environment variables
# to configure the runner (channels mostly).
Expand Down Expand Up @@ -268,6 +273,7 @@ def __init__(self, db_name):
self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
self.has_queue_job = self._has_queue_job()

Check warning on line 274 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L272-L274

Added lines #L272 - L274 were not covered by tests
if self.has_queue_job:
self._acquire_master_lock()
self._initialize()
except BaseException:
self.close()
Expand All @@ -284,6 +290,14 @@ def close(self):
pass
self.conn = None

def _acquire_master_lock(self):
"""Acquire the master runner lock or raise MasterElectionLost"""
with closing(self.conn.cursor()) as cr:
cr.execute("SELECT pg_try_advisory_lock(%s)", (PG_ADVISORY_LOCK_ID,))

Check warning on line 296 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L296

Added line #L296 was not covered by tests
if not cr.fetchone()[0]:
msg = f"could not acquire master runner lock on {self.db_name}"
raise MasterElectionLost(msg)

Check warning on line 299 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L298-L299

Added lines #L298 - L299 were not covered by tests

def _has_queue_job(self):
with closing(self.conn.cursor()) as cr:
cr.execute(
Expand Down Expand Up @@ -413,7 +427,7 @@ def get_db_names(self):
db_names = config["db_name"].split(",")
else:
db_names = odoo.service.db.list_dbs(True)
return db_names
return sorted(db_names)

Check warning on line 430 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L430

Added line #L430 was not covered by tests

def close_databases(self, remove_jobs=True):
for db_name, db in self.db_by_name.items():
Expand Down Expand Up @@ -522,7 +536,7 @@ def run(self):
while not self._stop:
# outer loop does exception recovery
try:
_logger.info("initializing database connections")
_logger.debug("initializing database connections")

Check warning on line 539 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L539

Added line #L539 was not covered by tests
# TODO: how to detect new databases or databases
# on which queue_job is installed after server start?
self.initialize_databases()
Expand All @@ -537,6 +551,14 @@ def run(self):
except InterruptedError:
# Interrupted system call, i.e. KeyboardInterrupt during select
self.stop()
except MasterElectionLost as e:
_logger.debug(
"master election lost: %s, sleeping %ds and retrying",

Check warning on line 556 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L554-L556

Added lines #L554 - L556 were not covered by tests
e,
ERROR_RECOVERY_DELAY,
)
self.close_databases()
time.sleep(ERROR_RECOVERY_DELAY)

Check warning on line 561 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L559-L561

Added lines #L559 - L561 were not covered by tests
except Exception:
_logger.exception(
"exception: sleeping %ds and retrying", ERROR_RECOVERY_DELAY
Expand Down

0 comments on commit deecd27

Please sign in to comment.