diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py new file mode 100644 index 00000000000..e3bf17b55aa --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py @@ -0,0 +1,89 @@ +""" :mod: PilotLoggingAgent + + PilotLoggingAgent sends Pilot log files to an SE +""" + +# # imports +import os, requests +from DIRAC import S_OK, S_ERROR +from DIRAC.Core.Base.AgentModule import AgentModule +from DIRAC.Core.Security.Locations import getHostCertificateAndKeyLocation, getCAsLocation +from DIRAC.DataManagementSystem.Client.DataManager import DataManager + + +class PilotLoggingAgent(AgentModule): + """ + .. class:: PilotLoggingAgent + + The agent sends completed pilot log files to permanent storage for analysis. + """ + + def initialize(self): + """ + agent's initalisation. Use this agent's CS information to: + Determine what Defaults/Shifter shifter proxy to use., + get the target SE name from the CS. + Obtain log file location from Tornado. + + :param self: self reference + """ + + # get shifter proxy for uploads (VO-specific shifter from the Defaults CS section) + self.shifterName = self.am_getOption("ShifterName", "GridPPLogManager") + self.am_setOption("shifterProxy", self.shifterName) + self.uploadSE = self.am_getOption("UploadSE", "UKI-LT2-IC-HEP-disk") + + certAndKeyLocation = getHostCertificateAndKeyLocation() + casLocation = getCAsLocation() + + data = {"method": "getMetadata"} + self.server = self.am_getOption("DownloadLocation", None) + + if not self.server: + return S_ERROR("No DownloadLocation set in the CS !") + try: + with requests.post(self.server, data=data, verify=casLocation, cert=certAndKeyLocation) as res: + if res.status_code not in (200, 202): + message = "Could not get metadata from %s: status %s" % (self.server, res.status_code) + self.log.error(message) + return S_ERROR(message) + resDict = res.json() + except Exception as exc: + message = "Call to server %s failed" % (self.server,) + self.log.exception(message, lException=exc) + return S_ERROR(message) + if resDict["OK"]: + meta = resDict["Value"] + self.pilotLogPath = meta["LogPath"] + else: + return S_ERROR(resDict["Message"]) + self.log.info("Pilot log files location = %s " % self.pilotLogPath) + return S_OK() + + def execute(self): + """ + Execute one agent cycle. Upload log files to the SE and register them in the DFC. + + :param self: self reference + """ + + self.log.info("Pilot files upload cycle started.") + files = [ + f + for f in os.listdir(self.pilotLogPath) + if os.path.isfile(os.path.join(self.pilotLogPath, f)) and f.endswith("log") + ] + for elem in files: + lfn = os.path.join("/gridpp/pilotlogs/", elem) + name = os.path.join(self.pilotLogPath, elem) + res = DataManager().putAndRegister(lfn=lfn, fileName=name, diracSE=self.uploadSE, overwrite=True) + if not res["OK"]: + self.log.error("Could not upload", "to %s: %s" % (self.uploadSE, res["Message"])) + else: + self.log.info("File uploaded: ", "LFN = %s" % res["Value"]) + try: + pass + # os.remove(name) + except Exception as excp: + self.log.exception("Cannot remove a local file after uploading", lException=excp) + return S_OK() diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py index 13f1100647e..18077b741a3 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py @@ -980,14 +980,17 @@ def _getPilotOptions(self, queue, **kwargs): else: self.log.info("DIRAC project will be installed by pilots") - # Pilot Logging defined? + # Pilot Logging defined? This enables the extended (possibly remote) logger pilotLogging = opsHelper.getValue("/Services/JobMonitoring/usePilotsLoggingFlag", False) if pilotLogging: pilotOptions.append("-z ") + # remote logger URL. + remoteLoggerURL = opsHelper.getValue("/Services/JobMonitoring/remoteLoggerURL", "localhost") + pilotOptions.append("-g %s" % remoteLoggerURL) pilotOptions.append("--pythonVersion=3") - # Debug + # Debug. Both for the standard and (if enabled) extended logger. if self.pilotLogLevel.lower() == "debug": pilotOptions.append("-ddd") diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index 130c75069d2..958c5b9f28f 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -19,6 +19,14 @@ Services } } ##END + TornadoPilotLogging + { + Protocol = https + Authorization + { + Default = authenticated + } + } JobMonitoring { Port = 9130 @@ -190,6 +198,13 @@ Agents PilotAccountingEnabled = yes } ##END + ##BEGIN PilotLoggingAgent + PilotLoggingAgent + { + PollingTime = 600 + LogLevel = DEBUG + } + ##END JobAgent { FillingModeFlag = true diff --git a/src/DIRAC/WorkloadManagementSystem/Service/BasicPilotLoggingPlugin.py b/src/DIRAC/WorkloadManagementSystem/Service/BasicPilotLoggingPlugin.py new file mode 100644 index 00000000000..25b4774cc71 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Service/BasicPilotLoggingPlugin.py @@ -0,0 +1,51 @@ +""" +Basic Pilot logging plugin. Just log messages. +""" +from DIRAC import S_OK, S_ERROR, gLogger + +sLog = gLogger.getSubLogger(__name__) + + +class BasicPilotLoggingPlugin(object): + """ + This is a no-op fallback solution class, to be used when no plugin is defined for remote logging. + Any pilot logger plugin could inherit from this class to receive a set of no-op methods required by + :class:`TornadoPilotLoggingHandler` and only overwrite needed methods. + """ + + def __init__(self): + + sLog.warning("BasicPilotLoggingPlugin is being used. It only logs locally at a debug level.") + + def sendMessage(self, message, UUID): + """ + Dummy sendMessage method. + + :param message: text to log + :type message: str + :return: None + :rtype: None + """ + sLog.debug(message) + return S_OK("Message sent") + + def finaliseLogs(self, payload, UUID): + """ + Dummy finaliseLogs method. + + :param payload: + :type payload: + :return: S_OK or S_ERROR + :rtype: dict + """ + + return S_OK("Finaliser!") + + def getMeta(self): + """ + Get metadata dummy method. + + :return: S_OK with an empty dict + :rtype: dict + """ + return S_OK({}) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py b/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py new file mode 100644 index 00000000000..b3aac55d2a0 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py @@ -0,0 +1,96 @@ +""" +Basic Pilot logging plugin. Just log messages. +""" +import os, json, re +from DIRAC import S_OK, S_ERROR, gLogger + +sLog = gLogger.getSubLogger(__name__) + + +class FileCacheLoggingPlugin(object): + """ + File cache logging. Log records are appended to a file, one for each pilot. + It is assumed that an agent will be installed together with this plugin, which will copy + the files to a safe place and clear the cache. + """ + + def __init__(self): + """ + Sets the pilot log files location for a WebServer. + + """ + # UUID pattern + self.pattern = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") + self.meta = {} + logPath = os.path.join(os.getcwd(), "pilotlogs") + self.meta["LogPath"] = logPath + if not os.path.exists(logPath): + os.makedirs(logPath) + sLog.info("Pilot logging directory:", logPath) + + def sendMessage(self, message, pilotUUID): + """ + File cache sendMessage method. Write the log message to a file line by line. + + :param message: text to log in json format + :type message: str + :return: S_OK or S_ERROR + :rtype: dict + """ + + res = self.pattern.match(pilotUUID) + if not res: + sLog.error("Pilot UUID does not match the UUID pattern: ", "%s" % (pilotUUID,)) + return S_ERROR("Pilot UUID is invalid") + + with open(os.path.join(self.meta["LogPath"], pilotUUID), "a") as pilotLog: + try: + messageContent = json.loads(message) + if isinstance(messageContent, list): + for elem in messageContent: + pilotLog.write(elem + "\n") + else: + # it could be a string, if emitted by pilot logger StringIO handler + pilotLog.write(messageContent) + except IOError as ioerr: + sLog.error("Error writing to log file:", str(ioerr)) + return S_ERROR(str(ioerr)) + return S_OK("Message logged successfully for pilot: %s" % (pilotUUID,)) + + def finaliseLogs(self, payload, logfile): + """ + Finalise a log file. Finalised logfile can be copied to a secure location. + + :param payload: additional info, a plugin might want to use (i.e. the system return code of a pilot script) + :type payload: dict + :param logfile: log filename (pilotUUID). + :type logfile: json representation of dict + :return: S_OK or S_ERROR + :rtype: dict + """ + + returnCode = json.loads(payload).get("retCode", 0) + res = self.pattern.match(logfile) + if not res: + sLog.error("Pilot UUID does not match the UUID pattern: ", "%s" % (logfile,)) + return S_ERROR("Pilot UUID is invalid") + + try: + filepath = self.meta["LogPath"] + os.rename(os.path.join(filepath, logfile), os.path.join(filepath, logfile + ".log")) + sLog.info("Log file finalised for pilot: %s (return code: %s)" % (logfile, returnCode)) + return S_OK() + except Exception as err: + sLog.exception("Exception when finalising log: ", err) + return S_ERROR(str(err)) + + def getMeta(self): + """ + Return any metadata related to this plugin. The "LogPath" is the minimum requirement for the dict to contain. + + :return: Dirac S_OK containing the metadata or S_ERROR if the LogPath is not defined. + :rtype: dict + """ + if "LogPath" in self.meta: + return S_OK(self.meta) + return S_ERROR("No Pilot logging directory defined") diff --git a/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py new file mode 100644 index 00000000000..87af9f1a186 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py @@ -0,0 +1,109 @@ +""" Tornado-based HTTPs JobMonitoring service. +""" + + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +__RCSID__ = "$Id$" + +import os, json +from DIRAC import gLogger, S_OK, S_ERROR +from DIRAC.Core.Tornado.Server.TornadoService import TornadoService +from DIRAC.Core.DISET.RequestHandler import RequestHandler, getServiceOption +from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader + +sLog = gLogger.getSubLogger(__name__) + + +class TornadoPilotLoggingHandler(TornadoService): + log = sLog + + @classmethod + def initializeHandler(cls, infoDict): + """ + Called once, at the first request. Create a directory where pilot logs will be stored. + + :param infoDict: + :return: None + """ + + cls.log.info("Handler initialised ...") + cls.log.debug("with a dict: ", str(infoDict)) + defaultOption, defaultClass = "LoggingPlugin", "BasicPilotLoggingPlugin" + configValue = getServiceOption(infoDict, defaultOption, defaultClass) + + result = ObjectLoader().loadObject("WorkloadManagementSystem.Service.%s" % (configValue,), configValue) + if not result["OK"]: + cls.log.error("Failed to load LoggingPlugin", "%s: %s" % (configValue, result["Message"])) + return result + + componentClass = result["Value"] + cls.loggingPlugin = componentClass() + cls.log.info("Loaded: PilotLoggingPlugin class", configValue) + + cls.meta = {} + logPath = os.path.join(os.getcwd(), "pilotlogs") + cls.meta["LogPath"] = logPath + if not os.path.exists(logPath): + os.makedirs(logPath) + cls.log.info("Pilot logging directory:", logPath) + + def initializeRequest(self): + """ + Called for each request. + + :return: None + """ + + self.log.info("Request initialised.. ") + + auth_sendMessage = ["Operator", "Pilot", "GenericPilot"] + + def export_sendMessage(self, message, pilotUUID): + # def export_sendMessage(self, message, pilotUUID): + """ + The method logs messages to Tornado and forwards pilot log files, one per pilot, to a relevant plugin. + The pilot is identified by its UUID. + + :param message: message sent by a client, a list of strings in JSON format + :param pilotUUID: pilot UUID + :return: S_OK or S_ERROR if a plugin cannot process the message. + :rtype: dict + """ + ## Insert your method here, don't forget the return should be serializable + ## Returned value may be an S_OK/S_ERROR + ## You don't need to serialize in JSON, Tornado will do it + # self.log.info("Message: ", message) + # the plugin returns S_OK or S_ERROR + # leave JSON decoding to the selected plugin: + result = self.loggingPlugin.sendMessage(message, pilotUUID) + return result + + auth_getMetadata = ["Operator", "TrustedHost"] + + def export_getMetadata(self): + """ + Get PilotLoggingHandler metadata. Intended to be used by a client or an agent. + + :return: S_OK containing a metadata dictionary + """ + return self.loggingPlugin.getMeta() + + auth_finaliseLogs = ["Operator", "Pilot", "GenericPilot"] + + def export_finaliseLogs(self, payload, pilotUUID): + """ + Finalise a log file. Finalised logfile can be copied to a secure location, if a file cache is used. + + :param payload: data passed to the plugin finaliser. + :type payload: dict + :param pilotUUID: pilot UUID + :return: S_OK or S_ERROR (via the plugin involved) + :rtype: dict + """ + + # The plugin returns the Dirac S_OK or S_ERROR object + + return self.loggingPlugin.finaliseLogs(payload, pilotUUID) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/PilotCStoJSONSynchronizer.py b/src/DIRAC/WorkloadManagementSystem/Utilities/PilotCStoJSONSynchronizer.py index efa74fd8fd3..4a3e28db75d 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/PilotCStoJSONSynchronizer.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/PilotCStoJSONSynchronizer.py @@ -244,6 +244,29 @@ def _getPilotOptionsPerSetup(self, setup, pilotDict): return queueOptionRes queuesDict[queue] = queueOptionRes["Value"] pilotDict["Setups"][setup]["Logging"]["Queues"] = queuesDict + elif "loggingRESTService" in pilotDict["Setups"][setup]: + self.log.debug( + "Getting option of ", "/DIRAC/Setups/%s/%s" % (setup, pilotDict["Setups"][setup]["loggingRESTService"]) + ) + result = gConfig.getOption( + "/DIRAC/Setups/%s/%s" % (setup, pilotDict["Setups"][setup]["loggingRESTService"]) + ) + if not result["OK"]: + return result + optValue = result["Value"] + self.log.debug("value: ", optValue) + tornadoService = gConfig.getOptionsDict( + "/Systems/%s/%s" % (pilotDict["Setups"][setup]["loggingRESTService"], optValue) + ) + if not tornadoService["OK"]: + self.log.error(tornadoService["Message"]) + return tornadoService + pilotDict["Setups"][setup]["Logging"] = {"LoggingType": "REST_API"} + pilotDict["Setups"][setup]["Logging"]["Port"] = tornadoService["Value"]["Port"] + # host ? os.environ.get('HOSTNAME') as a fallback ? + pilotDict["Setups"][setup]["Logging"]["Host"] = tornadoService["Value"].get( + "Host", os.environ.get("HOSTNAME") + ) def syncScripts(self): """Clone the pilot scripts from the Pilot repositories (handle also extensions)""" diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py b/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py index fbb597584a7..97116ad3e69 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py @@ -46,18 +46,81 @@ import time import tarfile import hashlib +# for remote logging +import subprocess +import json +import os +import urllib +import ssl +import argparse +import shlex +from uuid import uuid1 + +try: + # For Python 3.0 and later + from urllib.request import urlopen, HTTPError, URLError + from urllib.parse import urlencode +except ImportError: + # Fall back to Python 2's urllib2 + from urllib2 import urlopen, HTTPError, URLError + from urllib import urlencode + +try: + from cStringIO import StringIO +except ImportError: + from io import StringIO + +# formatting with microsecond accuracy, (ISO-8601) + +class MicrosecondFormatter(logging.Formatter): + def formatTime(self, record, datefmt=None): + ct = self.converter(record.created) + if datefmt: + s = time.strftime(datefmt, ct) + else: + t = time.strftime("%%Y-%%m-%%dT%%H:%%M:%%S", ct) + s = "%%s,%%06dZ" %% (t, (record.created - int(record.created)) * 1e6) + return s + +def sendMessage(url, method, rawMessage, pilotUUID): + + message = json.dumps((json.dumps(rawMessage), pilotUUID)) + if major >= 3: + data = urlencode({'method': method, 'args': message}).encode('utf-8') # encode to bytes ! for python3 + else: + data = urlencode({'method': method, 'args': message}) + caPath = os.getenv('X509_CERT_DIR') + cert = os.getenv('X509_USER_PROXY') + + context = ssl.create_default_context() + context.load_verify_locations(capath=caPath) + context.load_cert_chain(cert) + try: + res = urlopen(url, data, context=context) + # logger.info(res.read().strip()) + res.close() + except URLError as err: + logger.error(err) # setting up the logging -formatter = logging.Formatter(fmt='%%(asctime)s UTC %%(levelname)-8s %%(message)s', datefmt='%%Y-%%m-%%d %%H:%%M:%%S') +# formatter = logging.Formatter(fmt='%%(asctime)s UTC %%(levelname)-8s %%(message)s', datefmt='%%Y-%%m-%%d %%H:%%M:%%S') +formatter = MicrosecondFormatter('%%(asctime)s %%(levelname)-8s [%%(name)s] %%(message)s') logging.Formatter.converter = time.gmtime try: screen_handler = logging.StreamHandler(stream=sys.stdout) except TypeError: # python2.6 screen_handler = logging.StreamHandler(strm=sys.stdout) screen_handler.setFormatter(formatter) + +# add a string buffer handler +sio = StringIO() +buffer = logging.StreamHandler(sio) +buffer.setFormatter(formatter) + logger = logging.getLogger('pilotLogger') logger.setLevel(logging.DEBUG) logger.addHandler(screen_handler) +logger.addHandler(buffer) # just logging the environment as first thing logger.debug('===========================================================') @@ -66,6 +129,8 @@ logger.debug(key + '=' + val) logger.debug('===========================================================\\n') +logger.debug(sys.version) + # putting ourselves in the right directory pilotExecDir = '%(pilotExecDir)s' if not pilotExecDir: @@ -268,12 +333,60 @@ def pilotWrapperScript( localPilot += ( """ # now finally launching the pilot script (which should be called dirac-pilot.py) -cmd = "$py dirac-pilot.py %s" -logger.info('Executing: %%s' %% cmd) -sys.stdout.flush() -ret = os.system(cmd) +# get the setup name an -z, if present to get remote logging in place +opt = "%s" +# generate pilot UUID +UUID = str(uuid1()) +opt = opt + " --pilotUUID " + UUID + +args = opt.split() + +# let's see if we have remote logging enabled (-z), if not run the pilot script with os.system, as before + +logger.info("dirac-pilot.py will be called: with %%s " %% args) +optParser = argparse.ArgumentParser() +optParser.add_argument('-z', '--pilotLogging', action='store_true') +optParser.add_argument('-g', '--loggerURL', default="") +optParser.add_argument('-S', '--setup', default="") +optParser.add_argument('-F', '--pilotCFGFile', default="pilot.json") + +res, unknown = optParser.parse_known_args(args) + +setup = res.setup + +major, minor, micro, _, _ = sys.version_info + +if res.pilotLogging: + loggerURL = res.loggerURL + + if loggerURL: + logger.info("Remote logging activated.") + # send what we have so far. + sendMessage(loggerURL, 'sendMessage', buffer.stream.getvalue(), UUID) + + #opt = opt + " --pilotUUID " + UUID + proc = subprocess.Popen(shlex.split("$py dirac-pilot.py " + opt), bufsize = 1, + stdout=sys.stdout, stderr=sys.stderr, universal_newlines = True) + proc.wait() + ret = proc.returncode + + else: + # classic logger + logger.error("No Logging URL - cannot activate remote logger ") + cmd = "$py dirac-pilot.py %%s" %% opt + logger.info('Executing: %%s' %% cmd) + sys.stdout.flush() + ret = os.system(cmd) +else: + cmd = "$py dirac-pilot.py %%s" %% opt + logger.info('Executing: %%s' %% cmd) + sys.stdout.flush() + ret = os.system(cmd) # and cleaning up +if res.pilotLogging and loggerURL: + sendMessage(loggerURL, 'finaliseLogs', {'retCode': ret}, UUID) +buffer.stream.close() shutil.rmtree(pilotWorkingDirectory) # did it fail?