Skip to content

Commit

Permalink
Merge pull request #53 from HumanCellAtlas/ds_cache_wdls_538
Browse files Browse the repository at this point in the history
Cache wdls (538)
  • Loading branch information
David Shiga authored Feb 8, 2018
2 parents 763d113 + ca9857c commit da3801c
Show file tree
Hide file tree
Showing 14 changed files with 345 additions and 115 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
language: python
python:
- '2.7'
sudo: enabled
- '3.6'
sudo: required
services: docker
script: cd lira/test && bash test.sh
notifications:
Expand Down
13 changes: 6 additions & 7 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,21 @@ FROM ubuntu:16.04

RUN apt-get update && apt-get upgrade -y && \
apt-get -y install --no-install-recommends \
python-pip \
python-setuptools \
python3-pip \
vim \
nmap \
git

RUN pip install --upgrade pip
RUN pip3 install --upgrade pip setuptools

RUN pip install wheel
RUN pip3 install wheel

RUN mkdir /secondary-analysis
WORKDIR /secondary-analysis
RUN mkdir /lira
WORKDIR /lira

COPY requirements.txt .

RUN pip install -r requirements.txt
RUN pip3 install -r requirements.txt

COPY . .

Expand Down
16 changes: 9 additions & 7 deletions kubernetes/listener-config.json.ctmpl
Original file line number Diff line number Diff line change
@@ -1,40 +1,42 @@
{{with $environment := env "ENV"}}
{{with $cromwellSecrets := vault (printf "secret/dsde/mint/%s/common/htpasswd" $environment)}}
{{with $listenerSecret := vault (printf "secret/dsde/mint/%s/listener/listener_secret" $environment)}}
{
"MAX_CONTENT_LENGTH": 10000,
"cromwell_password": "{{$cromwellSecrets.Data.cromwell_password}}",
"cromwell_url": "https://cromwell.mint-{{ env "ENV" }}.broadinstitute.org/api/workflows/v1",
"cromwell_user": "{{$cromwellSecrets.Data.cromwell_user}}",
"env": "{{ env "ENV" }}",
"notification_token": "{{env "NOTIFICATION_TOKEN" }}",
"notification_token": "{{$listenerSecret.Data.notification_token}}",
"submit_wdl": "{{ env "PIPELINE_TOOLS_PREFIX" }}/adapter_pipelines/submit.wdl",
"wdls": [
{
"analysis_wdls": [
"{{ env "TENX_PREFIX" }}/pipelines/10x/count/count.wdl"
],
"options_link": "{{ env "PIPELINE_TOOLS_PREFIX" }}/adapter_pipelines/10x/options.json",
"subscription_id": {{ env "TENX_SUBSCRIPTION_ID" }},
"wdl_default_inputs_link": "{{ env "PIPELINE_TOOLS_PREFIX" }}/adapter_pipelines/10x/adapter_example_static.json",
"subscription_id": "{{ env "TENX_SUBSCRIPTION_ID" }}",
"wdl_static_inputs_link": "{{ env "PIPELINE_TOOLS_PREFIX" }}/adapter_pipelines/10x/adapter_example_static.json",
"wdl_link": "{{ env "PIPELINE_TOOLS_PREFIX" }}/adapter_pipelines/10x/adapter.wdl",
"workflow_name": "Adapter10xCount"
},
{
"analysis_wdls": [
"{{ env "SS2_PREFIX" }}/pipelines/smartseq2_single_sample/ss2_single_sample.wdl",
"{{ env "SS2_PREFIX" }}/library/subworkflows/smartseq2_single_sample/pipelines/hisat2_QC_pipeline.wdl",
"{{ env "SS2_PREFIX" }}/library/subworkflows/smartseq2_single_sample/pipelines/hisat2_rsem_pipeline.wdl",
"{{ env "SS2_PREFIX" }}/library/subworkflows/hisat2_QC_pipeline.wdl",
"{{ env "SS2_PREFIX" }}/library/subworkflows/hisat2_rsem_pipeline.wdl",
"{{ env "SS2_PREFIX" }}/library/tasks/hisat2.wdl",
"{{ env "SS2_PREFIX" }}/library/tasks/picard.wdl",
"{{ env "SS2_PREFIX" }}/library/tasks/rsem.wdl"
],
"options_link": "{{ env "PIPELINE_TOOLS_PREFIX" }}/adapter_pipelines/ss2_single_sample/options.json",
"subscription_id": {{ env "SS2_SUBSCRIPTION_ID" }},
"wdl_default_inputs_link": "{{ env "PIPELINE_TOOLS_PREFIX" }}/adapter_pipelines/ss2_single_sample/adapter_example_static.json",
"subscription_id": "{{ env "SS2_SUBSCRIPTION_ID" }}",
"wdl_static_inputs_link": "{{ env "PIPELINE_TOOLS_PREFIX" }}/adapter_pipelines/ss2_single_sample/adapter_example_static.json",
"wdl_link": "{{ env "PIPELINE_TOOLS_PREFIX" }}/adapter_pipelines/ss2_single_sample/adapter.wdl",
"workflow_name": "AdapterSmartSeq2SingleCell"
}
]
}
{{end}}
{{end}}
{{end}}
132 changes: 96 additions & 36 deletions lira/api/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
from cromwell_tools import cromwell_tools
from lira import lira_utils

