From dd84ebd015b15cffbae1537833bbdb55b2840e8f Mon Sep 17 00:00:00 2001 From: Sadhan Sood <106645797+sadhansood@users.noreply.github.com> Date: Tue, 9 Apr 2024 01:00:52 -0700 Subject: [PATCH] Add a security watchdog service to periodically run queries and report data (#16926) ## Description This PR adds a crate which downloads a GitHub repo to read a config file with queries (repo will have limited access and enables peer review before any modification), parses the config file for queries and their schedules, and reports data back to Prometheus by running those queries. ## Test Plan Not tested yet. --- Cargo.lock | 235 ++++++++++--- Cargo.toml | 1 + crates/sui-analytics-indexer/Cargo.toml | 2 +- crates/sui-security-watchdog/Cargo.toml | 26 ++ crates/sui-security-watchdog/README.md | 49 +++ crates/sui-security-watchdog/src/lib.rs | 44 +++ crates/sui-security-watchdog/src/main.rs | 33 ++ crates/sui-security-watchdog/src/metrics.rs | 83 +++++ crates/sui-security-watchdog/src/pagerduty.rs | 124 +++++++ .../sui-security-watchdog/src/query_runner.rs | 192 +++++++++++ crates/sui-security-watchdog/src/scheduler.rs | 310 ++++++++++++++++++ 11 files changed, 1048 insertions(+), 51 deletions(-) create mode 100644 crates/sui-security-watchdog/Cargo.toml create mode 100644 crates/sui-security-watchdog/README.md create mode 100644 crates/sui-security-watchdog/src/lib.rs create mode 100644 crates/sui-security-watchdog/src/main.rs create mode 100644 crates/sui-security-watchdog/src/metrics.rs create mode 100644 crates/sui-security-watchdog/src/pagerduty.rs create mode 100644 crates/sui-security-watchdog/src/query_runner.rs create mode 100644 crates/sui-security-watchdog/src/scheduler.rs diff --git a/Cargo.lock b/Cargo.lock index 8be5cf1d53eba..d3aa07e2e7eb2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -267,9 +267,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.5.0" +version = "0.6.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c" +checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" dependencies = [ "anstyle", "anstyle-parse", @@ -305,12 +305,12 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "2.1.0" +version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd" +checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" dependencies = [ "anstyle", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -1606,7 +1606,7 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2 1.0.78", "quote 1.0.35", "syn 2.0.48", @@ -2385,35 +2385,34 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.1" +version = "4.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c8d502cbaec4595d2e7d5f61e318f05417bd2b66fdc3809498f0d3fdf0bea27" +checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0" dependencies = [ "clap_builder", "clap_derive", - "once_cell", ] [[package]] name = "clap_builder" -version = "4.4.1" +version = "4.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5891c7bc0edb3e1c2204fc5e94009affabeb1821c9e5fdc3959536c5c0bb984d" +checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4" dependencies = [ "anstream", "anstyle", "clap_lex", - "strsim", - "terminal_size 0.2.6", + "strsim 0.11.0", + "terminal_size", ] [[package]] name = "clap_derive" -version = "4.4.0" +version = "4.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9fd1a5729c4548118d7d70ff234a44868d00489a4b6597b0b020918a0e91a1a" +checksum = 
"528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2 1.0.78", "quote 1.0.35", "syn 2.0.48", @@ -2421,9 +2420,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.5.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" +checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" [[package]] name = "clipboard-win" @@ -2874,6 +2873,17 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "cron" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07" +dependencies = [ + "chrono", + "nom", + "once_cell", +] + [[package]] name = "crossbeam-channel" version = "0.5.8" @@ -3171,7 +3181,7 @@ dependencies = [ "ident_case", "proc-macro2 1.0.78", "quote 1.0.35", - "strsim", + "strsim 0.10.0", "syn 1.0.107", ] @@ -3185,7 +3195,7 @@ dependencies = [ "ident_case", "proc-macro2 1.0.78", "quote 1.0.35", - "strsim", + "strsim 0.10.0", "syn 1.0.107", ] @@ -3199,7 +3209,7 @@ dependencies = [ "ident_case", "proc-macro2 1.0.78", "quote 1.0.35", - "strsim", + "strsim 0.10.0", "syn 2.0.48", ] @@ -3482,7 +3492,7 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b10c03b954333d05bfd5be1d8a74eae2c9ca77b86e0f1c3a1ea29c49da1d6c2" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2 1.0.78", "quote 1.0.35", "syn 1.0.107", @@ -4394,7 +4404,7 @@ dependencies = [ "rand 0.8.5", "readonly", "rfc6979 0.4.0", - "rsa", + "rsa 0.8.2", "schemars", "secp256k1", "serde", @@ -5210,6 +5220,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +[[package]] +name = "heck" +version = "0.5.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.2.6" @@ -5933,7 +5949,7 @@ name = "jsonrpsee-proc-macros" version = "0.16.2" source = "git+https://github.com/wlmyng/jsonrpsee.git?rev=b1b300784795f6a64d0fcdf8f03081a9bc38bde8#b1b300784795f6a64d0fcdf8f03081a9bc38bde8" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro-crate", "proc-macro2 1.0.78", "quote 1.0.35", @@ -5985,6 +6001,20 @@ dependencies = [ "jsonrpsee-types", ] +[[package]] +name = "jsonwebtoken" +version = "8.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6971da4d9c3aa03c3d8f3ff0f4155b534aad021292003895a469716b2a230378" +dependencies = [ + "base64 0.21.2", + "pem", + "ring 0.16.20", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "k256" version = "0.11.6" @@ -6231,9 +6261,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.20" +version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" dependencies = [ "serde", ] @@ -6395,7 +6425,7 @@ dependencies = [ "supports-color", "supports-hyperlinks", "supports-unicode", - "terminal_size 0.3.0", + "terminal_size", "textwrap", "thiserror", "unicode-width", @@ -8132,6 +8162,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-derive" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d" +dependencies = [ + "proc-macro2 1.0.78", + "quote 1.0.35", + "syn 1.0.107", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -8552,7 +8593,7 @@ version = "0.17.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"ec4c6225c69b4ca778c0aea097321a64c421cf4577b331c61b229267edabb6f8" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro-error", "proc-macro2 1.0.78", "quote 1.0.35", @@ -9027,6 +9068,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "pkcs1" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" +dependencies = [ + "der 0.7.5", + "pkcs8 0.10.2", + "spki 0.7.1", +] + [[package]] name = "pkcs8" version = "0.9.0" @@ -9433,7 +9485,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2" dependencies = [ "bytes", - "heck", + "heck 0.4.1", "itertools 0.11.0", "log", "multimap", @@ -10149,7 +10201,7 @@ dependencies = [ "num-integer", "num-iter", "num-traits", - "pkcs1", + "pkcs1 0.4.1", "pkcs8 0.9.0", "rand_core 0.6.4", "sha2 0.10.6", @@ -10158,6 +10210,27 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rsa" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72f1471dbb4be5de45050e8ef7040625298ccb9efe941419ac2697088715925f" +dependencies = [ + "byteorder", + "const-oid", + "digest 0.10.7", + "num-bigint-dig", + "num-integer", + "num-iter", + "num-traits", + "pkcs1 0.7.5", + "pkcs8 0.10.2", + "rand_core 0.6.4", + "signature 2.0.0", + "subtle", + "zeroize", +] + [[package]] name = "rstest" version = "0.16.0" @@ -11219,6 +11292,18 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32fea41aca09ee824cc9724996433064c89f7777e60762749a4170a14abbfa21" +[[package]] +name = "simple_asn1" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adc4e5204eb1910f40f9cfa375f6f05b68c3abac4b6fd879c8ff5e7ae8a0a085" +dependencies = [ + "num-bigint 0.4.4", + "num-traits", + "thiserror", + "time", +] + [[package]] name = "simplelog" version = "0.9.0" 
@@ -11311,9 +11396,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.10.0" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "smawk" @@ -11337,7 +11422,7 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "475b3bbe5245c26f2d8a6f62d67c1f30eb9fffeccee721c45d162c3ebbdf81b2" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2 1.0.78", "quote 1.0.35", "syn 1.0.107", @@ -11351,9 +11436,9 @@ checksum = "5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831" [[package]] name = "snowflake-api" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66ca01bf134469135170a7271c0a31bbfb7da903104857e3dfa671093300683a" +checksum = "f6e20db2ea77690628e34db7a6f63f539557195afbbc3cd349a8cbe293e1fffc" dependencies = [ "arrow", "async-trait", @@ -11368,11 +11453,27 @@ dependencies = [ "reqwest-retry", "serde", "serde_json", + "snowflake-jwt", "thiserror", "url", "uuid 1.2.2", ] +[[package]] +name = "snowflake-jwt" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13a6dfdd7c433e0f4bb96d777c88d900c5abe3dc4d2f26d2340fd6c7caadcc6c" +dependencies = [ + "base64 0.21.2", + "jsonwebtoken", + "rsa 0.9.1", + "serde", + "sha2 0.10.6", + "thiserror", + "time", +] + [[package]] name = "socket2" version = "0.4.9" @@ -11498,6 +11599,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strsim" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01" + [[package]] name = 
"strum" version = "0.24.1" @@ -11522,7 +11629,7 @@ version = "0.24.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2 1.0.78", "quote 1.0.35", "rustversion", @@ -11535,7 +11642,7 @@ version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad8d03b598d3d0fff69bf533ee3ef19b8eeb342729596df84bcc7e1f96ec4059" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2 1.0.78", "quote 1.0.35", "rustversion", @@ -13454,6 +13561,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "sui-security-watchdog" +version = "1.23.0" +dependencies = [ + "anyhow", + "arrow-array", + "async-trait", + "chrono", + "clap", + "lexical-util", + "mysten-metrics", + "prometheus", + "reqwest", + "serde", + "serde_json", + "snowflake-api", + "telemetry-subscribers", + "tokio", + "tokio-cron-scheduler", + "tracing", + "uuid 1.2.2", +] + [[package]] name = "sui-simulator" version = "0.7.0" @@ -14227,9 +14357,9 @@ dependencies = [ [[package]] name = "sync_wrapper" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" [[package]] name = "synstructure" @@ -14275,7 +14405,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99f688a08b54f4f02f0a3c382aefdb7884d3d69609f785bd253dc033243e3fe4" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro-error", "proc-macro2 1.0.78", "quote 1.0.35", @@ -14402,16 +14532,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "terminal_size" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e6bf6f19e9f8ed8d4048dc22981458ebcf406d67e94cd422e5ecd73d63b3237" -dependencies = [ - 
"rustix 0.37.7", - "windows-sys 0.48.0", -] - [[package]] name = "terminal_size" version = "0.3.0" @@ -14680,6 +14800,21 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-cron-scheduler" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1215c91d6f74868e18a65bda99853e012cfb2a0e42f3438382e318277da76a0" +dependencies = [ + "chrono", + "cron", + "num-derive", + "num-traits", + "tokio", + "tracing", + "uuid 1.2.2", +] + [[package]] name = "tokio-io-timeout" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index d86039e3bd309..8f8feae30e74e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -138,6 +138,7 @@ members = [ "crates/sui-rosetta", "crates/sui-rpc-loadgen", "crates/sui-sdk", + "crates/sui-security-watchdog", "crates/sui-simulator", "crates/sui-single-node-benchmark", "crates/sui-snapshot", diff --git a/crates/sui-analytics-indexer/Cargo.toml b/crates/sui-analytics-indexer/Cargo.toml index bb3e4e813ef90..0db3b59797ce9 100644 --- a/crates/sui-analytics-indexer/Cargo.toml +++ b/crates/sui-analytics-indexer/Cargo.toml @@ -53,7 +53,7 @@ sui-package-resolver.workspace = true simulacrum.workspace = true arrow = { version = "50.0.0"} gcp-bigquery-client = "0.18.0" -snowflake-api = { version = "0.6.0" } +snowflake-api = { version = "0.7.0" } tap = { version = "1.0.1", features = [] } [dev-dependencies] diff --git a/crates/sui-security-watchdog/Cargo.toml b/crates/sui-security-watchdog/Cargo.toml new file mode 100644 index 0000000000000..051e6c48d2663 --- /dev/null +++ b/crates/sui-security-watchdog/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "sui-security-watchdog" +version.workspace = true +authors = ["Mysten Labs "] +license = "Apache-2.0" +publish = false +edition = "2021" + +[dependencies] +arrow-array.workspace = true +mysten-metrics.workspace = true +serde.workspace = true +serde_json.workspace = true +tokio.workspace = true +tracing.workspace = true +tokio-cron-scheduler = { version = 
"0.10.0", features = ["signal"] } +anyhow.workspace = true +chrono.workspace = true +snowflake-api = { version = "0.7.0"} +clap.workspace = true +prometheus.workspace = true +telemetry-subscribers.workspace = true +async-trait.workspace = true +uuid.workspace = true +lexical-util = "0.8.5" +reqwest = { workspace = true, features = ["json"] } \ No newline at end of file diff --git a/crates/sui-security-watchdog/README.md b/crates/sui-security-watchdog/README.md new file mode 100644 index 0000000000000..9ea621d8659f0 --- /dev/null +++ b/crates/sui-security-watchdog/README.md @@ -0,0 +1,49 @@ +# Security Watchdog Service + +## Overview +The Security Watchdog Service is designed to monitor and analyze data changes over time. It achieves this by periodically downloading a specified GitHub repository, parsing configuration files for SQL queries, and executing these queries on a set schedule. Results are then used to update Prometheus metrics, providing real-time insights into data trends. + +## Running the Service +Execute the compiled binary to start the service: +```shell +cargo run --release -p sui-security-watchdog +``` +## Usage +The service will automatically start downloading the configured GitHub repository, parsing the configuration file, and scheduling SQL queries as specified. Metrics will be updated in Prometheus according to the results of these queries. +The config file allows setting up time-based schedules for expected results. For example, when periodically checking total SUI in the network, we want it to be an exact value, i.e. 10B, whereas when periodically checking the balance of an account +which has time-based token unlocks, we want it to compare against a lower bound (balance of the account should never drop below a certain number before a given date), etc.
+ +```json lines +[ + { + "name": "sui_10B", + "cron_schedule": "0 0 * * *", // Every day at midnight (UTC) + "sql_query": "SELECT total_sui FROM total_sui_mainnet ORDER BY epoch DESC LIMIT 1", + "metric_name": "total_sui_10B", + "timed_exact_limits": { + // total sui should always be exactly 10B since + // the dawn of time + "1970-01-01T00:00:00Z": 10000000000.0 + }, + "timed_lower_limits": {}, + "timed_upper_limits": {} + }, + { + "name": "user_x_balance", + "cron_schedule": "0 15 * * *", // Every day at 3:00 PM (UTC) + "sql_query": "SELECT balance FROM user_balances WHERE user_id = 'x' LIMIT 1", + "metric_name": "user_x_balance", + "timed_exact_limits": {}, + "timed_upper_limits": {}, + "timed_lower_limits": { + // user balance should not drop below these numbers on those dates + // i.e. balance should not drop below 50 SUI before 1/1/2024, + // balance should not drop below 100 SUI before 2/1/2024 + // and it should not drop below 150 SUI before 3/1/2024 + "2024-01-01T15:00:00Z": 50.0, + "2024-02-01T15:00:00Z": 100.0, + "2024-03-01T15:00:00Z": 150.0 + } + } +] +``` \ No newline at end of file diff --git a/crates/sui-security-watchdog/src/lib.rs b/crates/sui-security-watchdog/src/lib.rs new file mode 100644 index 0000000000000..5061b1ce9ecdc --- /dev/null +++ b/crates/sui-security-watchdog/src/lib.rs @@ -0,0 +1,44 @@ +// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0 +use clap::Parser; +use std::path::PathBuf; + +mod metrics; +mod pagerduty; +mod query_runner; +pub mod scheduler; + +#[derive(Parser, Clone, Debug)] +#[clap( + name = "Sui Security Watchdog", + about = "Watchdog service to monitor chain data.", + rename_all = "kebab-case" +)] +pub struct SecurityWatchdogConfig { + #[clap(long)] + pub pd_api_key: String, + #[clap(long)] + pub pd_wallet_monitoring_service_id: String, + #[clap(long)] + pub config: PathBuf, + #[clap(long, default_value = None, global = true)] + pub sf_account_identifier: Option<String>, + #[clap(long, default_value = None, global = true)] + pub sf_warehouse: Option<String>, + #[clap(long, default_value = None, global = true)] + pub sf_database: Option<String>, + #[clap(long, default_value = None, global = true)] + pub sf_schema: Option<String>, + #[clap(long, default_value = None, global = true)] + pub sf_username: Option<String>, + #[clap(long, default_value = None, global = true)] + pub sf_role: Option<String>, + #[clap(long, default_value = None, global = true)] + pub sf_password: Option<String>, + /// The url of the metrics client to connect to. + #[clap(long, default_value = "127.0.0.1", global = true)] + pub client_metric_host: String, + /// The port of the metrics client to connect to. + #[clap(long, default_value = "8081", global = true)] + pub client_metric_port: u16, +} diff --git a/crates/sui-security-watchdog/src/main.rs b/crates/sui-security-watchdog/src/main.rs new file mode 100644 index 0000000000000..365ac47662a3f --- /dev/null +++ b/crates/sui-security-watchdog/src/main.rs @@ -0,0 +1,33 @@ +// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0 + +use prometheus::Registry; + +use anyhow::Result; +use clap::*; +use sui_security_watchdog::scheduler::SchedulerService; +use sui_security_watchdog::SecurityWatchdogConfig; + +#[tokio::main] +async fn main() -> Result<()> { + let _guard = telemetry_subscribers::TelemetryConfig::new() + .with_env() + .init(); + + let config = SecurityWatchdogConfig::parse(); + let registry_service = mysten_metrics::start_prometheus_server( + format!( + "{}:{}", + config.client_metric_host, config.client_metric_port + ) + .parse() + .unwrap(), + ); + let registry: Registry = registry_service.default_registry(); + mysten_metrics::init_metrics(&registry); + let service = SchedulerService::new(&config, &registry).await?; + service.schedule().await?; + service.start().await?; + tokio::signal::ctrl_c().await?; + Ok(()) +} diff --git a/crates/sui-security-watchdog/src/metrics.rs b/crates/sui-security-watchdog/src/metrics.rs new file mode 100644 index 0000000000000..4cd0c309d7c9c --- /dev/null +++ b/crates/sui-security-watchdog/src/metrics.rs @@ -0,0 +1,83 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Context; +use prometheus::{register_int_gauge_with_registry, IntGauge, Registry}; +use std::collections::HashMap; +use tokio::sync::Mutex; + +/// Defines a structure to hold and manage metrics for a watchdog service. +/// This structure is thread-safe, allowing concurrent access and modification of metrics. +#[derive(Debug)] +pub struct WatchdogMetrics { + // The Prometheus registry to which metrics are registered. + registry: Registry, + // A HashMap to store IntGauge metrics, keyed by their names. + // Wrapped in a Mutex to ensure thread-safe access.
+ metrics: Mutex<HashMap<String, IntGauge>>, +} + +impl WatchdogMetrics { + /// Constructs a new WatchdogMetrics instance with the given Prometheus registry + pub fn new(registry: &Registry) -> Self { + Self { + registry: registry.clone(), + metrics: Mutex::new(HashMap::new()), + } + } + + /// Retrieves or creates a metric for the specified metric name. + /// The metric name is used as-is, without any suffix. + pub async fn get(&self, metric_name: &str) -> anyhow::Result<IntGauge> { + let mut metrics = self.metrics.lock().await; + // If the metric doesn't exist, register it and insert into the map. + metrics.entry(metric_name.to_string()).or_insert( + register_int_gauge_with_registry!(metric_name, metric_name, &self.registry).unwrap(), + ); + metrics + .get(metric_name) + .context("Failed to get expected metric") + .cloned() + } + + /// Retrieves or creates an "exact" metric for the specified metric name. + /// The metric name is suffixed with "_exact" to denote its type. + pub async fn get_exact(&self, metric_name: &str) -> anyhow::Result<IntGauge> { + let mut metrics = self.metrics.lock().await; + let metric = format!("{}_exact", metric_name); + // If the metric doesn't exist, register it and insert into the map. + metrics.entry(metric.clone()).or_insert( + register_int_gauge_with_registry!(&metric, &metric, &self.registry).unwrap(), + ); + metrics + .get(&metric) + .context("Failed to get expected metric") + .cloned() + } + + /// Similar to get_exact, but for "lower" bound metrics. + pub async fn get_lower(&self, metric_name: &str) -> anyhow::Result<IntGauge> { + let mut metrics = self.metrics.lock().await; + let metric = format!("{}_lower", metric_name); + metrics.entry(metric.clone()).or_insert( + register_int_gauge_with_registry!(&metric, &metric, &self.registry).unwrap(), + ); + metrics + .get(&metric) + .context("Failed to get expected metric") + .cloned() + } + + /// Similar to get_exact, but for "upper" bound metrics.
+ pub async fn get_upper(&self, metric_name: &str) -> anyhow::Result { + let mut metrics = self.metrics.lock().await; + let metric = format!("{}_upper", metric_name); + metrics.entry(metric.clone()).or_insert( + register_int_gauge_with_registry!(&metric, &metric, &self.registry).unwrap(), + ); + metrics + .get(&metric) + .context("Failed to get expected metric") + .cloned() + } +} diff --git a/crates/sui-security-watchdog/src/pagerduty.rs b/crates/sui-security-watchdog/src/pagerduty.rs new file mode 100644 index 0000000000000..8066031da1ef3 --- /dev/null +++ b/crates/sui-security-watchdog/src/pagerduty.rs @@ -0,0 +1,124 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::anyhow; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tracing::info; + +#[derive(Serialize, Deserialize, Debug)] +pub struct Service { + pub id: String, + #[serde(rename = "type")] + pub r#type: String, +} + +impl Default for Service { + fn default() -> Self { + Service { + id: "".to_string(), + r#type: "service_reference".to_string(), + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Body { + #[serde(rename = "type")] + pub r#type: String, + pub details: String, +} + +impl Default for Body { + fn default() -> Self { + Body { + r#type: "incident_body".to_string(), + details: "".to_string(), + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Incident { + pub incident_key: String, + #[serde(rename = "type")] + pub r#type: String, + pub title: String, + pub service: Service, + pub body: Body, +} + +impl Default for Incident { + fn default() -> Self { + Incident { + incident_key: "".to_string(), + r#type: "incident".to_string(), + title: "".to_string(), + service: Service::default(), + body: Body::default(), + } + } +} + +#[derive(Serialize, Deserialize)] +pub struct CreateIncident { + pub incident: Incident, +} + +#[derive(Clone)] +pub struct Pagerduty { + pub client: Arc, + pub api_key: String, +} + +impl 
Pagerduty { + pub fn new(api_key: String) -> Self { + Pagerduty { + client: Arc::new(reqwest::Client::new()), + api_key, + } + } + + pub async fn create_incident( + &self, + from: &str, + incident: CreateIncident, + ) -> anyhow::Result<()> { + let token = format!("Token token={}", self.api_key); + + let response = self + .client + .post("https://api.pagerduty.com/incidents") + .header("Authorization", token) + .header("Content-Type", "application/json") + .header("Accept", "application/json") + .header("From", from) + .json(&incident) + .send() + .await?; + // Check if the status code is in the range of 200-299 + if response.status().is_success() { + info!( + "Created incident with key: {:?}", + incident.incident.incident_key + ); + Ok(()) + } else { + let status = response.status(); + let text = response.text().await?; + if status.is_client_error() + && text.contains( + "Open incident with matching dedup key already exists on this service", + ) + { + info!( + "Incident already exists with key: {}", + incident.incident.incident_key + ); + Ok(()) + } else { + Err(anyhow!("Failed to create incident: {}", text)) + } + } + } +} diff --git a/crates/sui-security-watchdog/src/query_runner.rs b/crates/sui-security-watchdog/src/query_runner.rs new file mode 100644 index 0000000000000..8f0d71eea6ed8 --- /dev/null +++ b/crates/sui-security-watchdog/src/query_runner.rs @@ -0,0 +1,192 @@ +// Copyright (c) Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::SecurityWatchdogConfig; +use anyhow::anyhow; +use arrow_array::cast::AsArray; +use arrow_array::types::{ + Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, + UInt32Type, UInt64Type, UInt8Type, +}; +use arrow_array::{Array, Float32Array, RecordBatch}; +use lexical_util::num::AsPrimitive; +use snowflake_api::{QueryResult, SnowflakeApi}; +use std::any::Any; +use std::collections::HashMap; +use tracing::info; + +pub type Row = HashMap>; + +#[async_trait::async_trait] +pub trait QueryRunner: Send + Sync + 'static { + /// Asynchronously runs the given SQL query and returns the result as a floating-point number. + /// Only the first row and first column in returned, so it is important that users of this trait + /// use it for a query which returns only a single floating point result + async fn run_single_entry(&self, query: &str) -> anyhow::Result; + + /// Asynchronously runs the given SQL query and returns the result as a vector of rows. + async fn run(&self, query: &str) -> anyhow::Result>; +} + +macro_rules! insert_primitive_values { + ($rows:expr, $column:expr, $name:expr, $type:ty) => { + if let Some(value) = $column.as_primitive_opt::<$type>() { + for i in 0..value.len() { + let entry = $rows.get_mut(i); + if let Some(entry) = entry { + entry.insert($name.clone(), Box::new(value.value(i))); + } else { + $rows.push(HashMap::new()); + $rows + .last_mut() + .unwrap() + .insert($name.clone(), Box::new(value.value(i))); + } + } + continue; + } + }; +} + +macro_rules! 
insert_string_values { + ($rows:expr, $column:expr, $name:expr, $type:ty) => { + if let Some(value) = $column.as_string_opt::<$type>() { + for i in 0..value.len() { + let entry = $rows.get_mut(i); + if let Some(entry) = entry { + entry.insert($name.clone(), Box::new(value.value(i).to_string())); + } else { + $rows.push(HashMap::new()); + $rows + .last_mut() + .unwrap() + .insert($name.clone(), Box::new(value.value(i).to_string())); + } + } + continue; + } + }; +} + +pub struct SnowflakeQueryRunner { + api: SnowflakeApi, +} + +impl SnowflakeQueryRunner { + /// Creates a new `SnowflakeQueryRunner` with the specified connection parameters. + /// + /// # Arguments + /// * `account_identifier` - Snowflake account identifier. + /// * `warehouse` - The Snowflake warehouse to use. + /// * `database` - The database to query against. + /// * `schema` - The schema within the database. + /// * `user` - Username for authentication. + /// * `role` - User role for executing queries. + /// * `passwd` - Password for authentication. 
+ pub fn new( + account_identifier: &str, + warehouse: &str, + database: &str, + schema: &str, + user: &str, + role: &str, + passwd: &str, + ) -> anyhow::Result { + let api = SnowflakeApi::with_password_auth( + account_identifier, + Some(warehouse), + Some(database), + Some(schema), + user, + Some(role), + passwd, + ) + .expect("Failed to build sf api client"); + Ok(SnowflakeQueryRunner { api }) + } + + pub fn from_config(config: &SecurityWatchdogConfig) -> anyhow::Result { + Self::new( + config + .sf_account_identifier + .as_ref() + .cloned() + .unwrap() + .as_str(), + config.sf_warehouse.as_ref().cloned().unwrap().as_str(), + config.sf_database.as_ref().cloned().unwrap().as_str(), + config.sf_schema.as_ref().cloned().unwrap().as_str(), + config.sf_username.as_ref().cloned().unwrap().as_str(), + config.sf_role.as_ref().cloned().unwrap().as_str(), + config.sf_password.as_ref().cloned().unwrap().as_str(), + ) + } + + /// Parses the result of a Snowflake query from a `Vec` into a single `f64` value. + fn parse(&self, res: Vec) -> anyhow::Result { + let value = res + .first() + .ok_or_else(|| anyhow!("No results found in RecordBatch"))? + .columns() + .first() + .ok_or_else(|| anyhow!("No columns found in record"))? + .as_any() + .downcast_ref::() + .ok_or_else(|| anyhow!("Column is not Float32Array"))? 
+ .value(0) + .as_f64(); + Ok(value) + } + + fn parse_record_batch(&self, batch: RecordBatch) -> anyhow::Result> { + let mut rows: Vec = Vec::new(); + for (index, column) in batch.columns().iter().enumerate() { + let name = batch.schema().fields()[index].name().clone(); + insert_primitive_values!(rows, column, name, Int8Type); + insert_primitive_values!(rows, column, name, Int16Type); + insert_primitive_values!(rows, column, name, Int32Type); + insert_primitive_values!(rows, column, name, Int64Type); + insert_primitive_values!(rows, column, name, UInt8Type); + insert_primitive_values!(rows, column, name, UInt16Type); + insert_primitive_values!(rows, column, name, UInt32Type); + insert_primitive_values!(rows, column, name, UInt64Type); + insert_primitive_values!(rows, column, name, Float16Type); + insert_primitive_values!(rows, column, name, Float32Type); + insert_primitive_values!(rows, column, name, Float64Type); + insert_string_values!(rows, column, name, i32); + insert_string_values!(rows, column, name, i64); + info!("Skipping column: {}", name); + } + Ok(rows) + } + + fn parse_record_batches(&self, batches: Vec) -> anyhow::Result> { + let mut rows: Vec = Vec::new(); + for batch in batches { + let mut batch_rows = self.parse_record_batch(batch)?; + rows.append(&mut batch_rows); + } + Ok(rows) + } +} + +#[async_trait::async_trait] +impl QueryRunner for SnowflakeQueryRunner { + async fn run_single_entry(&self, query: &str) -> anyhow::Result { + let res = self.api.exec(query).await?; + match res { + QueryResult::Arrow(records) => self.parse(records), + // Handle other result types (Json, Empty) with a unified error message + _ => Err(anyhow!("Unexpected query result type")), + } + } + + async fn run(&self, query: &str) -> anyhow::Result> { + let res = self.api.exec(query).await?; + match res { + QueryResult::Arrow(records) => self.parse_record_batches(records), + // Handle other result types (Json, Empty) with a unified error message + _ => Err(anyhow!("Unexpected 
query result type")), + } + } +} diff --git a/crates/sui-security-watchdog/src/scheduler.rs b/crates/sui-security-watchdog/src/scheduler.rs new file mode 100644 index 0000000000000..4ac6653f8c489 --- /dev/null +++ b/crates/sui-security-watchdog/src/scheduler.rs @@ -0,0 +1,310 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::metrics::WatchdogMetrics; +use crate::pagerduty::{Body, CreateIncident, Incident, Pagerduty, Service}; +use crate::query_runner::{QueryRunner, SnowflakeQueryRunner}; +use crate::SecurityWatchdogConfig; +use anyhow::anyhow; +use chrono::{DateTime, Utc}; +use prometheus::{IntGauge, Registry}; +use serde::{Deserialize, Serialize}; +use std::any::Any; +use std::collections::BTreeMap; +use std::fs::File; +use std::io::Read; +use std::sync::Arc; +use tokio_cron_scheduler::{Job, JobScheduler}; +use tracing::{error, info}; +use uuid::Uuid; + +// MonitoringEntry is an enum that represents the types of monitoring entries that can be scheduled. +#[derive(Serialize, Deserialize)] +#[serde(tag = "type")] +enum MonitoringEntry { + MetricPublishingEntry(MetricPublishingEntry), + WalletMonitoringEntry(WalletMonitoringEntry), +} + +// MetricPublishingEntry is a struct that represents the configuration for a job which runs a sql +// query on a cron schedule and publishes metrics if the output is outside expected thresholds. Alerts +// could be set on the metric dashboard in grafana if needed +#[derive(Clone, Serialize, Deserialize)] +pub struct MetricPublishingEntry { + name: String, + cron_schedule: String, + sql_query: String, + metric_name: String, + timed_upper_limits: BTreeMap, f64>, + timed_lower_limits: BTreeMap, f64>, + timed_exact_limits: BTreeMap, f64>, +} + +// WalletMonitoringEntry is a struct that represents the configuration of a job which monitors wallet balances. +// It creates pagerduty incidents based on the given SQL query and cron schedule. 
+#[derive(Clone, Serialize, Deserialize)] +pub struct WalletMonitoringEntry { + name: String, + cron_schedule: String, + sql_query: String, +} + +pub struct SchedulerService { + scheduler: JobScheduler, + query_runner: Arc, + metrics: Arc, + entries: Vec, + pagerduty: Pagerduty, + pd_wallet_monitoring_service_id: String, +} + +impl SchedulerService { + pub async fn new(config: &SecurityWatchdogConfig, registry: &Registry) -> anyhow::Result { + let mut scheduler = JobScheduler::new().await?; + scheduler.shutdown_on_ctrl_c(); + scheduler.set_shutdown_handler(Box::new(|| { + Box::pin(async move { + info!("Scheduler shut down complete"); + }) + })); + Ok(Self { + scheduler, + query_runner: Arc::new(SnowflakeQueryRunner::from_config(config)?), + metrics: Arc::new(WatchdogMetrics::new(registry)), + entries: Self::from_config(config)?, + pagerduty: Pagerduty::new(config.pd_api_key.clone()), + pd_wallet_monitoring_service_id: config.pd_wallet_monitoring_service_id.clone(), + }) + } + + pub async fn schedule(&self) -> anyhow::Result<()> { + for monitoring_entry in &self.entries { + match monitoring_entry { + MonitoringEntry::MetricPublishingEntry(entry) => { + Self::schedule_metric_publish_job( + entry.clone(), + self.scheduler.clone(), + self.query_runner.clone(), + self.metrics.clone(), + ) + .await?; + } + MonitoringEntry::WalletMonitoringEntry(entry) => { + self.schedule_wallet_monitoring_job( + entry.clone(), + self.scheduler.clone(), + self.query_runner.clone(), + self.pd_wallet_monitoring_service_id.clone(), + self.metrics.clone(), + self.pagerduty.clone(), + ) + .await?; + } + } + } + Ok(()) + } + + pub async fn start(&self) -> anyhow::Result<()> { + self.scheduler.start().await?; + Ok(()) + } + + fn from_config(config: &SecurityWatchdogConfig) -> anyhow::Result> { + let mut file = File::open(&config.config)?; + let mut contents = String::new(); + file.read_to_string(&mut contents)?; + let entries: Vec = serde_json::from_str(&contents)?; + Ok(entries) + } + + async 
fn schedule_wallet_monitoring_job( + &self, + entry: WalletMonitoringEntry, + scheduler: JobScheduler, + query_runner: Arc, + pd_service_id: String, + metrics: Arc, + pagerduty: Pagerduty, + ) -> anyhow::Result { + let name = entry.name.clone(); + let cron_schedule = entry.cron_schedule.clone(); + + let job = Job::new_async(cron_schedule.as_str(), move |_uuid, _lock| { + let entry = entry.clone(); + let query_runner = query_runner.clone(); + let pd_service_id = pd_service_id.to_string(); + let pd = pagerduty.clone(); + let metrics = metrics.clone(); + Box::pin(async move { + info!("Running wallet monitoring job: {}", entry.name); + if let Err(err) = + Self::run_wallet_monitoring_job(&pd, &pd_service_id, &query_runner, &entry) + .await + { + error!("Failed to run wallet monitoring job with err: {}", err); + metrics + .get("wallet_monitoring_error") + .await + .iter() + .for_each(|metric| metric.inc()); + } + }) + })?; + let job_id = scheduler.add(job).await?; + info!("Scheduled job: {}", name); + Ok(job_id) + } + + async fn run_wallet_monitoring_job( + pagerduty: &Pagerduty, + service_id: &str, + query_runner: &Arc, + entry: &WalletMonitoringEntry, + ) -> anyhow::Result<()> { + let WalletMonitoringEntry { sql_query, .. } = entry; + let rows = query_runner.run(sql_query).await?; + for row in rows { + let wallet_id = row + .get("WALLET_ID") + .ok_or_else(|| anyhow!("Missing wallet_id"))? + .downcast_ref::() + .ok_or(anyhow!("Failed to downcast wallet_id"))? 
+ .clone(); + let current_balance = Self::extract_u64( + row.get("CURRENT_BALANCE") + .ok_or_else(|| anyhow!("Missing current_balance"))?, + ) + .ok_or(anyhow!("Failed to downcast current_balance"))?; + let lower_bound = Self::extract_u64( + row.get("LOWER_BOUND") + .ok_or_else(|| anyhow!("Missing lower_bound"))?, + ) + .ok_or(anyhow!("Failed to downcast lower_bound"))?; + Self::create_wallet_monitoring_incident( + pagerduty, + &wallet_id, + current_balance, + lower_bound, + service_id, + ) + .await?; + } + Ok(()) + } + + async fn create_wallet_monitoring_incident( + pagerduty: &Pagerduty, + wallet_id: &str, + current_balance: u64, + lower_bound: u64, + service_id: &str, + ) -> anyhow::Result<()> { + let service = Service { + id: service_id.to_string(), + ..Default::default() + }; + let incident_body = Body { + details: format!( + "Current balance: {}, Lower bound: {}", + current_balance, lower_bound + ), + ..Default::default() + }; + let incident = Incident { + title: format!("Wallet: {} is out of compliance", wallet_id), + service, + incident_key: wallet_id.to_string(), + body: incident_body, + ..Default::default() + }; + let create_incident = CreateIncident { incident }; + pagerduty + .create_incident("sadhan@mystenlabs.com", create_incident) + .await?; + Ok(()) + } + + async fn schedule_metric_publish_job( + entry: MetricPublishingEntry, + scheduler: JobScheduler, + query_runner: Arc, + metrics: Arc, + ) -> anyhow::Result { + let name = entry.name.clone(); + let cron_schedule = entry.cron_schedule.clone(); + let job = Job::new_async(cron_schedule.as_str(), move |_uuid, _lock| { + let entry = entry.clone(); + let query_runner = query_runner.clone(); + let metrics = metrics.clone(); + Box::pin(async move { + info!("Running metric publish job: {}", &entry.name); + if let Err(err) = + Self::run_metric_publish_job(&query_runner, &metrics, &entry).await + { + error!("Failed to run metric publish job with err: {}", err); + metrics + .get("metric_publishing_error") + 
.await + .iter() + .for_each(|metric| metric.inc()); + } + }) + })?; + let job_id = scheduler.add(job).await?; + info!("Scheduled job: {}", name); + Ok(job_id) + } + + async fn run_metric_publish_job( + query_runner: &Arc, + metrics: &Arc, + entry: &MetricPublishingEntry, + ) -> anyhow::Result<()> { + let MetricPublishingEntry { + sql_query, + timed_exact_limits, + timed_upper_limits, + timed_lower_limits, + metric_name, + .. + } = entry; + let res = query_runner.run_single_entry(sql_query).await?; + let update_metrics = |limits: &BTreeMap, f64>, metric: IntGauge| { + if let Some(value) = Self::get_current_limit(limits) { + metric.set((res - value) as i64); + } else { + metric.set(0); + } + }; + + update_metrics(timed_exact_limits, metrics.get_exact(metric_name).await?); + update_metrics(timed_upper_limits, metrics.get_upper(metric_name).await?); + update_metrics(timed_lower_limits, metrics.get_lower(metric_name).await?); + Ok(()) + } + + fn get_current_limit(limits: &BTreeMap, f64>) -> Option { + limits.range(..Utc::now()).next_back().map(|(_, val)| *val) + } + + fn extract_u64(value: &Box) -> Option { + if let Some(value) = value.downcast_ref::() { + Some(*value) + } else if let Some(value) = value.downcast_ref::() { + Some(*value as u64) + } else if let Some(value) = value.downcast_ref::() { + Some(*value as u64) + } else if let Some(value) = value.downcast_ref::() { + Some(*value as u64) + } else if let Some(value) = value.downcast_ref::() { + Some(*value as u64) + } else if let Some(value) = value.downcast_ref::() { + Some(*value as u64) + } else if let Some(value) = value.downcast_ref::() { + Some(*value as u64) + } else { + value.downcast_ref::().map(|value| *value as u64) + } + } +}