diff --git a/config.py b/config.py index 6840a7f..abfac4b 100644 --- a/config.py +++ b/config.py @@ -35,6 +35,7 @@ SQS_QUEUE_NAME = APP_NAME + 'Queue' SQS_MESSAGE_VISIBILITY = 1*60 # Timeout (secs) for messages in flight (average time to be processed) SQS_DEAD_LETTER_QUEUE = 'user_DeadMessages' +JOB_RETRIES = 3 # Number of times to retry a job before sending it to DEAD_LETTER_QUEUE # MONITORING AUTO_MONITOR = 'True' @@ -49,6 +50,9 @@ MIN_FILE_SIZE_BYTES = 1 #What is the minimal number of bytes an object should be to "count"? NECESSARY_STRING = '' #Is there any string that should be in the file name to "count"? +# CELLPROFILER SETTINGS +ALWAYS_CONTINUE = 'False' # Whether or not to run CellProfiler with the --always-continue flag, which will keep CellProfiler from crashing if it errors + # PLUGINS USE_PLUGINS = 'False' # True to use any plugin from CellProfiler-plugins repo UPDATE_PLUGINS = 'False' # True to download updates from CellProfiler-plugins repo diff --git a/documentation/DCP-documentation/SQS_QUEUE_information.md b/documentation/DCP-documentation/SQS_QUEUE_information.md index 4cc0d5c..eba869c 100644 --- a/documentation/DCP-documentation/SQS_QUEUE_information.md +++ b/documentation/DCP-documentation/SQS_QUEUE_information.md @@ -68,6 +68,13 @@ To confirm that multiple Dockers are never processing the same job, you can keep Once you have run a pipeline once, you can check the execution time (either by noticing how long after you started your jobs that your first jobs begin to finish, or by checking the logs of individual jobs and noting the start and end time), you will then have an accurate idea of roughly how long that pipeline needs to execute, and can set your message visibility accordingly. You can even do this on the fly while jobs are currently processing; the updated visibility time won’t affect the jobs already out for processing (i.e. 
if the time was set to 3 hours and you change it to 1 hour, the jobs already processing will remain hidden for 3 hours or until finished), but any job that begins processing AFTER the change will use the new visibility timeout setting. +## JOB_RETRIES + +**JOB_RETRIES** is the number of times that a job will be retried before it is sent to the Dead Letter Queue. +The count goes up every time a message is "In Flight"; after the SQS_MESSAGE_VISIBILITY times out, if the count is too high the message will not be made "Available" but will instead go to your SQS_DEAD_LETTER_QUEUE. +We recommend setting this larger than 1 because stochastic job failures are possible (e.g. the EC2 machine running the job becomes unavailable mid-run). +Allowing large numbers of retries tends to waste compute as most failure modes are not stochastic. + ## Example SQS Queue [[images/Sample_SQS_Queue.png|alt="Sample_SQS_Queue"]] diff --git a/documentation/DCP-documentation/advanced_configuration.md b/documentation/DCP-documentation/advanced_configuration.md index 3b6ec58..9824f13 100644 --- a/documentation/DCP-documentation/advanced_configuration.md +++ b/documentation/DCP-documentation/advanced_configuration.md @@ -11,8 +11,8 @@ Alternate locations can be designated in the run script. * **Log configuration and location of exported logs:** Distributed-CellProfiler creates log groups with a default retention of 60 days (to avoid hitting the AWS limit of 250) and after finishing the run exports them into your bucket with a prefix of 'exportedlogs/LOG_GROUP_NAME/'. These may be modified in the run script. * **Advanced EC2 configuration:** Any additional configuration of your EC2 spot fleet (such as installing additional packages or running scripts on startup) can be done by modifying the userData parameter in the run script. 
-* **SQS queue detailed configuration:** Distributed-CellProfiler creates a queue where messages will be tried 10 times before being consigned to a DeadLetterQueue, and unprocessed messages will expire after 14 days (the AWS maximum). -These values can be modified in run.py . +* **SQS queue detailed configuration:** Distributed-CellProfiler creates a queue where unprocessed messages will expire after 14 days (the AWS maximum). +This value can be modified in run.py . *** diff --git a/documentation/DCP-documentation/config_examples.md b/documentation/DCP-documentation/config_examples.md index 671a1e7..5d3b5bc 100644 --- a/documentation/DCP-documentation/config_examples.md +++ b/documentation/DCP-documentation/config_examples.md @@ -53,6 +53,7 @@ Our internal configurations for each pipeline are as follows: | SQS_QUEUE_NAME | APP_NAME + 'Queue' | APP_NAME + 'Queue' | APP_NAME + 'Queue' | APP_NAME + 'Queue' | APP_NAME + 'Queue' | We never change this. | | SQS_MESSAGE_VISIBILITY | 3*60 | 240*60 | 15*60 | 10*60 | 120*60 | About how long you expect a job to take * 1.5 in seconds | | SQS_DEAD_LETTER_QUEUE | 'YOURNAME_DEADMESSAGES' | 'YOURNAME_DEADMESSAGES' | 'YOURNAME_DEADMESSAGES' | 'YOURNAME_DEADMESSAGES' |'YOURNAME_DEADMESSAGES' | | +| JOB_RETRIES | 3 | 3 | 3 | 3 | 3 | | | AUTO_MONITOR | 'True' | 'True' | 'True' | 'True' | 'True' | Can be turned off if manually running Monitor. | | CREATE_DASHBOARD | 'True' | 'True' | 'True' | 'True' | 'True' | | | CLEAN_DASHBOARD | 'True' | 'True' | 'True' | 'True' | 'True' | | @@ -60,6 +61,7 @@ Our internal configurations for each pipeline are as follows: | EXPECTED_NUMBER_FILES | 1 (an image) | number channels + 1 (an .npy for each channel and isdone) | 3 (Experiment.csv, Image.csv, and isdone) | 1 (an image) | 5 (Experiment, Image, Cells, Nuclei, and Cytoplasm .csvs) | Better to underestimate than overestimate. | | MIN_FILE_SIZE_BYTES | 1 | 1 | 1 | 1 | 1 | Count files of any size. 
| | NECESSARY_STRING | '' | '' | '' | '' | '' | Not necessary for standard workflows. | +| ALWAYS_CONTINUE | 'False' | 'False' | 'False' | 'False' | 'False' | Use with caution. | | USE_PLUGINS | 'False' | 'False' | 'False' | 'False' | 'False' | Not necessary for standard workflows. | | UPDATE_PLUGINS | 'False' | 'False' | 'False' | 'False' | 'False' | Not necessary for standard workflows. | | PLUGINS_COMMIT | '' | '' | '' | '' | '' | Not necessary for standard workflows. | diff --git a/documentation/DCP-documentation/step_1_configuration.md b/documentation/DCP-documentation/step_1_configuration.md index 272cbe7..62aadcb 100644 --- a/documentation/DCP-documentation/step_1_configuration.md +++ b/documentation/DCP-documentation/step_1_configuration.md @@ -73,6 +73,7 @@ We recommend setting this to slightly longer than the average amount of time it * **SQS_DEAD_LETTER_QUEUE:** The name of the queue to send jobs to if they fail to process correctly multiple times; this keeps a single bad job (such as one where a single file has been corrupted) from keeping your cluster active indefinitely. This queue will be automatically made if it doesn't exist already. See [Step 0: Prep](step_0_prep.med) for more information. +* **JOB_RETRIES:** This is the number of times that a job will be retried before it is sent to the Dead Letter Queue. *** @@ -109,6 +110,15 @@ Useful when trying to detect jobs that may have exported smaller corrupted files *** +### CELLPROFILER SETTINGS +* **ALWAYS CONTINUE:** Whether or not to run CellProfiler with the --always-continue flag, which will keep CellProfiler from crashing if it errors. +Use with caution. +This can be particularly helpful in jobs where a large number of files are loaded in a single run (such as during illumination correction) so that a corrupted or missing file doesn't prevent the whole job completing. +However, this can make it harder to notice jobs that did not complete successfully, so it should be used with caution. 
+We suggest using this setting in conjunction with a small number of JOB_RETRIES. + +*** + ### PLUGINS * **USE_PLUGINS:** Whether or not you will be using external plugins from the CellProfiler-plugins repository. * **UPDATE_PLUGINS:** Whether or not to update the plugins repository before use. diff --git a/run.py b/run.py index ad60587..0acc22e 100644 --- a/run.py +++ b/run.py @@ -18,6 +18,8 @@ CREATE_DASHBOARD = 'False' CLEAN_DASHBOARD = 'False' AUTO_MONITOR = 'False' +ALWAYS_CONTINUE = 'False' +JOB_RETRIES = 10 from config import * @@ -125,6 +127,7 @@ def generate_task_definition(AWS_PROFILE): {"name": "USE_PLUGINS", "value": str(USE_PLUGINS)}, {"name": "NECESSARY_STRING", "value": NECESSARY_STRING}, {"name": "DOWNLOAD_FILES", "value": DOWNLOAD_FILES}, + {"name": "ALWAYS_CONTINUE", "value": ALWAYS_CONTINUE}, ] if SOURCE_BUCKET.lower()!='false': task_definition['containerDefinitions'][0]['environment'] += [ @@ -219,9 +222,7 @@ def get_or_create_queue(sqs): "MaximumMessageSize": "262144", "MessageRetentionPeriod": "1209600", "ReceiveMessageWaitTimeSeconds": "0", - "RedrivePolicy": '{"deadLetterTargetArn":"' - + dead_arn - + '","maxReceiveCount":"10"}', + "RedrivePolicy": f'{{"deadLetterTargetArn":"{dead_arn}","maxReceiveCount":"{str(JOB_RETRIES)}"}}', "VisibilityTimeout": str(SQS_MESSAGE_VISIBILITY), } sqs.create_queue(QueueName=SQS_QUEUE_NAME, Attributes=SQS_DEFINITION) diff --git a/worker/cp-worker.py b/worker/cp-worker.py index b2a1c74..edb93e8 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -50,6 +50,10 @@ DOWNLOAD_FILES = 'False' else: DOWNLOAD_FILES = os.environ['DOWNLOAD_FILES'] +if 'ALWAYS_CONTINUE' not in os.environ: + ALWAYS_CONTINUE = 'False' +else: + ALWAYS_CONTINUE = os.environ['ALWAYS_CONTINUE'] localIn = '/home/ubuntu/local_input' @@ -276,6 +280,8 @@ def runCellProfiler(message): printandlog("Didn't recognize input file",logger) if USE_PLUGINS.lower() == 'true': cmd += f' --plugins-directory={PLUGIN_DIR}' + if ALWAYS_CONTINUE.lower() == 'true':
+ cmd +=' --always-continue' print(f'Running {cmd}') logger.info(cmd)