Skip to content

Commit

Permalink
convert all params to be maps / structs
Browse files Browse the repository at this point in the history
  • Loading branch information
mycognosist committed Feb 12, 2024
1 parent 6e4df49 commit 4f9b421
Showing 1 changed file with 72 additions and 44 deletions.
116 changes: 72 additions & 44 deletions solar/src/actors/jsonrpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,38 @@ use jsonrpsee::server::{logger::Params, RpcModule, ServerBuilder};
use jsonrpsee::types::error::ErrorObject as JsonRpcError;
use kuska_ssb::{api::dto::content::TypedMessage, feed::Message, keystore::OwnedIdentity};
use log::{info, warn};
use serde::Deserialize;
use serde_json::{json, Value};

use crate::{broker::*, error::Error, node::KV_STORE, Result};

/// The name of a channel.
#[derive(Debug, Deserialize)]
struct Channel {
channel: String,
}

/// The public keys (ID) of two peers.
#[derive(Debug, Deserialize)]
struct IsFollowing {
peer_a: String,
peer_b: String,
}

/// Message reference containing the key (sha256 hash) of a message.
/// Used to parse the key from the parameters supplied to the `message`
/// endpoint.
#[derive(Debug, Deserialize)]
struct MsgRef {
msg_ref: String,
}

/// The public key (ID) of a peer.
#[derive(Debug, Deserialize)]
struct PubKey {
pub_key: String,
}

