diff --git a/Cargo.lock b/Cargo.lock index 71363a42..aa5b9ef3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -249,9 +249,9 @@ checksum = "383d29d513d8764dcdc42ea295d979eb99c3c9f00607b3692cf68a431f7dca72" [[package]] name = "bindgen" -version = "0.64.0" +version = "0.65.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4" +checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5" dependencies = [ "bitflags 1.3.2", "cexpr", @@ -259,12 +259,13 @@ dependencies = [ "lazy_static", "lazycell", "peeking_take_while", + "prettyplease", "proc-macro2", "quote", "regex", "rustc-hash", "shlex", - "syn 1.0.109", + "syn 2.0.39", ] [[package]] @@ -674,16 +675,6 @@ dependencies = [ "cc", ] -[[package]] -name = "combine" -version = "4.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" -dependencies = [ - "bytes", - "memchr", -] - [[package]] name = "const_fn" version = "0.4.9" @@ -1833,9 +1824,9 @@ dependencies = [ [[package]] name = "librocksdb-sys" -version = "0.10.0+7.9.2" +version = "0.11.0+8.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fe4d5874f5ff2bc616e55e8c6086d478fcda13faf9495768a4aa1c22042d30b" +checksum = "d3386f101bcb4bd252d8e9d2fb41ec3b0862a15a62b478c355b2982efa469e3e" dependencies = [ "bindgen", "bzip2-sys", @@ -1843,7 +1834,6 @@ dependencies = [ "glob", "libc", "libz-sys", - "lz4-sys", ] [[package]] @@ -1969,16 +1959,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "lz4-sys" -version = "1.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900" -dependencies = [ - "cc", - "libc", -] - [[package]] name = "matchers" version = "0.1.0" @@ -2302,7 +2282,6 @@ dependencies = [ "pprof", "progressing", "rand 0.8.5", - "redis", "reqwest", "rocket", "rocket_okapi", @@ -2310,7 +2289,6 @@ dependencies = [ "rusqlite", "schemars 0.8.12", "serde", - "serde-redis", "serde_derive", "serde_json", "tar", @@ -2548,6 +2526,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "prettyplease" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" +dependencies = [ + "proc-macro2", + "syn 2.0.39", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -2727,21 +2715,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "redis" -version = "0.21.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "152f3863635cbb76b73bc247845781098302c6c9ad2060e1a9a7de56840346b6" -dependencies = [ - "async-trait", - "combine", - "itoa", - "percent-encoding", - "ryu", - "sha1", - "url", -] - [[package]] name = "redox_syscall" version = "0.3.5" @@ -3022,9 +2995,9 @@ dependencies = [ [[package]] name = "rocksdb" -version = "0.20.1" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "015439787fce1e75d55f279078d33ff14b4af5d93d995e8838ee4631301c8a99" +checksum = "bb6f170a4041d50a0ce04b0d2e14916d6ca863ea2e422689a5b694395d299ffe" dependencies = [ "libc", "librocksdb-sys", @@ -3341,16 +3314,6 @@ dependencies = [ "smallvec 0.6.14", ] -[[package]] -name = 
"serde-redis" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78a6774398484da90606c597852a9662188a3611c912ed6eb29fc81812605c0d" -dependencies = [ - "redis", - "serde", -] - [[package]] name = "serde_derive" version = "1.0.193" diff --git a/README.md b/README.md index d329fd25..3f5b2bbb 100644 --- a/README.md +++ b/README.md @@ -94,12 +94,11 @@ $ ordhook scan blocks --interval 767430:767753 --post-to=http://localhost:3000/a $ ordhook service start --post-to=http://localhost:3000/api/events --config-path=./Ordhook.toml ``` -New `http-post` endpoints can also be added dynamically by spinning up a redis server and adding the following section in the `Ordhook.toml` configuration file: +New `http-post` endpoints can also be added dynamically by adding the following section in the `Ordhook.toml` configuration file: ```toml [http_api] http_port = 20456 -database_uri = "redis://localhost:6379/" ``` Running `ordhook` with the command diff --git a/components/ordhook-cli/src/cli/mod.rs b/components/ordhook-cli/src/cli/mod.rs index 5331517e..8e35516e 100644 --- a/components/ordhook-cli/src/cli/mod.rs +++ b/components/ordhook-cli/src/cli/mod.rs @@ -3,10 +3,9 @@ use crate::config::generator::generate_config; use clap::{Parser, Subcommand}; use hiro_system_kit; use ordhook::chainhook_sdk::bitcoincore_rpc::{Auth, Client, RpcApi}; -use ordhook::chainhook_sdk::chainhooks::types::HttpHook; +use ordhook::chainhook_sdk::chainhooks::types::{BitcoinChainhookSpecification, HttpHook}; use ordhook::chainhook_sdk::chainhooks::types::{ - BitcoinChainhookFullSpecification, BitcoinChainhookNetworkSpecification, BitcoinPredicateType, - ChainhookFullSpecification, HookAction, OrdinalOperations, + BitcoinPredicateType, ChainhookFullSpecification, HookAction, OrdinalOperations, }; use ordhook::chainhook_sdk::indexer::bitcoin::{ build_http_client, download_and_parse_block_with_retry, retrieve_block_hash_with_retry, @@ -34,7 +33,6 @@ use ordhook::download::download_ordinals_dataset_if_required; use ordhook::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate; use ordhook::service::{start_observer_forwarding, Service}; use reqwest::Client as HttpClient; -use std::collections::BTreeMap; use std::io::{BufReader, Read}; use std::path::PathBuf; use std::process; @@ -555,8 +553,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { Some(&block_heights), None, cmd.auth_token, - )? 
- .into_selected_network_specification(&config.network.bitcoin_network)?; + )?; scan_bitcoin_chainstate_via_rpc_using_predicate( &predicate_spec, &config, @@ -734,7 +731,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { Some(start_block), cmd.auth_token.clone(), )?; - predicates.push(ChainhookFullSpecification::Bitcoin(predicate)); + predicates.push(predicate); } let mut service = Service::new(config, ctx.clone()); @@ -939,8 +936,7 @@ pub fn build_predicate_from_cli( block_heights: Option<&BlockHeights>, start_block: Option, auth_token: Option, -) -> Result { - let mut networks = BTreeMap::new(); +) -> Result { // Retrieve last block height known, and display it let (start_block, end_block, blocks) = match (start_block, block_heights) { (None, Some(BlockHeights::BlockRange(start, end))) => (Some(*start), Some(*end), None), @@ -948,30 +944,27 @@ pub fn build_predicate_from_cli( (Some(start), None) => (Some(start), None, None), _ => unreachable!(), }; - networks.insert( - config.network.bitcoin_network.clone(), - BitcoinChainhookNetworkSpecification { - start_block, - end_block, - blocks, - expire_after_occurrence: None, - include_proof: None, - include_inputs: None, - include_outputs: None, - include_witness: None, - predicate: BitcoinPredicateType::OrdinalsProtocol(OrdinalOperations::InscriptionFeed), - action: HookAction::HttpPost(HttpHook { - url: post_to.to_string(), - authorization_header: format!("Bearer {}", auth_token.unwrap_or("".to_string())), - }), - }, - ); - let predicate = BitcoinChainhookFullSpecification { + let predicate = BitcoinChainhookSpecification { + network: config.network.bitcoin_network.clone(), uuid: post_to.to_string(), owner_uuid: None, name: post_to.to_string(), version: 1, - networks, + start_block, + end_block, + blocks, + expire_after_occurrence: None, + include_proof: false, + include_inputs: false, + include_outputs: false, + include_witness: false, + expired_at: None, + enabled: true, + predicate: BitcoinPredicateType::OrdinalsProtocol(OrdinalOperations::InscriptionFeed), + action: HookAction::HttpPost(HttpHook { + url: post_to.to_string(), + authorization_header: format!("Bearer {}", auth_token.unwrap_or("".to_string())), + }), }; Ok(predicate) diff --git a/components/ordhook-cli/src/config/file.rs b/components/ordhook-cli/src/config/file.rs index 7eb2a2fc..a5604a55 100644 --- a/components/ordhook-cli/src/config/file.rs +++ b/components/ordhook-cli/src/config/file.rs @@ -9,8 +9,6 @@ use ordhook::config::{ use std::fs::File; use std::io::{BufReader, Read}; -const DEFAULT_REDIS_URI: &str = "redis://localhost:6379/"; - pub const DEFAULT_INGESTION_PORT: u16 = 20455; pub const DEFAULT_CONTROL_PORT: u16 = 20456; pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 10; @@ -74,9 +72,6 @@ impl ConfigFile { _ => PredicatesApi::On(PredicatesApiConfig { http_port: http_api.http_port.unwrap_or(DEFAULT_CONTROL_PORT), display_logs: http_api.display_logs.unwrap_or(true), - database_uri: http_api - .database_uri - .unwrap_or(DEFAULT_REDIS_URI.to_string()), }), }, }, diff --git a/components/ordhook-cli/src/config/generator.rs b/components/ordhook-cli/src/config/generator.rs index 158007ed..53b6cce5 100644 --- a/components/ordhook-cli/src/config/generator.rs +++ b/components/ordhook-cli/src/config/generator.rs @@ -12,7 +12,6 @@ working_dir = "ordhook" # # [http_api] # http_port = 20456 -# database_uri = "redis://localhost:6379/" [network] mode = "{network}" diff --git a/components/ordhook-core/Cargo.toml b/components/ordhook-core/Cargo.toml index 
ce417439..ab3f2b94 100644 --- a/components/ordhook-core/Cargo.toml +++ b/components/ordhook-core/Cargo.toml @@ -8,8 +8,6 @@ num_cpus = "1.16.0" serde = "1" serde_json = "1" serde_derive = "1" -redis = "0.21.5" -serde-redis = "0.12.0" hex = "0.4.3" rand = "0.8.5" chainhook-sdk = { version = "0.11.0", features = ["zeromq"] } @@ -33,14 +31,10 @@ fxhash = "0.2.1" rusqlite = { version = "0.27.0", features = ["bundled"] } anyhow = { version = "1.0.56", features = ["backtrace"] } schemars = { version = "0.8.10", git = "https://github.com/hirosystems/schemars.git", branch = "feat-chainhook-fixes" } -pprof = { version = "0.13.0", features = ["flamegraph"], optional = true } progressing = '3' futures = "0.3.28" - -[dependencies.rocksdb] -version = "0.20.1" -default-features = false -features = ["lz4", "snappy"] +rocksdb = { version = "0.21.0", default-features = false, features = ["snappy"] } +pprof = { version = "0.13.0", features = ["flamegraph"], optional = true } # [profile.release] # debug = true diff --git a/components/ordhook-core/src/config/mod.rs b/components/ordhook-core/src/config/mod.rs index 79a2017f..950e090e 100644 --- a/components/ordhook-core/src/config/mod.rs +++ b/components/ordhook-core/src/config/mod.rs @@ -46,7 +46,6 @@ pub enum PredicatesApi { #[derive(Clone, Debug)] pub struct PredicatesApiConfig { pub http_port: u16, - pub database_uri: String, pub display_logs: bool, } @@ -126,10 +125,6 @@ impl Config { } } - pub fn expected_api_database_uri(&self) -> &str { - &self.expected_api_config().database_uri - } - pub fn expected_api_config(&self) -> &PredicatesApiConfig { match self.http_api { PredicatesApi::On(ref config) => config, diff --git a/components/ordhook-core/src/db/mod.rs b/components/ordhook-core/src/db/mod.rs index 2b10562d..607325e7 100644 --- a/components/ordhook-core/src/db/mod.rs +++ b/components/ordhook-core/src/db/mod.rs @@ -47,12 +47,14 @@ pub fn open_readwrite_ordhook_db_conn( base_dir: &PathBuf, ctx: &Context, ) -> Result { - let conn = create_or_open_readwrite_db(&base_dir, ctx); + let db_path = get_default_ordhook_db_file_path(&base_dir); + let conn = create_or_open_readwrite_db(&db_path, ctx); Ok(conn) } -pub fn initialize_ordhook_db(path: &PathBuf, ctx: &Context) -> Connection { - let conn = create_or_open_readwrite_db(path, ctx); +pub fn initialize_ordhook_db(base_dir: &PathBuf, ctx: &Context) -> Connection { + let db_path = get_default_ordhook_db_file_path(&base_dir); + let conn = create_or_open_readwrite_db(&db_path, ctx); // TODO: introduce initial output if let Err(e) = conn.execute( "CREATE TABLE IF NOT EXISTS inscriptions ( @@ -156,20 +158,19 @@ pub fn initialize_ordhook_db(path: &PathBuf, ctx: &Context) -> Connection { conn } -pub fn create_or_open_readwrite_db(cache_path: &PathBuf, ctx: &Context) -> Connection { - let path = get_default_ordhook_db_file_path(&cache_path); - let open_flags = match std::fs::metadata(&path) { +pub fn create_or_open_readwrite_db(db_path: &PathBuf, ctx: &Context) -> Connection { + let open_flags = match std::fs::metadata(&db_path) { Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { // need to create - if let Some(dirp) = PathBuf::from(&path).parent() { + if let Some(dirp) = PathBuf::from(&db_path).parent() { std::fs::create_dir_all(dirp).unwrap_or_else(|e| { ctx.try_log(|logger| error!(logger, "{}", e.to_string())); }); } OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE } else { - panic!("FATAL: could not stat {}", path.display()); + panic!("FATAL: could not stat {}", db_path.display()); } } 
Ok(_md) => { @@ -179,7 +180,7 @@ pub fn create_or_open_readwrite_db(cache_path: &PathBuf, ctx: &Context) -> Conne }; let conn = loop { - match Connection::open_with_flags(&path, open_flags) { + match Connection::open_with_flags(&db_path, open_flags) { Ok(conn) => break conn, Err(e) => { ctx.try_log(|logger| error!(logger, "{}", e.to_string())); @@ -190,13 +191,13 @@ pub fn create_or_open_readwrite_db(cache_path: &PathBuf, ctx: &Context) -> Conne connection_with_defaults_pragma(conn) } -fn open_existing_readonly_db(path: &PathBuf, ctx: &Context) -> Connection { - let open_flags = match std::fs::metadata(path) { +pub fn open_existing_readonly_db(db_path: &PathBuf, ctx: &Context) -> Connection { + let open_flags = match std::fs::metadata(db_path) { Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { - panic!("FATAL: could not find {}", path.display()); + panic!("FATAL: could not find {}", db_path.display()); } else { - panic!("FATAL: could not stat {}", path.display()); + panic!("FATAL: could not stat {}", db_path.display()); } } Ok(_md) => { @@ -206,7 +207,7 @@ fn open_existing_readonly_db(path: &PathBuf, ctx: &Context) -> Connection { }; let conn = loop { - match Connection::open_with_flags(path, open_flags) { + match Connection::open_with_flags(db_path, open_flags) { Ok(conn) => break conn, Err(e) => { ctx.try_log(|logger| { @@ -579,7 +580,7 @@ pub fn insert_transfer_in_locations( } } -fn perform_query_exists( +pub fn perform_query_exists( query: &str, args: &[&dyn ToSql], db_conn: &Connection, @@ -589,7 +590,7 @@ fn perform_query_exists( !res.is_empty() } -fn perform_query_one( +pub fn perform_query_one( query: &str, args: &[&dyn ToSql], db_conn: &Connection, @@ -606,7 +607,7 @@ where } } -fn perform_query_set( +pub fn perform_query_set( query: &str, args: &[&dyn ToSql], db_conn: &Connection, diff --git a/components/ordhook-core/src/scan/bitcoin.rs b/components/ordhook-core/src/scan/bitcoin.rs index eadeb1e4..f3ef8ea5 100644 --- a/components/ordhook-core/src/scan/bitcoin.rs +++ b/components/ordhook-core/src/scan/bitcoin.rs @@ -1,4 +1,4 @@ -use crate::config::{Config, PredicatesApi}; +use crate::config::Config; use crate::core::protocol::inscription_parsing::{ get_inscriptions_revealed_in_block, get_inscriptions_transferred_in_block, parse_inscriptions_and_standardize_block, @@ -6,9 +6,8 @@ use crate::core::protocol::inscription_parsing::{ use crate::core::protocol::inscription_sequencing::consolidate_block_with_pre_computed_ordinals_data; use crate::db::{get_any_entry_in_ordinal_activities, open_readonly_ordhook_db_conn}; use crate::download::download_ordinals_dataset_if_required; -use crate::service::predicates::{ - open_readwrite_predicates_db_conn_or_panic, update_predicate_status, PredicateStatus, - ScanningData, +use crate::service::observers::{ + open_readwrite_observers_db_conn_or_panic, update_observer_progress, }; use chainhook_sdk::bitcoincore_rpc::RpcApi; use chainhook_sdk::bitcoincore_rpc::{Auth, Client}; @@ -78,7 +77,8 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( info!( ctx.expect_logger(), - "Starting predicate evaluation on Bitcoin blocks", + "Starting predicate evaluation on {} Bitcoin blocks", + block_heights_to_scan.len() ); let mut actions_triggered = 0; let mut err_count = 0; @@ -88,9 +88,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( None => config.get_event_observer_config(), }; let bitcoin_config = event_observer_config.get_bitcoin_config(); - let number_of_blocks_to_scan = block_heights_to_scan.len() as u64; let mut 
number_of_blocks_scanned = 0; - let mut number_of_blocks_sent = 0u64; let http_client = build_http_client(); while let Some(current_block_height) = block_heights_to_scan.pop_front() { @@ -162,38 +160,23 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( ) .await { - Ok(actions) => { - if actions > 0 { - number_of_blocks_sent += 1; - } - actions_triggered += actions - } + Ok(actions) => actions_triggered += actions, Err(_) => err_count += 1, } if err_count >= 3 { return Err(format!("Scan aborted (consecutive action errors >= 3)")); } - - if let PredicatesApi::On(ref api_config) = config.http_api { - if number_of_blocks_scanned % 50 == 0 { - let status = PredicateStatus::Scanning(ScanningData { - number_of_blocks_to_scan, - number_of_blocks_scanned, - number_of_blocks_sent, - current_block_height, - }); - let mut predicates_db_conn = - open_readwrite_predicates_db_conn_or_panic(api_config, &ctx); - update_predicate_status( - &predicate_spec.key(), - status, - &mut predicates_db_conn, - &ctx, - ) - } + { + let observers_db_conn = + open_readwrite_observers_db_conn_or_panic(&config.expected_cache_path(), &ctx); + update_observer_progress( + &predicate_spec.uuid, + current_block_height, + &observers_db_conn, + &ctx, + ) } - if block_heights_to_scan.is_empty() && floating_end_block { match bitcoin_rpc.get_blockchain_info() { Ok(result) => { @@ -212,17 +195,6 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( "{number_of_blocks_scanned} blocks scanned, {actions_triggered} actions triggered" ); - if let PredicatesApi::On(ref api_config) = config.http_api { - let status = PredicateStatus::Scanning(ScanningData { - number_of_blocks_to_scan, - number_of_blocks_scanned, - number_of_blocks_sent, - current_block_height: 0, - }); - let mut predicates_db_conn = open_readwrite_predicates_db_conn_or_panic(api_config, &ctx); - update_predicate_status(&predicate_spec.key(), status, &mut predicates_db_conn, &ctx) - } - Ok(()) } diff --git a/components/ordhook-core/src/service/http_api.rs b/components/ordhook-core/src/service/http_api.rs index b5f5d007..3beaf844 100644 --- a/components/ordhook-core/src/service/http_api.rs +++ b/components/ordhook-core/src/service/http_api.rs @@ -1,6 +1,6 @@ use std::{ - collections::HashMap, net::{IpAddr, Ipv4Addr}, + path::PathBuf, sync::{mpsc::Sender, Arc, Mutex}, }; @@ -9,18 +9,18 @@ use chainhook_sdk::{ observer::ObserverCommand, utils::Context, }; -use redis::{Commands, Connection}; use rocket::config::{self, Config, LogLevel}; use rocket::serde::json::{json, Json, Value as JsonValue}; use rocket::State; use std::error::Error; -use crate::config::PredicatesApiConfig; - -use super::{open_readwrite_predicates_db_conn, PredicateStatus}; +use super::observers::{ + find_all_observers, find_observer_with_uuid, open_readonly_observers_db_conn, ObserverReport, +}; pub async fn start_predicate_api_server( - api_config: PredicatesApiConfig, + port: u16, + observers_db_dir_path: PathBuf, observer_commands_tx: Sender, ctx: Context, ) -> Result<(), Box> { @@ -32,7 +32,7 @@ pub async fn start_predicate_api_server( shutdown_config.mercy = 1; let control_config = Config { - port: api_config.http_port, + port, workers: 1, address: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), keep_alive: 5, @@ -57,7 +57,7 @@ pub async fn start_predicate_api_server( let ignite = rocket::custom(control_config) .manage(background_job_tx_mutex) - .manage(api_config) + .manage(observers_db_dir_path) .manage(ctx_cloned) .mount("/", routes) .ignite() @@ -80,24 +80,14 @@ fn handle_ping(ctx: 
&State) -> Json { #[get("/v1/observers", format = "application/json")] fn handle_get_predicates( - api_config: &State, + observers_db_dir_path: &State, ctx: &State, ) -> Json { ctx.try_log(|logger| info!(logger, "Handling HTTP GET /v1/observers")); - match open_readwrite_predicates_db_conn(api_config) { - Ok(mut predicates_db_conn) => { - let predicates = match get_entries_from_predicates_db(&mut predicates_db_conn, &ctx) { - Ok(predicates) => predicates, - Err(e) => { - ctx.try_log(|logger| warn!(logger, "unable to retrieve predicates: {e}")); - return Json(json!({ - "status": 500, - "message": "unable to retrieve predicates", - })); - } - }; - - let serialized_predicates = predicates + match open_readonly_observers_db_conn(observers_db_dir_path, ctx) { + Ok(mut db_conn) => { + let observers = find_all_observers(&mut db_conn, &ctx); + let serialized_predicates = observers .iter() .map(|(p, s)| serialized_predicate_with_status(p, s)) .collect::>(); @@ -117,7 +107,7 @@ fn handle_get_predicates( #[post("/v1/observers", format = "application/json", data = "")] fn handle_create_predicate( predicate: Json, - api_config: &State, + observers_db_dir_path: &State, background_job_tx: &State>>>, ctx: &State, ) -> Json { @@ -132,10 +122,11 @@ fn handle_create_predicate( let predicate_uuid = predicate.get_uuid().to_string(); - if let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn(api_config) { + if let Ok(mut predicates_db_conn) = open_readonly_observers_db_conn(observers_db_dir_path, ctx) + { let key: String = format!("{}", ChainhookSpecification::bitcoin_key(&predicate_uuid)); - match get_entry_from_predicates_db(&key, &mut predicates_db_conn, &ctx) { - Ok(Some(_)) => { + match find_observer_with_uuid(&key, &mut predicates_db_conn, &ctx) { + Some(_) => { return Json(json!({ "status": 409, "error": "Predicate uuid already in use", @@ -162,29 +153,21 @@ fn handle_create_predicate( #[get("/v1/observers/", format = "application/json")] fn handle_get_predicate( predicate_uuid: String, - api_config: &State, + observers_db_dir_path: &State, ctx: &State, ) -> Json { ctx.try_log(|logger| info!(logger, "Handling HTTP GET /v1/observers/{}", predicate_uuid)); - match open_readwrite_predicates_db_conn(api_config) { + match open_readonly_observers_db_conn(observers_db_dir_path, ctx) { Ok(mut predicates_db_conn) => { let key: String = format!("{}", ChainhookSpecification::bitcoin_key(&predicate_uuid)); - let entry = match get_entry_from_predicates_db(&key, &mut predicates_db_conn, &ctx) { - Ok(Some((ChainhookSpecification::Stacks(spec), status))) => json!({ - "chain": "stacks", - "uuid": spec.uuid, - "network": spec.network, - "predicate": spec.predicate, - "status": status, - "enabled": spec.enabled, - }), - Ok(Some((ChainhookSpecification::Bitcoin(spec), status))) => json!({ + let entry = match find_observer_with_uuid(&key, &mut predicates_db_conn, &ctx) { + Some((ChainhookSpecification::Bitcoin(spec), report)) => json!({ "chain": "bitcoin", "uuid": spec.uuid, "network": spec.network, "predicate": spec.predicate, - "status": status, + "status": report, "enabled": spec.enabled, }), _ => { @@ -232,109 +215,25 @@ fn handle_delete_bitcoin_predicate( })) } -pub fn get_entry_from_predicates_db( - predicate_key: &str, - predicate_db_conn: &mut Connection, - _ctx: &Context, -) -> Result, String> { - let entry: HashMap = predicate_db_conn.hgetall(predicate_key).map_err(|e| { - format!( - "unable to load predicate associated with key {}: {}", - predicate_key, - e.to_string() - ) - })?; - - let encoded_spec 
= match entry.get("specification") { - None => return Ok(None), - Some(payload) => payload, - }; - - let spec = ChainhookSpecification::deserialize_specification(&encoded_spec)?; - - let encoded_status = match entry.get("status") { - None => unimplemented!(), - Some(payload) => payload, - }; - - let status = serde_json::from_str(&encoded_status).map_err(|e| format!("{}", e.to_string()))?; - - Ok(Some((spec, status))) -} - -pub fn get_entries_from_predicates_db( - predicate_db_conn: &mut Connection, - ctx: &Context, -) -> Result, String> { - let key: String = format!("{}", ChainhookSpecification::bitcoin_key("*")); - let chainhooks_to_load: Vec = predicate_db_conn - .scan_match(key) - .map_err(|e| format!("unable to connect to redis: {}", e.to_string()))? - .into_iter() - .collect(); - - let mut predicates = vec![]; - for predicate_key in chainhooks_to_load.iter() { - let chainhook = match get_entry_from_predicates_db(predicate_key, predicate_db_conn, ctx) { - Ok(Some((spec, status))) => (spec, status), - Ok(None) => { - ctx.try_log(|logger| { - warn!( - logger, - "unable to load predicate associated with key {}", predicate_key, - ) - }); - continue; - } - Err(e) => { - ctx.try_log(|logger| { - error!( - logger, - "unable to load predicate associated with key {}: {}", - predicate_key, - e.to_string() - ) - }); - continue; - } - }; - predicates.push(chainhook); - } - Ok(predicates) -} - -pub fn load_predicates_from_redis( - config: &crate::config::Config, - ctx: &Context, -) -> Result, String> { - let redis_uri: &str = config.expected_api_database_uri(); - let client = redis::Client::open(redis_uri.clone()) - .map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?; - let mut predicate_db_conn = client - .get_connection() - .map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?; - get_entries_from_predicates_db(&mut predicate_db_conn, ctx) -} - fn serialized_predicate_with_status( predicate: &ChainhookSpecification, - status: &PredicateStatus, + report: &ObserverReport, ) -> JsonValue { - match (predicate, status) { - (ChainhookSpecification::Stacks(spec), status) => json!({ + match (predicate, report) { + (ChainhookSpecification::Stacks(spec), report) => json!({ "chain": "stacks", "uuid": spec.uuid, "network": spec.network, "predicate": spec.predicate, - "status": status, + "status": report, "enabled": spec.enabled, }), - (ChainhookSpecification::Bitcoin(spec), status) => json!({ + (ChainhookSpecification::Bitcoin(spec), report) => json!({ "chain": "bitcoin", "uuid": spec.uuid, "network": spec.network, "predicate": spec.predicate, - "status": status, + "status": report, "enabled": spec.enabled, }), } diff --git a/components/ordhook-core/src/service/mod.rs b/components/ordhook-core/src/service/mod.rs index 0c975518..0745f131 100644 --- a/components/ordhook-core/src/service/mod.rs +++ b/components/ordhook-core/src/service/mod.rs @@ -1,5 +1,5 @@ mod http_api; -pub mod predicates; +pub mod observers; mod runloops; use crate::config::{Config, PredicatesApi}; @@ -22,9 +22,10 @@ use crate::db::{ }; use crate::scan::bitcoin::process_block_with_predicates; use crate::service::http_api::start_predicate_api_server; -use crate::service::predicates::{ - create_and_consolidate_chainhook_config_with_predicates, open_readwrite_predicates_db_conn, - update_predicate_spec, update_predicate_status, PredicateStatus, +use crate::service::observers::{ + create_and_consolidate_chainhook_config_with_predicates, insert_entry_in_observers, + open_readwrite_observers_db_conn, 
remove_entry_from_observers, update_observer_progress, + update_observer_streaming_enabled, ObserverReport, }; use crate::service::runloops::start_bitcoin_scan_runloop; @@ -42,7 +43,6 @@ use crossbeam_channel::unbounded; use crossbeam_channel::{select, Sender}; use dashmap::DashMap; use fxhash::FxHasher; -use redis::Commands; use std::collections::BTreeMap; use std::hash::BuildHasherDefault; @@ -61,29 +61,15 @@ impl Service { pub async fn run( &mut self, - predicates: Vec, + predicates: Vec, predicate_activity_relayer: Option< crossbeam_channel::Sender, >, ) -> Result<(), String> { let mut event_observer_config = self.config.get_event_observer_config(); - let chainhook_config = create_and_consolidate_chainhook_config_with_predicates( - predicates, - predicate_activity_relayer.is_some(), - &self.config, - &self.ctx, - ); - - event_observer_config.chainhook_config = Some(chainhook_config); - - let ordhook_config = self.config.get_ordhook_config(); - - // Sleep - // std::thread::sleep(std::time::Duration::from_secs(1200)); // Catch-up with chain tip - self.catch_up_with_chain_tip(false, &event_observer_config) - .await?; + let chain_tip_height = self.catch_up_with_chain_tip(false).await?; info!( self.ctx.expect_logger(), "Database up to date, service will start streaming blocks" @@ -95,12 +81,32 @@ impl Service { // Create the chainhook runloop tx/rx comms let (observer_command_tx, observer_command_rx) = channel(); let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded(); + let ordhook_config = self.config.get_ordhook_config(); let inner_ctx = if ordhook_config.logs.chainhook_internals { self.ctx.clone() } else { Context::empty() }; + // Observers handling + // 1) update event_observer_config with observers ready to be used + // 2) catch-up outdated observers by dispatching replays + let (chainhook_config, outdated_observers) = + create_and_consolidate_chainhook_config_with_predicates( + predicates, + chain_tip_height, + predicate_activity_relayer.is_some(), + &self.config, + &self.ctx, + )?; + // Dispatch required replays + for outdated_observer_spec in outdated_observers.into_iter() { + let _ = observer_command_tx.send(ObserverCommand::RegisterPredicate( + ChainhookFullSpecification::Bitcoin(outdated_observer_spec), + )); + } + event_observer_config.chainhook_config = Some(chainhook_config); + let _ = start_event_observer( event_observer_config, observer_command_tx.clone(), @@ -113,19 +119,11 @@ impl Service { // If HTTP Predicates API is on, we start: // - Thread pool in charge of performing replays // - API server - if self.config.is_http_api_enabled() { - self.start_main_runloop_with_dynamic_predicates( - &observer_command_tx, - observer_event_rx, - predicate_activity_relayer, - )?; - } else { - self.start_main_runloop( - &observer_command_tx, - observer_event_rx, - predicate_activity_relayer, - )?; - } + self.start_main_runloop_with_dynamic_predicates( + &observer_command_tx, + observer_event_rx, + predicate_activity_relayer, + )?; Ok(()) } @@ -140,12 +138,13 @@ impl Service { String, > { let mut event_observer_config = self.config.get_event_observer_config(); - let chainhook_config = create_and_consolidate_chainhook_config_with_predicates( + let (chainhook_config, _) = create_and_consolidate_chainhook_config_with_predicates( vec![], + 0, true, &self.config, &self.ctx, - ); + )?; event_observer_config.chainhook_config = Some(chainhook_config); @@ -217,10 +216,6 @@ impl Service { crossbeam_channel::Sender, >, ) -> Result<(), String> { - let PredicatesApi::On(ref 
api_config) = self.config.http_api else { - return Ok(()); - }; - let (bitcoin_scan_op_tx, bitcoin_scan_op_rx) = crossbeam_channel::unbounded(); let ctx = self.ctx.clone(); let config = self.config.clone(); @@ -236,18 +231,26 @@ impl Service { }) .expect("unable to spawn thread"); - info!( - self.ctx.expect_logger(), - "Listening on port {} for chainhook predicate registrations", api_config.http_port - ); - let ctx = self.ctx.clone(); - let api_config = api_config.clone(); - let moved_observer_command_tx = observer_command_tx.clone(); - // Test and initialize a database connection - let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || { - let future = start_predicate_api_server(api_config, moved_observer_command_tx, ctx); - let _ = hiro_system_kit::nestable_block_on(future); - }); + if let PredicatesApi::On(ref api_config) = self.config.http_api { + info!( + self.ctx.expect_logger(), + "Listening on port {} for chainhook predicate registrations", api_config.http_port + ); + let ctx = self.ctx.clone(); + let api_config = api_config.clone(); + let moved_observer_command_tx = observer_command_tx.clone(); + let db_dir_path = self.config.expected_cache_path(); + // Test and initialize a database connection + let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || { + let future = start_predicate_api_server( + api_config.http_port, + db_dir_path, + moved_observer_command_tx, + ctx, + ); + let _ = hiro_system_kit::nestable_block_on(future); + }); + } loop { let event = match observer_event_rx.recv() { @@ -266,32 +269,22 @@ impl Service { // If start block specified, use it. // If no start block specified, depending on the nature the hook, we'd like to retrieve: // - contract-id - if let PredicatesApi::On(ref config) = self.config.http_api { - let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config) - { - Ok(con) => con, - Err(e) => { - error!( - self.ctx.expect_logger(), - "unable to register predicate: {}", - e.to_string() - ); - continue; - } - }; - update_predicate_spec( - &spec.key(), - &spec, - &mut predicates_db_conn, - &self.ctx, - ); - update_predicate_status( - &spec.key(), - PredicateStatus::Disabled, - &mut predicates_db_conn, - &self.ctx, - ); - } + let observers_db_conn = match open_readwrite_observers_db_conn( + &self.config.expected_cache_path(), + &self.ctx, + ) { + Ok(con) => con, + Err(e) => { + error!( + self.ctx.expect_logger(), + "unable to register predicate: {}", + e.to_string() + ); + continue; + } + }; + let report = ObserverReport::default(); + insert_entry_in_observers(&spec, &report, &observers_db_conn, &self.ctx); match spec { ChainhookSpecification::Stacks(_predicate_spec) => {} ChainhookSpecification::Bitcoin(predicate_spec) => { @@ -300,60 +293,68 @@ impl Service { } } ObserverEvent::PredicateEnabled(spec) => { - if let PredicatesApi::On(ref config) = self.config.http_api { - let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config) - { - Ok(con) => con, - Err(e) => { - error!( - self.ctx.expect_logger(), - "unable to enable predicate: {}", - e.to_string() - ); - continue; - } - }; - update_predicate_spec( - &spec.key(), - &spec, - &mut predicates_db_conn, - &self.ctx, - ); - update_predicate_status( - &spec.key(), - PredicateStatus::InitialScanCompleted, - &mut predicates_db_conn, - &self.ctx, - ); - } + let observers_db_conn = match open_readwrite_observers_db_conn( + &self.config.expected_cache_path(), + &self.ctx, + ) { + Ok(con) => con, + Err(e) => { + error!( + 
self.ctx.expect_logger(), + "unable to enable observer: {}", + e.to_string() + ); + continue; + } + }; + update_observer_streaming_enabled( + &spec.uuid(), + true, + &observers_db_conn, + &self.ctx, + ); } ObserverEvent::PredicateDeregistered(spec) => { - if let PredicatesApi::On(ref config) = self.config.http_api { - let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config) - { + let observers_db_conn = match open_readwrite_observers_db_conn( + &self.config.expected_cache_path(), + &self.ctx, + ) { + Ok(con) => con, + Err(e) => { + error!( + self.ctx.expect_logger(), + "unable to deregister observer: {}", + e.to_string() + ); + continue; + } + }; + remove_entry_from_observers(&spec.uuid(), &observers_db_conn, &self.ctx); + } + ObserverEvent::BitcoinPredicateTriggered(data) => { + if let Some(ref tip) = data.apply.last() { + let observers_db_conn = match open_readwrite_observers_db_conn( + &self.config.expected_cache_path(), + &self.ctx, + ) { Ok(con) => con, Err(e) => { error!( self.ctx.expect_logger(), - "unable to deregister predicate: {}", + "unable to update observer: {}", e.to_string() ); continue; } }; - let predicate_key = spec.key(); - let res: Result<(), redis::RedisError> = - predicates_db_conn.del(predicate_key); - if let Err(e) = res { - error!( - self.ctx.expect_logger(), - "unable to delete predicate: {}", - e.to_string() - ); - } + let last_block_height_update = tip.block.block_identifier.index; + update_observer_progress( + &data.chainhook.uuid, + last_block_height_update, + &observers_db_conn, + &self.ctx, + ) } - } - ObserverEvent::BitcoinPredicateTriggered(data) => { if let Some(ref tx) = predicate_activity_relayer { let _ = tx.send(data); } @@ -371,7 +372,7 @@ impl Service { pub fn set_up_observer_config( &self, - predicates: Vec, + predicates: Vec, enable_internal_trigger: bool, ) -> Result< ( @@ -381,12 +382,13 @@ impl Service { String, > { let mut event_observer_config = self.config.get_event_observer_config(); - let chainhook_config = create_and_consolidate_chainhook_config_with_predicates( + let (chainhook_config, _) = create_and_consolidate_chainhook_config_with_predicates( predicates, + 0, enable_internal_trigger, &self.config, &self.ctx, - ); + )?; event_observer_config.chainhook_config = Some(chainhook_config); let data_rx = if enable_internal_trigger { let (tx, rx) = crossbeam_channel::bounded(256); @@ -440,8 +442,7 @@ impl Service { pub async fn catch_up_with_chain_tip( &mut self, rebuild_from_scratch: bool, - event_observer_config: &EventObserverConfig, - ) -> Result<(), String> { + ) -> Result { if rebuild_from_scratch { let blocks_db = open_ordhook_db_conn_rocks_db_loop( true, @@ -459,15 +460,13 @@ impl Service { &self.ctx, )?; } - let tx_replayer = start_observer_forwarding(event_observer_config, &self.ctx); - self.update_state(Some(tx_replayer)).await?; - Ok(()) + self.update_state(None).await } pub async fn update_state( &self, block_post_processor: Option>, - ) -> Result<(), String> { + ) -> Result { // First, make sure that rocksdb and sqlite are aligned. // If rocksdb.chain_tip.height <= sqlite.chain_tip.height // Perform some block compression until that height. 
@@ -537,7 +536,7 @@ impl Service { last_block_processed = end_block; } - Ok(()) + Ok(last_block_processed) } pub async fn replay_transfers( diff --git a/components/ordhook-core/src/service/observers.rs b/components/ordhook-core/src/service/observers.rs new file mode 100644 index 00000000..0172cacc --- /dev/null +++ b/components/ordhook-core/src/service/observers.rs @@ -0,0 +1,362 @@ +use std::{ + collections::BTreeMap, + path::PathBuf, + sync::mpsc::{channel, Sender}, +}; + +use chainhook_sdk::{ + chainhooks::types::{ + BitcoinChainhookFullSpecification, BitcoinChainhookNetworkSpecification, + BitcoinChainhookSpecification, ChainhookConfig, ChainhookSpecification, + }, + observer::EventObserverConfig, + types::BitcoinBlockData, + utils::Context, +}; +use rusqlite::{Connection, ToSql}; +use serde_json::json; + +use crate::{ + config::Config, + db::{ + create_or_open_readwrite_db, open_existing_readonly_db, perform_query_one, + perform_query_set, + }, + scan::bitcoin::process_block_with_predicates, +}; + +pub fn update_observer_progress( + uuid: &str, + last_block_height_update: u64, + observers_db_conn: &Connection, + ctx: &Context, +) { + while let Err(e) = observers_db_conn.execute( + "UPDATE observers SET last_block_height_update = ? WHERE uuid = ?", + rusqlite::params![last_block_height_update, uuid], + ) { + ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + std::thread::sleep(std::time::Duration::from_secs(1)); + } +} + +pub fn update_observer_streaming_enabled( + uuid: &str, + streaming_enabled: bool, + observers_db_conn: &Connection, + ctx: &Context, +) { + while let Err(e) = observers_db_conn.execute( + "UPDATE observers SET streaming_enabled = ? WHERE uuid = ?", + rusqlite::params![streaming_enabled, uuid], + ) { + ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + std::thread::sleep(std::time::Duration::from_secs(1)); + } +} + +pub fn insert_entry_in_observers( + spec: &ChainhookSpecification, + report: &ObserverReport, + observers_db_conn: &Connection, + ctx: &Context, +) { + remove_entry_from_observers(&spec.uuid(), observers_db_conn, ctx); + while let Err(e) = observers_db_conn.execute( + "INSERT INTO observers (uuid, spec, streaming_enabled, last_block_height_update) VALUES (?1, ?2, ?3, ?4)", + rusqlite::params![&spec.uuid(), json!(spec).to_string(), report.streaming_enabled, report.last_block_height_update], + ) { + ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + std::thread::sleep(std::time::Duration::from_secs(1)); + } +} + +pub fn get_default_observers_db_file_path(base_dir: &PathBuf) -> PathBuf { + let mut destination_path = base_dir.clone(); + destination_path.push("observers.sqlite"); + destination_path +} + +pub fn open_readonly_observers_db_conn( + base_dir: &PathBuf, + ctx: &Context, +) -> Result { + let db_path = get_default_observers_db_file_path(&base_dir); + let conn = open_existing_readonly_db(&db_path, ctx); + Ok(conn) +} + +pub fn open_readwrite_observers_db_conn( + base_dir: &PathBuf, + ctx: &Context, +) -> Result { + let db_path = get_default_observers_db_file_path(&base_dir); + let conn = create_or_open_readwrite_db(&db_path, ctx); + Ok(conn) +} + +pub fn open_readwrite_observers_db_conn_or_panic(base_dir: &PathBuf, ctx: &Context) -> Connection { + let conn = match open_readwrite_observers_db_conn(base_dir, ctx) { + Ok(con) => con, + Err(message) => { + error!(ctx.expect_logger(), "Storage: {}", message.to_string()); + panic!(); + } + }; + 
conn +} + +pub fn initialize_observers_db(base_dir: &PathBuf, ctx: &Context) -> Connection { + let db_path = get_default_observers_db_file_path(&base_dir); + let conn = create_or_open_readwrite_db(&db_path, ctx); + // TODO: introduce initial output + if let Err(e) = conn.execute( + "CREATE TABLE IF NOT EXISTS observers ( + uuid TEXT NOT NULL PRIMARY KEY, + spec TEXT NOT NULL, + streaming_enabled INTEGER NOT NULL, + last_block_height_update INTEGER NOT NULL + )", + [], + ) { + ctx.try_log(|logger| { + warn!( + logger, + "Unable to create table inscriptions: {}", + e.to_string() + ) + }); + } + conn +} + +#[derive(Default, Clone, Serialize, Deserialize)] +pub struct ObserverReport { + pub streaming_enabled: bool, + pub last_block_height_update: u64, +} + +pub fn find_observer_with_uuid( + uuid: &str, + db_conn: &Connection, + ctx: &Context, +) -> Option<(ChainhookSpecification, ObserverReport)> { + let args: &[&dyn ToSql] = &[&uuid.to_sql().unwrap()]; + let query = + "SELECT spec, streaming_enabled, last_block_height_update FROM observers WHERE uuid = ?"; + perform_query_one(query, args, db_conn, ctx, |row| { + let encoded_spec: String = row.get(0).unwrap(); + let spec = ChainhookSpecification::deserialize_specification(&encoded_spec).unwrap(); + let report = ObserverReport { + streaming_enabled: row.get(1).unwrap(), + last_block_height_update: row.get(2).unwrap(), + }; + (spec, report) + }) +} + +pub fn find_all_observers( + db_conn: &Connection, + ctx: &Context, +) -> Vec<(ChainhookSpecification, ObserverReport)> { + let args: &[&dyn ToSql] = &[]; + let query = "SELECT spec, streaming_enabled, last_block_height_update FROM observers"; + perform_query_set(query, args, db_conn, ctx, |row| { + let encoded_spec: String = row.get(0).unwrap(); + let spec = ChainhookSpecification::deserialize_specification(&encoded_spec).unwrap(); + let report = ObserverReport { + streaming_enabled: row.get(1).unwrap(), + last_block_height_update: row.get(2).unwrap(), + }; + (spec, report) + }) +} + +pub fn remove_entry_from_observers(uuid: &str, db_conn: &Connection, ctx: &Context) { + while let Err(e) = db_conn.execute( + "DELETE FROM observers WHERE uuid = ?1", + rusqlite::params![&uuid], + ) { + ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string())); + std::thread::sleep(std::time::Duration::from_secs(1)); + } +} + +// Cases to cover: +// - Empty state +// - State present, but not up to date +// - Blocks presents, no inscriptions +// - Blocks presents, inscription presents +// - State up to date + +pub fn start_predicate_processor( + event_observer_config: &EventObserverConfig, + ctx: &Context, +) -> Sender { + let (tx, rx) = channel(); + + let mut moved_event_observer_config = event_observer_config.clone(); + let moved_ctx = ctx.clone(); + + let _ = hiro_system_kit::thread_named("Initial predicate processing") + .spawn(move || { + if let Some(mut chainhook_config) = moved_event_observer_config.chainhook_config.take() + { + let mut bitcoin_predicates_ref: Vec<&BitcoinChainhookSpecification> = vec![]; + for bitcoin_predicate in chainhook_config.bitcoin_chainhooks.iter_mut() { + bitcoin_predicate.enabled = false; + bitcoin_predicates_ref.push(bitcoin_predicate); + } + while let Ok(block) = rx.recv() { + let future = process_block_with_predicates( + block, + &bitcoin_predicates_ref, + &moved_event_observer_config, + &moved_ctx, + ); + let res = hiro_system_kit::nestable_block_on(future); + if let Err(_) = res { + error!(moved_ctx.expect_logger(), "Initial ingestion failing"); + } + 
} + } + }) + .expect("unable to spawn thread"); + tx +} + +pub fn create_and_consolidate_chainhook_config_with_predicates( + provided_observers: Vec, + chain_tip_height: u64, + enable_internal_trigger: bool, + config: &Config, + ctx: &Context, +) -> Result<(ChainhookConfig, Vec), String> { + let mut chainhook_config: ChainhookConfig = ChainhookConfig::new(); + + if enable_internal_trigger { + let _ = chainhook_config.register_specification(ChainhookSpecification::Bitcoin( + BitcoinChainhookSpecification { + uuid: format!("ordhook-internal-trigger"), + owner_uuid: None, + name: format!("ordhook-internal-trigger"), + network: config.network.bitcoin_network.clone(), + version: 1, + blocks: None, + start_block: None, + end_block: None, + expired_at: None, + expire_after_occurrence: None, + predicate: chainhook_sdk::chainhooks::types::BitcoinPredicateType::OrdinalsProtocol( + chainhook_sdk::chainhooks::types::OrdinalOperations::InscriptionFeed, + ), + action: chainhook_sdk::chainhooks::types::HookAction::Noop, + include_proof: false, + include_inputs: true, + include_outputs: false, + include_witness: false, + enabled: true, + }, + )); + } + + let observers_db_conn = initialize_observers_db(&config.expected_cache_path(), ctx); + + let mut observers_to_catchup = vec![]; + let mut observers_to_clean_up = vec![]; + let mut observers_ready = vec![]; + + let previously_registered_observers = find_all_observers(&observers_db_conn, ctx); + for (spec, report) in previously_registered_observers.into_iter() { + let ChainhookSpecification::Bitcoin(spec) = spec else { + continue; + }; + // De-register outdated observers: was end_block (if specified) scanned? + if let Some(expiration) = spec.end_block { + if report.last_block_height_update >= expiration { + observers_to_clean_up.push(spec.uuid); + continue; + } + } + // De-register outdated observers: were all blocks (if specified) scanned? 
+ if let Some(ref blocks) = spec.blocks { + let mut scanning_completed = true; + for block in blocks.iter() { + if block.gt(&report.last_block_height_update) { + scanning_completed = false; + break; + } + } + if scanning_completed { + observers_to_clean_up.push(spec.uuid); + continue; + } + } + + if report.last_block_height_update == chain_tip_height { + observers_ready.push(spec); + } else { + observers_to_catchup.push((spec, report)); + } + } + + // Clean-up + for outdated_observer in observers_to_clean_up.iter() { + remove_entry_from_observers(outdated_observer, &observers_db_conn, ctx); + } + + // Registrations + for mut bitcoin_spec in observers_ready.into_iter() { + bitcoin_spec.enabled = true; + let spec = ChainhookSpecification::Bitcoin(bitcoin_spec); + chainhook_config.register_specification(spec)?; + } + + // Among observers provided, only consider the ones that are not known + for observer in provided_observers.into_iter() { + let existing_observer = find_observer_with_uuid(&observer.uuid, &observers_db_conn, ctx); + if existing_observer.is_some() { + continue; + } + let report = ObserverReport::default(); + observers_to_catchup.push((observer, report)); + } + + let mut full_specs = vec![]; + + for (observer, report) in observers_to_catchup.into_iter() { + let mut networks = BTreeMap::new(); + networks.insert( + config.network.bitcoin_network.clone(), + BitcoinChainhookNetworkSpecification { + start_block: Some(report.last_block_height_update + 1), + end_block: observer.end_block, + blocks: observer.blocks, + expire_after_occurrence: observer.expire_after_occurrence, + include_proof: Some(observer.include_proof), + include_inputs: Some(observer.include_inputs), + include_outputs: Some(observer.include_outputs), + include_witness: Some(observer.include_witness), + predicate: observer.predicate, + action: observer.action, + }, + ); + let full_spec = BitcoinChainhookFullSpecification { + uuid: observer.uuid, + owner_uuid: observer.owner_uuid, + name: observer.name, + version: observer.version, + networks, + }; + info!( + ctx.expect_logger(), + "Observer '{}' to be caught-up (last block sent: {}, tip: {})", + full_spec.name, + report.last_block_height_update, + chain_tip_height + ); + full_specs.push(full_spec); + } + + Ok((chainhook_config, full_specs)) +} diff --git a/components/ordhook-core/src/service/predicates.rs b/components/ordhook-core/src/service/predicates.rs deleted file mode 100644 index e8f66aba..00000000 --- a/components/ordhook-core/src/service/predicates.rs +++ /dev/null @@ -1,266 +0,0 @@ -use std::sync::mpsc::{channel, Sender}; - -use chainhook_sdk::{ - chainhooks::types::{ - BitcoinChainhookSpecification, ChainhookConfig, ChainhookFullSpecification, - ChainhookSpecification, - }, - observer::EventObserverConfig, - types::BitcoinBlockData, - utils::Context, -}; -use redis::{Commands, Connection}; -use serde_json::json; - -use crate::{ - config::{Config, PredicatesApiConfig}, - scan::bitcoin::process_block_with_predicates, -}; - -use super::http_api::load_predicates_from_redis; - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum PredicateStatus { - Scanning(ScanningData), - Streaming(StreamingData), - InitialScanCompleted, - Interrupted(String), - Disabled, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ScanningData { - pub number_of_blocks_to_scan: u64, - pub number_of_blocks_scanned: u64, - pub number_of_blocks_sent: u64, - pub current_block_height: u64, -} - -#[derive(Debug, Clone, Serialize, 
Deserialize)] -pub struct StreamingData { - pub last_occurence: u64, - pub last_evaluation: u64, -} - -pub fn update_predicate_status( - predicate_key: &str, - status: PredicateStatus, - predicates_db_conn: &mut Connection, - ctx: &Context, -) { - let serialized_status = json!(status).to_string(); - if let Err(e) = - predicates_db_conn.hset::<_, _, _, ()>(&predicate_key, "status", &serialized_status) - { - error!( - ctx.expect_logger(), - "Error updating status: {}", - e.to_string() - ); - } else { - info!( - ctx.expect_logger(), - "Updating predicate {predicate_key} status: {serialized_status}" - ); - } -} - -pub fn update_predicate_spec( - predicate_key: &str, - spec: &ChainhookSpecification, - predicates_db_conn: &mut Connection, - ctx: &Context, -) { - let serialized_spec = json!(spec).to_string(); - if let Err(e) = - predicates_db_conn.hset::<_, _, _, ()>(&predicate_key, "specification", &serialized_spec) - { - error!( - ctx.expect_logger(), - "Error updating status: {}", - e.to_string() - ); - } else { - info!( - ctx.expect_logger(), - "Updating predicate {predicate_key} with spec: {serialized_spec}" - ); - } -} - -pub fn retrieve_predicate_status( - predicate_key: &str, - predicates_db_conn: &mut Connection, -) -> Option { - match predicates_db_conn.hget::<_, _, String>(predicate_key.to_string(), "status") { - Ok(ref payload) => match serde_json::from_str(payload) { - Ok(data) => Some(data), - Err(_) => None, - }, - Err(_) => None, - } -} - -pub fn open_readwrite_predicates_db_conn( - config: &PredicatesApiConfig, -) -> Result { - let redis_uri = &config.database_uri; - let client = redis::Client::open(redis_uri.clone()).unwrap(); - client - .get_connection() - .map_err(|e| format!("unable to connect to db: {}", e.to_string())) -} - -pub fn open_readwrite_predicates_db_conn_or_panic( - config: &PredicatesApiConfig, - ctx: &Context, -) -> Connection { - let redis_con = match open_readwrite_predicates_db_conn(config) { - Ok(con) => con, - Err(message) => { - error!(ctx.expect_logger(), "Redis: {}", message.to_string()); - panic!(); - } - }; - redis_con -} - -// Cases to cover: -// - Empty state -// - State present, but not up to date -// - Blocks presents, no inscriptions -// - Blocks presents, inscription presents -// - State up to date - -pub fn start_predicate_processor( - event_observer_config: &EventObserverConfig, - ctx: &Context, -) -> Sender { - let (tx, rx) = channel(); - - let mut moved_event_observer_config = event_observer_config.clone(); - let moved_ctx = ctx.clone(); - - let _ = hiro_system_kit::thread_named("Initial predicate processing") - .spawn(move || { - if let Some(mut chainhook_config) = moved_event_observer_config.chainhook_config.take() - { - let mut bitcoin_predicates_ref: Vec<&BitcoinChainhookSpecification> = vec![]; - for bitcoin_predicate in chainhook_config.bitcoin_chainhooks.iter_mut() { - bitcoin_predicate.enabled = false; - bitcoin_predicates_ref.push(bitcoin_predicate); - } - while let Ok(block) = rx.recv() { - let future = process_block_with_predicates( - block, - &bitcoin_predicates_ref, - &moved_event_observer_config, - &moved_ctx, - ); - let res = hiro_system_kit::nestable_block_on(future); - if let Err(_) = res { - error!(moved_ctx.expect_logger(), "Initial ingestion failing"); - } - } - } - }) - .expect("unable to spawn thread"); - tx -} - -pub fn create_and_consolidate_chainhook_config_with_predicates( - predicates: Vec, - enable_internal_trigger: bool, - config: &Config, - ctx: &Context, -) -> ChainhookConfig { - let mut chainhook_config: 
ChainhookConfig = ChainhookConfig::new(); - - // If no predicates passed at launch, retrieve predicates from Redis - if predicates.is_empty() && config.is_http_api_enabled() { - let registered_predicates = match load_predicates_from_redis(&config, &ctx) { - Ok(predicates) => predicates, - Err(e) => { - error!( - ctx.expect_logger(), - "Failed loading predicate from storage: {}", - e.to_string() - ); - vec![] - } - }; - for (predicate, _status) in registered_predicates.into_iter() { - let predicate_uuid = predicate.uuid().to_string(); - match chainhook_config.register_specification(predicate) { - Ok(_) => { - info!( - ctx.expect_logger(), - "Predicate {} retrieved from storage and loaded", predicate_uuid, - ); - } - Err(e) => { - error!( - ctx.expect_logger(), - "Failed loading predicate from storage: {}", - e.to_string() - ); - } - } - } - } - - // For each predicate found, register in memory. - for predicate in predicates.into_iter() { - match chainhook_config.register_full_specification( - ( - &config.network.bitcoin_network, - &config.network.stacks_network, - ), - predicate, - ) { - Ok(ref mut spec) => { - chainhook_config.enable_specification(spec); - info!( - ctx.expect_logger(), - "Predicate {} retrieved from config and loaded", - spec.uuid(), - ); - } - Err(e) => { - error!( - ctx.expect_logger(), - "Failed loading predicate from config: {}", - e.to_string() - ); - } - } - } - - if enable_internal_trigger { - let _ = chainhook_config.register_specification(ChainhookSpecification::Bitcoin( - BitcoinChainhookSpecification { - uuid: format!("ordhook"), - owner_uuid: None, - name: format!("ordhook"), - network: chainhook_sdk::types::BitcoinNetwork::Mainnet, - version: 1, - blocks: None, - start_block: None, - end_block: None, - expired_at: None, - expire_after_occurrence: None, - predicate: chainhook_sdk::chainhooks::types::BitcoinPredicateType::OrdinalsProtocol( - chainhook_sdk::chainhooks::types::OrdinalOperations::InscriptionFeed, - ), - action: chainhook_sdk::chainhooks::types::HookAction::Noop, - include_proof: false, - include_inputs: true, - include_outputs: false, - include_witness: false, - enabled: true, - }, - )); - } - chainhook_config -} diff --git a/components/ordhook-core/src/service/runloops.rs b/components/ordhook-core/src/service/runloops.rs index df7df880..2bea99a7 100644 --- a/components/ordhook-core/src/service/runloops.rs +++ b/components/ordhook-core/src/service/runloops.rs @@ -8,11 +8,10 @@ use chainhook_sdk::{ use threadpool::ThreadPool; use crate::{ - config::{Config, PredicatesApi}, + config::Config, scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate, service::{ - predicates::open_readwrite_predicates_db_conn_or_panic, update_predicate_status, - PredicateStatus, + observers::open_readwrite_observers_db_conn_or_panic, update_observer_streaming_enabled, }, }; @@ -28,6 +27,7 @@ pub fn start_bitcoin_scan_runloop( let moved_ctx = ctx.clone(); let moved_config = config.clone(); let observer_command_tx = observer_command_tx.clone(); + let db_base_dir = config.expected_cache_path(); bitcoin_scan_pool.execute(move || { let op = scan_bitcoin_chainstate_via_rpc_using_predicate( &predicate_spec, @@ -46,20 +46,15 @@ pub fn start_bitcoin_scan_runloop( ) }); - // Update predicate status in redis - if let PredicatesApi::On(ref api_config) = moved_config.http_api { - let status = PredicateStatus::Interrupted(format!( - "Unable to evaluate predicate on Bitcoin chainstate: {e}" - )); - let mut predicates_db_conn = - 
open_readwrite_predicates_db_conn_or_panic(api_config, &moved_ctx); - update_predicate_status( - &predicate_spec.key(), - status, - &mut predicates_db_conn, - &moved_ctx, - ); - } + // Update predicate + let mut observers_db_conn = + open_readwrite_observers_db_conn_or_panic(&db_base_dir, &moved_ctx); + update_observer_streaming_enabled( + &predicate_spec.uuid, + false, + &mut observers_db_conn, + &moved_ctx, + ); return; } }; diff --git a/components/ordhook-sdk-js/src/ordinals_indexer.rs b/components/ordhook-sdk-js/src/ordinals_indexer.rs index a0a44118..f6440fde 100644 --- a/components/ordhook-sdk-js/src/ordinals_indexer.rs +++ b/components/ordhook-sdk-js/src/ordinals_indexer.rs @@ -159,7 +159,7 @@ impl OrdinalsIndexingRunloop { match cmd { IndexerCommand::StreamBlocks => { // We start the service as soon as the start() method is being called. - let future = service.catch_up_with_chain_tip(false, &observer_config); + let future = service.catch_up_with_chain_tip(false); let _ = hiro_system_kit::nestable_block_on(future).expect("unable to start indexer"); let future = service.start_event_observer(observer_sidecar); let (command_tx, event_rx) = diff --git a/docs/how-to-guides/how-to-run-ordhook-as-a-service-using-bitcoind.md b/docs/how-to-guides/how-to-run-ordhook-as-a-service-using-bitcoind.md index 65cd1830..e72d77c8 100644 --- a/docs/how-to-guides/how-to-run-ordhook-as-a-service-using-bitcoind.md +++ b/docs/how-to-guides/how-to-run-ordhook-as-a-service-using-bitcoind.md @@ -87,7 +87,6 @@ working_dir = "ordhook" # # [http_api] # http_port = 20456 -# database_uri = "redis://localhost:6379/" [network] mode = "mainnet" @@ -140,12 +139,11 @@ When the Ordhook service starts, it is initiated in the background to augment th ### Add `http-post` endpoints dynamically -To enable dynamically posting endpoints to the server, you can spin up the Redis server by enabling the following lines of code in the `Ordhook.toml` file: +Dynamic posting endpoints can be enabled by adding the following settings in the `Ordhook.toml` file: ```toml [http_api] http_port = 20456 -database_uri = "redis://localhost:6379/" ``` ## Run ordhook service diff --git a/docs/how-to-guides/how-to-stream-ordinal-activities.md b/docs/how-to-guides/how-to-stream-ordinal-activities.md index 6ba98392..fd8dabab 100644 --- a/docs/how-to-guides/how-to-stream-ordinal-activities.md +++ b/docs/how-to-guides/how-to-stream-ordinal-activities.md @@ -29,7 +29,6 @@ working_dir = "ordhook" # # [http_api] # http_port = 20456 -# database_uri = "redis://localhost:6379/" [network] mode = "mainnet"
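For reference, a minimal sketch of the configuration this change expects, assuming the default control port already shown in the documentation hunks above. With Redis removed, dynamically registered observers are persisted in an `observers.sqlite` file under the configured cache directory, so the `[http_api]` section only needs a port:

```toml
# Minimal [http_api] section after this change: only the port is required;
# the former Redis `database_uri` setting no longer exists.
[http_api]
http_port = 20456
```

Registered observers can then be inspected through the existing `GET /v1/observers` route, e.g. `curl http://localhost:20456/v1/observers`.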