Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a rest endpoint to return checkpoint in batches #19457

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions crates/sui-rest-api/src/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,106 @@ async fn get_checkpoint_full(
.pipe(Ok)
}

pub struct GetCheckpointFullBatch;
impl ApiEndpoint<RestService> for GetCheckpointFullBatch {
fn method(&self) -> axum::http::Method {
axum::http::Method::GET
}

fn path(&self) -> &'static str {
"/checkpoints/full/batch"
}
Comment on lines +103 to +105
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably just want the path to be "/checkpoints/full" to be consistent with the other apis


fn operation(
&self,
generator: &mut schemars::gen::SchemaGenerator,
) -> openapiv3::v3_1::Operation {
OperationBuilder::new()
.tag("Checkpoint")
.operation_id("GetCheckpointFullBatch")
.query_parameters::<GetCheckpointFullBatchQueryParameters>(generator)
.response(
200,
ResponseBuilder::new()
.json_content::<Vec<CheckpointData>>(generator)
.bcs_content()
.build(),
)
.response(404, ResponseBuilder::new().build())
.response(410, ResponseBuilder::new().build())
.build()
}

fn handler(&self) -> RouteHandler<RestService> {
RouteHandler::new(self.method(), get_checkpoint_full_batch)
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
pub struct GetCheckpointFullBatchQueryParameters {
pub start: CheckpointSequenceNumber,
pub batch_size: u32,
Comment on lines +133 to +135
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look at the other list apis (checkpoints as an example) to match the param naming

pub max_wait_ms: Option<u64>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed offline lets punt the long polling till later

}

async fn get_checkpoint_full_batch(
Query(params): Query<GetCheckpointFullBatchQueryParameters>,
accept: AcceptFormat,
State(state): State<StateReader>,
) -> Result<ResponseContent<Vec<sui_types::full_checkpoint_content::CheckpointData>>> {
let start = params.start;
let batch_size = params.batch_size.min(20) as usize; // Limit batch size to 20
let max_wait = params.max_wait_ms.map(std::time::Duration::from_millis);

let oldest_checkpoint = state.inner().get_lowest_available_checkpoint_objects()?;
if start < oldest_checkpoint {
return Err(crate::RestError::new(
axum::http::StatusCode::GONE,
"Old checkpoints have been pruned",
));
}

let mut checkpoints = Vec::with_capacity(batch_size);
let start_time = std::time::Instant::now();

let mut seq = start;
while seq < start + batch_size as u64 {
if let Some(verified_summary) = state.inner().get_checkpoint_by_sequence_number(seq)? {
if let Some(checkpoint_contents) = state
.inner()
.get_checkpoint_contents_by_digest(&verified_summary.content_digest)?
{
let checkpoint_data = state
.inner()
.get_checkpoint_data(verified_summary, checkpoint_contents)?;
checkpoints.push(checkpoint_data);
seq += 1;
}
} else if let Some(max_wait) = max_wait {
if start_time.elapsed() >= max_wait {
break; // Exit if we've waited long enough
}
if !checkpoints.is_empty() {
break; // Exit if we've already fetched at least one checkpoint
}
// Wait for a short interval before checking again
// but ideally we should have some form on notification upon new checkpoint
tokio::time::sleep(std::time::Duration::from_millis(
max_wait.as_millis().min(100) as u64,
))
.await;
} else {
break; // Stop if we've reached the end of available checkpoints
}
}

match accept {
AcceptFormat::Json => ResponseContent::Json(checkpoints),
AcceptFormat::Bcs => ResponseContent::Bcs(checkpoints),
}
.pipe(Ok)
}

pub struct GetCheckpoint;

impl ApiEndpoint<RestService> for GetCheckpoint {
Expand Down
28 changes: 28 additions & 0 deletions crates/sui-rest-api/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,34 @@ impl Client {
self.inner.bcs(response).await.map(Response::into_inner)
}

pub async fn get_full_checkpoints_batch(
&self,
start: CheckpointSequenceNumber,
batch_size: u32,
max_wait_ms: Option<u64>,
) -> Result<Vec<CheckpointData>> {
let url = self.inner.url().join("checkpoints/full/batch")?;

let mut query = vec![
("start", start.to_string()),
("batch_size", batch_size.to_string()),
];
if let Some(wait) = max_wait_ms {
query.push(("max_wait_ms", wait.to_string()));
}

let response = self
.inner
.client()
.get(url)
.query(&query)
.header(reqwest::header::ACCEPT, crate::APPLICATION_BCS)
.send()
.await?;

self.inner.bcs(response).await.map(Response::into_inner)
}

pub async fn get_checkpoint_summary(
&self,
checkpoint_sequence_number: CheckpointSequenceNumber,
Expand Down
Loading