Skip to content

Commit

Permalink
Add optional structured logs of RPC related events
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusz-reichert authored and shesek committed Feb 27, 2024
1 parent 01bd5f9 commit 49a7180
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 6 deletions.
36 changes: 36 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct Config {
pub utxos_limit: usize,
pub electrum_txs_limit: usize,
pub electrum_banner: String,
pub electrum_rpc_logging: Option<RpcLogging>,

#[cfg(feature = "liquid")]
pub parent_network: BNetwork,
Expand All @@ -65,6 +66,10 @@ fn str_to_socketaddr(address: &str, what: &str) -> SocketAddr {
impl Config {
pub fn from_args() -> Config {
let network_help = format!("Select network type ({})", Network::names().join(", "));
let rpc_logging_help = format!(
"Select RPC logging option ({})",
RpcLogging::options().join(", ")
);

let args = App::new("Electrum Rust Server")
.version(crate_version!())
Expand Down Expand Up @@ -181,6 +186,11 @@ impl Config {
.long("electrum-banner")
.help("Welcome banner for the Electrum server, shown in the console to clients.")
.takes_value(true)
).arg(
Arg::with_name("electrum_rpc_logging")
.long("electrum-rpc-logging")
.help(&rpc_logging_help)
.takes_value(true),
);

#[cfg(unix)]
Expand Down Expand Up @@ -381,6 +391,9 @@ impl Config {
electrum_rpc_addr,
electrum_txs_limit: value_t_or_exit!(m, "electrum_txs_limit", usize),
electrum_banner,
electrum_rpc_logging: m
.value_of("electrum_rpc_logging")
.map(|option| RpcLogging::from(option)),
http_addr,
http_socket_file,
monitoring_addr,
Expand Down Expand Up @@ -420,6 +433,29 @@ impl Config {
}
}

#[derive(Debug, Clone)]
pub enum RpcLogging {
Full,
NoParams,
}

impl RpcLogging {
pub fn options() -> Vec<String> {
return vec!["full".to_string(), "no-params".to_string()];
}
}

impl From<&str> for RpcLogging {
fn from(option: &str) -> Self {
match option {
"full" => RpcLogging::Full,
"no-params" => RpcLogging::NoParams,

_ => panic!("unsupported RPC logging option: {:?}", option),
}
}
}

pub fn get_network_subdir(network: Network) -> Option<&'static str> {
match network {
#[cfg(not(feature = "liquid"))]
Expand Down
72 changes: 66 additions & 6 deletions src/electrum/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{Sender, SyncSender, TrySendError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Instant;

use bitcoin::hashes::sha256d::Hash as Sha256dHash;
use crypto::digest::Digest;
Expand All @@ -18,7 +19,7 @@ use bitcoin::consensus::encode::serialize_hex;
use elements::encode::serialize_hex;

use crate::chain::Txid;
use crate::config::Config;
use crate::config::{Config, RpcLogging};
use crate::electrum::{get_electrum_height, ProtocolVersion};
use crate::errors::*;
use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
Expand Down Expand Up @@ -90,6 +91,14 @@ fn get_status_hash(txs: Vec<(Txid, Option<BlockId>)>, query: &Query) -> Option<F
}
}

macro_rules! conditionally_log_rpc_event {
($self:ident, $event:expr) => {
if $self.rpc_logging.is_some() {
$self.log_rpc_event($event);
}
};
}

struct Connection {
query: Arc<Query>,
last_header_entry: Option<HeaderEntry>,
Expand All @@ -101,6 +110,7 @@ struct Connection {
txs_limit: usize,
#[cfg(feature = "electrum-discovery")]
discovery: Option<Arc<DiscoveryManager>>,
rpc_logging: Option<RpcLogging>,
}

impl Connection {
Expand All @@ -111,6 +121,7 @@ impl Connection {
stats: Arc<Stats>,
txs_limit: usize,
#[cfg(feature = "electrum-discovery")] discovery: Option<Arc<DiscoveryManager>>,
rpc_logging: Option<RpcLogging>,
) -> Connection {
Connection {
query,
Expand All @@ -123,6 +134,7 @@ impl Connection {
txs_limit,
#[cfg(feature = "electrum-discovery")]
discovery,
rpc_logging,
}
}

Expand Down Expand Up @@ -490,6 +502,17 @@ impl Connection {
Ok(result)
}

fn log_rpc_event(&self, mut log: Value) {
log.as_object_mut().unwrap().insert(
"source".into(),
json!({
"ip": self.addr.ip().to_string(),
"port": self.addr.port(),
}),
);
println!("ELECTRUM-RPC-LOGGER: {}", log);
}

fn send_values(&mut self, values: &[Value]) -> Result<()> {
for value in values {
let line = value.to_string() + "\n";
Expand All @@ -508,7 +531,7 @@ impl Connection {
match msg {
Message::Request(line) => {
let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
let reply = match (
match (
cmd.get("method"),
cmd.get("params").unwrap_or_else(|| &empty_params),
cmd.get("id"),
Expand All @@ -517,10 +540,41 @@ impl Connection {
Some(&Value::String(ref method)),
&Value::Array(ref params),
Some(ref id),
) => self.handle_command(method, params, id)?,
_ => bail!("invalid command: {}", cmd),
};
self.send_values(&[reply])?
) => {
conditionally_log_rpc_event!(
self,
json!({
"event": "rpc request",
"id": id,
"method": method,
"params": if let Some(RpcLogging::Full) = self.rpc_logging {
json!(params)
} else {
Value::Null
}
})
);

let start_time = Instant::now();
let reply = self.handle_command(method, params, id)?;

conditionally_log_rpc_event!(
self,
json!({
"event": "rpc response",
"method": method,
"payload_size": reply.to_string().as_bytes().len(),
"duration_µs": start_time.elapsed().as_micros(),
"id": id,
})
);

self.send_values(&[reply])?
}
_ => {
bail!("invalid command: {}", cmd)
}
}
}
Message::PeriodicUpdate => {
let values = self
Expand Down Expand Up @@ -563,6 +617,8 @@ impl Connection {

pub fn run(mut self) {
self.stats.clients.inc();
conditionally_log_rpc_event!(self, json!({ "event": "connection established" }));

let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
let tx = self.chan.sender();
let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
Expand All @@ -579,6 +635,8 @@ impl Connection {
.sub(self.status_hashes.len() as i64);

debug!("[{}] shutting down connection", self.addr);
conditionally_log_rpc_event!(self, json!({ "event": "connection closed" }));

let _ = self.stream.shutdown(Shutdown::Both);
if let Err(err) = child.join().expect("receiver panicked") {
error!("[{}] receiver failed: {}", self.addr, err);
Expand Down Expand Up @@ -741,6 +799,7 @@ impl RPC {
let garbage_sender = garbage_sender.clone();
#[cfg(feature = "electrum-discovery")]
let discovery = discovery.clone();
let rpc_logging = config.electrum_rpc_logging.clone();

let spawned = spawn_thread("peer", move || {
info!("[{}] connected peer", addr);
Expand All @@ -752,6 +811,7 @@ impl RPC {
txs_limit,
#[cfg(feature = "electrum-discovery")]
discovery,
rpc_logging,
);
senders.lock().unwrap().push(conn.chan.sender());
conn.run();
Expand Down
1 change: 1 addition & 0 deletions tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl TestRunner {
utxos_limit: 100,
electrum_txs_limit: 100,
electrum_banner: "".into(),
electrum_rpc_logging: None,

#[cfg(feature = "liquid")]
asset_db_path: None, // XXX
Expand Down

0 comments on commit 49a7180

Please sign in to comment.