logger = logging.getLogger("Lira | {module_path}".format(module_path=__name__))


def post(body):
# Check authentication
logger = logging.getLogger("Lira | {module_path}".format(module_path=__name__))
green_config = current_app.config
"""Process notification and launch workflow in Cromwell"""

if not lira_utils.is_authenticated(connexion.request.args, green_config.notification_token):
# Check authentication
lira_config = current_app.config
if not lira_utils.is_authenticated(connexion.request.args, lira_config.notification_token):
time.sleep(1)
return lira_utils.response_with_server_header(dict(error='Unauthorized'), 401)

Expand All @@ -23,43 +25,101 @@ def post(body):
uuid, version, subscription_id = lira_utils.extract_uuid_version_subscription_id(body)

# Find wdl config where the subscription_id matches the notification's subscription_id
id_matches = [wdl for wdl in green_config.wdls if wdl.subscription_id == subscription_id]
id_matches = [wdl for wdl in lira_config.wdls if wdl.subscription_id == subscription_id]
if len(id_matches) == 0:
return lira_utils.response_with_server_header(
dict(error='Not Found: No wdl config found with subscription id {}'
''.format(subscription_id)), 404)
wdl = id_matches[0]
logger.debug("Matched WDL: {wdl}".format(wdl=wdl))
logger.info("Launching {workflow_name} workflow in Cromwell".format(workflow_name=wdl.workflow_name))
wdl_config = id_matches[0]
logger.debug("Matched WDL config: {wdl}".format(wdl=wdl_config))
logger.info("Preparing to launch {workflow_name} workflow in Cromwell".format(workflow_name=wdl_config.workflow_name))

# Prepare inputs
inputs = lira_utils.compose_inputs(wdl.workflow_name, uuid, version, green_config.env)
inputs = lira_utils.compose_inputs(wdl_config.workflow_name, uuid, version, lira_config.env)
cromwell_inputs_file = json.dumps(inputs)

# Read files into memory
wdl_file = cromwell_tools.download(wdl.wdl_link)
wdl_default_inputs_file = cromwell_tools.download(wdl.wdl_default_inputs_link)
options_file = cromwell_tools.download(wdl.options_link)

# Create zip of analysis and submit wdls
url_to_contents = cromwell_tools.download_to_map(wdl.analysis_wdls + [green_config.submit_wdl])
wdl_deps_file = cromwell_tools.make_zip_in_memory(url_to_contents)

cromwell_response = cromwell_tools.start_workflow(
wdl_file=wdl_file,
zip_file=wdl_deps_file,
inputs_file=cromwell_inputs_file,
inputs_file2=wdl_default_inputs_file,
options_file=options_file,
url=green_config.cromwell_url,
user=green_config.cromwell_user,
password=green_config.cromwell_password
)

# Respond
if cromwell_response.status_code > 201:
logger.error("HTTP error content: {content}".format(content=cromwell_response.text))
return lira_utils.response_with_server_header(dict(result=cromwell_response.text), 500)
logger.info("Cromwell response: {response}".format(response=cromwell_response.json()))

return lira_utils.response_with_server_header(cromwell_response.json())
cromwell_submission = current_app.prepare_submission(wdl_config, lira_config.submit_wdl)
logger.info(current_app.prepare_submission.cache_info())


dry_run = getattr(lira_config, 'dry_run', False)
if dry_run:
logger.warning('Not launching workflow because Lira is in dry_run mode')
response_json = {
'id': 'fake_id',
'status': 'fake_status'
}
status_code = 201
else:
cromwell_response = cromwell_tools.start_workflow(
wdl_file=cromwell_submission.wdl_file,
zip_file=cromwell_submission.wdl_deps_file,
inputs_file=cromwell_inputs_file,
inputs_file2=cromwell_submission.wdl_static_inputs_file,
options_file=cromwell_submission.options_file,
url=lira_config.cromwell_url,
user=lira_config.cromwell_user,
password=lira_config.cromwell_password
)
if cromwell_response.status_code > 201:
logger.error("HTTP error content: {content}".format(content=cromwell_response.text))
response_json = dict(result=cromwell_response.text)
status_code = 500
else:
response_json = cromwell_response.json()
logger.info("Cromwell response: {0} {1}".format(response_json, cromwell_response.status_code))
status_code = 201

