Skip to content

Commit

Permalink
improve subscription (#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
ermalkaleci authored Oct 31, 2023
1 parent f42d4ac commit ddd665a
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 41 deletions.
45 changes: 21 additions & 24 deletions src/middlewares/subscriptions/merge_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn merge_storage_changes(current_value: JsonValue, new_value: JsonValue) -> Resu
let mut current = serde_json::from_value::<StorageChanges>(current_value)?;
let StorageChanges { block, changes } = serde_json::from_value::<StorageChanges>(new_value)?;

let changed_keys = changes.clone().into_iter().map(|(key, _)| key).collect::<BTreeSet<_>>();
let changed_keys = changes.iter().map(|(key, _)| key).collect::<BTreeSet<_>>();

// replace block hash
current.block = block;
Expand Down Expand Up @@ -230,34 +230,31 @@ impl Middleware<SubscriptionRequest, Result<(), StringError>> for MergeSubscript
.await?;

// broadcast new values
tokio::spawn(async move {
// create receiver inside task to avoid msg been broadcast before stream.recv() is hit
let mut stream = subscribe();

loop {
tokio::select! {
resp = stream.recv() => {
match resp {
Ok(new_value) => {
if let Err(e) = sink.send(new_value).await {
tracing::trace!("subscription sink closed {e:?}");
break;
}
}
Err(e) => {
// this should never happen
tracing::error!("subscription stream error {e:?}");
unreachable!("subscription stream error {e:?}");
let mut stream = subscribe();

loop {
tokio::select! {
resp = stream.recv() => {
match resp {
Ok(new_value) => {
if let Err(e) = sink.send(new_value).await {
tracing::trace!("subscription sink closed {e:?}");
break;
}
}
}
_ = sink.closed() => {
tracing::trace!("subscription sink closed");
break;
Err(e) => {
// this should never happen
tracing::error!("subscription stream error {e:?}");
unreachable!("subscription stream error {e:?}");
}
}
}
_ = sink.closed() => {
tracing::trace!("subscription sink closed");
break;
}
}
});
}

Ok(())
}
Expand Down
50 changes: 33 additions & 17 deletions src/middlewares/subscriptions/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,42 @@ impl Middleware<SubscriptionRequest, SubscriptionResult> for UpstreamMiddleware

let sink = sink.accept().await?;

while let Some(resp) = sub.next().await {
let resp = match resp {
Ok(resp) => resp,
Err(e) => {
tracing::error!("Subscription error: {}", e);
continue;
loop {
tokio::select! {
msg = sub.next() => {
match msg {
Some(resp) => {
let resp = match resp {
Ok(resp) => resp,
Err(e) => {
tracing::error!("Subscription error: {}", e);
continue;
}
};
let resp = match SubscriptionMessage::from_json(&resp) {
Ok(resp) => resp,
Err(e) => {
tracing::error!("Failed to serialize subscription response: {}", e);
continue;
}
};
if let Err(e) = sink.send(resp).await {
tracing::error!("Failed to send subscription response: {}", e);
break;
}
}
None => break,
}
}
};
let resp = match SubscriptionMessage::from_json(&resp) {
Ok(resp) => resp,
Err(e) => {
tracing::error!("Failed to serialize subscription response: {}", e);
continue;
}
};
if let Err(e) = sink.send(resp).await {
tracing::info!("Failed to send subscription response: {}", e);
break;
_ = sink.closed() => {
if let Err(err) = sub.unsubscribe().await {
tracing::error!("Failed to unsubscribe: {}", err);
}
break
},
}
}

Ok(())
}
}

0 comments on commit ddd665a

Please sign in to comment.