Skip to content

Commit

Permalink
Add a security watchdog service to periodically run queries and repor…
Browse files Browse the repository at this point in the history
…t data (#16926)

## Description 

This PR adds a crate which downloads a github repo to read a config file
with queries (repo will have limited access and enables peer review
before any modification), parses the config file for queries and their
schedules, and reports data back to prometheus by running those queries.

## Test Plan 

Not tested yet.
  • Loading branch information
sadhansood authored Apr 9, 2024
1 parent d1cbdaf commit dd84ebd
Show file tree
Hide file tree
Showing 11 changed files with 1,048 additions and 51 deletions.
235 changes: 185 additions & 50 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ members = [
"crates/sui-rosetta",
"crates/sui-rpc-loadgen",
"crates/sui-sdk",
"crates/sui-security-watchdog",
"crates/sui-simulator",
"crates/sui-single-node-benchmark",
"crates/sui-snapshot",
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-analytics-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ sui-package-resolver.workspace = true
simulacrum.workspace = true
arrow = { version = "50.0.0"}
gcp-bigquery-client = "0.18.0"
snowflake-api = { version = "0.6.0" }
snowflake-api = { version = "0.7.0" }
tap = { version = "1.0.1", features = [] }

[dev-dependencies]
Expand Down
26 changes: 26 additions & 0 deletions crates/sui-security-watchdog/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "sui-security-watchdog"
version.workspace = true
authors = ["Mysten Labs <[email protected]>"]
license = "Apache-2.0"
publish = false
edition = "2021"

[dependencies]
arrow-array.workspace = true
mysten-metrics.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing.workspace = true
tokio-cron-scheduler = { version = "0.10.0", features = ["signal"] }
anyhow.workspace = true
chrono.workspace = true
snowflake-api = { version = "0.7.0"}
clap.workspace = true
prometheus.workspace = true
telemetry-subscribers.workspace = true
async-trait.workspace = true
uuid.workspace = true
lexical-util = "0.8.5"
reqwest = { workspace = true, features = ["json"] }
49 changes: 49 additions & 0 deletions crates/sui-security-watchdog/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Security Watchdog Service

## Overview
The Security Watchdog Service is designed to monitor and analyze data changes over time. It achieves this by periodically downloading a specified GitHub repository, parsing configuration files for SQL queries, and executing these queries on a set schedule. Results are then used to update Prometheus metrics, providing real-time insights into data trends.

## Running the Service
Execute the compiled binary to start the service:
```shell
cargo run --release -p sui-security-watchdog
```
## Usage
The service will automatically start downloading the configured GitHub repository, parsing the configuration file, and scheduling SQL queries as specified. Metrics will be updated in Prometheus according to the results of these queries.
The config file allows setting up a time-based schedule of expected results. For example, when periodically checking the total SUI in the network we want it to be an exact value (i.e. 10B), whereas when periodically checking the balance of an account
which has time-based token unlocks, we want to compare it against a lower bound (the balance of the account should never drop below a certain number before a given date), etc.

```json lines
[
{
"name": "sui_10B",
"cron_schedule": "0 0 * * *", // Every day at midnight (UTC)
"sql_query": "SELECT total_sui FROM total_sui_mainnet ORDER BY epoch DESC LIMIT 1",
"metric_name": "total_sui_10B",
"timed_exact_limits": {
// total sui should always be exact 10B since
// the dawn of time
"1970-01-01T00:00:00Z": 10000000000.0
},
"timed_lower_limits": {},
"timed_upper_limits": {}
},
{
"name": "user_x_balance",
"cron_schedule": "0 15 * * *", // Every day at 3:00 PM (UTC)
"sql_query": "SELECT balance FROM user_balances WHERE user_id = 'x' LIMIT 1",
"metric_name": "user_x_balance",
"timed_exact_limits": {},
"timed_upper_limits": {},
"timed_lower_limits": {
// user balance should not drop below these numbers on those dates
// i.e. balance should not drop below 50 SUI before 1/1/2024,
// balance should not drop below 100SUI before 2/1/2024
// and it should not drop below 150SUI before 3/1/2024
"2024-01-01T15:00:00Z": 50.0,
"2024-02-01T15:00:00Z": 100.0,
"2024-03-01T15:00:00Z": 150.0
}
}
]
```
44 changes: 44 additions & 0 deletions crates/sui-security-watchdog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use clap::Parser;
use std::path::PathBuf;

mod metrics;
mod pagerduty;
mod query_runner;
pub mod scheduler;

// Command-line configuration for the watchdog binary, parsed via clap's derive API
// (`main.rs` calls `SecurityWatchdogConfig::parse()`).
// NOTE(review): the comments added below are plain `//` on purpose. clap's derive turns
// `///` doc comments into `--help` text, so adding doc comments would change CLI output.
#[derive(Parser, Clone, Debug)]
#[clap(
name = "Sui Security Watchdog",
about = "Watchdog service to monitor chain data.",
rename_all = "kebab-case"
)]
pub struct SecurityWatchdogConfig {
// Required. Presumably the PagerDuty API key handed to `Pagerduty::new` — confirm in scheduler.
#[clap(long)]
pub pd_api_key: String,
// Required. Presumably the PagerDuty service id incidents are filed against — confirm in scheduler.
#[clap(long)]
pub pd_wallet_monitoring_service_id: String,
// Required. Path to the watchdog config; presumably the query/schedule definitions — confirm.
#[clap(long)]
pub config: PathBuf,
// The `sf_*` options are all optional and global.
// NOTE(review): they look like Snowflake connection settings (the crate depends on
// `snowflake-api`) — confirm against the query runner.
#[clap(long, default_value = None, global = true)]
pub sf_account_identifier: Option<String>,
#[clap(long, default_value = None, global = true)]
pub sf_warehouse: Option<String>,
#[clap(long, default_value = None, global = true)]
pub sf_database: Option<String>,
#[clap(long, default_value = None, global = true)]
pub sf_schema: Option<String>,
#[clap(long, default_value = None, global = true)]
pub sf_username: Option<String>,
#[clap(long, default_value = None, global = true)]
pub sf_role: Option<String>,
#[clap(long, default_value = None, global = true)]
pub sf_password: Option<String>,
// NOTE(review): despite "url" in the help text, `main.rs` joins this with the port as
// "host:port" and parses the result as the address for the Prometheus server it starts.
/// The url of the metrics client to connect to.
#[clap(long, default_value = "127.0.0.1", global = true)]
pub client_metric_host: String,
// Port half of the "host:port" address described above.
/// The port of the metrics client to connect to.
#[clap(long, default_value = "8081", global = true)]
pub client_metric_port: u16,
}
33 changes: 33 additions & 0 deletions crates/sui-security-watchdog/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use prometheus::Registry;

