Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[integration] Remote Pilot Logger to Tornado #6208

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 187 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
""" :mod: PilotLoggingAgent

PilotLoggingAgent sends Pilot log files to an SE.

.. literalinclude:: ../ConfigTemplate.cfg
:start-after: ##BEGIN PilotLoggingAgent
:end-before: ##END
:dedent: 2
:caption: PilotLoggingAgent options
"""

# # imports
import os
import time
from DIRAC import S_OK, S_ERROR, gConfig
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOs
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
from DIRAC.WorkloadManagementSystem.Client.TornadoPilotLoggingClient import TornadoPilotLoggingClient


class PilotLoggingAgent(AgentModule):
"""
.. class:: PilotLoggingAgent

The agent sends completed pilot log files to permanent storage for analysis.
"""

def __init__(self, *args, **kwargs):
"""c'tor"""
super().__init__(*args, **kwargs)
fstagni marked this conversation as resolved.
Show resolved Hide resolved

def initialize(self):
"""
agent's initialisation. Use this agent's CS information to:
Determine what Defaults/Shifter shifter proxy to use.,
get the target SE name from the CS.
Obtain log file location from Tornado.

:param self: self reference
"""
# pilot logs lifetime in days
self.clearPilotsDelay = self.am_getOption("ClearPilotsDelay", 30)
# configured VOs and setup
res = getVOs()
if not res["OK"]:
return res
self.voList = res.get("Value", [])

if isinstance(self.voList, str):
self.voList = [self.voList]

self.setup = gConfig.getValue("/DIRAC/Setup", None)
martynia marked this conversation as resolved.
Show resolved Hide resolved

return S_OK()

def execute(self):
"""
Execute one agent cycle. Upload log files to the SE and register them in the DFC.
Use a shifter proxy dynamically loaded for every VO

:param self: self reference
"""
voRes = {}
for vo in self.voList:
self.opsHelper = Operations(vo=vo, setup=self.setup)
# is remote pilot logging enabled for the VO ?
pilotLogging = self.opsHelper.getValue("/Pilot/RemoteLogging", False)
if pilotLogging:
res = self.opsHelper.getOptionsDict("Shifter/DataManager")
if not res["OK"]:
voRes[vo] = "No shifter defined - skipped"
self.log.error(f"No shifter defined for VO: {vo} - skipping ...")
continue

proxyUser = res["Value"].get("User")
proxyGroup = res["Value"].get("Group")
if proxyGroup is None or proxyUser is None:
self.log.error(
f"No proxy user or group defined for pilot: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}"
)
voRes[vo] = "No proxy user or group defined - skipped"
continue

self.log.info(f"Proxy used for pilot logging: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}")
res = self.executeForVO( # pylint: disable=unexpected-keyword-arg
vo, proxyUserName=proxyUser, proxyUserGroup=proxyGroup
)
if not res["OK"]:
voRes[vo] = res["Message"]
if voRes:
for key, value in voRes.items():
self.log.error(f"Error for {key} vo; message: {value}")
voRes.update(S_ERROR("Agent cycle for some VO finished with errors"))
return voRes
return S_OK()

@executeWithUserProxy
def executeForVO(self, vo):
"""
Execute one agent cycle for a VO. It obtains VO-specific configuration pilot options from the CS:
UploadPath - the path where the VO wants to upload pilot logs. It has to start with a VO name (/vo/path).
UploadSE - Storage element where the logs will be kept.

:param str vo: vo enabled for remote pilot logging
:return: S_OK or S_ERROR
:rtype: dict
"""

self.log.info(f"Pilot files upload cycle started for VO: {vo}")
res = self.opsHelper.getOptionsDict("Pilot")
if not res["OK"]:
return S_ERROR(f"No pilot section for {vo} vo")
pilotOptions = res["Value"]
uploadSE = pilotOptions.get("UploadSE")
if uploadSE is None:
return S_ERROR("Upload SE not defined")
self.log.info(f"Pilot upload SE: {uploadSE}")

uploadPath = pilotOptions.get("UploadPath")
if uploadPath is None:
return S_ERROR(f"Upload path on SE {uploadSE} not defined")
self.log.info(f"Pilot upload path: {uploadPath}")

client = TornadoPilotLoggingClient(useCertificates=True)
resDict = client.getMetadata()

if not resDict["OK"]:
return resDict

# vo-specific source log path:
pilotLogPath = os.path.join(resDict["Value"]["LogPath"], vo)
# check for new files and upload them
if not os.path.exists(pilotLogPath):
# not a disaster, the VO is enabled, but no logfiles were ever stored.
return S_OK()
# delete old pilot log files for the vo VO
self.clearOldPilotLogs(pilotLogPath)