return lira_utils.response_with_server_header(response_json, status_code)


def create_prepare_submission_function(cache_wdls):
    """Build the prepare_submission callable, memoized when possible.

    The decorator wrapped around prepare_submission is chosen like this:
      * cache_wdls is falsy -> lira_utils.noop_lru_cache
      * cache_wdls is truthy but functools.lru_cache cannot be imported
        (Python 2) -> lira_utils.noop_lru_cache
      * otherwise -> functools.lru_cache

    Args:
        cache_wdls: whether downloaded workflow files should be cached.

    Returns:
        The decorated prepare_submission(wdl_config, submit_wdl) function.
    """

    # Start from the fallback decorator; it is only replaced when caching is
    # both requested and importable.
    cache_decorator = lira_utils.noop_lru_cache
    if cache_wdls:
        try:
            from functools import lru_cache as cache_decorator
        except ImportError:
            logger.info('Not caching wdls because functools.lru_cache is not available in Python 2.')
    else:
        logger.info('Not caching wdls because Lira is configured with cache_wdls false')

    def prepare_submission(wdl_config, submit_wdl):
        """Fetch every static file required to submit a workflow to Cromwell."""

        # Keyword arguments are evaluated in the order written, so the
        # downloads run in this order: top-level wdl, static inputs, options,
        # and finally the analysis + submit wdls that get zipped as dependencies.
        return CromwellSubmission(
            wdl_file=cromwell_tools.download(wdl_config.wdl_link),
            wdl_static_inputs_file=cromwell_tools.download(wdl_config.wdl_static_inputs_link),
            options_file=cromwell_tools.download(wdl_config.options_link),
            wdl_deps_file=cromwell_tools.make_zip_in_memory(
                cromwell_tools.download_to_map(wdl_config.analysis_wdls + [submit_wdl])
            ),
        )

    # maxsize=None: when functools.lru_cache is in use, every distinct
    # (wdl_config, submit_wdl) pair is kept without evicting earlier entries,
    # and repeated calls with identical arguments are answered from the cache.
    # prepare_submission therefore runs at most once per wdl config, and the
    # number of wdl configs is not limited by the cache size.
    return cache_decorator(maxsize=None)(prepare_submission)


class CromwellSubmission(object):
    """Container for the static pieces of a Cromwell workflow submission.

    Attributes set by the constructor:
        wdl_file: the top level wdl file.
        wdl_static_inputs_file: the static inputs json file (default None).
        options_file: the options file (default None).
        wdl_deps_file: the dependencies zip file (default None).
    """

    def __init__(self, wdl_file, wdl_static_inputs_file=None, options_file=None, wdl_deps_file=None):
        # Targets are assigned left to right, so attributes are created in
        # the same order as the constructor parameters.
        self.wdl_file, self.wdl_static_inputs_file = wdl_file, wdl_static_inputs_file
        self.options_file, self.wdl_deps_file = options_file, wdl_deps_file
7 changes: 5 additions & 2 deletions lira/lira.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from connexion.resolver import RestyResolver
import argparse

import config
from . import lira_config
from .api import notifications

parser = argparse.ArgumentParser()
parser.add_argument('--host', default='0.0.0.0')
Expand All @@ -26,8 +27,10 @@
application = app.app

config_path = os.environ['listener_config']
logger.info('Using config file at {0}'.format(config_path))
with open(config_path) as f:
app.app.config = config.ListenerConfig(json.load(f), app.app.config)
app.app.config = lira_config.LiraConfig(json.load(f), app.app.config)
app.app.prepare_submission = notifications.create_prepare_submission_function(app.app.config.cache_wdls)

resolver = RestyResolver('lira.api', collection_endpoint_name='list')
app.add_api('lira.yml', resolver=resolver, validate_responses=True)
Expand Down
13 changes: 5 additions & 8 deletions lira/lira.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ paths:
schema:
$ref: '#/definitions/Notification'
responses:
200:
201:
description: OK
schema:
$ref: '#/definitions/NotificationResponse'
Expand Down Expand Up @@ -77,13 +77,10 @@ definitions:
NotificationResponse:
type: object
properties:
result:
type: object
properties:
status:
type: string
id:
type: string
status:
type: string
id:
type: string
HealthCheckResponse:
type: object
properties:
Expand Down
Loading

0 comments on commit da3801c

Please sign in to comment.