Skip to content
This repository has been archived by the owner on May 14, 2024. It is now read-only.

Feature/add driver core limit option #44 #140

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
8 changes: 7 additions & 1 deletion SystemTests/roles/robot/files/k8s_methods.robot
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ Submit SparkPi Job
${response}= Post Request With Json Body /piezo/submitjob ${submitbody}
[return] ${response}

Submit SparkPi Job With Optional Parameters
[Arguments] ${job_name}
${submitbody}= Create Dictionary name=${job_name} language=Python python_version=2 path_to_main_app_file=s3a://kubernetes/inputs/pi.py driver_cores=0.2 driver_core_limit=1.4 driver_memory=1024m executors=2 executor_cores=1 executor_core_limit=4.5 executor_memory=1024m label=systemTest
${response}= Post Request With Json Body /piezo/submitjob ${submitbody}
[return] ${response}

Submit SparkPi Python3 Job
[Arguments] ${job_name}
${submitbody}= Create Dictionary name=${job_name} language=Python python_version=3 path_to_main_app_file=s3a://kubernetes/inputs/pi.py label=systemTest
Expand Down Expand Up @@ -113,7 +119,7 @@ Tidy jobs


Wait For Spark Job To Finish
[Arguments] ${job_name} ${step_size}
[Arguments] ${job_name}
:For ${i} IN RANGE 0 ${JOB_FINISH_CHECK_STEPS}
\ Sleep ${JOB_FINISH_CHECK_INTERVAL}
\ ${response}= Get Status Of Spark Job ${job_name}
Expand Down
10 changes: 10 additions & 0 deletions SystemTests/roles/robot/files/piezo.robot
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ Can Run Python3 Jobs
${finished}= Wait For Spark Job To Finish ${job_name}
Should Be True ${finished}

Can Run Spark Job Specifying All Optional Arguments
${response}= Submit SparkPi Job With Optional Parameters spark-pi-params
Confirm Ok Response ${response}
${job_name}= Get Response Job Name ${response}
Should Match Regexp ${job_name} spark-params-[a-z0-9]{5}
${message}= Get Response Data Message ${response}
Should Be Equal As Strings ${message} Job driver created successfully
${finished}= Wait For Spark Job To Finish ${job_name}
Should Be True ${finished}

Submit Input Args Job With Arguments Returns Ok Response
${response}= Submit InputArgs Job With Arguments input-args-test
Confirm Ok Response ${response}
Expand Down
14 changes: 14 additions & 0 deletions piezo_web_app/PiezoWebApp/example_validation_rules.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@
"minimum": 0.1,
"maximum": 1
},
{
"input_name": "driver_core_limit",
"classification": "Optional",
"default": "1000m",
"minimum": 1,
"maximum": 3
},
{
"input_name": "driver_memory",
"classification": "Optional",
Expand All @@ -123,6 +130,13 @@
"minimum": 1,
"maximum": 4
},
{
"input_name": "executor_core_limit",
"classification": "Optional",
"default": "4000m",
"minimum": 4,
"maximum": 8
},
{
"input_name": "executor_memory",
"classification": "Optional",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ def validate(key, value, validation_rule):
return _validate_string_from_list(key, value, validation_rule)
if key in ["executors", "executor_cores"]:
return _validate_integer(key, value, validation_rule)
if key in ["driver_cores", "driver_core_limit"]:
if key in ["driver_cores"]:
return _validate_multiple_of_a_tenth(key, value, validation_rule)
if key in ["driver_memory", "executor_memory"]:
return _validate_byte_quantity(key, value, validation_rule)
if key in ["driver_core_limit", "executor_core_limit"]:
return _validate_core_limit(key, value, validation_rule)
if key in ["arguments"]:
return ValidationResult(True, None, value)
raise ValueError(f"Unexpected argument {key}")
Expand Down Expand Up @@ -143,3 +145,10 @@ def _validate_byte_quantity(key, value, validation_rule):
f'"{key}" input must be in range [{validation_rule.minimum}m, {validation_rule.maximum}m]',
None
)