use anyhow::Result;
use clap::*;
use sui_security_watchdog::scheduler::SchedulerService;
use sui_security_watchdog::SecurityWatchdogConfig;

#[tokio::main]
async fn main() -> Result<()> {
let _guard = telemetry_subscribers::TelemetryConfig::new()
.with_env()
.init();

let config = SecurityWatchdogConfig::parse();
let registry_service = mysten_metrics::start_prometheus_server(
format!(
"{}:{}",
config.client_metric_host, config.client_metric_port
)
.parse()
.unwrap(),
);
let registry: Registry = registry_service.default_registry();
mysten_metrics::init_metrics(&registry);
let service = SchedulerService::new(&config, &registry).await?;
service.schedule().await?;
service.start().await?;
tokio::signal::ctrl_c().await?;
Ok(())
}
83 changes: 83 additions & 0 deletions crates/sui-security-watchdog/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Context;
use prometheus::{register_int_gauge_with_registry, IntGauge, Registry};
use std::collections::HashMap;
use tokio::sync::Mutex;

/// Holds the Prometheus gauges created by the watchdog service, keyed by metric name.
///
/// The gauge map sits behind a `tokio::sync::Mutex`, so the accessors can be
/// awaited from several tasks through a shared reference.
#[derive(Debug)]
pub struct WatchdogMetrics {
// The Prometheus registry that newly created gauges are registered with.
registry: Registry,
// Gauges that have already been created, keyed by their full metric name
// (including any `_exact` / `_lower` / `_upper` suffix).
// Wrapped in a Mutex so lookups and insertions are serialized.
metrics: Mutex<HashMap<String, IntGauge>>,
}

impl WatchdogMetrics {
    /// Constructs a new `WatchdogMetrics` that registers its gauges with a clone
    /// of the given Prometheus registry.
    pub fn new(registry: &Registry) -> Self {
        Self {
            registry: registry.clone(),
            metrics: Mutex::new(HashMap::new()),
        }
    }

    /// Retrieves the gauge named exactly `metric_name`, creating and registering
    /// it on first use.
    ///
    /// # Errors
    ///
    /// Returns an error if the gauge cannot be registered with the registry.
    pub async fn get(&self, metric_name: &str) -> anyhow::Result<IntGauge> {
        self.get_or_register(metric_name.to_string()).await
    }

    /// Retrieves or creates the "exact" gauge for `metric_name`.
    /// The registered name is `metric_name` suffixed with `_exact`.
    ///
    /// # Errors
    ///
    /// Returns an error if the gauge cannot be registered with the registry.
    pub async fn get_exact(&self, metric_name: &str) -> anyhow::Result<IntGauge> {
        self.get_or_register(format!("{}_exact", metric_name)).await
    }

    /// Similar to `get_exact`, but for "lower" bound metrics (suffix `_lower`).
    ///
    /// # Errors
    ///
    /// Returns an error if the gauge cannot be registered with the registry.
    pub async fn get_lower(&self, metric_name: &str) -> anyhow::Result<IntGauge> {
        self.get_or_register(format!("{}_lower", metric_name)).await
    }

    /// Similar to `get_exact`, but for "upper" bound metrics (suffix `_upper`).
    ///
    /// # Errors
    ///
    /// Returns an error if the gauge cannot be registered with the registry.
    pub async fn get_upper(&self, metric_name: &str) -> anyhow::Result<IntGauge> {
        self.get_or_register(format!("{}_upper", metric_name)).await
    }

