Skip to content

Commit

Permalink
wip: plumbing FakeNexus handle_instance_put_{success,failure}
Browse files Browse the repository at this point in the history
  • Loading branch information
lif committed Feb 15, 2024
1 parent d18090d commit 7bfa9b4
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 56 deletions.
10 changes: 10 additions & 0 deletions common/src/api/internal/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ pub struct SledInstanceState {
pub vmm_state: VmmRuntimeState,
}

/// Tells sled-agent whether an instance whose status it's reporting is still
/// relevant, or if it's timed out during creation and been marked as failed
/// (such that sled-agent can destroy the tardy instance)
#[derive(Serialize, JsonSchema)]
#[serde(rename_all = "snake_case", tag = "last_result", content = "details")]
pub enum HandleInstancePutResultResult {
Ok,
TimedOut { generation: Generation },
}

// Oximeter producer/collector objects.

/// The kind of metric producer this is.
Expand Down
11 changes: 5 additions & 6 deletions nexus/src/app/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use nexus_db_queries::db::identity::Resource;
use nexus_db_queries::db::lookup;
use nexus_db_queries::db::lookup::LookupPath;
use nexus_types::external_api::views;
use nexus_types::internal_api::views::HandleInstancePutResultResult;
use omicron_common::address::PROPOLIS_PORT;
use omicron_common::api::external::http_pagination::PaginatedBy;
use omicron_common::api::external::ByteCount;
Expand Down Expand Up @@ -1071,7 +1070,7 @@ impl super::Nexus {
opctx: &OpContext,
instance_id: &Uuid,
result: Result<nexus::SledInstanceState, Error>,
) -> Result<HandleInstancePutResultResult, Error> {
) -> Result<nexus::HandleInstancePutResultResult, Error> {
let (.., authz_instance) = LookupPath::new(&opctx, &self.db_datastore)
.instance_id(*instance_id)
.lookup_for(authz::Action::Modify)
Expand All @@ -1097,14 +1096,14 @@ impl super::Nexus {
"instance_id" => %instance_id,
"generation" => %generation
);
Ok(HandleInstancePutResultResult::TimedOut {
Ok(nexus::HandleInstancePutResultResult::TimedOut {
generation,
})
}
_ => Ok(HandleInstancePutResultResult::Ok),
_ => Ok(nexus::HandleInstancePutResultResult::Ok),
}
} else {
Ok(HandleInstancePutResultResult::Ok)
Ok(nexus::HandleInstancePutResultResult::Ok)
}
}
Err(error) => {
Expand All @@ -1119,7 +1118,7 @@ impl super::Nexus {
error,
)
.await?;
Ok(HandleInstancePutResultResult::Ok)
Ok(nexus::HandleInstancePutResultResult::Ok)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion nexus/src/internal_api/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ use nexus_types::internal_api::params::SwitchPutRequest;
use nexus_types::internal_api::params::SwitchPutResponse;
use nexus_types::internal_api::views::to_list;
use nexus_types::internal_api::views::BackgroundTask;
use nexus_types::internal_api::views::HandleInstancePutResultResult;
use nexus_types::internal_api::views::Saga;
use omicron_common::api::external::http_pagination::data_page_params_for;
use omicron_common::api::external::http_pagination::PaginatedById;
use omicron_common::api::external::http_pagination::ScanById;
use omicron_common::api::external::http_pagination::ScanParams;
use omicron_common::api::external::Error;
use omicron_common::api::internal::nexus::DiskRuntimeState;
use omicron_common::api::internal::nexus::HandleInstancePutResultResult;
use omicron_common::api::internal::nexus::ProducerEndpoint;
use omicron_common::api::internal::nexus::SledInstanceState;
use omicron_common::update::ArtifactId;
Expand Down
11 changes: 0 additions & 11 deletions nexus/types/src/internal_api/views.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use chrono::DateTime;
use chrono::Utc;
use futures::future::ready;
use futures::stream::StreamExt;
use omicron_common::api::external::Generation;
use omicron_common::api::external::ObjectStream;
use schemars::JsonSchema;
use serde::Serialize;
Expand Down Expand Up @@ -297,13 +296,3 @@ pub struct LastResultCompleted {
/// arbitrary datum emitted by the background task
pub details: serde_json::Value,
}

/// Tells sled-agent whether an instance whose status it's reporting is still
/// relevant, or if it's timed out during creation and been marked as failed
/// (such that sled-agent can destroy the tardy instance)
#[derive(Serialize, JsonSchema)]
#[serde(rename_all = "snake_case", tag = "last_result", content = "details")]
pub enum HandleInstancePutResultResult {
Ok,
TimedOut { generation: Generation },
}
56 changes: 55 additions & 1 deletion sled-agent/src/fakes/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use hyper::Body;
use internal_dns::ServiceName;
use omicron_common::api::external::Error;
use omicron_common::api::internal::nexus::{
SledInstanceState, UpdateArtifactId,
HandleInstancePutResultResult, SledInstanceState, UpdateArtifactId,
};
use schemars::JsonSchema;
use serde::Deserialize;
Expand All @@ -40,6 +40,20 @@ pub trait FakeNexusServer: Send + Sync {
) -> Result<(), Error> {
Err(Error::internal_error("Not implemented"))
}
fn cpapi_handle_instance_put_success(
&self,
_instance_id: Uuid,
_instance_state: SledInstanceState,
) -> Result<HandleInstancePutResultResult, Error> {
Err(Error::internal_error("Not implemented"))
}
fn cpapi_handle_instance_put_failure(
&self,
_instance_id: Uuid,
_error: String,
) -> Result<HandleInstancePutResultResult, Error> {
Err(Error::internal_error("Not implemented"))
}
}

