diff --git a/dsub/_dsub_version.py b/dsub/_dsub_version.py
index 4b2d29c..ed67ad8 100644
--- a/dsub/_dsub_version.py
+++ b/dsub/_dsub_version.py
@@ -26,4 +26,4 @@
 0.1.3.dev0 -> 0.1.3 -> 0.1.4.dev0 -> ...
 """
-DSUB_VERSION = '0.3.0'
+DSUB_VERSION = '0.3.1'
diff --git a/dsub/commands/dstat.py b/dsub/commands/dstat.py
index 52db906..c4bfa85 100755
--- a/dsub/commands/dstat.py
+++ b/dsub/commands/dstat.py
@@ -143,7 +143,9 @@ def prepare_output(self, row):
         ('labels', 'Labels', self.format_pairs),
         ('envs', 'Environment Variables', self.format_pairs),
         ('inputs', 'Inputs', self.format_pairs),
+        ('input-recursives', 'Input Recursives', self.format_pairs),
         ('outputs', 'Outputs', self.format_pairs),
+        ('output-recursives', 'Output Recursives', self.format_pairs),
         ('mounts', 'Mounts', self.format_pairs),
         ('user-project', 'User Project'),
         ('dsub-version', 'Version'),
@@ -301,7 +303,9 @@ def _prepare_row(task, full, summary):
       row_spec('labels', True, {}),
       row_spec('envs', True, {}),
       row_spec('inputs', True, {}),
+      row_spec('input-recursives', False, {}),
       row_spec('outputs', True, {}),
+      row_spec('output-recursives', False, {}),
       row_spec('mounts', True, {}),
       row_spec('provider', True, None),
       row_spec('provider-attributes', True, {}),
diff --git a/dsub/commands/dsub.py b/dsub/commands/dsub.py
index fcee4e8..74ef45f 100644
--- a/dsub/commands/dsub.py
+++ b/dsub/commands/dsub.py
@@ -25,6 +25,7 @@
 import re
 import sys
 import time
+import uuid

 from dateutil.tz import tzlocal
 from ..lib import dsub_errors
@@ -211,6 +212,12 @@ def _parse_arguments(prog, argv):
   parser.add_argument(
       '--version', '-v', default=False, help='Print the dsub version and exit.')
+  parser.add_argument(
+      '--unique-job-id',
+      default=False,
+      action='store_true',
+      help="""Experimental: create a unique 32 character UUID for the dsub
+          job-id using https://docs.python.org/3/library/uuid.html.""")
   parser.add_argument(
       '--name',
       help="""Name for pipeline. Defaults to the script name or
@@ -551,7 +558,7 @@ def _get_job_resources(args):


 def _get_job_metadata(provider, user_id, job_name, script, task_ids,
-                      user_project):
+                      user_project, unique_job_id):
   """Allow provider to extract job-specific metadata from command-line args.

   Args:
@@ -561,6 +568,7 @@ def _get_job_metadata(provider, user_id, job_name, script, task_ids,
     script: the script to run
     task_ids: a set of the task-ids for all tasks in the job
     user_project: name of the project to be billed for the request
+    unique_job_id: generate a unique job id

   Returns:
     A dictionary of job-specific metadata (such as job id, name, etc.)
@@ -569,6 +577,8 @@ def _get_job_metadata(provider, user_id, job_name, script, task_ids,
   user_id = user_id or dsub_util.get_os_user()
   job_metadata = provider.prepare_job_metadata(script.name, job_name, user_id,
                                                create_time)
+  if unique_job_id:
+    job_metadata['job-id'] = uuid.uuid4().hex

   job_metadata['create-time'] = create_time
   job_metadata['script'] = script
@@ -1014,7 +1024,8 @@ def run_main(args):
       after=args.after,
       skip=args.skip,
       project=args.project,
-      disable_warning=True)
+      disable_warning=True,
+      unique_job_id=args.unique_job_id)


 def run(provider,
@@ -1033,7 +1044,8 @@ def run(provider,
         after=None,
         skip=False,
         project=None,
-        disable_warning=False):
+        disable_warning=False,
+        unique_job_id=False):
   """Actual dsub body, post-stdout-redirection."""
   if not dry_run:
     provider_base.emit_provider_message(provider)
@@ -1078,7 +1090,7 @@ def run(provider,
   # job_metadata such as the job-id, create-time, user-id, etc.
   # task_resources such as the logging_path (which may include job-id, task-id)
   job_metadata = _get_job_metadata(provider, user, name, script, task_ids,
-                                   user_project)
+                                   user_project, unique_job_id)
   _resolve_task_resources(job_metadata, job_resources, task_descriptors)

   # Job and task properties are now all resolved. Begin execution!
diff --git a/dsub/lib/job_model.py b/dsub/lib/job_model.py
index 0214b2c..61184c8 100644
--- a/dsub/lib/job_model.py
+++ b/dsub/lib/job_model.py
@@ -483,7 +483,10 @@ def __new__(cls,

 def ensure_job_params_are_complete(job_params):
   """For the job, ensure that each param entry is not None."""
-  for param in 'labels', 'envs', 'inputs', 'outputs', 'mounts':
+  for param in [
+      'labels', 'envs', 'inputs', 'outputs', 'mounts', 'input-recursives',
+      'output-recursives'
+  ]:
     if not job_params.get(param):
       job_params[param] = set()

@@ -491,7 +494,10 @@ def ensure_job_params_are_complete(job_params):
 def ensure_task_params_are_complete(task_descriptors):
   """For each task, ensure that each task param entry is not None."""
   for task_desc in task_descriptors:
-    for param in 'labels', 'envs', 'inputs', 'outputs':
+    for param in [
+        'labels', 'envs', 'inputs', 'outputs', 'input-recursives',
+        'output-recursives'
+    ]:
       if not task_desc.task_params.get(param):
         task_desc.task_params[param] = set()

@@ -833,7 +839,11 @@ def _from_yaml_v0(cls, job):
   @classmethod
   def from_yaml(cls, yaml_string):
     """Populate and return a JobDescriptor from a YAML string."""
-    job = yaml.load(yaml_string)
+    try:
+      job = yaml.full_load(yaml_string)
+    except AttributeError:
+      # For installations that cannot update their PyYAML version
+      job = yaml.load(yaml_string)

     # If the YAML does not contain a top-level dsub version, then assume that
     # the string is coming from the local provider, reading an old version of
@@ -860,11 +870,13 @@ def from_yaml(cls, yaml_string):
     job_params['labels'] = cls._label_params_from_dict(job.get('labels', {}))
     job_params['envs'] = cls._env_params_from_dict(job.get('envs', {}))
     job_params['inputs'] = cls._input_file_params_from_dict(
-        job.get('inputs', {}), False) | cls._input_file_params_from_dict(
-            job.get('input-recursives', {}), True)
+        job.get('inputs', {}), False)
+    job_params['input-recursives'] = cls._input_file_params_from_dict(
+        job.get('input-recursives', {}), True)
     job_params['outputs'] = cls._output_file_params_from_dict(
-        job.get('outputs', {}), False) | cls._output_file_params_from_dict(
-            job.get('output-recursives', {}), True)
+        job.get('outputs', {}), False)
+    job_params['output-recursives'] = cls._output_file_params_from_dict(
+        job.get('output-recursives', {}), True)
     job_params['mounts'] = cls._mount_params_from_dict(job.get('mounts', {}))

     task_descriptors = []
@@ -885,11 +897,13 @@ def from_yaml(cls, yaml_string):
           task.get('labels', {}))
       task_params['envs'] = cls._env_params_from_dict(task.get('envs', {}))
       task_params['inputs'] = cls._input_file_params_from_dict(
-          task.get('inputs', {}), False) | cls._input_file_params_from_dict(
-              task.get('input-recursives', {}), True)
+          task.get('inputs', {}), False)
+      task_params['input-recursives'] = cls._input_file_params_from_dict(
+          task.get('input-recursives', {}), True)
       task_params['outputs'] = cls._output_file_params_from_dict(
-          task.get('outputs', {}), False) | cls._output_file_params_from_dict(
-              task.get('output-recursives', {}), True)
+          task.get('outputs', {}), False)
+      task_params['output-recursives'] = cls._output_file_params_from_dict(
+          task.get('output-recursives', {}), True)

       task_resources = Resources(logging_path=task.get('logging-path'))
diff --git a/dsub/providers/google.py b/dsub/providers/google.py
index 184f1c9..5777fdd 100644
--- a/dsub/providers/google.py
+++ b/dsub/providers/google.py
@@ -1019,7 +1019,10 @@ def get_field(self, field, default=None):
             event['endTime'])
         value.append(event_value)

-    elif field in ['user-project', 'script-name', 'script']:
+    elif field in [
+        'user-project', 'script-name', 'script', 'input-recursives',
+        'output-recursives'
+    ]:
       # Supported in local and google-v2 providers.
       value = None
diff --git a/dsub/providers/google_v2.py b/dsub/providers/google_v2.py
index d3b66cd..c9288cc 100644
--- a/dsub/providers/google_v2.py
+++ b/dsub/providers/google_v2.py
@@ -65,6 +65,21 @@
 # Name of the data disk
 _DATA_DISK_NAME = 'datadisk'

+# Define a bash function for "echo" that includes timestamps
+_LOG_MSG_FN = textwrap.dedent("""\
+  function get_datestamp() {
+    date "+%Y-%m-%d %H:%M:%S"
+  }
+
+  function log_info() {
+    echo "$(get_datestamp) INFO: $@"
+  }
+
+  function log_error() {
+    echo "$(get_datestamp) ERROR: $@"
+  }
+""")
+
 # Define a bash function for "gsutil cp" to be used by the logging,
 # localization, and delocalization actions.
 _GSUTIL_CP_FN = textwrap.dedent("""\
@@ -86,13 +101,13 @@

     local i
     for ((i = 0; i < 3; i++)); do
-      echo "gsutil ${headers} ${user_project_flag} -mq cp \"${src}\" \"${dst}\""
+      log_info "gsutil ${headers} ${user_project_flag} -mq cp \"${src}\" \"${dst}\""
       if gsutil ${headers} ${user_project_flag} -mq cp "${src}" "${dst}"; then
         return
       fi
     done

-    2>&1 echo "ERROR: gsutil ${headers} ${user_project_flag} -mq cp \"${src}\" \"${dst}\""
+    2>&1 log_error "gsutil ${headers} ${user_project_flag} -mq cp \"${src}\" \"${dst}\""
     exit 1
   }

@@ -132,13 +147,13 @@

     local i
     for ((i = 0; i < 3; i++)); do
-      echo "gsutil ${user_project_flag} -mq rsync -r \"${src}\" \"${dst}\""
+      log_info "gsutil ${user_project_flag} -mq rsync -r \"${src}\" \"${dst}\""
       if gsutil ${user_project_flag} -mq rsync -r "${src}" "${dst}"; then
         return
       fi
     done

-    2>&1 echo "ERROR: gsutil ${user_project_flag} -mq rsync -r \"${src}\" \"${dst}\""
+    2>&1 log_error "gsutil ${user_project_flag} -mq rsync -r \"${src}\" \"${dst}\""
     exit 1
   }
 """)
@@ -184,6 +199,7 @@
   set -o errexit
   set -o nounset

+  {log_msg_fn}
   {log_cp_fn}

   while [[ ! -d /google/logs/action/{final_logging_action} ]]; do
@@ -204,7 +220,7 @@
     INPUT_SRC="INPUT_SRC_${i}"
     INPUT_DST="INPUT_DST_${i}"

-    echo "Localizing ${!INPUT_VAR}"
+    log_info "Localizing ${!INPUT_VAR}"
     if [[ "${!INPUT_RECURSIVE}" -eq "1" ]]; then
       gsutil_rsync "${!INPUT_SRC}" "${!INPUT_DST}" "${USER_PROJECT}"
     else
@@ -224,7 +240,7 @@
     OUTPUT_SRC="OUTPUT_SRC_${i}"
     OUTPUT_DST="OUTPUT_DST_${i}"

-    echo "Delocalizing ${!OUTPUT_VAR}"
+    log_info "Delocalizing ${!OUTPUT_VAR}"
     if [[ "${!OUTPUT_RECURSIVE}" -eq "1" ]]; then
       gsutil_rsync "${!OUTPUT_SRC}" "${!OUTPUT_DST}" "${USER_PROJECT}"
     else
@@ -234,6 +250,7 @@
 """)

 _LOCALIZATION_CMD = textwrap.dedent("""\
+  {log_msg_fn}
   {recursive_cp_fn}
   {cp_fn}

@@ -272,7 +289,7 @@
   for ((i=0; i < DIR_COUNT; i++)); do
     DIR_VAR="DIR_${i}"

-    echo "mkdir -m 777 -p \"${!DIR_VAR}\""
+    log_info "mkdir -m 777 -p \"${!DIR_VAR}\""
     mkdir -m 777 -p "${!DIR_VAR}"
   done
 """)
@@ -283,6 +300,7 @@
   set -o errexit
   set -o nounset

+  {log_msg_fn}
   {mk_runtime_dirs}

   echo "${{{script_var}}}" \
@@ -678,6 +696,7 @@ def _build_pipeline_request(self, task_view):
         log_cp_cmd=_LOG_CP_CMD.format(
             user_action=user_action, logging_action='logging_action'))
     continuous_logging_cmd = _CONTINUOUS_LOGGING_CMD.format(
+        log_msg_fn=_LOG_MSG_FN,
         log_cp_fn=_GSUTIL_CP_FN,
         log_cp_cmd=_LOG_CP_CMD.format(
             user_action=user_action,
@@ -691,6 +710,7 @@ def _build_pipeline_request(self, task_view):
     # and de-localization actions
     script_path = os.path.join(providers_util.SCRIPT_DIR, script.name)
     prepare_command = _PREPARE_CMD.format(
+        log_msg_fn=_LOG_MSG_FN,
         mk_runtime_dirs=_MK_RUNTIME_DIRS_CMD,
         script_var=_SCRIPT_VARNAME,
         python_decode_script=_PYTHON_DECODE_SCRIPT,
@@ -746,6 +766,7 @@ def _build_pipeline_request(self, task_view):
             commands=[
                 '-c',
                 _LOCALIZATION_CMD.format(
+                    log_msg_fn=_LOG_MSG_FN,
                     recursive_cp_fn=_GSUTIL_RSYNC_FN,
                     cp_fn=_GSUTIL_CP_FN,
                     cp_loop=_LOCALIZATION_LOOP)
@@ -772,6 +793,7 @@ def _build_pipeline_request(self, task_view):
             commands=[
                 '-c',
                 _LOCALIZATION_CMD.format(
+                    log_msg_fn=_LOG_MSG_FN,
                     recursive_cp_fn=_GSUTIL_RSYNC_FN,
                     cp_fn=_GSUTIL_CP_FN,
                     cp_loop=_DELOCALIZATION_LOOP)
@@ -1298,22 +1320,15 @@ def get_field(self, field, default=None):
           self._job_descriptor.job_params,
           self._job_descriptor.task_descriptors[0].task_params, field)
       value = {item.name: item.value for item in items}
-    elif field == 'inputs':
-      if self._job_descriptor:
-        value = {}
-        for field in ['inputs', 'input-recursives']:
-          items = providers_util.get_job_and_task_param(
-              self._job_descriptor.job_params,
-              self._job_descriptor.task_descriptors[0].task_params, field)
-          value.update({item.name: item.value for item in items})
-    elif field == 'outputs':
+    elif field in [
+        'inputs', 'outputs', 'input-recursives', 'output-recursives'
+    ]:
       if self._job_descriptor:
         value = {}
-        for field in ['outputs', 'output-recursives']:
-          items = providers_util.get_job_and_task_param(
-              self._job_descriptor.job_params,
-              self._job_descriptor.task_descriptors[0].task_params, field)
-          value.update({item.name: item.value for item in items})
+        items = providers_util.get_job_and_task_param(
+            self._job_descriptor.job_params,
+            self._job_descriptor.task_descriptors[0].task_params, field)
+        value.update({item.name: item.value for item in items})
     elif field == 'mounts':
       if self._job_descriptor:
         items = providers_util.get_job_and_task_param(
diff --git a/dsub/providers/local.py b/dsub/providers/local.py
index c06033f..3e12e52 100644
--- a/dsub/providers/local.py
+++ b/dsub/providers/local.py
@@ -464,7 +464,7 @@ def delete_jobs(self,
         with open(os.path.join(task_dir, 'end-time.txt'), 'wt') as f:
           f.write(today)
         msg = 'Operation canceled at %s\n' % today
-        with open(os.path.join(task_dir, 'runner-log.txt'), 'a') as f:
+        with open(os.path.join(task_dir, 'runner-log.txt'), 'at') as f:
           f.write(msg)

     return (canceled, cancel_errors)
@@ -634,7 +634,7 @@ def _get_status_from_task_dir(self, task_dir):
   def _get_log_detail_from_task_dir(self, task_dir):
     try:
       with open(os.path.join(task_dir, 'runner-log.txt'), 'r') as f:
-        return [line.decode('utf-8') for line in f.read().splitlines()]
+        return [line for line in f.read().splitlines()]
     except (IOError, OSError):
       return None

@@ -984,18 +984,13 @@ def get_field(self, field, default=None):
       items = providers_util.get_job_and_task_param(job_params, task_params,
                                                     field)
       value = {item.name: item.value for item in items}
-    elif field == 'inputs':
-      value = {}
-      for field in ['inputs', 'input-recursives']:
-        items = providers_util.get_job_and_task_param(job_params, task_params,
-                                                      field)
-        value.update({item.name: item.value for item in items})
-    elif field == 'outputs':
+    elif field in [
+        'inputs', 'outputs', 'input-recursives', 'output-recursives'
+    ]:
       value = {}
-      for field in ['outputs', 'output-recursives']:
-        items = providers_util.get_job_and_task_param(job_params, task_params,
-                                                      field)
-        value.update({item.name: item.value for item in items})
+      items = providers_util.get_job_and_task_param(job_params, task_params,
+                                                    field)
+      value.update({item.name: item.value for item in items})
     elif field == 'mounts':
       items = providers_util.get_job_and_task_param(job_params, task_params,
                                                     field)
diff --git a/test/integration/e2e_io_recursive.sh b/test/integration/e2e_io_recursive.sh
index 9b74e47..17754fe 100755
--- a/test/integration/e2e_io_recursive.sh
+++ b/test/integration/e2e_io_recursive.sh
@@ -70,7 +70,7 @@ if [[ "${CHECK_RESULTS_ONLY:-0}" -eq 0 ]]; then

   echo "Launching pipeline..."

-  run_dsub \
+  JOB_ID="$(run_dsub \
    --image "google/cloud-sdk:latest" \
    --script "${SCRIPT_DIR}/script_io_recursive.sh" \
    --env FILE_CONTENTS="${FILE_CONTENTS}" \
@@ -78,8 +78,7 @@ if [[ "${CHECK_RESULTS_ONLY:-0}" -eq 0 ]]; then
    --input-recursive INPUT_PATH_DEEP="${INPUTS}/deep/" \
    --output OUTPUT_PATH_SHALLOW="${OUTPUTS}/shallow/*" \
    --output-recursive OUTPUT_PATH_DEEP="${OUTPUTS}/deep/" \
-    --wait
-
+    --wait)"
 fi

 echo
@@ -163,4 +162,17 @@ fi
 echo
 echo "GCS output file list matches expected"

+# Verify dstat
+if [[ "${CHECK_RESULTS_ONLY:-0}" -eq 0 && "${DSUB_PROVIDER}" != "google" ]]; then
+  if ! DSTAT_OUTPUT="$(run_dstat --status '*' --full --jobs "${JOB_ID}")"; then
+    echo "dstat exited with a non-zero exit code!"
+ echo "Output:" + echo "${DSTAT_OUTPUT}" + exit 1 + fi + util::dstat_yaml_assert_field_equal "${DSTAT_OUTPUT}" "[0].inputs" "{'INPUT_PATH_SHALLOW': '${INPUTS}/shallow/*'}" + util::dstat_yaml_assert_field_equal "${DSTAT_OUTPUT}" "[0].outputs" "{'OUTPUT_PATH_SHALLOW': '${OUTPUTS}/shallow/*'}" + util::dstat_yaml_assert_field_equal "${DSTAT_OUTPUT}" "[0].input-recursives" "{'INPUT_PATH_DEEP': '${INPUTS}/deep/'}" + util::dstat_yaml_assert_field_equal "${DSTAT_OUTPUT}" "[0].output-recursives" "{'OUTPUT_PATH_DEEP': '${OUTPUTS}/deep/'}" +fi echo "SUCCESS" diff --git a/test/integration/e2e_logging_paths_basic_tasks.sh b/test/integration/e2e_logging_paths_basic_tasks.sh index 4198b33..ae52b7f 100755 --- a/test/integration/e2e_logging_paths_basic_tasks.sh +++ b/test/integration/e2e_logging_paths_basic_tasks.sh @@ -30,17 +30,14 @@ source "${SCRIPT_DIR}/logging_paths_tasks_setup.sh" readonly LOGGING_BASE="$(dirname "${LOGGING}")" declare LOGGING_OVERRIDE -readonly JOB_NAME="log-tasks" +readonly JOB_NAME=$(logging_paths_tasks_setup::get_job_name) # Set up the tasks file logging_paths_tasks_setup::write_tasks_file # Launch the job LOGGING_OVERRIDE="${LOGGING_BASE}" -JOB_ID=$(run_dsub \ - --name "${JOB_NAME}" \ - --tasks "${TASKS_FILE}" \ - --command 'echo "Test"') +JOB_ID=$(logging_paths_tasks_setup::run_dsub) # Verify output LOGGING_PATH=$(logging_paths_tasks_setup::dstat_get_logging "${JOB_ID}" "1") diff --git a/test/integration/e2e_logging_paths_log_suffix_tasks.sh b/test/integration/e2e_logging_paths_log_suffix_tasks.sh index fb56969..3005ad2 100755 --- a/test/integration/e2e_logging_paths_log_suffix_tasks.sh +++ b/test/integration/e2e_logging_paths_log_suffix_tasks.sh @@ -30,17 +30,14 @@ source "${SCRIPT_DIR}/logging_paths_tasks_setup.sh" readonly LOGGING_BASE="$(dirname "${LOGGING}")" declare LOGGING_OVERRIDE -readonly JOB_NAME="log-tasks" +readonly JOB_NAME=$(logging_paths_tasks_setup::get_job_name) # Set up the tasks file logging_paths_tasks_setup::write_tasks_file # Launch the job LOGGING_OVERRIDE="${LOGGING_BASE}" -JOB_ID=$(run_dsub \ - --name "${JOB_NAME}" \ - --tasks "${TASKS_FILE}" \ - --command 'echo "Test"') +JOB_ID=$(logging_paths_tasks_setup::run_dsub) # Verify output LOGGING_PATH=$(logging_paths_tasks_setup::dstat_get_logging "${JOB_ID}" "1") diff --git a/test/integration/e2e_logging_paths_pattern_tasks.sh b/test/integration/e2e_logging_paths_pattern_tasks.sh index a986618..0150fc0 100755 --- a/test/integration/e2e_logging_paths_pattern_tasks.sh +++ b/test/integration/e2e_logging_paths_pattern_tasks.sh @@ -30,7 +30,7 @@ source "${SCRIPT_DIR}/logging_paths_tasks_setup.sh" readonly LOGGING_BASE="$(dirname "${LOGGING}")" declare LOGGING_OVERRIDE -readonly JOB_NAME="log-tasks" +readonly JOB_NAME=$(logging_paths_tasks_setup::get_job_name) readonly JOB_USER="${USER}" # Set up the tasks file @@ -38,11 +38,8 @@ logging_paths_tasks_setup::write_tasks_file # Launch the job LOGGING_OVERRIDE="${LOGGING_BASE}/{user-id}/{job-name}.{task-id}.test.log" -JOB_ID=$(run_dsub \ - --name "${JOB_NAME}" \ - --user "${JOB_USER}" \ - --tasks "${TASKS_FILE}" \ - --command 'echo "Test"') +JOB_ID=$(logging_paths_tasks_setup::run_dsub \ + --user "${JOB_USER}") # Verify output LOGGING_PATH=$(logging_paths_tasks_setup::dstat_get_logging "${JOB_ID}" "1") diff --git a/test/integration/e2e_logging_paths_retry_failure_tasks.sh b/test/integration/e2e_logging_paths_retry_failure_tasks.sh index cf82681..e6902ca 100755 --- a/test/integration/e2e_logging_paths_retry_failure_tasks.sh +++ 
b/test/integration/e2e_logging_paths_retry_failure_tasks.sh @@ -30,19 +30,17 @@ source "${SCRIPT_DIR}/logging_paths_tasks_setup.sh" readonly LOGGING_BASE="$(dirname "${LOGGING}")" declare LOGGING_OVERRIDE -readonly JOB_NAME="log-tasks" +readonly JOB_NAME=$(logging_paths_tasks_setup::get_job_name) # Set up the tasks file logging_paths_tasks_setup::write_tasks_file # Launch the job LOGGING_OVERRIDE="${LOGGING_BASE}" -JOB_ID=$(run_dsub \ - --name "${JOB_NAME}" \ - --tasks "${TASKS_FILE}" \ +JOB_ID=$(logging_paths_tasks_setup::run_dsub \ --retries 1 \ --wait \ - --command 'false' || true) + --command 'false') || true # Verify output for task_attempt in 1 2; do diff --git a/test/integration/e2e_logging_paths_retry_tasks.sh b/test/integration/e2e_logging_paths_retry_tasks.sh index 587ec37..0823f8c 100755 --- a/test/integration/e2e_logging_paths_retry_tasks.sh +++ b/test/integration/e2e_logging_paths_retry_tasks.sh @@ -30,19 +30,16 @@ source "${SCRIPT_DIR}/logging_paths_tasks_setup.sh" readonly LOGGING_BASE="$(dirname "${LOGGING}")" declare LOGGING_OVERRIDE -readonly JOB_NAME="log-tasks" +readonly JOB_NAME=$(logging_paths_tasks_setup::get_job_name) # Set up the tasks file logging_paths_tasks_setup::write_tasks_file # Launch the job LOGGING_OVERRIDE="${LOGGING_BASE}" -JOB_ID=$(run_dsub \ - --name "${JOB_NAME}" \ - --tasks "${TASKS_FILE}" \ +JOB_ID=$(logging_paths_tasks_setup::run_dsub \ --retries 1 \ - --wait \ - --command 'echo "Test"') + --wait) # Verify output LOGGING_PATH=$(logging_paths_tasks_setup::dstat_get_logging "${JOB_ID}" "1") diff --git a/test/integration/get_data_value.py b/test/integration/get_data_value.py index af95f02..8ea4f02 100644 --- a/test/integration/get_data_value.py +++ b/test/integration/get_data_value.py @@ -83,7 +83,11 @@ def main(): if doc_type == 'json': data = json.loads(doc_string) elif doc_type == 'yaml': - data = yaml.load(doc_string) + try: + data = yaml.full_load(doc_string) + except AttributeError: + # For installations that cannot update their PyYAML version + data = yaml.load(doc_string) else: raise ValueError('Unsupported doc type: %s' % doc_type) diff --git a/test/integration/io_setup.sh b/test/integration/io_setup.sh index e218dba..18571bd 100644 --- a/test/integration/io_setup.sh +++ b/test/integration/io_setup.sh @@ -27,7 +27,8 @@ readonly REQUESTER_PAYS_INPUT_BAM_FULL_PATH="gs://${DSUB_BUCKET_REQUESTER_PAYS}/ readonly REQUESTER_PAYS_POPULATION_FILE_FULL_PATH="gs://${DSUB_BUCKET_REQUESTER_PAYS}/${POPULATION_FILE}" # This is the image we use to test the PD mount feature. -readonly TEST_IMAGE_NAME="dsub-e2e-test-image" +# Inject the TEST_TOKEN into the name so that multiple test can run concurrently. 
+readonly TEST_IMAGE_NAME="dsub-e2e-test-image-$(echo ${TEST_TOKEN} | tr '_' '-')"
 readonly TEST_IMAGE_GCS_LOCATION="gs://dsub-test-e2e-bucket/dsub-test-image.tar.gz"
 readonly TEST_IMAGE_URL="https://www.googleapis.com/compute/v1/projects/${PROJECT_ID}/global/images/${TEST_IMAGE_NAME}"
@@ -67,6 +68,7 @@ readonly -f io_setup::image_setup

 function io_setup::run_dsub_requester_pays() {
   run_dsub \
+    --unique-job-id \
    ${IMAGE:+--image "${IMAGE}"} \
    --user-project "$PROJECT_ID" \
    --script "${SCRIPT_DIR}/script_io_test.sh" \
@@ -84,6 +86,7 @@ function io_setup::run_dsub_with_mount() {
   local mount_point="${1}"

   run_dsub \
+    --unique-job-id \
    ${IMAGE:+--image "${IMAGE}"} \
    --script "${SCRIPT_DIR}/script_io_test.sh" \
    --env TASK_ID="task" \
@@ -99,6 +102,7 @@ readonly -f io_setup::run_dsub_with_mount

 function io_setup::run_dsub() {
   run_dsub \
+    --unique-job-id \
    ${IMAGE:+--image "${IMAGE}"} \
    --script "${SCRIPT_DIR}/script_io_test.sh" \
    --env TASK_ID="task" \
diff --git a/test/integration/logging_paths_tasks_setup.sh b/test/integration/logging_paths_tasks_setup.sh
index 27cdaaa..112f975 100644
--- a/test/integration/logging_paths_tasks_setup.sh
+++ b/test/integration/logging_paths_tasks_setup.sh
@@ -18,6 +18,15 @@

 readonly LOGGING_PATHS_TASKS_FILE_TMPL="${TEST_DIR}/logging_paths_tasks.tsv.tmpl"

+# Several of the logging paths tests specifically test the log file name
+# generated, which includes the job-id, which is based on the job-name;
+# thus we cannot use the new --unique-job-id flag when launching these
+# test jobs.
+#
+# This leads to flaky tests, as jobs launched concurrently sometimes
+# generate the same job identifier.
+readonly LOGGING_PATHS_UNIQUE_ID="$(uuidgen)"
+
 function logging_paths_tasks_setup::write_tasks_file() {
   cat "${LOGGING_PATHS_TASKS_FILE_TMPL}" \
     | util::expand_tsv_fields \
@@ -25,6 +34,29 @@ function logging_paths_tasks_setup::write_tasks_file() {
 }
 readonly -f logging_paths_tasks_setup::write_tasks_file

+function logging_paths_tasks_setup::get_job_name() {
+  # Generate a job name from the test name, replacing "logging_paths" with "lp".
+  #
+  # dsub turns the job name into a google label and turns the underscores
+  # into dashes, so let's start with our job names in that form.
+  #
+  # Truncate the test name at 10 characters, since that is what dsub will do
+  # when it generates the job-id, and these logging_paths_* tests are
+  # specifically checking that the output log file name is generated correctly.
+ echo "lp_${TEST_NAME#logging_paths_}" | tr '_' '-' | cut -c1-10 +} +readonly -f logging_paths_tasks_setup::get_job_name + +function logging_paths_tasks_setup::run_dsub() { + run_dsub \ + --name "${JOB_NAME}" \ + --tasks "${TASKS_FILE}" \ + --command 'echo "Test"' \ + --label unique_id="${LOGGING_PATHS_UNIQUE_ID}" \ + "${@}" +} +readonly -f logging_paths_tasks_setup::run_dsub + function logging_paths_tasks_setup::dstat_get_logging() { local job_id="${1}" local task_id="${2}" @@ -32,6 +64,7 @@ function logging_paths_tasks_setup::dstat_get_logging() { local dstat_out=$(\ run_dstat \ --jobs "${job_id}" \ + --label unique_id="${LOGGING_PATHS_UNIQUE_ID}" \ --status "*" \ --full \ --format json) diff --git a/test/integration/test_setup_e2e.py b/test/integration/test_setup_e2e.py index d794f66..ab91925 100644 --- a/test/integration/test_setup_e2e.py +++ b/test/integration/test_setup_e2e.py @@ -82,7 +82,8 @@ def _environ(): print("Checking configured gcloud project") PROJECT_ID = subprocess.check_output( 'gcloud config list core/project --format="value(core.project)"', - shell=True).strip() + shell=True, + universal_newlines=True).strip() if not PROJECT_ID: print("Your project ID could not be determined.") diff --git a/test/run_tests.sh b/test/run_tests.sh index a9a0d10..cbbb192 100755 --- a/test/run_tests.sh +++ b/test/run_tests.sh @@ -209,15 +209,7 @@ function get_test_providers() { return fi - case "${test_file}" in - e2e_requester_pays_buckets.sh | \ - e2e_image.sh) - echo -n "local google-v2" - return - ;; - esac - - echo -n "local google google-v2" + echo -n "local google-v2" } readonly -f get_test_providers @@ -309,6 +301,11 @@ for TEST_TYPE in "${TESTS[@]}"; do # Run tests in test/unit if [[ "${TEST_TYPE}" == "pythonunit_*.py" ]]; then + if [[ ${NO_PY_MODULE_TESTS:-0} -eq 1 ]]; then + echo "Test test/unit/*: SKIPPED" + continue + fi + start_test "test/unit" # for unit tests, also include the Python unit tests @@ -332,6 +329,13 @@ for TEST_TYPE in "${TESTS[@]}"; do fi for TEST in "${TEST_LIST[@]}"; do + if [[ ${NO_PY_MODULE_TESTS:-0} -eq 1 ]]; then + if [[ ${TEST} == *.py ]]; then + echo "Test ${TEST}: SKIPPED" + continue + fi + fi + PROVIDER_LIST="$(get_test_providers "${TEST}")" # If the user has supplied a DSUB_PROVIDER, then override the PROVIDER_LIST, diff --git a/test/setup_virtualenv b/test/setup_virtualenv index 8c9e32d..e5911d6 100644 --- a/test/setup_virtualenv +++ b/test/setup_virtualenv @@ -25,7 +25,7 @@ else fi readonly PY_VERSION -if [[ "${PY_VERSION}" -eq "python2" ]]; then +if [[ "${PY_VERSION}" == "python2" ]]; then LIBS_FOLDER="dsub_libs" else LIBS_FOLDER="dsub_libs$VERSION_ARG" diff --git a/test/unit/job_model_test.py b/test/unit/job_model_test.py index 18f8573..ebd3fb0 100644 --- a/test/unit/job_model_test.py +++ b/test/unit/job_model_test.py @@ -179,19 +179,23 @@ def testScriptCreation(self): 'labels': set(), 'inputs': set(), 'outputs': set(), + 'input-recursives': set(), + 'output-recursives': set(), 'mounts': set(), }, task_descriptors=[ job_model.TaskDescriptor( task_metadata={'task-id': None}, task_resources=job_model.Resources( - logging_path= - 'gs://b/dsub/sh/local/env_list/env_list/logging/env_list.log'), + logging_path='gs://b/dsub/sh/local/env_list/env_list/logging/env_list.log' # pylint: disable=line-too-long + ), task_params={ 'envs': set(), 'labels': set(), 'inputs': set(), 'outputs': set(), + 'input-recursives': set(), + 'output-recursives': set(), }) ]) @@ -282,6 +286,8 @@ def testScriptCreation(self): 'gs://b/dsub/sh/local/io_tasks/output/*', 
                     recursive=False),
             },
+            'input-recursives': set(),
+            'output-recursives': set(),
             'mounts': set(),
         },
         task_descriptors=[
@@ -306,6 +312,8 @@ def testScriptCreation(self):
                             'gs://b/dsub/sh/local/io_tasks/output/3/*.md5',
                             recursive=False),
                     },
+                    'input-recursives': set(),
+                    'output-recursives': set(),
                 })
         ])
@@ -372,38 +380,44 @@ def testScriptCreation(self):
         'envs': {job_model.EnvParam('FILE_CONTENTS', 'Test file contents'),},
         'labels': set(),
         'inputs': {
-            job_model.InputFileParam(
-                'INPUT_PATH_DEEP',
-                'gs://b/dsub/sh/local/io_recursive/input/deep/',
-                recursive=True),
             job_model.InputFileParam(
                 'INPUT_PATH_SHALLOW',
                 'gs://b/dsub/sh/local/io_recursive/input/shallow/*',
                 recursive=False),
         },
-        'outputs': {
-            job_model.OutputFileParam(
-                'OUTPUT_PATH_DEEP',
-                'gs://b/dsub/sh/local/io_recursive/output/deep/',
+        'input-recursives': {
+            job_model.InputFileParam(
+                'INPUT_PATH_DEEP',
+                'gs://b/dsub/sh/local/io_recursive/input/deep/',
                 recursive=True),
+        },
+        'outputs': {
             job_model.OutputFileParam(
                 'OUTPUT_PATH_SHALLOW',
                 'gs://b/dsub/sh/local/io_recursive/output/shallow/*',
                 recursive=False),
         },
+        'output-recursives': {
+            job_model.OutputFileParam(
+                'OUTPUT_PATH_DEEP',
+                'gs://b/dsub/sh/local/io_recursive/output/deep/',
+                recursive=True),
+        },
         'mounts': set(),
     },
     task_descriptors=[
         job_model.TaskDescriptor(
             task_metadata={'task-id': None},
             task_resources=job_model.Resources(
-                logging_path=
-                'gs://b/dsub/sh/local/io_recursive/logging/io_recursive.log'),
+                logging_path='gs://b/dsub/sh/local/io_recursive/logging/io_recursive.log'
+            ),
             task_params={
                 'envs': set(),
                 'labels': set(),
                 'inputs': set(),
                 'outputs': set(),
+                'input-recursives': set(),
+                'output-recursives': set(),
             })
     ])
@@ -467,6 +481,8 @@ def testScriptCreation(self):
         'labels': {job_model.LabelParam('batch', 'hello-world')},
         'inputs': set(),
         'outputs': set(),
+        'input-recursives': set(),
+        'output-recursives': set(),
         'mounts': set(),
     },
     task_descriptors=[
@@ -480,6 +496,8 @@ def testScriptCreation(self):
             'labels': {job_model.LabelParam('item-number', '2')},
             'inputs': set(),
             'outputs': set(),
+            'input-recursives': set(),
+            'output-recursives': set(),
         })
     ])
@@ -549,10 +567,12 @@ def test_from_yaml_local(self, unused_name, yaml_string, expected_descriptor):
       src_params[param_type] = set()

     # local meta.yaml did not mark recursive inputs/outputs as recursive
-    for param_type in 'inputs', 'outputs':
-      for param in dst_params[param_type]:
-        dst_params[param_type].remove(param)
+    for param_type, recursive_param_type in zip(
+        ['inputs', 'outputs'], ['input-recursives', 'output-recursives']):
+      for param in dst_params[recursive_param_type]:
         dst_params[param_type].add(param._replace(recursive=False))
+    dst_params['input-recursives'] = set()
+    dst_params['output-recursives'] = set()

     self.assert_job_descriptors_equal(actual, expected)
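A note on the two runtime behaviors this patch introduces: `--unique-job-id` replaces the provider-generated job-id with `uuid.uuid4().hex`, which is 32 hexadecimal characters, and `JobDescriptor.from_yaml` now prefers `yaml.full_load` (added in PyYAML 5.1), falling back to `yaml.load` on older installations where `full_load` raises `AttributeError`. A minimal, self-contained sketch of both patterns; the function names here are illustrative only and are not part of dsub:

  import uuid

  import yaml  # PyYAML


  def unique_job_id():
    # uuid4().hex renders a random UUID as 32 lowercase hex characters
    # with no dashes, which is what the patch stores as the job-id.
    job_id = uuid.uuid4().hex
    assert len(job_id) == 32
    return job_id


  def load_yaml_compat(yaml_string):
    # PyYAML 5.1 added full_load(); on older PyYAML the attribute is
    # missing, so fall back to the legacy load(), as the patch does.
    try:
      return yaml.full_load(yaml_string)
    except AttributeError:
      return yaml.load(yaml_string)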