Skip to content

Commit

Permalink
Merge pull request #8 from mycognosist/add_publish_jsonrpc
Browse files Browse the repository at this point in the history
Add publish jsonrpc method
  • Loading branch information
mycognosist authored Dec 5, 2022
2 parents 39b995a + f846d9a commit e4963ee
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 18 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ _Undergoing active development._
- [X] json-rpc server for user queries
- [X] ping
- [X] whoami
- [X] publish
- [ ] ...
- [ ] improved connection handler
- [ ] ebt replication
Expand All @@ -81,6 +82,7 @@ The server currently supports HTTP.
| --- | --- | --- | --- |
| `ping` | | `pong!` | Responds if the JSON-RPC server is running |
| `whoami` | | `<@...=.ed25519>` | Returns the public key of the local node |
| `publish` | `<content>` | `{ "msg_ref": "<%...=.sha256>", "seq": <int> }` | Publishes a message and returns the reference (message hash) and sequence number |

`curl` can be used to invoke the available methods from the commandline:

Expand All @@ -90,6 +92,18 @@ curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc": "2.0", "method"
{"jsonrpc":"2.0","result":"pong!","id":1}
```

```
curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc": "2.0", "method": "publish", "params": {"type": "about", "about": "@o8lWpyLeSqV/BJV9pbxFhKpwm6Lw5k+sqexYK+zT9Tc=.ed25519", "name": "solar_glyph", "description": "glyph's experimental solar (rust) node"}, "id":1 }' 127.0.0.1:3030
{"jsonrpc":"2.0","result":{"msg_ref":"%ZwYwLxMHgU8eC43HOziJvYURjZzAzwFk3v5RYS/NbQY=.sha256","seq": 3,"id":1}
```

_Note: You might find it easier to save your JSON to file and pass that to `curl` instead._

```
curl -X POST -H "Content-Type: application/json" --data @publish.json 127.0.0.1:3030
```

## License

AGPL-3.0
50 changes: 46 additions & 4 deletions src/actors/jsonrpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use async_std::task;
use futures::FutureExt;
use jsonrpc_http_server::jsonrpc_core::*;
use jsonrpc_http_server::{AccessControlAllowOrigin, DomainsValidation, ServerBuilder};
use serde_json::json;

use kuska_ssb::keystore::OwnedIdentity;
use kuska_ssb::{api::dto::content::TypedMessage, feed::Message, keystore::OwnedIdentity};

use crate::{broker::*, Result};
use crate::{broker::*, error::Error, Result, KV_STORAGE};

/// Register the JSON-RPC server endpoint, define the JSON-RPC methods
/// and spawn the server.
Expand All @@ -28,9 +29,50 @@ pub async fn actor(server_id: OwnedIdentity, port: u16) -> Result<()> {
// Simple `ping` endpoint.
io.add_sync_method("ping", |_| Ok(Value::String("pong!".to_owned())));

let local_pk = server_id.id.clone();

// Return the public key of the local SSB server.
io.add_sync_method("whoami", move |_| {
Ok(Value::String(server_id.id.to_owned()))
io.add_sync_method("whoami", move |_| Ok(Value::String(local_pk.to_owned())));

// Publish a typed message (raw).
// Returns the key (hash) and sequence number of the published message.
io.add_sync_method("publish", move |params: Params| {
task::block_on(async {
// Parse the parameter containing the post content.
let post_content: TypedMessage = params.parse()?;

// Open the primary KV database for writing.
let feed_storage = KV_STORAGE.write().await;

// Lookup the last message published on the local feed.
// Return `None` if no messages have yet been published on the feed.
let last_msg = feed_storage
.get_last_message(&server_id.id)
// Map the error to a variant of our crate-specific error type.
// The `?` operator then performs the `From` conversion to
// the `jsonrpc_core::Error` type if an error occurs.
.map_err(Error::KV)?;

// Instantiate and cryptographically-sign a new message using `post`.
let msg = Message::sign(last_msg.as_ref(), &server_id, json!(post_content))
.map_err(Error::KuskaFeed)?;

// Append the signed message to the feed.
let seq = feed_storage
.append_feed(msg.clone())
.await
.map_err(Error::KV)?;

info!(
"published message {} with sequence number {}",
msg.id().to_string(),
seq
);

let response = json![{ "msg_ref": msg.id().to_string(), "seq": seq }];

Ok(response)
})
});

let server_addr = format!("0.0.0.0:{}", port);
Expand Down
27 changes: 27 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{error::Error as ErrorTrait, fmt, io, net};

use jsonrpc_http_server::jsonrpc_core;
use kuska_ssb::{api, crypto, discovery, feed, handshake, rpc};
use toml::{de, ser};

Expand Down Expand Up @@ -125,6 +126,32 @@ impl From<ser::Error> for Error {
}
}

// Conversions for errors which occur in the context of a JSON-RPC method call.
// Crate-local error variants are converted to JSON-RPC errors which are
// then return to the caller.
impl From<Error> for jsonrpc_core::Error {
fn from(err: Error) -> Self {
match &err {
Error::KV(err_msg) => jsonrpc_core::Error {
code: jsonrpc_core::ErrorCode::ServerError(-32000),
message: err_msg.to_string(),
data: None,
},
Error::KuskaFeed(err_msg) => jsonrpc_core::Error {
code: jsonrpc_core::ErrorCode::ServerError(-32001),
message: err_msg.to_string(),
data: None,
},
Error::SerdeJson(err_msg) => jsonrpc_core::Error {
code: jsonrpc_core::ErrorCode::ServerError(-32002),
message: err_msg.to_string(),
data: None,
},
_ => todo!(),
}
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
Expand Down
38 changes: 24 additions & 14 deletions src/storage/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,6 @@ impl KvStorage {
Ok(())
}

pub fn get_last_feed_no(&self, user_id: &str) -> Result<Option<u64>> {
let db = self.db.as_ref().unwrap();
let key = Self::key_lastfeed(user_id);
let count = if let Some(value) = db.get(&key)? {
let mut u64_buffer = [0u8; 8];
u64_buffer.copy_from_slice(&value);
Some(u64::from_be_bytes(u64_buffer))
} else {
None
};

Ok(count)
}

fn key_feed(user_id: &str, feed_seq: u64) -> Vec<u8> {
let mut key = Vec::new();
key.push(PREFIX_FEED);
Expand Down Expand Up @@ -148,6 +134,20 @@ impl KvStorage {
Ok(list)
}

pub fn get_last_feed_no(&self, user_id: &str) -> Result<Option<u64>> {
let db = self.db.as_ref().unwrap();
let key = Self::key_lastfeed(user_id);
let count = if let Some(value) = db.get(&key)? {
let mut u64_buffer = [0u8; 8];
u64_buffer.copy_from_slice(&value);
Some(u64::from_be_bytes(u64_buffer))
} else {
None
};

Ok(count)
}

pub fn get_feed(&self, user_id: &str, feed_seq: u64) -> Result<Option<Feed>> {
let db = self.db.as_ref().unwrap();
if let Some(raw) = db.get(Self::key_feed(user_id, feed_seq))? {
Expand All @@ -172,6 +172,16 @@ impl KvStorage {
}
}

pub fn get_last_message(&self, user_id: &str) -> Result<Option<Message>> {
let last_msg = if let Some(last_id) = self.get_last_feed_no(user_id)? {
Some(self.get_feed(user_id, last_id)?.unwrap().into_message()?)
} else {
None
};

Ok(last_msg)
}

pub async fn append_feed(&self, msg: Message) -> Result<u64> {
let seq_no = self.get_last_feed_no(msg.author())?.map_or(0, |no| no) + 1;

Expand Down

0 comments on commit e4963ee

Please sign in to comment.