def _validate_core_limit(key, value, validation_rule):
result_of_tenth = _validate_multiple_of_a_tenth(key, value, validation_rule)
if result_of_tenth.is_valid is True:
return ValidationResult(True, None, str(int(result_of_tenth.validated_value * 1000)) + "m")
robert-clegg-tessella marked this conversation as resolved.
Show resolved Hide resolved
return result_of_tenth
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ def __init__(self, configuration, validation_rules):
self._spec_restart_submission_failure_retry_interval = \
self._validation_rules.get_default_value_for_key("on_submission_failure_retry_interval")
self._spec_driver_cores = self._validation_rules.get_default_value_for_key("driver_cores")
self._spec_driver_core_limit = self._validation_rules.get_default_value_for_key("driver_core_limit")
self._spec_driver_memory = self._validation_rules.get_default_value_for_key("driver_memory")
self._spec_driver_label_version = self._validation_rules.get_default_value_for_key("spark_version")
self._spec_driver_service_account = self._validation_rules.get_default_value_for_key("service_account")
self._spec_executor_instances = self._validation_rules.get_default_value_for_key("executors")
self._spec_executor_cores = self._validation_rules.get_default_value_for_key("executor_cores")
self._spec_executor_core_limit = self._validation_rules.get_default_value_for_key("executor_core_limit")
self._spec_executor_memory = self._validation_rules.get_default_value_for_key("executor_memory")
self._spec_executor_label_version = self._validation_rules.get_default_value_for_key("spark_version")
self._monitoring_java_agent = self._validation_rules.get_default_value_for_key("java_agent")
Expand Down Expand Up @@ -81,6 +83,7 @@ def _default_spark_application_manifest(self):
],
"driver": {
"cores": self._spec_driver_cores,
"coreLimit": self._spec_driver_core_limit,
"memory": self._spec_driver_memory,
"labels": {
"version": self._spec_driver_label_version},
Expand All @@ -95,6 +98,7 @@ def _default_spark_application_manifest(self):
},
"executor": {
"cores": self._spec_executor_cores,
"coreLimit": self._spec_executor_core_limit,
"instances": self._spec_executor_instances,
"memory": self._spec_executor_memory,
"labels": {
Expand Down Expand Up @@ -125,9 +129,11 @@ def _variable_to_manifest_path(var):
"main_class": ["spec", "mainClass"],
"path_to_main_app_file": ["spec", "mainApplicationFile"],
"driver_cores": ["spec", "driver", "cores"],
"driver_core_limit": ["spec", "driver", "coreLimit"],
"driver_memory": ["spec", "driver", "memory"],
"executors": ["spec", "executor", "instances"],
"executor_cores": ["spec", "executor", "cores"],
"executor_core_limit": ["spec", "executor", "coreLimit"],
"executor_memory": ["spec", "executor", "memory"]
}
return var_to_path_dict[var]
3 changes: 3 additions & 0 deletions piezo_web_app/PiezoWebApp/tests/handlers/submit_job_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ def test_post_accepts_optional_inputs(self):
'language': 'test-language',
'path_to_main_app_file': '/path/to/main/app.file',
'driver_cores': '1',
'driver_core_limit': '1000m',
'driver_memory': '1024m',
'executors': '5',
'executor_core_limit': '4000m',
'arguments': ["arg1", "arg2", 10],
'lable': 'my_label'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@
"minimum": 0.1,
"maximum": 1
},
{
"input_name": "driver_core_limit",
"classification": "Optional",
"default": "1000m",
"minimum": 1,
"maximum": 3
},
{
"input_name": "driver_memory",
"classification": "Optional",
Expand All @@ -123,6 +130,13 @@
"minimum": 1,
"maximum": 4
},
{
"input_name": "executor_core_limit",
"classification": "Optional",
"default": "4000m",
"minimum": 4,
"maximum": 8
},
{
"input_name": "executor_memory",
"classification": "Optional",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def test_correct_python_job_is_submitted_correctly(self):
],
'driver': {
'cores': 0.1,
'coreLimit': '1000m',
'memory': '512m',
'labels': {'version': '2.4.0'},
'serviceAccount': 'spark',
Expand All @@ -103,6 +104,7 @@ def test_correct_python_job_is_submitted_correctly(self):
},
'executor': {
'cores': 1,
'coreLimit': '4000m',
'instances': 1,
'memory': '512m',
'labels': {'version': '2.4.0'},
Expand Down Expand Up @@ -199,6 +201,7 @@ def test_correct_scala_job_is_submitted_correctly(self):
],
'driver': {
'cores': 0.1,
'coreLimit': '1000m',
'memory': '512m',
'labels': {'version': '2.4.0'},
'serviceAccount': 'spark',
Expand All @@ -212,6 +215,7 @@ def test_correct_scala_job_is_submitted_correctly(self):
},
'executor': {
'cores': 1,
'coreLimit': '4000m',
'instances': 1,
'memory': '512m',
'labels': {'version': '2.4.0'},
Expand Down Expand Up @@ -279,9 +283,11 @@ def test_all_optional_inputs_defined_to_maximum_succeeds(self):
'path_to_main_app_file': '/path_to/file',
'python_version': '2',
'driver_cores': '1',
'driver_core_limit': '3',
'driver_memory': '2048m',
'executors': '10',
'executor_cores': '4',
'executor_core_limit': '8',
'executor_memory': '4096m',
'label': 'my-label'
}
Expand Down Expand Up @@ -336,6 +342,7 @@ def test_all_optional_inputs_defined_to_maximum_succeeds(self):
],
'driver': {
'cores': 1.0,
'coreLimit': '3000m',
'memory': '2048m',
'labels': {'version': '2.4.0'},
'serviceAccount': 'spark',
Expand All @@ -349,6 +356,7 @@ def test_all_optional_inputs_defined_to_maximum_succeeds(self):
},
'executor': {
'cores': 4,
'coreLimit': '8000m',
'instances': 10,
'memory': '4096m',
'labels': {'version': '2.4.0'},
Expand Down Expand Up @@ -396,9 +404,11 @@ def test_all_optional_inputs_defined_to_above_maximum_returns_400_with_explanati
'path_to_main_app_file': '/path_to/file',
'python_version': '2',
'driver_cores': '1.1',
'driver_core_limit': '3.1',
'driver_memory': '2049m',
'executors': '11',
'executor_cores': '5',
'executor_core_limit': '8.1',
'executor_memory': '4097m'
}
# Act
Expand All @@ -411,9 +421,11 @@ def test_all_optional_inputs_defined_to_above_maximum_returns_400_with_explanati
msg = json.loads(error.value.response.body, encoding='utf-8')['data']
assert msg == "The following errors were found:\n" \
'"driver_cores" input must be in range [0.1, 1]\n' \
'"driver_core_limit" input must be in range [1, 3]\n' \
'"driver_memory" input must be in range [512m, 2048m]\n' \
'"executors" input must be in range [1, 10]\n' \
'"executor_cores" input must be in range [1, 4]\n' \
'"executor_core_limit" input must be in range [4, 8]\n' \
'"executor_memory" input must be in range [512m, 4096m]\n'