    /// Returns the cached gauge called `name`, registering a new one only when
    /// the cache has no entry for it yet.
    async fn get_or_register(&self, name: String) -> anyhow::Result<IntGauge> {
        let mut metrics = self.metrics.lock().await;
        if let Some(gauge) = metrics.get(&name) {
            return Ok(gauge.clone());
        }
        // Register only on a cache miss. The previous `or_insert(register(..).unwrap())`
        // evaluated its argument on every call, so the second lookup of a name tried to
        // register it again and panicked on the resulting registration error.
        let gauge =
            register_int_gauge_with_registry!(name.as_str(), name.as_str(), &self.registry)
                .with_context(|| format!("Failed to register metric {}", name))?;
        metrics.insert(name, gauge.clone());
        Ok(gauge)
    }
}
124 changes: 124 additions & 0 deletions crates/sui-security-watchdog/src/pagerduty.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::info;

/// Service reference embedded in an [`Incident`].
#[derive(Serialize, Deserialize, Debug)]
pub struct Service {
/// Identifier of the service.
pub id: String,
/// Serialized under the JSON key `type`; the `Default` impl sets it to `"service_reference"`.
#[serde(rename = "type")]
pub r#type: String,
}

impl Default for Service {
fn default() -> Self {
Service {
id: "".to_string(),
r#type: "service_reference".to_string(),
}
}
}

/// Body section embedded in an [`Incident`].
#[derive(Serialize, Deserialize, Debug)]
pub struct Body {
/// Serialized under the JSON key `type`; the `Default` impl sets it to `"incident_body"`.
#[serde(rename = "type")]
pub r#type: String,
/// Free-form details text of the incident.
pub details: String,
}

impl Default for Body {
fn default() -> Self {
Body {
r#type: "incident_body".to_string(),
details: "".to_string(),
}
}
}

/// Incident payload wrapped by [`CreateIncident`] when creating an incident.
#[derive(Serialize, Deserialize, Debug)]
pub struct Incident {
/// Key identifying the incident; `Pagerduty::create_incident` logs it.
// NOTE(review): presumably PagerDuty's dedup key — confirm against the API docs.
pub incident_key: String,
/// Serialized under the JSON key `type`; the `Default` impl sets it to `"incident"`.
#[serde(rename = "type")]
pub r#type: String,
/// Title of the incident.
pub title: String,
/// Reference to the service the incident belongs to.
pub service: Service,
/// Body of the incident.
pub body: Body,
}

impl Default for Incident {
fn default() -> Self {
Incident {
incident_key: "".to_string(),
r#type: "incident".to_string(),
title: "".to_string(),
service: Service::default(),
body: Body::default(),
}
}
}

/// Top-level request body sent by `Pagerduty::create_incident`; serializes as
/// `{ "incident": { ... } }`.
#[derive(Serialize, Deserialize)]
pub struct CreateIncident {
/// The incident to create.
pub incident: Incident,
}

/// Client that creates incidents through the PagerDuty REST API.
#[derive(Clone)]
pub struct Pagerduty {
/// HTTP client used for the requests; held in an `Arc`.
pub client: Arc<reqwest::Client>,
/// API key sent in the `Authorization` header as `Token token=<api_key>`.
pub api_key: String,
}

impl Pagerduty {
    /// Creates a client with a fresh `reqwest::Client` that authenticates with `api_key`.
    pub fn new(api_key: String) -> Self {
        let client = Arc::new(reqwest::Client::new());
        Self { client, api_key }
    }

    /// Posts `incident` to the PagerDuty incidents endpoint.
    ///
    /// `from` is sent as the `From` header. A successful response, or a client
    /// error whose body reports an already-open incident with the same dedup
    /// key, counts as success.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be sent, if the response body
    /// cannot be read, or if the API rejects the request for any other reason.
    pub async fn create_incident(
        &self,
        from: &str,
        incident: CreateIncident,
    ) -> anyhow::Result<()> {
        let request = self
            .client
            .post("https://api.pagerduty.com/incidents")
            .header("Authorization", format!("Token token={}", self.api_key))
            .header("Content-Type", "application/json")
            .header("Accept", "application/json")
            .header("From", from)
            .json(&incident);
        let response = request.send().await?;

        // Any 2xx status means the incident was created.
        let status = response.status();
        if status.is_success() {
            info!(
                "Created incident with key: {:?}",
                incident.incident.incident_key
            );
            return Ok(());
        }

        // An already-open incident with the same dedup key is not a failure.
        let text = response.text().await?;
        let already_open = status.is_client_error()
            && text.contains(
                "Open incident with matching dedup key already exists on this service",
            );
        if !already_open {
            return Err(anyhow!("Failed to create incident: {}", text));
        }

        info!(
            "Incident already exists with key: {}",
            incident.incident.incident_key
        );
        Ok(())
    }
}
Loading

0 comments on commit dd84ebd

Please sign in to comment.