diff --git a/cax/main.py b/cax/main.py
index 949468a..59477d3 100644
--- a/cax/main.py
+++ b/cax/main.py
@@ -63,7 +63,7 @@ def main():
     ncpu = args.ncpu
     config.NCPU = ncpu
 
-    # Set information to update the run database
+    # Set information to update the run database
     config.set_database_log(database_log)
 
     # Check passwords and API keysspecified
@@ -100,20 +100,20 @@ def main():
     tasks = [
         corrections.AddElectronLifetime(),  # Add electron lifetime to run, which is just a function of calendar time
-        corrections.AddGains(),  # Adds gains to a run, where this is computed using slow control information
-        corrections.AddDriftVelocity(),  # Adds drift velocity to the run, also computed from slow control info
-        corrections.SetS2xyMap(),
+        corrections.AddGains(),  # Adds gains to a run, where this is computed using slow control information
+        corrections.AddDriftVelocity(),  # Adds drift velocity to the run, also computed from slow control info
+        corrections.SetS2xyMap(),
         corrections.SetLightCollectionEfficiency(),
         corrections.SetFieldDistortion(),
-        corrections.SetNeuralNetwork(),
-        #corrections.AddSlowControlInformation(),
+        corrections.SetNeuralNetwork(),
+        # corrections.AddSlowControlInformation(),
 
         data_mover.CopyPush(),  # Upload data through e.g. scp or gridftp to this location where cax running
 
-        #tsm_mover.AddTSMChecksum(),  # Add forgotten Checksum for runDB for TSM client.
-        checksum.CompareChecksums(),  # See if local data corrupted
+        # tsm_mover.AddTSMChecksum(),  # Add forgotten Checksum for runDB for TSM client.
+        checksum.CompareChecksums(),  # See if local data corrupted
 
         clear.RetryStalledTransfer(),  # If data transferring e.g. 48 hours, probably cax crashed so delete then retry
         clear.RetryBadChecksumTransfer(),  # If bad checksum for local data and can fetch from somewhere else, delete our copy
-        data_mover.CopyPull(),  # Download data through e.g. scp to this location
+        data_mover.CopyPull(),  # Download data through e.g. scp to this location
 
         checksum.AddChecksum(),  # Add checksum for data here so can know if corruption (useful for knowing when many good copies!)
 
         filesystem.SetPermission(),  # Set any permissions (primarily for Tegner) for new data to make sure analysts can access
@@ -121,7 +121,7 @@ def main():
         process.ProcessBatchQueue(),  # Process the data with pax
         process_hax.ProcessBatchQueueHax(),  # Process the data with hax
 
-        clear.PurgeProcessed(),  #Clear the processed data for a given version
+        clear.PurgeProcessed(),  # Clear the processed data for a given version
 
         clear.BufferPurger()  # Clear old data at some locations as specified in cax.json
     ]
@@ -189,6 +189,8 @@ def massive():
                         help="Select the cluster partition")
     parser.add_argument('--reservation', type=str,
                         help="Select the reservation")
+    parser.add_argument('--no-batch-mode', dest='no_batch_mode', action='store_true',
+                        help="Use massive-cax without batch queue submission.")
 
     args = parser.parse_args()
@@ -197,6 +199,7 @@ def massive():
         exit()
 
     run_once = args.once
+    no_batch_mode = args.no_batch_mode
 
     config_arg = ''
     if args.config_file:
@@ -210,7 +213,7 @@ def massive():
     tag = ''
     if args.tag:
         tag = args.tag
-        #logging.info("Running only on tag: ", str(tag))
+        # logging.info("Running only on tag: ", str(tag))
 
     ncpu = args.ncpu
     config.NCPU = ncpu
@@ -230,14 +233,13 @@ def massive():
     elif partition == 'kicp':
         qos = 'xenon1t-kicp'
 
-    #else: # logging not working...
-    #    logging.error("Unkown partition", partition)
 
     reservation = None
     if args.reservation:
         reservation = args.reservation
 
-
     # Setup logging
     cax_version = 'cax_v%s - ' % __version__
     logging.basicConfig(filename='massive_cax.log',
@@ -255,54 +257,86 @@ def massive():
                 ('detector', -1),
                 ('_id', -1))
 
-    dt = datetime.timedelta(days=1)
-    t0 = datetime.datetime.utcnow() - 2 * dt
-
+    # dt = datetime.timedelta(days=1)
+    # t0 = datetime.datetime.utcnow() - 2 * dt
 
-    while True:  # yeah yeah
+    while True:
         query = {}
 
-        #t1 = datetime.datetime.utcnow()
-        #if t1 - t0 < dt:
+        # t1 = datetime.datetime.utcnow()
+        # if t1 - t0 < dt:
         #    logging.info("Iterative mode")
         #    # See if there is something to do
         #    query['start'] = {'$gt' : t0}
         #    logging.info(query)
-        #else:
+        # else:
         #    logging.info("Full mode")
         #    t0 = t1
-
         if args.run:
             query['number'] = args.run
 
         if args.start:
-            query['number'] = {'$gte' : args.start}
-
+            query['number'] = {'$gte': args.start}
+
         if args.stop:
-            query['number'] = {'$lte' : args.stop}
+            query['number'] = {'$lte': args.stop}
 
         if tag is not '':
             query['tags.name'] = str(tag)
 
         docs = list(collection.find(query, sort=sort_key,
-                                    projection=['start', 'number','name',
-                                                'detector', '_id']))
+                                    projection=['start', 'number', 'name',
+                                                'detector', '_id', 'processor']))
 
         for doc in docs:
+            # If no_batch_mode is chosen, no jobs will be submitted to batch queues
+            if no_batch_mode:
+
+                # Define basic command:
+                basic_command = """#!/bin/bash
+cax --once {config} --name {name}
+"""
+
+                print("no-batch-mode")
+
+                # Pre-read the config file for no-batch mode:
+                config.set_json(os.path.abspath(args.config_file))
+                task_list = config.get_config(config.get_hostname())['task_list']
+
+                # This is the list of tasks that apply corrections to a run:
+                task_list_correction = ['AddElectronLifetime', 'AddGains', 'AddDriftVelocity', 'SetS2xyMap', 'SetLightCollectionEfficiency', 'SetFieldDistortion', 'SetNeuralNetwork']
+
+                # Start with single activities: apply corrections only to data sets which do not have any corrections yet:
+                if bool(set(task_list) & set(task_list_correction)) or task_list_correction == task_list:
+
+                    this_run_version = doc.get('processor', {}).get('correction_versions', {})
+                    if len(this_run_version) == 0:
+                        command = basic_command.format(config=config_arg, name=doc['name'])
+                        logging.info(command)
+                        stdout_value = qsub.command_submission(command)
+
+                        # Return command output:
+                        for i in stdout_value:
+                            logging.info('AddCorrections: %s', i)
+
+                # Nothing else
+                continue
+
+            # Start with processing via batch (if no 'add corrections' was requested)
             job_name = ''
-
+
             if doc['detector'] == 'tpc':
                 job_name = str(doc['number'])
-                job = dict(command='cax --once --run {number} '+config_arg+' --ncpu '+str(ncpu),
+                job = dict(command='cax --once --run {number} ' + config_arg + ' --ncpu ' + str(ncpu),
                            number=int(job_name),
                            ncpus=ncpu)
             elif doc['detector'] == 'muon_veto':
                 job_name = doc['name']
-                job = dict(command='cax --once --name {number} '+config_arg+' --ncpu '+str(ncpu),
+                job = dict(command='cax --once --name {number} ' + config_arg + ' --ncpu ' + str(ncpu),
                            number=job_name,
                            ncpus=ncpu)
 
             job['mem_per_cpu'] = mem_per_cpu
@@ -310,14 +344,14 @@ def massive():
             job['time'] = walltime
 
             if partition is not None:
-                job['partition'] = '\n#SBATCH --partition='+partition
+                job['partition'] = '\n#SBATCH --partition=' + partition
 
             job['extra'] = ''
             if qos is not None:
-                job['extra'] += '\n#SBATCH --qos='+qos
+                job['extra'] += '\n#SBATCH --qos=' + qos
             if reservation is not None:
-                job['extra'] += '\n#SBATCH --reservation='+reservation
+                job['extra'] += '\n#SBATCH --reservation=' + reservation
 
             script = config.processing_script(job)
@@ -329,7 +363,6 @@ def massive():
                 logging.info("Speed break 60s because %d in queue" % qsub.get_number_in_queue(partition=partition))
                 time.sleep(60)
 
-            print(script)
             qsub.submit_job(script)
@@ -338,10 +371,10 @@ def massive():
 
         if run_once:
             break
-        #else:
-        #    pace = 5
-        #    logging.info("Done, waiting %d minutes" % pace)
-        #    time.sleep(60*pace) # Pace 5 minutes
+        else:
+            pace = 5
+            logging.info("Done, waiting %d minutes" % pace)
+            time.sleep(60 * pace)  # Pace 5 minutes
 
 
 def move():
@@ -373,7 +406,7 @@ def remove():
                         help="Location of file or folder to be removed")
     parser.add_argument('--disable_database_update', action='store_true',
                         help="Disable the update function the run data base")
-    parser.add_argument('--run', type=int, required= True,
+    parser.add_argument('--run', type=int, required=True,
                         help="Run number to process")
 
     args = parser.parse_args()
@@ -386,21 +419,23 @@ def remove():
 
     filesystem.RemoveSingle(args.location).go(args.run)
 
+
 def stray():
     parser = argparse.ArgumentParser(description="Find stray files.")
     parser.add_argument('--delete', action='store_true',
                         help="Delete strays (default: false)")
 
-    args = parser.parse_args()
+    # args = parser.parse_args()
 
     config.mongo_password()
     filesystem.FindStrays().go()
 
+
 def status():
-    #Ask the database for the actual status of the file or folder:
-
+    # Ask the database for the actual status of the file or folder:
+
     parser = argparse.ArgumentParser(description="Check the database status")
-
+
     parser.add_argument('--host', type=str, required=True,
                         help="Select the host")
     parser.add_argument('--status', type=str, required=True,
@@ -431,48 +466,49 @@ def status():
     console.setFormatter(formatter)
     # add the handler to the root logger
     logging.getLogger('').addHandler(console)
-
     # Set information to update the run database
     config.set_database_log(database_log)
     config.mongo_password()
-
+
     filesystem.StatusSingle(args.host, args.status).go()
 
-#Rucio Stuff
+# Rucio Stuff
+
+
 def ruciax():
     parser = argparse.ArgumentParser(description="Copying All kinds of XENON1T "
                                                  "data.")
     parser.add_argument('--once', action='store_true',
                         help="Run all tasks just one, then exits")
-
+
     parser.add_argument('--config', action='store', type=str,
                         dest='config_file',
                         help="Load a custom .json config file into cax")
-
+
     parser.add_argument('--rucio-rule', type=str, dest='config_rule',
-                        help="Load the a rule file")
-
+                        help="Load a rule file")
+
     parser.add_argument('--log', dest='log', type=str, default='info',
                         help="Logging level e.g. debug")
-
+
     parser.add_argument('--log-file', dest='logfile', type=str, default='ruciax.log',
                         help="Specify a certain logfile")
-
+
     parser.add_argument('--disable_database_update', action='store_true',
                         help="Disable the update function the run data base")
-
+
     parser.add_argument('--run', type=int,
                         help="Select a single run by its number")
-
+
     parser.add_argument('--name', type=str,
                         help="Select a single run by its name")
-
+
     parser.add_argument('--host', type=str,
                         help="Host to pretend to be")
-
-    #parser for rucio arguments
+
+    # parser for rucio arguments
     parser.add_argument('--rucio-scope', type=str, dest='rucio_scope',
                         help="Rucio: Choose your scope")
@@ -481,11 +517,10 @@ def ruciax():
 
     parser.add_argument('--rucio-upload', type=str, dest='rucio_upload',
                         help="Rucio: Select a data file or data set")
-
-
+
     args = parser.parse_args()
-
-    #This one is mandatory: hardcoded science run number!
+
+    # This one is mandatory: hardcoded science run number!
     config.set_rucio_campaign("001")
 
     log_level = getattr(logging, args.log.upper())
@@ -495,14 +530,14 @@ def ruciax():
     run_once = args.once
     database_log = not args.disable_database_update
 
-    # Set information to update the run database
+    # Set information to update the run database
    config.set_database_log(database_log)
 
     # Check passwords and API keysspecified
     config.mongo_password()
 
     # Set information for rucio transfer rules (config file)
-    config.set_rucio_rules( args.config_rule)
+    config.set_rucio_rules(args.config_rule)
 
     # Setup logging
     cax_version = 'ruciax_v%s - ' % __version__
@@ -512,7 +547,7 @@ def ruciax():
                                              '%(message)s')
     logging.info('Daemon is starting')
     logging.info('Logfile: %s', args.logfile)
-
+
     # define a Handler which writes INFO messages or higher to the sys.stderr
     console = logging.StreamHandler()
     console.setLevel(log_level)
@@ -548,19 +583,19 @@ def ruciax():
     while True:
         for task in tasks:
             name = task.__class__.__name__
-
+
             # Skip tasks that user did not specify
             if user_tasks and name not in user_tasks:
                 continue
-
+
             logging.info("Executing %s." % name)
 
             try:
-                if args.name is not None:
-                    task.go(args.name)
-                else:
-                    task.go(args.run)
-
+                if args.name is not None:
+                    task.go(args.name)
+                else:
+                    task.go(args.run)
+
             except Exception as e:
                 logging.fatal("Exception caught from task %s" % name,
                               exc_info=True)
@@ -574,6 +609,7 @@ def ruciax():
         logging.info('Sleeping.')
         time.sleep(60)
 
+
 def massiveruciax():
     # Command line arguments setup
     parser = argparse.ArgumentParser(description="Submit ruciax tasks to batch queue.")
@@ -588,11 +624,11 @@ def massiveruciax():
                         help="Load a custom .json config file into cax")
     parser.add_argument('--run', type=int,
                         help="Select a single run")
-    parser.add_argument('--from-run', dest='from_run', type=int,
+    parser.add_argument('--from-run', dest='from_run', type=int,
                         help="Choose: run number start")
-    parser.add_argument('--to-run', dest='to_run', type=int,
+    parser.add_argument('--to-run', dest='to_run', type=int,
                         help="Choose: run number end")
-    parser.add_argument('--last-days', dest='last_days', type=int,
+    parser.add_argument('--last-days', dest='last_days', type=int,
                         help="Choose: Focus up/downloads only on the last days")
     parser.add_argument('--log-file', dest='logfile', type=str, default='massive_rucio.log',
                         help="Specify a certain logfile")
@@ -602,42 +638,41 @@ def massiveruciax():
     parser.add_argument('--skip-error', action='store_true', dest='skip_error',
                         help="Skip all database entries with an error")
 
-
     args = parser.parse_args()
 
     run_once = args.once
 
-    #Check on from-run/to-run option:
+    # Check on from-run/to-run option:
     beg_run = -1
     end_run = -1
     run_window = False
     run_window_lastruns = False
     run_lastdays = False
-
-    if args.from_run == None and args.to_run == None and args.last_days == None:
-        pass
-    elif (args.from_run != None and args.to_run == None) or (args.from_run == None and args.to_run != None):
-        logging.info("Select (tpc) runs between %s and %s", args.from_run, args.to_run)
-        logging.info("Make a full selection!")
-    elif args.from_run != None and args.to_run != None and args.from_run > 0 and args.to_run > 0 and args.from_run > args.to_run:
-        logging.info("The last run is smaller then the first run!")
-        logging.inof("--> Ruciax exits here")
-        exit()
-    elif args.from_run != None and args.to_run != None and args.from_run > 0 and args.to_run > 0 and args.from_run < args.to_run:
-        logging.info("Start (tpc) runs between %s and %s", args.from_run, args.to_run)
-        run_window = True
-        beg_run = args.from_run
-        end_run = args.to_run
-    elif args.from_run != None and args.to_run != None and args.from_run > 0 and args.to_run == -1:
-        logging.info("Start (tpc) runs between %s and to last", args.from_run)
-        run_window_lastruns = True
-        beg_run = args.from_run
-        end_run = args.to_run
-    elif args.from_run == None and args.to_run == None and args.last_days != None:
-        run_lastdays = True
-
-    #configure cax.json
+
+    if args.from_run is None and args.to_run is None and args.last_days is None:
+        pass
+    elif (args.from_run is not None and args.to_run is None) or (args.from_run is None and args.to_run is not None):
+        logging.info("Select (tpc) runs between %s and %s", args.from_run, args.to_run)
+        logging.info("Make a full selection!")
+    elif args.from_run is not None and args.to_run is not None and args.from_run > 0 and args.to_run > 0 and args.from_run > args.to_run:
+        logging.info("The last run is smaller than the first run!")
+        logging.info("--> Ruciax exits here")
+        exit()
+    elif args.from_run is not None and args.to_run is not None and args.from_run > 0 and args.to_run > 0 and args.from_run < args.to_run:
+        logging.info("Start (tpc) runs between %s and %s", args.from_run, args.to_run)
+        run_window = True
+        beg_run = args.from_run
+        end_run = args.to_run
+    elif args.from_run is not None and args.to_run is not None and args.from_run > 0 and args.to_run == -1:
+        logging.info("Start (tpc) runs from %s to the last run", args.from_run)
+        run_window_lastruns = True
+        beg_run = args.from_run
+        end_run = args.to_run
+    elif args.from_run is None and args.to_run is None and args.last_days is not None:
+        run_lastdays = True
+
+    # configure cax.json
     config_arg = ''
     if args.config_file:
         if not os.path.isfile(args.config_file):
@@ -647,38 +682,37 @@ def massiveruciax():
                          args.config_file)
             config.set_json(args.config_file)
             config_arg = '--config ' + os.path.abspath(args.config_file)
-    #print(os.environ["HOME"])
-
+    # print(os.environ["HOME"])
+
     # Setup logging
-    log_path = {"xe1t-datamanager": os.path.join(os.environ["HOME"],"rucio_log"),
-                "midway-login1": os.path.join(os.environ["HOME"],"rucio_log"),
-                "tegner-login-1": os.path.join(os.environ["HOME"],"rucio_log"),
-                "login": os.path.join(os.environ["HOME"],"rucio_log")}
-
-    #Check if log path exists and create it if not
+    log_path = {"xe1t-datamanager": os.path.join(os.environ["HOME"], "rucio_log"),
+                "midway-login1": os.path.join(os.environ["HOME"], "rucio_log"),
+                "tegner-login-1": os.path.join(os.environ["HOME"], "rucio_log"),
+                "login": os.path.join(os.environ["HOME"], "rucio_log")}
+
+    # Check if log path exists and create it if not
     if not os.path.exists(log_path[config.get_hostname()]):
         os.makedirs(log_path[config.get_hostname()])
 
-    #Configure the massive-ruciax logging
+    # Configure the massive-ruciax logging
     cax_version = 'massive_ruciax_v%s - ' % __version__
     logging.basicConfig(filename="{logp}/{logf}".format(logp=log_path[config.get_hostname()],
                                                         logf=args.logfile),
                         level=logging.INFO,
                         format=cax_version + '%(asctime)s [%(levelname)s] '
                                              '%(message)s')
-
+
     # Check Mongo connection
     config.mongo_password()
-
-    #Define additional file to define the rules:
-    #Check if massive ruciax is used to upload or to verify data sets:
+
+    # Define additional file to define the rules:
+    # Check if massive ruciax is used to upload or to verify data sets:
     verfication_only = False
     if args.config_rule:
-        abs_config_rule = os.path.abspath( args.config_rule )
-        logging.info("Rucio Rule File: %s", abs_config_rule)
-        rucio_rule = "--rucio-rule {rulefile}".format( rulefile=abs_config_rule )
-        verfication_only = json.loads(open(abs_config_rule, 'r').read())[0]['verification_only']
+        abs_config_rule = os.path.abspath(args.config_rule)
+        logging.info("Rucio Rule File: %s", abs_config_rule)
+        rucio_rule = "--rucio-rule {rulefile}".format(rulefile=abs_config_rule)
+        verfication_only = json.loads(open(abs_config_rule, 'r').read())[0]['verification_only']
     else:
-        rucio_rule = ""
-        verfication_only = False
-
+        rucio_rule = ""
+        verfication_only = False
 
     # Establish mongo connection
     collection = config.mongo_collection()
@@ -687,166 +721,162 @@ def massiveruciax():
                 ('detector', -1),
                 ('_id', -1))
 
-    #Construct the pre-basic bash script(s) from rucio_mover.RucioConfig()
+    # Construct the pre-basic bash script(s) from rucio_mover.RucioConfig()
     RucioBashConfig = rucio_mover.RucioConfig()
-
-
+
     dt = datetime.timedelta(days=1)
-
-    while True:  # yeah yeah
-        #query = {'detector':'tpc'}
+
+    while True:
+        # query = {'detector':'tpc'}
         query = {}
-
-
+
         if args.run:
             query['number'] = args.run
 
-        if run_window == True:
-            query['number'] = { '$lt': end_run+1, '$gt': beg_run-1 }
-
-        if run_window_lastruns == True:
-            query['number'] = { '$gt': beg_run-1 }
-
-        if run_lastdays == True:
-            t1 = datetime.datetime.utcnow()
-            t0 = datetime.datetime.utcnow() - int(args.last_days) * dt
-            if t1 - t0 < (int(args.last_days) * dt):
-                logging.info("Run ruciax up/downloads only on latest %s days", args.last_days)
-                #See if there is something to do
-                query['start'] = {'$gt' : t0}
-
-        #Select specific data sets
+        if run_window:
+            query['number'] = {'$lt': end_run + 1, '$gt': beg_run - 1}
+
+        if run_window_lastruns:
+            query['number'] = {'$gt': beg_run - 1}
+
+        if run_lastdays:
+            t1 = datetime.datetime.utcnow()
+            t0 = datetime.datetime.utcnow() - int(args.last_days) * dt
+            if t1 - t0 < (int(args.last_days) * dt):
+                logging.info("Run ruciax up/downloads only on latest %s days", args.last_days)
+                # See if there is something to do
+                query['start'] = {'$gt': t0}
+
+        # Select specific data sets
         selection = {"detector": True,
-                     "number" : True,
-                     "data" : True,
-                     "_id" : True,
-                     #"tags" : True,
+                     "number": True,
+                     "data": True,
+                     "_id": True,
+                     # "tags" : True,
                      "name": True}
 
         docs = list(collection.find(query, selection, sort=sort_key)
-                    )
-
+                    )
+
         for doc in docs:
-
-
-            #Select a single run for rucio upload (massive-ruciax -> ruciax)
+
+            # Select a single run for rucio upload (massive-ruciax -> ruciax)
             if args.run:
-                if args.run != doc['number']:
-                    continue
-
-            #Double check if a 'data' field is defind in doc
+                if args.run != doc['number']:
+                    continue
+
+            # Double check that a 'data' field is defined in doc
             if 'data' not in doc:
-                continue
-
-            #Check now if the data field is larger then zero:
+                continue
+
+            # Check now that the data field is larger than zero:
             if len(doc['data']) == 0:
-                continue
-
-            #Double check that rucio uploads are only triggered when data exists at the host
+                continue
+
+            # Double check that rucio uploads are only triggered when data exists at the host
             host_data = False
             rucio_data = False
             host_data_error = False
             for idoc in doc['data']:
-                if idoc['host'] == config.get_hostname() and idoc['status'] == "transferred":
-                    host_data = True
-                if idoc['host'] == config.get_hostname() and idoc['status'] == "error":
-                    host_data = True
-                    host_data_error = True
-                if idoc['host'] == "rucio-catalogue":
-                    rucio_data = True
-
-            if host_data == False and args.on_disk_only == True:
-                continue
-
-            if host_data == True and host_data_error == True and args.skip_error == True:
-                continue
-
-            #massive-ruciax does not care about data which are already
-            #in the rucio catalogue for upload
-            if rucio_data == True and verfication_only == False:
-                continue
-
-            #Get the local time:
+                if idoc['host'] == config.get_hostname() and idoc['status'] == "transferred":
+                    host_data = True
+                if idoc['host'] == config.get_hostname() and idoc['status'] == "error":
+                    host_data = True
+                    host_data_error = True
+                if idoc['host'] == "rucio-catalogue":
+                    rucio_data = True
+
+            if not host_data and args.on_disk_only:
+                continue
+
+            if host_data and host_data_error and args.skip_error:
+                continue
+
+            # massive-ruciax does not care about data which are already
+            # in the rucio catalogue for upload
+            if rucio_data and not verfication_only:
+                continue
+
+            # Get the local time:
             local_time = time.strftime("%Y%m%d_%H%M%S", time.localtime())
-
-            #Detector choice
+
+            # Detector choice
             run = ""
             runlogfile = ""
             if doc['detector'] == 'tpc':
-                run = "--run {number}".format(number=doc['number'])
-                runlogfile = "--log-file {log_path}/ruciax_log_{number}_{timestamp}.txt".format(
-                            log_path=log_path[config.get_hostname()],
-                            number=doc['number'],
-                            timestamp=local_time)
-
+                run = "--run {number}".format(number=doc['number'])
+                runlogfile = "--log-file {log_path}/ruciax_log_{number}_{timestamp}.txt".format(
+                    log_path=log_path[config.get_hostname()],
+                    number=doc['number'],
+                    timestamp=local_time)
+
             elif doc['detector'] == 'muon_veto':
-                run = "--name {number}".format(number=doc['name'])
-                runlogfile = "--log-file {log_path}/ruciax_log_{number}_{timestamp}.txt".format(
-                            log_path=log_path[config.get_hostname()],
-                            number=doc['number'],
-                            timestamp=local_time)
-
-            #Define the job:
+                run = "--name {number}".format(number=doc['name'])
+                runlogfile = "--log-file {log_path}/ruciax_log_{number}_{timestamp}.txt".format(
+                    log_path=log_path[config.get_hostname()],
+                    number=doc['number'],
+                    timestamp=local_time)
+
+            # Define the job:
             job = "{conf} {run} {rucio_rule} {runlogfile}".format(
                 conf=config_arg,
                 run=run,
                 rucio_rule=rucio_rule,
                 runlogfile=runlogfile)
-
-            #start the time for an upload:
+
+            # start the time for an upload:
             time_start = datetime.datetime.utcnow()
-
-            #Create the command
-            command="""
+
+            # Create the command
+            command = """
 ruciax --once {job}
 """.format(job=job)
 
-            pre_bash_command = RucioBashConfig.load_host_config( config.get_hostname(), "py3" ).format(
-                                account=config.get_config("rucio-catalogue")['rucio_account']
-                                )
-
+            pre_bash_command = RucioBashConfig.load_host_config(config.get_hostname(), "py3").format(
+                account=config.get_config("rucio-catalogue")['rucio_account']
+            )
+
             command = pre_bash_command + command
-
+
             logging.info(command)
-
-
-            #Submit the command
+
+            # Submit the command
             sc = qsub.create_script(command)
-            execute = subprocess.Popen( ['sh', sc.name] ,
-                                        stdin=subprocess.PIPE,
-                                        stdout=subprocess.PIPE,
-                                        stderr=subprocess.STDOUT, shell=False )
+            execute = subprocess.Popen(['sh', sc.name],
+                                       stdin=subprocess.PIPE,
+                                       stdout=subprocess.PIPE,
+                                       stderr=subprocess.STDOUT, shell=False)
             stdout_value, stderr_value = execute.communicate()
             stdout_value = stdout_value.decode("utf-8")
             stdout_value = stdout_value.split("\n")
-
-            #Return command output:
+
+            # Return command output:
             for i in stdout_value:
-                logging.info('massive-ruciax: %s', i)
-
-            #Manage the upload time:
+                logging.info('massive-ruciax: %s', i)
+
+            # Manage the upload time:
             time_end = datetime.datetime.utcnow()
-            diff = time_end-time_start
+            diff = time_end - time_start
             dd = divmod(diff.total_seconds(), 60)
-
-            #delete script:
-            qsub.delete_script( sc )
-
+
+            # delete script:
+            qsub.delete_script(sc)
+
             logging.info("+--------------------------->>>")
-            logging.info("| Summary: massive-ruciax for run/name: %s/%s", doc['number'], doc['name'] )
+            logging.info("| Summary: massive-ruciax for run/name: %s/%s", doc['number'], doc['name'])
             logging.info("| Configuration script: %s", runlogfile)
             logging.info("| Rucio-rule script: %s", abs_config_rule)
-            logging.info("| Run time of ruciax: %s min %s", str(dd[0]), str(dd[1]) )
+            logging.info("| Run time of ruciax: %s min %s", str(dd[0]), str(dd[1]))
             logging.info("+------------------------------------------------->>>")
 
         if run_once:
-            break
+            break
         else:
-            logging.info('Sleeping.')
-            time.sleep(60)
+            logging.info('Sleeping.')
+            time.sleep(60)
+
-
 def remove_from_tsm():
     parser = argparse.ArgumentParser(description="Remove data and notify"
                                                  " the run database.")
@@ -858,7 +888,7 @@ def remove_from_tsm():
                         help="Select a single run by number")
     parser.add_argument('--name', type=str, required=False,
                         help="Select a single run by name")
-
+
     args = parser.parse_args()
 
     database_log = not args.disable_database_update
@@ -866,12 +896,12 @@ def remove_from_tsm():
     # Set information to update the run database
     config.set_database_log(database_log)
     config.mongo_password()
-
+
     number_name = None
     if args.name is not None:
-        number_name = args.name
+        number_name = args.name
     else:
-        number_name = args.run
+        number_name = args.run
 
     filesystem.RemoveTSMEntry(args.location).go(number_name)
@@ -887,12 +917,12 @@ def massive_tsmclient():
                         help="Load a custom .json config file into cax")
     parser.add_argument('--run', type=int,
                         help="Select a single run")
-    parser.add_argument('--from-run', dest='from_run', type=int,
+    parser.add_argument('--from-run', dest='from_run', type=int,
                         help="Choose: run number start")
-    parser.add_argument('--to-run', dest='to_run', type=int,
+    parser.add_argument('--to-run', dest='to_run', type=int,
                         help="Choose: run number end")
-    parser.add_argument('--last-days', dest='last_days', type=int,
-                        help="Choose: Focus up/downloads only on the last days")
+    parser.add_argument('--last-days', dest='last_days', type=int,
+                        help="Choose: Focus up/downloads only on the last days")
     parser.add_argument('--log', dest='log', type=str, default='INFO',
                         help="Logging level e.g. debug")
     parser.add_argument('--logfile', dest='logfile', type=str, default='massive_tsm.log',
@@ -901,42 +931,41 @@ def massive_tsmclient():
                         help="Disable the update function the run data base")
 
     args = parser.parse_args()
-
-    log_level = args.log
+
+    # log_level = args.log
     run_once = args.once
 
-    #Check on from-run/to-run option:
+    # Check on from-run/to-run option:
    beg_run = -1
     end_run = -1
     run_window = False
     run_window_lastruns = False
     run_lastdays = False
-
-    if args.from_run == None and args.to_run == None and args.last_days == None:
-        pass
-    elif (args.from_run != None and args.to_run == None) or (args.from_run == None and args.to_run != None):
-        logging.info("Select (tpc) runs between %s and %s", args.from_run, args.to_run)
-        logging.info("Make a full selection!")
-    elif args.from_run != None and args.to_run != None and args.from_run > 0 and args.to_run > 0 and args.from_run > args.to_run:
-        logging.info("The last run is smaller then the first run!")
-        logging.inof("--> Ruciax exits here")
-        exit()
-    elif args.from_run != None and args.to_run != None and args.from_run > 0 and args.to_run > 0 and args.from_run < args.to_run:
-        logging.info("Start (tpc) runs between %s and %s", args.from_run, args.to_run)
-        run_window = True
-        beg_run = args.from_run
-        end_run = args.to_run
-    elif args.from_run != None and args.to_run != None and args.from_run > 0 and args.to_run == -1:
-        logging.info("Start (tpc) runs between %s and to last", args.from_run)
-        run_window_lastruns = True
-        beg_run = args.from_run
-        end_run = args.to_run
-    elif args.from_run == None and args.to_run == None and args.last_days != None:
-        run_lastdays = True
-
-
-    #configure cax.json
+
+    if args.from_run is None and args.to_run is None and args.last_days is None:
+        pass
+    elif (args.from_run is not None and args.to_run is None) or (args.from_run is None and args.to_run is not None):
+        logging.info("Select (tpc) runs between %s and %s", args.from_run, args.to_run)
+        logging.info("Make a full selection!")
+    elif args.from_run is not None and args.to_run is not None and args.from_run > 0 and args.to_run > 0 and args.from_run > args.to_run:
+        logging.info("The last run is smaller than the first run!")
+        logging.info("--> massive-tsmclient exits here")
+        exit()
+    elif args.from_run is not None and args.to_run is not None and args.from_run > 0 and args.to_run > 0 and args.from_run < args.to_run:
+        logging.info("Start (tpc) runs between %s and %s", args.from_run, args.to_run)
+        run_window = True
+        beg_run = args.from_run
+        end_run = args.to_run
+    elif args.from_run is not None and args.to_run is not None and args.from_run > 0 and args.to_run == -1:
+        logging.info("Start (tpc) runs from %s to the last run", args.from_run)
+        run_window_lastruns = True
+        beg_run = args.from_run
+        end_run = args.to_run
+    elif args.from_run is None and args.to_run is None and args.last_days is not None:
+        run_lastdays = True
+
+    # configure cax.json
     config_arg = ''
     if args.config_file:
         if not os.path.isfile(args.config_file):
@@ -946,23 +975,23 @@ def massive_tsmclient():
                          args.config_file)
             config.set_json(args.config_file)
             config_arg = os.path.abspath(args.config_file)
-
+
     # Setup logging
     log_path = {"xe1t-datamanager": "/home/xe1ttransfer/tsm_log",
                 "midway-login1": "n/a",
                 "tegner-login-1": "/afs/pdc.kth.se/home/b/bobau/tsm_log"}
-
+
     if log_path[config.get_hostname()] == "n/a":
         print("Modify the log path in main.py")
         exit()
-
+
     if not os.path.exists(log_path[config.get_hostname()]):
         os.makedirs(log_path[config.get_hostname()])
 
     cax_version = 'massive_tsm-client_v%s - ' % __version__
     logging.basicConfig(filename="{logp}/{logf}".format(logp=log_path[config.get_hostname()],
                                                         logf=args.logfile),
                         level="INFO",
                         format=cax_version + '%(asctime)s [%(levelname)s] '
                                              '%(message)s')
-
+
     # define a Handler which writes INFO messages or higher to the sys.stderr
     console = logging.StreamHandler()
     console.setLevel("INFO")
@@ -974,141 +1003,140 @@ def massive_tsmclient():
     console.setFormatter(formatter)
     # add the handler to the root logger
     logging.getLogger('').addHandler(console)
-
+
     # Check Mongo connection
     config.mongo_password()
 
     # Establish mongo connection
     collection = config.mongo_collection()
-
+
     sort_key = (('start', -1),
                 ('number', -1),
                 ('detector', -1),
                 ('_id', -1))
-
-
     dt = datetime.timedelta(days=1)
-
-    while True:  # yeah yeah
-
+
+    while True:  # yeah yeah
+
         query = {}
-
+
         if args.run:
             query['number'] = args.run
 
-        if run_window == True:
-            query['number'] = { '$lt': end_run+1, '$gt': beg_run-1 }
-
-        if run_window_lastruns == True:
-            query['number'] = { '$gt': beg_run-1 }
-
-        if run_lastdays == True:
-            t1 = datetime.datetime.utcnow()
-            t0 = datetime.datetime.utcnow() - int(args.last_days) * dt
-
-            if t1 - t0 < (int(args.last_days) * dt):
-                logging.info("Run massive-tsm for up-/downloads only on latest %s days", args.last_days)
-                #See if there is something to do
-                query['start'] = {'$gt' : t0}
-
-        #Select specific data sets
+        if run_window:
+            query['number'] = {'$lt': end_run + 1, '$gt': beg_run - 1}
+
+        if run_window_lastruns:
+            query['number'] = {'$gt': beg_run - 1}
+
+        if run_lastdays:
+            t1 = datetime.datetime.utcnow()
+            t0 = datetime.datetime.utcnow() - int(args.last_days) * dt
+
+            if t1 - t0 < (int(args.last_days) * dt):
+                logging.info("Run massive-tsm for up-/downloads only on latest %s days", args.last_days)
+                # See if there is something to do
+                query['start'] = {'$gt': t0}
+
+        # Select specific data sets
        selection = {"detector": True,
-                     "number" : True,
-                     "data" : True,
-                     "_id" : True,
-                     #"tags" : True,
+                     "number": True,
+                     "data": True,
+                     "_id": True,
+                     # "tags" : True,
                      "name": True}
 
         docs = list(collection.find(query, selection, sort=sort_key)
-                    )
+                    )
 
         for doc in docs:
-
-            #Select a single run for rucio upload (massive-ruciax -> ruciax)
+
+            # Select a single run for rucio upload (massive-ruciax -> ruciax)
             if args.run:
-                if args.run != doc['number']:
-                    continue
-
-            #Double check if a 'data' field is defind in doc (runDB entry)
+                if args.run != doc['number']:
+                    continue
+
+            # Double check that a 'data' field is defined in doc (runDB entry)
             if 'data' not in doc:
-                continue
-
-            #Double check that tsm uploads are only triggered when data exists at the host
+                continue
+
+            # Double check that tsm uploads are only triggered when data exists at the host
             host_data = False
             tsm_data = False
             for idoc in doc['data']:
-                if idoc['host'] == config.get_hostname() and idoc['status'] == "transferred":
-                    host_data = True
-                if idoc['host'] == "tsm-server" and idoc['status'] == "transferred":
-                    tsm_data = True
-
-            if host_data == False:
-                #Do not try upload data which are not registered in the runDB
-                continue
-            #make sure now that only "new data" are uploaded
-            if tsm_data == True:
-                continue
-
-
-            #Detector choice
+                if idoc['host'] == config.get_hostname() and idoc['status'] == "transferred":
+                    host_data = True
+                if idoc['host'] == "tsm-server" and idoc['status'] == "transferred":
+                    tsm_data = True
+
+            if not host_data:
+                # Do not try to upload data which are not registered in the runDB
+                continue
+            # make sure now that only "new data" are uploaded
+            if tsm_data:
+                continue
+
+            # Detector choice
             local_time = time.strftime("%Y%m%d_%H%M%S", time.localtime())
 
             if doc['detector'] == 'tpc':
-                job = "--config {conf} --run {number} --log-file {log_path}/tsm_log_{number}_{timestamp}.txt".format(
-                        conf=config_arg,
-                        number=doc['number'],
-                        log_path=log_path[config.get_hostname()],
-                        timestamp=local_time)
-
+                job = "--config {conf} --run {number} --log-file {log_path}/tsm_log_{number}_{timestamp}.txt".format(
+                    conf=config_arg,
+                    number=doc['number'],
+                    log_path=log_path[config.get_hostname()],
+                    timestamp=local_time)
+
             elif doc['detector'] == 'muon_veto':
-                job = "--config {conf} --name {number} --log-file {log_path}/tsm_log_{number}_{timestamp}.txt".format(
-                        conf=config_arg,
-                        number=doc['name'],
-                        log_path=log_path[config.get_hostname()],
-                        timestamp=local_time)
-
-            #start the time for an upload:
+                job = "--config {conf} --name {number} --log-file {log_path}/tsm_log_{number}_{timestamp}.txt".format(
+                    conf=config_arg,
+                    number=doc['name'],
+                    log_path=log_path[config.get_hostname()],
+                    timestamp=local_time)
+
+            # start the time for an upload:
             time_start = datetime.datetime.utcnow()
-
-            #Create the command and execute the job only once
-            command="cax --once {job}".format(job=job)
-
-            #Disable runDB notifications
-            if args.disable_database_update == True:
-                command = command + " --disable_database_update"
-
+
+            # Create the command and execute the job only once
+            command = "cax --once {job}".format(job=job)
+
+            # Disable runDB notifications
+            if args.disable_database_update:
+                command = command + " --disable_database_update"
+
             logging.info("Command: %s", command)
-
+
             command = command.replace("\n", "")
             command = command.split(" ")
-            execute = subprocess.Popen( command ,
-                                        stdin=subprocess.PIPE,
-                                        stdout=subprocess.PIPE,
-                                        stderr=subprocess.STDOUT, shell=False )
+            execute = subprocess.Popen(command,
+                                       stdin=subprocess.PIPE,
+                                       stdout=subprocess.PIPE,
+                                       stderr=subprocess.STDOUT, shell=False)
             stdout_value, stderr_value = execute.communicate()
             stdout_value = stdout_value.decode()
-
+
             stdout_value = str(stdout_value).split("\n")
             stderr_value = str(stderr_value).split("\n")
-
-            stdout_value.remove('') #remove '' entries from an array
-
-            #Return command output:
+
+            stdout_value.remove('')  # remove '' entries from an array
+
+            # Return command output:
             for i in stdout_value:
-                logging.info("massive-tsm: %s", i)
-
-            #Manage the upload time:
+                logging.info("massive-tsm: %s", i)
+
+            # Manage the upload time:
             time_end = datetime.datetime.utcnow()
-            diff = time_end-time_start
+            diff = time_end - time_start
             dd = divmod(diff.total_seconds(), 60)
             logging.info("Upload time: %s min %s", str(dd[0]), str(dd[1]))
-
+
         if run_once:
-            break
+            break
         else:
-            logging.info('Sleeping.')
-            time.sleep(60)
+            logging.info('Sleeping.')
+            time.sleep(60)
+
 
 def cax_tape_log_file():
     """Analyses the tsm storage"""
@@ -1133,63 +1161,64 @@ def cax_tape_log_file():
     # Establish mongo connection
     collection = config.mongo_collection()
 
-    sort_key = (('number', -1),
-                ('detector', -1),
-                ('_id', -1))
+    # sort_key = (('number', -1),
+    #             ('detector', -1),
+    #             ('_id', -1))
 
     if args.monitor == "logfile":
-      """load the logfile watcher class"""
-      a = tsm_mover.TSMLogFileCheck(args.log_path)
+        """load the logfile watcher class"""
+        tsm_mover.TSMLogFileCheck(args.log_path)
 
     elif args.monitor == "database":
-      """load the data base watcher class"""
-      total_uploaded_amount = 0
-      total_uploaded_datasets = 0
+        """load the data base watcher class"""
+        total_uploaded_amount = 0
+        total_uploaded_datasets = 0
 
-      while True:  # yeah yeah
+        while True:  # yeah yeah
 
-        query = {}
+            query = {}
 
-        docs = list(collection.find(query))
+            docs = list(collection.find(query))
 
-        for doc in docs:
+            for doc in docs:
 
-            if 'data' not in doc:
-                continue
+                if 'data' not in doc:
+                    continue
 
-            nb_tsm_hosts = 0
-            for idoc in doc['data']:
-                if idoc['host'] == "tsm-server" and idoc['location'] != "n/a" and idoc['location'].find("tsm/") >= 0:
-                    nb_tsm_hosts+=1
-                    logging.info("Dataset: %s at host %s", idoc['location'], idoc['host'] )
-                    fsize = tsm_mover.TSMDatabaseCheck().get_info(idoc['location'])
-                    total_uploaded_datasets += 1
-                    logging.info( "Total file size of the individual dataset: %s mb", fsize)
-                    total_uploaded_amount += fsize
+                nb_tsm_hosts = 0
+                for idoc in doc['data']:
+                    if idoc['host'] == "tsm-server" and idoc['location'] != "n/a" and idoc['location'].find("tsm/") >= 0:
+                        nb_tsm_hosts += 1
+                        logging.info("Dataset: %s at host %s", idoc['location'], idoc['host'])
+                        fsize = tsm_mover.TSMDatabaseCheck().get_info(idoc['location'])
+                        total_uploaded_datasets += 1
+                        logging.info("Total file size of the individual dataset: %s mb", fsize)
+                        total_uploaded_amount += fsize
 
-            if nb_tsm_hosts == 0:
-                continue
+                if nb_tsm_hosts == 0:
+                    continue
 
-        if run_once:
-            break
+            if run_once:
+                break
 
-        logging.info("Total amount of uploaded data: %s mb", total_uploaded_amount)
-        logging.info("Total amount of uploaded data sets: %s", total_uploaded_datasets)
+            logging.info("Total amount of uploaded data: %s mb", total_uploaded_amount)
+            logging.info("Total amount of uploaded data sets: %s", total_uploaded_datasets)
 
     elif args.monitor == "checkstatus":
-      if args.status == None:
-          logging.info("ATTENTION: Specify status by --status [transferred/transferring/error]")
-          return 0
-
-      query = {}
-      docs = list(collection.find(query))
-      number_name = None
-      if args.name is not None:
-          number_name = args.name
-      else:
-          number_name = args.run
+        if args.status is None:
+            logging.info("ATTENTION: Specify status by --status [transferred/transferring/error]")
+            return 0
+
+        query = {}
+        docs = list(collection.find(query))
+        number_name = None
+        if args.name is not None:
+            number_name = args.name
+        else:
+            number_name = args.run
+
+        tsm_mover.TSMStatusCheck(docs, args.status).go(number_name)
 
-      tsm_mover.TSMStatusCheck(docs, args.status).go(number_name)
 
 def ruciax_status():
     parser = argparse.ArgumentParser(description="Allow to check the run database for rucio entries")
@@ -1220,13 +1249,14 @@ def ruciax_status():
 
     number_name = None
     if args.name is not None:
-        number_name = args.name
+        number_name = args.name
     else:
-        number_name = args.run
+        number_name = args.run
 
     if args.mode == "DoubleEntries":
-        print("Check if there are more than one entries in the runDB")
-        filesystem.RuciaxTest(args.mode, args.location).go(number_name)
+        print("Check if there is more than one entry in the runDB")
+        filesystem.RuciaxTest(args.mode, args.location).go(number_name)
+
 
 def remove_from_rucio():
     parser = argparse.ArgumentParser(description="Remove data and notify"
@@ -1241,7 +1271,7 @@ def remove_from_rucio():
                         help="Select a single run by name")
     parser.add_argument('--status', type=str, required=False,
                         help="Select the status in the runDB")
-
+
     args = parser.parse_args()
 
     database_log = not args.disable_database_update
@@ -1249,24 +1279,25 @@ def remove_from_rucio():
     # Set information to update the run database
     config.set_database_log(database_log)
     config.mongo_password()
-
+
     number_name = None
     if args.name is not None:
-        number_name = args.name
+        number_name = args.name
     else:
-        number_name = args.run
+        number_name = args.run
 
     filesystem.RemoveRucioEntry(args.location, args.status).go(number_name)
 
+
 def ruciax_purge():
-    #Ask the database for the actual status of the file or folder:
-
+    # Ask the database for the actual status of the file or folder:
+
     parser = argparse.ArgumentParser(description="Check the database status")
-
+
     parser.add_argument('--run', type=int, required=False,
                         help="Select a single run by number")
     parser.add_argument('--name', type=str, required=False,
-                        help="Select a single run by name")
+                        help="Select a single run by name")
     parser.add_argument('--purge', type=bool, required=False, dest='purge',
                         default=False,
                         help="Activate purge modus [True]")
@@ -1283,7 +1314,7 @@ def ruciax_purge():
                         level="INFO",
                         format=cax_version + '%(asctime)s [%(levelname)s] '
                                              '%(message)s')
-
+
     logging.info('Purge raw data sets - Handle with care!')
 
     # define a Handler which writes INFO messages or higher to the sys.stderr
@@ -1297,38 +1328,38 @@ def ruciax_purge():
     console.setFormatter(formatter)
     # add the handler to the root logger
     logging.getLogger('').addHandler(console)
-
     # Set information to update the run database
     config.set_database_log(database_log)
     config.mongo_password()
-
+
     number_name = None
     if args.name is not None:
-        number_name = args.name
+        number_name = args.name
     else:
-        number_name = args.run
-
+        number_name = args.run
+
     rucio_mover.RucioPurge(args.purge).go(number_name)
 
+
 def ruciax_download():
-    #Ask the database for the actual status of the file or folder:
-
+    # Ask the database for the actual status of the file or folder:
+
     parser = argparse.ArgumentParser(description="Check the database status")
-
+
     parser.add_argument('--run', type=int, required=False,
                         help="Select a single run by number")
     parser.add_argument('--name', type=str, required=False,
-                        help="Select a single run by name")
+                        help="Select a single run by name")
     parser.add_argument('--location', type=str, required=False, dest='location',
-                        help="Select a single run by rucio location")
+                        help="Select a single run by rucio location")
     parser.add_argument('--type', type=str, required=True, dest='data_type',
-                        help="Select what kind of data you want to download [raw/processed]")
+                        help="Select what kind of data you want to download [raw/processed]")
     parser.add_argument('--rse', type=str, required=False, dest='data_rse',
-                        help="Select a specific rucio storage endpoint for download")
+                        help="Select a specific rucio storage endpoint for download")
     parser.add_argument('--dir', type=str, required=True, dest='data_dir',
                         help="Select a download directory")
@@ -1355,7 +1386,7 @@ def ruciax_download():
                         level="INFO",
                        format=cax_version + '%(asctime)s [%(levelname)s] '
                                              '%(message)s')
-
+
     logging.info('Ruciax - The data set downloader')
 
     # define a Handler which writes INFO messages or higher to the sys.stderr
@@ -1369,12 +1400,11 @@ def ruciax_download():
     console.setFormatter(formatter)
     # add the handler to the root logger
     logging.getLogger('').addHandler(console)
-
     # Set information to update the run database
     config.set_database_log(database_log)
     config.mongo_password()
-
+
     if args.config_file:
         if not os.path.isfile(args.config_file):
             logging.error("Config file %s not found", args.config_file)
@@ -1382,55 +1412,55 @@ def ruciax_download():
             logging.info("Using custom config file: %s",
                          args.config_file)
             config.set_json(args.config_file)
-
-    #Check if args.name and args.run are defined by input:
-    if args.name == None and args.run == None and args.list_file == None:
-        logging.info("No run number, run name or a list is defined")
-        exit()
-
-    if args.list_file != None and args.name == None and args.run == None:
-        #Download files according to a list of runs (names, runs) from
-        #an external file
-
-        #check if file exits before go on:
-        if os.path.exists( args.list_file) == False:
-            logging.info("The requested file %s does not exists -> exit", args.list_file)
-            exit()
-
-        list_file_abs = os.path.abspath(args.list_file)
-
-        #extract the run/name information:
-        obj = open( list_file_abs, 'r')
-        lines = obj.read().replace(",", "\n").split("\n")
-        lines = list(filter(None, lines))
-
-        #Cycle over the run numbers or names:
-        for i_line in lines:
-            i_line = i_line.replace(" ", "") #remove spaces
-            #Run the download command
-            rucio_mover.RucioDownload(args.data_rse, args.data_dir, args.data_type, args.restore, args.location, args.overwrite).go(i_line)
-
-    elif args.list_file == None and ( args.name == None or args.run == None ):
-        #Download a single run name or number:
-        #Read if run number or run name is given
-        number_name = None
-        if args.name is not None:
-            number_name = args.name
-        else:
-            number_name = args.run
-
-        rucio_mover.RucioDownload(args.data_rse, args.data_dir, args.data_type, args.restore, args.location, args.overwrite).go(number_name)
-
+
+    # Check if args.name and args.run are defined by input:
+    if args.name is None and args.run is None and args.list_file is None:
+        logging.info("No run number, run name or list file given")
+        exit()
+
+    if args.list_file is not None and args.name is None and args.run is None:
+        # Download files according to a list of runs (names, runs) from
+        # an external file
+
+        # check if the file exists before going on:
+        if not os.path.exists(args.list_file):
+            logging.info("The requested file %s does not exist -> exit", args.list_file)
+            exit()
+
+        list_file_abs = os.path.abspath(args.list_file)
+
+        # extract the run/name information:
+        obj = open(list_file_abs, 'r')
+        lines = obj.read().replace(",", "\n").split("\n")
+        lines = list(filter(None, lines))
+
+        # Cycle over the run numbers or names:
+        for i_line in lines:
+            i_line = i_line.replace(" ", "")  # remove spaces
+            # Run the download command
+            rucio_mover.RucioDownload(args.data_rse, args.data_dir, args.data_type, args.restore, args.location, args.overwrite).go(i_line)
+
+    elif args.list_file is None and (args.name is None or args.run is None):
+        # Download a single run name or number:
+        # Read if run number or run name is given
+        number_name = None
+        if args.name is not None:
+            number_name = args.name
+        else:
+            number_name = args.run
+
+        rucio_mover.RucioDownload(args.data_rse, args.data_dir, args.data_type, args.restore, args.location, args.overwrite).go(number_name)
+
 
 def ruciax_locator():
-    #Ask the database for the actual status of the file or folder:
-
+    # Ask the database for the actual status of the file or folder:
+
     parser = argparse.ArgumentParser(description="Check the database status")
-
+
     parser.add_argument('--run', type=int, required=False,
                         help="Select a single run by number")
     parser.add_argument('--name', type=str, required=False,
-                        help="Select a single run by name")
+                        help="Select a single run by name")
     parser.add_argument('--rse', type=str, required=False, dest='rse',
                         action='append',
                         help="Select an rucio storage element")
@@ -1446,9 +1476,8 @@ def ruciax_locator():
 
     parser.add_argument('--method', type=str, required=True, dest='method',
                         help="Select method: [SingleRun] (--run) | [Status] (--status) | [CheckRSEMultiple] (--rse) | [CheckRSESingle] (--rse) | [MultiCopies] (--copies) | [ListSingleRules] (--run/--name)")
-
-    args = parser.parse_args()
 
+    args = parser.parse_args()
 
     # Setup logging
     cax_version = 'cax_v%s - ' % __version__
@@ -1456,7 +1485,7 @@ def ruciax_locator():
                         level="INFO",
                         format=cax_version + '%(asctime)s [%(levelname)s] '
                                              '%(message)s')
-
+
     # define a Handler which writes INFO messages or higher to the sys.stderr
     console = logging.StreamHandler()
     console.setLevel("INFO")
@@ -1468,26 +1497,26 @@ def ruciax_locator():
     console.setFormatter(formatter)
     # add the handler to the root logger
     logging.getLogger('').addHandler(console)
-
     # Set information to update the run database
     config.mongo_password()
-
+
     if args.config_file:
-      if not os.path.isfile(args.config_file):
-          logging.error("Config file %s not found", args.config_file)
-      else:
-          logging.info("Using custom config file: %s",
-                       args.config_file)
-          config.set_json(args.config_file)
-
+        if not os.path.isfile(args.config_file):
+            logging.error("Config file %s not found", args.config_file)
+        else:
+            logging.info("Using custom config file: %s",
+                         args.config_file)
+            config.set_json(args.config_file)
+
     number_name = None
     if args.name is not None:
-        number_name = args.name
+        number_name = args.name
     else:
-        number_name = args.run
-
+        number_name = args.run
+
     rucio_mover.RucioLocator(args.rse, args.copies, args.method, args.status).go(number_name)
 
+
 if __name__ == '__main__':
     main()
diff --git a/cax/qsub.py b/cax/qsub.py
index c26b59d..7323ebb 100644
--- a/cax/qsub.py
+++ b/cax/qsub.py
@@ -50,17 +50,15 @@ def submit_job(script, extra=''):
                                      extra=extra))
     try:
         result = subprocess.check_output(sbatch,
-                                        stderr=subprocess.STDOUT,
-                                        shell=True,
-                                        timeout=120)
+                                         stderr=subprocess.STDOUT,
+                                         shell=True,
+                                         timeout=120)
         logging.info(result)
     except subprocess.TimeoutExpired as e:
         logging.error("Process timeout")
     except Exception as e:
         logging.exception(e)
-
-
     delete_script(fileobj)
 
 
@@ -94,10 +92,10 @@ def get_queue(host=config.get_hostname(), partition=''):
 
     if host == "midway-login1":
         args = {'partition': 'sandyb',
-                'user' : 'mklinton'}
+                'user': 'mklinton'}
     elif host == 'tegner-login-1':
         args = {'partition': 'main',
-                'user' : 'bobau'}
+                'user': 'bobau'}
     else:
         return []
 
@@ -119,8 +117,24 @@ def get_queue(host=config.get_hostname(), partition=''):
         logging.exception(e)
         return []
 
-
     queue_list = queue.rstrip().decode('ascii').split()
     if len(queue_list) > 1:
         return queue_list[1:]
     return []
+
+
+def command_submission(command):
+
+    # Submit the command
+    sc = create_script(command)
+    execute = subprocess.Popen(['sh', sc.name],
+                               stdin=subprocess.PIPE,
+                               stdout=subprocess.PIPE,
+                               stderr=subprocess.STDOUT, shell=False)
+    stdout_value, stderr_value = execute.communicate()
+    stdout_value = stdout_value.decode("utf-8")
+    stdout_value = stdout_value.split("\n")
+
+    # delete script:
+    delete_script(sc)
+
+    return stdout_value
diff --git a/cax/tasks/corrections.py b/cax/tasks/corrections.py
index a23a372..81171ec 100644
--- a/cax/tasks/corrections.py
+++ b/cax/tasks/corrections.py
@@ -12,6 +12,7 @@
 PAX_CONFIG = configuration.load_configuration('XENON1T')
 PAX_CONFIG_MV = configuration.load_configuration('XENON1T_MV')
 
+
 class CorrectionBase(Task):
     """Base class for corrections.
 
@@ -57,6 +58,10 @@ def each_run(self):
         self.correction_doc = cdoc = self.correction_collection.find_one(sort=(('calculation_time', -1), ))
         self.version = cdoc.get('version', str(cdoc['calculation_time']))
 
+        # Get the correction sympy function, if one is set
+        if 'function' in cdoc:
+            self.function = parse_expr(cdoc['function'])
+
         # Check if this correction's version has already been applied. If so, skip this run.
         classname = self.__class__.__name__
         this_run_version = self.run_doc.get('processor', {}).get('correction_versions', {}).get(classname, 'not_set')
@@ -148,7 +153,7 @@ def evaluate(self):
 
     def get_gains(self, timestamp):
         """Timestamp is a UNIX timestamp in UTC
         """
-        V = sympy.symbols('V')
+        # V = sympy.symbols('V')
         pmt = sympy.symbols('pmt', integer=True)
         t = sympy.symbols('t')
 
@@ -159,8 +164,7 @@ def get_gains(self, timestamp):
         for i in range(0, len(PAX_CONFIG['DEFAULT']['pmts'])):
             gain = self.function.evalf(subs={pmt: i,
                                              t: self.run_doc['start'].replace(tzinfo=pytz.utc).timestamp(),
-                                             't0': 0
-                                             })
+                                             't0': 0})
             gains.append(float(gain) * self.correction_units)
 
         return gains
@@ -170,7 +174,7 @@ class SetNeuralNetwork(CorrectionBase):
     '''Set the proper neural network file according to run number'''
     key = "processor.NeuralNet|PosRecNeuralNet.neural_net_file"
     collection_name = 'neural_network'
-
+
     def evaluate(self):
         number = self.run_doc['number']
         for rdef in self.correction_doc['correction']:
@@ -178,35 +182,38 @@ def evaluate(self):
                 return rdef['value']
         return None
 
+
 class SetFieldDistortion(CorrectionBase):
-    '''Set the proper field distortion map according to run number'''
-    key = 'processor.WaveformSimulator.rz_position_distortion_map'
-    collection_name = 'field_distortion'
+    '''Set the proper field distortion map according to run number'''
+    key = 'processor.WaveformSimulator.rz_position_distortion_map'
+    collection_name = 'field_distortion'
+
+    def evaluate(self):
+        number = self.run_doc['number']
+        for rdef in self.correction_doc['correction']:
+            if number >= rdef['min'] and number < rdef['max']:
+                return rdef['value']
+        return None
 
-    def evaluate(self):
-        number = self.run_doc['number']
-        for rdef in self.correction_doc['correction']:
-            if number >= rdef['min'] and number < rdef['max']:
-                return rdef['value']
-        return None
 
 class SetLightCollectionEfficiency(CorrectionBase):
-    '''Set the proper LCE map according to run number'''
-    key = 'processor.WaveformSimulator.s1_light_yield_map'
-    collection_name = 'light_collection_efficiency'
+    '''Set the proper LCE map according to run number'''
+    key = 'processor.WaveformSimulator.s1_light_yield_map'
+    collection_name = 'light_collection_efficiency'
+
+    def evaluate(self):
+        number = self.run_doc['number']
+        for rdef in self.correction_doc['correction']:
+            if number >= rdef['min'] and number < rdef['max']:
+                return rdef['value']
+        return None
 
-    def evaluate(self):
-        number = self.run_doc['number']
-        for rdef in self.correction_doc['correction']:
-            if number >= rdef['min'] and number < rdef['max']:
-                return rdef['value']
-        return None
 
 class SetS2xyMap(CorrectionBase):
     """Set the proper S2 x, y map according to run number"""
     key = 'processor.WaveformSimulator.s2_light_yield_map'
     collection_name = 's2_xy_map'
-
+
     def evaluate(self):
         number = self.run_doc['number']
         for rdef in self.correction_doc['correction']:
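
---
Note on the correction tasks above: every Set* task resolves its value by scanning the correction document's 'correction' list for the first run-number range containing the run. The stand-alone sketch below illustrates that lookup, assuming only the min/max/value layout visible in the evaluate() methods; the helper name and the sample correction document are hypothetical, not part of the cax API.

    # Minimal sketch of the run-range lookup shared by the Set* correction tasks.
    def lookup_correction(correction_doc, run_number):
        # Return the value of the first range with min <= run_number < max.
        for rdef in correction_doc['correction']:
            if rdef['min'] <= run_number < rdef['max']:
                return rdef['value']
        return None

    # Hypothetical correction document with two run ranges:
    correction_doc = {'correction': [
        {'min': 0, 'max': 6386, 'value': 'map_v1.json'},
        {'min': 6386, 'max': 999999, 'value': 'map_v2.json'},
    ]}

    assert lookup_correction(correction_doc, 7000) == 'map_v2.json'
    assert lookup_correction(correction_doc, -1) is None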