From ba770f89d75e80c6029c5d3d248421d5e12a94d9 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Wed, 11 Sep 2024 16:34:00 +0200 Subject: [PATCH] feat (FTS3): add experimental token support --- .../Systems/DataManagement/fts3.rst | 24 ++++ .../DataManagementSystem/Agent/FTS3Agent.py | 21 ++- .../DataManagementSystem/Client/FTS3File.py | 1 + .../DataManagementSystem/Client/FTS3Job.py | 129 +++++++++++++++--- .../DataManagementSystem/ConfigTemplate.cfg | 3 + 5 files changed, 154 insertions(+), 24 deletions(-) diff --git a/docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst b/docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst index 0566058e3ea..9403936398e 100644 --- a/docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst +++ b/docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst @@ -200,3 +200,27 @@ More details on how the intermediate SE selection is done and how the matrix is Work in FTS has a `task `_ to try and bring that feature in. A future solution may come from DIRAC. In the meantime, the best solution is to ask the site to either cleanup themselves (some storages like EOS have that built in) or to give you a dump of the namespace, and then do the cleaning yourself. + + +Token support +---------------- + +.. versionadded:: v8.0.51 + +.. warning:: + Very experimental feature + + +The current state is the one in which LHCb ran the DC24 challenge. It only worked for dCache sites, as there is still no uniform way for storages to understand permissions... +A transfer will happen with tokens if: + + * ``UseTokens`` is true in the FTS3Agent configuration + * ``WLCGTokenBasePath`` is set for both the source and the destination + +The tokens use specific file paths, and not generic wildcard permissions. + +.. warning:: + Token support is as experimental as can be in any layer of the stack (DIRAC, storage, FTS... even the model is experimental) + +.. 
warning:: + The FTS3Agent got occasionally stuck when tokens were used diff --git a/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py b/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py index 4e009158d43..764527855b3 100644 --- a/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py +++ b/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py @@ -12,6 +12,7 @@ :caption: FTS3Agent options """ + import datetime import errno import os @@ -38,6 +39,7 @@ from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername from DIRAC.FrameworkSystem.Client.Logger import gLogger from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager +from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager from DIRAC.DataManagementSystem.private import FTS3Utilities from DIRAC.DataManagementSystem.DB.FTS3DB import FTS3DB from DIRAC.DataManagementSystem.Client.FTS3Job import FTS3Job @@ -105,8 +107,8 @@ def __readConf(self): self.maxDelete = self.am_getOption("DeleteLimitPerCycle", 100) # lifetime of the proxy we download to delegate to FTS self.proxyLifetime = self.am_getOption("ProxyLifetime", PROXY_LIFETIME) - self.jobMonitoringBatchSize = self.am_getOption("JobMonitoringBatchSize", JOB_MONITORING_BATCH_SIZE) + self.useTokens = self.am_getOption("UseTokens", False) return S_OK() @@ -497,7 +499,22 @@ def _treatOperation(self, operation): log.error("Could not select TPC list", repr(e)) continue - res = ftsJob.submit(context=context, protocols=tpcProtocols) + # If we use token, get an access token with the + # fts scope in it + # The FTS3Job will decide to use it or not + fts_access_token = None + if self.useTokens: + res = gTokenManager.getToken( + userGroup=ftsJob.userGroup, + requiredTimeLeft=3600, + scope=["fts"], + ) + if not res["OK"]: + return res + + fts_access_token = res["Value"]["access_token"] + + res = ftsJob.submit(context=context, protocols=tpcProtocols, fts_access_token=fts_access_token) if not res["OK"]: log.error("Could not submit 
FTS3Job", f"FTS3Operation {operation.operationID} : {res}") diff --git a/src/DIRAC/DataManagementSystem/Client/FTS3File.py b/src/DIRAC/DataManagementSystem/Client/FTS3File.py index 0b687300af3..bcd4e34a55f 100644 --- a/src/DIRAC/DataManagementSystem/Client/FTS3File.py +++ b/src/DIRAC/DataManagementSystem/Client/FTS3File.py @@ -21,6 +21,7 @@ class FTS3File(JSerializable): "Started", # From FTS: File transfer has started "Not_used", # From FTS: Transfer not being considered yet, waiting for another one (multihop) "Archiving", # From FTS: file not yet migrated to tape + "Token_prep", # From FTS: When using token, used before Submitted until FTS fetched a refresh token ] # These are the states that we consider final. diff --git a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py index 83f2724027a..4d17ca1b430 100644 --- a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py +++ b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py @@ -25,6 +25,7 @@ from DIRAC.Resources.Storage.StorageElement import StorageElement from DIRAC.FrameworkSystem.Client.Logger import gLogger +from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR from DIRAC.Core.Utilities.DErrno import cmpError @@ -301,7 +302,18 @@ def __isTapeSE(seName, vo): return isTape - def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=None): + @staticmethod + def __seTokenSupport(seObj): + """Check whether a given SE supports token + + :param seObj: StorageElement object + + :returns: True/False + In case of error, returns False + """ + return seObj.options.get("TokenSupport", "").lower() in ("true", "yes") + + def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=None, tokensEnabled=False): """Build a job for transfer Some attributes of the job are expected to be set @@ -329,6 +341,7 @@ def _constructTransferJob(self, pinTime, allLFNs, 
target_spacetoken, protocols=N log = gLogger.getSubLogger(f"constructTransferJob/{self.operationID}/{self.sourceSE}_{self.targetSE}") isMultiHop = False + useTokens = False # Check if it is a multiHop transfer if self.multiHopSE: @@ -429,6 +442,9 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N log.debug(f"Not preparing transfer for file {ftsFile.lfn}") continue + srcToken = None + dstToken = None + sourceSURL, targetSURL = allSrcDstSURLs[ftsFile.lfn] stageURL = allStageURLs.get(ftsFile.lfn) @@ -485,6 +501,44 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N if self.activity: trans_metadata["activity"] = self.activity + # Add tokens if both storages support it and if the requested + if tokensEnabled and self.__seTokenSupport(srcSE) and self.__seTokenSupport(dstSE): + # We get a read token for the source + # offline_access is to allow FTS to refresh it + res = srcSE.getWLCGTokenPath(ftsFile.lfn) + if not res["OK"]: + return res + srcTokenPath = res["Value"] + res = gTokenManager.getToken( + userGroup=self.userGroup, + requiredTimeLeft=3600, + scope=[f"storage.read:/{srcTokenPath}", "offline_access"], + useCache=False, + ) + if not res["OK"]: + return res + srcToken = res["Value"]["access_token"] + + # We get a token with modify and read for the destination + # We need the read to be able to stat + # CAUTION: only works with dcache for now, other storages + # interpret permissions differently + # offline_access is to allow FTS to refresh it + res = dstSE.getWLCGTokenPath(ftsFile.lfn) + if not res["OK"]: + return res + dstTokenPath = res["Value"] + res = gTokenManager.getToken( + userGroup=self.userGroup, + requiredTimeLeft=3600, + scope=[f"storage.modify:/{dstTokenPath}", f"storage.read:/{dstTokenPath}", "offline_access"], + useCache=False, + ) + if not res["OK"]: + return res + dstToken = res["Value"]["access_token"] + useTokens = True + # because of an xroot bug 
(https://github.com/xrootd/xrootd/issues/1433) # the checksum needs to be lowercase. It does not impact the other # protocol, so it's fine to put it here. @@ -497,6 +551,8 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N filesize=ftsFile.size, metadata=trans_metadata, activity=self.activity, + source_token=srcToken, + destination_token=dstToken, ) transfers.append(trans) @@ -514,6 +570,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N "rmsReqID": self.rmsReqID, "sourceSE": self.sourceSE, "targetSE": self.targetSE, + "useTokens": useTokens, # Store the information here to propagate it to submission } if self.activity: @@ -676,7 +733,7 @@ def _constructStagingJob(self, pinTime, allLFNs, target_spacetoken): return S_OK((job, fileIDsInTheJob)) - def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protocols=None): + def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protocols=None, fts_access_token=None): """submit the job to the FTS server Some attributes are expected to be defined for the submission to work: @@ -700,17 +757,13 @@ def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protoc :param ucert: path to the user certificate/proxy. 
Might be inferred by the fts cli (see its doc) :param protocols: list of protocols from which we should choose the protocol to use + :param fts_access_token: token to be used to talk to FTS and to be passed when creating a context :returns: S_OK([FTSFiles ids of files submitted]) """ log = gLogger.getLocalSubLogger(f"submit/{self.operationID}/{self.sourceSE}_{self.targetSE}") - if not context: - if not ftsServer: - ftsServer = self.ftsServer - context = fts3.Context(endpoint=ftsServer, ucert=ucert, request_class=ftsSSLRequest, verify=False) - # Construct the target SURL res = self.__fetchSpaceToken(self.targetSE, self.vo) if not res["OK"]: @@ -720,7 +773,10 @@ def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protoc allLFNs = [ftsFile.lfn for ftsFile in self.filesToSubmit] if self.type == "Transfer": - res = self._constructTransferJob(pinTime, allLFNs, target_spacetoken, protocols=protocols) + res = self._constructTransferJob( + pinTime, allLFNs, target_spacetoken, protocols=protocols, tokensEnabled=bool(fts_access_token) + ) + elif self.type == "Staging": res = self._constructStagingJob(pinTime, allLFNs, target_spacetoken) # elif self.type == 'Removal': @@ -731,6 +787,21 @@ def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protoc job, fileIDsInTheJob = res["Value"] + # If we need a token, don't use the context given in parameter + # because the one given in parameter is only with X509 creds + if job["params"].get("job_metadata", {}).get("useTokens"): + if not fts_access_token: + return S_ERROR("Job needs token support but no FTS token was supplied") + context = None + + if not context: + if not ftsServer: + ftsServer = self.ftsServer + res = self.generateContext(ftsServer, ucert, fts_access_token) + if not res["OK"]: + return res + context = res["Value"] + try: self.ftsGUID = fts3.submit(context, job) log.info(f"Got GUID {self.ftsGUID}") @@ -766,31 +837,45 @@ def submit(self, context=None, ftsServer=None, ucert=None, 
pinTime=36000, protoc return S_OK(fileIDsInTheJob) @staticmethod - def generateContext(ftsServer, ucert, lifetime=25200): + def generateContext(ftsServer, ucert, fts_access_token=None, lifetime=25200): """This method generates an fts3 context + Only a certificate or an fts token can be given + :param ftsServer: address of the fts3 server :param ucert: the path to the certificate to be used + :param fts_access_token: token to access FTS :param lifetime: duration (in sec) of the delegation to the FTS3 server (default is 7h, like FTS3 default) :returns: an fts3 context """ + if fts_access_token and ucert: + return S_ERROR("fts_access_token and ucert cannot be both set") + try: - context = fts3.Context(endpoint=ftsServer, ucert=ucert, request_class=ftsSSLRequest, verify=False) + context = fts3.Context( + endpoint=ftsServer, + ucert=ucert, + request_class=ftsSSLRequest, + verify=False, + fts_access_token=fts_access_token, + ) - # Explicitely delegate to be sure we have the lifetime we want - # Note: the delegation will re-happen only when the FTS server - # decides that there is not enough timeleft. - # At the moment, this is 1 hour, which effectively means that if you do - # not submit a job for more than 1h, you have no valid proxy in FTS servers - # anymore, and all the jobs failed. So we force it when - # one third of the lifetime will be left. - # Also, the proxy given as parameter might have less than "lifetime" left - # since it is cached, but it does not matter, because in the FTS3Agent - # we make sure that we renew it often enough - td_lifetime = datetime.timedelta(seconds=lifetime) - fts3.delegate(context, lifetime=td_lifetime, delegate_when_lifetime_lt=td_lifetime // 3) + # The delegation only makes sense for X509 auth + if ucert: + # Explicitely delegate to be sure we have the lifetime we want + # Note: the delegation will re-happen only when the FTS server + # decides that there is not enough timeleft. 
+ # At the moment, this is 1 hour, which effectively means that if you do + # not submit a job for more than 1h, you have no valid proxy in FTS servers + # anymore, and all the jobs failed. So we force it when + # one third of the lifetime will be left. + # Also, the proxy given as parameter might have less than "lifetime" left + # since it is cached, but it does not matter, because in the FTS3Agent + # we make sure that we renew it often enough + td_lifetime = datetime.timedelta(seconds=lifetime) + fts3.delegate(context, lifetime=td_lifetime, delegate_when_lifetime_lt=td_lifetime // 3) return S_OK(context) except FTS3ClientException as e: diff --git a/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg b/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg index c4b9636dab2..f2c1fa5b6f8 100644 --- a/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg @@ -163,6 +163,9 @@ Agents KickLimitPerCycle = 100 # Lifetime in sec of the Proxy we download to delegate to FTS3 (default 36h) ProxyLifetime = 129600 + # Whether we use tokens to submit jobs to FTS3 + # VERY EXPERIMENTAL + UseTokens = False } ##END FTS3Agent }