Skip to content

Commit

Permalink
Enabling remote pilot logging with Tornado
Browse files Browse the repository at this point in the history
  • Loading branch information
martynia committed Jun 14, 2022
1 parent abce213 commit 405b86a
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 35 deletions.
6 changes: 3 additions & 3 deletions src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,9 +984,9 @@ def _getPilotOptions(self, queue, **kwargs):
pilotLogging = opsHelper.getValue("/Services/JobMonitoring/usePilotsLoggingFlag", False)
if pilotLogging:
pilotOptions.append("-z ")
# internal extended logger logging to debug the logger itself.
extLoggingLevel = opsHelper.getValue("/Services/JobMonitoring/extLoggerLoggingLevel", "WARNING")
pilotOptions.append("-g %s" % extLoggingLevel)
# remote logger URL.
remoteLoggerURL = opsHelper.getValue("/Services/JobMonitoring/remoteLoggerURL", "localhost")
pilotOptions.append("-g %s" % remoteLoggerURL)

pilotOptions.append("--pythonVersion=3")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def __init__(self):

sLog.warning("BasicPilotLoggingPlugin is being used. It only logs locally at a debug level.")

def sendMessage(self, message):
def sendMessage(self, message, UUID):
"""
Dummy sendMessage method.
Expand All @@ -29,7 +29,7 @@ def sendMessage(self, message):
sLog.debug(message)
return S_OK("Message sent")

def finaliseLogs(self, payload):
def finaliseLogs(self, payload, UUID):
"""
Dummy finaliseLogs method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,15 @@ def __init__(self):
os.makedirs(logPath)
sLog.info("Pilot logging directory:", logPath)

def sendMessage(self, message):
def sendMessage(self, message, pilotUUID):
"""
File cache sendMessage method.
File cache sendMessage method. Write the log message to a file line by line.
:param message: text to log
:param message: text to log in json format
:type message: str
:return: None
:rtype: None
:return: S_OK or S_ERROR
:rtype: dict
"""
sLog.info(message)
messageDict = json.loads(message)
pilotUUID = messageDict.get("pilotUUID", "Unspecified_ID")

res = self.pattern.match(pilotUUID)
if not res:
Expand All @@ -48,23 +45,31 @@ def sendMessage(self, message):

with open(os.path.join(self.meta["LogPath"], pilotUUID), "a") as pilotLog:
try:
pilotLog.write(message + "\n")
messageContent = json.loads(message)
if isinstance(messageContent, list):
for elem in messageContent:
pilotLog.write(elem + "\n")
else:
# it could be a string, if emitted by pilot logger StringIO handler
pilotLog.write(messageContent)
except IOError as ioerr:
sLog.error("Error writing to log file:", str(ioerr))
return S_ERROR(str(ioerr))
return S_OK("Message logged successfully for pilot: %s" % (pilotUUID,))

def finaliseLogs(self, payload):
def finaliseLogs(self, payload, logfile):
"""
Finalise a log file. Finalised logfile can be copied to a secure location.
:param logfile: payload containing log filename.
:param payload: additional info, a plugin might want to use (i.e. the system return code of a pilot script)
:type payload: dict
:param logfile: log filename (pilotUUID).
:type logfile: json representation of dict
:return: S_OK or S_ERROR
:rtype: dict
"""

logfile = json.loads(payload).get("pilotUUID", "Unspecified_ID")
returnCode = json.loads(payload).get("retCode", 0)
res = self.pattern.match(logfile)
if not res:
sLog.error("Pilot UUID does not match the UUID pattern: ", "%s" % (logfile,))
Expand All @@ -73,8 +78,10 @@ def finaliseLogs(self, payload):
try:
filepath = self.meta["LogPath"]
os.rename(os.path.join(filepath, logfile), os.path.join(filepath, logfile + ".log"))
return S_OK("Log file finalised for pilot: %s" % (logfile,))
sLog.info("Log file finalised for pilot: %s (return code: %s)" % (logfile, returnCode))
return S_OK()
except Exception as err:
sLog.exception("Exception when finalising log: ", err)
return S_ERROR(str(err))

