Skip to content

Commit

Permalink
subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
xlc committed Aug 1, 2023
1 parent c0e675a commit 2035ff6
Show file tree
Hide file tree
Showing 13 changed files with 119 additions and 48 deletions.
4 changes: 2 additions & 2 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ extensions:
cache:
default_ttl_seconds: 60
default_size: 500
merge_subscriptions:
merge_subscription:
keep_alive_seconds: 60
server:
port: 9944
Expand All @@ -28,7 +28,7 @@ middlewares:
- inject_params
- upstream
subscrptions:
- merge_subscriptions
- merge_subscription
- upstream

rpcs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@ use serde::Deserialize;

use crate::{extension::Extension, utils::TypeRegistryRef};

pub struct MergeSubscriptions {
config: MergeSubscriptionsConfig,
pub struct MergeSubscription {
pub config: MergeSubscriptionConfig,
}

#[derive(Deserialize, Debug, Clone)]
pub struct MergeSubscriptionsConfig {
pub struct MergeSubscriptionConfig {
#[serde(default)]
pub keep_alive_seconds: Option<u64>,
}

#[async_trait]
impl Extension for MergeSubscriptions {
type Config = MergeSubscriptionsConfig;
impl Extension for MergeSubscription {
type Config = MergeSubscriptionConfig;

async fn from_config(
config: &Self::Config,
Expand All @@ -25,8 +25,8 @@ impl Extension for MergeSubscriptions {
}
}

impl MergeSubscriptions {
pub fn new(config: MergeSubscriptionsConfig) -> Self {
impl MergeSubscription {
pub fn new(config: MergeSubscriptionConfig) -> Self {
Self { config }
}
}
4 changes: 2 additions & 2 deletions src/extensions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub mod api;
pub mod cache;
pub mod client;
pub mod event_bus;
pub mod merge_subscriptions;
pub mod merge_subscription;
pub mod server;
pub mod telemetry;

Expand Down Expand Up @@ -87,7 +87,7 @@ define_all_extensions! {
telemetry: telemetry::Telemetry,
cache: cache::Cache,
client: client::Client,
merge_subscriptions: merge_subscriptions::MergeSubscriptions,
merge_subscription: merge_subscription::MergeSubscription,
substrate_api: api::SubstrateApi,
eth_api: api::EthApi,
server: server::Server,
Expand Down
9 changes: 6 additions & 3 deletions src/middleware/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ use futures::{future::BoxFuture, FutureExt};
use std::sync::Arc;

use crate::utils::TypeRegistryRef;
pub use crate::{config::RpcMethod, utils::TypeRegistry};
pub use crate::{
config::{RpcMethod, RpcSubscription},
utils::TypeRegistry,
};

#[async_trait]
pub trait MiddlewareBuilder<Request, Result> {
pub trait MiddlewareBuilder<Method, Request, Result> {
async fn build(
method: &RpcMethod,
method: &Method,
extensions: &TypeRegistryRef,
) -> Option<Box<dyn Middleware<Request, Result>>>;
}
Expand Down
2 changes: 1 addition & 1 deletion src/middlewares/methods/block_tag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct BlockTagMiddleware {
}

#[async_trait]
impl MiddlewareBuilder<CallRequest, CallResult> for BlockTagMiddleware {
impl MiddlewareBuilder<RpcMethod, CallRequest, CallResult> for BlockTagMiddleware {
async fn build(
method: &RpcMethod,
extensions: &TypeRegistryRef,
Expand Down
2 changes: 1 addition & 1 deletion src/middlewares/methods/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl CacheMiddleware {
}

#[async_trait]
impl MiddlewareBuilder<CallRequest, CallResult> for CacheMiddleware {
impl MiddlewareBuilder<RpcMethod, CallRequest, CallResult> for CacheMiddleware {
async fn build(
method: &RpcMethod,
extensions: &TypeRegistryRef,
Expand Down
2 changes: 1 addition & 1 deletion src/middlewares/methods/inject_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn inject_type(params: &[MethodParam]) -> Option<InjectType> {
}

#[async_trait]
impl MiddlewareBuilder<CallRequest, CallResult> for InjectParamsMiddleware {
impl MiddlewareBuilder<RpcMethod, CallRequest, CallResult> for InjectParamsMiddleware {
async fn build(
method: &RpcMethod,
extensions: &TypeRegistryRef,
Expand Down
2 changes: 1 addition & 1 deletion src/middlewares/methods/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl ResponseMiddleware {
}

#[async_trait]
impl MiddlewareBuilder<CallRequest, CallResult> for ResponseMiddleware {
impl MiddlewareBuilder<RpcMethod, CallRequest, CallResult> for ResponseMiddleware {
async fn build(
method: &RpcMethod,
_extensions: &TypeRegistryRef,
Expand Down
2 changes: 1 addition & 1 deletion src/middlewares/methods/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl UpstreamMiddleware {
}

#[async_trait]
impl MiddlewareBuilder<CallRequest, CallResult> for UpstreamMiddleware {
impl MiddlewareBuilder<RpcMethod, CallRequest, CallResult> for UpstreamMiddleware {
async fn build(
_method: &RpcMethod,
extensions: &TypeRegistryRef,
Expand Down
33 changes: 30 additions & 3 deletions src/middlewares/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use jsonrpsee::{core::JsonValue, types::ErrorObjectOwned};
use jsonrpsee::{
core::{JsonValue, StringError},
types::ErrorObjectOwned,
PendingSubscriptionSink,
};

use crate::{
config::RpcMethod,
config::{RpcMethod, RpcSubscription},
middleware::{Middleware, MiddlewareBuilder},
utils::TypeRegistryRef,
};

pub mod methods;
// pub mod subscriptions;
pub mod subscriptions;

#[derive(Debug)]
pub struct CallRequest {
Expand Down Expand Up @@ -42,3 +46,26 @@ pub async fn create_method_middleware(
_ => None,
}
}

#[derive(Debug)]
pub struct SubscriptionRequest {
pub subscribe: String,
pub params: Vec<JsonValue>,
pub unsubscribe: String,
pub sink: PendingSubscriptionSink,
}

pub type SubscriptionResult = Result<(), StringError>;

pub async fn create_subscription_middleware(
name: &str,
method: &RpcSubscription,
extensions: &TypeRegistryRef,
) -> Option<Box<dyn Middleware<SubscriptionRequest, SubscriptionResult>>> {
use subscriptions::*;

match name {
"upstream" => upstream::UpstreamMiddleware::build(method, extensions).await,
_ => None,
}
}
46 changes: 38 additions & 8 deletions src/middlewares/subscriptions/merge_subscription.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
use std::{
collections::{BTreeSet, HashMap},
sync::Arc,
time::Duration,
};

use async_trait::async_trait;
use blake2::Blake2b512;
use jsonrpsee::{
core::{JsonValue, StringError},
SubscriptionMessage,
};
use serde::{Deserialize, Serialize};
use std::{
collections::{BTreeSet, HashMap},
sync::Arc,
time::Duration,
};
use tokio::sync::{broadcast, RwLock};

use crate::{
cache::CacheKey,
client::Client,
config::MergeStrategy,
middleware::{subscription::SubscriptionRequest, Middleware, NextFn},
extensions::{client::Client, merge_subscription::MergeSubscription},
middleware::{Middleware, MiddlewareBuilder, NextFn, RpcSubscription},
middlewares::{SubscriptionRequest, SubscriptionResult},
utils::{CacheKey, TypeRegistry, TypeRegistryRef},
};

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -188,11 +190,39 @@ impl MergeSubscriptionMiddleware {
}
}

#[async_trait]
impl MiddlewareBuilder<RpcSubscription, SubscriptionRequest, SubscriptionResult>
for MergeSubscriptionMiddleware
{
async fn build(
method: &RpcSubscription,
extensions: &TypeRegistryRef,
) -> Option<Box<dyn Middleware<SubscriptionRequest, SubscriptionResult>>> {
let Some(merge_strategy) = method.merge_strategy else {
return None;
};

let ext = extensions.read().await;
let client = ext.get::<Client>().expect("Client extension not found");

let merge_subscription = ext
.get::<MergeSubscription>()
.expect("MergeSubscription extension not found");

Some(Box::new(MergeSubscriptionMiddleware::new(
client,
merge_strategy,
merge_subscription.config.keep_alive_seconds,
)))
}
}

#[async_trait]
impl Middleware<SubscriptionRequest, Result<(), StringError>> for MergeSubscriptionMiddleware {
async fn call(
&self,
request: SubscriptionRequest,
_context: TypeRegistry,
_next: NextFn<SubscriptionRequest, Result<(), StringError>>,
) -> Result<(), StringError> {
let key = CacheKey::new(&request.subscribe, &request.params);
Expand Down
2 changes: 1 addition & 1 deletion src/middlewares/subscriptions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pub mod merge_subscription;
pub mod subscription;
pub mod upstream;
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
use async_trait::async_trait;
use jsonrpsee::{
core::{JsonValue, StringError},
PendingSubscriptionSink, SubscriptionMessage,
};
use std::sync::Arc;

use async_trait::async_trait;
use jsonrpsee::SubscriptionMessage;

use crate::{
client::Client,
middleware::{Middleware, NextFn},
extensions::client::Client,
middleware::{Middleware, MiddlewareBuilder, NextFn, RpcSubscription},
middlewares::{SubscriptionRequest, SubscriptionResult},
utils::{TypeRegistry, TypeRegistryRef},
};

pub struct SubscriptionRequest {
pub subscribe: String,
pub params: Vec<JsonValue>,
pub unsubscribe: String,
pub sink: PendingSubscriptionSink,
}

pub struct UpstreamMiddleware {
client: Arc<Client>,
}
Expand All @@ -28,12 +21,30 @@ impl UpstreamMiddleware {
}

#[async_trait]
impl Middleware<SubscriptionRequest, Result<(), StringError>> for UpstreamMiddleware {
impl MiddlewareBuilder<RpcSubscription, SubscriptionRequest, SubscriptionResult>
for UpstreamMiddleware
{
async fn build(
_method: &RpcSubscription,
extensions: &TypeRegistryRef,
) -> Option<Box<dyn Middleware<SubscriptionRequest, SubscriptionResult>>> {
let client = extensions
.read()
.await
.get::<Client>()
.expect("Client extension not found");
Some(Box::new(UpstreamMiddleware::new(client)))
}
}

#[async_trait]
impl Middleware<SubscriptionRequest, SubscriptionResult> for UpstreamMiddleware {
async fn call(
&self,
request: SubscriptionRequest,
_next: NextFn<SubscriptionRequest, Result<(), StringError>>,
) -> Result<(), StringError> {
_context: TypeRegistry,
_next: NextFn<SubscriptionRequest, SubscriptionResult>,
) -> SubscriptionResult {
let sink = request.sink;

let mut sub = self
Expand Down

0 comments on commit 2035ff6

Please sign in to comment.