@gen_test
Expand Down Expand Up @@ -448,9 +460,11 @@ def test_optional_inputs_in_wrong_format_returns_400_with_explanation(self):
'path_to_main_app_file': '/path_to/file',
'python_version': '2.3',
'driver_cores': '500m',
'driver_core_limit': '2000m',
'driver_memory': '1024',
'executors': 'Maximum',
'executor_cores': '3.5',
'executor_core_limit': '4040m',
'executor_memory': '2048'
}
# Act
Expand All @@ -464,10 +478,12 @@ def test_optional_inputs_in_wrong_format_returns_400_with_explanation(self):
assert msg == "The following errors were found:\n" \
'"python_version" input must be one of: "2", "3"\n' \
'"driver_cores" input must be a multiple of 0.1\n' \
'"driver_core_limit" input must be a multiple of 0.1\n' \
'"driver_memory" input must be a string integer value ending in "m" ' \
'(e.g. "512m" for 512 megabytes)\n' \
'"executors" input must be an integer\n' \
'"executor_cores" input must be an integer\n' \
'"executor_core_limit" input must be a multiple of 0.1\n' \
'"executor_memory" input must be a string integer value ending in "m" ' \
'(e.g. "512m" for 512 megabytes)\n'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,3 +499,48 @@ def test_validate_label_rejects_empty_strings_and_non_strings(label):
validation_result = argument_validator.validate("label", label, validation_rule)
# Assert
assert validation_result.is_valid is False


