diff --git a/INSTALL.md b/INSTALL.md index 0706eabef..abb05253e 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -17,7 +17,7 @@ Installation pip install panda-server ``` -1. Install database modules: cx_Oracle for Oracle or MySQLdb for MySQL/MariaDB +1. Install database modules: oracledb for Oracle or MySQLdb for MySQL/MariaDB 1. Modify config files. ``` diff --git a/INSTALL_ATLAS.md b/INSTALL_ATLAS.md index 694063ffe..4845dbbe1 100644 --- a/INSTALL_ATLAS.md +++ b/INSTALL_ATLAS.md @@ -18,11 +18,11 @@ Installation for ATLAS on CC7 + virtualenv + python3 + atlpan chown atlpan:zp /var/log/panda/wsgisocks chown atlpan:zp /var/cache/pandaserver ``` -1. Install panda-server and cx_Oracle. +1. Install panda-server and oracledb. ``` source /opt/pandaserver/bin/activate pip install panda-server - pip install cx_Oracle + pip install oracledb ``` 1. Modify config files. diff --git a/pandaserver/daemons/scripts/panda_activeusers_query.py b/pandaserver/daemons/scripts/panda_activeusers_query.py index d88a96d50..ecce82b1d 100644 --- a/pandaserver/daemons/scripts/panda_activeusers_query.py +++ b/pandaserver/daemons/scripts/panda_activeusers_query.py @@ -3,12 +3,13 @@ from pandacommon.pandalogger.LogWrapper import LogWrapper from pandacommon.pandalogger.PandaLogger import PandaLogger from pandacommon.pandautils.thread_utils import GenericThread + from pandaserver.config import panda_config from pandaserver.proxycache import panda_proxy_cache from pandaserver.srvcore import CoreUtils # logger -_logger = PandaLogger().getLogger("panda_activeusers_query") +_logger = PandaLogger().getLogger("activeusers_query") # main @@ -28,7 +29,7 @@ def main(tbuf=None, **kwargs): panda_config.dbpasswd, nDBConnection=1, useTimeout=True, - requester=requesterd_id, + requester=requester_id, ) tmpLog.debug("Getting new connection - done") else: diff --git a/pandaserver/daemons/utils.py b/pandaserver/daemons/utils.py index b26f6dc84..39383ec5d 100644 --- a/pandaserver/daemons/utils.py +++ b/pandaserver/daemons/utils.py @@ -16,6 +16,7 @@ import psutil from pandacommon.pandalogger import logger_utils from pandacommon.pandautils.thread_utils import GenericThread, LockPool + from pandaserver.config import daemon_config, panda_config # list of signals accepted to end the main process @@ -88,7 +89,7 @@ def got_end_sig(sig, frame): expiry_ts = start_ts + worker_lifetime # create taskBuffer object if not given if tbuf is None: - # initialize cx_Oracle using dummy connection + # initialize oracledb using dummy connection try: from pandaserver.taskbuffer.Initializer import initializer diff --git a/pandaserver/dataservice/DDM.py b/pandaserver/dataservice/DDM.py index 96e2e0043..a98b190c5 100755 --- a/pandaserver/dataservice/DDM.py +++ b/pandaserver/dataservice/DDM.py @@ -8,7 +8,6 @@ import sys import traceback -from pandaserver.srvcore import CoreUtils from rucio.client import Client as RucioClient from rucio.common.exception import ( DataIdentifierAlreadyExists, @@ -20,6 +19,8 @@ UnsupportedOperation, ) +from pandaserver.srvcore import CoreUtils + # rucio class RucioAPI: @@ -337,33 +338,6 @@ def registerZipFiles(self, zipMap): ) iFiles += nFiles - # get disk usage at RSE - def getRseUsage(self, rse, src="srm"): - retMap = {} - try: - client = RucioClient() - itr = client.get_rse_usage(rse) - # look for srm - for item in itr: - if item["source"] == src: - try: - total = item["total"] / 1024 / 1024 / 1024 - except Exception: - total = None - try: - used = item["used"] / 1024 / 1024 / 1024 - except Exception: - used = None - try: - free = item["free"] / 1024 / 1024 / 1024 - except Exception: - free = None - retMap = {"total": total, "used": used, "free": free} - break - except Exception: - pass - return retMap - # list datasets def listDatasets(self, datasetName, old=False): result = {} @@ -463,16 +437,6 @@ def getMetaData(self, dsn): errType, errVale = sys.exc_info()[:2] return False, f"{errType} {errVale}" - # check if dataset exists - def checkDatasetExist(self, dsn): - # register dataset - client = RucioClient() - try: - scope, dsn = self.extract_scope(dsn) - return True - except DataIdentifierNotFound: - return False - # delete dataset def eraseDataset(self, dsn, scope=None, grace_period=None): presetScope = scope @@ -632,25 +596,6 @@ def getDatasetSize(self, datasetName): errMsg = f"{errtype.__name__} {errvalue}" return False, errMsg - # delete dataset replicas - def deleteDatasetReplicas(self, datasetName, locations): - # extract scope from dataset - scope, dsn = self.extract_scope(datasetName) - client = RucioClient() - try: - for rule in client.list_did_rules(scope, dsn): - if rule["account"] != client.account: - continue - if rule["rse_expression"] in locations: - client.delete_replication_rule(rule["id"]) - except DataIdentifierNotFound: - pass - except Exception: - errtype, errvalue = sys.exc_info()[:2] - errMsg = f"{errtype.__name__} {errvalue}" - return False, errMsg - return True, "" - # register files def registerFiles(self, files, rse): client = RucioClient() diff --git a/pandaserver/dataservice/DDMHandler.py b/pandaserver/dataservice/DDMHandler.py index 82b3018da..d566b95f5 100644 --- a/pandaserver/dataservice/DDMHandler.py +++ b/pandaserver/dataservice/DDMHandler.py @@ -1,7 +1,3 @@ -""" -master hander for DDM -""" - import re import threading @@ -52,4 +48,4 @@ def run(self): else: # finish transferring jobs Finisher(self.taskBuffer, dataset, site=self.site).start() - tmpLog.debug("end") \ No newline at end of file + tmpLog.debug("end") diff --git a/pandaserver/dataservice/forkSetupper.py b/pandaserver/dataservice/forkSetupper.py index ca6bdb6f9..b9eb92b16 100755 --- a/pandaserver/dataservice/forkSetupper.py +++ b/pandaserver/dataservice/forkSetupper.py @@ -2,6 +2,7 @@ import traceback from pandacommon.pandautils.thread_utils import GenericThread + from pandaserver.srvcore.CoreUtils import commands_get_status_output @@ -23,7 +24,7 @@ def run(inFile, v_onlyTA, v_firstSubmission): # password from pandaserver.config import panda_config - # initialize cx_Oracle using dummy connection + # initialize oracledb using dummy connection from pandaserver.taskbuffer.Initializer import initializer initializer.init() diff --git a/pandaserver/proxycache/DBMSql.py b/pandaserver/proxycache/DBMSql.py deleted file mode 100755 index 7342f93a1..000000000 --- a/pandaserver/proxycache/DBMSql.py +++ /dev/null @@ -1,111 +0,0 @@ -import sys -from types import TupleType - -import cx_Oracle -from pandaserver.config import panda_config - -# Common items -ORAC_CON = None - - -OracleList = ["oracle"] - - -# -# Connection initializers -# - - -def getOracleConnection(db_type): - global ORAC_CON - if ORAC_CON is not None: - return ORAC_CON - user = panda_config.dbuser - pssw = panda_config.dbpasswd - serv = panda_config.dbhost - poolmin = 1 - poolmax = 3 - poolincr = 1 - - print("Initializing Oracle connection") - try: - cpool = cx_Oracle.SessionPool(user, pssw, serv, poolmin, poolmax, poolincr) - ORAC_CON = cpool.acquire() - except Exception: - import traceback - - traceback.print_stack() - traceback.print_exc() - sys.exit(1) - return ORAC_CON - - -class DBMSql: - db_type = "" - oracle_con = None - - # constructor with database type [sqlite|postgres] - def __init__(self, _db_type): - self.db_type = _db_type - - if self.db_type in OracleList: - self.oracle_con = getOracleConnection(self.db_type) - else: - print("Unknown database type:" + self.db_type) - print("ERROR. Unknown database type", self.db_type) - sys.exit(1) - - print("Database connection created for " + self.db_type) - - # Method to execute sql query (SELECT) - def executeQuery(self, sql): - bindDict = None - if isinstance(sql, TupleType): - bindDict = sql[1] - sql = sql[0] - if sql.strip()[0:6].lower() != "select": - print("not a SELECT statement!!") - return [] - try: - if self.db_type in OracleList: - cursor = self.oracle_con.cursor() - if bindDict is not None: - cursor.execute(sql, bindDict) - else: - cursor.execute(sql) - colMap = cursor.description - result = cursor.fetchall() - ret = [] - for t in result: - i = 0 - d = {} - for e in t: - if str(type(e)) == "": - e = str(e) - d[colMap[i][0]] = e - i = i + 1 - ret.append(d) - cursor.close() - return ret - - else: - raise DBMSqlError(f"not support:{self.db_type}") - except Exception: - raise DBMSqlError(f"executeQuery error:{sys.exc_info()[1]} \n {sql}") - - -class DBMSqlError(Exception): - def __init__(self, value): - self.value = value - - def __str__(self): - return repr(self.value) - - -# ----------------------------- -# Codes below are for test... -# - -if __name__ == "__main__": - dbm = DBMSql(_db_type="oracle") - print(dbm.executeQuery("select * from services")) diff --git a/pandaserver/proxycache/panda_activeusers_query.py b/pandaserver/proxycache/panda_activeusers_query.py deleted file mode 100755 index 72d68c640..000000000 --- a/pandaserver/proxycache/panda_activeusers_query.py +++ /dev/null @@ -1,48 +0,0 @@ -import re - -from pandacommon.pandalogger.LogWrapper import LogWrapper -from pandacommon.pandalogger.PandaLogger import PandaLogger -from pandacommon.pandautils.thread_utils import GenericThread -from pandaserver.config import panda_config -from pandaserver.proxycache import panda_proxy_cache -from pandaserver.taskbuffer.TaskBuffer import taskBuffer - -# logger -_logger = PandaLogger().getLogger("panda_activeusers_query") -tmpLog = LogWrapper(_logger) - -if __name__ == "__main__": - tmpLog.debug("================= start ==================") - - requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__) - taskBuffer.init( - panda_config.dbhost, - panda_config.dbpasswd, - nDBConnection=1, - requester=requester_id, - ) - - # instantiate MyProxy I/F - my_proxy_interface_instance = panda_proxy_cache.MyProxyInterface() - - # roles - if hasattr(panda_config, "proxy_cache_roles"): - roles = panda_config.proxy_cache_roles.split(",") - else: - roles = ["atlas", "atlas:/atlas/Role=production", "atlas:/atlas/Role=pilot"] - # get users - sql = "select distinct DN FROM ATLAS_PANDAMETA.users WHERE GRIDPREF LIKE :patt" - varMap = {} - varMap[":patt"] = "%p%" - tmpStat, tmpRes = taskBuffer.querySQLS(sql, varMap) - for (realDN,) in tmpRes: - if realDN is None: - continue - realDN = re.sub("/CN=limited proxy", "", realDN) - realDN = re.sub("(/CN=proxy)+", "", realDN) - realDN = re.sub("(/CN=\d+)+$", "", realDN) - # check proxy - tmpLog.debug(f"check proxy cache for DN={realDN}") - for role in roles: - my_proxy_interface_instance.checkProxy(realDN, role=role) - tmpLog.debug("done") diff --git a/pandaserver/proxycache/panda_proxy_cache.py b/pandaserver/proxycache/panda_proxy_cache.py old mode 100755 new mode 100644 index 640bbd65a..177f71f39 --- a/pandaserver/proxycache/panda_proxy_cache.py +++ b/pandaserver/proxycache/panda_proxy_cache.py @@ -7,7 +7,6 @@ from pandacommon.pandalogger.LogWrapper import LogWrapper from pandacommon.pandalogger.PandaLogger import PandaLogger -# logger _logger = PandaLogger().getLogger("ProxyCache") @@ -29,9 +28,8 @@ def execute(program, log_stream): def cat(filename): """Given filename, print its text contents.""" - f = open(filename, "r") - out = f.read() - f.close() + with open(filename, "r") as f: + out = f.read() return out @@ -51,12 +49,16 @@ def store( production=False, server_name="myproxy.cern.ch", role=None, - force=False, log_stream=None, ): log_stream.info("store proxy") - """Retrieve proxy from myproxy.""" - proxy_path = os.path.join(self.__target_path, hashlib.sha1((user_dn + ".plain").encode("utf-8")).hexdigest()) + + # Retrieve proxy from myproxy + proxy_path = os.path.join( + self.__target_path, + hashlib.sha1(f"{user_dn}.plain".encode("utf-8")).hexdigest(), + ) + # check if empty dummy file if os.path.exists(proxy_path) and os.stat(proxy_path).st_size == 0: if datetime.datetime.now(datetime.timezone.utc) - datetime.datetime.fromtimestamp( @@ -65,7 +67,7 @@ def store( log_stream.info(f"skip too early to try again according to timestamp of {proxy_path}") return 2 cmd = f"myproxy-logon -s {server_name} --no_passphrase --out {proxy_path} -l '{user_dn}' -k {cred_name} -t 0" - # if myproxy.cern.ch fails, try myproxy on bnl as well + stdout, stderr, status = execute(cmd, log_stream) if stdout: log_stream.info(f"stdout is {stdout} ") @@ -76,7 +78,7 @@ def store( log_stream.info(f"test the status of plain... {status}") if status == 1: return status - # proxyValidity = checkValidity(proxy_path) + if role is not None: log_stream.info(f"proxy needs {role} - need to add voms attributes and store it in the cache") tmpExtension = self.getExtension(role) @@ -95,14 +97,10 @@ def store( log_stream.info("production proxy needed - need to add voms attributes and store it in the cache") prodproxy_path = os.path.join( self.__target_path, - str(hashlib.sha1((user_dn + ".prod").encode("utf-8")).hexdigest()), + str(hashlib.sha1(f"{user_dn}.prod".encode("utf-8")).hexdigest()), ) log_stream.info(prodproxy_path) - prodcmd = "voms-proxy-init -vomses /etc/vomses -valid 96:00 -rfc -cert %s -key %s -out %s -n -voms atlas:/atlas/Role=production" % ( - proxy_path, - proxy_path, - prodproxy_path, - ) + prodcmd = f"voms-proxy-init -vomses /etc/vomses -valid 96:00 -rfc -cert {proxy_path} -key {proxy_path} -out {prodproxy_path} -n -voms atlas:/atlas/Role=production" stdout, stderr, status = execute(prodcmd, log_stream) if stdout: log_stream.info(f"stdout is {stdout} ") @@ -135,7 +133,7 @@ def retrieve(self, user_dn, production=False, role=None): elif production: proxy_path = os.path.join( self.__target_path, - str(hashlib.sha1((user_dn + ".prod").encode("utf-8")).hexdigest()), + str(hashlib.sha1(f"{user_dn}.prod".encode("utf-8")).hexdigest()), ) else: proxy_path = os.path.join(self.__target_path, hashlib.sha1(user_dn.encode("utf-8")).hexdigest()) @@ -148,25 +146,28 @@ def retrieve(self, user_dn, production=False, role=None): def get_proxy_path(self, user_dn, production, role): if role is not None: tmpExtension = self.getExtension(role) - proxy_path = os.path.join( + return os.path.join( self.__target_path, str(hashlib.sha1((user_dn + tmpExtension).encode("utf-8")).hexdigest()), ) elif production: - proxy_path = os.path.join( + return os.path.join( self.__target_path, - str(hashlib.sha1((user_dn + ".prod").encode("utf-8")).hexdigest()), + str(hashlib.sha1(f"{user_dn}.prod".encode("utf-8")).hexdigest()), ) else: - proxy_path = os.path.join(self.__target_path, hashlib.sha1(user_dn.encode("utf-8")).hexdigest()) - return proxy_path + return os.path.join( + self.__target_path, + hashlib.sha1(user_dn.encode("utf-8")).hexdigest(), + ) def checkProxy(self, user_dn, production=False, role=None, name=None): log_stream = LogWrapper(_logger, f'< name="{name}" role={role} >') log_stream.info(f"check proxy for {user_dn}") - """Check the validity of a proxy.""" + + # Check the validity of a proxy proxy_path = self.get_proxy_path(user_dn, production, role) - isOK = False + is_ok = False if os.path.isfile(proxy_path): log_stream.info("proxy is there. Need to check validity") cmd = f"voms-proxy-info -exists -hours 94 -file {proxy_path}" @@ -189,14 +190,14 @@ def checkProxy(self, user_dn, production=False, role=None, name=None): # copy with compact name alt_proxy_path = self.get_proxy_path(name, production, role) shutil.copyfile(proxy_path, alt_proxy_path) - isOK = True + is_ok = True elif ret == 2: log_stream.info("proxy retrieval on hold") else: log_stream.error("proxy retrieval failed") else: log_stream.info("proxy is valid for more than 3 days") - isOK = True + is_ok = True else: log_stream.info("proxy is not in the cache repo. will try to get it from myproxy") ret = self.store(user_dn, self.__cred_name, production, role=role, log_stream=log_stream) @@ -204,13 +205,16 @@ def checkProxy(self, user_dn, production=False, role=None, name=None): log_stream.info("proxy stored successfully") alt_proxy_path = self.get_proxy_path(name, production, role) shutil.copyfile(proxy_path, alt_proxy_path) - isOK = True + is_ok = True elif ret == 2: log_stream.info("proxy retrieval on hold") else: log_stream.error("proxy retrieval failed") - if isOK: - plain_path = os.path.join(self.__target_path, hashlib.sha1((user_dn + ".plain").encode("utf-8")).hexdigest()) + if is_ok: + plain_path = os.path.join( + self.__target_path, + hashlib.sha1(f"{user_dn}.plain".encode("utf-8")).hexdigest(), + ) if os.path.isfile(plain_path): return self.checkValidity(plain_path, log_stream) else: @@ -218,15 +222,15 @@ def checkProxy(self, user_dn, production=False, role=None, name=None): def checkValidity(self, proxy_path, log_stream): log_stream.info("Need to check validity and expiry!") - datechecks = [24, 94, 168] - # datechecks = [1,2,3,4] + time_left_thresholds = [24, 94, 168] status = 0 - for i in datechecks: - cmd = f"voms-proxy-info -exists -hours {i} -file {proxy_path}" + for threshold in time_left_thresholds: + cmd = f"voms-proxy-info -exists -hours {threshold} -file {proxy_path}" stdout, stderr, status = execute(cmd, log_stream) if status == 1: - log_stream.warning(f"proxy expires in {i} hours") - return i + log_stream.warning(f"proxy expires in {threshold} hours") + return threshold + return status # get extension diff --git a/pandaserver/server/panda.py b/pandaserver/server/panda.py index f34045ebc..3a92944d0 100755 --- a/pandaserver/server/panda.py +++ b/pandaserver/server/panda.py @@ -16,10 +16,13 @@ import traceback from urllib.parse import parse_qsl -import pandaserver.taskbuffer.ErrorCode from pandacommon.pandalogger.LogWrapper import LogWrapper from pandacommon.pandalogger.PandaLogger import PandaLogger from pandacommon.pandautils.thread_utils import GenericThread +from werkzeug.datastructures import CombinedMultiDict, EnvironHeaders +from werkzeug.formparser import parse_form_data + +import pandaserver.taskbuffer.ErrorCode from pandaserver.config import panda_config # pylint: disable=W0611 @@ -173,10 +176,8 @@ updateWorkers, userIF, ) -from werkzeug.datastructures import CombinedMultiDict, EnvironHeaders -from werkzeug.formparser import parse_form_data -# initialize cx_Oracle using dummy connection +# initialize oracledb using dummy connection initializer.init() # initialize TaskBuffer diff --git a/pandaserver/taskbuffer/ConBridge.py b/pandaserver/taskbuffer/ConBridge.py index d69686de3..55c80925d 100644 --- a/pandaserver/taskbuffer/ConBridge.py +++ b/pandaserver/taskbuffer/ConBridge.py @@ -327,7 +327,7 @@ def bridge_run(self): # execute method = getattr(self.proxy, comStr) res = method(*variables[0], **variables[1]) - # FIXME : modify response since cx_Oracle types cannot be picked + # FIXME : modify response since oracledb types cannot be picked if comStr in ["querySQLS"]: newRes = [True] + list(res[1:]) res = newRes diff --git a/pandaserver/taskbuffer/EiDBProxy.py b/pandaserver/taskbuffer/EiDBProxy.py deleted file mode 100755 index 3025c3a55..000000000 --- a/pandaserver/taskbuffer/EiDBProxy.py +++ /dev/null @@ -1,125 +0,0 @@ -""" -proxy for database connection - -""" -import re -import warnings - -from pandacommon.pandalogger.LogWrapper import LogWrapper -from pandacommon.pandalogger.PandaLogger import PandaLogger -from pandaserver.config import panda_config -from pandaserver.taskbuffer import OraDBProxy - -warnings.filterwarnings("ignore") - -# logger -_logger = PandaLogger().getLogger("EiDBProxy") - - -# proxy -class EiDBProxy(OraDBProxy.DBProxy): - # constructor - def __init__(self, useOtherError=False): - OraDBProxy.DBProxy.__init__(self, useOtherError) - - # connect to DB (just for INTR) - def connect( - self, - dbhost=panda_config.ei_dbhost, - dbpasswd=panda_config.ei_dbpasswd, - dbuser=panda_config.ei_dbuser, - dbname=panda_config.ei_dbname, - dbtimeout=panda_config.ei_dbtimeout, - reconnect=False, - ): - return OraDBProxy.DBProxy.connect( - self, - dbhost=dbhost, - dbpasswd=dbpasswd, - dbuser=dbuser, - dbname=dbname, - dbtimeout=dbtimeout, - reconnect=reconnect, - ) - - # get index of AMI tag - def getIndexAmiTag(self, tagList, amiTag): - for idxTag, tagPattern in enumerate(tagList): - if re.search("^" + tagPattern + "$", amiTag) is not None: - return idxTag - return None - - # get GUIDs from EventIndex - def getGUIDsFromEventIndex(self, runEventList, streamName, amiTags, dataType): - comment = " /* DBProxy.getGUIDsFromEventIndex */" - methodName = comment.split(" ")[-2].split(".")[-1] - tmpLog = LogWrapper( - _logger, - methodName + f" ", - ) - try: - # change to list - if amiTags not in [None, ""]: - amiTags = amiTags.replace("*", ".*").split(",") - tmpLog.debug(f"start for {len(runEventList)} events") - # check data type - if dataType not in ["RAW", "ESD", "AOD"]: - return False, f"dataType={dataType} is unsupported" - # sql to insert runs and events - sqlRE = f"INSERT INTO {panda_config.schemaEI}.TMP_RUN_EVENT_PAIRS (runNumber,eventNumber) " - sqlRE += "VALUES (:runNumber,:eventNumber) " - varMaps = [] - for runNumber, eventNumber in runEventList: - varMap = {} - varMap[":runNumber"] = runNumber - varMap[":eventNumber"] = eventNumber - varMaps.append(varMap) - # begin transaction - self.conn.begin() - self.cur.arraysize = 100000 - # insert runs and events - self.cur.executemany(sqlRE + comment, varMaps) - # read GUIDs - varMap = {} - if amiTags in [None, ""]: - sqlRG = f"SELECT runNumber,eventNumber,guid_{dataType} " - sqlRG += f"FROM {panda_config.schemaEI}.V_PANDA_EVPICK_NOAMITAG_MANY " - else: - sqlRG = f"SELECT runNumber,eventNumber,guid_{dataType},amiTag " - sqlRG += f"FROM {panda_config.schemaEI}.V_PANDA_EVPICK_AMITAG_MANY " - if streamName not in [None, ""]: - sqlRG += "WHERE streamName=:streamName " - varMap[":streamName"] = streamName - self.cur.execute(sqlRG + comment, varMap) - resRG = self.cur.fetchall() - # commit - if not self._commit(): - raise RuntimeError("Commit error") - retValue = {} - keyAmiIdxMap = {} - for tmpItem in resRG: - if amiTags in [None, ""]: - runNumber, eventNumber, guid = tmpItem - # dummy - idxTag = 0 - else: - runNumber, eventNumber, guid, amiTag = tmpItem - # get index number for the AMI tag in the list - idxTag = self.getIndexAmiTag(amiTags, amiTag) - # didn't match - if idxTag is None: - continue - tmpKey = (runNumber, eventNumber) - # use AMI tag in a preference orde - if tmpKey in keyAmiIdxMap and keyAmiIdxMap[tmpKey] < idxTag: - continue - keyAmiIdxMap[tmpKey] = idxTag - retValue[tmpKey] = [guid] - tmpLog.debug(f"found {len(retValue)} events") - return True, retValue - except Exception: - # roll back - self._rollback() - # error - self.dumpErrorMessage(_logger, methodName) - return False, None diff --git a/pandaserver/taskbuffer/Initializer.py b/pandaserver/taskbuffer/Initializer.py index cbf3120aa..7a378a8a5 100644 --- a/pandaserver/taskbuffer/Initializer.py +++ b/pandaserver/taskbuffer/Initializer.py @@ -1,15 +1,21 @@ import sys from threading import Lock -# logger from pandacommon.pandalogger.PandaLogger import PandaLogger + from pandaserver.config import panda_config _logger = PandaLogger().getLogger("Initializer") -# initialize cx_Oracle using dummy connection to avoid "Unable to acquire Oracle environment handle" class Initializer: + """ + Initialize a dummy database connection. + + Returns: + bool: True if the initialization is successful, False otherwise. + """ + def __init__(self): self.lock = Lock() self.first = True @@ -27,14 +33,11 @@ def init(self): _logger.debug("connect") # connect if panda_config.backend == "oracle": - import cx_Oracle + import oracledb + + oracledb.init_oracle_client() + conn = oracledb.connect(user=panda_config.dbuser, password=panda_config.dbpasswd, dsn=panda_config.dbhost) - conn = cx_Oracle.connect( - dsn=panda_config.dbhost, - user=panda_config.dbuser, - password=panda_config.dbpasswd, - threaded=True, - ) elif panda_config.backend == "postgres": import psycopg2 @@ -46,6 +49,7 @@ def init(self): user=panda_config.dbuser, password=panda_config.dbpasswd, ) + else: import MySQLdb @@ -57,15 +61,18 @@ def init(self): user=panda_config.dbuser, passwd=panda_config.dbpasswd, ) + # close conn.close() _logger.debug("done") + except Exception: self.lock.release() - type, value, traceBack = sys.exc_info() - _logger.error(f"connect : {type} {value}") + exception_type, exception_value, traceback = sys.exc_info() + _logger.error(f"connect : {exception_type} {exception_value}") return False - # release + + # release the lock self.lock.release() return True diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index 376246fc3..eae31071b 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -56,15 +56,18 @@ pass if panda_config.backend == "oracle": - import cx_Oracle + import oracledb + + oracledb.init_oracle_client() + varNUMBER = oracledb.NUMBER - varNUMBER = cx_Oracle.NUMBER elif panda_config.backend == "postgres": import psycopg2 as psycopg from . import WrappedPostgresConn varNUMBER = int + else: import MySQLdb @@ -77,10 +80,10 @@ _loggerFiltered = PandaLogger().getLogger("DBProxyFiltered") # add handlers -for hdr in _loggerFiltered.handlers: - hdr.setLevel(logging.INFO) - _logger.addHandler(hdr) - _loggerFiltered.removeHandler(hdr) +for handler in _loggerFiltered.handlers: + handler.setLevel(logging.INFO) + _logger.addHandler(handler) + _loggerFiltered.removeHandler(handler) # get mb proxies used in DBProxy methods @@ -195,17 +198,11 @@ def connect( # connect try: if self.backend == "oracle": - self.conn = cx_Oracle.connect( - dsn=self.dbhost, - user=self.dbuser, - password=self.dbpasswd, - threaded=True, - encoding="UTF-8", - ) + self.conn = oracledb.connect(dsn=self.dbhost, user=self.dbuser, password=self.dbpasswd) def OutputTypeHandler(cursor, name, defaultType, size, precision, scale): - if defaultType == cx_Oracle.CLOB: - return cursor.var(cx_Oracle.LONG_STRING, arraysize=cursor.arraysize) + if defaultType == oracledb.CLOB: + return cursor.var(oracledb.LONG_STRING, arraysize=cursor.arraysize) self.conn.outputtypehandler = OutputTypeHandler elif self.backend == "postgres": diff --git a/pandaserver/taskbuffer/WrappedCursor.py b/pandaserver/taskbuffer/WrappedCursor.py index e503bfdad..7a6246621 100644 --- a/pandaserver/taskbuffer/WrappedCursor.py +++ b/pandaserver/taskbuffer/WrappedCursor.py @@ -8,6 +8,7 @@ import warnings from pandacommon.pandalogger.PandaLogger import PandaLogger + from pandaserver.config import panda_config warnings.filterwarnings("ignore") @@ -78,8 +79,8 @@ def convert_query_in_printf_format(sql, var_dict, sql_conv_map): new_pat = None # check if table.column.field for table_name in table_names: - if item_l.startswith(table_name + "."): - item_body = re.sub(r"^" + table_name + r"\.", "", item, flags=re.IGNORECASE) + if item_l.startswith(f"{table_name}."): + item_body = re.sub(f"^{table_name}" + r"\.", "", item, flags=re.IGNORECASE) # no json field if item_body.count(".") == 0: to_skip = True @@ -313,7 +314,7 @@ def _returningIntoOracle(self, returningInputData, varDict, cur, dryRun=False): # assuming that we use RETURNING INTO only for PandaID or row_ID columns if not dryRun: for x in listInto: - varDict[x] = cur.var(cx_Oracle.NUMBER) + varDict[x] = cur.var(oracledb.NUMBER) result = f" RETURNING {valReturning} INTO {valInto} " except Exception: pass @@ -392,8 +393,7 @@ def getvalue(self, dataItem): def next(self): if self.backend == "mysql": return self.cur.fetchone() - else: - return self.cur.next() + return self.cur.next() # close def close(self): diff --git a/pandaserver/test/boostUser.py b/pandaserver/test/boostUser.py index 80cef5467..70c4ce343 100755 --- a/pandaserver/test/boostUser.py +++ b/pandaserver/test/boostUser.py @@ -2,11 +2,12 @@ from pandacommon.pandalogger.PandaLogger import PandaLogger from pandacommon.pandautils.thread_utils import GenericThread + from pandaserver.config import panda_config from pandaserver.taskbuffer.Initializer import initializer from pandaserver.taskbuffer.TaskBuffer import taskBuffer -# initialize cx_Oracle using dummy connection +# initialize DB using dummy connection initializer.init() # logger diff --git a/pyproject.toml b/pyproject.toml index 873b1c742..fbdd1ed38 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,12 +38,12 @@ dependencies = [ requires-python = ">=3.8" [project.optional-dependencies] -oracle = ['cx_Oracle'] +oracle = ['oracledb'] mysql = ['mysqlclient'] postgres = ['psycopg2-binary'] rucio = ['rucio-clients'] elasticsearch = ['elasticsearch'] -atlasprod = ['cx_Oracle', 'rucio-clients', 'elasticsearch', 'numpy', 'scipy'] +atlasprod = ['oracledb', 'rucio-clients', 'elasticsearch', 'numpy', 'scipy'] [project.urls] Homepage = "https://panda-wms.readthedocs.io/en/latest/"