From e97303a2ff532de23155398b6bff3586640f335e Mon Sep 17 00:00:00 2001 From: Morten Dahl Date: Thu, 2 Jun 2022 15:59:45 +0200 Subject: [PATCH] Add `GrpcRuntime` to PyMoose (#1083) --- modules/Cargo.toml | 1 - modules/src/choreography/filesystem.rs | 1 + modules/src/choreography/grpc.rs | 1 + modules/src/networking/tcpstream.rs | 1 + modules/src/storage/csv.rs | 1 + modules/src/storage/local_file_storage.rs | 1 + modules/src/storage/numpy.rs | 1 + moose/src/lib.rs | 1 + pymoose/Cargo.toml | 1 + pymoose/examples/grpc/README.md | 11 +++ pymoose/examples/grpc/grpc.py | 45 ++++++++++ .../examples/linear-regression/linreg_test.py | 3 +- .../logistic_regression_test.py | 3 +- pymoose/examples/replicated/aes_test.py | 3 +- pymoose/examples/replicated/division_test.py | 3 +- pymoose/examples/replicated/identity_test.py | 3 +- pymoose/pymoose/__init__.py | 4 + .../pymoose/predictors/tree_ensemble_test.py | 3 +- pymoose/pymoose/testing.py | 21 +++++ pymoose/rust_integration_tests/add_n_test.py | 3 +- pymoose/rust_integration_tests/argmax_test.py | 3 +- .../boolean_ops_test.py | 3 +- pymoose/rust_integration_tests/concat_test.py | 3 +- .../dtype_conversions_test.py | 3 +- pymoose/rust_integration_tests/exp_test.py | 5 +- pymoose/rust_integration_tests/log_test.py | 3 +- .../rust_integration_tests/maximum_test.py | 5 +- .../mirrored_ops_test.py | 3 +- pymoose/rust_integration_tests/ones_test.py | 3 +- .../rust_integration_tests/reduce_max_test.py | 3 +- pymoose/rust_integration_tests/relu_test.py | 3 +- pymoose/rust_integration_tests/rerurn_test.py | 5 +- .../rust_integration_tests/reshape_test.py | 3 +- pymoose/rust_integration_tests/shape_test.py | 3 +- .../rust_integration_tests/sigmoid_test.py | 5 +- .../rust_integration_tests/slicing_test.py | 5 +- .../rust_integration_tests/softmax_test.py | 5 +- pymoose/rust_integration_tests/sqrt_test.py | 3 +- .../rust_integration_tests/squeeze_test.py | 5 +- .../rust_integration_tests/transpose_test.py | 5 +- pymoose/rust_integration_tests/uint64_test.py | 3 +- pymoose/rust_integration_tests/zeros_test.py | 3 +- pymoose/src/bindings.rs | 87 +++++++++++++++++-- reindeer/Cargo.toml | 1 - reindeer/src/comet.rs | 1 + reindeer/src/cometctl.rs | 1 + reindeer/src/dasher.rs | 1 + reindeer/src/rudolph.rs | 1 + reindeer/src/vixen.rs | 1 + 49 files changed, 211 insertions(+), 75 deletions(-) create mode 100644 pymoose/examples/grpc/README.md create mode 100644 pymoose/examples/grpc/grpc.py diff --git a/modules/Cargo.toml b/modules/Cargo.toml index 8ac3f6b9d..6609df8c2 100644 --- a/modules/Cargo.toml +++ b/modules/Cargo.toml @@ -21,7 +21,6 @@ notify = "4.0" prost = "0.9" serde = { version = "~1.0", features = ["derive"] } serde_json = "1.0" -tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } toml = "0.5" tonic = { version = "~0.6", features = ["tls"] } tracing = { version="0.1", features = ["log"] } diff --git a/modules/src/choreography/filesystem.rs b/modules/src/choreography/filesystem.rs index f832da941..76bbf1091 100644 --- a/modules/src/choreography/filesystem.rs +++ b/modules/src/choreography/filesystem.rs @@ -2,6 +2,7 @@ use super::parse_session_config_file_with_computation; use crate::choreography::{NetworkingStrategy, StorageStrategy}; use crate::execution::ExecutionContext; use moose::prelude::*; +use moose::tokio; use notify::{DebouncedEvent, Watcher}; use std::path::Path; diff --git a/modules/src/choreography/grpc.rs b/modules/src/choreography/grpc.rs index a7cf28330..5c3f247a5 100644 --- a/modules/src/choreography/grpc.rs +++ b/modules/src/choreography/grpc.rs @@ -15,6 +15,7 @@ use dashmap::mapref::entry::Entry; use dashmap::DashMap; use moose::computation::{SessionId, Value}; use moose::execution::Identity; +use moose::tokio; use std::collections::HashMap; use std::sync::Arc; diff --git a/modules/src/networking/tcpstream.rs b/modules/src/networking/tcpstream.rs index a82f6df96..717634f4e 100644 --- a/modules/src/networking/tcpstream.rs +++ b/modules/src/networking/tcpstream.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use moose::tokio; use moose::{ computation::{RendezvousKey, SessionId, Value}, error::Error, diff --git a/modules/src/storage/csv.rs b/modules/src/storage/csv.rs index 1980831de..2523d27e5 100644 --- a/modules/src/storage/csv.rs +++ b/modules/src/storage/csv.rs @@ -195,6 +195,7 @@ where #[cfg(test)] mod tests { use super::*; + use moose::tokio; use ndarray::array; use std::io::Write; use tempfile::NamedTempFile; diff --git a/modules/src/storage/local_file_storage.rs b/modules/src/storage/local_file_storage.rs index f0626887a..ff87a3ed8 100644 --- a/modules/src/storage/local_file_storage.rs +++ b/modules/src/storage/local_file_storage.rs @@ -81,6 +81,7 @@ pub fn parse_columns(query: &str) -> Result> { #[cfg(test)] mod tests { use super::*; + use moose::tokio; use moose::types::HostFloat64Tensor; use ndarray::array; use std::convert::TryFrom; diff --git a/modules/src/storage/numpy.rs b/modules/src/storage/numpy.rs index 985f6fe11..3ca8f513f 100644 --- a/modules/src/storage/numpy.rs +++ b/modules/src/storage/numpy.rs @@ -301,6 +301,7 @@ fn extract_dtype(npy_filename: &str) -> Result { #[cfg(test)] mod tests { use super::*; + use moose::tokio; use ndarray::array; use std::io::Write; use tempfile::NamedTempFile; diff --git a/moose/src/lib.rs b/moose/src/lib.rs index d51d37b59..07ad49d97 100644 --- a/moose/src/lib.rs +++ b/moose/src/lib.rs @@ -2795,3 +2795,4 @@ pub mod types; #[doc(inline)] pub use error::{Error, Result}; +pub use tokio; diff --git a/pymoose/Cargo.toml b/pymoose/Cargo.toml index dc1c7fa7f..bf56d1f37 100644 --- a/pymoose/Cargo.toml +++ b/pymoose/Cargo.toml @@ -20,6 +20,7 @@ approx = "~0.5" numpy = "~0.16" ndarray = "~0.15" moose = { path = "../moose" } +moose-modules = { path = "../modules" } pyo3 = "~0.16" rmp-serde = "~1.0" serde = { version = "~1.0", features = ["derive"] } diff --git a/pymoose/examples/grpc/README.md b/pymoose/examples/grpc/README.md new file mode 100644 index 000000000..cacdd7994 --- /dev/null +++ b/pymoose/examples/grpc/README.md @@ -0,0 +1,11 @@ +## Running + +Make sure to have three gRPC workers at the right endpoints before running this example. + +This can be done using Comet as follows (run each command in its own terminal): + +```sh +$ cargo run --bin comet -- --identity localhost:50000 --port 50000 +$ cargo run --bin comet -- --identity localhost:50001 --port 50001 +$ cargo run --bin comet -- --identity localhost:50002 --port 50002 +``` diff --git a/pymoose/examples/grpc/grpc.py b/pymoose/examples/grpc/grpc.py new file mode 100644 index 000000000..ff4898d43 --- /dev/null +++ b/pymoose/examples/grpc/grpc.py @@ -0,0 +1,45 @@ +import argparse +import logging + +import numpy as np + +import pymoose as pm +from pymoose.logger import get_logger + + +@pm.computation +def my_computation(): + alice = pm.host_placement("alice") + bob = pm.host_placement("bob") + carole = pm.host_placement("carole") + + with alice: + x = pm.constant(np.array([1.0, 2.0], dtype=np.float64)) + + with bob: + y = pm.constant(np.array([3.0, 4.0], dtype=np.float64)) + + with carole: + z = pm.add(x, y) + + return z + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Example") + parser.add_argument("--verbose", action="store_true") + args = parser.parse_args() + + if args.verbose: + get_logger().setLevel(level=logging.DEBUG) + + runtime = pm.GrpcMooseRuntime( + { + "alice": "localhost:50000", + "bob": "localhost:50001", + "carole": "localhost:50002", + } + ) + + results = runtime.evaluate_computation(my_computation) + print(results) diff --git a/pymoose/examples/linear-regression/linreg_test.py b/pymoose/examples/linear-regression/linreg_test.py index 90ee22f0b..1db0fb277 100644 --- a/pymoose/examples/linear-regression/linreg_test.py +++ b/pymoose/examples/linear-regression/linreg_test.py @@ -9,7 +9,6 @@ import pymoose as pm from pymoose.computation import utils from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime FIXED = pm.fixed(8, 27) # Rust compiler currently supports only limited set of alternative precisions: @@ -132,7 +131,7 @@ def _linear_regression_eval(self, metric_name): "y-owner": {"y_data": y_data}, "model-owner": {}, } - runtime = LocalMooseRuntime(storage_mapping=executors_storage) + runtime = pm.LocalMooseRuntime(storage_mapping=executors_storage) traced = pm.trace(linear_comp) _ = runtime.evaluate_computation( computation=traced, diff --git a/pymoose/examples/logistic-regression/logistic_regression_test.py b/pymoose/examples/logistic-regression/logistic_regression_test.py index fedf2d3f0..e92052a1c 100644 --- a/pymoose/examples/logistic-regression/logistic_regression_test.py +++ b/pymoose/examples/logistic-regression/logistic_regression_test.py @@ -8,7 +8,6 @@ import pymoose as pm from pymoose.computation import utils from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class ReplicatedExample(parameterized.TestCase): @@ -61,7 +60,7 @@ def test_logistic_regression_example_execute(self): "bob": {}, "carole": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_model_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/examples/replicated/aes_test.py b/pymoose/examples/replicated/aes_test.py index 31ef34adf..93713a498 100644 --- a/pymoose/examples/replicated/aes_test.py +++ b/pymoose/examples/replicated/aes_test.py @@ -9,7 +9,6 @@ import pymoose as pm from pymoose.computation import utils from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class ReplicatedExample(parameterized.TestCase): @@ -62,7 +61,7 @@ def test_aes_example_execute(self, host_decrypt): "bob": {}, "carole": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_aes_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/examples/replicated/division_test.py b/pymoose/examples/replicated/division_test.py index 92be8cc89..7153a1e13 100644 --- a/pymoose/examples/replicated/division_test.py +++ b/pymoose/examples/replicated/division_test.py @@ -6,7 +6,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime FIXED = pm.fixed(14, 23) @@ -46,7 +45,7 @@ def my_comp(): "carole": {}, "dave": {}, } - runtime = LocalMooseRuntime(storage_mapping=executors_storage) + runtime = pm.LocalMooseRuntime(storage_mapping=executors_storage) logical_comp = pm.trace(my_comp) runtime.evaluate_computation( diff --git a/pymoose/examples/replicated/identity_test.py b/pymoose/examples/replicated/identity_test.py index 6b7087945..a10bd01be 100644 --- a/pymoose/examples/replicated/identity_test.py +++ b/pymoose/examples/replicated/identity_test.py @@ -8,7 +8,6 @@ import pymoose as pm from pymoose.computation import utils from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class TensorIdentityExample(parameterized.TestCase): @@ -108,7 +107,7 @@ def test_identity_example_execute(self, f, t, e): "bob-1": {}, "carole-1": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) result_dict = runtime.evaluate_computation( computation=traced_identity_comp, role_assignment={ diff --git a/pymoose/pymoose/__init__.py b/pymoose/pymoose/__init__.py index 4cb6e7990..c836c9e35 100644 --- a/pymoose/pymoose/__init__.py +++ b/pymoose/pymoose/__init__.py @@ -64,6 +64,8 @@ from pymoose.edsl.tracer import trace_and_compile from pymoose.rust import elk_compiler from pymoose.rust import moose_runtime as _moose_runtime +from pymoose.testing import GrpcMooseRuntime +from pymoose.testing import LocalMooseRuntime MooseComputation = _moose_runtime.MooseComputation @@ -93,6 +95,7 @@ FloatType, host_placement, greater, + GrpcMooseRuntime, identity, index_axis, int32, @@ -101,6 +104,7 @@ IntType, less, load, + LocalMooseRuntime, log, log2, logical_or, diff --git a/pymoose/pymoose/predictors/tree_ensemble_test.py b/pymoose/pymoose/predictors/tree_ensemble_test.py index 09357ab44..85e5a2423 100644 --- a/pymoose/pymoose/predictors/tree_ensemble_test.py +++ b/pymoose/pymoose/predictors/tree_ensemble_test.py @@ -14,7 +14,6 @@ from pymoose.logger import get_logger from pymoose.predictors import predictor_utils from pymoose.predictors import tree_ensemble -from pymoose.testing import LocalMooseRuntime _XGB_REGRESSOR_MODELS = [("xgboost_regressor", [14.121551, 14.121551, 113.279236])] _SK_REGRESSOR_MODELS = [ @@ -165,7 +164,7 @@ def test_tree_ensemble_logic(self, test_case, predictor_cls): traced_model_comp = pm.trace(predictor_logic) storage = {plc.name: {} for plc in predictor.host_placements} - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) role_assignment = {plc.name: plc.name for plc in predictor.host_placements} result_dict = runtime.evaluate_computation( computation=traced_model_comp, diff --git a/pymoose/pymoose/testing.py b/pymoose/pymoose/testing.py index f4eb49e9a..9e1f9bf0c 100644 --- a/pymoose/pymoose/testing.py +++ b/pymoose/pymoose/testing.py @@ -1,4 +1,6 @@ from pymoose.computation import utils +from pymoose.edsl.base import AbstractComputation +from pymoose.edsl.tracer import trace from pymoose.rust import moose_runtime @@ -41,3 +43,22 @@ def read_value_from_storage(self, identity, key): def write_value_to_storage(self, identity, key, value): return super().write_value_to_storage(identity, key, value) + + +class GrpcMooseRuntime(moose_runtime.GrpcRuntime): + def __new__(cls, role_assignment): + return moose_runtime.GrpcRuntime.__new__(GrpcMooseRuntime, role_assignment) + + def evaluate_computation( + self, + computation, + arguments=None, + ): + if isinstance(computation, AbstractComputation): + computation = trace(computation) + + if arguments is None: + arguments = {} + + comp_bin = utils.serialize_computation(computation) + return super().evaluate_computation(comp_bin, arguments) diff --git a/pymoose/rust_integration_tests/add_n_test.py b/pymoose/rust_integration_tests/add_n_test.py index dea7f6612..ec2bed039 100644 --- a/pymoose/rust_integration_tests/add_n_test.py +++ b/pymoose/rust_integration_tests/add_n_test.py @@ -9,7 +9,6 @@ from pymoose import elk_compiler from pymoose.computation import utils from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime player0 = pm.host_placement("player0") player1 = pm.host_placement("player1") @@ -53,7 +52,7 @@ def my_comp(): "player1": {}, "player2": {}, } - runtime = LocalMooseRuntime(storage_mapping=executors_storage) + runtime = pm.LocalMooseRuntime(storage_mapping=executors_storage) concrete_comp = pm.trace(my_comp) comp_bin = utils.serialize_computation(concrete_comp) diff --git a/pymoose/rust_integration_tests/argmax_test.py b/pymoose/rust_integration_tests/argmax_test.py index 7b47bdc5f..012bcd483 100644 --- a/pymoose/rust_integration_tests/argmax_test.py +++ b/pymoose/rust_integration_tests/argmax_test.py @@ -8,7 +8,6 @@ import pymoose as pm from pymoose.computation import types as ty from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class ArgmaxExample(parameterized.TestCase): @@ -77,7 +76,7 @@ def test_example_execute(self, x, axis, axis_idx_max): "bob": {"x_arg": x_arg}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_less_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/rust_integration_tests/boolean_ops_test.py b/pymoose/rust_integration_tests/boolean_ops_test.py index d523a96a6..e802bc5ac 100644 --- a/pymoose/rust_integration_tests/boolean_ops_test.py +++ b/pymoose/rust_integration_tests/boolean_ops_test.py @@ -8,7 +8,6 @@ import pymoose as pm from pymoose.computation import types as ty from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class BooleanLogicExample(parameterized.TestCase): @@ -79,7 +78,7 @@ def test_bool_example_execute(self, x, y): "bob": {"x_arg": x, "y_arg": y}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_less_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/rust_integration_tests/concat_test.py b/pymoose/rust_integration_tests/concat_test.py index 4d1d93801..af24e794b 100644 --- a/pymoose/rust_integration_tests/concat_test.py +++ b/pymoose/rust_integration_tests/concat_test.py @@ -8,7 +8,6 @@ import pymoose as pm from pymoose.computation import utils from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime player0 = pm.host_placement("player0") player1 = pm.host_placement("player1") @@ -52,7 +51,7 @@ def my_comp(): "player1": {}, "player2": {}, } - runtime = LocalMooseRuntime(storage_mapping=executors_storage) + runtime = pm.LocalMooseRuntime(storage_mapping=executors_storage) concrete_comp = pm.trace(my_comp) comp_bin = utils.serialize_computation(concrete_comp) diff --git a/pymoose/rust_integration_tests/dtype_conversions_test.py b/pymoose/rust_integration_tests/dtype_conversions_test.py index cd37ee9df..f0a864550 100644 --- a/pymoose/rust_integration_tests/dtype_conversions_test.py +++ b/pymoose/rust_integration_tests/dtype_conversions_test.py @@ -7,7 +7,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class DTypeConversionTest(parameterized.TestCase): @@ -94,7 +93,7 @@ def test_host_dtype_conversions(self, x_array, from_dtype, to_dtype): "bob": {}, "carole": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/rust_integration_tests/exp_test.py b/pymoose/rust_integration_tests/exp_test.py index 8289f2e7f..ba5bf065b 100644 --- a/pymoose/rust_integration_tests/exp_test.py +++ b/pymoose/rust_integration_tests/exp_test.py @@ -7,7 +7,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class ReplicatedExample(parameterized.TestCase): @@ -66,7 +65,7 @@ def test_exp_example_execute(self, x): "bob": {}, "carole": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_exp_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, @@ -90,7 +89,7 @@ def test_float_exp_execute(self, x, moose_dtype): "bob": {}, "carole": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_exp_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/rust_integration_tests/log_test.py b/pymoose/rust_integration_tests/log_test.py index af06245ee..d31c64eb5 100644 --- a/pymoose/rust_integration_tests/log_test.py +++ b/pymoose/rust_integration_tests/log_test.py @@ -7,7 +7,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class ReplicatedExample(parameterized.TestCase): @@ -53,7 +52,7 @@ def test_log_example_execute(self, x, log_op, np_log): "bob": {}, "carole": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_exp_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/rust_integration_tests/maximum_test.py b/pymoose/rust_integration_tests/maximum_test.py index e692e8a3e..49b3d93c6 100644 --- a/pymoose/rust_integration_tests/maximum_test.py +++ b/pymoose/rust_integration_tests/maximum_test.py @@ -7,7 +7,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class MaximumExample(parameterized.TestCase): @@ -128,7 +127,7 @@ def test_maximum_fixed(self, x, y, z, run_rep): "carole": {"z_arg": z_arg}, } - runtime_rep = LocalMooseRuntime(storage_mapping=storage_rep) + runtime_rep = pm.LocalMooseRuntime(storage_mapping=storage_rep) _ = runtime_rep.evaluate_computation( computation=traced_maximum_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, @@ -166,7 +165,7 @@ def test_float_maximum_execute(self, x, y, z, edsl_dtype): "carole": {"z_arg": z_arg}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_maximum_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/rust_integration_tests/mirrored_ops_test.py b/pymoose/rust_integration_tests/mirrored_ops_test.py index 3b161ff4e..7a8689687 100644 --- a/pymoose/rust_integration_tests/mirrored_ops_test.py +++ b/pymoose/rust_integration_tests/mirrored_ops_test.py @@ -7,7 +7,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class MirroredOpsExample(parameterized.TestCase): @@ -37,7 +36,7 @@ def test_example_execute(self): "bob": {}, "carole": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) result_dict = runtime.evaluate_computation( computation=traced_less_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/rust_integration_tests/ones_test.py b/pymoose/rust_integration_tests/ones_test.py index 1af92703e..2ae088a2b 100644 --- a/pymoose/rust_integration_tests/ones_test.py +++ b/pymoose/rust_integration_tests/ones_test.py @@ -7,7 +7,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class HostExample(parameterized.TestCase): @@ -44,7 +43,7 @@ def test_ones_example_execute(self, x, ones_op, np_ones): storage = { "bob": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_ones_comp, role_assignment={"bob": "bob"}, diff --git a/pymoose/rust_integration_tests/reduce_max_test.py b/pymoose/rust_integration_tests/reduce_max_test.py index 6221c5ea3..5e6ee30a6 100644 --- a/pymoose/rust_integration_tests/reduce_max_test.py +++ b/pymoose/rust_integration_tests/reduce_max_test.py @@ -8,7 +8,6 @@ import pymoose as pm from pymoose.computation import types as ty from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class ReducemaxLogicExample(parameterized.TestCase): @@ -54,7 +53,7 @@ def test_example_execute(self, x): "bob": {"x_arg": x_arg}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_less_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/rust_integration_tests/relu_test.py b/pymoose/rust_integration_tests/relu_test.py index 9808f1e92..e607631c2 100644 --- a/pymoose/rust_integration_tests/relu_test.py +++ b/pymoose/rust_integration_tests/relu_test.py @@ -7,7 +7,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class ReplicatedExample(parameterized.TestCase): @@ -50,7 +49,7 @@ def test_relu_example_execute(self, x): "bob": {}, "carole": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_relu_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/rust_integration_tests/rerurn_test.py b/pymoose/rust_integration_tests/rerurn_test.py index 27e859bbf..2820c83ec 100644 --- a/pymoose/rust_integration_tests/rerurn_test.py +++ b/pymoose/rust_integration_tests/rerurn_test.py @@ -7,7 +7,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class RerunExample(parameterized.TestCase): @@ -41,7 +40,7 @@ def my_comp(): def test_example_execute(self): comp = self._setup_comp() traced_less_comp = pm.trace(comp) - runtime = LocalMooseRuntime(identities=["alice", "bob", "carole"]) + runtime = pm.LocalMooseRuntime(identities=["alice", "bob", "carole"]) result_dict = runtime.evaluate_computation( computation=traced_less_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, @@ -69,7 +68,7 @@ def test_example_execute(self): ) # But if you want to have different identities, you would need a new instance - runtime = LocalMooseRuntime(identities=["newalice", "newbob", "newcarole"]) + runtime = pm.LocalMooseRuntime(identities=["newalice", "newbob", "newcarole"]) result_dict = runtime.evaluate_computation( computation=traced_less_comp, role_assignment={ diff --git a/pymoose/rust_integration_tests/reshape_test.py b/pymoose/rust_integration_tests/reshape_test.py index 4aeb7b72d..8b2feea21 100644 --- a/pymoose/rust_integration_tests/reshape_test.py +++ b/pymoose/rust_integration_tests/reshape_test.py @@ -7,7 +7,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime alice = pm.host_placement(name="alice") bob = pm.host_placement(name="bob") @@ -66,7 +65,7 @@ def test_example_execute(self, reshape_placement): "bob": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) runtime.evaluate_computation( computation=traced_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/rust_integration_tests/shape_test.py b/pymoose/rust_integration_tests/shape_test.py index af2b45655..bce8087b4 100644 --- a/pymoose/rust_integration_tests/shape_test.py +++ b/pymoose/rust_integration_tests/shape_test.py @@ -7,7 +7,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime alice = pm.host_placement(name="alice") bob = pm.host_placement(name="bob") @@ -50,7 +49,7 @@ def test_example_execute(self, dtype, shape_placement): "bob": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) results = runtime.evaluate_computation( computation=traced_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/rust_integration_tests/sigmoid_test.py b/pymoose/rust_integration_tests/sigmoid_test.py index 81b7cd58e..e3f8759b0 100644 --- a/pymoose/rust_integration_tests/sigmoid_test.py +++ b/pymoose/rust_integration_tests/sigmoid_test.py @@ -7,7 +7,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class ReplicatedExample(parameterized.TestCase): @@ -66,7 +65,7 @@ def test_sigmoid_example_execute(self, x): "bob": {}, "carole": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_sigmoid_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, @@ -94,7 +93,7 @@ def test_float_sigmoid_execute(self, x, dtype): "bob": {}, "carole": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_sigmoid_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/rust_integration_tests/slicing_test.py b/pymoose/rust_integration_tests/slicing_test.py index 76bddb375..f2e2ea2d4 100644 --- a/pymoose/rust_integration_tests/slicing_test.py +++ b/pymoose/rust_integration_tests/slicing_test.py @@ -8,7 +8,6 @@ import pymoose as pm from pymoose.computation import types as ty from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime def compile_and_run(traced_slice_comp, x_arg): @@ -18,7 +17,7 @@ def compile_and_run(traced_slice_comp, x_arg): "bob": {"x_arg": x_arg}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_slice_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, @@ -165,7 +164,7 @@ def my_comp(x: pm.Argument(alice, pm.float64)): traced_slice_comp = pm.trace(my_comp) x_arg = np.ones([4, 3, 5], dtype=np.float64) - runtime = LocalMooseRuntime(storage_mapping={"alice": {}}) + runtime = pm.LocalMooseRuntime(storage_mapping={"alice": {}}) _ = runtime.evaluate_computation( computation=traced_slice_comp, role_assignment={"alice": "alice"}, diff --git a/pymoose/rust_integration_tests/softmax_test.py b/pymoose/rust_integration_tests/softmax_test.py index cd7429859..2d1b76880 100644 --- a/pymoose/rust_integration_tests/softmax_test.py +++ b/pymoose/rust_integration_tests/softmax_test.py @@ -8,7 +8,6 @@ import pymoose as pm from pymoose.computation import types as ty from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class SoftmaxExample(parameterized.TestCase): @@ -89,7 +88,7 @@ def test_example_execute(self, x, axis, axis_idx_max): "bob": {"x_arg": x_arg}, } - runtime_rep = LocalMooseRuntime(storage_mapping=storage_rep) + runtime_rep = pm.LocalMooseRuntime(storage_mapping=storage_rep) _ = runtime_rep.evaluate_computation( computation=traced_softmax_rep_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, @@ -107,7 +106,7 @@ def test_example_execute(self, x, axis, axis_idx_max): "bob": {"x_arg": x_arg}, } - runtime_host = LocalMooseRuntime(storage_mapping=storage_host) + runtime_host = pm.LocalMooseRuntime(storage_mapping=storage_host) _ = runtime_host.evaluate_computation( computation=traced_softmax_host_comp, role_assignment={"bob": "bob"}, diff --git a/pymoose/rust_integration_tests/sqrt_test.py b/pymoose/rust_integration_tests/sqrt_test.py index 167bd4726..322915e8a 100644 --- a/pymoose/rust_integration_tests/sqrt_test.py +++ b/pymoose/rust_integration_tests/sqrt_test.py @@ -7,7 +7,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class ReplicatedExample(parameterized.TestCase): @@ -46,7 +45,7 @@ def test_sqrt_example_execute(self, x): "bob": {}, "carole": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_sqrt_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/rust_integration_tests/squeeze_test.py b/pymoose/rust_integration_tests/squeeze_test.py index 1ab778238..1302d46b8 100644 --- a/pymoose/rust_integration_tests/squeeze_test.py +++ b/pymoose/rust_integration_tests/squeeze_test.py @@ -7,7 +7,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class SqueezeExample(parameterized.TestCase): @@ -116,7 +115,7 @@ def test_squeeze_fixed(self, x, axis, run_rep): "carole": {}, } - runtime_rep = LocalMooseRuntime(storage_mapping=storage_rep) + runtime_rep = pm.LocalMooseRuntime(storage_mapping=storage_rep) _ = runtime_rep.evaluate_computation( computation=traced_squeeze_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, @@ -155,7 +154,7 @@ def test_float_squeeze_execute(self, x, axis, edsl_dtype): "carole": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_maximum_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/rust_integration_tests/transpose_test.py b/pymoose/rust_integration_tests/transpose_test.py index 1b4793cd2..ddf16f9c7 100644 --- a/pymoose/rust_integration_tests/transpose_test.py +++ b/pymoose/rust_integration_tests/transpose_test.py @@ -7,7 +7,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class TransposeExample(parameterized.TestCase): @@ -110,7 +109,7 @@ def test_transpose_fixed(self, x, run_rep): "carole": {}, } - runtime_rep = LocalMooseRuntime(storage_mapping=storage_rep) + runtime_rep = pm.LocalMooseRuntime(storage_mapping=storage_rep) _ = runtime_rep.evaluate_computation( computation=traced_squeeze_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, @@ -150,7 +149,7 @@ def test_float_transpose_execute(self, x, edsl_dtype): "carole": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_maximum_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/rust_integration_tests/uint64_test.py b/pymoose/rust_integration_tests/uint64_test.py index 0fcc16e3c..c18edaad1 100644 --- a/pymoose/rust_integration_tests/uint64_test.py +++ b/pymoose/rust_integration_tests/uint64_test.py @@ -7,7 +7,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class ReplicatedExample(parameterized.TestCase): @@ -40,7 +39,7 @@ def test_int_example_execute(self, x): "bob": {}, "carole": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_exp_comp, role_assignment={"alice": "alice", "bob": "bob", "carole": "carole"}, diff --git a/pymoose/rust_integration_tests/zeros_test.py b/pymoose/rust_integration_tests/zeros_test.py index a7ce50260..1524ee361 100644 --- a/pymoose/rust_integration_tests/zeros_test.py +++ b/pymoose/rust_integration_tests/zeros_test.py @@ -7,7 +7,6 @@ import pymoose as pm from pymoose.logger import get_logger -from pymoose.testing import LocalMooseRuntime class HostExample(parameterized.TestCase): @@ -44,7 +43,7 @@ def test_zeros_example_execute(self, x, zeros_op, np_zeros): storage = { "bob": {}, } - runtime = LocalMooseRuntime(storage_mapping=storage) + runtime = pm.LocalMooseRuntime(storage_mapping=storage) _ = runtime.evaluate_computation( computation=traced_zeros_comp, role_assignment={"bob": "bob"}, diff --git a/pymoose/src/bindings.rs b/pymoose/src/bindings.rs index fe858e20b..b9f31cb58 100644 --- a/pymoose/src/bindings.rs +++ b/pymoose/src/bindings.rs @@ -1,11 +1,12 @@ use crate::computation::PyComputation; use moose::compilation::compile; use moose::compilation::toposort; -use moose::computation::{Computation, Role, Value}; use moose::execution::AsyncTestRuntime; -use moose::execution::Identity; -use moose::host::{FromRaw, HostBitTensor, HostPlacement, HostString, HostTensor}; +use moose::host::HostTensor; +use moose::prelude::*; use moose::textual::{parallel_parse_computation, ToTextual}; +use moose::tokio; +use moose_modules::execution::grpc::GrpcMooseRuntime; use ndarray::LinalgScalar; use numpy::{Element, PyArrayDescr, PyArrayDyn, ToPyArray}; use pyo3::exceptions::PyRuntimeError; @@ -22,7 +23,7 @@ fn create_computation_graph_from_py_bytes(computation: Vec) -> Computation { toposort::toposort(rust_comp).unwrap() } -fn pyobj_to_value(py: Python, obj: PyObject) -> PyResult { +fn pyobj_to_value(py: Python, obj: &PyObject) -> PyResult { let obj_ref = obj.as_ref(py); if obj_ref.is_instance_of::()? { let string_value: String = obj.extract(py)?; @@ -37,7 +38,7 @@ fn pyobj_to_value(py: Python, obj: PyObject) -> PyResult { // NOTE: this passes for any inner dtype, since python's isinstance will // only do a shallow typecheck. inside the pyobj_tensor_to_value we do further // introspection on the array & its dtype to map to the correct kind of Value - let value = pyobj_tensor_to_value(py, &obj).unwrap(); + let value = pyobj_tensor_to_value(py, obj).unwrap(); Ok(value) } else { Err(PyTypeError::new_err( @@ -197,7 +198,7 @@ impl LocalRuntime { value: PyObject, ) -> PyResult<()> { let identity = Identity::from(identity); - let value_to_store = pyobj_to_value(py, value)?; + let value_to_store = pyobj_to_value(py, &value)?; let _result = self .runtime .write_value_to_storage(identity, key, value_to_store) @@ -231,7 +232,7 @@ impl LocalRuntime { ) -> PyResult>> { let arguments = arguments .iter() - .map(|arg| (arg.0.clone(), pyobj_to_value(py, arg.1.clone()).unwrap())) + .map(|(name, value)| (name.clone(), pyobj_to_value(py, value).unwrap())) .collect::>(); let valid_role_assignments = role_assignments @@ -265,6 +266,77 @@ impl LocalRuntime { } } +#[pyclass(subclass)] +pub struct GrpcRuntime { + tokio_runtime: tokio::runtime::Runtime, + grpc_runtime: GrpcMooseRuntime, +} + +#[pymethods] +impl GrpcRuntime { + #[new] + fn new(role_assignment: HashMap) -> Self { + let typed_role_assignment = role_assignment + .into_iter() + .map(|(role, identity)| (Role::from(role), Identity::from(identity))) + .collect::>(); + + let tokio_runtime = tokio::runtime::Runtime::new().expect("failed to create Tokio runtime"); + + let grpc_runtime = { + let _guard = tokio_runtime.enter(); + GrpcMooseRuntime::new(typed_role_assignment, None).unwrap() + }; + + GrpcRuntime { + grpc_runtime, + tokio_runtime, + } + } + + fn evaluate_computation( + &mut self, + py: Python, + computation: Vec, + arguments: HashMap, + ) -> PyResult>> { + let logical_computation = create_computation_graph_from_py_bytes(computation); + + let physical_computation = compile::(logical_computation, None) + .map_err(|e| PyRuntimeError::new_err(e.to_string()))?; + + // TODO(Morten) run_computation should accept arguments! + let _typed_arguments = arguments + .iter() + .map(|(name, value)| (name.clone(), pyobj_to_value(py, value).unwrap())) + .collect::>(); + + let session_id = SessionId::random(); + + let typed_outputs = self + .tokio_runtime + .block_on( + self.grpc_runtime + .run_computation(&session_id, &physical_computation), + ) + .map_err(|e| PyRuntimeError::new_err(e.to_string()))?; + + let mut outputs_py_val: HashMap = HashMap::new(); + for (output_name, value) in typed_outputs { + match value { + Value::HostUnit(_) => None, + // TODO: not sure what to support, should eventually standardize output types of computations + Value::HostString(s) => Some(PyString::new(py, &s.0).to_object(py)), + Value::Float64(f) => Some(PyFloat::new(py, *f).to_object(py)), + // assume it's a tensor + _ => outputs_py_val.insert(output_name, tensorval_to_pyobj(py, value).unwrap()), + }; + } + + Ok(Some(outputs_py_val)) + } +} + #[pyclass] pub struct MooseComputation { computation: Computation, @@ -357,6 +429,7 @@ fn elk_compiler(_py: Python<'_>, m: &PyModule) -> PyResult<()> { #[pymodule] fn moose_runtime(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; + m.add_class::()?; m.add_class::()?; Ok(()) } diff --git a/reindeer/Cargo.toml b/reindeer/Cargo.toml index 590b63c50..a39112c27 100644 --- a/reindeer/Cargo.toml +++ b/reindeer/Cargo.toml @@ -13,7 +13,6 @@ opentelemetry = { version = "0.17", default-features = false, features = ["trace opentelemetry-jaeger = "0.16" serde_json = "1.0" structopt = "0.3" -tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } tonic = "~0.6" tracing = { version = "0.1", features = ["log"] } tracing-attributes = "0.1" diff --git a/reindeer/src/comet.rs b/reindeer/src/comet.rs index 52c70d33d..2eaad4d60 100644 --- a/reindeer/src/comet.rs +++ b/reindeer/src/comet.rs @@ -1,5 +1,6 @@ use moose::prelude::*; use moose::storage::LocalAsyncStorage; +use moose::tokio; use moose_modules::choreography::grpc::GrpcChoreography; use moose_modules::networking::grpc::GrpcNetworkingManager; use std::sync::Arc; diff --git a/reindeer/src/cometctl.rs b/reindeer/src/cometctl.rs index 808d2c4a3..5dc027d49 100644 --- a/reindeer/src/cometctl.rs +++ b/reindeer/src/cometctl.rs @@ -1,5 +1,6 @@ use clap::{Parser, Subcommand}; use moose::computation::SessionId; +use moose::tokio; use moose_modules::choreography::{ parse_session_config_file_with_computation, parse_session_config_file_without_computation, }; diff --git a/reindeer/src/dasher.rs b/reindeer/src/dasher.rs index b2814f039..b60f02220 100644 --- a/reindeer/src/dasher.rs +++ b/reindeer/src/dasher.rs @@ -1,5 +1,6 @@ use moose::computation::Operator; use moose::prelude::*; +use moose::tokio; use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::sync::Arc; diff --git a/reindeer/src/rudolph.rs b/reindeer/src/rudolph.rs index 5546bd370..e75c9fd3f 100644 --- a/reindeer/src/rudolph.rs +++ b/reindeer/src/rudolph.rs @@ -1,5 +1,6 @@ use moose::prelude::*; use moose::storage::LocalAsyncStorage; +use moose::tokio; use moose_modules::choreography::filesystem::FilesystemChoreography; use moose_modules::networking::grpc::GrpcNetworkingManager; use std::sync::Arc; diff --git a/reindeer/src/vixen.rs b/reindeer/src/vixen.rs index 9dcf0529b..47301e1cb 100644 --- a/reindeer/src/vixen.rs +++ b/reindeer/src/vixen.rs @@ -2,6 +2,7 @@ use moose::computation::Role; use moose::execution::Identity; use moose::prelude::*; use moose::storage::LocalAsyncStorage; +use moose::tokio; use moose_modules::networking::tcpstream::TcpStreamNetworking; use std::collections::HashMap; use std::convert::TryFrom;