diff --git a/config.yml b/config.yml index d27170c..15d36ba 100644 --- a/config.yml +++ b/config.yml @@ -11,7 +11,7 @@ extensions: cache: default_ttl_seconds: 60 default_size: 500 - merge_subscriptions: + merge_subscription: keep_alive_seconds: 60 server: port: 9944 @@ -28,7 +28,7 @@ middlewares: - inject_params - upstream subscrptions: - - merge_subscriptions + - merge_subscription - upstream rpcs: diff --git a/src/extensions/merge_subscriptions.rs b/src/extensions/merge_subscription.rs similarity index 61% rename from src/extensions/merge_subscriptions.rs rename to src/extensions/merge_subscription.rs index ac1f576..ad7b7dd 100644 --- a/src/extensions/merge_subscriptions.rs +++ b/src/extensions/merge_subscription.rs @@ -3,19 +3,19 @@ use serde::Deserialize; use crate::{extension::Extension, utils::TypeRegistryRef}; -pub struct MergeSubscriptions { - config: MergeSubscriptionsConfig, +pub struct MergeSubscription { + pub config: MergeSubscriptionConfig, } #[derive(Deserialize, Debug, Clone)] -pub struct MergeSubscriptionsConfig { +pub struct MergeSubscriptionConfig { #[serde(default)] pub keep_alive_seconds: Option, } #[async_trait] -impl Extension for MergeSubscriptions { - type Config = MergeSubscriptionsConfig; +impl Extension for MergeSubscription { + type Config = MergeSubscriptionConfig; async fn from_config( config: &Self::Config, @@ -25,8 +25,8 @@ impl Extension for MergeSubscriptions { } } -impl MergeSubscriptions { - pub fn new(config: MergeSubscriptionsConfig) -> Self { +impl MergeSubscription { + pub fn new(config: MergeSubscriptionConfig) -> Self { Self { config } } } diff --git a/src/extensions/mod.rs b/src/extensions/mod.rs index d107ab4..d89d7ac 100644 --- a/src/extensions/mod.rs +++ b/src/extensions/mod.rs @@ -13,7 +13,7 @@ pub mod api; pub mod cache; pub mod client; pub mod event_bus; -pub mod merge_subscriptions; +pub mod merge_subscription; pub mod server; pub mod telemetry; @@ -87,7 +87,7 @@ define_all_extensions! { telemetry: telemetry::Telemetry, cache: cache::Cache, client: client::Client, - merge_subscriptions: merge_subscriptions::MergeSubscriptions, + merge_subscription: merge_subscription::MergeSubscription, substrate_api: api::SubstrateApi, eth_api: api::EthApi, server: server::Server, diff --git a/src/middleware/mod.rs b/src/middleware/mod.rs index 53efdaf..ae5c861 100644 --- a/src/middleware/mod.rs +++ b/src/middleware/mod.rs @@ -3,12 +3,15 @@ use futures::{future::BoxFuture, FutureExt}; use std::sync::Arc; use crate::utils::TypeRegistryRef; -pub use crate::{config::RpcMethod, utils::TypeRegistry}; +pub use crate::{ + config::{RpcMethod, RpcSubscription}, + utils::TypeRegistry, +}; #[async_trait] -pub trait MiddlewareBuilder { +pub trait MiddlewareBuilder { async fn build( - method: &RpcMethod, + method: &Method, extensions: &TypeRegistryRef, ) -> Option>>; } diff --git a/src/middlewares/methods/block_tag.rs b/src/middlewares/methods/block_tag.rs index 14df2c2..e28a80b 100644 --- a/src/middlewares/methods/block_tag.rs +++ b/src/middlewares/methods/block_tag.rs @@ -18,7 +18,7 @@ pub struct BlockTagMiddleware { } #[async_trait] -impl MiddlewareBuilder for BlockTagMiddleware { +impl MiddlewareBuilder for BlockTagMiddleware { async fn build( method: &RpcMethod, extensions: &TypeRegistryRef, diff --git a/src/middlewares/methods/cache.rs b/src/middlewares/methods/cache.rs index 29e37a0..df73c80 100644 --- a/src/middlewares/methods/cache.rs +++ b/src/middlewares/methods/cache.rs @@ -25,7 +25,7 @@ impl CacheMiddleware { } #[async_trait] -impl MiddlewareBuilder for CacheMiddleware { +impl MiddlewareBuilder for CacheMiddleware { async fn build( method: &RpcMethod, extensions: &TypeRegistryRef, diff --git a/src/middlewares/methods/inject_params.rs b/src/middlewares/methods/inject_params.rs index 94d47cf..b7f98ce 100644 --- a/src/middlewares/methods/inject_params.rs +++ b/src/middlewares/methods/inject_params.rs @@ -39,7 +39,7 @@ fn inject_type(params: &[MethodParam]) -> Option { } #[async_trait] -impl MiddlewareBuilder for InjectParamsMiddleware { +impl MiddlewareBuilder for InjectParamsMiddleware { async fn build( method: &RpcMethod, extensions: &TypeRegistryRef, diff --git a/src/middlewares/methods/response.rs b/src/middlewares/methods/response.rs index 31bcff4..140c57a 100644 --- a/src/middlewares/methods/response.rs +++ b/src/middlewares/methods/response.rs @@ -18,7 +18,7 @@ impl ResponseMiddleware { } #[async_trait] -impl MiddlewareBuilder for ResponseMiddleware { +impl MiddlewareBuilder for ResponseMiddleware { async fn build( method: &RpcMethod, _extensions: &TypeRegistryRef, diff --git a/src/middlewares/methods/upstream.rs b/src/middlewares/methods/upstream.rs index b55f20f..4d4d6ab 100644 --- a/src/middlewares/methods/upstream.rs +++ b/src/middlewares/methods/upstream.rs @@ -21,7 +21,7 @@ impl UpstreamMiddleware { } #[async_trait] -impl MiddlewareBuilder for UpstreamMiddleware { +impl MiddlewareBuilder for UpstreamMiddleware { async fn build( _method: &RpcMethod, extensions: &TypeRegistryRef, diff --git a/src/middlewares/mod.rs b/src/middlewares/mod.rs index 9f6d7e1..ea41fbe 100644 --- a/src/middlewares/mod.rs +++ b/src/middlewares/mod.rs @@ -1,13 +1,17 @@ -use jsonrpsee::{core::JsonValue, types::ErrorObjectOwned}; +use jsonrpsee::{ + core::{JsonValue, StringError}, + types::ErrorObjectOwned, + PendingSubscriptionSink, +}; use crate::{ - config::RpcMethod, + config::{RpcMethod, RpcSubscription}, middleware::{Middleware, MiddlewareBuilder}, utils::TypeRegistryRef, }; pub mod methods; -// pub mod subscriptions; +pub mod subscriptions; #[derive(Debug)] pub struct CallRequest { @@ -42,3 +46,26 @@ pub async fn create_method_middleware( _ => None, } } + +#[derive(Debug)] +pub struct SubscriptionRequest { + pub subscribe: String, + pub params: Vec, + pub unsubscribe: String, + pub sink: PendingSubscriptionSink, +} + +pub type SubscriptionResult = Result<(), StringError>; + +pub async fn create_subscription_middleware( + name: &str, + method: &RpcSubscription, + extensions: &TypeRegistryRef, +) -> Option>> { + use subscriptions::*; + + match name { + "upstream" => upstream::UpstreamMiddleware::build(method, extensions).await, + _ => None, + } +} diff --git a/src/middlewares/subscriptions/merge_subscription.rs b/src/middlewares/subscriptions/merge_subscription.rs index f9411e8..27dc8d0 100644 --- a/src/middlewares/subscriptions/merge_subscription.rs +++ b/src/middlewares/subscriptions/merge_subscription.rs @@ -1,3 +1,9 @@ +use std::{ + collections::{BTreeSet, HashMap}, + sync::Arc, + time::Duration, +}; + use async_trait::async_trait; use blake2::Blake2b512; use jsonrpsee::{ @@ -5,18 +11,14 @@ use jsonrpsee::{ SubscriptionMessage, }; use serde::{Deserialize, Serialize}; -use std::{ - collections::{BTreeSet, HashMap}, - sync::Arc, - time::Duration, -}; use tokio::sync::{broadcast, RwLock}; use crate::{ - cache::CacheKey, - client::Client, config::MergeStrategy, - middleware::{subscription::SubscriptionRequest, Middleware, NextFn}, + extensions::{client::Client, merge_subscription::MergeSubscription}, + middleware::{Middleware, MiddlewareBuilder, NextFn, RpcSubscription}, + middlewares::{SubscriptionRequest, SubscriptionResult}, + utils::{CacheKey, TypeRegistry, TypeRegistryRef}, }; #[derive(Serialize, Deserialize, Debug)] @@ -188,11 +190,39 @@ impl MergeSubscriptionMiddleware { } } +#[async_trait] +impl MiddlewareBuilder + for MergeSubscriptionMiddleware +{ + async fn build( + method: &RpcSubscription, + extensions: &TypeRegistryRef, + ) -> Option>> { + let Some(merge_strategy) = method.merge_strategy else { + return None; + }; + + let ext = extensions.read().await; + let client = ext.get::().expect("Client extension not found"); + + let merge_subscription = ext + .get::() + .expect("MergeSubscription extension not found"); + + Some(Box::new(MergeSubscriptionMiddleware::new( + client, + merge_strategy, + merge_subscription.config.keep_alive_seconds, + ))) + } +} + #[async_trait] impl Middleware> for MergeSubscriptionMiddleware { async fn call( &self, request: SubscriptionRequest, + _context: TypeRegistry, _next: NextFn>, ) -> Result<(), StringError> { let key = CacheKey::new(&request.subscribe, &request.params); diff --git a/src/middlewares/subscriptions/mod.rs b/src/middlewares/subscriptions/mod.rs index dfd290f..4399965 100644 --- a/src/middlewares/subscriptions/mod.rs +++ b/src/middlewares/subscriptions/mod.rs @@ -1,2 +1,2 @@ pub mod merge_subscription; -pub mod subscription; +pub mod upstream; diff --git a/src/middlewares/subscriptions/subscription.rs b/src/middlewares/subscriptions/upstream.rs similarity index 57% rename from src/middlewares/subscriptions/subscription.rs rename to src/middlewares/subscriptions/upstream.rs index 8dc83e3..7705254 100644 --- a/src/middlewares/subscriptions/subscription.rs +++ b/src/middlewares/subscriptions/upstream.rs @@ -1,22 +1,15 @@ -use async_trait::async_trait; -use jsonrpsee::{ - core::{JsonValue, StringError}, - PendingSubscriptionSink, SubscriptionMessage, -}; use std::sync::Arc; +use async_trait::async_trait; +use jsonrpsee::SubscriptionMessage; + use crate::{ - client::Client, - middleware::{Middleware, NextFn}, + extensions::client::Client, + middleware::{Middleware, MiddlewareBuilder, NextFn, RpcSubscription}, + middlewares::{SubscriptionRequest, SubscriptionResult}, + utils::{TypeRegistry, TypeRegistryRef}, }; -pub struct SubscriptionRequest { - pub subscribe: String, - pub params: Vec, - pub unsubscribe: String, - pub sink: PendingSubscriptionSink, -} - pub struct UpstreamMiddleware { client: Arc, } @@ -28,12 +21,30 @@ impl UpstreamMiddleware { } #[async_trait] -impl Middleware> for UpstreamMiddleware { +impl MiddlewareBuilder + for UpstreamMiddleware +{ + async fn build( + _method: &RpcSubscription, + extensions: &TypeRegistryRef, + ) -> Option>> { + let client = extensions + .read() + .await + .get::() + .expect("Client extension not found"); + Some(Box::new(UpstreamMiddleware::new(client))) + } +} + +#[async_trait] +impl Middleware for UpstreamMiddleware { async fn call( &self, request: SubscriptionRequest, - _next: NextFn>, - ) -> Result<(), StringError> { + _context: TypeRegistry, + _next: NextFn, + ) -> SubscriptionResult { let sink = request.sink; let mut sub = self