diff --git a/Dockerfile b/Dockerfile
index 92031ed0b..3b72e7be6 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -48,6 +48,7 @@ RUN mkdir -p /var/log/panda/wsgisocks
 RUN mkdir -p /var/log/panda/pandacache
 RUN mkdir -p /run/httpd/wsgisocks
 RUN mkdir -p /var/log/panda/pandacache/jedilog
+RUN mkdir -p /var/cache/pandaserver/schedconfig
 RUN mkdir -p /var/run/panda
 RUN mkdir -p /var/cric
 
@@ -75,6 +76,7 @@ RUN chmod -R 777 /var/log/panda/pandacache
 RUN chmod -R 777 /var/run/panda
 RUN chmod -R 777 /var/lib/logrotate
 RUN chmod -R 777 /var/cric
+RUN chmod -R 777 /var/cache/pandaserver
 
 ENV PANDA_LOCK_DIR /var/run/panda
 RUN mkdir -p ${PANDA_LOCK_DIR} && chmod 777 ${PANDA_LOCK_DIR}
diff --git a/PandaPkgInfo.py b/PandaPkgInfo.py
index fa035dc9c..9589ccde0 100644
--- a/PandaPkgInfo.py
+++ b/PandaPkgInfo.py
@@ -1 +1 @@
-release_version = "0.0.52"
+release_version = "0.0.53"
diff --git a/pandaserver/daemons/scripts/add_main.py b/pandaserver/daemons/scripts/add_main.py
index 588cec9b7..4b3ed9dc0 100644
--- a/pandaserver/daemons/scripts/add_main.py
+++ b/pandaserver/daemons/scripts/add_main.py
@@ -19,7 +19,7 @@
 
 
 # main
-def main(argv=tuple(), tbuf=None, **kwargs):
+def main(argv=tuple(), tbuf=None, lock_pool=None, **kwargs):
 
     try:
         long
@@ -61,11 +61,12 @@ def main(argv=tuple(), tbuf=None, **kwargs):
 
     class AdderThread(GenericThread):
         def __init__(self, taskBuffer, aSiteMapper,
-                     job_output_reports):
+                     job_output_reports, lock_pool):
             GenericThread.__init__(self)
             self.taskBuffer = taskBuffer
             self.aSiteMapper = aSiteMapper
             self.job_output_reports = job_output_reports
+            self.lock_pool = lock_pool
 
         # main loop
         def run(self):
@@ -111,7 +112,8 @@ def run(self):
                         # get adder
                         adder_gen = AdderGen(taskBuffer, panda_id, job_status, attempt_nr,
                                              ignoreTmpError=ignoreTmpError, siteMapper=aSiteMapper, pid=uniq_pid,
-                                             prelock_pid=uniq_pid, lock_offset=lock_interval-retry_interval)
+                                             prelock_pid=uniq_pid, lock_offset=lock_interval-retry_interval,
+                                             lock_pool=self.lock_pool)
                         n_processed += 1
                         # execute
                         adder_gen.run()
@@ -176,9 +178,9 @@ def proc_join(self):
                 tbuf = TaskBuffer()
                 tbuf_list.append(tbuf)
                 tbuf.init(panda_config.dbhost, panda_config.dbpasswd, nDBConnection=1, useTimeout=True)
-                thr = AdderThread(tbuf, aSiteMapper, jor_lists)
+                thr = AdderThread(tbuf, aSiteMapper, jor_lists, lock_pool)
             else:
-                thr = AdderThread(taskBufferIF.getInterface(), aSiteMapper, jor_lists)
+                thr = AdderThread(taskBufferIF.getInterface(), aSiteMapper, jor_lists, lock_pool)
             adderThrList.append(thr)
         # start all threads
         for thr in adderThrList:
diff --git a/pandaserver/daemons/utils.py b/pandaserver/daemons/utils.py
index 2f4ba78bf..a608fefd7 100644
--- a/pandaserver/daemons/utils.py
+++ b/pandaserver/daemons/utils.py
@@ -16,6 +16,7 @@
 import psutil
 
 from pandacommon.pandalogger import logger_utils
+from pandacommon.pandautils.thread_utils import LockPool
 from pandaserver.config import panda_config, daemon_config
 
 
@@ -63,7 +64,7 @@ def kill_proc_tree(pid, sig=signal.SIGKILL, include_parent=True,
 
 
 # worker process loop of daemon
-def daemon_loop(dem_config, msg_queue, pipe_conn, worker_lifetime, tbuf=None):
+def daemon_loop(dem_config, msg_queue, pipe_conn, worker_lifetime, tbuf=None, lock_pool=None):
     # pid of the worker
     my_pid = os.getpid()
     my_full_pid = '{0}-{1}-{2}'.format(socket.getfqdn().split('.')[0], os.getpgrp(), my_pid)
@@ -202,7 +203,7 @@ def got_end_sig(sig, frame):
                     tmp_log.info('{dem} start looping'.format(dem=dem_name))
                     start_ts = time.time()
                     while True:
-                        ret_val = the_module.main(argv=mod_argv, tbuf=tbuf)
+                        ret_val = the_module.main(argv=mod_argv, tbuf=tbuf, lock_pool=lock_pool)
                         now_ts = time.time()
                         if not ret_val:
                             # daemon main function says stop the loop
@@ -214,7 +215,7 @@ def got_end_sig(sig, frame):
                 else:
                     # execute the module script with arguments
                     tmp_log.info('{dem} start'.format(dem=dem_name))
-                    the_module.main(argv=mod_argv, tbuf=tbuf)
+                    the_module.main(argv=mod_argv, tbuf=tbuf, lock_pool=lock_pool)
                     tmp_log.info('{dem} finish'.format(dem=dem_name))
             except Exception as e:
                 # with error
@@ -262,14 +263,15 @@ class DaemonWorker(object):
     _lock = threading.Lock()
 
     # constructor
-    def __init__(self, dem_config, msg_queue, worker_lifetime, tbuf=None):
+    def __init__(self, dem_config, msg_queue, worker_lifetime, tbuf=None, lock_pool=None):
         # synchronized with lock
         with self._lock:
             self._make_pipe()
             self._make_process(
                 dem_config=dem_config, msg_queue=msg_queue,
                 worker_lifetime=worker_lifetime,
-                tbuf=tbuf)
+                tbuf=tbuf,
+                lock_pool=lock_pool)
 
     # make pipe connection pairs for the worker
     def _make_pipe(self):
