Skip to content

Commit

Permalink
Merge pull request #34 from mycognosist/add_feed_jsonrpc_endpoint
Browse files Browse the repository at this point in the history
Add feed jsonrpc method
  • Loading branch information
mycognosist authored Dec 16, 2022
2 parents 4499ab8 + e928d2f commit c336d8f
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 19 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ operating system)._
_Undergoing active development. Expect APIs to change._

- [X] json-rpc server for user queries
- [X] feed
- [X] message
- [X] peers
- [X] ping
Expand All @@ -84,6 +85,7 @@ The server currently supports HTTP.

| Method | Parameters | Response | Description |
| --- | --- | --- | --- |
| `feed` | `{ "pub_key": "<@...=.ed25519>" }` | `[{ "key": "<%...=.sha256>", "value": <value>, "timestamp": <timestamp>, "rts": null }]` | Return an array of message KVTs (key, value, timestamp) from the local database |
| `message` | `{ "msg_ref": <key> }` | `{ "key": "<%...=.sha256>", "value": <value>, "timestamp": <timestamp>, "rts": null }` | Return a single message KVT (key, value, timestamp) from the local database |
| `peers` | | `[{ "pub_key": "<@...=.ed25519>", "seq_num": <int> }` | Return the public key and latest sequence number for all peers in the local database |
| `ping` | | `pong!` | Responds if the JSON-RPC server is running |
Expand Down
53 changes: 40 additions & 13 deletions src/actors/jsonrpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ struct MsgRef {
msg_ref: String,
}

/// The public key (ID) of a peer.
#[derive(Debug, Deserialize)]
struct PubKey {
pub_key: String,
}

/// Register the JSON-RPC server endpoint, define the JSON-RPC methods
/// and spawn the server.
///
Expand All @@ -36,22 +42,20 @@ pub async fn actor(server_id: OwnedIdentity, port: u16) -> Result<()> {

let mut io = IoHandler::default();

// Simple `ping` endpoint.
io.add_sync_method("ping", |_| Ok(Value::String("pong!".to_owned())));

let local_pk = server_id.id.clone();

// Return the public key of the local SSB server.
io.add_sync_method("whoami", move |_| Ok(Value::String(local_pk.to_owned())));

// Return the public key and latest sequence number for all feeds in the
// local database.
io.add_sync_method("peers", |_| {
// Retrieve a feed by public key.
// Returns an array of messages as a KVTs.
io.add_sync_method("feed", move |params: Params| {
task::block_on(async {
// Parse the parameter containing the public key.
let pub_key: PubKey = params.parse()?;

// Open the primary KV database for reading.
let db = KV_STORAGE.read().await;
let peers = db.get_peers().await?;

let response = json!(peers);
// Retrieve the message value for the requested message.
let feed = db.get_feed(&pub_key.pub_key)?;

let response = json!(feed);

Ok(response)
})
Expand Down Expand Up @@ -84,6 +88,26 @@ pub async fn actor(server_id: OwnedIdentity, port: u16) -> Result<()> {
})
});

// Return the public key and latest sequence number for all feeds in the
// local database.
io.add_sync_method("peers", |_| {
task::block_on(async {
let db = KV_STORAGE.read().await;
let peers = db.get_peers().await?;

let response = json!(peers);

Ok(response)
})
});

// Simple `ping` endpoint.
io.add_sync_method("ping", |_| Ok(Value::String("pong!".to_owned())));

// Clone the local public key (ID) so it can later be captured by the
// `whoami` closure.
let local_pk = server_id.id.clone();

// Publish a typed message (raw).
// Returns the key (hash) and sequence number of the published message.
io.add_sync_method("publish", move |params: Params| {
Expand Down Expand Up @@ -121,6 +145,9 @@ pub async fn actor(server_id: OwnedIdentity, port: u16) -> Result<()> {
})
});

// Return the public key of the local SSB server.
io.add_sync_method("whoami", move |_| Ok(Value::String(local_pk.to_owned())));

let server_addr = format!("0.0.0.0:{}", port);
let server = ServerBuilder::new(io)
.cors(DomainsValidation::AllowOnly(vec![
Expand Down
31 changes: 25 additions & 6 deletions src/storage/kv.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures::SinkExt;
use kuska_ssb::feed::{Feed as MessageKVT, Message as MessageValue};
use kuska_ssb::feed::{Feed as MessageKvt, Message as MessageValue};
use log::warn;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -149,12 +149,12 @@ impl KvStorage {
Ok(seq)
}

/// Get the message KVT (Key Value Timestamp) for the given message ID
/// (key).
pub fn get_msg_kvt(&self, user_id: &str, msg_seq: u64) -> Result<Option<MessageKVT>> {
/// Get the message KVT (Key Value Timestamp) for the given author and
/// message sequence number.
pub fn get_msg_kvt(&self, user_id: &str, msg_seq: u64) -> Result<Option<MessageKvt>> {
let db = self.db.as_ref().unwrap();
if let Some(raw) = db.get(Self::key_msg_kvt(user_id, msg_seq))? {
Ok(Some(MessageKVT::from_slice(&raw)?))
Ok(Some(MessageKvt::from_slice(&raw)?))
} else {
Ok(None)
}
Expand Down Expand Up @@ -246,7 +246,7 @@ impl KvStorage {
})?;
db.insert(Self::key_msg_val(&msg_val.id().to_string()), msg_ref)?;

let msg_kvt = MessageKVT::new(msg_val.clone());
let msg_kvt = MessageKvt::new(msg_val.clone());
db.insert(
Self::key_msg_kvt(&author, seq_num),
msg_kvt.to_string().as_bytes(),
Expand Down Expand Up @@ -278,6 +278,25 @@ impl KvStorage {

Ok(seq_num)
}

/// Get all messages comprising the feed authored by the given public key.
pub fn get_feed(&self, user_id: &str) -> Result<Vec<MessageKvt>> {
let mut feed = Vec::new();

// Lookup the latest sequence number for the given peer.
if let Some(latest_seq) = self.get_latest_seq(user_id)? {
// Iterate through the messages in the feed.
for msg_seq in 1..latest_seq {
// Get the message KVT for the given author and message
// sequence number and add it to the feed vector.
// TODO: consider handling the `None` case instead of
// unwrapping.
feed.push(self.get_msg_kvt(user_id, msg_seq)?.unwrap())
}
}

Ok(feed)
}
}

#[cfg(test)]
Expand Down

0 comments on commit c336d8f

Please sign in to comment.