Skip to content

Commit

Permalink
Add a rest endpoint to return checkpoint in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
sadhansood committed Sep 19, 2024
1 parent 47ad468 commit 4c6641d
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 0 deletions.
100 changes: 100 additions & 0 deletions crates/sui-rest-api/src/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,106 @@ async fn get_checkpoint_full(
.pipe(Ok)
}

pub struct GetCheckpointFullBatch;
impl ApiEndpoint<RestService> for GetCheckpointFullBatch {
fn method(&self) -> axum::http::Method {
axum::http::Method::GET
}

fn path(&self) -> &'static str {
"/checkpoints/full/batch"
}

fn operation(
&self,
generator: &mut schemars::gen::SchemaGenerator,
) -> openapiv3::v3_1::Operation {
OperationBuilder::new()
.tag("Checkpoint")
.operation_id("GetCheckpointFullBatch")
.query_parameters::<GetCheckpointFullBatchQueryParameters>(generator)
.response(
200,
ResponseBuilder::new()
.json_content::<Vec<CheckpointData>>(generator)
.bcs_content()
.build(),
)
.response(404, ResponseBuilder::new().build())
.response(410, ResponseBuilder::new().build())
.build()
}

fn handler(&self) -> RouteHandler<RestService> {
RouteHandler::new(self.method(), get_checkpoint_full_batch)
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
pub struct GetCheckpointFullBatchQueryParameters {
pub start: CheckpointSequenceNumber,
pub batch_size: u32,
pub max_wait_ms: Option<u64>,
}

async fn get_checkpoint_full_batch(
Query(params): Query<GetCheckpointFullBatchQueryParameters>,
accept: AcceptFormat,
State(state): State<StateReader>,
) -> Result<ResponseContent<Vec<sui_types::full_checkpoint_content::CheckpointData>>> {
let start = params.start;
let batch_size = params.batch_size.min(20) as usize; // Limit batch size to 20
let max_wait = params.max_wait_ms.map(std::time::Duration::from_millis);

let oldest_checkpoint = state.inner().get_lowest_available_checkpoint_objects()?;
if start < oldest_checkpoint {
return Err(crate::RestError::new(
axum::http::StatusCode::GONE,
"Old checkpoints have been pruned",
));
}

let mut checkpoints = Vec::with_capacity(batch_size);
let start_time = std::time::Instant::now();

let mut seq = start;
while seq < start + batch_size as u64 {
if let Some(verified_summary) = state.inner().get_checkpoint_by_sequence_number(seq)? {
if let Some(checkpoint_contents) = state
.inner()
.get_checkpoint_contents_by_digest(&verified_summary.content_digest)?
{
let checkpoint_data = state
.inner()
.get_checkpoint_data(verified_summary, checkpoint_contents)?;
checkpoints.push(checkpoint_data);
seq += 1;
}
} else if let Some(max_wait) = max_wait {
if start_time.elapsed() >= max_wait {
break; // Exit if we've waited long enough
}
if !checkpoints.is_empty() {
break; // Exit if we've already fetched at least one checkpoint
}
// Wait for a short interval before checking again
// but ideally we should have some form on notification upon new checkpoint
tokio::time::sleep(std::time::Duration::from_millis(
max_wait.as_millis().min(100) as u64,
))
.await;
} else {
break; // Stop if we've reached the end of available checkpoints
}
}

match accept {
AcceptFormat::Json => ResponseContent::Json(checkpoints),
AcceptFormat::Bcs => ResponseContent::Bcs(checkpoints),
}
.pipe(Ok)
}

pub struct GetCheckpoint;

impl ApiEndpoint<RestService> for GetCheckpoint {
Expand Down
28 changes: 28 additions & 0 deletions crates/sui-rest-api/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,34 @@ impl Client {
self.inner.bcs(response).await.map(Response::into_inner)
}

pub async fn get_full_checkpoints_batch(
&self,
start: CheckpointSequenceNumber,
batch_size: u32,
max_wait_ms: Option<u64>,
) -> Result<Vec<CheckpointData>> {
let url = self.inner.url().join("checkpoints/full/batch")?;

let mut query = vec![
("start", start.to_string()),
("batch_size", batch_size.to_string()),
];
if let Some(wait) = max_wait_ms {
query.push(("max_wait_ms", wait.to_string()));
}

let response = self
.inner
.client()
.get(url)
.query(&query)
.header(reqwest::header::ACCEPT, crate::APPLICATION_BCS)
.send()
.await?;

self.inner.bcs(response).await.map(Response::into_inner)
}

pub async fn get_checkpoint_summary(
&self,
checkpoint_sequence_number: CheckpointSequenceNumber,
Expand Down

0 comments on commit 4c6641d

Please sign in to comment.