Skip to content

Commit

Permalink
refine api & redis impl
Browse files Browse the repository at this point in the history
Signed-off-by: smtmfft <[email protected]>
  • Loading branch information
smtmfft committed Oct 28, 2024
1 parent 6e9db8d commit 8660e56
Show file tree
Hide file tree
Showing 18 changed files with 191 additions and 98 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ thiserror-no-std = "2.0.2"
rusqlite = { version = "0.31.0", features = ["bundled"] }

# redis
redis = { version = "0.27.0" }
redis = { version = "=0.27.3" }

# misc
hashbrown = { version = "0.14", features = ["inline-more"] }
Expand Down
3 changes: 3 additions & 0 deletions core/src/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,8 @@ impl From<ProofRequestOpt> for AggregationRequest {
#[serde(default)]
/// A request for proof aggregation of multiple proofs.
pub struct AggregationOnlyRequest {
/// The block numbers and l1 inclusion block numbers for the blocks to aggregate proofs for.
pub aggregation_ids: Vec<u64>,
/// The block numbers and l1 inclusion block numbers for the blocks to aggregate proofs for.
pub proofs: Vec<Proof>,
/// The proof type.
Expand All @@ -545,6 +547,7 @@ impl From<(AggregationRequest, Vec<Proof>)> for AggregationOnlyRequest {
fn from((request, proofs): (AggregationRequest, Vec<Proof>)) -> Self {
Self {
proofs,
aggregation_ids: request.block_numbers.iter().map(|(id, _)| *id).collect(),
proof_type: request.proof_type,
prover_args: request.prover_args,
}
Expand Down
7 changes: 4 additions & 3 deletions host/src/proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl ProofActor {
let mut manager = get_task_manager(&self.opts.clone().into());
key.proof_system
.cancel_proof(
(key.chain_id, key.blockhash, key.proof_system as u8),
(key.chain_id, key.block_id, key.blockhash, key.proof_system as u8),
Box::new(&mut manager),
)
.await
Expand Down Expand Up @@ -123,6 +123,7 @@ impl ProofActor {

let key = TaskDescriptor::from((
chain_id,
proof_request.block_number,
blockhash,
proof_request.proof_type,
proof_request.prover.clone().to_string(),
Expand Down Expand Up @@ -301,7 +302,7 @@ impl ProofActor {

let status = manager.get_task_proving_status(&key).await?;

if let Some(latest_status) = status.iter().last() {
if let Some(latest_status) = status.0.iter().last() {
if !matches!(latest_status.0, TaskStatus::Registered) {
return Ok(latest_status.0.clone());
}
Expand Down Expand Up @@ -334,7 +335,7 @@ impl ProofActor {
.get_aggregation_task_proving_status(&request)
.await?;

if let Some(latest_status) = status.iter().last() {
if let Some(latest_status) = status.0.iter().last() {
if !matches!(latest_status.0, TaskStatus::Registered) {
return Ok(());
}
Expand Down
3 changes: 1 addition & 2 deletions host/src/server/api/v1/proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ async fn proof_handler(
inc_host_req_count(proof_request.block_number);
inc_guest_req_count(&proof_request.proof_type, proof_request.block_number);

// In memory task manager only for V1, cannot feature = "sqlite"
let mut manager = get_task_manager(&raiko_tasks::TaskManagerOpts::default());
let mut manager = get_task_manager(&prover_state.opts.clone().into());

handle_proof(
&proof_request,
Expand Down
1 change: 1 addition & 0 deletions host/src/server/api/v2/proof/cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ async fn cancel_handler(

let key = TaskDescriptor::from((
chain_id,
proof_request.block_number,
block_hash,
proof_request.proof_type,
proof_request.prover.clone().to_string(),
Expand Down
3 changes: 2 additions & 1 deletion host/src/server/api/v2/proof/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ async fn proof_handler(

let key = TaskDescriptor::from((
chain_id,
proof_request.block_number,
blockhash,
proof_request.proof_type,
proof_request.prover.to_string(),
Expand All @@ -64,7 +65,7 @@ async fn proof_handler(
let mut manager = prover_state.task_manager();
let status = manager.get_task_proving_status(&key).await?;

let Some((latest_status, ..)) = status.last() else {
let Some((latest_status, ..)) = status.0.last() else {
// If there are no tasks with provided config, create a new one.
manager.enqueue_task(&key).await?;

Expand Down
2 changes: 1 addition & 1 deletion host/src/server/api/v3/proof/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn aggregation_handler(
.get_aggregation_task_proving_status(&aggregation_request)
.await?;

let Some((latest_status, ..)) = status.last() else {
let Some((latest_status, ..)) = status.0.last() else {
// If there are no tasks with provided config, create a new one.
manager
.enqueue_aggregation_task(&aggregation_request)
Expand Down
1 change: 1 addition & 0 deletions host/src/server/api/v3/proof/cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ async fn cancel_handler(

let key = TaskDescriptor::from((
chain_id,
proof_request.block_number,
block_hash,
proof_request.proof_type,
proof_request.prover.clone().to_string(),
Expand Down
8 changes: 6 additions & 2 deletions host/src/server/api/v3/proof/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ async fn proof_handler(

let key = TaskDescriptor::from((
chain_id,
proof_request.block_number,
blockhash,
proof_request.proof_type,
proof_request.prover.to_string(),
Expand All @@ -83,7 +84,7 @@ async fn proof_handler(
for (key, req) in tasks.iter() {
let status = manager.get_task_proving_status(key).await?;

let Some((latest_status, ..)) = status.last() else {
let Some((latest_status, ..)) = status.0.last() else {
// If there are no tasks with provided config, create a new one.
manager.enqueue_task(key).await?;

Expand Down Expand Up @@ -123,6 +124,7 @@ async fn proof_handler(
} else if is_success {
info!("All tasks are successful, aggregating proofs");
let mut proofs = Vec::with_capacity(tasks.len());
let mut aggregation_ids = Vec::with_capacity(tasks.len());
for (task, req) in tasks {
let raw_proof: Vec<u8> = manager.get_task_proof(&task).await?;
let proof: Proof = serde_json::from_slice(&raw_proof)?;
Expand All @@ -131,9 +133,11 @@ async fn proof_handler(
proof.proof, proof.input
);
proofs.push(proof);
aggregation_ids.push(req.block_number);
}

let aggregation_request = AggregationOnlyRequest {
aggregation_ids,
proofs,
proof_type: aggregation_request.proof_type,
prover_args: aggregation_request.prover_args,
Expand All @@ -143,7 +147,7 @@ async fn proof_handler(
.get_aggregation_task_proving_status(&aggregation_request)
.await?;

let Some((latest_status, ..)) = status.last() else {
let Some((latest_status, ..)) = status.0.last() else {
// If there are no tasks with provided config, create a new one.
manager
.enqueue_aggregation_task(&aggregation_request)
Expand Down
2 changes: 1 addition & 1 deletion lib/src/prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl From<String> for ProverError {

pub type ProverResult<T, E = ProverError> = core::result::Result<T, E>;
pub type ProverConfig = serde_json::Value;
pub type ProofKey = (ChainId, B256, u8);
pub type ProofKey = (ChainId, u64, B256, u8);

#[derive(Clone, Debug, Serialize, ToSchema, Deserialize, Default, PartialEq, Eq, Hash)]
/// The response body of a proof request.
Expand Down
1 change: 1 addition & 0 deletions provers/risc0/driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl Prover for Risc0Prover {
let config = Risc0Param::deserialize(config.get("risc0").unwrap()).unwrap();
let proof_key = (
input.chain_spec.chain_id,
input.block.header.number,
output.hash.clone(),
RISC0_PROVER_CODE,
);
Expand Down
2 changes: 1 addition & 1 deletion provers/risc0/driver/src/methods/risc0_guest.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub const RISC0_GUEST_ELF: &[u8] =
include_bytes!("../../../guest/target/riscv32im-risc0-zkvm-elf/release/risc0-guest");
pub const RISC0_GUEST_ID: [u32; 8] = [
1969729193, 1889995288, 261404698, 2630336538, 339020519, 1410619780, 514721746, 1213424171,
2705224968, 672422473, 3589767632, 3895344282, 3642477750, 1142566656, 2251137472, 1131663031,
];
7 changes: 6 additions & 1 deletion provers/sp1/driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,12 @@ impl Prover for Sp1Prover {
if let Some(id_store) = id_store {
id_store
.store_id(
(input.chain_spec.chain_id, output.hash, SP1_PROVER_CODE),
(
input.chain_spec.chain_id,
input.block.header.number,
output.hash,
SP1_PROVER_CODE,
),
proof_id.clone(),
)
.await?;
Expand Down
37 changes: 34 additions & 3 deletions taskdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use raiko_lib::{
#[cfg(feature = "sqlite")]
use rusqlite::Error as RustSQLiteError;
use serde::{Deserialize, Serialize};
use tracing::debug;
use utoipa::ToSchema;

#[cfg(feature = "sqlite")]
Expand Down Expand Up @@ -156,17 +157,25 @@ impl<'a> FromIterator<&'a TaskStatus> for TaskStatus {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct TaskDescriptor {
pub chain_id: ChainId,
pub block_id: u64,
pub blockhash: B256,
pub proof_system: ProofType,
pub prover: String,
}

impl From<(ChainId, B256, ProofType, String)> for TaskDescriptor {
impl From<(ChainId, u64, B256, ProofType, String)> for TaskDescriptor {
fn from(
(chain_id, blockhash, proof_system, prover): (ChainId, B256, ProofType, String),
(chain_id, block_id, blockhash, proof_system, prover): (
ChainId,
u64,
B256,
ProofType,
String,
),
) -> Self {
TaskDescriptor {
chain_id,
block_id,
blockhash,
proof_system,
prover,
Expand All @@ -186,10 +195,30 @@ impl From<TaskDescriptor> for (ChainId, B256) {
}
}

#[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash)]
#[serde(default)]
/// A request for proof aggregation of multiple proofs.
pub struct AggregationTaskDescriptor {
/// The block numbers and l1 inclusion block numbers for the blocks to aggregate proofs for.
pub aggregation_ids: Vec<u64>,
/// The proof type.
pub proof_type: Option<String>,
}

impl From<&AggregationOnlyRequest> for AggregationTaskDescriptor {
fn from(request: &AggregationOnlyRequest) -> Self {
Self {
aggregation_ids: request.aggregation_ids.clone(),
proof_type: request.proof_type.clone().map(|p| p.to_string()),
}
}
}

/// Task status triplet (status, proof, timestamp).
pub type TaskProvingStatus = (TaskStatus, Option<String>, DateTime<Utc>);

pub type TaskProvingStatusRecords = Vec<TaskProvingStatus>;
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct TaskProvingStatusRecords(pub Vec<TaskProvingStatus>);

pub type TaskReport = (TaskDescriptor, TaskStatus);

Expand Down Expand Up @@ -389,6 +418,7 @@ pub type TaskManagerWrapperImpl = TaskManagerWrapper<SqliteTaskManager>;
pub type TaskManagerWrapperImpl = TaskManagerWrapper<RedisTaskManager>;

pub fn get_task_manager(opts: &TaskManagerOpts) -> TaskManagerWrapperImpl {
debug!("get task manager with options: {:?}", opts);
TaskManagerWrapperImpl::new(opts)
}

Expand Down Expand Up @@ -416,6 +446,7 @@ mod test {
task_manager
.enqueue_task(&TaskDescriptor {
chain_id: 1,
block_id: 0,
blockhash: B256::default(),
proof_system: ProofType::Native,
prover: "test".to_string(),
Expand Down
23 changes: 14 additions & 9 deletions taskdb/src/mem_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,13 @@ impl InMemoryTaskDb {
Some(task_proving_records) => {
debug!(
"Task already exists: {:?}",
task_proving_records.last().unwrap().0
task_proving_records.0.last().unwrap().0
);
} // do nothing
None => {
info!("Enqueue new task: {key:?}");
self.tasks_queue.insert(key.clone(), vec![task_status]);
self.tasks_queue
.insert(key.clone(), TaskProvingStatusRecords(vec![task_status]));
}
}
}
Expand All @@ -70,9 +71,9 @@ impl InMemoryTaskDb {
ensure(self.tasks_queue.contains_key(&key), "no task found")?;

self.tasks_queue.entry(key).and_modify(|entry| {
if let Some(latest) = entry.last() {
if let Some(latest) = entry.0.last() {
if latest.0 != status {
entry.push((status, proof.map(hex::encode), Utc::now()));
entry.0.push((status, proof.map(hex::encode), Utc::now()));
}
}
});
Expand All @@ -96,6 +97,7 @@ impl InMemoryTaskDb {
.ok_or_else(|| TaskManagerError::SqlError("no task in db".to_owned()))?;

let (_, proof, ..) = proving_status_records
.0
.iter()
.filter(|(status, ..)| (status == &TaskStatus::Success))
.last()
Expand Down Expand Up @@ -124,6 +126,7 @@ impl InMemoryTaskDb {
.iter()
.flat_map(|(descriptor, statuses)| {
statuses
.0
.last()
.map(|status| (descriptor.clone(), status.0.clone()))
})
Expand Down Expand Up @@ -161,13 +164,13 @@ impl InMemoryTaskDb {
Some(task_proving_records) => {
debug!(
"Task already exists: {:?}",
task_proving_records.last().unwrap().0
task_proving_records.0.last().unwrap().0
);
} // do nothing
None => {
info!("Enqueue new task: {request}");
self.aggregation_tasks_queue
.insert(request.clone(), vec![task_status]);
.insert(request.clone(), TaskProvingStatusRecords(vec![task_status]));
}
}
Ok(())
Expand Down Expand Up @@ -198,9 +201,9 @@ impl InMemoryTaskDb {
self.aggregation_tasks_queue
.entry(request.clone())
.and_modify(|entry| {
if let Some(latest) = entry.last() {
if let Some(latest) = entry.0.last() {
if latest.0 != status {
entry.push((status, proof.map(hex::encode), Utc::now()));
entry.0.push((status, proof.map(hex::encode), Utc::now()));
}
}
});
Expand All @@ -223,6 +226,7 @@ impl InMemoryTaskDb {
.ok_or_else(|| TaskManagerError::SqlError("no task in db".to_owned()))?;

let (_, proof, ..) = proving_status_records
.0
.iter()
.filter(|(status, ..)| (status == &TaskStatus::Success))
.last()
Expand Down Expand Up @@ -286,7 +290,7 @@ impl TaskManager for InMemoryTaskManager {
) -> TaskManagerResult<TaskProvingStatusRecords> {
let mut db = self.db.lock().await;
let status = db.get_task_proving_status(params)?;
if !status.is_empty() {
if !status.0.is_empty() {
return Ok(status);
}

Expand Down Expand Up @@ -391,6 +395,7 @@ mod tests {
let mut db = InMemoryTaskDb::new();
let params = TaskDescriptor {
chain_id: 1,
block_id: 1,
blockhash: B256::default(),
proof_system: ProofType::Native,
prover: "0x1234".to_owned(),
Expand Down
Loading

0 comments on commit 8660e56

Please sign in to comment.