Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Eigenda disperser /get/ and /put/ endpoints #313

Open
wants to merge 9 commits into
base: eigenda-disperser-client
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/node/eigenda_proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ tracing.workspace = true
rlp.workspace = true
rand.workspace = true
sha3.workspace = true
hex.workspace = true
zksync_config.workspace = true
# we can't use the workspace version of prost because
# the tonic dependency requires a hugher version.
Expand Down
3 changes: 2 additions & 1 deletion core/node/eigenda_proxy/src/eigenda_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
errors::EigenDAError,
};

#[derive(Clone)]
pub struct EigenDAClient {
disperser: Arc<Mutex<DisperserClient<Channel>>>,
config: DisperserConfig,
Expand All @@ -37,7 +38,7 @@ impl EigenDAClient {
let disperser = Arc::new(Mutex::new(
DisperserClient::connect(inner)
.await
.map_err(|_| EigenDAError::ConnectionError)?,
.map_err(|err| EigenDAError::ConnectionError(err))?,
));

Ok(Self { disperser, config })
Expand Down
14 changes: 13 additions & 1 deletion core/node/eigenda_proxy/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use axum::response::{IntoResponse, Response};

#[derive(Debug, PartialEq)]
pub enum MemStoreError {
BlobToLarge,
Expand All @@ -11,7 +13,17 @@ pub enum MemStoreError {
pub enum EigenDAError {
TlsError,
UriError,
ConnectionError,
ConnectionError(tonic::transport::Error),
PutError,
GetError,
}

pub(crate) enum RequestProcessorError {
EigenDA(EigenDAError),
}

impl IntoResponse for RequestProcessorError {
fn into_response(self) -> Response {
unimplemented!("EigenDA request error into response")
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be implemented?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct, addressed in 680a207

}
37 changes: 30 additions & 7 deletions core/node/eigenda_proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,37 @@ use std::net::SocketAddr;

use anyhow::Context as _;
use axum::{
routing::{get, put},
extract::Path,
routing::{get, post},
Router,
};
use eigenda_client::EigenDAClient;
use request_processor::RequestProcessor;
use tokio::sync::watch;
use zksync_config::configs::da_client::eigen_da::EigenDAConfig;

mod blob_info;
mod eigenda_client;
mod errors;
mod memstore;
mod request_processor;

pub async fn run_server(mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
pub async fn run_server(
config: EigenDAConfig,
mut stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
// TODO: Replace port for config

let bind_address = SocketAddr::from(([0, 0, 0, 0], 4242));
tracing::debug!("Starting eigenda proxy on {bind_address}");
let app = create_eigenda_proxy_router();

let eigenda_client = match config {
EigenDAConfig::Disperser(disperser_config) => EigenDAClient::new(disperser_config)
.await
.map_err(|e| anyhow::anyhow!("Failed to create EigenDA client: {:?}", e))?,
_ => panic!("memstore unimplemented"),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Next would be implementing this for memstore also

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! Working on it

};
let app = create_eigenda_proxy_router(eigenda_client);

let listener = tokio::net::TcpListener::bind(bind_address)
.await
Expand All @@ -38,15 +55,21 @@ pub async fn run_server(mut stop_receiver: watch::Receiver<bool>) -> anyhow::Res
Ok(())
}

fn create_eigenda_proxy_router() -> Router {
fn create_eigenda_proxy_router(eigenda_client: EigenDAClient) -> Router {
let get_blob_id_processor = RequestProcessor::new(eigenda_client);
let pub_blob_id_processor = get_blob_id_processor.clone();
let router = Router::new()
.route(
"/get/",
get(|| async { todo!("Handle eigenda proxy get request") }),
"/get/:l1_batch_number",
get(move |blob_id: Path<String>| async move {
get_blob_id_processor.get_blob_id(blob_id).await
}),
)
.route(
"/put/",
put(|| async { todo!("Handle eigenda proxy put request") }),
post(move |blob_id: Path<String>| async move {
pub_blob_id_processor.put_blob_id(blob_id).await
}),
);
router
}
1 change: 1 addition & 0 deletions core/node/eigenda_proxy/src/memstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ impl MemStore {
}
}

#[cfg(test)]
mod test {
use std::time::Duration;

Expand Down
40 changes: 40 additions & 0 deletions core/node/eigenda_proxy/src/request_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use axum::{extract::Path, http::Response};

use crate::{eigenda_client::EigenDAClient, errors::RequestProcessorError};

#[derive(Clone)]
pub(crate) struct RequestProcessor {
eigenda_client: EigenDAClient,
}

impl RequestProcessor {
pub(crate) fn new(eigenda_client: EigenDAClient) -> Self {
Self { eigenda_client }
}

pub(crate) async fn get_blob_id(
&self,
Path(blob_id): Path<String>,
) -> Result<axum::response::Response, RequestProcessorError> {
let blob_id_bytes = hex::decode(blob_id).unwrap();
let response = self
.eigenda_client
.get_blob(blob_id_bytes)
.await
.map_err(|e| RequestProcessorError::EigenDA(e))?;
Ok(Response::new(response.into()))
}

pub(crate) async fn put_blob_id(
&self,
Path(data): Path<String>,
) -> Result<axum::response::Response, RequestProcessorError> {
let data_bytes = hex::decode(data).unwrap();
let response = self
.eigenda_client
.put_blob(data_bytes)
.await
.map_err(|e| RequestProcessorError::EigenDA(e))?;
Ok(Response::new(response.into()))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,18 @@ impl WiringLayer for EigenDAProxyLayer {
}

async fn wire(self, _input: Self::Input) -> Result<Self::Output, WiringError> {
let task = EigenDAProxyTask {};
let task = EigenDAProxyTask {
eigenda_config: self.eigenda_config,
};

Ok(Output { task })
}
}

#[derive(Debug)]
pub struct EigenDAProxyTask {}
pub struct EigenDAProxyTask {
eigenda_config: EigenDAConfig,
}

#[async_trait::async_trait]
impl Task for EigenDAProxyTask {
Expand All @@ -63,6 +67,6 @@ impl Task for EigenDAProxyTask {
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
zksync_eigenda_proxy::run_server(stop_receiver.0).await
zksync_eigenda_proxy::run_server(self.eigenda_config, stop_receiver.0).await
}
}
Loading