Skip to content

Commit

Permalink
fix: set Interrupted status for streaming predicates that fail (#574)
Browse files Browse the repository at this point in the history
Fixes #523
  • Loading branch information
MicaiahReid authored May 7, 2024
1 parent b678c8d commit 900d6df
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 63 deletions.
4 changes: 2 additions & 2 deletions components/chainhook-cli/src/scan/stacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
Ok(action) => {
number_of_times_triggered += 1;
let res = match action {
StacksChainhookOccurrence::Http(request) => {
StacksChainhookOccurrence::Http(request, _) => {
send_request(request, 3, 1, &ctx).await
}
StacksChainhookOccurrence::File(path, bytes) => file_append(path, bytes, &ctx),
Expand Down Expand Up @@ -488,7 +488,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
Ok(action) => {
occurrences_found += 1;
let res = match action {
StacksChainhookOccurrence::Http(request) => {
StacksChainhookOccurrence::Http(request, _) => {
send_request(request, 10, 3, &ctx).await
}
StacksChainhookOccurrence::File(path, bytes) => file_append(path, bytes, &ctx),
Expand Down
30 changes: 29 additions & 1 deletion components/chainhook-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use chainhook_sdk::chainhooks::types::{ChainhookConfig, ChainhookFullSpecificati
use chainhook_sdk::chainhooks::types::ChainhookSpecification;
use chainhook_sdk::observer::{
start_event_observer, HookExpirationData, ObserverCommand, ObserverEvent,
PredicateEvaluationReport, StacksObserverStartupContext,
PredicateEvaluationReport, PredicateInterruptedData, StacksObserverStartupContext,
};
use chainhook_sdk::types::{Chain, StacksBlockData, StacksChainEvent};
use chainhook_sdk::utils::Context;
Expand Down Expand Up @@ -632,6 +632,24 @@ impl Service {
}
}
}
ObserverEvent::PredicateInterrupted(PredicateInterruptedData {
predicate_key,
error,
}) => {
if let PredicatesApi::On(ref config) = self.config.http_api {
let Ok(mut predicates_db_conn) =
open_readwrite_predicates_db_conn_verbose(&config, &ctx)
else {
continue;
};
set_predicate_interrupted_status(
error,
&predicate_key,
&mut predicates_db_conn,
&ctx,
);
}
}
ObserverEvent::Terminate => {
info!(
self.ctx.expect_logger(),
Expand Down Expand Up @@ -757,6 +775,16 @@ fn update_status_from_report(
}
}

fn set_predicate_interrupted_status(
error: String,
predicate_key: &str,
predicates_db_conn: &mut Connection,
ctx: &Context,
) {
let status = PredicateStatus::Interrupted(error);
update_predicate_status(predicate_key, status, predicates_db_conn, ctx);
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StreamingDataType {
Occurrence {
Expand Down
24 changes: 10 additions & 14 deletions components/chainhook-cli/src/service/runloops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ use crate::{
bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate,
stacks::scan_stacks_chainstate_via_rocksdb_using_predicate,
},
service::{
open_readwrite_predicates_db_conn_or_panic, update_predicate_status, PredicateStatus,
},
service::{open_readwrite_predicates_db_conn_or_panic, set_predicate_interrupted_status},
storage::open_readonly_stacks_db_conn,
};

Expand Down Expand Up @@ -73,14 +71,13 @@ pub fn start_stacks_scan_runloop(

// Update predicate status in redis
if let PredicatesApi::On(ref api_config) = moved_config.http_api {
let status = PredicateStatus::Interrupted(format!(
"Unable to evaluate predicate on Stacks chainstate: {e}"
));
let error =
format!("Unable to evaluate predicate on Stacks chainstate: {e}");
let mut predicates_db_conn =
open_readwrite_predicates_db_conn_or_panic(api_config, &moved_ctx);
update_predicate_status(
set_predicate_interrupted_status(
error,
&predicate_spec.key(),
status,
&mut predicates_db_conn,
&moved_ctx,
);
Expand Down Expand Up @@ -147,17 +144,16 @@ pub fn start_bitcoin_scan_runloop(

// Update predicate status in redis
if let PredicatesApi::On(ref api_config) = moved_config.http_api {
let status = PredicateStatus::Interrupted(format!(
"Unable to evaluate predicate on Bitcoin chainstate: {e}"
));
let error =
format!("Unable to evaluate predicate on Bitcoin chainstate: {e}");
let mut predicates_db_conn =
open_readwrite_predicates_db_conn_or_panic(api_config, &moved_ctx);
update_predicate_status(
set_predicate_interrupted_status(
error,
&predicate_spec.key(),
status,
&mut predicates_db_conn,
&moved_ctx,
);
)
}
return;
}
Expand Down
86 changes: 50 additions & 36 deletions components/chainhook-sdk/src/chainhooks/stacks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::io::Cursor;

use reqwest::RequestBuilder;

#[derive(Clone)]
pub struct StacksTriggerChainhook<'a> {
pub chainhook: &'a StacksChainhookSpecification,
pub apply: Vec<(Vec<&'a StacksTransactionData>, &'a dyn AbstractStacksBlock)>,
Expand Down Expand Up @@ -48,8 +49,48 @@ pub struct StacksChainhookOccurrencePayload {
pub rollback: Vec<StacksRollbackTransactionPayload>,
pub chainhook: StacksChainhookPayload,
}

impl StacksChainhookOccurrencePayload {
pub fn from_trigger<'a>(
trigger: StacksTriggerChainhook<'a>,
) -> StacksChainhookOccurrencePayload {
StacksChainhookOccurrencePayload {
apply: trigger
.apply
.into_iter()
.map(|(transactions, block)| {
let transactions = transactions
.into_iter()
.map(|t| t.clone())
.collect::<Vec<_>>();
StacksApplyTransactionPayload {
block_identifier: block.get_identifier().clone(),
transactions,
}
})
.collect::<Vec<_>>(),
rollback: trigger
.rollback
.into_iter()
.map(|(transactions, block)| {
let transactions = transactions
.into_iter()
.map(|t| t.clone())
.collect::<Vec<_>>();
StacksRollbackTransactionPayload {
block_identifier: block.get_identifier().clone(),
transactions,
}
})
.collect::<Vec<_>>(),
chainhook: StacksChainhookPayload {
uuid: trigger.chainhook.uuid.clone(),
},
}
}
}
pub enum StacksChainhookOccurrence {
Http(RequestBuilder),
Http(RequestBuilder, StacksChainhookOccurrencePayload),
File(String, Vec<u8>),
Data(StacksChainhookOccurrencePayload),
}
Expand Down Expand Up @@ -869,14 +910,19 @@ pub fn handle_stacks_hook_action<'a>(
.map_err(|e| format!("unable to build http client: {}", e.to_string()))?;
let host = format!("{}", http.url);
let method = Method::POST;
let body = serde_json::to_vec(&serialize_stacks_payload_to_json(trigger, proofs, ctx))
.map_err(|e| format!("unable to serialize payload {}", e.to_string()))?;
let body = serde_json::to_vec(&serialize_stacks_payload_to_json(
trigger.clone(),
proofs,
ctx,
))
.map_err(|e| format!("unable to serialize payload {}", e.to_string()))?;
Ok(StacksChainhookOccurrence::Http(
client
.request(method, &host)
.header("Content-Type", "application/json")
.header("Authorization", http.authorization_header.clone())
.body(body),
StacksChainhookOccurrencePayload::from_trigger(trigger),
))
}
HookAction::FileAppend(disk) => {
Expand All @@ -888,39 +934,7 @@ pub fn handle_stacks_hook_action<'a>(
))
}
HookAction::Noop => Ok(StacksChainhookOccurrence::Data(
StacksChainhookOccurrencePayload {
apply: trigger
.apply
.into_iter()
.map(|(transactions, block)| {
let transactions = transactions
.into_iter()
.map(|t| t.clone())
.collect::<Vec<_>>();
StacksApplyTransactionPayload {
block_identifier: block.get_identifier().clone(),
transactions,
}
})
.collect::<Vec<_>>(),
rollback: trigger
.rollback
.into_iter()
.map(|(transactions, block)| {
let transactions = transactions
.into_iter()
.map(|t| t.clone())
.collect::<Vec<_>>();
StacksRollbackTransactionPayload {
block_identifier: block.get_identifier().clone(),
transactions,
}
})
.collect::<Vec<_>>(),
chainhook: StacksChainhookPayload {
uuid: trigger.chainhook.uuid.clone(),
},
},
StacksChainhookOccurrencePayload::from_trigger(trigger),
)),
}
}
60 changes: 50 additions & 10 deletions components/chainhook-sdk/src/observer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,12 @@ impl PredicateEvaluationReport {
}
}

#[derive(Clone, Debug)]
pub struct PredicateInterruptedData {
pub predicate_key: String,
pub error: String,
}

#[derive(Clone, Debug)]
pub enum ObserverEvent {
Error(String),
Expand All @@ -309,6 +315,7 @@ pub enum ObserverEvent {
BitcoinPredicateTriggered(BitcoinChainhookOccurrencePayload),
StacksPredicateTriggered(StacksChainhookOccurrencePayload),
PredicatesTriggered(usize),
PredicateInterrupted(PredicateInterruptedData),
Terminate,
StacksChainMempoolEvent(StacksChainMempoolEvent),
}
Expand Down Expand Up @@ -1151,8 +1158,10 @@ pub async fn start_observer_commands_handler(
let predicate_uuid = &chainhook_to_trigger.chainhook.uuid;
match handle_bitcoin_hook_action(chainhook_to_trigger, &proofs) {
Err(e) => {
// todo: we may want to set predicates that reach this branch as interrupted,
// but for now we will error to see if this problem occurs.
ctx.try_log(|logger| {
slog::warn!(
slog::error!(
logger,
"unable to handle action for predicate {}: {}",
predicate_uuid,
Expand Down Expand Up @@ -1197,10 +1206,22 @@ pub async fn start_observer_commands_handler(
}

for (request, data) in requests.into_iter() {
// todo: need to handle failure case - we should be setting interrupted status: https://github.com/hirosystems/chainhook/issues/523
if send_request(request, 3, 1, &ctx).await.is_ok() {
if let Some(ref tx) = observer_events_tx {
let _ = tx.send(ObserverEvent::BitcoinPredicateTriggered(data));
match send_request(request, 3, 1, &ctx).await {
Ok(_) => {
if let Some(ref tx) = observer_events_tx {
let _ = tx.send(ObserverEvent::BitcoinPredicateTriggered(data));
}
}
Err(e) => {
chainhook_store
.predicates
.deregister_bitcoin_hook(data.chainhook.uuid.clone());
if let Some(ref tx) = observer_events_tx {
let _ = tx.send(ObserverEvent::PredicateInterrupted(PredicateInterruptedData {
predicate_key: ChainhookSpecification::bitcoin_key(&data.chainhook.uuid),
error: format!("Unable to evaluate predicate on Bitcoin chainstate: {}", e)
}));
}
}
}
}
Expand Down Expand Up @@ -1326,16 +1347,18 @@ pub async fn start_observer_commands_handler(
match handle_stacks_hook_action(chainhook_to_trigger, &proofs, &ctx) {
Err(e) => {
ctx.try_log(|logger| {
slog::warn!(
// todo: we may want to set predicates that reach this branch as interrupted,
// but for now we will error to see if this problem occurs.
slog::error!(
logger,
"unable to handle action for predicate {}: {}",
predicate_uuid,
e
)
});
}
Ok(StacksChainhookOccurrence::Http(request)) => {
requests.push(request);
Ok(StacksChainhookOccurrence::Http(request, data)) => {
requests.push((request, data));
}
Ok(StacksChainhookOccurrence::File(_path, _bytes)) => {
ctx.try_log(|logger| {
Expand Down Expand Up @@ -1363,7 +1386,7 @@ pub async fn start_observer_commands_handler(
}
}

for request in requests.into_iter() {
for (request, data) in requests.into_iter() {
// todo(lgalabru): collect responses for reporting
ctx.try_log(|logger| {
slog::debug!(
Expand All @@ -1372,7 +1395,24 @@ pub async fn start_observer_commands_handler(
request
)
});
let _ = send_request(request, 3, 1, &ctx).await;
match send_request(request, 3, 1, &ctx).await {
Ok(_) => {
if let Some(ref tx) = observer_events_tx {
let _ = tx.send(ObserverEvent::StacksPredicateTriggered(data));
}
}
Err(e) => {
chainhook_store
.predicates
.deregister_stacks_hook(data.chainhook.uuid.clone());
if let Some(ref tx) = observer_events_tx {
let _ = tx.send(ObserverEvent::PredicateInterrupted(PredicateInterruptedData {
predicate_key: ChainhookSpecification::stacks_key(&data.chainhook.uuid),
error: format!("Unable to evaluate predicate on Bitcoin chainstate: {}", e)
}));
}
}
};
}

prometheus_monitoring.stx_metrics_block_evaluated(new_tip);
Expand Down

0 comments on commit 900d6df

Please sign in to comment.