diff --git a/Cargo.lock b/Cargo.lock index 0bd2b77e98..f76bb66af6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,6 +38,7 @@ dependencies = [ "actix-codec", "actix-rt", "actix-service", + "actix-tls", "actix-utils", "ahash", "base64 0.22.1", @@ -131,6 +132,25 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "actix-tls" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac453898d866cdbecdbc2334fe1738c747b4eba14a677261f2b768ba05329389" +dependencies = [ + "actix-rt", + "actix-service", + "actix-utils", + "futures-core", + "impl-more", + "pin-project-lite", + "tokio", + "tokio-rustls 0.23.4", + "tokio-util", + "tracing", + "webpki-roots 0.22.6", +] + [[package]] name = "actix-utils" version = "3.0.1" @@ -154,6 +174,7 @@ dependencies = [ "actix-rt", "actix-server", "actix-service", + "actix-tls", "actix-utils", "actix-web-codegen", "ahash", @@ -4852,6 +4873,7 @@ dependencies = [ "pin-project-lite", "smallvec", "tokio", + "want", ] [[package]] @@ -4870,6 +4892,24 @@ dependencies = [ "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-rustls" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" +dependencies = [ + "futures-util", + "http 1.1.0", + "hyper 1.4.1", + "hyper-util", + "rustls 0.23.18", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.0", + "tower-service", + "webpki-roots 0.26.6", +] + [[package]] name = "hyper-util" version = "0.1.9" @@ -4877,13 +4917,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" dependencies = [ "bytes", + "futures-channel", "futures-util", "http 1.1.0", "http-body 1.0.1", "hyper 1.4.1", "pin-project-lite", + "socket2 0.5.7", "tokio", "tower-service", + "tracing", ] [[package]] @@ -9328,6 +9371,48 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "reqwest" +version = "0.12.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-rustls 0.27.3", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "quinn 0.11.5", + "rustls 0.23.18", + "rustls-pemfile 2.2.0", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-rustls 0.26.0", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots 0.26.6", + "windows-registry", +] + [[package]] name = "resolv-conf" version = "0.7.0" @@ -9565,6 +9650,7 @@ version = "0.20.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" dependencies = [ + "log", "ring 0.16.20", "sct", "webpki", @@ -10426,7 +10512,7 @@ dependencies = [ "futures", "futures-timer", "hyper 0.14.30", - "hyper-rustls", + "hyper-rustls 0.24.2", "log", "num_cpus", "once_cell", @@ -12658,6 +12744,7 @@ dependencies = [ name = "subspace-gateway" version = "0.1.0" dependencies = [ + "actix-web", "anyhow", "async-lock 3.4.0", "async-trait", @@ -12667,6 +12754,9 @@ dependencies = [ "hex", "jsonrpsee", "mimalloc", + "reqwest", + "serde", + "serde_json", "subspace-core-primitives", "subspace-data-retrieval", "subspace-erasure-coding", @@ -13431,6 +13521,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] + [[package]] name = "synstructure" version = "0.12.6" @@ -13709,6 +13808,17 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls 0.20.9", + "tokio", + "webpki", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -14681,6 +14791,15 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "webpki-roots" +version = "0.22.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" +dependencies = [ + "webpki", +] + [[package]] name = "webpki-roots" version = "0.25.4" @@ -14783,6 +14902,36 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-registry" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" +dependencies = [ + "windows-result", + "windows-strings", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-result" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-strings" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" +dependencies = [ + "windows-result", + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.42.0" diff --git a/crates/subspace-gateway/Cargo.toml b/crates/subspace-gateway/Cargo.toml index 48300f153e..3feff8f10f 100644 --- a/crates/subspace-gateway/Cargo.toml +++ b/crates/subspace-gateway/Cargo.toml @@ -1,7 +1,10 @@ [package] name = "subspace-gateway" version = "0.1.0" -authors = ["Teor "] +authors = [ + "Teor ", + "Shamil Gadelshin " +] description = "A Subspace Network data gateway." edition = "2021" license = "MIT OR Apache-2.0" @@ -17,6 +20,7 @@ include = [ targets = ["x86_64-unknown-linux-gnu"] [dependencies] +actix-web = { version = "4", features = ["rustls"], default-features = false } async-lock = "3.4.0" anyhow = "1.0.89" async-trait = "0.1.83" @@ -26,6 +30,9 @@ futures = "0.3.31" hex = "0.4.3" jsonrpsee = { version = "0.24.5", features = ["server", "ws-client"] } mimalloc = "0.1.43" +reqwest = { version = "0.12.9", features = ["json", "rustls-tls"], default-features = false } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" } subspace-data-retrieval = { version = "0.1.0", path = "../../shared/subspace-data-retrieval" } subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" } diff --git a/crates/subspace-gateway/README.md b/crates/subspace-gateway/README.md index e939f26bf8..c9d47d4cde 100644 --- a/crates/subspace-gateway/README.md +++ b/crates/subspace-gateway/README.md @@ -61,7 +61,7 @@ target/production/subspace-gateway --version Start a gateway connected to a single node development chain: ```bash -target/production/subspace-gateway run \ +target/production/subspace-gateway rpc \ --dev ``` diff --git a/crates/subspace-gateway/src/commands.rs b/crates/subspace-gateway/src/commands.rs index 159afc558f..22b924b832 100644 --- a/crates/subspace-gateway/src/commands.rs +++ b/crates/subspace-gateway/src/commands.rs @@ -1,11 +1,28 @@ //! Gateway subcommands. -pub(crate) mod run; +pub(crate) mod http; +pub(crate) mod network; +pub(crate) mod rpc; -use crate::commands::run::RunOptions; +use crate::commands::http::HttpCommandOptions; +use crate::commands::network::{configure_network, NetworkArgs}; +use crate::commands::rpc::RpcCommandOptions; +use crate::node_client::RpcNodeClient; +use crate::piece_getter::DsnPieceGetter; +use crate::piece_validator::SegmentCommitmentPieceValidator; +use anyhow::anyhow; +use async_lock::Semaphore; use clap::Parser; +use std::num::NonZeroUsize; use std::panic; use std::process::exit; +use std::sync::Arc; +use subspace_core_primitives::pieces::Record; +use subspace_data_retrieval::object_fetcher::ObjectFetcher; +use subspace_erasure_coding::ErasureCoding; +use subspace_kzg::Kzg; +use subspace_networking::utils::piece_provider::PieceProvider; +use subspace_networking::NodeRunner; use tokio::signal; use tracing::level_filters::LevelFilter; use tracing::{debug, warn}; @@ -13,15 +30,41 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{fmt, EnvFilter, Layer}; +/// The default size limit, based on the maximum block size in some domains. +pub const DEFAULT_MAX_SIZE: usize = 5 * 1024 * 1024; +/// Multiplier on top of outgoing connections number for piece downloading purposes +const PIECE_PROVIDER_MULTIPLIER: usize = 10; + /// Commands for working with a gateway. #[derive(Debug, Parser)] #[clap(about, version)] pub enum Command { - /// Run data gateway - Run(RunOptions), + /// Run data gateway with RPC server + Rpc(RpcCommandOptions), + /// Run data gateway with HTTP server + Http(HttpCommandOptions), // TODO: subcommand to run various benchmarks } +/// Options for running a gateway +#[derive(Debug, Parser)] +pub(crate) struct GatewayOptions { + /// Enable development mode. + /// + /// Implies following flags (unless customized): + /// * `--allow-private-ips` + #[arg(long, verbatim_doc_comment)] + dev: bool, + + /// The maximum object size to fetch. + /// Larger objects will return an error. + #[arg(long, default_value_t = DEFAULT_MAX_SIZE)] + max_size: usize, + + #[clap(flatten)] + dsn_options: NetworkArgs, +} + /// Install a panic handler which exits on panics, rather than unwinding. Unwinding can hang the /// tokio runtime waiting for stuck tasks or threads. pub(crate) fn set_exit_on_panic() { @@ -102,3 +145,46 @@ pub(crate) async fn shutdown_signal() { tracing::info!("Received Ctrl+C, shutting down gateway..."); } + +/// Configures and returns object fetcher and DSN node runner. +pub async fn initialize_object_fetcher( + options: GatewayOptions, +) -> anyhow::Result<( + ObjectFetcher>>, + NodeRunner<()>, +)> { + let GatewayOptions { + dev, + max_size, + mut dsn_options, + } = options; + // Development mode handling is limited to this section + { + if dev { + dsn_options.allow_private_ips = true; + } + } + + let kzg = Kzg::new(); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?; + + let out_connections = dsn_options.out_connections; + // TODO: move this service code into its own function, in a new library part of this crate + let (dsn_node, dsn_node_runner, node_client) = configure_network(dsn_options).await?; + + let piece_provider = PieceProvider::new( + dsn_node.clone(), + SegmentCommitmentPieceValidator::new(dsn_node, node_client, kzg), + Arc::new(Semaphore::new( + out_connections as usize * PIECE_PROVIDER_MULTIPLIER, + )), + ); + let piece_getter = DsnPieceGetter::new(piece_provider); + let object_fetcher = ObjectFetcher::new(piece_getter.into(), erasure_coding, Some(max_size)); + + Ok((object_fetcher, dsn_node_runner)) +} diff --git a/crates/subspace-gateway/src/commands/http.rs b/crates/subspace-gateway/src/commands/http.rs new file mode 100644 index 0000000000..43ef3d2e74 --- /dev/null +++ b/crates/subspace-gateway/src/commands/http.rs @@ -0,0 +1,65 @@ +//! Gateway http command. +//! This command start an HTTP server to serve object requests. + +pub(crate) mod server; + +use crate::commands::http::server::{start_server, ServerParameters}; +use crate::commands::{initialize_object_fetcher, shutdown_signal, GatewayOptions}; +use clap::Parser; +use futures::{select, FutureExt}; +use tracing::info; + +/// Options for HTTP server. +#[derive(Debug, Parser)] +pub(crate) struct HttpCommandOptions { + #[clap(flatten)] + gateway_options: GatewayOptions, + + #[arg(long, default_value = "127.0.0.1:3000")] + indexer_endpoint: String, + + #[arg(long, default_value = "127.0.0.1:8080")] + http_listen_on: String, +} + +/// Runs an HTTP server +pub async fn run(run_options: HttpCommandOptions) -> anyhow::Result<()> { + let signal = shutdown_signal(); + + let HttpCommandOptions { + gateway_options, + indexer_endpoint, + http_listen_on, + } = run_options; + + let (object_fetcher, mut dsn_node_runner) = initialize_object_fetcher(gateway_options).await?; + let dsn_fut = dsn_node_runner.run(); + + let server_params = ServerParameters { + object_fetcher, + indexer_endpoint, + http_endpoint: http_listen_on, + }; + let http_server_fut = start_server(server_params); + + // This defines order in which things are dropped + let dsn_fut = dsn_fut; + let http_server_fut = http_server_fut; + + select! { + // Signal future + () = signal.fuse() => {}, + + // Networking future + () = dsn_fut.fuse() => { + info!("DSN network runner exited."); + }, + + // HTTP service future + _ = http_server_fut.fuse() => { + info!("HTTP server exited."); + }, + } + + anyhow::Ok(()) +} diff --git a/crates/subspace-gateway/src/commands/http/server.rs b/crates/subspace-gateway/src/commands/http/server.rs new file mode 100644 index 0000000000..e9d23c33cd --- /dev/null +++ b/crates/subspace-gateway/src/commands/http/server.rs @@ -0,0 +1,151 @@ +use actix_web::{web, App, HttpResponse, HttpServer, Responder}; +use serde::{Deserialize, Deserializer, Serialize}; +use std::default::Default; +use std::sync::Arc; +use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash}; +use subspace_core_primitives::pieces::PieceIndex; +use subspace_core_primitives::BlockNumber; +use subspace_data_retrieval::object_fetcher::ObjectFetcher; +use subspace_data_retrieval::piece_getter::PieceGetter; +use tracing::{debug, error, trace}; + +pub(crate) struct ServerParameters +where + PG: PieceGetter + Send + Sync + 'static, +{ + pub(crate) object_fetcher: ObjectFetcher, + pub(crate) indexer_endpoint: String, + pub(crate) http_endpoint: String, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +#[serde(rename_all = "camelCase")] +struct ObjectMapping { + hash: Blake3Hash, + piece_index: PieceIndex, + piece_offset: u32, + #[serde(deserialize_with = "string_to_u32")] + block_number: BlockNumber, +} + +fn string_to_u32<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let s: String = Deserialize::deserialize(deserializer)?; + s.parse::().map_err(serde::de::Error::custom) +} + +async fn request_object_mappings(endpoint: String, key: String) -> anyhow::Result { + let client = reqwest::Client::new(); + let object_mappings_url = format!("http://{}/objects/{}", endpoint, key,); + + debug!(?key, ?object_mappings_url, "Requesting object mapping..."); + + let response = client + .get(object_mappings_url.clone()) + .send() + .await? + .json::() + .await; + match &response { + Ok(json) => { + trace!(?key, ?json, "Requested object mapping."); + } + Err(err) => { + error!(?key, ?err, ?object_mappings_url, "Request failed"); + } + } + + response.map_err(|err| err.into()) +} + +async fn serve_object( + key: web::Path, + additional_data: web::Data>>, +) -> impl Responder +where + PG: PieceGetter + Send + Sync + 'static, +{ + let server_params = additional_data.into_inner(); + let key = key.into_inner(); + + // Validate object hash + let decode_result = hex::decode(key.clone()); + let object_hash = match decode_result { + Ok(hash) => { + if hash.len() != Blake3Hash::SIZE { + error!(?key, ?hash, "Invalid hash provided."); + return HttpResponse::BadRequest().finish(); + } + + Blake3Hash::try_from(hash.as_slice()).expect("Hash size was confirmed.") + } + Err(err) => { + error!(?key, ?err, "Invalid hash provided."); + return HttpResponse::BadRequest().finish(); + } + }; + + let Ok(object_mapping) = + request_object_mappings(server_params.indexer_endpoint.clone(), key.clone()).await + else { + return HttpResponse::BadRequest().finish(); + }; + + if object_mapping.hash != object_hash { + error!( + ?key, + object_mapping_hash=?object_mapping.hash, + "Requested hash doesn't match object mapping." + ); + return HttpResponse::ServiceUnavailable().finish(); + } + + let object_fetcher_result = server_params + .object_fetcher + .fetch_object(object_mapping.piece_index, object_mapping.piece_offset) + .await; + + let object = match object_fetcher_result { + Ok(object) => { + trace!(?key, size=%object.len(), "Object fetched successfully"); + + let data_hash = blake3_hash(&object); + if data_hash != object_hash { + error!( + ?data_hash, + ?object_hash, + "Retrieved data did not match mapping hash" + ); + return HttpResponse::ServiceUnavailable().finish(); + } + + object + } + Err(err) => { + error!(?key, ?err, "Failed to fetch object."); + return HttpResponse::ServiceUnavailable().finish(); + } + }; + + HttpResponse::Ok() + .content_type("application/octet-stream") + .body(object) +} + +pub async fn start_server(server_params: ServerParameters) -> std::io::Result<()> +where + PG: PieceGetter + Send + Sync + 'static, +{ + let server_params = Arc::new(server_params); + let http_endpoint = server_params.http_endpoint.clone(); + HttpServer::new(move || { + App::new() + .app_data(web::Data::new(server_params.clone())) + .route("/data/{hash}", web::get().to(serve_object::)) + }) + .bind(http_endpoint)? + .run() + .await +} diff --git a/crates/subspace-gateway/src/commands/run/network.rs b/crates/subspace-gateway/src/commands/network.rs similarity index 100% rename from crates/subspace-gateway/src/commands/run/network.rs rename to crates/subspace-gateway/src/commands/network.rs diff --git a/crates/subspace-gateway/src/commands/rpc.rs b/crates/subspace-gateway/src/commands/rpc.rs new file mode 100644 index 0000000000..0edd2c1a3d --- /dev/null +++ b/crates/subspace-gateway/src/commands/rpc.rs @@ -0,0 +1,63 @@ +//! Gateway rpc command. +//! This command start an RPC server to serve object requests. +pub(crate) mod server; + +use crate::commands::rpc::server::{launch_rpc_server, RpcOptions, RPC_DEFAULT_PORT}; +use crate::commands::{initialize_object_fetcher, shutdown_signal, GatewayOptions}; +use clap::Parser; +use futures::{select, FutureExt}; +use std::pin::pin; +use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcConfig}; +use tracing::info; + +/// Options for RPC server. +#[derive(Debug, Parser)] +pub(crate) struct RpcCommandOptions { + #[clap(flatten)] + gateway_options: GatewayOptions, + + /// Options for RPC + #[clap(flatten)] + rpc_options: RpcOptions, +} + +/// Runs an RPC server +pub async fn run(run_options: RpcCommandOptions) -> anyhow::Result<()> { + let signal = shutdown_signal(); + + let RpcCommandOptions { + gateway_options, + rpc_options, + } = run_options; + let (object_fetcher, mut dsn_node_runner) = initialize_object_fetcher(gateway_options).await?; + let dsn_fut = dsn_node_runner.run(); + + let rpc_api = SubspaceGatewayRpc::new(SubspaceGatewayRpcConfig { object_fetcher }); + let rpc_handle = launch_rpc_server(rpc_api, rpc_options).await?; + let rpc_fut = rpc_handle.stopped(); + + // This defines order in which things are dropped + let dsn_fut = dsn_fut; + let rpc_fut = rpc_fut; + + let dsn_fut = pin!(dsn_fut); + let rpc_fut = pin!(rpc_fut); + + select! { + // Signal future + () = signal.fuse() => {}, + + // Networking future + () = dsn_fut.fuse() => { + info!("DSN network runner exited."); + }, + + // RPC service future + () = rpc_fut.fuse() => { + info!("RPC server exited."); + }, + + } + + anyhow::Ok(()) +} diff --git a/crates/subspace-gateway/src/commands/run/rpc.rs b/crates/subspace-gateway/src/commands/rpc/server.rs similarity index 100% rename from crates/subspace-gateway/src/commands/run/rpc.rs rename to crates/subspace-gateway/src/commands/rpc/server.rs diff --git a/crates/subspace-gateway/src/commands/run.rs b/crates/subspace-gateway/src/commands/run.rs deleted file mode 100644 index 2603605a9e..0000000000 --- a/crates/subspace-gateway/src/commands/run.rs +++ /dev/null @@ -1,138 +0,0 @@ -//! Gateway run command. -//! This is the primary command for the gateway. - -mod network; -mod rpc; - -use crate::commands::run::network::{configure_network, NetworkArgs}; -use crate::commands::run::rpc::{launch_rpc_server, RpcOptions, RPC_DEFAULT_PORT}; -use crate::commands::shutdown_signal; -use crate::piece_getter::DsnPieceGetter; -use crate::piece_validator::SegmentCommitmentPieceValidator; -use anyhow::anyhow; -use async_lock::Semaphore; -use clap::Parser; -use futures::{select, FutureExt}; -use std::env; -use std::num::NonZeroUsize; -use std::pin::pin; -use std::sync::Arc; -use subspace_core_primitives::pieces::Record; -use subspace_data_retrieval::object_fetcher::ObjectFetcher; -use subspace_erasure_coding::ErasureCoding; -use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcConfig}; -use subspace_kzg::Kzg; -use subspace_networking::utils::piece_provider::PieceProvider; -use tracing::info; - -/// The default size limit, based on the maximum block size in some domains. -pub const DEFAULT_MAX_SIZE: usize = 5 * 1024 * 1024; -/// Multiplier on top of outgoing connections number for piece downloading purposes -const PIECE_PROVIDER_MULTIPLIER: usize = 10; - -/// Options for running a node -#[derive(Debug, Parser)] -pub(crate) struct RunOptions { - #[clap(flatten)] - gateway: GatewayOptions, -} - -/// Options for running a gateway -#[derive(Debug, Parser)] -pub(crate) struct GatewayOptions { - /// Enable development mode. - /// - /// Implies following flags (unless customized): - /// * `--allow-private-ips` - #[arg(long, verbatim_doc_comment)] - dev: bool, - - /// The maximum object size to fetch. - /// Larger objects will return an error. - #[arg(long, default_value_t = DEFAULT_MAX_SIZE)] - max_size: usize, - - #[clap(flatten)] - dsn_options: NetworkArgs, - - /// Options for RPC - #[clap(flatten)] - rpc_options: RpcOptions, -} - -/// Default run command for gateway -pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { - let signal = shutdown_signal(); - - let RunOptions { - gateway: - GatewayOptions { - dev, - max_size, - mut dsn_options, - rpc_options, - }, - } = run_options; - - // Development mode handling is limited to this section - { - if dev { - dsn_options.allow_private_ips = true; - } - } - - info!("Subspace Gateway"); - info!("✌️ version {}", env!("CARGO_PKG_VERSION")); - info!("❤️ by {}", env!("CARGO_PKG_AUTHORS")); - - let kzg = Kzg::new(); - let erasure_coding = ErasureCoding::new( - NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) - .expect("Not zero; qed"), - ) - .map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?; - - let out_connections = dsn_options.out_connections; - // TODO: move this service code into its own function, in a new library part of this crate - let (dsn_node, mut dsn_node_runner, node_client) = configure_network(dsn_options).await?; - let dsn_fut = dsn_node_runner.run(); - - let piece_provider = PieceProvider::new( - dsn_node.clone(), - SegmentCommitmentPieceValidator::new(dsn_node, node_client, kzg), - Arc::new(Semaphore::new( - out_connections as usize * PIECE_PROVIDER_MULTIPLIER, - )), - ); - let piece_getter = DsnPieceGetter::new(piece_provider); - let object_fetcher = ObjectFetcher::new(piece_getter.into(), erasure_coding, Some(max_size)); - - let rpc_api = SubspaceGatewayRpc::new(SubspaceGatewayRpcConfig { object_fetcher }); - let rpc_handle = launch_rpc_server(rpc_api, rpc_options).await?; - let rpc_fut = rpc_handle.stopped(); - - // This defines order in which things are dropped - let dsn_fut = dsn_fut; - let rpc_fut = rpc_fut; - - let dsn_fut = pin!(dsn_fut); - let rpc_fut = pin!(rpc_fut); - - select! { - // Signal future - () = signal.fuse() => {}, - - // Networking future - () = dsn_fut.fuse() => { - info!("DSN network runner exited."); - }, - - // RPC service future - () = rpc_fut.fuse() => { - info!("RPC server exited."); - }, - - } - - anyhow::Ok(()) -} diff --git a/crates/subspace-gateway/src/main.rs b/crates/subspace-gateway/src/main.rs index d215a014cb..0f4c63c20f 100644 --- a/crates/subspace-gateway/src/main.rs +++ b/crates/subspace-gateway/src/main.rs @@ -7,6 +7,7 @@ mod piece_validator; use crate::commands::{init_logger, raise_fd_limit, set_exit_on_panic, Command}; use clap::Parser; +use tracing::info; #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; @@ -17,11 +18,18 @@ async fn main() -> anyhow::Result<()> { init_logger(); raise_fd_limit(); + info!("Subspace Gateway"); + info!("✌️ version {}", env!("CARGO_PKG_VERSION")); + info!("❤️ by {}", env!("CARGO_PKG_AUTHORS")); + let command = Command::parse(); match command { - Command::Run(run_options) => { - commands::run::run(run_options).await?; + Command::Rpc(run_options) => { + commands::rpc::run(run_options).await?; + } + Command::Http(run_options) => { + commands::http::run(run_options).await?; } } Ok(())