diff --git a/README.md b/README.md index b8ac266..7184188 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,7 @@ operating system)._ _Undergoing active development. Expect APIs to change._ - [X] json-rpc server for user queries + - [X] feed - [X] message - [X] peers - [X] ping @@ -84,6 +85,7 @@ The server currently supports HTTP. | Method | Parameters | Response | Description | | --- | --- | --- | --- | +| `feed` | `{ "pub_key": "<@...=.ed25519>" }` | `[{ "key": "<%...=.sha256>", "value": , "timestamp": , "rts": null }]` | Return an array of message KVTs (key, value, timestamp) from the local database | | `message` | `{ "msg_ref": }` | `{ "key": "<%...=.sha256>", "value": , "timestamp": , "rts": null }` | Return a single message KVT (key, value, timestamp) from the local database | | `peers` | | `[{ "pub_key": "<@...=.ed25519>", "seq_num": }` | Return the public key and latest sequence number for all peers in the local database | | `ping` | | `pong!` | Responds if the JSON-RPC server is running | diff --git a/src/actors/jsonrpc_server.rs b/src/actors/jsonrpc_server.rs index 0a7739c..36982b6 100644 --- a/src/actors/jsonrpc_server.rs +++ b/src/actors/jsonrpc_server.rs @@ -20,6 +20,12 @@ struct MsgRef { msg_ref: String, } +/// The public key (ID) of a peer. +#[derive(Debug, Deserialize)] +struct PubKey { + pub_key: String, +} + /// Register the JSON-RPC server endpoint, define the JSON-RPC methods /// and spawn the server. /// @@ -36,22 +42,20 @@ pub async fn actor(server_id: OwnedIdentity, port: u16) -> Result<()> { let mut io = IoHandler::default(); - // Simple `ping` endpoint. - io.add_sync_method("ping", |_| Ok(Value::String("pong!".to_owned()))); - - let local_pk = server_id.id.clone(); - - // Return the public key of the local SSB server. - io.add_sync_method("whoami", move |_| Ok(Value::String(local_pk.to_owned()))); - - // Return the public key and latest sequence number for all feeds in the - // local database. - io.add_sync_method("peers", |_| { + // Retrieve a feed by public key. + // Returns an array of messages as a KVTs. + io.add_sync_method("feed", move |params: Params| { task::block_on(async { + // Parse the parameter containing the public key. + let pub_key: PubKey = params.parse()?; + + // Open the primary KV database for reading. let db = KV_STORAGE.read().await; - let peers = db.get_peers().await?; - let response = json!(peers); + // Retrieve the message value for the requested message. + let feed = db.get_feed(&pub_key.pub_key)?; + + let response = json!(feed); Ok(response) }) @@ -84,6 +88,26 @@ pub async fn actor(server_id: OwnedIdentity, port: u16) -> Result<()> { }) }); + // Return the public key and latest sequence number for all feeds in the + // local database. + io.add_sync_method("peers", |_| { + task::block_on(async { + let db = KV_STORAGE.read().await; + let peers = db.get_peers().await?; + + let response = json!(peers); + + Ok(response) + }) + }); + + // Simple `ping` endpoint. + io.add_sync_method("ping", |_| Ok(Value::String("pong!".to_owned()))); + + // Clone the local public key (ID) so it can later be captured by the + // `whoami` closure. + let local_pk = server_id.id.clone(); + // Publish a typed message (raw). // Returns the key (hash) and sequence number of the published message. io.add_sync_method("publish", move |params: Params| { @@ -121,6 +145,9 @@ pub async fn actor(server_id: OwnedIdentity, port: u16) -> Result<()> { }) }); + // Return the public key of the local SSB server. + io.add_sync_method("whoami", move |_| Ok(Value::String(local_pk.to_owned()))); + let server_addr = format!("0.0.0.0:{}", port); let server = ServerBuilder::new(io) .cors(DomainsValidation::AllowOnly(vec![ diff --git a/src/storage/kv.rs b/src/storage/kv.rs index d0df608..ed6ead4 100644 --- a/src/storage/kv.rs +++ b/src/storage/kv.rs @@ -1,5 +1,5 @@ use futures::SinkExt; -use kuska_ssb::feed::{Feed as MessageKVT, Message as MessageValue}; +use kuska_ssb::feed::{Feed as MessageKvt, Message as MessageValue}; use log::warn; use serde::{Deserialize, Serialize}; @@ -149,12 +149,12 @@ impl KvStorage { Ok(seq) } - /// Get the message KVT (Key Value Timestamp) for the given message ID - /// (key). - pub fn get_msg_kvt(&self, user_id: &str, msg_seq: u64) -> Result> { + /// Get the message KVT (Key Value Timestamp) for the given author and + /// message sequence number. + pub fn get_msg_kvt(&self, user_id: &str, msg_seq: u64) -> Result> { let db = self.db.as_ref().unwrap(); if let Some(raw) = db.get(Self::key_msg_kvt(user_id, msg_seq))? { - Ok(Some(MessageKVT::from_slice(&raw)?)) + Ok(Some(MessageKvt::from_slice(&raw)?)) } else { Ok(None) } @@ -246,7 +246,7 @@ impl KvStorage { })?; db.insert(Self::key_msg_val(&msg_val.id().to_string()), msg_ref)?; - let msg_kvt = MessageKVT::new(msg_val.clone()); + let msg_kvt = MessageKvt::new(msg_val.clone()); db.insert( Self::key_msg_kvt(&author, seq_num), msg_kvt.to_string().as_bytes(), @@ -278,6 +278,25 @@ impl KvStorage { Ok(seq_num) } + + /// Get all messages comprising the feed authored by the given public key. + pub fn get_feed(&self, user_id: &str) -> Result> { + let mut feed = Vec::new(); + + // Lookup the latest sequence number for the given peer. + if let Some(latest_seq) = self.get_latest_seq(user_id)? { + // Iterate through the messages in the feed. + for msg_seq in 1..latest_seq { + // Get the message KVT for the given author and message + // sequence number and add it to the feed vector. + // TODO: consider handling the `None` case instead of + // unwrapping. + feed.push(self.get_msg_kvt(user_id, msg_seq)?.unwrap()) + } + } + + Ok(feed) + } } #[cfg(test)]