/// Describes the server context type.
Expand Down Expand Up @@ -69,6 +83,44 @@ struct InstancePathParam {
instance_id: Uuid,
}

#[endpoint {
method = PUT,
path = "/instances/{instance_id}/creation-success",
}]
async fn cpapi_handle_instance_put_success(
request_context: RequestContext<ServerContext>,
path_params: Path<InstancePathParam>,
instance_state: TypedBody<SledInstanceState>,
) -> Result<HttpResponseOk<HandleInstancePutResultResult>, HttpError> {
request_context
.context()
.cpapi_handle_instance_put_success(
path_params.into_inner().instance_id,
instance_state.into_inner(),
)
.map(HttpResponseOk)
.map_err(Into::into)
}

#[endpoint {
method = PUT,
path = "/instances/{instance_id}/creation-failure",
}]
async fn cpapi_handle_instance_put_failure(
request_context: RequestContext<ServerContext>,
path_params: Path<InstancePathParam>,
error: TypedBody<String>,
) -> Result<HttpResponseOk<HandleInstancePutResultResult>, HttpError> {
request_context
.context()
.cpapi_handle_instance_put_failure(
path_params.into_inner().instance_id,
error.into_inner(),
)
.map(HttpResponseOk)
.map_err(Into::into)
}

#[endpoint {
method = PUT,
path = "/instances/{instance_id}",
Expand All @@ -90,6 +142,8 @@ fn api() -> ApiDescription<ServerContext> {
let mut api = ApiDescription::new();
api.register(cpapi_artifact_download).unwrap();
api.register(cpapi_instances_put).unwrap();
api.register(cpapi_handle_instance_put_success).unwrap();
api.register(cpapi_handle_instance_put_failure).unwrap();
api
}