self.log.info(f"Pilot log files location = {pilotLogPath} for VO: {vo}")

# get finalised (.log) files from Tornado and upload them to the selected SE

files = [
f for f in os.listdir(pilotLogPath) if os.path.isfile(os.path.join(pilotLogPath, f)) and f.endswith("log")
]

if not files:
self.log.info("No files to upload for this cycle")
for elem in files:
lfn = os.path.join(uploadPath, elem)
name = os.path.join(pilotLogPath, elem)
res = DataManager().putAndRegister(lfn=lfn, fileName=name, diracSE=uploadSE, overwrite=True)
if not res["OK"]:
self.log.error("Could not upload", f"to {uploadSE}: {res['Message']}")
else:
self.log.verbose("File uploaded: ", f"LFN = {res['Value']}")
try:
os.remove(name)
except Exception as excp:
self.log.exception("Cannot remove a local file after uploading", lException=excp)
return S_OK()

def clearOldPilotLogs(self, pilotLogPath):
"""
Delete old pilot log files unconditionally. Assumes that pilotLogPath exists.

:param str pilotLogPath: log files directory
:return: None
:rtype: None
"""

files = os.listdir(pilotLogPath)
seconds = int(self.clearPilotsDelay) * 86400
currentTime = time.time()

for file in files:
fullpath = os.path.join(pilotLogPath, file)
modifTime = os.stat(fullpath).st_mtime
if modifTime < currentTime - seconds:
self.log.debug(f" Deleting old log : {fullpath}")
try:
os.remove(fullpath)
except Exception as excp:
self.log.exception(f"Cannot remove an old log file after {fullpath}", lException=excp)
4 changes: 0 additions & 4 deletions src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -1017,10 +1017,6 @@ def _getPilotOptions(self, queue, **kwargs):

pilotOptions.append("--pythonVersion=3")

# Debug
if self.pilotLogLevel.lower() == "debug":
pilotOptions.append("-ddd")

# DIRAC Extensions to be used in pilots
pilotExtensionsList = opsHelper.getValue("Pilot/Extensions", [])
extensionsList = []
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
""" Test class for PilotLoggingAgent Agent
"""
import os
import time
import tempfile

import pytest
from unittest.mock import MagicMock, patch

# DIRAC Components
import DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent as plaModule
from DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent import PilotLoggingAgent
from DIRAC import gLogger, gConfig, S_OK, S_ERROR

gLogger.setLevel("DEBUG")

# Mock Objects
mockReply = MagicMock()
mockReply1 = MagicMock()
mockOperations = MagicMock()
mockTornadoClient = MagicMock()
mockDataManager = MagicMock()
mockAM = MagicMock()
mockNone = MagicMock()
mockNone.return_value = None

upDict = {
"OK": True,
"Value": {"User": "proxyUser", "Group": "proxyGroup"},
}


@pytest.fixture
def plaBase(mocker):
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.AgentModule.__init__")
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule._AgentModule__moduleProperties",
side_effect=lambda x, y=None: y,
create=True,
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.AgentModule.am_getOption", return_value=mockAM)
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.getVOs",
return_value={"OK": True, "Value": ["gridpp", "lz"]},
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.gConfig.getValue", return_value="GridPP")
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.Operations.getValue", side_effect=mockReply)
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.Operations.getOptionsDict", side_effect=mockReply1
)
pla = PilotLoggingAgent()
pla.log = gLogger
pla._AgentModule__configDefaults = mockAM
return pla


@pytest.fixture
def pla_initialised(mocker, plaBase):
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.PilotLoggingAgent.executeForVO")
plaBase.initialize()
return plaBase


@pytest.fixture
def pla(mocker, plaBase):
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.TornadoPilotLoggingClient",
side_effect=mockTornadoClient,
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.Operations", side_effect=mockOperations)
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.DataManager",
side_effect=mockDataManager,
)
plaBase.initialize()
return plaBase


def test_initialize(plaBase):
res = plaBase.initialize()
assert plaBase.voList == plaModule.getVOs()["Value"]
assert plaBase.setup is not None
assert res == S_OK()