def getMeta(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,24 @@ def initializeRequest(self):

auth_sendMessage = ["Operator", "Pilot", "GenericPilot"]

def export_sendMessage(self, message):
def export_sendMessage(self, message, pilotUUID):
# def export_sendMessage(self, message, pilotUUID):
"""
The method logs messages to Tornado and writes pilot log files, one per pilot.
The method logs messages to Tornado and forwards pilot log files, one per pilot, to a relevant plugin.
The pilot is identified by its UUID.
:param message: message sent by a client, in JSON format
:param pilotUUID: pilot UUID - used to create a log file
:return: S_OK or S_ERROR if a file cannot be created or written to.
:param message: message sent by a client, a list of strings in JSON format
:param pilotUUID: pilot UUID
:return: S_OK or S_ERROR if a plugin cannot process the message.
:rtype: dict
"""
## Insert your method here, don't forget the return should be serializable
## Returned value may be an S_OK/S_ERROR
## You don't need to serialize in JSON, Tornado will do it
self.log.info("Message: ", message)
# self.log.info("Message: ", message)
# the plugin returns S_OK or S_ERROR
result = self.loggingPlugin.sendMessage(message)
# leave JSON decoding to the selected plugin:
result = self.loggingPlugin.sendMessage(message, pilotUUID)
return result

auth_getMetadata = ["Operator", "TrustedHost"]
Expand All @@ -90,16 +93,17 @@ def export_getMetadata(self):

auth_finaliseLogs = ["Operator", "Pilot", "GenericPilot"]

def export_finaliseLogs(self, payload):
def export_finaliseLogs(self, payload, pilotUUID):
"""
Finalise a log file. Finalised logfile can be copied to a secure location. if a file cache is used.
Finalise a log file. Finalised logfile can be copied to a secure location, if a file cache is used.
:param payload: data passed to the plugin finaliser, a string in the file cache plugin.
:type payload: str or dict
:param payload: data passed to the plugin finaliser.
:type payload: dict
:param pilotUUID: pilot UUID
:return: S_OK or S_ERROR (via the plugin involved)
:rtype: dict
"""

# The plugin returns the Dirac S_OK or S_ERROR object

return self.loggingPlugin.finaliseLogs(payload)
return self.loggingPlugin.finaliseLogs(payload, pilotUUID)
123 changes: 118 additions & 5 deletions src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,81 @@
import time
import tarfile
import hashlib
# for remote logging
import subprocess
import json
import os
import urllib
import ssl
import argparse
import shlex
from uuid import uuid1
try:
# For Python 3.0 and later
from urllib.request import urlopen, HTTPError, URLError
from urllib.parse import urlencode
except ImportError:
# Fall back to Python 2's urllib2
from urllib2 import urlopen, HTTPError, URLError
from urllib import urlencode
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
# formatting with microsecond accuracy, (ISO-8601)
class MicrosecondFormatter(logging.Formatter):
def formatTime(self, record, datefmt=None):
ct = self.converter(record.created)
if datefmt:
s = time.strftime(datefmt, ct)
else:
t = time.strftime("%%Y-%%m-%%dT%%H:%%M:%%S", ct)
s = "%%s,%%06dZ" %% (t, (record.created - int(record.created)) * 1e6)
return s
def sendMessage(url, method, rawMessage, pilotUUID):
message = json.dumps((json.dumps(rawMessage), pilotUUID))
if major >= 3:
data = urlencode({'method': method, 'args': message}).encode('utf-8') # encode to bytes ! for python3
else:
data = urlencode({'method': method, 'args': message})
caPath = os.getenv('X509_CERT_DIR')
cert = os.getenv('X509_USER_PROXY')
context = ssl.create_default_context()
context.load_verify_locations(capath=caPath)
context.load_cert_chain(cert)
try:
res = urlopen(url, data, context=context)
# logger.info(res.read().strip())
res.close()
except URLError as err:
logger.error(err)
# setting up the logging
formatter = logging.Formatter(fmt='%%(asctime)s UTC %%(levelname)-8s %%(message)s', datefmt='%%Y-%%m-%%d %%H:%%M:%%S')
# formatter = logging.Formatter(fmt='%%(asctime)s UTC %%(levelname)-8s %%(message)s', datefmt='%%Y-%%m-%%d %%H:%%M:%%S')
formatter = MicrosecondFormatter('%%(asctime)s %%(levelname)-8s [%%(name)s] %%(message)s')
logging.Formatter.converter = time.gmtime
try:
screen_handler = logging.StreamHandler(stream=sys.stdout)
except TypeError: # python2.6
screen_handler = logging.StreamHandler(strm=sys.stdout)
screen_handler.setFormatter(formatter)
# add a string buffer handler
sio = StringIO()
buffer = logging.StreamHandler(sio)
buffer.setFormatter(formatter)
logger = logging.getLogger('pilotLogger')
logger.setLevel(logging.DEBUG)
logger.addHandler(screen_handler)
logger.addHandler(buffer)
# just logging the environment as first thing
logger.debug('===========================================================')
Expand All @@ -66,6 +129,8 @@
logger.debug(key + '=' + val)
logger.debug('===========================================================\\n')
logger.debug(sys.version)
# putting ourselves in the right directory
pilotExecDir = '%(pilotExecDir)s'
if not pilotExecDir:
Expand Down Expand Up @@ -268,12 +333,60 @@ def pilotWrapperScript(
localPilot += (
"""
# now finally launching the pilot script (which should be called dirac-pilot.py)
cmd = "$py dirac-pilot.py %s"
logger.info('Executing: %%s' %% cmd)
sys.stdout.flush()
ret = os.system(cmd)
# get the setup name an -z, if present to get remote logging in place
opt = "%s"
# generate pilot UUID
UUID = str(uuid1())
opt = opt + " --pilotUUID " + UUID
args = opt.split()
# let's see if we have remote logging enabled (-z), if not run the pilot script with os.system, as before
logger.info("dirac-pilot.py will be called: with %%s " %% args)
optParser = argparse.ArgumentParser()
optParser.add_argument('-z', '--pilotLogging', action='store_true')
optParser.add_argument('-g', '--loggerURL', default="")
optParser.add_argument('-S', '--setup', default="")
optParser.add_argument('-F', '--pilotCFGFile', default="pilot.json")
res, unknown = optParser.parse_known_args(args)
setup = res.setup
major, minor, micro, _, _ = sys.version_info
if res.pilotLogging:
loggerURL = res.loggerURL
if loggerURL:
logger.info("Remote logging activated.")
# send what we have so far.
sendMessage(loggerURL, 'sendMessage', buffer.stream.getvalue(), UUID)
#opt = opt + " --pilotUUID " + UUID
proc = subprocess.Popen(shlex.split("$py dirac-pilot.py " + opt), bufsize = 1,
stdout=sys.stdout, stderr=sys.stderr, universal_newlines = True)
proc.wait()
ret = proc.returncode
else:
# classic logger
logger.error("No Logging URL - cannot activate remote logger ")
cmd = "$py dirac-pilot.py %%s" %% opt
logger.info('Executing: %%s' %% cmd)
sys.stdout.flush()
ret = os.system(cmd)
else:
cmd = "$py dirac-pilot.py %%s" %% opt
logger.info('Executing: %%s' %% cmd)
sys.stdout.flush()
ret = os.system(cmd)
# and cleaning up
if res.pilotLogging and loggerURL:
sendMessage(loggerURL, 'finaliseLogs', {'retCode': ret}, UUID)
buffer.stream.close()
shutil.rmtree(pilotWorkingDirectory)
# did it fail?
Expand Down

0 comments on commit 405b86a

Please sign in to comment.