/// Register the JSON-RPC server endpoint, define the JSON-RPC methods
/// and spawn the server.
///
Expand Down Expand Up @@ -39,13 +67,13 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
rpc_module.register_method("blocks", move |params: Params, _| {
task::block_on(async {
// Parse the parameter containing the public key.
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

// Open the primary KV database for reading.
let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let blocks = indexes.get_blocks(&pub_key[0])?;
let blocks = indexes.get_blocks(&pub_key.pub_key)?;
let response = json!(blocks);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -57,12 +85,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns an array of public keys.
rpc_module.register_method("blockers", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let blockers = indexes.get_blockers(&pub_key[0])?;
let blockers = indexes.get_blockers(&pub_key.pub_key)?;
let response = json!(blockers);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -74,12 +102,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns an array of descriptions.
rpc_module.register_method("descriptions", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let descriptions = indexes.get_descriptions(&pub_key[0])?;
let descriptions = indexes.get_descriptions(&pub_key.pub_key)?;
let response = json!(descriptions);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -91,12 +119,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns an array of descriptions.
rpc_module.register_method("self_descriptions", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let descriptions = indexes.get_self_assigned_descriptions(&pub_key[0])?;
let descriptions = indexes.get_self_assigned_descriptions(&pub_key.pub_key)?;
let response = json!(descriptions);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -108,12 +136,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns a string.
rpc_module.register_method("latest_description", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let description = indexes.get_latest_description(&pub_key[0])?;
let description = indexes.get_latest_description(&pub_key.pub_key)?;
let response = json!(description);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -126,12 +154,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns a string.
rpc_module.register_method("latest_self_description", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let description = indexes.get_latest_self_assigned_description(&pub_key[0])?;
let description = indexes.get_latest_self_assigned_description(&pub_key.pub_key)?;
let response = json!(description);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -143,12 +171,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns an array of public keys.
rpc_module.register_method("follows", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let follows = indexes.get_follows(&pub_key[0])?;
let follows = indexes.get_follows(&pub_key.pub_key)?;
let response = json!(follows);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -160,12 +188,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns an array of public keys.
rpc_module.register_method("followers", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let followers = indexes.get_followers(&pub_key[0])?;
let followers = indexes.get_followers(&pub_key.pub_key)?;
let response = json!(followers);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -177,12 +205,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns a boolean.
rpc_module.register_method("is_following", move |params: Params, _| {
task::block_on(async {
let peers = params.parse::<Vec<String>>()?;
let peers: IsFollowing = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let is_following = indexes.is_following(&peers[0], &peers[1])?;
let is_following = indexes.is_following(&peers.peer_a, &peers.peer_b)?;
let response = json!(is_following);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -195,12 +223,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns an array of public keys.
rpc_module.register_method("friends", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let friends = indexes.get_friends(&pub_key[0])?;
let friends = indexes.get_friends(&pub_key.pub_key)?;
let response = json!(friends);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -212,12 +240,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns an array of strings.
rpc_module.register_method("images", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let images = indexes.get_images(&pub_key[0])?;
let images = indexes.get_images(&pub_key.pub_key)?;
let response = json!(images);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -229,12 +257,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns an array of strings.
rpc_module.register_method("self_images", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let images = indexes.get_self_assigned_images(&pub_key[0])?;
let images = indexes.get_self_assigned_images(&pub_key.pub_key)?;
let response = json!(images);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -246,12 +274,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns a string.
rpc_module.register_method("latest_image", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let image = indexes.get_latest_image(&pub_key[0])?;
let image = indexes.get_latest_image(&pub_key.pub_key)?;
let response = json!(image);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -264,12 +292,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns an array of strings.
rpc_module.register_method("latest_self_image", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let image = indexes.get_latest_self_assigned_image(&pub_key[0])?;
let image = indexes.get_latest_self_assigned_image(&pub_key.pub_key)?;
let response = json!(image);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -281,12 +309,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns an array of strings.
rpc_module.register_method("names", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let names = indexes.get_names(&pub_key[0])?;
let names = indexes.get_names(&pub_key.pub_key)?;
let response = json!(names);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -298,12 +326,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns an array of strings.
rpc_module.register_method("self_names", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let names = indexes.get_self_assigned_names(&pub_key[0])?;
let names = indexes.get_self_assigned_names(&pub_key.pub_key)?;
let response = json!(names);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -315,12 +343,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns a string.
rpc_module.register_method("latest_name", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let names = indexes.get_latest_name(&pub_key[0])?;
let names = indexes.get_latest_name(&pub_key.pub_key)?;
let response = json!(names);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -332,12 +360,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns a string.
rpc_module.register_method("latest_self_name", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let names = indexes.get_latest_self_assigned_name(&pub_key[0])?;
let names = indexes.get_latest_self_assigned_name(&pub_key.pub_key)?;
let response = json!(names);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -349,12 +377,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns an array of public keys.
rpc_module.register_method("subscribers", move |params: Params, _| {
task::block_on(async {
let channel = params.parse::<Vec<String>>()?;
let channel: Channel = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let subscribers = indexes.get_channel_subscribers(&channel[0])?;
let subscribers = indexes.get_channel_subscribers(&channel.channel)?;
let response = json!(subscribers);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -366,12 +394,12 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
// Returns an array of channel names.
rpc_module.register_method("subscriptions", move |params: Params, _| {
task::block_on(async {
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

let db = KV_STORE.read().await;

let indexes = &db.indexes.as_ref().ok_or(Error::Indexes)?;
let subscriptions = indexes.get_channel_subscriptions(&pub_key[0])?;
let subscriptions = indexes.get_channel_subscriptions(&pub_key.pub_key)?;
let response = json!(subscriptions);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -383,13 +411,13 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
rpc_module.register_method("feed", move |params: Params, _| {
task::block_on(async {
// Parse the parameter containing the public key.
let pub_key = params.parse::<Vec<String>>()?;
let pub_key: PubKey = params.parse()?;

// Open the primary KV database for reading.
let db = KV_STORE.read().await;

// Retrieve the message value for the requested message.
let feed = db.get_feed(&pub_key[0])?;
let feed = db.get_feed(&pub_key.pub_key)?;
let response = json!(feed);

Ok::<Value, JsonRpcError>(response)
Expand All @@ -401,13 +429,13 @@ pub async fn actor(server_id: OwnedIdentity, server_addr: SocketAddr) -> Result<
rpc_module.register_method("message", move |params: Params, _| {
task::block_on(async {
// Parse the parameter containing the message reference (key).
let msg_ref = params.parse::<Vec<String>>()?;
let msg_ref: MsgRef = params.parse()?;

// Open the primary KV database for reading.
let db = KV_STORE.read().await;

// Retrieve the message value for the requested message.
let msg_val = db.get_msg_val(&msg_ref[0])?;
let msg_val = db.get_msg_val(&msg_ref.msg_ref)?;

// Retrieve the message KVT for the requested message using the
// author and sequence fields from the message value.
Expand Down

0 comments on commit 4f9b421

Please sign in to comment.