Merge pull request #152 from DataBiosphere/dev
PR for 0.3.1 release
wnojopra authored Apr 16, 2019
2 parents df620ef + a1ad77d commit 443ce31
Showing 20 changed files with 216 additions and 109 deletions.
2 changes: 1 addition & 1 deletion dsub/_dsub_version.py
@@ -26,4 +26,4 @@
0.1.3.dev0 -> 0.1.3 -> 0.1.4.dev0 -> ...
"""

DSUB_VERSION = '0.3.0'
DSUB_VERSION = '0.3.1'
4 changes: 4 additions & 0 deletions dsub/commands/dstat.py
@@ -143,7 +143,9 @@ def prepare_output(self, row):
('labels', 'Labels', self.format_pairs),
('envs', 'Environment Variables', self.format_pairs),
('inputs', 'Inputs', self.format_pairs),
('input-recursives', 'Input Recursives', self.format_pairs),
('outputs', 'Outputs', self.format_pairs),
('output-recursives', 'Output Recursives', self.format_pairs),
('mounts', 'Mounts', self.format_pairs),
('user-project', 'User Project'),
('dsub-version', 'Version'),
@@ -301,7 +303,9 @@ def _prepare_row(task, full, summary):
row_spec('labels', True, {}),
row_spec('envs', True, {}),
row_spec('inputs', True, {}),
row_spec('input-recursives', False, {}),
row_spec('outputs', True, {}),
row_spec('output-recursives', False, {}),
row_spec('mounts', True, {}),
row_spec('provider', True, None),
row_spec('provider-attributes', True, {}),
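Aside: these two hunks surface recursive inputs and outputs as their own dstat rows instead of folding them into Inputs/Outputs. A minimal sketch of how such dict-valued rows render, using a stand-in for dstat's format_pairs helper (the sample paths are hypothetical):

    def format_pairs(values):
        # Mirror dstat's rendering of dict-valued fields as "key=value" pairs.
        return ', '.join('%s=%s' % (k, v) for k, v in sorted(values.items()))

    row = {
        'inputs': {'INPUT_FILE': 'gs://bucket/file.txt'},
        'input-recursives': {'INPUT_DIR': 'gs://bucket/dir/'},
        'outputs': {'OUTPUT_FILE': 'gs://bucket/out.txt'},
        'output-recursives': {'OUTPUT_DIR': 'gs://bucket/out-dir/'},
    }
    for key in ('inputs', 'input-recursives', 'outputs', 'output-recursives'):
        print('%s: %s' % (key, format_pairs(row[key])))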
20 changes: 16 additions & 4 deletions dsub/commands/dsub.py
@@ -25,6 +25,7 @@
import re
import sys
import time
import uuid
from dateutil.tz import tzlocal

from ..lib import dsub_errors
@@ -211,6 +212,12 @@ def _parse_arguments(prog, argv):
parser.add_argument(
'--version', '-v', default=False, help='Print the dsub version and exit.')

parser.add_argument(
'--unique-job-id',
default=False,
action='store_true',
help="""Experimental: create a unique 32 character UUID for the dsub
job-id using https://docs.python.org/3/library/uuid.html.""")
parser.add_argument(
'--name',
help="""Name for pipeline. Defaults to the script name or
@@ -551,7 +558,7 @@ def _get_job_resources(args):


def _get_job_metadata(provider, user_id, job_name, script, task_ids,
user_project):
user_project, unique_job_id):
"""Allow provider to extract job-specific metadata from command-line args.
Args:
@@ -561,6 +568,7 @@ def _get_job_metadata(provider, user_id, job_name, script, task_ids,
script: the script to run
task_ids: a set of the task-ids for all tasks in the job
user_project: name of the project to be billed for the request
unique_job_id: generate a unique job id
Returns:
A dictionary of job-specific metadata (such as job id, name, etc.)
@@ -569,6 +577,8 @@ def _get_job_metadata(provider, user_id, job_name, script, task_ids,
user_id = user_id or dsub_util.get_os_user()
job_metadata = provider.prepare_job_metadata(script.name, job_name, user_id,
create_time)
if unique_job_id:
job_metadata['job-id'] = uuid.uuid4().hex

job_metadata['create-time'] = create_time
job_metadata['script'] = script
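A quick sketch of what the new --unique-job-id flag substitutes for the default script-derived job-id: uuid.uuid4().hex is a random UUID rendered as 32 hex characters with the hyphens stripped.

    import uuid

    # uuid4().hex: the random UUID as 32 hex characters, no hyphens.
    job_id = uuid.uuid4().hex
    print(job_id)        # e.g. '9f1c2b3a4d5e6f708192a3b4c5d6e7f8'
    print(len(job_id))   # 32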
@@ -1014,7 +1024,8 @@ def run_main(args):
after=args.after,
skip=args.skip,
project=args.project,
disable_warning=True)
disable_warning=True,
unique_job_id=args.unique_job_id)


def run(provider,
@@ -1033,7 +1044,8 @@ def run(provider,
after=None,
skip=False,
project=None,
disable_warning=False):
disable_warning=False,
unique_job_id=False):
"""Actual dsub body, post-stdout-redirection."""
if not dry_run:
provider_base.emit_provider_message(provider)
@@ -1078,7 +1090,7 @@
# job_metadata such as the job-id, create-time, user-id, etc.
# task_resources such as the logging_path (which may include job-id, task-id)
job_metadata = _get_job_metadata(provider, user, name, script, task_ids,
user_project)
user_project, unique_job_id)
_resolve_task_resources(job_metadata, job_resources, task_descriptors)

# Job and task properties are now all resolved. Begin execution!
36 changes: 25 additions & 11 deletions dsub/lib/job_model.py
@@ -483,15 +483,21 @@ def __new__(cls,

def ensure_job_params_are_complete(job_params):
"""For the job, ensure that each param entry is not None."""
for param in 'labels', 'envs', 'inputs', 'outputs', 'mounts':
for param in [
'labels', 'envs', 'inputs', 'outputs', 'mounts', 'input-recursives',
'output-recursives'
]:
if not job_params.get(param):
job_params[param] = set()


def ensure_task_params_are_complete(task_descriptors):
"""For each task, ensure that each task param entry is not None."""
for task_desc in task_descriptors:
for param in 'labels', 'envs', 'inputs', 'outputs':
for param in [
'labels', 'envs', 'inputs', 'outputs', 'input-recursives',
'output-recursives'
]:
if not task_desc.task_params.get(param):
task_desc.task_params[param] = set()
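Both ensure_*_params_are_complete helpers now also default the recursive keys, so downstream code can index them unconditionally. A self-contained run of the job-level helper, copied from the hunk above:

    def ensure_job_params_are_complete(job_params):
        """Default any missing parameter family to an empty set."""
        for param in [
            'labels', 'envs', 'inputs', 'outputs', 'mounts',
            'input-recursives', 'output-recursives'
        ]:
            if not job_params.get(param):
                job_params[param] = set()

    params = {'inputs': {'gs://bucket/file.txt'}}
    ensure_job_params_are_complete(params)
    print(sorted(params))  # all seven keys are now present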

@@ -833,7 +839,11 @@ def _from_yaml_v0(cls, job):
@classmethod
def from_yaml(cls, yaml_string):
"""Populate and return a JobDescriptor from a YAML string."""
job = yaml.load(yaml_string)
try:
job = yaml.full_load(yaml_string)
except AttributeError:
# For installations that cannot update their PyYAML version
job = yaml.load(yaml_string)

# If the YAML does not contain a top-level dsub version, then assume that
# the string is coming from the local provider, reading an old version of
@@ -860,11 +870,13 @@ def from_yaml(cls, yaml_string):
job_params['labels'] = cls._label_params_from_dict(job.get('labels', {}))
job_params['envs'] = cls._env_params_from_dict(job.get('envs', {}))
job_params['inputs'] = cls._input_file_params_from_dict(
job.get('inputs', {}), False) | cls._input_file_params_from_dict(
job.get('input-recursives', {}), True)
job.get('inputs', {}), False)
job_params['input-recursives'] = cls._input_file_params_from_dict(
job.get('input-recursives', {}), True)
job_params['outputs'] = cls._output_file_params_from_dict(
job.get('outputs', {}), False) | cls._output_file_params_from_dict(
job.get('output-recursives', {}), True)
job.get('outputs', {}), False)
job_params['output-recursives'] = cls._output_file_params_from_dict(
job.get('output-recursives', {}), True)
job_params['mounts'] = cls._mount_params_from_dict(job.get('mounts', {}))

task_descriptors = []
@@ -885,11 +897,13 @@ def from_yaml(cls, yaml_string):
task.get('labels', {}))
task_params['envs'] = cls._env_params_from_dict(task.get('envs', {}))
task_params['inputs'] = cls._input_file_params_from_dict(
task.get('inputs', {}), False) | cls._input_file_params_from_dict(
task.get('input-recursives', {}), True)
task.get('inputs', {}), False)
task_params['input-recursives'] = cls._input_file_params_from_dict(
task.get('input-recursives', {}), True)
task_params['outputs'] = cls._output_file_params_from_dict(
task.get('outputs', {}), False) | cls._output_file_params_from_dict(
task.get('output-recursives', {}), True)
task.get('outputs', {}), False)
task_params['output-recursives'] = cls._output_file_params_from_dict(
task.get('output-recursives', {}), True)

task_resources = Resources(logging_path=task.get('logging-path'))

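Two behaviors change in from_yaml: recursive parameters now round-trip under their own 'input-recursives'/'output-recursives' keys rather than being unioned into 'inputs'/'outputs', and yaml.full_load is preferred when the installed PyYAML (5.1+) provides it. A minimal sketch of both, using a hypothetical YAML document:

    import textwrap
    import yaml

    doc = textwrap.dedent("""\
        inputs:
          INPUT_FILE: gs://bucket/file.txt
        input-recursives:
          INPUT_DIR: gs://bucket/dir/
        """)

    try:
        job = yaml.full_load(doc)   # PyYAML >= 5.1
    except AttributeError:
        job = yaml.load(doc)        # older PyYAML without full_load

    print(sorted(job))  # ['input-recursives', 'inputs']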
5 changes: 4 additions & 1 deletion dsub/providers/google.py
@@ -1019,7 +1019,10 @@ def get_field(self, field, default=None):
event['endTime'])

value.append(event_value)
elif field in ['user-project', 'script-name', 'script']:
elif field in [
'user-project', 'script-name', 'script', 'input-recursives',
'output-recursives'
]:
# Supported in local and google-v2 providers.
value = None

57 changes: 36 additions & 21 deletions dsub/providers/google_v2.py
@@ -65,6 +65,21 @@
# Name of the data disk
_DATA_DISK_NAME = 'datadisk'

# Define a bash function for "echo" that includes timestamps
_LOG_MSG_FN = textwrap.dedent("""\
function get_datestamp() {
date "+%Y-%m-%d %H:%M:%S"
}
function log_info() {
echo "$(get_datestamp) INFO: $@"
}
function log_error() {
echo "$(get_datestamp) ERROR: $@"
}
""")

# Define a bash function for "gsutil cp" to be used by the logging,
# localization, and delocalization actions.
_GSUTIL_CP_FN = textwrap.dedent("""\
@@ -86,13 +101,13 @@
local i
for ((i = 0; i < 3; i++)); do
echo "gsutil ${headers} ${user_project_flag} -mq cp \"${src}\" \"${dst}\""
log_info "gsutil ${headers} ${user_project_flag} -mq cp \"${src}\" \"${dst}\""
if gsutil ${headers} ${user_project_flag} -mq cp "${src}" "${dst}"; then
return
fi
done
2>&1 echo "ERROR: gsutil ${headers} ${user_project_flag} -mq cp \"${src}\" \"${dst}\""
2>&1 log_error "gsutil ${headers} ${user_project_flag} -mq cp \"${src}\" \"${dst}\""
exit 1
}
@@ -132,13 +147,13 @@
local i
for ((i = 0; i < 3; i++)); do
echo "gsutil ${user_project_flag} -mq rsync -r \"${src}\" \"${dst}\""
log_info "gsutil ${user_project_flag} -mq rsync -r \"${src}\" \"${dst}\""
if gsutil ${user_project_flag} -mq rsync -r "${src}" "${dst}"; then
return
fi
done
2>&1 echo "ERROR: gsutil ${user_project_flag} -mq rsync -r \"${src}\" \"${dst}\""
2>&1 log_error "gsutil ${user_project_flag} -mq rsync -r \"${src}\" \"${dst}\""
exit 1
}
""")
@@ -184,6 +199,7 @@
set -o errexit
set -o nounset
{log_msg_fn}
{log_cp_fn}
while [[ ! -d /google/logs/action/{final_logging_action} ]]; do
@@ -204,7 +220,7 @@
INPUT_SRC="INPUT_SRC_${i}"
INPUT_DST="INPUT_DST_${i}"
echo "Localizing ${!INPUT_VAR}"
log_info "Localizing ${!INPUT_VAR}"
if [[ "${!INPUT_RECURSIVE}" -eq "1" ]]; then
gsutil_rsync "${!INPUT_SRC}" "${!INPUT_DST}" "${USER_PROJECT}"
else
@@ -224,7 +240,7 @@
OUTPUT_SRC="OUTPUT_SRC_${i}"
OUTPUT_DST="OUTPUT_DST_${i}"
echo "Delocalizing ${!OUTPUT_VAR}"
log_info "Delocalizing ${!OUTPUT_VAR}"
if [[ "${!OUTPUT_RECURSIVE}" -eq "1" ]]; then
gsutil_rsync "${!OUTPUT_SRC}" "${!OUTPUT_DST}" "${USER_PROJECT}"
else
@@ -234,6 +250,7 @@
""")

_LOCALIZATION_CMD = textwrap.dedent("""\
{log_msg_fn}
{recursive_cp_fn}
{cp_fn}
@@ -272,7 +289,7 @@
for ((i=0; i < DIR_COUNT; i++)); do
DIR_VAR="DIR_${i}"
echo "mkdir -m 777 -p \"${!DIR_VAR}\""
log_info "mkdir -m 777 -p \"${!DIR_VAR}\""
mkdir -m 777 -p "${!DIR_VAR}"
done
""")
@@ -283,6 +300,7 @@
set -o errexit
set -o nounset
{log_msg_fn}
{mk_runtime_dirs}
echo "${{{script_var}}}" \
@@ -678,6 +696,7 @@ def _build_pipeline_request(self, task_view):
log_cp_cmd=_LOG_CP_CMD.format(
user_action=user_action, logging_action='logging_action'))
continuous_logging_cmd = _CONTINUOUS_LOGGING_CMD.format(
log_msg_fn=_LOG_MSG_FN,
log_cp_fn=_GSUTIL_CP_FN,
log_cp_cmd=_LOG_CP_CMD.format(
user_action=user_action,
@@ -691,6 +710,7 @@ def _build_pipeline_request(self, task_view):
# and de-localization actions
script_path = os.path.join(providers_util.SCRIPT_DIR, script.name)
prepare_command = _PREPARE_CMD.format(
log_msg_fn=_LOG_MSG_FN,
mk_runtime_dirs=_MK_RUNTIME_DIRS_CMD,
script_var=_SCRIPT_VARNAME,
python_decode_script=_PYTHON_DECODE_SCRIPT,
@@ -746,6 +766,7 @@ def _build_pipeline_request(self, task_view):
commands=[
'-c',
_LOCALIZATION_CMD.format(
log_msg_fn=_LOG_MSG_FN,
recursive_cp_fn=_GSUTIL_RSYNC_FN,
cp_fn=_GSUTIL_CP_FN,
cp_loop=_LOCALIZATION_LOOP)
@@ -772,6 +793,7 @@ def _build_pipeline_request(self, task_view):
commands=[
'-c',
_LOCALIZATION_CMD.format(
log_msg_fn=_LOG_MSG_FN,
recursive_cp_fn=_GSUTIL_RSYNC_FN,
cp_fn=_GSUTIL_CP_FN,
cp_loop=_DELOCALIZATION_LOOP)
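Each shell template now takes a log_msg_fn slot, filled at the call sites above with _LOG_MSG_FN so the helpers are defined before any log_info/log_error runs. A hypothetical miniature of that assembly (stand-in strings, not dsub's real templates):

    import textwrap

    log_msg_fn = 'function log_info() { echo "INFO: $@"; }'
    localization_cmd = textwrap.dedent("""\
        {log_msg_fn}
        log_info "Localizing ${{INPUT_VAR}}"
        """)

    # str.format fills {log_msg_fn}; ${{...}} escapes to a literal ${...}.
    print(localization_cmd.format(log_msg_fn=log_msg_fn))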
@@ -1298,22 +1320,15 @@ def get_field(self, field, default=None):
self._job_descriptor.job_params,
self._job_descriptor.task_descriptors[0].task_params, field)
value = {item.name: item.value for item in items}
elif field == 'inputs':
if self._job_descriptor:
value = {}
for field in ['inputs', 'input-recursives']:
items = providers_util.get_job_and_task_param(
self._job_descriptor.job_params,
self._job_descriptor.task_descriptors[0].task_params, field)
value.update({item.name: item.value for item in items})
elif field == 'outputs':
elif field in [
'inputs', 'outputs', 'input-recursives', 'output-recursives'
]:
if self._job_descriptor:
value = {}
for field in ['outputs', 'output-recursives']:
items = providers_util.get_job_and_task_param(
self._job_descriptor.job_params,
self._job_descriptor.task_descriptors[0].task_params, field)
value.update({item.name: item.value for item in items})
items = providers_util.get_job_and_task_param(
self._job_descriptor.job_params,
self._job_descriptor.task_descriptors[0].task_params, field)
value.update({item.name: item.value for item in items})
elif field == 'mounts':
if self._job_descriptor:
items = providers_util.get_job_and_task_param(
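With recursives stored under their own keys, the separate 'inputs' and 'outputs' branches of get_field collapse into one. A sketch of the merge logic, with a dict-based stand-in for providers_util.get_job_and_task_param (the real helper returns param objects with .name/.value attributes):

    def get_job_and_task_param(job_params, task_params, field):
        # Stand-in: merge job-level and task-level values for one field.
        merged = dict(job_params.get(field, {}))
        merged.update(task_params.get(field, {}))
        return merged

    job_params = {'inputs': {'INPUT_FILE': 'gs://bucket/file.txt'}}
    task_params = {'inputs': {'INPUT_SHARD': 'gs://bucket/shard-0.txt'}}

    for field in ['inputs', 'outputs', 'input-recursives', 'output-recursives']:
        print(field, get_job_and_task_param(job_params, task_params, field))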
21 changes: 8 additions & 13 deletions dsub/providers/local.py
@@ -464,7 +464,7 @@ def delete_jobs(self,
with open(os.path.join(task_dir, 'end-time.txt'), 'wt') as f:
f.write(today)
msg = 'Operation canceled at %s\n' % today
with open(os.path.join(task_dir, 'runner-log.txt'), 'a') as f:
with open(os.path.join(task_dir, 'runner-log.txt'), 'at') as f:
f.write(msg)

return (canceled, cancel_errors)
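Note on the mode change: 'a' and 'at' open the file identically, since text mode is Python's default; 'at' just makes text mode explicit, matching the 'wt' used for end-time.txt above. For example:

    import os
    import tempfile

    path = os.path.join(tempfile.mkdtemp(), 'runner-log.txt')
    with open(path, 'at') as f:           # same file object as mode 'a'
        f.write('Operation canceled\n')
    print(open(path).read())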
@@ -634,7 +634,7 @@ def _get_status_from_task_dir(self, task_dir):
def _get_log_detail_from_task_dir(self, task_dir):
try:
with open(os.path.join(task_dir, 'runner-log.txt'), 'r') as f:
return [line.decode('utf-8') for line in f.read().splitlines()]
return [line for line in f.read().splitlines()]
except (IOError, OSError):
return None
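The dropped .decode('utf-8') is a Python 3 fix: files opened in text mode already yield str, and str has no decode method. A tiny illustration with an in-memory text stream:

    import io

    f = io.StringIO('first line\nsecond line\n')  # text-mode reads give str
    lines = f.read().splitlines()
    print(lines)           # ['first line', 'second line']
    print(type(lines[0]))  # <class 'str'>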

@@ -984,18 +984,13 @@ def get_field(self, field, default=None):
items = providers_util.get_job_and_task_param(job_params, task_params,
field)
value = {item.name: item.value for item in items}
elif field == 'inputs':
value = {}
for field in ['inputs', 'input-recursives']:
items = providers_util.get_job_and_task_param(job_params, task_params,
field)
value.update({item.name: item.value for item in items})
elif field == 'outputs':
elif field in [
'inputs', 'outputs', 'input-recursives', 'output-recursives'
]:
value = {}
for field in ['outputs', 'output-recursives']:
items = providers_util.get_job_and_task_param(job_params, task_params,
field)
value.update({item.name: item.value for item in items})
items = providers_util.get_job_and_task_param(job_params, task_params,
field)
value.update({item.name: item.value for item in items})
elif field == 'mounts':
items = providers_util.get_job_and_task_param(job_params, task_params,
field)
(Diff truncated; the remaining 13 changed files are not shown.)