Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Standalone view server #3417

Open
wants to merge 2 commits into
base: v5-fog-debug
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions fog/view/server/src/fog_view_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2018-2022 The MobileCoin Foundation

//! Shard (store) fog view service

use crate::{server::DbPollSharedState, sharding_strategy::ShardingStrategy, SVC_COUNTERS};
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use mc_attest_api::attest;
Expand Down
223 changes: 223 additions & 0 deletions fog/view/server/src/fog_view_service_standalone.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Copyright (c) 2018-2022 The MobileCoin Foundation

//! Standalone fog view service

use crate::{server::DbPollSharedState, SVC_COUNTERS};
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use mc_attest_api::attest;
use mc_common::logger::{log, Logger};
use mc_fog_api::view_grpc::FogViewApi;
use mc_fog_recovery_db_iface::RecoveryDb;
use mc_fog_types::view::QueryRequestAAD;
use mc_fog_view_enclave::{Error as ViewEnclaveError, ViewEnclaveProxy};
use mc_fog_view_enclave_api::UntrustedQueryResponse;
use mc_util_grpc::{
check_request_chain_id, rpc_internal_error, rpc_invalid_arg_error, rpc_logger,
rpc_permissions_error, send_result, Authenticator,
};
use mc_util_telemetry::{tracer, BoxedTracer, Tracer};
use std::sync::{Arc, Mutex};

#[derive(Clone)]
pub struct StandaloneFogViewService<E, DB>
where
E: ViewEnclaveProxy,
DB: RecoveryDb + Send + Sync,
{
/// Enclave providing access to the Recovery DB
enclave: E,

/// Recovery DB.
db: Arc<DB>,

/// Shared state from db polling thread.
db_poll_shared_state: Arc<Mutex<DbPollSharedState>>,

/// Chain id.
chain_id: String,

/// GRPC request authenticator.
authenticator: Arc<dyn Authenticator + Send + Sync>,

/// Slog logger object
logger: Logger,
}

impl<E, DB> StandaloneFogViewService<E, DB>
where
E: ViewEnclaveProxy,
DB: RecoveryDb + Send + Sync,
{
/// Creates a new fog-view-service node (but does not create sockets and
/// start it etc.)
pub fn new(
enclave: E,
db: Arc<DB>,
db_poll_shared_state: Arc<Mutex<DbPollSharedState>>,
chain_id: String,
authenticator: Arc<dyn Authenticator + Send + Sync>,
logger: Logger,
) -> Self {
Self {
enclave,
db,
db_poll_shared_state,
chain_id,
authenticator,
logger,
}
}

pub fn create_untrusted_query_response(
&mut self,
aad: &[u8],
tracer: &BoxedTracer,
) -> Result<UntrustedQueryResponse, RpcStatus> {
// Attempt and deserialize the untrusted portion of this request.
let query_request_aad: QueryRequestAAD = mc_util_serial::decode(aad).map_err(|err| {
RpcStatus::with_message(
RpcStatusCode::INVALID_ARGUMENT,
format!("AAD deserialization error: {err}"),
)
})?;

let (user_events, next_start_from_user_event_id) =
tracer.in_span("search_user_events", |_cx| {
self.db
.search_user_events(query_request_aad.start_from_user_event_id)
.map_err(|e| rpc_internal_error("search_user_events", e, &self.logger))
})?;

let (
highest_processed_block_count,
highest_processed_block_signature_timestamp,
last_known_block_count,
last_known_block_cumulative_txo_count,
) = tracer.in_span("get_shared_state", |_cx_| {
let shared_state = self.db_poll_shared_state.lock().expect("mutex poisoned");
(
shared_state.highest_processed_block_count,
shared_state.highest_processed_block_signature_timestamp,
shared_state.last_known_block_count,
shared_state.last_known_block_cumulative_txo_count,
)
});

let untrusted_query_response = UntrustedQueryResponse {
user_events,
next_start_from_user_event_id,
highest_processed_block_count,
highest_processed_block_signature_timestamp,
last_known_block_count,
last_known_block_cumulative_txo_count,
};

Ok(untrusted_query_response)
}

/// Unwrap and forward to enclave
pub fn query_impl(&mut self, request: attest::Message) -> Result<attest::Message, RpcStatus> {
let tracer = tracer!();

tracer.in_span("query_impl", |_cx| {
let untrusted_query_response =
self.create_untrusted_query_response(request.get_aad(), &tracer)?;
let data = tracer.in_span("enclave_query", |_cx| {
self.enclave
.query(request.into(), untrusted_query_response)
.map_err(|e| self.enclave_err_to_rpc_status("enclave request", e))
})?;

let mut resp = attest::Message::new();
resp.set_data(data);
Ok(resp)
})
}

// Helper function that is common
fn enclave_err_to_rpc_status(&self, context: &str, src: ViewEnclaveError) -> RpcStatus {
// Treat prost-decode error as an invalid arg,
// treat attest error as permission denied,
// everything else is an internal error
match src {
ViewEnclaveError::ProstDecode => {
rpc_invalid_arg_error(context, "Prost decode failed", &self.logger)
}
ViewEnclaveError::AttestEnclave(err) => {
rpc_permissions_error(context, err, &self.logger)
}
other => rpc_internal_error(context, format!("{}", &other), &self.logger),
}
}
}