@@ -281,8 +283,8 @@ def _close_pipe(self):
         self.child_conn.close()
 
     # make associated process
-    def _make_process(self, dem_config, msg_queue, worker_lifetime, tbuf):
-        args = (dem_config, msg_queue, self.child_conn, worker_lifetime, tbuf)
+    def _make_process(self, dem_config, msg_queue, worker_lifetime, tbuf, lock_pool):
+        args = (dem_config, msg_queue, self.child_conn, worker_lifetime, tbuf, lock_pool)
         self.process = multiprocessing.Process(target=daemon_loop, args=args)
 
     # start worker process
@@ -351,6 +353,8 @@ def __init__(self, logger, n_workers=1, n_dbconn=1, worker_lifetime=28800, use_t
         # shared taskBufferIF
         self.tbif = None
         self._make_tbif()
+        # shared lock pool
+        self.lock_pool = LockPool()
         # spawn workers
         self._spawn_workers(self.n_workers)
 
@@ -386,7 +390,8 @@ def _spawn_workers(self, n_workers=1, auto_start=False):
             worker = DaemonWorker(
                 dem_config=self.dem_config, msg_queue=self.msg_queue,
                 worker_lifetime=self.worker_lifetime,
-                tbuf=tbuf)
+                tbuf=tbuf,
+                lock_pool=self.lock_pool)
             self.worker_pool.add(worker)
             if auto_start:
                 worker.start()
diff --git a/pandaserver/dataservice/AdderGen.py b/pandaserver/dataservice/AdderGen.py
index 6a00e5e08..fc549992c 100644
--- a/pandaserver/dataservice/AdderGen.py
+++ b/pandaserver/dataservice/AdderGen.py
@@ -38,7 +38,7 @@ class AdderGen(object):
 
     # constructor
     def __init__(self, taskBuffer, jobID, jobStatus, attemptNr, ignoreTmpError=True, siteMapper=None,
-                 pid=None, prelock_pid=None, lock_offset=10):
+                 pid=None, prelock_pid=None, lock_offset=10, lock_pool=None):
         self.job = None
         self.jobID = jobID
         self.jobStatus = jobStatus
@@ -52,6 +52,7 @@ def __init__(self, taskBuffer, jobID, jobStatus, attemptNr, ignoreTmpError=True,
         self.pid = pid
         self.prelock_pid = prelock_pid
         self.data = None
+        self.lock_pool = lock_pool
         # logger
         self.logger = LogWrapper(_logger,str(self.jobID))
 
@@ -304,13 +305,29 @@ def run(self):
                         if self.job.commandToPilot == 'tobekilled' and self.job.jobStatus == 'failed':
                             self.job.jobStatus = 'cancelled'
                         # update job
-                        if oldJobStatus in ['cancelled','closed']:
+                        if oldJobStatus in ['cancelled', 'closed']:
                             pass
                         else:
-                            self.logger.debug("updating DB")
-                            retU = self.taskBuffer.updateJobs([self.job],False,oldJobStatusList=[oldJobStatus],
-                                                              extraInfo=self.extraInfo)
-                            self.logger.debug("retU: %s" % retU)
+                            # hold the pool's per-jediTaskID lock (when one is available) around the DB update
+                            db_lock = None
+                            if self.job.jediTaskID not in [0, None, 'NULL'] and self.lock_pool is not None:
+                                db_lock = self.lock_pool.get(self.job.jediTaskID)
+                                if db_lock:
+                                    db_lock.acquire()
+                                    self.logger.debug("got DB lock for jediTaskID={}".format(self.job.jediTaskID))
+                                else:
+                                    self.logger.debug("couldn't get DB lock for jediTaskID={}".format(self.job.jediTaskID))
+                            try:
+                                self.logger.debug("updating DB")
+                                retU = self.taskBuffer.updateJobs([self.job],False,oldJobStatusList=[oldJobStatus],
+                                                                  extraInfo=self.extraInfo)
+                                self.logger.debug("retU: %s" % retU)
+                            finally:
+                                # always hand the lock back, even if updateJobs raises
+                                if db_lock:
+                                    self.logger.debug("release DB lock for jediTaskID={}".format(self.job.jediTaskID))
+                                    db_lock.release()
+                                    self.lock_pool.release(self.job.jediTaskID)
                             # failed
                             if not retU[0]:
                                 self.logger.error('failed to update DB for pandaid={0}'.format(self.job.PandaID))
diff --git a/setup.py b/setup.py
index 81700f2a8..6f05100cb 100755
--- a/setup.py
+++ b/setup.py
@@ -238,7 +238,7 @@ def run (self):
     author_email='atlas-adc-panda@cern.ch',
     url='https://twiki.cern.ch/twiki/bin/view/Atlas/PanDA',
     zip_safe=False,
-    install_requires=['panda-common>=0.0.31',
+    install_requires=['panda-common>=0.0.34',
                       'panda-client',
                       'pyOpenSSL',
                       'python-daemon',