diff --git a/gcc_exec/gcc_node.py b/gcc_exec/gcc_node.py
index 5cddcb4..c81e8f2 100644
--- a/gcc_exec/gcc_node.py
+++ b/gcc_exec/gcc_node.py
@@ -200,7 +200,7 @@ def set_config_commands(self) -> None:
"exit",
]
- def configure(self) -> None:
+ def configure_virtual_machine(self) -> None:
"""Execute configuration commands on a virtual machine."""
keyfile = StringIO(self.__node_virtual_machine["pem"])
mykey = paramiko.RSAKey.from_private_key(keyfile)
diff --git a/gcc_exec/gcc_workflow.py b/gcc_exec/gcc_workflow.py
index 09a2805..c38025b 100644
--- a/gcc_exec/gcc_workflow.py
+++ b/gcc_exec/gcc_workflow.py
@@ -11,10 +11,11 @@
import xmltodict
from botocore.exceptions import ClientError
-from gcc_drbx import GccDrbx
-from gcc_ec2 import GccEc2
-from gcc_node import GccNode
-from gcc_user import GccUser
+
+from gcc_exec.gcc_drbx import GccDrbx
+from gcc_exec.gcc_ec2 import GccEc2
+from gcc_exec.gcc_node import GccNode
+from gcc_exec.gcc_user import GccUser
class GccWorkflow:
@@ -46,9 +47,7 @@ def __init__(self, gcc_user_obj: GccUser, workflow_name: str) -> None:
"machines_initialized": None,
}
- def plan(
- self, available_machines: list, xml_specification: str = None
- ) -> list:
+ def plan(self, available_machines: list, xml_specification: str = None) -> list:
"""This method creates an execution plan based on a workflow specification."""
if xml_specification is None:
xml_specification = xmltodict.parse(
@@ -56,6 +55,8 @@ def plan(
f"/{self.__workflow_dict['name']}/spec.xml"
)
)
+ elif isinstance(xml_specification, str):
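+ # A raw XML string was passed in directly; parse it into a dict as well.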
+ xml_specification = xmltodict.parse(xml_specification)
self.__workflow_dict["type"] = int(xml_specification["workflow"]["@type"])
@@ -71,14 +72,15 @@ def plan(
node_virtual_machine = None
if gcc_node_vm is not None:
- vm_ip = gcc_node_vm["#text"]
-
- try:
+ if isinstance(gcc_node_vm, str):
+ vm_pem = None
+ vm_ip = gcc_node_vm
+ elif isinstance(gcc_node_vm, OrderedDict):
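+ # The vm element is a dict here: "@pem" names the key file stored in Dropbox and "#text" holds the IP.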
vm_pem = self.__gcc_drbx_obj.get_file_contents(
f"/{self.__workflow_dict['name']}/pem/{gcc_node_vm['@pem']}"
).strip("\n")
- except KeyError:
- vm_pem = None
+
+ vm_ip = gcc_node_vm["#text"]
node_virtual_machine = {"ip": vm_ip, "pem": vm_pem, "instance_id": None}
@@ -352,7 +354,9 @@ def configure(self) -> None:
max_workers=len(self.__workflow_dict["nodes"])
) as executor:
for node in self.__workflow_dict["nodes"]:
- executor.submit(self.__workflow_dict["nodes"][node].configure)
+ executor.submit(
+ self.__workflow_dict["nodes"][node].configure_virtual_machine
+ )
executor.shutdown()
def initialize(self) -> None:
@@ -447,6 +451,14 @@ def get_tmp_dir(self) -> str:
"""Return tmp directory string."""
return self.__tmp_dir
+ def set_gcc_security_group(self, gcc_security_group: dict) -> None:
+ """Set gcc_security_group private variable."""
+ self.__gcc_security_group = gcc_security_group
+
+ def set_gcc_key_pair(self, gcc_key_pair: dict) -> None:
+ """Set gcc_key_pair private variable."""
+ self.__gcc_key_pair = gcc_key_pair
+
def generate_random_string() -> str:
"""Generate a random 7 character string."""
diff --git a/gcc_exec/main.py b/gcc_exec/main.py
index 4866f34..76f8514 100644
--- a/gcc_exec/main.py
+++ b/gcc_exec/main.py
@@ -11,7 +11,7 @@ def main(
aws_access_key_id: str,
aws_secret_access_key: str,
workflow_name: str,
-):
+) -> None:
"""This method contains code to execute a wofkflow."""
gcc_user_obj = GccUser(
oauth2_refresh_token, aws_access_key_id, aws_secret_access_key
diff --git a/gcc_exec/tests/data/spec/spec_1.xml b/gcc_exec/tests/data/spec/spec_1.xml
new file mode 100644
index 0000000..62a7de0
--- /dev/null
+++ b/gcc_exec/tests/data/spec/spec_1.xml
@@ -0,0 +1,14 @@
+
+
+
+
+ x
+
+
+ x
+
+
+ x
+ x
+
+
\ No newline at end of file
diff --git a/gcc_exec/tests/data/spec/spec_2.xml b/gcc_exec/tests/data/spec/spec_2.xml
new file mode 100644
index 0000000..a50e09a
--- /dev/null
+++ b/gcc_exec/tests/data/spec/spec_2.xml
@@ -0,0 +1,14 @@
+
+
+
+
+ x
+
+
+ x
+
+
+ x
+ x
+
+
\ No newline at end of file
diff --git a/gcc_exec/tests/data/spec/spec_3.xml b/gcc_exec/tests/data/spec/spec_3.xml
new file mode 100644
index 0000000..f913147
--- /dev/null
+++ b/gcc_exec/tests/data/spec/spec_3.xml
@@ -0,0 +1,19 @@
+
+
+
+ 0.0.0.0
+
+
+ 0.0.0.0
+ x
+
+
+ 0.0.0.0
+ x
+
+
+ 0.0.0.0
+ x
+ x
+
+
\ No newline at end of file
diff --git a/gcc_exec/tests/data/spec/spec_4.xml b/gcc_exec/tests/data/spec/spec_4.xml
new file mode 100644
index 0000000..601bce4
--- /dev/null
+++ b/gcc_exec/tests/data/spec/spec_4.xml
@@ -0,0 +1,19 @@
+
+
+
+ 0.0.0.0
+
+
+ 0.0.0.0
+ x
+
+
+ 0.0.0.0
+ x
+
+
+ 0.0.0.0
+ x
+ x
+
+
\ No newline at end of file
diff --git a/gcc_exec/tests/test_gcc_workflow.py b/gcc_exec/tests/test_gcc_workflow.py
new file mode 100644
index 0000000..3a2911e
--- /dev/null
+++ b/gcc_exec/tests/test_gcc_workflow.py
@@ -0,0 +1,359 @@
+"""This file contains the TestGccWorkflow class."""
+# pylint: disable=W1514,R0913
+import os
+from os.path import dirname, join
+from unittest import mock
+
+import pytest
+from dotenv import load_dotenv
+
+from gcc_exec.gcc_user import GccUser
+from gcc_exec.gcc_workflow import GccWorkflow
+
+
+class TestGccWorkflow:
+ """This class contains methods to test the GccWorkflow class."""
+
+ env_path = join(dirname(__file__), ".env")
+
+ if os.path.isfile(env_path):
+ load_dotenv(env_path)
+
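+ # Credentials for the shared GccUser object come from the environment (optionally loaded from tests/.env above).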
+ __gcc_user_obj = GccUser(
+ oauth2_refresh_token=os.environ.get("OAUTH2_REFRESH_TOKEN"),
+ aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
+ aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
+ )
+
+ @pytest.mark.parametrize(
+ "xml_specification_filename,\
+ workflow_name,workflow_type,\
+ workflow_plan_human_readable,\
+ node_virtual_machines",
+ [
+ (
+ "spec_1.xml",
+ "workflow_1",
+ 0,
+ {0: ["n1"], 1: ["n2", "n3"], 2: ["n4"]},
+ [None, None, None, None],
+ ),
+ (
+ "spec_2.xml",
+ "workflow_2",
+ 1,
+ {2: ["n4"], 1: ["n2", "n3"], 0: ["n1"]},
+ [None, None, None, None],
+ ),
+ (
+ "spec_3.xml",
+ "workflow_3",
+ 0,
+ {0: ["n1"], 1: ["n2", "n3"], 2: ["n4"]},
+ [dict, dict, dict, dict],
+ ),
+ (
+ "spec_4.xml",
+ "workflow_4",
+ 1,
+ {2: ["n4"], 1: ["n2", "n3"], 0: ["n1"]},
+ [dict, dict, dict, dict],
+ ),
+ ],
+ )
+ def test_plan(
+ self,
+ xml_specification_filename: str,
+ workflow_name: str,
+ workflow_type: int,
+ workflow_plan_human_readable: dict,
+ node_virtual_machines: list,
+ ):
+ """This method ensures workflow plans are generated properly."""
+ gcc_workflow_obj = GccWorkflow(
+ gcc_user_obj=self.__gcc_user_obj, workflow_name=workflow_name
+ )
+
+ with open(
+ join(dirname(__file__), f"data/spec/{xml_specification_filename}")
+ ) as xml_specification_file:
+ xml_specification = xml_specification_file.read()
+
+ gcc_workflow_obj.plan(
+ available_machines=[], xml_specification=xml_specification
+ )
+
+ workflow_dict = gcc_workflow_obj.get_workflow_dict()
+
+ print(workflow_dict)
+
+ assert workflow_dict["name"] == workflow_name
+ assert workflow_dict["type"] == workflow_type
+ assert workflow_dict["plan_human_readable"] == workflow_plan_human_readable
+ for node in workflow_dict["nodes"]:
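+ # Each case lists identical expected values, so popping from either end is equivalent.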
+ node_virtual_machine = node_virtual_machines.pop()
+
+ if node_virtual_machine is None:
+ assert (
+ workflow_dict["nodes"][node].get_node_virtual_machine()
+ == node_virtual_machine
+ )
+ else:
+ assert isinstance(
+ workflow_dict["nodes"][node].get_node_virtual_machine(),
+ node_virtual_machine,
+ )
+
+ @mock.patch(
+ "gcc_exec.gcc_node.GccNode.initialize",
+ return_value=None,
+ )
+ @mock.patch(
+ "gcc_exec.gcc_ec2.GccEc2.create_key_pair",
+ return_value=None,
+ )
+ @mock.patch(
+ "gcc_exec.gcc_ec2.GccEc2.create_security_group",
+ return_value=None,
+ )
+ @mock.patch(
+ "gcc_exec.gcc_drbx.GccDrbx.create_folder",
+ return_value=None,
+ )
+ @mock.patch("os.makedirs", return_value=None)
+ @pytest.mark.parametrize(
+ "xml_specification_filename,workflow_name,security_group,key_pair",
+ [
+ (
+ "spec_1.xml",
+ "workflow_1",
+ None,
+ None,
+ ),
+ (
+ "spec_2.xml",
+ "workflow_2",
+ {},
+ {},
+ ),
+ ],
+ )
+ def test_initialize(
+ self,
+ mock_makedirs: mock.MagicMock,
+ mock_create_folder: mock.MagicMock,
+ mock_create_security_group: mock.MagicMock,
+ mock_create_key_pair: mock.MagicMock,
+ mock_initialize: mock.MagicMock,
+ xml_specification_filename: str,
+ workflow_name: str,
+ security_group: dict,
+ key_pair: dict,
+ ):
+ """This method ensures all nodes have a virtual machine initialized."""
+ gcc_workflow_obj = GccWorkflow(
+ gcc_user_obj=self.__gcc_user_obj, workflow_name=workflow_name
+ )
+
+ with open(
+ join(dirname(__file__), f"data/spec/{xml_specification_filename}")
+ ) as xml_specification_file:
+ xml_specification = xml_specification_file.read()
+
+ gcc_workflow_obj.plan(
+ available_machines=[], xml_specification=xml_specification
+ )
+
+ gcc_workflow_obj.set_gcc_security_group(security_group)
+ gcc_workflow_obj.set_gcc_key_pair(key_pair)
+
+ gcc_workflow_obj.initialize()
+
+ if security_group is None and key_pair is None:
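+ # With no pre-set security group or key pair, initialize() is expected to create both.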
+ assert mock_create_key_pair.called
+ assert mock_create_security_group.called
+
+ assert mock_initialize.called
+ assert mock_create_folder.called
+ assert mock_makedirs.called
+
+ @mock.patch(
+ "gcc_exec.gcc_node.GccNode.set_config_commands",
+ return_value=None,
+ )
+ @mock.patch(
+ "gcc_exec.gcc_node.GccNode.configure_virtual_machine", return_value=None
+ )
+ @pytest.mark.parametrize(
+ "xml_specification_filename,workflow_name",
+ [
+ (
+ "spec_3.xml",
+ "workflow_3",
+ ),
+ (
+ "spec_4.xml",
+ "workflow_4",
+ ),
+ ],
+ )
+ def test_configure(
+ self,
+ mock_configure_virtual_machine: mock.MagicMock,
+ mock_set_config_commands: mock.MagicMock,
+ xml_specification_filename: str,
+ workflow_name: str,
+ ):
+ """This method ensures all virtual machines are configured."""
+ gcc_workflow_obj = GccWorkflow(
+ gcc_user_obj=self.__gcc_user_obj, workflow_name=workflow_name
+ )
+
+ with open(
+ join(dirname(__file__), f"data/spec/{xml_specification_filename}")
+ ) as xml_specification_file:
+ xml_specification = xml_specification_file.read()
+
+ gcc_workflow_obj.plan(
+ available_machines=[], xml_specification=xml_specification
+ )
+
+ gcc_workflow_obj.configure()
+
+ assert mock_set_config_commands.called
+ assert mock_configure_virtual_machine.called
+
+ @mock.patch(
+ "gcc_exec.gcc_node.GccNode.execute",
+ return_value=None,
+ )
+ @mock.patch("time.sleep", return_value=None)
+ @pytest.mark.parametrize(
+ "xml_specification_filename,workflow_name",
+ [
+ (
+ "spec_1.xml",
+ "workflow_1",
+ ),
+ (
+ "spec_2.xml",
+ "workflow_2",
+ ),
+ (
+ "spec_3.xml",
+ "workflow_3",
+ ),
+ (
+ "spec_4.xml",
+ "workflow_4",
+ ),
+ ],
+ )
+ def test_execute(
+ self,
+ mock_sleep: mock.MagicMock,
+ mock_execute: mock.MagicMock,
+ xml_specification_filename: str,
+ workflow_name: str,
+ ):
+ """This method ensures all nodes are executed."""
+ gcc_workflow_obj = GccWorkflow(
+ gcc_user_obj=self.__gcc_user_obj, workflow_name=workflow_name
+ )
+
+ with open(
+ join(dirname(__file__), f"data/spec/{xml_specification_filename}")
+ ) as xml_specification_file:
+ xml_specification = xml_specification_file.read()
+
+ gcc_workflow_obj.plan(
+ available_machines=[], xml_specification=xml_specification
+ )
+
+ workflow_dict = gcc_workflow_obj.get_workflow_dict()
+
+ gcc_workflow_obj.execute()
+
+ if workflow_dict["type"] == 1:
+ assert mock_sleep.called
+
+ assert mock_execute.called
+
+ @mock.patch(
+ "gcc_exec.gcc_node.GccNode.terminate",
+ return_value=None,
+ )
+ @mock.patch(
+ "gcc_exec.gcc_ec2.GccEc2.delete_key_pair",
+ return_value=None,
+ )
+ @mock.patch(
+ "gcc_exec.gcc_ec2.GccEc2.delete_security_group",
+ return_value=None,
+ )
+ @mock.patch("shutil.rmtree", return_value=None)
+ @pytest.mark.parametrize(
+ "xml_specification_filename,workflow_name,security_group,key_pair",
+ [
+ (
+ "spec_1.xml",
+ "workflow_1",
+ None,
+ None,
+ ),
+ (
+ "spec_2.xml",
+ "workflow_2",
+ {"GroupId": "group_2"},
+ {"KeyName": "key_2"},
+ ),
+ (
+ "spec_3.xml",
+ "workflow_3",
+ None,
+ None,
+ ),
+ (
+ "spec_4.xml",
+ "workflow_4",
+ {"GroupId": "group_4"},
+ {"KeyName": "key_4"},
+ ),
+ ],
+ )
+ def test_complete(
+ self,
+ mock_rmtree: mock.MagicMock,
+ mock_delete_security_group: mock.MagicMock,
+ mock_delete_key_pair: mock.MagicMock,
+ mock_terminate: mock.MagicMock,
+ xml_specification_filename: str,
+ workflow_name: str,
+ security_group: dict,
+ key_pair: dict,
+ ):
+ """This method ensures that a workflow completes correctly."""
+ gcc_workflow_obj = GccWorkflow(
+ gcc_user_obj=self.__gcc_user_obj, workflow_name=workflow_name
+ )
+
+ with open(
+ join(dirname(__file__), f"data/spec/{xml_specification_filename}")
+ ) as xml_specification_file:
+ xml_specification = xml_specification_file.read()
+
+ gcc_workflow_obj.plan(
+ available_machines=[], xml_specification=xml_specification
+ )
+
+ gcc_workflow_obj.set_gcc_security_group(security_group)
+ gcc_workflow_obj.set_gcc_key_pair(key_pair)
+
+ gcc_workflow_obj.complete()
+
+ if security_group is not None and key_pair is not None:
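+ # AWS key-pair and security-group cleanup is only expected when both were set.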
+ assert mock_delete_key_pair.called
+ assert mock_delete_security_group.called
+
+ assert mock_terminate.called
+ assert mock_rmtree.called
diff --git a/poetry.lock b/poetry.lock
index 9df4146..7cd5fe0 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -729,6 +729,14 @@ category = "main"
optional = false
python-versions = ">=2"
+[[package]]
+name = "unittest"
+version = "0.0"
+description = ""
+category = "main"
+optional = false
+python-versions = "*"
+
[[package]]
name = "urllib3"
version = "1.26.9"
@@ -761,7 +769,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
[metadata]
lock-version = "1.1"
python-versions = "^3.10"
-content-hash = "58c9033e7250a338634c7a6fbad7b95b2573441a044601b06b4141da5f43da92"
+content-hash = "b5828162bd35c98dd737a8948d9858934a310ed01802f0714e6500861b725957"
[metadata.files]
asgiref = [
@@ -1232,6 +1240,9 @@ tzdata = [
{file = "tzdata-2022.1-py2.py3-none-any.whl", hash = "sha256:238e70234214138ed7b4e8a0fab0e5e13872edab3be586ab8198c407620e2ab9"},
{file = "tzdata-2022.1.tar.gz", hash = "sha256:8b536a8ec63dc0751342b3984193a3118f8fca2afe25752bb9b7fffd398552d3"},
]
+unittest = [
+ {file = "unittest2-0.0.0.tar.gz", hash = "sha256:1b492484bea7f68abda5fcc61412e77b6a105e5842355d630f8b75f01add3555"},
+]
urllib3 = [
{file = "urllib3-1.26.9-py2.py3-none-any.whl", hash = "sha256:44ece4d53fb1706f667c9bd1c648f5469a2ec925fcf3a776667042d645472c14"},
{file = "urllib3-1.26.9.tar.gz", hash = "sha256:aabaf16477806a5e1dd19aa41f8c2b7950dd3c746362d7e3223dbe6de6ac448e"},
diff --git a/pyproject.toml b/pyproject.toml
index 03119d7..dab9401 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -19,6 +19,7 @@ psutil = "^5.9.0"
paramiko = "^2.10.3"
requests = "^2.27.1"
rpyc = "^5.1.0"
+unittest = "^0.0"
[tool.poetry.dev-dependencies]
black = "^22.3.0"
diff --git a/socket_service/SocketService.py b/socket_service/SocketService.py
index 32ab7d2..71c3ece 100644
--- a/socket_service/SocketService.py
+++ b/socket_service/SocketService.py
@@ -40,7 +40,6 @@ def send_files(filedictlist, host, port):
"""Handles the sending of files to one recipient."""
import os
import socket
-
from pathlib import Path
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -65,7 +64,6 @@ def send_files_to_many(argsdictlist):
import os
import socket
import threading
-
from pathlib import Path
def _send_files(filedictlist, host, port):
@@ -104,7 +102,6 @@ def receive_files(outdir, host, port):
"""Handles the receiving of files from one sender."""
import os
import socket
-
from pathlib import Path
def get_bytes(conn, num, buffer):
@@ -164,10 +161,9 @@ def get_data(conn, buffer):
def receive_files_from_many(argsdictlist):
"""Handles the receiving of files from many recipients."""
- import socket
import os
+ import socket
import threading
-
from pathlib import Path
def _receive_files(outdir, host, port):
@@ -240,10 +236,11 @@ def get_data(conn, buffer):
def upload_to_dropbox(argsdict):
"""Handles the upload of files to Dropbox."""
- import dropbox
import os
import threading
+ import dropbox
+
def _upload_file(local_file_path, drbx_file_path, drbx):
chunk_size = 4 * 1024 * 1024
file_size = os.path.getsize(local_file_path)