Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Eigenda disperser /get/ and /put/ endpoints #313

Open
wants to merge 9 commits into
base: eigenda-disperser-client
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions core/lib/config/src/configs/da_client/eigen_da.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ pub struct MemStoreConfig {
pub custom_quorum_numbers: Option<Vec<u32>>, // todo: This should be removed once eigenda proxy is no longer used
pub account_id: Option<String>, // todo: This should be removed once eigenda proxy is no longer used
pub max_blob_size_bytes: u64,
/// Blob expiration time in seconds
pub blob_expiration: u64,
/// Latency in milliseconds for get operations
pub get_latency: u64,
/// Latency in milliseconds for put operations
pub put_latency: u64,
}

Expand Down
1 change: 1 addition & 0 deletions core/node/eigenda_proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ tracing.workspace = true
rlp.workspace = true
rand.workspace = true
sha3.workspace = true
hex.workspace = true
zksync_config.workspace = true
# we can't use the workspace version of prost because
# the tonic dependency requires a hugher version.
Expand Down
3 changes: 2 additions & 1 deletion core/node/eigenda_proxy/src/eigenda_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
errors::EigenDAError,
};

#[derive(Clone)]
pub struct EigenDAClient {
disperser: Arc<Mutex<DisperserClient<Channel>>>,
config: DisperserConfig,
Expand All @@ -37,7 +38,7 @@ impl EigenDAClient {
let disperser = Arc::new(Mutex::new(
DisperserClient::connect(inner)
.await
.map_err(|_| EigenDAError::ConnectionError)?,
.map_err(|err| EigenDAError::ConnectionError(err))?,
));

Ok(Self { disperser, config })
Expand Down
32 changes: 31 additions & 1 deletion core/node/eigenda_proxy/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
use axum::{
http::StatusCode,
response::{IntoResponse, Response},
};

#[derive(Debug, PartialEq)]
pub enum MemStoreError {
BlobToLarge,
Expand All @@ -11,7 +16,32 @@ pub enum MemStoreError {
pub enum EigenDAError {
TlsError,
UriError,
ConnectionError,
ConnectionError(tonic::transport::Error),
PutError,
GetError,
}

pub(crate) enum RequestProcessorError {
EigenDA(EigenDAError),
}

impl IntoResponse for RequestProcessorError {
fn into_response(self) -> Response {
let (status_code, message) = match self {
RequestProcessorError::EigenDA(err) => {
tracing::error!("EigenDA error: {:?}", err);
match err {
EigenDAError::TlsError => (StatusCode::BAD_GATEWAY, "Tls error".to_owned()),
EigenDAError::UriError => (StatusCode::BAD_GATEWAY, "Uri error".to_owned()),
EigenDAError::ConnectionError(err) => (
StatusCode::BAD_GATEWAY,
format!("Connection error: {:?}", err).to_owned(),
),
EigenDAError::PutError => (StatusCode::BAD_GATEWAY, "Put error".to_owned()),
EigenDAError::GetError => (StatusCode::BAD_GATEWAY, "Get error".to_owned()),
}
}
};
(status_code, message).into_response()
}
}
60 changes: 50 additions & 10 deletions core/node/eigenda_proxy/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,58 @@
mod common;
mod disperser;

use std::net::SocketAddr;
use std::{net::SocketAddr, str::FromStr};

use anyhow::Context as _;
use axum::{
routing::{get, put},
extract::Path,
routing::{get, post},
Router,
};
use eigenda_client::EigenDAClient;
use memstore::MemStore;
use request_processor::RequestProcessor;
use tokio::sync::watch;
use zksync_config::configs::da_client::eigen_da::EigenDAConfig;

mod blob_info;
mod eigenda_client;
mod errors;
mod memstore;
mod request_processor;

pub async fn run_server(
config: EigenDAConfig,
mut stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
let (bind_address, client) = match config {
EigenDAConfig::MemStore(cfg) => {
let bind_address = SocketAddr::from_str(&cfg.api_node_url)?;

let client = MemStore::new(cfg);
(bind_address, client)
}
EigenDAConfig::Disperser(cfg) => {
let bind_address = SocketAddr::from_str(&cfg.api_node_url)?;

let client = EigenDAClient::new(cfg)
.await
.map_err(|e| anyhow::anyhow!("Failed to create EigenDA client: {:?}", e))?;
(bind_address, client)
}
};

pub async fn run_server(mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
// TODO: Replace port for config
let bind_address = SocketAddr::from(([0, 0, 0, 0], 4242));
tracing::debug!("Starting eigenda proxy on {bind_address}");
let app = create_eigenda_proxy_router();

let eigenda_client = match config {
EigenDAConfig::Disperser(disperser_config) => EigenDAClient::new(disperser_config)
.await
.map_err(|e| anyhow::anyhow!("Failed to create EigenDA client: {:?}", e))?,
_ => panic!("memstore unimplemented"),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Next would be implementing this for memstore also

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! Working on it

};

// TODO: app should receive an impl instead of a struct
let app = create_eigenda_proxy_router(eigenda_client);

let listener = tokio::net::TcpListener::bind(bind_address)
.await
Expand All @@ -38,15 +72,21 @@ pub async fn run_server(mut stop_receiver: watch::Receiver<bool>) -> anyhow::Res
Ok(())
}

fn create_eigenda_proxy_router() -> Router {
fn create_eigenda_proxy_router(eigenda_client: EigenDAClient) -> Router {
let get_blob_id_processor = RequestProcessor::new(eigenda_client);
let pub_blob_id_processor = get_blob_id_processor.clone();
let router = Router::new()
.route(
"/get/",
get(|| async { todo!("Handle eigenda proxy get request") }),
"/get/:l1_batch_number",
get(move |blob_id: Path<String>| async move {
get_blob_id_processor.get_blob_id(blob_id).await
}),
)
.route(
"/put/",
put(|| async { todo!("Handle eigenda proxy put request") }),
post(move |blob_id: Path<String>| async move {
pub_blob_id_processor.put_blob_id(blob_id).await
}),
);
router
}
65 changes: 35 additions & 30 deletions core/node/eigenda_proxy/src/memstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,25 @@ use rand::{rngs::OsRng, Rng, RngCore};
use rlp::decode;
use sha3::{Digest, Keccak256};
use tokio::time::interval;
use zksync_config::configs::da_client::eigen_da::MemStoreConfig;

use crate::{
blob_info::{self, BlobInfo},
errors::MemStoreError,
};

struct MemStoreConfig {
max_blob_size_bytes: u64,
blob_expiration: Duration,
put_latency: Duration,
get_latency: Duration,
}

struct MemStoreData {
store: HashMap<String, Vec<u8>>,
key_starts: HashMap<String, Instant>,
}

struct MemStore {
pub struct MemStore {
config: MemStoreConfig,
data: Arc<RwLock<MemStoreData>>,
}

impl MemStore {
fn new(config: MemStoreConfig) -> Arc<Self> {
pub fn new(config: MemStoreConfig) -> Arc<Self> {
let memstore = Arc::new(Self {
config,
data: Arc::new(RwLock::new(MemStoreData {
Expand All @@ -48,7 +42,7 @@ impl MemStore {
}

async fn put(self: Arc<Self>, value: Vec<u8>) -> Result<Vec<u8>, MemStoreError> {
tokio::time::sleep(self.config.put_latency).await;
tokio::time::sleep(Duration::from_millis(self.config.put_latency)).await;
if value.len() as u64 > self.config.max_blob_size_bytes {
return Err(MemStoreError::BlobToLarge.into());
}
Expand Down Expand Up @@ -122,7 +116,7 @@ impl MemStore {
}

async fn get(self: Arc<Self>, commit: Vec<u8>) -> Result<Vec<u8>, MemStoreError> {
tokio::time::sleep(self.config.get_latency).await;
tokio::time::sleep(Duration::from_millis(self.config.get_latency)).await;
let blob_info: BlobInfo =
decode(&commit).map_err(|_| MemStoreError::IncorrectCommitment)?;
let key = String::from_utf8_lossy(
Expand All @@ -147,7 +141,7 @@ impl MemStore {
let mut data = self.data.write().unwrap();
let mut to_remove = vec![];
for (key, start) in data.key_starts.iter() {
if start.elapsed() > self.config.blob_expiration {
if start.elapsed() > Duration::from_secs(self.config.blob_expiration) {
to_remove.push(key.clone());
}
}
Expand All @@ -158,7 +152,7 @@ impl MemStore {
}

async fn pruning_loop(self: Arc<Self>) {
let mut interval = interval(self.config.blob_expiration);
let mut interval = interval(Duration::from_secs(self.config.blob_expiration));

loop {
interval.tick().await;
Expand All @@ -168,6 +162,7 @@ impl MemStore {
}
}

#[cfg(test)]
mod test {
use std::time::Duration;

Expand All @@ -177,9 +172,12 @@ mod test {
async fn test_memstore() {
let config = MemStoreConfig {
max_blob_size_bytes: 1024,
blob_expiration: Duration::from_secs(60),
put_latency: Duration::from_millis(100),
get_latency: Duration::from_millis(100),
blob_expiration: 60,
put_latency: 100,
get_latency: 100,
api_node_url: String::default(), // unused for this test
custom_quorum_numbers: None, // unused for this test
account_id: None, // unused for this test
};
let store = MemStore::new(config);

Expand All @@ -193,9 +191,12 @@ mod test {
async fn test_memstore_multiple() {
let config = MemStoreConfig {
max_blob_size_bytes: 1024,
blob_expiration: Duration::from_secs(60),
put_latency: Duration::from_millis(100),
get_latency: Duration::from_millis(100),
blob_expiration: 60,
put_latency: 100,
get_latency: 100,
api_node_url: String::default(), // unused for this test
custom_quorum_numbers: None, // unused for this test
account_id: None, // unused for this test
};
let store = MemStore::new(config);

Expand All @@ -211,23 +212,24 @@ mod test {

#[tokio::test]
async fn test_memstore_latency() {
let put_latency = Duration::from_millis(1000);
let get_latency = Duration::from_millis(1000);
let config = MemStoreConfig {
max_blob_size_bytes: 1024,
blob_expiration: Duration::from_secs(60),
put_latency,
get_latency,
blob_expiration: 60,
put_latency: 1000,
get_latency: 1000,
api_node_url: String::default(), // unused for this test
custom_quorum_numbers: None, // unused for this test
account_id: None, // unused for this test
};
let store = MemStore::new(config);
let store = MemStore::new(config.clone());

let blob = vec![0u8; 100];
let time_before_put = Instant::now();
let cert = store.clone().put(blob.clone()).await.unwrap();
assert!(time_before_put.elapsed() >= put_latency);
assert!(time_before_put.elapsed() >= Duration::from_millis(config.put_latency));
let time_before_get = Instant::now();
let blob2 = store.get(cert).await.unwrap();
assert!(time_before_get.elapsed() >= get_latency);
assert!(time_before_get.elapsed() >= Duration::from_millis(config.get_latency));
assert_eq!(blob, blob2);
}

Expand All @@ -236,9 +238,12 @@ mod test {
let blob_expiration = Duration::from_millis(100);
let config = MemStoreConfig {
max_blob_size_bytes: 1024,
blob_expiration,
put_latency: Duration::from_millis(1),
get_latency: Duration::from_millis(1),
blob_expiration: 100,
put_latency: 1,
get_latency: 1,
api_node_url: String::default(), // unused for this test
custom_quorum_numbers: None, // unused for this test
account_id: None, // unused for this test
};
let store = MemStore::new(config);

Expand Down
40 changes: 40 additions & 0 deletions core/node/eigenda_proxy/src/request_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use axum::{extract::Path, http::Response};

use crate::{eigenda_client::EigenDAClient, errors::RequestProcessorError};

#[derive(Clone)]
pub(crate) struct RequestProcessor {
eigenda_client: EigenDAClient,
}

impl RequestProcessor {
pub(crate) fn new(eigenda_client: EigenDAClient) -> Self {
Self { eigenda_client }
}

pub(crate) async fn get_blob_id(
&self,
Path(blob_id): Path<String>,
) -> Result<axum::response::Response, RequestProcessorError> {
let blob_id_bytes = hex::decode(blob_id).unwrap();
let response = self
.eigenda_client
.get_blob(blob_id_bytes)
.await
.map_err(|e| RequestProcessorError::EigenDA(e))?;
Ok(Response::new(response.into()))
}

pub(crate) async fn put_blob_id(
&self,
Path(data): Path<String>,
) -> Result<axum::response::Response, RequestProcessorError> {
let data_bytes = hex::decode(data).unwrap();
let response = self
.eigenda_client
.put_blob(data_bytes)
.await
.map_err(|e| RequestProcessorError::EigenDA(e))?;
Ok(Response::new(response.into()))
}
}
Loading