@pytest.mark.parametrize("driver_core_limit", ["1", "2.5", "3.2", "4"])
def test_validate_core_limit_accepts_numerical_values_within_valid_range(driver_core_limit):
# Arrange
validation_rule = ValidationRule({
'classification': 'Optional',
'default': 1,
'minimum': 1,
'maximum': 4
})
# Act
validation_result = argument_validator.validate("driver_core_limit", driver_core_limit, validation_rule)
# Assert
assert validation_result.is_valid is True


def test_validate_core_limit_converts_floats_to_millicpu_values_for_manifest():
# Arrange
validation_rule = ValidationRule({
'classification': 'Optional',
'default': 1,
'minimum': 1,
'maximum': 4
})
# Act
validation_result = argument_validator.validate("driver_core_limit", "1.2", validation_rule)
# Assert
assert validation_result.is_valid is True
assert validation_result.validated_value == "1200m"


@pytest.mark.parametrize("executor_core_limit", ["100", "0", " ", "", "1p", "5000m"])
def test_validate_core_limit_rejects_values_outside_valid_range_or_with_bad_format(executor_core_limit):
robert-clegg-tessella marked this conversation as resolved.
Show resolved Hide resolved
# Arrange
validation_rule = ValidationRule({
'classification': 'Optional',
'default': 1,
'minimum': 1,
'maximum': 4
})
# Act
validation_result = argument_validator.validate("executor_core_limit", executor_core_limit, validation_rule)
# Assert
assert validation_result.is_valid is False
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ def setup(self):
'main_class': None,
'arguments': None,
'driver_cores': 0.1,
'driver_core_limit': "1000m",
'driver_memory': "512m",
'executors': 1,
'executor_cores': 1,
'executor_core_limit': "4000m",
'executor_memory': "512m",
'label': None
}[input_name]
Expand All @@ -50,6 +52,7 @@ def setup(self):
"driver_cores": "0.1",
"driver_memory": "512m",
"executor_cores": "1",
"executor_core_limit": "7000m",
"executors": "1",
"executor_memory": "512m",
"arguments": ["1000", "100"],
Expand Down Expand Up @@ -95,6 +98,7 @@ def test_build_manifest_builds_python_job_manifest_for_python_applications(self)
],
"driver": {
"cores": "0.1",
"coreLimit": "1000m",
"memory": "512m",
"labels": {"version": "2.4.0"},
"serviceAccount": "spark",
Expand All @@ -111,6 +115,7 @@ def test_build_manifest_builds_python_job_manifest_for_python_applications(self)
},
"executor": {
"cores": "1",
"coreLimit": "7000m",
"instances": "1",
"memory": "512m",
"labels": {"version": "2.4.0"},
Expand Down Expand Up @@ -176,6 +181,7 @@ def test_build_manifest_builds_scala_job_manifest_for_scala_applications(self):
],
"driver": {
"cores": "0.1",
"coreLimit": "1000m",
"memory": "512m",
"labels": {"version": "2.4.0"},
"serviceAccount": "spark",
Expand All @@ -192,6 +198,7 @@ def test_build_manifest_builds_scala_job_manifest_for_scala_applications(self):
},
"executor": {
"cores": "1",
"coreLimit": "7000m",
"instances": "1",
"memory": "512m",
"labels": {"version": "2.4.0"},
Expand Down Expand Up @@ -250,6 +257,7 @@ def test_default_manifest_returns_a_filled_in_spark_application_template_with_de
],
"driver": {
"cores": 0.1,
"coreLimit": "1000m",
"memory": "512m",
"labels": {"version": "2.4.0"},
"serviceAccount": "spark",
Expand All @@ -266,6 +274,7 @@ def test_default_manifest_returns_a_filled_in_spark_application_template_with_de
},
"executor": {
"cores": 1,
"coreLimit": "4000m",
"instances": 1,
"memory": "512m",
"labels": {"version": "2.4.0"},
Expand Down