Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add builder url to logs #33

Merged
merged 1 commit into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Ignore build artifacts
target

# Ignore version control directories
.git
15 changes: 8 additions & 7 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
FROM lukemathwalker/cargo-chef:latest AS chef
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get -y upgrade && apt-get install -y libclang-dev pkg-config

# Prepare build plan
FROM chef AS planner
COPY ./Cargo.toml ./Cargo.lock ./
Expand All @@ -13,14 +10,18 @@ RUN cargo chef prepare
# Build application
FROM chef AS builder
COPY --from=planner /app/recipe.json .

# Install system dependencies
RUN apt-get update && \
apt-get install -y openssl libclang-dev libssl3 && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

RUN cargo chef cook --release
COPY . .
RUN cargo build --release

FROM debian:stable-slim AS runtime

WORKDIR /app

FROM chef AS final
COPY --from=builder /app/target/release/rollup-boost /usr/local/bin/

ENTRYPOINT ["/usr/local/bin/rollup-boost"]
15 changes: 9 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ use opentelemetry_sdk::trace::Config;
use opentelemetry_sdk::Resource;
use proxy::ProxyLayer;
use reth_rpc_layer::{AuthClientLayer, AuthClientService};
use server::{EngineApiServer, EthEngineApi};
use server::{EngineApiServer, EthEngineApi, HttpClientWrapper};
use std::sync::Arc;
use std::time::Duration;
use std::{net::SocketAddr, path::PathBuf};
use tracing::error;
use tracing::{info, Level};
Expand Down Expand Up @@ -191,15 +192,17 @@ async fn main() -> Result<()> {
fn create_client(
url: &str,
jwt_secret: JwtSecret,
) -> Result<HttpClient<AuthClientService<HttpBackend>>> {
) -> Result<HttpClientWrapper<HttpClient<AuthClientService<HttpBackend>>>> {
// Create a middleware that adds a new JWT token to every request.
let auth_layer = AuthClientLayer::new(jwt_secret);
let client_middleware = tower::ServiceBuilder::new().layer(auth_layer);

HttpClientBuilder::new()
let client = HttpClientBuilder::new()
.set_http_middleware(client_middleware)
.request_timeout(Duration::from_secs(10))
.build(url)
.map_err(|e| Error::InitRPCClient(e.to_string()))
.map_err(|e| Error::InitRPCClient(e.to_string()))?;
Ok(HttpClientWrapper::new(client, url.to_string()))
}

fn init_tracing(endpoint: &str) {
Expand Down Expand Up @@ -265,7 +268,7 @@ mod tests {
let secret = JwtSecret::from_hex(SECRET).unwrap();
let url = format!("http://{}:{}", AUTH_ADDR, AUTH_PORT);
let client = create_client(url.as_str(), secret);
let response = send_request(client.unwrap()).await;
let response = send_request(client.unwrap().client).await;
assert!(response.is_ok());
assert_eq!(response.unwrap(), "You are the dark lord");
}
Expand All @@ -274,7 +277,7 @@ mod tests {
let secret = JwtSecret::random();
let url = format!("http://{}:{}", AUTH_ADDR, AUTH_PORT);
let client = create_client(url.as_str(), secret);
let response = send_request(client.unwrap()).await;
let response = send_request(client.unwrap().client).await;
assert!(response.is_err());
assert!(matches!(
response.unwrap_err(),
Expand Down
72 changes: 49 additions & 23 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,18 @@ pub trait EthApi {
async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult<B256>;
}

pub struct EthEngineApi<C = HttpClient<AuthClientService<HttpBackend>>> {
pub struct HttpClientWrapper<C = HttpClient<AuthClientService<HttpBackend>>> {
pub client: C,
pub url: String,
}

impl<C> HttpClientWrapper<C> {
pub fn new(client: C, url: String) -> Self {
Self { client, url }
}
}

pub struct EthEngineApi<C = HttpClientWrapper> {
l2_client: Arc<C>,
builder_client: Arc<C>,
boost_sync: bool,
Expand All @@ -143,9 +154,9 @@ impl<C> EthEngineApi<C> {
}

#[async_trait]
impl<C> EthApiServer for EthEngineApi<C>
impl<C> EthApiServer for EthEngineApi<HttpClientWrapper<C>>
where
C: EthApiClient + Send + Sync + 'static,
C: EthApiClient + Send + Sync + Clone + 'static,
{
async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult<B256> {
debug!(
Expand All @@ -157,22 +168,25 @@ where
metrics.send_raw_tx_count.increment(1);
}

let builder = self.builder_client.clone();
let builder_client = self.builder_client.client.clone();
let url = self.builder_client.url.clone();
let tx_bytes = bytes.clone();
tokio::spawn(async move {
builder.send_raw_transaction(tx_bytes).await.map_err(|e| {
error!(message = "error calling send_raw_transaction for builder", "error" = %e);
builder_client.send_raw_transaction(tx_bytes).await.map_err(|e| {
error!(message = "error calling send_raw_transaction for builder", "url" = url, "error" = %e);
})
});

self.l2_client
.client
.send_raw_transaction(bytes)
.await
.map_err(|e| match e {
ClientError::Call(err) => err, // Already an ErrorObjectOwned, so just return it
other_error => {
error!(
message = "error calling send_raw_transaction for l2 client",
"url" = self.l2_client.url,
"error" = %other_error,
);
ErrorCode::InternalError.into()
Expand All @@ -182,9 +196,9 @@ where
}

#[async_trait]
impl<C> EngineApiServer for EthEngineApi<C>
impl<C> EngineApiServer for EthEngineApi<HttpClientWrapper<C>>
where
C: EngineApiClient + Send + Sync + 'static,
C: EngineApiClient + Send + Sync + Clone + 'static,
{
async fn fork_choice_updated_v3(
&self,
Expand Down Expand Up @@ -251,17 +265,18 @@ where
let builder = self.builder_client.clone();
let attr = payload_attributes.clone();
tokio::spawn(async move {
let _ = builder.fork_choice_updated_v3(fork_choice_state, attr).await.map(|response| {
let _ = builder.client.fork_choice_updated_v3(fork_choice_state, attr).await.map(|response| {
let payload_id_str = response.payload_id.map(|id| id.to_string()).unwrap_or_default();
if response.is_invalid() {
error!(message = "builder rejected fork_choice_updated_v3 with attributes", "payload_id" = payload_id_str, "validation_error" = %response.payload_status.status);
error!(message = "builder rejected fork_choice_updated_v3 with attributes", "url" = builder.url, "payload_id" = payload_id_str, "validation_error" = %response.payload_status.status);
} else {
info!(message = "called fork_choice_updated_v3 to builder with payload attributes", "payload_status" = %response.payload_status.status, "payload_id" = payload_id_str);
info!(message = "called fork_choice_updated_v3 to builder with payload attributes", "url" = builder.url, "payload_status" = %response.payload_status.status, "payload_id" = payload_id_str);
}
})
.map_err(|e| {
error!(
message = "error calling fork_choice_updated_v3 to builder",
"url" = builder.url,
"error" = %e,
"head_block_hash" = %fork_choice_state.head_block_hash
);
Expand All @@ -275,13 +290,15 @@ where
}

self.l2_client
.client
.fork_choice_updated_v3(fork_choice_state, payload_attributes)
.await
.map_err(|e| match e {
ClientError::Call(err) => err, // Already an ErrorObjectOwned, so just return it
other_error => {
error!(
message = "error calling fork_choice_updated_v3 for l2 client",
"url" = self.l2_client.url,
"error" = %other_error,
"head_block_hash" = %fork_choice_state.head_block_hash,
);
Expand All @@ -295,7 +312,7 @@ where
payload_id: PayloadId,
) -> RpcResult<OpExecutionPayloadEnvelopeV3> {
info!(message = "received get_payload_v3", "payload_id" = %payload_id);
let l2_client_future = self.l2_client.get_payload_v3(payload_id);
let l2_client_future = self.l2_client.client.get_payload_v3(payload_id);
let builder_client_future = Box::pin(async move {
if let Some(metrics) = &self.metrics {
metrics.get_payload_count.increment(1);
Expand All @@ -312,8 +329,8 @@ where
});

let builder = self.builder_client.clone();
let payload = builder.get_payload_v3(payload_id).await.map_err(|e| {
error!(message = "error calling get_payload_v3 from builder", "error" = %e, "payload_id" = %payload_id);
let payload = builder.client.get_payload_v3(payload_id).await.map_err(|e| {
error!(message = "error calling get_payload_v3 from builder", "url" = builder.url, "error" = %e, "payload_id" = %payload_id);
e
})?;

Expand All @@ -326,8 +343,8 @@ where
if let Some(metrics) = &self.metrics {
metrics.new_payload_count.increment(1);
}
let payload_status = self.l2_client.new_payload_v3(payload.execution_payload.clone(), vec![], payload.parent_beacon_block_root).await.map_err(|e| {
error!(message = "error calling new_payload_v3 to validate builder payload", "error" = %e, "payload_id" = %payload_id);
let payload_status = self.l2_client.client.new_payload_v3(payload.execution_payload.clone(), vec![], payload.parent_beacon_block_root).await.map_err(|e| {
error!(message = "error calling new_payload_v3 to validate builder payload", "url" = self.l2_client.url, "error" = %e, "payload_id" = %payload_id);
e
})?;
if let Some(mut s) = span {
Expand All @@ -340,7 +357,7 @@ where
}
};
if payload_status.is_invalid() {
error!(message = "builder payload was not valid", "payload_status" = %payload_status.status, "payload_id" = %payload_id);
error!(message = "builder payload was not valid", "url" = builder.url, "payload_status" = %payload_status.status, "payload_id" = %payload_id);
Err(ClientError::Call(ErrorObject::owned(
INVALID_REQUEST_CODE,
"Builder payload was not valid",
Expand All @@ -358,6 +375,7 @@ where
other_error => {
error!(
message = "error calling get_payload_v3",
"url" = self.builder_client.url,
"error" = %other_error,
"payload_id" = %payload_id
);
Expand Down Expand Up @@ -401,19 +419,20 @@ where
.remove_by_parent_hash(&parent_hash)
.await;

let builder = self.builder_client.clone();
let builder = self.builder_client.client.clone();
let builder_url = self.builder_client.url.clone();
let builder_payload = payload.clone();
let builder_versioned_hashes = versioned_hashes.clone();
tokio::spawn(async move {
let _ = builder.new_payload_v3(builder_payload, builder_versioned_hashes, parent_beacon_block_root).await
.map(|response: PayloadStatus| {
if response.is_invalid() {
error!(message = "builder rejected new_payload_v3", "block_hash" = %block_hash);
error!(message = "builder rejected new_payload_v3", "url" = builder_url, "block_hash" = %block_hash);
} else {
info!(message = "called new_payload_v3 to builder", "payload_status" = %response.status, "block_hash" = %block_hash);
info!(message = "called new_payload_v3 to builder", "url" = builder_url, "payload_status" = %response.status, "block_hash" = %block_hash);
}
}).map_err(|e| {
error!(message = "error calling new_payload_v3 to builder", "error" = %e, "block_hash" = %block_hash);
error!(message = "error calling new_payload_v3 to builder", "url" = builder_url, "error" = %e, "block_hash" = %block_hash);
e
});
if let Some(mut spans) = spans {
Expand All @@ -422,6 +441,7 @@ where
});
}
self.l2_client
.client
.new_payload_v3(payload, versioned_hashes, parent_beacon_block_root)
.await
.map_err(|e| match e {
Expand Down Expand Up @@ -536,8 +556,14 @@ mod tests {
.unwrap();

let eth_engine_api = EthEngineApi::new(
Arc::new(l2_client),
Arc::new(builder_client),
Arc::new(HttpClientWrapper::new(
l2_client,
format!("http://{L2_ADDR}"),
)),
Arc::new(HttpClientWrapper::new(
builder_client,
format!("http://{BUILDER_ADDR}"),
)),
boost_sync,
None,
);
Expand Down
Loading