// Implement grpc trait
impl<E: ViewEnclaveProxy, DB: RecoveryDb + Send + Sync> FogViewApi
for StandaloneFogViewService<E, DB>
{
fn auth(
&mut self,
ctx: RpcContext,
mut request: attest::AuthMessage,
sink: UnarySink<attest::AuthMessage>,
) {
let _timer = SVC_COUNTERS.req(&ctx);
mc_common::logger::scoped_global_logger(&rpc_logger(&ctx, &self.logger), |logger| {
if let Err(err) = check_request_chain_id(&self.chain_id, &ctx) {
return send_result(ctx, sink, Err(err), logger);
}

if let Err(err) = self.authenticator.authenticate_rpc(&ctx) {
return send_result(ctx, sink, err.into(), logger);
}

// TODO: Use the prost message directly, once available
match self.enclave.client_accept(request.take_data().into()) {
Ok((response, _)) => {
let mut result = attest::AuthMessage::new();
result.set_data(response.into());
send_result(ctx, sink, Ok(result), logger);
}
Err(client_error) => {
// This is debug because there's no requirement on the remote party to trigger
// it.
log::debug!(
logger,
"ViewEnclaveApi::client_accept failed: {}",
client_error
);
send_result(
ctx,
sink,
Err(rpc_permissions_error(
"client_auth",
format!("Permission denied: {client_error}"),
logger,
)),
logger,
);
}
}
});
}

fn query(
&mut self,
ctx: RpcContext,
request: attest::Message,
sink: UnarySink<attest::Message>,
) {
let _timer = SVC_COUNTERS.req(&ctx);
mc_common::logger::scoped_global_logger(&rpc_logger(&ctx, &self.logger), |logger| {
if let Err(err) = check_request_chain_id(&self.chain_id, &ctx) {
return send_result(ctx, sink, Err(err), logger);
}

if let Err(err) = self.authenticator.authenticate_rpc(&ctx) {
return send_result(ctx, sink, err.into(), logger);
}

send_result(ctx, sink, self.query_impl(request), logger)
})
}
}
1 change: 1 addition & 0 deletions fog/view/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod error;
pub mod fog_view_router_server;
pub mod fog_view_router_service;
pub mod fog_view_service;
pub mod fog_view_service_standalone;
pub mod server;
pub mod sharding_strategy;

Expand Down
24 changes: 19 additions & 5 deletions fog/view/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

use crate::{
block_tracker::BlockTracker, config::MobileAcctViewConfig, counters, db_fetcher::DbFetcher,
fog_view_service::FogViewService, sharding_strategy::ShardingStrategy,
fog_view_service::FogViewService, fog_view_service_standalone::StandaloneFogViewService,
sharding_strategy::ShardingStrategy,
};
use futures::executor::block_on;
use mc_attest_net::RaClient;
Expand Down Expand Up @@ -113,24 +114,37 @@ where
let uri = FogViewStoreUri::try_from_responder_id(responder_id, use_tls)
.expect("Could not create uri from responder id");

let fog_view_service = view_grpc::create_fog_view_store_api(FogViewService::new(
let recovery_db = Arc::new(recovery_db);

let store_fog_view_service = view_grpc::create_fog_view_store_api(FogViewService::new(
enclave.clone(),
Arc::new(recovery_db),
recovery_db.clone(),
db_poll_thread.get_shared_state(),
uri,
client_authenticator,
client_authenticator.clone(),
sharding_strategy,
logger.clone(),
));

let standalone_fog_view_service =
view_grpc::create_fog_view_api(StandaloneFogViewService::new(
enclave.clone(),
recovery_db,
db_poll_thread.get_shared_state(),
config.chain_id.clone(),
client_authenticator,
logger.clone(),
));

// Package service into grpc server
log::info!(
logger,
"Starting View Store server on {}",
config.client_listen_uri,
);
let server_builder = grpcio::ServerBuilder::new(env)
.register_service(fog_view_service)
.register_service(store_fog_view_service)
.register_service(standalone_fog_view_service)
.register_service(health_service);

let server = server_builder
Expand Down