@pytest.mark.parametrize(
"mockReplyInput, expected, expectedExecOut, expected2",
[
("/Pilot/RemoteLogging", [True, False], S_OK(), upDict),
("/Pilot/RemoteLogging", [False, False], S_OK(), upDict),
("/Pilot/RemoteLogging", [True, False], S_ERROR("Execute for VO failed"), upDict),
],
)
def test_execute(pla_initialised, mockReplyInput, expected, expectedExecOut, expected2):
"""Testing a thin version of execute (executeForVO is mocked)"""
assert pla_initialised.voList == plaModule.getVOs()["Value"]
mockReply.side_effect = expected
mockReply1.return_value = expected2
# remote pilot logging on (gridpp only) and off.
pla_initialised.executeForVO.return_value = expectedExecOut
res = pla_initialised.execute()
if not any(expected):
pla_initialised.executeForVO.assert_not_called()
else:
assert pla_initialised.executeForVO.called
pla_initialised.executeForVO.assert_called_with(
"gridpp",
proxyUserName=upDict["Value"]["User"],
proxyUserGroup=upDict["Value"]["Group"],
)
assert res["OK"] == expectedExecOut["OK"]


@pytest.mark.parametrize(
"ppath, files, result",
[
("pilot/log/path/", ["file1.log", "file2.log", "file3.log"], S_OK()),
("pilot/log/path/", [], S_OK()),
],
)
def test_executeForVO(pla, ppath, files, result):
opsHelperValues = {"OK": True, "Value": {"UploadSE": "testUploadSE", "UploadPath": "/gridpp/uploadPath"}}
# full local temporary path:
filepath = os.path.join(tempfile.TemporaryDirectory().name, ppath)
# this is what getMetadata returns:
resDict = {"OK": True, "Value": {"LogPath": filepath}}
mockTornadoClient.return_value.getMetadata.return_value = resDict
mockDataManager.return_value.putAndRegister.return_value = result
if files:
os.makedirs(os.path.join(filepath, "gridpp"), exist_ok=True)
for elem in files:
open(os.path.join(filepath, "gridpp", elem), "w")
mockOperations.return_value.getOptionsDict.return_value = opsHelperValues
pla.opsHelper = mockOperations.return_value
# success route
res = pla.executeForVO(vo="gridpp")
mockTornadoClient.assert_called_with(useCertificates=True)
assert mockTornadoClient.return_value.getMetadata.called
# only called with a non-empty file list:
if files:
assert mockDataManager.return_value.putAndRegister.called
assert res == S_OK()


def test_executeForVOMetaFails(pla):
opsHelperValues = {"OK": True, "Value": {"UploadSE": "testUploadSE", "UploadPath": "/gridpp/uploadPath"}}
mockOperations.return_value.getOptionsDict.return_value = opsHelperValues
pla.opsHelper = mockOperations.return_value
# getMetadata call fails.
mockTornadoClient.return_value.getMetadata.return_value = {"OK": False, "Message": "Failed, sorry.."}
res = pla.executeForVO(vo="anything")
assert res["OK"] is False


@pytest.mark.parametrize(
"opsHelperValues, expectedRes",
[
({"OK": True, "Value": {"UploadPath": "/gridpp/uploadPath"}}, S_ERROR("Upload SE not defined")),
({"OK": True, "Value": {"UploadSE": "testUploadSE"}}, S_ERROR("Upload path on SE testUploadSE not defined")),
({"OK": False}, S_ERROR(f"No pilot section for gridpp vo")),
],
)
def test_executeForVOBadConfig(pla, opsHelperValues, expectedRes):
"""Testing an incomplete configuration"""
mockOperations.return_value.getOptionsDict.return_value = opsHelperValues
pla.opsHelper = mockOperations.return_value
res = pla.executeForVO(vo="gridpp")
assert res["OK"] is False
assert res["Message"] == expectedRes["Message"]
mockTornadoClient.return_value.getMetadata.reset_mock()
mockTornadoClient.return_value.getMetadata.assert_not_called()


@pytest.mark.parametrize(
"filename, fileAge, ageLimit, expectedResult", [("survives.log", 10, 20, True), ("getsdeleted.log", 21, 20, False)]
)
def test_oldLogsCleaner(plaBase, filename, fileAge, ageLimit, expectedResult):
"""Testing old files removal"""
plaBase.clearPilotsDelay = ageLimit
filepath = tempfile.TemporaryDirectory().name
os.makedirs(filepath, exist_ok=True)
testfile = os.path.join(filepath, filename)
fd = open(testfile, "w")
fd.close()
assert os.path.exists(testfile) is True
# cannot patch os.stat globally because os.path.exists uses it !
with patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.os.stat") as mockOSStat:
mockOSStat.return_value.st_mtime = time.time() - fileAge * 86400 # file older that fileAge in seconds
plaBase.clearOldPilotLogs(filepath)
assert os.path.exists(testfile) is expectedResult
Loading
Loading