Expand Down
135 changes: 98 additions & 37 deletions sled-agent/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1275,51 +1275,112 @@ mod tests {
use omicron_common::api::external::{
ByteCount, Generation, Hostname, InstanceCpuCount, InstanceState,
};
use omicron_common::api::internal::nexus::HandleInstancePutResultResult;
use omicron_common::api::internal::nexus::InstanceProperties;
use sled_storage::disk::{RawDisk, SyntheticDisk};
use sled_storage::manager::FakeStorageManager;
use std::net::Ipv6Addr;
use std::str::FromStr;
use tokio::sync::watch::Receiver;
use tokio::sync::watch::{Receiver, Sender};
use tokio::time::timeout;

const TIMEOUT_DURATION: tokio::time::Duration =
tokio::time::Duration::from_secs(3);

#[derive(Default, Clone)]
enum ReceivedInstanceState {
#[default]
None,
InstancePut(SledInstanceState),
HandleSuccess(SledInstanceState),
HandleFailure(String),
}

struct NexusServer {
observed_runtime_state:
tokio::sync::watch::Sender<Option<SledInstanceState>>,
tokio::sync::watch::Sender<ReceivedInstanceState>,
nexus_disposition:
tokio::sync::watch::Receiver<Option<HandleInstancePutResultResult>>,
}
impl FakeNexusServer for NexusServer {
fn cpapi_instances_put(
&self,
_instance_id: Uuid,
new_runtime_state: SledInstanceState,
) -> Result<(), omicron_common::api::external::Error> {
self.observed_runtime_state.send(Some(new_runtime_state))
.map_err(|_| omicron_common::api::external::Error::internal_error("couldn't send updated SledInstanceState to test driver"))
self.observed_runtime_state
.send(ReceivedInstanceState::InstancePut(new_runtime_state))
.map_err(|_| {
omicron_common::api::external::Error::internal_error(
"couldn't send SledInstanceState to test driver",
)
})
}

fn cpapi_handle_instance_put_success(
&self,
_instance_id: Uuid,
state: SledInstanceState,
) -> Result<
HandleInstancePutResultResult,
omicron_common::api::external::Error,
> {
self.observed_runtime_state
.send(ReceivedInstanceState::HandleSuccess(state))
.map_err(|_| {
omicron_common::api::external::Error::internal_error(
"couldn't send SledInstanceState to test driver",
)
})?;
todo!()
}

fn cpapi_handle_instance_put_failure(
&self,
_instance_id: Uuid,
error: String,
) -> Result<
HandleInstancePutResultResult,
omicron_common::api::external::Error,
> {
self.observed_runtime_state
.send(ReceivedInstanceState::HandleFailure(error))
.map_err(|_| {
omicron_common::api::external::Error::internal_error(
"couldn't send error to test driver",
)
})?;
todo!()
}
}

fn fake_nexus_server(
logctx: &LogContext,
) -> (
NexusClient,
HttpServer<ServerContext>,
Receiver<Option<SledInstanceState>>,
) {
let (state_tx, state_rx) = tokio::sync::watch::channel(None);

let nexus_server = crate::fakes::nexus::start_test_server(
logctx.log.new(o!("component" => "FakeNexusServer")),
Box::new(NexusServer { observed_runtime_state: state_tx }),
);
let nexus_client = NexusClient::new(
&format!("http://{}", nexus_server.local_addr()),
logctx.log.new(o!("component" => "NexusClient")),
);
struct FakeNexusParts {
nexus_client: NexusClient,
nexus_server: HttpServer<ServerContext>,
state_rx: Receiver<ReceivedInstanceState>,
disp_tx: Sender<Option<HandleInstancePutResultResult>>,
}

(nexus_client, nexus_server, state_rx)
impl FakeNexusParts {
fn new(logctx: &LogContext) -> Self {
let (state_tx, state_rx) =
tokio::sync::watch::channel(ReceivedInstanceState::None);
let (disp_tx, disp_rx) = tokio::sync::watch::channel(None);

let nexus_server = crate::fakes::nexus::start_test_server(
logctx.log.new(o!("component" => "FakeNexusServer")),
Box::new(NexusServer {
observed_runtime_state: state_tx,
nexus_disposition: disp_rx,
}),
);
let nexus_client = NexusClient::new(
&format!("http://{}", nexus_server.local_addr()),
logctx.log.new(o!("component" => "NexusClient")),
);

Self { nexus_client, nexus_server, state_rx, disp_tx }
}
}

fn mock_vnic_contexts(
Expand Down Expand Up @@ -1582,8 +1643,8 @@ mod tests {
let _mock_vnic_contexts = mock_vnic_contexts();
let _mock_zone_contexts = mock_zone_contexts();

let (nexus_client, nexus_server, mut state_rx) =
fake_nexus_server(&logctx);
let FakeNexusParts { nexus_client, nexus_server, mut state_rx, .. } =
FakeNexusParts::new(&logctx);

let (_dns_server, resolver, _dns_config_dir) =
timeout(TIMEOUT_DURATION, dns_server(&logctx, &nexus_server))
Expand Down Expand Up @@ -1617,14 +1678,11 @@ mod tests {

timeout(
TIMEOUT_DURATION,
state_rx.wait_for(|maybe_state| {
maybe_state
.as_ref()
.map(|sled_inst_state| {
sled_inst_state.vmm_state.state
== InstanceState::Running
})
.unwrap_or(false)
state_rx.wait_for(|maybe_state| match maybe_state {
ReceivedInstanceState::InstancePut(sled_inst_state) => {
sled_inst_state.vmm_state.state == InstanceState::Running
}
_ => false,
}),
)
.await
Expand All @@ -1645,7 +1703,8 @@ mod tests {
let _mock_vnic_contexts = mock_vnic_contexts();
let _mock_zone_contexts = mock_zone_contexts();

let (nexus_client, nexus_server, state_rx) = fake_nexus_server(&logctx);
let FakeNexusParts { nexus_client, nexus_server, state_rx, .. } =
FakeNexusParts::new(&logctx);

let (_dns_server, resolver, _dns_config_dir) =
timeout(TIMEOUT_DURATION, dns_server(&logctx, &nexus_server))
Expand Down Expand Up @@ -1674,7 +1733,7 @@ mod tests {
.await
.expect_err("*should've* timed out waiting for Instance::put_state, but didn't?");

if let Some(SledInstanceState {
if let ReceivedInstanceState::InstancePut(SledInstanceState {
vmm_state: VmmRuntimeState { state: InstanceState::Running, .. },
..
}) = state_rx.borrow().to_owned()
Expand Down Expand Up @@ -1707,7 +1766,8 @@ mod tests {
let halt_rm_ctx = MockZones::halt_and_remove_logged_context();
halt_rm_ctx.expect().times(..).returning(|_, _| Ok(()));

let (nexus_client, nexus_server, state_rx) = fake_nexus_server(&logctx);
let FakeNexusParts { nexus_client, nexus_server, state_rx, .. } =
FakeNexusParts::new(&logctx);

let (_dns_server, resolver, _dns_config_dir) =
timeout(TIMEOUT_DURATION, dns_server(&logctx, &nexus_server))
Expand Down Expand Up @@ -1736,7 +1796,7 @@ mod tests {
.await
.expect_err("*should've* timed out waiting for Instance::put_state, but didn't?");

if let Some(SledInstanceState {
if let ReceivedInstanceState::InstancePut(SledInstanceState {
vmm_state: VmmRuntimeState { state: InstanceState::Running, .. },
..
}) = state_rx.borrow().to_owned()
Expand All @@ -1755,7 +1815,8 @@ mod tests {

let storage_handle = fake_storage_manager_with_u2().await;

let (nexus_client, nexus_server, state_rx) = fake_nexus_server(&logctx);
let FakeNexusParts { nexus_client, nexus_server, state_rx, disp_tx } =
FakeNexusParts::new(&logctx);

let (_dns_server, resolver, _dns_config_dir) =
timeout(TIMEOUT_DURATION, dns_server(&logctx, &nexus_server))
Expand Down

0 comments on commit 7bfa9b4

Please sign in to comment.