From 4d330fef0fdb9470bd3eef87145eee774914cbea Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Wed, 14 Feb 2024 17:35:13 +0100 Subject: [PATCH 01/12] Replaced cx_Oracle with oracledb --- INSTALL.md | 2 +- INSTALL_ATLAS.md | 4 +- pandaserver/daemons/utils.py | 3 +- pandaserver/dataservice/DDM.py | 59 +---------- pandaserver/dataservice/forkSetupper.py | 3 +- pandaserver/proxycache/DBMSql.py | 42 ++++---- pandaserver/server/panda.py | 9 +- pandaserver/taskbuffer/ConBridge.py | 2 +- pandaserver/taskbuffer/EiDBProxy.py | 125 ------------------------ pandaserver/taskbuffer/Initializer.py | 10 +- pandaserver/taskbuffer/OraDBProxy.py | 12 ++- pandaserver/taskbuffer/WrappedCursor.py | 3 +- pandaserver/test/boostUser.py | 3 +- pyproject.toml | 4 +- 14 files changed, 56 insertions(+), 225 deletions(-) delete mode 100755 pandaserver/taskbuffer/EiDBProxy.py diff --git a/INSTALL.md b/INSTALL.md index 0706eabef..abb05253e 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -17,7 +17,7 @@ Installation pip install panda-server ``` -1. Install database modules: cx_Oracle for Oracle or MySQLdb for MySQL/MariaDB +1. Install database modules: oracledb for Oracle or MySQLdb for MySQL/MariaDB 1. Modify config files. ``` diff --git a/INSTALL_ATLAS.md b/INSTALL_ATLAS.md index 694063ffe..4845dbbe1 100644 --- a/INSTALL_ATLAS.md +++ b/INSTALL_ATLAS.md @@ -18,11 +18,11 @@ Installation for ATLAS on CC7 + virtualenv + python3 + atlpan chown atlpan:zp /var/log/panda/wsgisocks chown atlpan:zp /var/cache/pandaserver ``` -1. Install panda-server and cx_Oracle. +1. Install panda-server and oracledb. ``` source /opt/pandaserver/bin/activate pip install panda-server - pip install cx_Oracle + pip install oracledb ``` 1. Modify config files. diff --git a/pandaserver/daemons/utils.py b/pandaserver/daemons/utils.py index b26f6dc84..39383ec5d 100644 --- a/pandaserver/daemons/utils.py +++ b/pandaserver/daemons/utils.py @@ -16,6 +16,7 @@ import psutil from pandacommon.pandalogger import logger_utils from pandacommon.pandautils.thread_utils import GenericThread, LockPool + from pandaserver.config import daemon_config, panda_config # list of signals accepted to end the main process @@ -88,7 +89,7 @@ def got_end_sig(sig, frame): expiry_ts = start_ts + worker_lifetime # create taskBuffer object if not given if tbuf is None: - # initialize cx_Oracle using dummy connection + # initialize oracledb using dummy connection try: from pandaserver.taskbuffer.Initializer import initializer diff --git a/pandaserver/dataservice/DDM.py b/pandaserver/dataservice/DDM.py index 96e2e0043..a98b190c5 100755 --- a/pandaserver/dataservice/DDM.py +++ b/pandaserver/dataservice/DDM.py @@ -8,7 +8,6 @@ import sys import traceback -from pandaserver.srvcore import CoreUtils from rucio.client import Client as RucioClient from rucio.common.exception import ( DataIdentifierAlreadyExists, @@ -20,6 +19,8 @@ UnsupportedOperation, ) +from pandaserver.srvcore import CoreUtils + # rucio class RucioAPI: @@ -337,33 +338,6 @@ def registerZipFiles(self, zipMap): ) iFiles += nFiles - # get disk usage at RSE - def getRseUsage(self, rse, src="srm"): - retMap = {} - try: - client = RucioClient() - itr = client.get_rse_usage(rse) - # look for srm - for item in itr: - if item["source"] == src: - try: - total = item["total"] / 1024 / 1024 / 1024 - except Exception: - total = None - try: - used = item["used"] / 1024 / 1024 / 1024 - except Exception: - used = None - try: - free = item["free"] / 1024 / 1024 / 1024 - except Exception: - free = None - retMap = {"total": total, "used": used, "free": free} - break - except Exception: - pass - return retMap - # list datasets def listDatasets(self, datasetName, old=False): result = {} @@ -463,16 +437,6 @@ def getMetaData(self, dsn): errType, errVale = sys.exc_info()[:2] return False, f"{errType} {errVale}" - # check if dataset exists - def checkDatasetExist(self, dsn): - # register dataset - client = RucioClient() - try: - scope, dsn = self.extract_scope(dsn) - return True - except DataIdentifierNotFound: - return False - # delete dataset def eraseDataset(self, dsn, scope=None, grace_period=None): presetScope = scope @@ -632,25 +596,6 @@ def getDatasetSize(self, datasetName): errMsg = f"{errtype.__name__} {errvalue}" return False, errMsg - # delete dataset replicas - def deleteDatasetReplicas(self, datasetName, locations): - # extract scope from dataset - scope, dsn = self.extract_scope(datasetName) - client = RucioClient() - try: - for rule in client.list_did_rules(scope, dsn): - if rule["account"] != client.account: - continue - if rule["rse_expression"] in locations: - client.delete_replication_rule(rule["id"]) - except DataIdentifierNotFound: - pass - except Exception: - errtype, errvalue = sys.exc_info()[:2] - errMsg = f"{errtype.__name__} {errvalue}" - return False, errMsg - return True, "" - # register files def registerFiles(self, files, rse): client = RucioClient() diff --git a/pandaserver/dataservice/forkSetupper.py b/pandaserver/dataservice/forkSetupper.py index ca6bdb6f9..b9eb92b16 100755 --- a/pandaserver/dataservice/forkSetupper.py +++ b/pandaserver/dataservice/forkSetupper.py @@ -2,6 +2,7 @@ import traceback from pandacommon.pandautils.thread_utils import GenericThread + from pandaserver.srvcore.CoreUtils import commands_get_status_output @@ -23,7 +24,7 @@ def run(inFile, v_onlyTA, v_firstSubmission): # password from pandaserver.config import panda_config - # initialize cx_Oracle using dummy connection + # initialize oracledb using dummy connection from pandaserver.taskbuffer.Initializer import initializer initializer.init() diff --git a/pandaserver/proxycache/DBMSql.py b/pandaserver/proxycache/DBMSql.py index 7342f93a1..eca8fa4cf 100755 --- a/pandaserver/proxycache/DBMSql.py +++ b/pandaserver/proxycache/DBMSql.py @@ -1,7 +1,9 @@ import sys from types import TupleType -import cx_Oracle +import oracledb + +oracledb.init_oracle_client() from pandaserver.config import panda_config # Common items @@ -29,7 +31,7 @@ def getOracleConnection(db_type): print("Initializing Oracle connection") try: - cpool = cx_Oracle.SessionPool(user, pssw, serv, poolmin, poolmax, poolincr) + cpool = oracledb.create_pool(user=user, password=pssw, dsn=serv, min=poolmin, max=poolmax, increment=poolincr) ORAC_CON = cpool.acquire() except Exception: import traceback @@ -68,24 +70,24 @@ def executeQuery(self, sql): return [] try: if self.db_type in OracleList: - cursor = self.oracle_con.cursor() - if bindDict is not None: - cursor.execute(sql, bindDict) - else: - cursor.execute(sql) - colMap = cursor.description - result = cursor.fetchall() - ret = [] - for t in result: - i = 0 - d = {} - for e in t: - if str(type(e)) == "": - e = str(e) - d[colMap[i][0]] = e - i = i + 1 - ret.append(d) - cursor.close() + with self.oracle_con.cursor() as cursor: + if bindDict is not None: + cursor.execute(sql, bindDict) + else: + cursor.execute(sql) + colMap = cursor.description + result = cursor.fetchall() + ret = [] + for t in result: + i = 0 + d = {} + for e in t: + if isinstance(e, oracledb.Timestamp): + e = str(e) + d[colMap[i][0]] = e + i = i + 1 + ret.append(d) + return ret else: diff --git a/pandaserver/server/panda.py b/pandaserver/server/panda.py index 47735cdf4..d82ca0916 100755 --- a/pandaserver/server/panda.py +++ b/pandaserver/server/panda.py @@ -16,10 +16,13 @@ import traceback from urllib.parse import parse_qsl -import pandaserver.taskbuffer.ErrorCode from pandacommon.pandalogger.LogWrapper import LogWrapper from pandacommon.pandalogger.PandaLogger import PandaLogger from pandacommon.pandautils.thread_utils import GenericThread +from werkzeug.datastructures import CombinedMultiDict, EnvironHeaders +from werkzeug.formparser import parse_form_data + +import pandaserver.taskbuffer.ErrorCode from pandaserver.config import panda_config # pylint: disable=W0611 @@ -181,10 +184,8 @@ updateWorkers, userIF, ) -from werkzeug.datastructures import CombinedMultiDict, EnvironHeaders -from werkzeug.formparser import parse_form_data -# initialize cx_Oracle using dummy connection +# initialize oracledb using dummy connection initializer.init() # initialize TaskBuffer diff --git a/pandaserver/taskbuffer/ConBridge.py b/pandaserver/taskbuffer/ConBridge.py index d69686de3..55c80925d 100644 --- a/pandaserver/taskbuffer/ConBridge.py +++ b/pandaserver/taskbuffer/ConBridge.py @@ -327,7 +327,7 @@ def bridge_run(self): # execute method = getattr(self.proxy, comStr) res = method(*variables[0], **variables[1]) - # FIXME : modify response since cx_Oracle types cannot be picked + # FIXME : modify response since oracledb types cannot be picked if comStr in ["querySQLS"]: newRes = [True] + list(res[1:]) res = newRes diff --git a/pandaserver/taskbuffer/EiDBProxy.py b/pandaserver/taskbuffer/EiDBProxy.py deleted file mode 100755 index 3025c3a55..000000000 --- a/pandaserver/taskbuffer/EiDBProxy.py +++ /dev/null @@ -1,125 +0,0 @@ -""" -proxy for database connection - -""" -import re -import warnings - -from pandacommon.pandalogger.LogWrapper import LogWrapper -from pandacommon.pandalogger.PandaLogger import PandaLogger -from pandaserver.config import panda_config -from pandaserver.taskbuffer import OraDBProxy - -warnings.filterwarnings("ignore") - -# logger -_logger = PandaLogger().getLogger("EiDBProxy") - - -# proxy -class EiDBProxy(OraDBProxy.DBProxy): - # constructor - def __init__(self, useOtherError=False): - OraDBProxy.DBProxy.__init__(self, useOtherError) - - # connect to DB (just for INTR) - def connect( - self, - dbhost=panda_config.ei_dbhost, - dbpasswd=panda_config.ei_dbpasswd, - dbuser=panda_config.ei_dbuser, - dbname=panda_config.ei_dbname, - dbtimeout=panda_config.ei_dbtimeout, - reconnect=False, - ): - return OraDBProxy.DBProxy.connect( - self, - dbhost=dbhost, - dbpasswd=dbpasswd, - dbuser=dbuser, - dbname=dbname, - dbtimeout=dbtimeout, - reconnect=reconnect, - ) - - # get index of AMI tag - def getIndexAmiTag(self, tagList, amiTag): - for idxTag, tagPattern in enumerate(tagList): - if re.search("^" + tagPattern + "$", amiTag) is not None: - return idxTag - return None - - # get GUIDs from EventIndex - def getGUIDsFromEventIndex(self, runEventList, streamName, amiTags, dataType): - comment = " /* DBProxy.getGUIDsFromEventIndex */" - methodName = comment.split(" ")[-2].split(".")[-1] - tmpLog = LogWrapper( - _logger, - methodName + f" ", - ) - try: - # change to list - if amiTags not in [None, ""]: - amiTags = amiTags.replace("*", ".*").split(",") - tmpLog.debug(f"start for {len(runEventList)} events") - # check data type - if dataType not in ["RAW", "ESD", "AOD"]: - return False, f"dataType={dataType} is unsupported" - # sql to insert runs and events - sqlRE = f"INSERT INTO {panda_config.schemaEI}.TMP_RUN_EVENT_PAIRS (runNumber,eventNumber) " - sqlRE += "VALUES (:runNumber,:eventNumber) " - varMaps = [] - for runNumber, eventNumber in runEventList: - varMap = {} - varMap[":runNumber"] = runNumber - varMap[":eventNumber"] = eventNumber - varMaps.append(varMap) - # begin transaction - self.conn.begin() - self.cur.arraysize = 100000 - # insert runs and events - self.cur.executemany(sqlRE + comment, varMaps) - # read GUIDs - varMap = {} - if amiTags in [None, ""]: - sqlRG = f"SELECT runNumber,eventNumber,guid_{dataType} " - sqlRG += f"FROM {panda_config.schemaEI}.V_PANDA_EVPICK_NOAMITAG_MANY " - else: - sqlRG = f"SELECT runNumber,eventNumber,guid_{dataType},amiTag " - sqlRG += f"FROM {panda_config.schemaEI}.V_PANDA_EVPICK_AMITAG_MANY " - if streamName not in [None, ""]: - sqlRG += "WHERE streamName=:streamName " - varMap[":streamName"] = streamName - self.cur.execute(sqlRG + comment, varMap) - resRG = self.cur.fetchall() - # commit - if not self._commit(): - raise RuntimeError("Commit error") - retValue = {} - keyAmiIdxMap = {} - for tmpItem in resRG: - if amiTags in [None, ""]: - runNumber, eventNumber, guid = tmpItem - # dummy - idxTag = 0 - else: - runNumber, eventNumber, guid, amiTag = tmpItem - # get index number for the AMI tag in the list - idxTag = self.getIndexAmiTag(amiTags, amiTag) - # didn't match - if idxTag is None: - continue - tmpKey = (runNumber, eventNumber) - # use AMI tag in a preference orde - if tmpKey in keyAmiIdxMap and keyAmiIdxMap[tmpKey] < idxTag: - continue - keyAmiIdxMap[tmpKey] = idxTag - retValue[tmpKey] = [guid] - tmpLog.debug(f"found {len(retValue)} events") - return True, retValue - except Exception: - # roll back - self._rollback() - # error - self.dumpErrorMessage(_logger, methodName) - return False, None diff --git a/pandaserver/taskbuffer/Initializer.py b/pandaserver/taskbuffer/Initializer.py index cbf3120aa..81a5261f9 100644 --- a/pandaserver/taskbuffer/Initializer.py +++ b/pandaserver/taskbuffer/Initializer.py @@ -3,12 +3,13 @@ # logger from pandacommon.pandalogger.PandaLogger import PandaLogger + from pandaserver.config import panda_config _logger = PandaLogger().getLogger("Initializer") -# initialize cx_Oracle using dummy connection to avoid "Unable to acquire Oracle environment handle" +# initialize oracledb using dummy connection to avoid "Unable to acquire Oracle environment handle" class Initializer: def __init__(self): self.lock = Lock() @@ -27,12 +28,13 @@ def init(self): _logger.debug("connect") # connect if panda_config.backend == "oracle": - import cx_Oracle + import oracledb - conn = cx_Oracle.connect( - dsn=panda_config.dbhost, + oracledb.init_oracle_client() + conn = oracledb.connect( user=panda_config.dbuser, password=panda_config.dbpasswd, + dsn=panda_config.dbhost, threaded=True, ) elif panda_config.backend == "postgres": diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index f2b7fba1d..03ed37120 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -56,9 +56,11 @@ pass if panda_config.backend == "oracle": - import cx_Oracle + import oracledb - varNUMBER = cx_Oracle.NUMBER + oracledb.init_oracle_client() + + varNUMBER = oracledb.NUMBER elif panda_config.backend == "postgres": import psycopg2 as psycopg @@ -195,7 +197,7 @@ def connect( # connect try: if self.backend == "oracle": - self.conn = cx_Oracle.connect( + self.conn = oracledb.connect( dsn=self.dbhost, user=self.dbuser, password=self.dbpasswd, @@ -204,8 +206,8 @@ def connect( ) def OutputTypeHandler(cursor, name, defaultType, size, precision, scale): - if defaultType == cx_Oracle.CLOB: - return cursor.var(cx_Oracle.LONG_STRING, arraysize=cursor.arraysize) + if defaultType == oracledb.CLOB: + return cursor.var(oracledb.LONG_STRING, arraysize=cursor.arraysize) self.conn.outputtypehandler = OutputTypeHandler elif self.backend == "postgres": diff --git a/pandaserver/taskbuffer/WrappedCursor.py b/pandaserver/taskbuffer/WrappedCursor.py index e503bfdad..4002d111a 100644 --- a/pandaserver/taskbuffer/WrappedCursor.py +++ b/pandaserver/taskbuffer/WrappedCursor.py @@ -8,6 +8,7 @@ import warnings from pandacommon.pandalogger.PandaLogger import PandaLogger + from pandaserver.config import panda_config warnings.filterwarnings("ignore") @@ -313,7 +314,7 @@ def _returningIntoOracle(self, returningInputData, varDict, cur, dryRun=False): # assuming that we use RETURNING INTO only for PandaID or row_ID columns if not dryRun: for x in listInto: - varDict[x] = cur.var(cx_Oracle.NUMBER) + varDict[x] = cur.var(oracledb.NUMBER) result = f" RETURNING {valReturning} INTO {valInto} " except Exception: pass diff --git a/pandaserver/test/boostUser.py b/pandaserver/test/boostUser.py index 80cef5467..689288e40 100755 --- a/pandaserver/test/boostUser.py +++ b/pandaserver/test/boostUser.py @@ -2,11 +2,12 @@ from pandacommon.pandalogger.PandaLogger import PandaLogger from pandacommon.pandautils.thread_utils import GenericThread + from pandaserver.config import panda_config from pandaserver.taskbuffer.Initializer import initializer from pandaserver.taskbuffer.TaskBuffer import taskBuffer -# initialize cx_Oracle using dummy connection +# initialize oracledb using dummy connection initializer.init() # logger diff --git a/pyproject.toml b/pyproject.toml index 873b1c742..fbdd1ed38 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,12 +38,12 @@ dependencies = [ requires-python = ">=3.8" [project.optional-dependencies] -oracle = ['cx_Oracle'] +oracle = ['oracledb'] mysql = ['mysqlclient'] postgres = ['psycopg2-binary'] rucio = ['rucio-clients'] elasticsearch = ['elasticsearch'] -atlasprod = ['cx_Oracle', 'rucio-clients', 'elasticsearch', 'numpy', 'scipy'] +atlasprod = ['oracledb', 'rucio-clients', 'elasticsearch', 'numpy', 'scipy'] [project.urls] Homepage = "https://panda-wms.readthedocs.io/en/latest/" From 33d834f0bcea7b399842666be0adf7a4c3015de5 Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Wed, 14 Feb 2024 17:49:29 +0100 Subject: [PATCH 02/12] Replaced cx_Oracle with oracledb --- pandaserver/taskbuffer/Initializer.py | 7 +------ pandaserver/taskbuffer/OraDBProxy.py | 8 +------- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/pandaserver/taskbuffer/Initializer.py b/pandaserver/taskbuffer/Initializer.py index 81a5261f9..58dba509f 100644 --- a/pandaserver/taskbuffer/Initializer.py +++ b/pandaserver/taskbuffer/Initializer.py @@ -31,12 +31,7 @@ def init(self): import oracledb oracledb.init_oracle_client() - conn = oracledb.connect( - user=panda_config.dbuser, - password=panda_config.dbpasswd, - dsn=panda_config.dbhost, - threaded=True, - ) + conn = oracledb.connect(user=panda_config.dbuser, password=panda_config.dbpasswd, dsn=panda_config.dbhost) elif panda_config.backend == "postgres": import psycopg2 diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index 03ed37120..16d420200 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -197,13 +197,7 @@ def connect( # connect try: if self.backend == "oracle": - self.conn = oracledb.connect( - dsn=self.dbhost, - user=self.dbuser, - password=self.dbpasswd, - threaded=True, - encoding="UTF-8", - ) + self.conn = oracledb.connect(dsn=self.dbhost, user=self.dbuser, password=self.dbpasswd) def OutputTypeHandler(cursor, name, defaultType, size, precision, scale): if defaultType == oracledb.CLOB: From 0d40be2e30b6feb3b15e22e8eb96b9206928335f Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Fri, 16 Feb 2024 16:04:08 +0100 Subject: [PATCH 03/12] Remove obsoleted DBMSql.py --- pandaserver/proxycache/DBMSql.py | 113 ------------------------------- 1 file changed, 113 deletions(-) delete mode 100755 pandaserver/proxycache/DBMSql.py diff --git a/pandaserver/proxycache/DBMSql.py b/pandaserver/proxycache/DBMSql.py deleted file mode 100755 index eca8fa4cf..000000000 --- a/pandaserver/proxycache/DBMSql.py +++ /dev/null @@ -1,113 +0,0 @@ -import sys -from types import TupleType - -import oracledb - -oracledb.init_oracle_client() -from pandaserver.config import panda_config - -# Common items -ORAC_CON = None - - -OracleList = ["oracle"] - - -# -# Connection initializers -# - - -def getOracleConnection(db_type): - global ORAC_CON - if ORAC_CON is not None: - return ORAC_CON - user = panda_config.dbuser - pssw = panda_config.dbpasswd - serv = panda_config.dbhost - poolmin = 1 - poolmax = 3 - poolincr = 1 - - print("Initializing Oracle connection") - try: - cpool = oracledb.create_pool(user=user, password=pssw, dsn=serv, min=poolmin, max=poolmax, increment=poolincr) - ORAC_CON = cpool.acquire() - except Exception: - import traceback - - traceback.print_stack() - traceback.print_exc() - sys.exit(1) - return ORAC_CON - - -class DBMSql: - db_type = "" - oracle_con = None - - # constructor with database type [sqlite|postgres] - def __init__(self, _db_type): - self.db_type = _db_type - - if self.db_type in OracleList: - self.oracle_con = getOracleConnection(self.db_type) - else: - print("Unknown database type:" + self.db_type) - print("ERROR. Unknown database type", self.db_type) - sys.exit(1) - - print("Database connection created for " + self.db_type) - - # Method to execute sql query (SELECT) - def executeQuery(self, sql): - bindDict = None - if isinstance(sql, TupleType): - bindDict = sql[1] - sql = sql[0] - if sql.strip()[0:6].lower() != "select": - print("not a SELECT statement!!") - return [] - try: - if self.db_type in OracleList: - with self.oracle_con.cursor() as cursor: - if bindDict is not None: - cursor.execute(sql, bindDict) - else: - cursor.execute(sql) - colMap = cursor.description - result = cursor.fetchall() - ret = [] - for t in result: - i = 0 - d = {} - for e in t: - if isinstance(e, oracledb.Timestamp): - e = str(e) - d[colMap[i][0]] = e - i = i + 1 - ret.append(d) - - return ret - - else: - raise DBMSqlError(f"not support:{self.db_type}") - except Exception: - raise DBMSqlError(f"executeQuery error:{sys.exc_info()[1]} \n {sql}") - - -class DBMSqlError(Exception): - def __init__(self, value): - self.value = value - - def __str__(self): - return repr(self.value) - - -# ----------------------------- -# Codes below are for test... -# - -if __name__ == "__main__": - dbm = DBMSql(_db_type="oracle") - print(dbm.executeQuery("select * from services")) From 1a918939a8bc736b4216fec863ad040a829eb220 Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Fri, 16 Feb 2024 16:14:08 +0100 Subject: [PATCH 04/12] Import sys --- pandaserver/proxycache/panda_activeusers_query.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pandaserver/proxycache/panda_activeusers_query.py b/pandaserver/proxycache/panda_activeusers_query.py index 72d68c640..0722adff5 100755 --- a/pandaserver/proxycache/panda_activeusers_query.py +++ b/pandaserver/proxycache/panda_activeusers_query.py @@ -1,18 +1,20 @@ import re +import sys from pandacommon.pandalogger.LogWrapper import LogWrapper from pandacommon.pandalogger.PandaLogger import PandaLogger from pandacommon.pandautils.thread_utils import GenericThread + from pandaserver.config import panda_config from pandaserver.proxycache import panda_proxy_cache from pandaserver.taskbuffer.TaskBuffer import taskBuffer # logger -_logger = PandaLogger().getLogger("panda_activeusers_query") -tmpLog = LogWrapper(_logger) +_logger = PandaLogger().getLogger("activeusers_query") +tmp_log = LogWrapper(_logger) if __name__ == "__main__": - tmpLog.debug("================= start ==================") + tmp_log.debug("================= start ==================") requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__) taskBuffer.init( @@ -34,7 +36,7 @@ sql = "select distinct DN FROM ATLAS_PANDAMETA.users WHERE GRIDPREF LIKE :patt" varMap = {} varMap[":patt"] = "%p%" - tmpStat, tmpRes = taskBuffer.querySQLS(sql, varMap) + _, tmpRes = taskBuffer.querySQLS(sql, varMap) for (realDN,) in tmpRes: if realDN is None: continue @@ -42,7 +44,7 @@ realDN = re.sub("(/CN=proxy)+", "", realDN) realDN = re.sub("(/CN=\d+)+$", "", realDN) # check proxy - tmpLog.debug(f"check proxy cache for DN={realDN}") + tmp_log.debug(f"check proxy cache for DN={realDN}") for role in roles: my_proxy_interface_instance.checkProxy(realDN, role=role) - tmpLog.debug("done") + tmp_log.debug("done") From 6b7bf1ccbadcf7ad01c99f869e3dd3cf917372cb Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Fri, 16 Feb 2024 16:31:08 +0100 Subject: [PATCH 05/12] Small refactoring --- .../proxycache/panda_activeusers_query.py | 24 ++++--- pandaserver/proxycache/panda_proxy_cache.py | 72 ++++++++++--------- 2 files changed, 52 insertions(+), 44 deletions(-) diff --git a/pandaserver/proxycache/panda_activeusers_query.py b/pandaserver/proxycache/panda_activeusers_query.py index 0722adff5..d7f5d25ff 100755 --- a/pandaserver/proxycache/panda_activeusers_query.py +++ b/pandaserver/proxycache/panda_activeusers_query.py @@ -32,19 +32,23 @@ roles = panda_config.proxy_cache_roles.split(",") else: roles = ["atlas", "atlas:/atlas/Role=production", "atlas:/atlas/Role=pilot"] + # get users sql = "select distinct DN FROM ATLAS_PANDAMETA.users WHERE GRIDPREF LIKE :patt" - varMap = {} - varMap[":patt"] = "%p%" - _, tmpRes = taskBuffer.querySQLS(sql, varMap) - for (realDN,) in tmpRes: - if realDN is None: + var_map = {":patt": "%p%"} + _, results = taskBuffer.querySQLS(sql, var_map) + + # iterate over user DNs, clean them up and check proxy cache + for (real_dn,) in results: + if real_dn is None: continue - realDN = re.sub("/CN=limited proxy", "", realDN) - realDN = re.sub("(/CN=proxy)+", "", realDN) - realDN = re.sub("(/CN=\d+)+$", "", realDN) + real_dn = re.sub("/CN=limited proxy", "", real_dn) + real_dn = re.sub("(/CN=proxy)+", "", real_dn) + real_dn = re.sub("(/CN=\d+)+$", "", real_dn) + # check proxy - tmp_log.debug(f"check proxy cache for DN={realDN}") + tmp_log.debug(f"check proxy cache for DN={real_dn}") for role in roles: - my_proxy_interface_instance.checkProxy(realDN, role=role) + my_proxy_interface_instance.checkProxy(real_dn, role=role) + tmp_log.debug("done") diff --git a/pandaserver/proxycache/panda_proxy_cache.py b/pandaserver/proxycache/panda_proxy_cache.py index 640bbd65a..177f71f39 100755 --- a/pandaserver/proxycache/panda_proxy_cache.py +++ b/pandaserver/proxycache/panda_proxy_cache.py @@ -7,7 +7,6 @@ from pandacommon.pandalogger.LogWrapper import LogWrapper from pandacommon.pandalogger.PandaLogger import PandaLogger -# logger _logger = PandaLogger().getLogger("ProxyCache") @@ -29,9 +28,8 @@ def execute(program, log_stream): def cat(filename): """Given filename, print its text contents.""" - f = open(filename, "r") - out = f.read() - f.close() + with open(filename, "r") as f: + out = f.read() return out @@ -51,12 +49,16 @@ def store( production=False, server_name="myproxy.cern.ch", role=None, - force=False, log_stream=None, ): log_stream.info("store proxy") - """Retrieve proxy from myproxy.""" - proxy_path = os.path.join(self.__target_path, hashlib.sha1((user_dn + ".plain").encode("utf-8")).hexdigest()) + + # Retrieve proxy from myproxy + proxy_path = os.path.join( + self.__target_path, + hashlib.sha1(f"{user_dn}.plain".encode("utf-8")).hexdigest(), + ) + # check if empty dummy file if os.path.exists(proxy_path) and os.stat(proxy_path).st_size == 0: if datetime.datetime.now(datetime.timezone.utc) - datetime.datetime.fromtimestamp( @@ -65,7 +67,7 @@ def store( log_stream.info(f"skip too early to try again according to timestamp of {proxy_path}") return 2 cmd = f"myproxy-logon -s {server_name} --no_passphrase --out {proxy_path} -l '{user_dn}' -k {cred_name} -t 0" - # if myproxy.cern.ch fails, try myproxy on bnl as well + stdout, stderr, status = execute(cmd, log_stream) if stdout: log_stream.info(f"stdout is {stdout} ") @@ -76,7 +78,7 @@ def store( log_stream.info(f"test the status of plain... {status}") if status == 1: return status - # proxyValidity = checkValidity(proxy_path) + if role is not None: log_stream.info(f"proxy needs {role} - need to add voms attributes and store it in the cache") tmpExtension = self.getExtension(role) @@ -95,14 +97,10 @@ def store( log_stream.info("production proxy needed - need to add voms attributes and store it in the cache") prodproxy_path = os.path.join( self.__target_path, - str(hashlib.sha1((user_dn + ".prod").encode("utf-8")).hexdigest()), + str(hashlib.sha1(f"{user_dn}.prod".encode("utf-8")).hexdigest()), ) log_stream.info(prodproxy_path) - prodcmd = "voms-proxy-init -vomses /etc/vomses -valid 96:00 -rfc -cert %s -key %s -out %s -n -voms atlas:/atlas/Role=production" % ( - proxy_path, - proxy_path, - prodproxy_path, - ) + prodcmd = f"voms-proxy-init -vomses /etc/vomses -valid 96:00 -rfc -cert {proxy_path} -key {proxy_path} -out {prodproxy_path} -n -voms atlas:/atlas/Role=production" stdout, stderr, status = execute(prodcmd, log_stream) if stdout: log_stream.info(f"stdout is {stdout} ") @@ -135,7 +133,7 @@ def retrieve(self, user_dn, production=False, role=None): elif production: proxy_path = os.path.join( self.__target_path, - str(hashlib.sha1((user_dn + ".prod").encode("utf-8")).hexdigest()), + str(hashlib.sha1(f"{user_dn}.prod".encode("utf-8")).hexdigest()), ) else: proxy_path = os.path.join(self.__target_path, hashlib.sha1(user_dn.encode("utf-8")).hexdigest()) @@ -148,25 +146,28 @@ def retrieve(self, user_dn, production=False, role=None): def get_proxy_path(self, user_dn, production, role): if role is not None: tmpExtension = self.getExtension(role) - proxy_path = os.path.join( + return os.path.join( self.__target_path, str(hashlib.sha1((user_dn + tmpExtension).encode("utf-8")).hexdigest()), ) elif production: - proxy_path = os.path.join( + return os.path.join( self.__target_path, - str(hashlib.sha1((user_dn + ".prod").encode("utf-8")).hexdigest()), + str(hashlib.sha1(f"{user_dn}.prod".encode("utf-8")).hexdigest()), ) else: - proxy_path = os.path.join(self.__target_path, hashlib.sha1(user_dn.encode("utf-8")).hexdigest()) - return proxy_path + return os.path.join( + self.__target_path, + hashlib.sha1(user_dn.encode("utf-8")).hexdigest(), + ) def checkProxy(self, user_dn, production=False, role=None, name=None): log_stream = LogWrapper(_logger, f'< name="{name}" role={role} >') log_stream.info(f"check proxy for {user_dn}") - """Check the validity of a proxy.""" + + # Check the validity of a proxy proxy_path = self.get_proxy_path(user_dn, production, role) - isOK = False + is_ok = False if os.path.isfile(proxy_path): log_stream.info("proxy is there. Need to check validity") cmd = f"voms-proxy-info -exists -hours 94 -file {proxy_path}" @@ -189,14 +190,14 @@ def checkProxy(self, user_dn, production=False, role=None, name=None): # copy with compact name alt_proxy_path = self.get_proxy_path(name, production, role) shutil.copyfile(proxy_path, alt_proxy_path) - isOK = True + is_ok = True elif ret == 2: log_stream.info("proxy retrieval on hold") else: log_stream.error("proxy retrieval failed") else: log_stream.info("proxy is valid for more than 3 days") - isOK = True + is_ok = True else: log_stream.info("proxy is not in the cache repo. will try to get it from myproxy") ret = self.store(user_dn, self.__cred_name, production, role=role, log_stream=log_stream) @@ -204,13 +205,16 @@ def checkProxy(self, user_dn, production=False, role=None, name=None): log_stream.info("proxy stored successfully") alt_proxy_path = self.get_proxy_path(name, production, role) shutil.copyfile(proxy_path, alt_proxy_path) - isOK = True + is_ok = True elif ret == 2: log_stream.info("proxy retrieval on hold") else: log_stream.error("proxy retrieval failed") - if isOK: - plain_path = os.path.join(self.__target_path, hashlib.sha1((user_dn + ".plain").encode("utf-8")).hexdigest()) + if is_ok: + plain_path = os.path.join( + self.__target_path, + hashlib.sha1(f"{user_dn}.plain".encode("utf-8")).hexdigest(), + ) if os.path.isfile(plain_path): return self.checkValidity(plain_path, log_stream) else: @@ -218,15 +222,15 @@ def checkProxy(self, user_dn, production=False, role=None, name=None): def checkValidity(self, proxy_path, log_stream): log_stream.info("Need to check validity and expiry!") - datechecks = [24, 94, 168] - # datechecks = [1,2,3,4] + time_left_thresholds = [24, 94, 168] status = 0 - for i in datechecks: - cmd = f"voms-proxy-info -exists -hours {i} -file {proxy_path}" + for threshold in time_left_thresholds: + cmd = f"voms-proxy-info -exists -hours {threshold} -file {proxy_path}" stdout, stderr, status = execute(cmd, log_stream) if status == 1: - log_stream.warning(f"proxy expires in {i} hours") - return i + log_stream.warning(f"proxy expires in {threshold} hours") + return threshold + return status # get extension From 4008be2c32ce1fc97cb148e2e9a994d113a9b71e Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Fri, 16 Feb 2024 16:47:31 +0100 Subject: [PATCH 06/12] Small refactoring of Initializer.py --- pandaserver/taskbuffer/Initializer.py | 20 +++++++++++++++----- pandaserver/taskbuffer/OraDBProxy.py | 11 ++++++----- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/pandaserver/taskbuffer/Initializer.py b/pandaserver/taskbuffer/Initializer.py index 58dba509f..7a378a8a5 100644 --- a/pandaserver/taskbuffer/Initializer.py +++ b/pandaserver/taskbuffer/Initializer.py @@ -1,7 +1,6 @@ import sys from threading import Lock -# logger from pandacommon.pandalogger.PandaLogger import PandaLogger from pandaserver.config import panda_config @@ -9,8 +8,14 @@ _logger = PandaLogger().getLogger("Initializer") -# initialize oracledb using dummy connection to avoid "Unable to acquire Oracle environment handle" class Initializer: + """ + Initialize a dummy database connection. + + Returns: + bool: True if the initialization is successful, False otherwise. + """ + def __init__(self): self.lock = Lock() self.first = True @@ -32,6 +37,7 @@ def init(self): oracledb.init_oracle_client() conn = oracledb.connect(user=panda_config.dbuser, password=panda_config.dbpasswd, dsn=panda_config.dbhost) + elif panda_config.backend == "postgres": import psycopg2 @@ -43,6 +49,7 @@ def init(self): user=panda_config.dbuser, password=panda_config.dbpasswd, ) + else: import MySQLdb @@ -54,15 +61,18 @@ def init(self): user=panda_config.dbuser, passwd=panda_config.dbpasswd, ) + # close conn.close() _logger.debug("done") + except Exception: self.lock.release() - type, value, traceBack = sys.exc_info() - _logger.error(f"connect : {type} {value}") + exception_type, exception_value, traceback = sys.exc_info() + _logger.error(f"connect : {exception_type} {exception_value}") return False - # release + + # release the lock self.lock.release() return True diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index 16d420200..ac8af6641 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -59,14 +59,15 @@ import oracledb oracledb.init_oracle_client() - varNUMBER = oracledb.NUMBER + elif panda_config.backend == "postgres": import psycopg2 as psycopg from . import WrappedPostgresConn varNUMBER = int + else: import MySQLdb @@ -79,10 +80,10 @@ _loggerFiltered = PandaLogger().getLogger("DBProxyFiltered") # add handlers -for hdr in _loggerFiltered.handlers: - hdr.setLevel(logging.INFO) - _logger.addHandler(hdr) - _loggerFiltered.removeHandler(hdr) +for handler in _loggerFiltered.handlers: + handler.setLevel(logging.INFO) + _logger.addHandler(handler) + _loggerFiltered.removeHandler(handler) # get mb proxies used in DBProxy methods From a33f88d8f070101b1443b7530f529e1415455a98 Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Fri, 16 Feb 2024 16:53:54 +0100 Subject: [PATCH 07/12] Replaced cx_Oracle with oracledb --- pandaserver/taskbuffer/WrappedCursor.py | 7 +++---- pandaserver/test/boostUser.py | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pandaserver/taskbuffer/WrappedCursor.py b/pandaserver/taskbuffer/WrappedCursor.py index 4002d111a..7a6246621 100644 --- a/pandaserver/taskbuffer/WrappedCursor.py +++ b/pandaserver/taskbuffer/WrappedCursor.py @@ -79,8 +79,8 @@ def convert_query_in_printf_format(sql, var_dict, sql_conv_map): new_pat = None # check if table.column.field for table_name in table_names: - if item_l.startswith(table_name + "."): - item_body = re.sub(r"^" + table_name + r"\.", "", item, flags=re.IGNORECASE) + if item_l.startswith(f"{table_name}."): + item_body = re.sub(f"^{table_name}" + r"\.", "", item, flags=re.IGNORECASE) # no json field if item_body.count(".") == 0: to_skip = True @@ -393,8 +393,7 @@ def getvalue(self, dataItem): def next(self): if self.backend == "mysql": return self.cur.fetchone() - else: - return self.cur.next() + return self.cur.next() # close def close(self): diff --git a/pandaserver/test/boostUser.py b/pandaserver/test/boostUser.py index 689288e40..70c4ce343 100755 --- a/pandaserver/test/boostUser.py +++ b/pandaserver/test/boostUser.py @@ -7,7 +7,7 @@ from pandaserver.taskbuffer.Initializer import initializer from pandaserver.taskbuffer.TaskBuffer import taskBuffer -# initialize oracledb using dummy connection +# initialize DB using dummy connection initializer.init() # logger From 4c9b29931fe7603aa2175b717cfdd18db1a4493f Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Fri, 16 Feb 2024 17:00:59 +0100 Subject: [PATCH 08/12] Removed panda_activeusers_query.py --- .../proxycache/panda_activeusers_query.py | 54 ------------------- pandaserver/proxycache/panda_proxy_cache.py | 0 2 files changed, 54 deletions(-) delete mode 100755 pandaserver/proxycache/panda_activeusers_query.py mode change 100755 => 100644 pandaserver/proxycache/panda_proxy_cache.py diff --git a/pandaserver/proxycache/panda_activeusers_query.py b/pandaserver/proxycache/panda_activeusers_query.py deleted file mode 100755 index d7f5d25ff..000000000 --- a/pandaserver/proxycache/panda_activeusers_query.py +++ /dev/null @@ -1,54 +0,0 @@ -import re -import sys - -from pandacommon.pandalogger.LogWrapper import LogWrapper -from pandacommon.pandalogger.PandaLogger import PandaLogger -from pandacommon.pandautils.thread_utils import GenericThread - -from pandaserver.config import panda_config -from pandaserver.proxycache import panda_proxy_cache -from pandaserver.taskbuffer.TaskBuffer import taskBuffer - -# logger -_logger = PandaLogger().getLogger("activeusers_query") -tmp_log = LogWrapper(_logger) - -if __name__ == "__main__": - tmp_log.debug("================= start ==================") - - requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__) - taskBuffer.init( - panda_config.dbhost, - panda_config.dbpasswd, - nDBConnection=1, - requester=requester_id, - ) - - # instantiate MyProxy I/F - my_proxy_interface_instance = panda_proxy_cache.MyProxyInterface() - - # roles - if hasattr(panda_config, "proxy_cache_roles"): - roles = panda_config.proxy_cache_roles.split(",") - else: - roles = ["atlas", "atlas:/atlas/Role=production", "atlas:/atlas/Role=pilot"] - - # get users - sql = "select distinct DN FROM ATLAS_PANDAMETA.users WHERE GRIDPREF LIKE :patt" - var_map = {":patt": "%p%"} - _, results = taskBuffer.querySQLS(sql, var_map) - - # iterate over user DNs, clean them up and check proxy cache - for (real_dn,) in results: - if real_dn is None: - continue - real_dn = re.sub("/CN=limited proxy", "", real_dn) - real_dn = re.sub("(/CN=proxy)+", "", real_dn) - real_dn = re.sub("(/CN=\d+)+$", "", real_dn) - - # check proxy - tmp_log.debug(f"check proxy cache for DN={real_dn}") - for role in roles: - my_proxy_interface_instance.checkProxy(real_dn, role=role) - - tmp_log.debug("done") diff --git a/pandaserver/proxycache/panda_proxy_cache.py b/pandaserver/proxycache/panda_proxy_cache.py old mode 100755 new mode 100644 From 77c0a1993864e9e17ebc54411ea4ebb131103382 Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Fri, 16 Feb 2024 17:37:41 +0100 Subject: [PATCH 09/12] Renamed logger --- pandaserver/daemons/scripts/panda_activeusers_query.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pandaserver/daemons/scripts/panda_activeusers_query.py b/pandaserver/daemons/scripts/panda_activeusers_query.py index d88a96d50..c5078425b 100644 --- a/pandaserver/daemons/scripts/panda_activeusers_query.py +++ b/pandaserver/daemons/scripts/panda_activeusers_query.py @@ -3,12 +3,13 @@ from pandacommon.pandalogger.LogWrapper import LogWrapper from pandacommon.pandalogger.PandaLogger import PandaLogger from pandacommon.pandautils.thread_utils import GenericThread + from pandaserver.config import panda_config from pandaserver.proxycache import panda_proxy_cache from pandaserver.srvcore import CoreUtils # logger -_logger = PandaLogger().getLogger("panda_activeusers_query") +_logger = PandaLogger().getLogger("activeusers_query") # main From 97f52185be4836c1b879880061a9086ab41cc109 Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Fri, 16 Feb 2024 17:38:25 +0100 Subject: [PATCH 10/12] Typo --- pandaserver/daemons/scripts/panda_activeusers_query.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaserver/daemons/scripts/panda_activeusers_query.py b/pandaserver/daemons/scripts/panda_activeusers_query.py index c5078425b..ecce82b1d 100644 --- a/pandaserver/daemons/scripts/panda_activeusers_query.py +++ b/pandaserver/daemons/scripts/panda_activeusers_query.py @@ -29,7 +29,7 @@ def main(tbuf=None, **kwargs): panda_config.dbpasswd, nDBConnection=1, useTimeout=True, - requester=requesterd_id, + requester=requester_id, ) tmpLog.debug("Getting new connection - done") else: From 7e4ea49e75d04db5291b58aa1dc30a44ca6b55ce Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Mon, 19 Feb 2024 10:48:59 +0100 Subject: [PATCH 11/12] Adding back DDMHandler --- pandaserver/dataservice/DDMHandler.py | 57 +++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 pandaserver/dataservice/DDMHandler.py diff --git a/pandaserver/dataservice/DDMHandler.py b/pandaserver/dataservice/DDMHandler.py new file mode 100644 index 000000000..b484df36b --- /dev/null +++ b/pandaserver/dataservice/DDMHandler.py @@ -0,0 +1,57 @@ +""" +master hander for DDM + +""" + +import re +import threading + +from pandacommon.pandalogger.LogWrapper import LogWrapper +from pandacommon.pandalogger.PandaLogger import PandaLogger + +from pandaserver.dataservice.Activator import Activator +from pandaserver.dataservice.Finisher import Finisher + +# logger +_logger = PandaLogger().getLogger("DDMHandler") + + +class DDMHandler(threading.Thread): + # constructor + def __init__(self, taskBuffer, vuid, site=None, dataset=None, scope=None): + threading.Thread.__init__(self) + self.vuid = vuid + self.taskBuffer = taskBuffer + self.site = site + self.scope = scope + self.dataset = dataset + + # main + def run(self): + # get logger + tmpLog = LogWrapper( + _logger, + f"", + ) + # query dataset + tmpLog.debug("start") + if self.vuid is not None: + dataset = self.taskBuffer.queryDatasetWithMap({"vuid": self.vuid}) + else: + dataset = self.taskBuffer.queryDatasetWithMap({"name": self.dataset}) + if dataset is None: + tmpLog.error("Not found") + tmpLog.debug("end") + return + tmpLog.debug(f"type:{dataset.type} name:{dataset.name}") + if dataset.type == "dispatch": + # activate jobs in jobsDefined + Activator(self.taskBuffer, dataset).start() + if dataset.type == "output": + if dataset.name is not None and re.search("^panda\..*_zip$", dataset.name) is not None: + # start unmerge jobs + Activator(self.taskBuffer, dataset, enforce=True).start() + else: + # finish transferring jobs + Finisher(self.taskBuffer, dataset, site=self.site).start() + tmpLog.debug("end") From a5892d22753f8ab1022f994eee6ea02412879c75 Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Mon, 19 Feb 2024 10:51:04 +0100 Subject: [PATCH 12/12] Fixing file name changes --- pandaserver/dataservice/DDMHandler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaserver/dataservice/DDMHandler.py b/pandaserver/dataservice/DDMHandler.py index b484df36b..af308686a 100644 --- a/pandaserver/dataservice/DDMHandler.py +++ b/pandaserver/dataservice/DDMHandler.py @@ -10,7 +10,7 @@ from pandacommon.pandalogger.PandaLogger import PandaLogger from pandaserver.dataservice.Activator import Activator -from pandaserver.dataservice.Finisher import Finisher +from pandaserver.dataservice.finisher import Finisher # logger _logger = PandaLogger().getLogger("DDMHandler")