Skip to content

Commit

Permalink
fix(raiko): fix task db report and docker image build (#400)
Browse files Browse the repository at this point in the history
* fix zk docker image build

Signed-off-by: smtmfft <[email protected]>

* fix: refine task db report

* fix lint

---------

Signed-off-by: smtmfft <[email protected]>
  • Loading branch information
smtmfft authored Oct 31, 2024
1 parent 14b15e6 commit 54291f3
Show file tree
Hide file tree
Showing 14 changed files with 215 additions and 105 deletions.
1 change: 1 addition & 0 deletions Dockerfile.zk
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ RUN apt-get update && apt-get install -y \
ca-certificates \
openssl \
curl \
jq \
&& rm -rf /var/lib/apt/lists/*

# copy to /etc/raiko, but if self register mode, the mounted one will overwrite it.
Expand Down
6 changes: 5 additions & 1 deletion docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ services:
volumes:
- /var/log/raiko:/var/log/raiko
ports:
- "8080:8080"
- "8090:8080"
environment:
# you can use your own PCCS host
# - PCCS_HOST=host.docker.internal:8081
Expand All @@ -167,6 +167,10 @@ services:
- SP1_VERIFIER_RPC_URL=${SP1_VERIFIER_RPC_URL}
- SP1_VERIFIER_ADDRESS=${SP1_VERIFIER_ADDRESS}
- REDIS_URL=${REDIS_URL:-redis://redis:6379}
depends_on:
- redis
profiles:
- prod-redis
pccs:
build:
context: ..
Expand Down
8 changes: 4 additions & 4 deletions host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use raiko_core::{
merge,
};
use raiko_lib::consts::SupportedChainSpecs;
use raiko_tasks::{get_task_manager, TaskDescriptor, TaskManagerOpts, TaskManagerWrapperImpl};
use raiko_tasks::{get_task_manager, ProofTaskDescriptor, TaskManagerOpts, TaskManagerWrapperImpl};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -155,7 +155,7 @@ pub struct ProverState {

#[derive(Debug, Serialize)]
pub enum Message {
Cancel(TaskDescriptor),
Cancel(ProofTaskDescriptor),
Task(ProofRequest),
TaskComplete(ProofRequest),
CancelAggregate(AggregationOnlyRequest),
Expand All @@ -168,8 +168,8 @@ impl From<&ProofRequest> for Message {
}
}

impl From<&TaskDescriptor> for Message {
fn from(value: &TaskDescriptor) -> Self {
impl From<&ProofTaskDescriptor> for Message {
fn from(value: &ProofTaskDescriptor) -> Self {
Self::Cancel(value.clone())
}
}
Expand Down
12 changes: 6 additions & 6 deletions host/src/proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use raiko_lib::{
Measurement,
};
use raiko_tasks::{
get_task_manager, TaskDescriptor, TaskManager, TaskManagerWrapperImpl, TaskStatus,
get_task_manager, ProofTaskDescriptor, TaskManager, TaskManagerWrapperImpl, TaskStatus,
};
use reth_primitives::B256;
use tokio::{
Expand Down Expand Up @@ -45,7 +45,7 @@ pub struct ProofActor {
opts: Opts,
chain_specs: SupportedChainSpecs,
aggregate_tasks: Arc<Mutex<HashMap<AggregationOnlyRequest, CancellationToken>>>,
running_tasks: Arc<Mutex<HashMap<TaskDescriptor, CancellationToken>>>,
running_tasks: Arc<Mutex<HashMap<ProofTaskDescriptor, CancellationToken>>>,
pending_tasks: Arc<Mutex<VecDeque<ProofRequest>>>,
receiver: Receiver<Message>,
sender: Sender<Message>,
Expand All @@ -59,7 +59,7 @@ impl ProofActor {
chain_specs: SupportedChainSpecs,
) -> Self {
let running_tasks = Arc::new(Mutex::new(
HashMap::<TaskDescriptor, CancellationToken>::new(),
HashMap::<ProofTaskDescriptor, CancellationToken>::new(),
));
let aggregate_tasks = Arc::new(Mutex::new(HashMap::<
AggregationOnlyRequest,
Expand All @@ -78,7 +78,7 @@ impl ProofActor {
}
}

pub async fn cancel_task(&mut self, key: TaskDescriptor) -> HostResult<()> {
pub async fn cancel_task(&mut self, key: ProofTaskDescriptor) -> HostResult<()> {
let tasks_map = self.running_tasks.lock().await;
let Some(task) = tasks_map.get(&key) else {
warn!("No task with those keys to cancel");
Expand Down Expand Up @@ -126,7 +126,7 @@ impl ProofActor {
}
};

let key = TaskDescriptor::from((
let key = ProofTaskDescriptor::from((
chain_id,
proof_request.block_number,
blockhash,
Expand Down Expand Up @@ -299,7 +299,7 @@ impl ProofActor {

pub async fn handle_message(
proof_request: ProofRequest,
key: TaskDescriptor,
key: ProofTaskDescriptor,
opts: &Opts,
chain_specs: &SupportedChainSpecs,
) -> HostResult<TaskStatus> {
Expand Down
4 changes: 2 additions & 2 deletions host/src/server/api/v2/proof/cancel.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use axum::{debug_handler, extract::State, routing::post, Json, Router};
use raiko_core::{interfaces::ProofRequest, provider::get_task_data};
use raiko_tasks::{TaskDescriptor, TaskManager, TaskStatus};
use raiko_tasks::{ProofTaskDescriptor, TaskManager, TaskStatus};
use serde_json::Value;
use utoipa::OpenApi;

Expand Down Expand Up @@ -41,7 +41,7 @@ async fn cancel_handler(
)
.await?;

let key = TaskDescriptor::from((
let key = ProofTaskDescriptor::from((
chain_id,
proof_request.block_number,
block_hash,
Expand Down
4 changes: 2 additions & 2 deletions host/src/server/api/v2/proof/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use axum::{debug_handler, extract::State, routing::post, Json, Router};
use raiko_core::{interfaces::ProofRequest, provider::get_task_data};
use raiko_tasks::{TaskDescriptor, TaskManager, TaskStatus};
use raiko_tasks::{ProofTaskDescriptor, TaskManager, TaskStatus};
use serde_json::Value;
use utoipa::OpenApi;

Expand Down Expand Up @@ -54,7 +54,7 @@ async fn proof_handler(
)
.await?;

let key = TaskDescriptor::from((
let key = ProofTaskDescriptor::from((
chain_id,
proof_request.block_number,
blockhash,
Expand Down
4 changes: 2 additions & 2 deletions host/src/server/api/v3/proof/cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use raiko_core::{
interfaces::{AggregationRequest, ProofRequest, ProofRequestOpt},
provider::get_task_data,
};
use raiko_tasks::{TaskDescriptor, TaskManager, TaskStatus};
use raiko_tasks::{ProofTaskDescriptor, TaskManager, TaskStatus};
use utoipa::OpenApi;

use crate::{interfaces::HostResult, server::api::v2::CancelStatus, Message, ProverState};
Expand Down Expand Up @@ -44,7 +44,7 @@ async fn cancel_handler(
)
.await?;

let key = TaskDescriptor::from((
let key = ProofTaskDescriptor::from((
chain_id,
proof_request.block_number,
block_hash,
Expand Down
4 changes: 2 additions & 2 deletions host/src/server/api/v3/proof/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use raiko_core::{
provider::get_task_data,
};
use raiko_lib::prover::Proof;
use raiko_tasks::{TaskDescriptor, TaskManager, TaskStatus};
use raiko_tasks::{ProofTaskDescriptor, TaskManager, TaskStatus};
use tracing::{debug, info};
use utoipa::OpenApi;

Expand Down Expand Up @@ -64,7 +64,7 @@ async fn proof_handler(
)
.await?;

let key = TaskDescriptor::from((
let key = ProofTaskDescriptor::from((
chain_id,
proof_request.block_number,
blockhash,
Expand Down
8 changes: 5 additions & 3 deletions host/tests/common/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use raiko_core::interfaces::ProofRequestOpt;
use raiko_host::server::api::{v1, v2};
use raiko_tasks::{TaskDescriptor, TaskStatus};
use raiko_tasks::{ProofTaskDescriptor, TaskStatus};

const URL: &str = "http://localhost:8080";

Expand Down Expand Up @@ -87,15 +87,17 @@ impl ProofClient {
}
}

pub async fn report_proof(&self) -> anyhow::Result<Vec<(TaskDescriptor, TaskStatus)>> {
pub async fn report_proof(&self) -> anyhow::Result<Vec<(ProofTaskDescriptor, TaskStatus)>> {
let response = self
.reqwest_client
.get(&format!("{URL}/v2/proof/report"))
.send()
.await?;

if response.status().is_success() {
let report_response = response.json::<Vec<(TaskDescriptor, TaskStatus)>>().await?;
let report_response = response
.json::<Vec<(ProofTaskDescriptor, TaskStatus)>>()
.await?;
Ok(report_response)
} else {
Err(anyhow::anyhow!("Failed to send proof request"))
Expand Down
32 changes: 16 additions & 16 deletions taskdb/src/adv_sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ use rusqlite::{
use tokio::sync::Mutex;

use crate::{
TaskDescriptor, TaskManager, TaskManagerError, TaskManagerOpts, TaskManagerResult,
TaskProvingStatus, TaskProvingStatusRecords, TaskReport, TaskStatus,
ProofTaskDescriptor, TaskDescriptor, TaskManager, TaskManagerError, TaskManagerOpts,
TaskManagerResult, TaskProvingStatus, TaskProvingStatusRecords, TaskReport, TaskStatus,
};

// Types
Expand Down Expand Up @@ -499,13 +499,13 @@ impl TaskDb {

pub fn enqueue_task(
&self,
TaskDescriptor {
ProofTaskDescriptor {
chain_id,
block_id,
blockhash,
proof_system,
prover,
}: &TaskDescriptor,
}: &ProofTaskDescriptor,
) -> TaskManagerResult<TaskProvingStatusRecords> {
let mut statement = self.conn.prepare_cached(
r#"
Expand Down Expand Up @@ -542,13 +542,13 @@ impl TaskDb {

pub fn update_task_progress(
&self,
TaskDescriptor {
ProofTaskDescriptor {
chain_id,
block_id,
blockhash,
proof_system,
prover,
}: TaskDescriptor,
}: ProofTaskDescriptor,
status: TaskStatus,
proof: Option<&[u8]>,
) -> TaskManagerResult<()> {
Expand Down Expand Up @@ -588,13 +588,13 @@ impl TaskDb {

pub fn get_task_proving_status(
&self,
TaskDescriptor {
ProofTaskDescriptor {
chain_id,
block_id,
blockhash,
proof_system,
prover,
}: &TaskDescriptor,
}: &ProofTaskDescriptor,
) -> TaskManagerResult<TaskProvingStatusRecords> {
let mut statement = self.conn.prepare_cached(
r#"
Expand Down Expand Up @@ -639,13 +639,13 @@ impl TaskDb {

pub fn get_task_proof(
&self,
TaskDescriptor {
ProofTaskDescriptor {
chain_id,
block_id,
blockhash,
proof_system,
prover,
}: &TaskDescriptor,
}: &ProofTaskDescriptor,
) -> TaskManagerResult<Vec<u8>> {
let mut statement = self.conn.prepare_cached(
r#"
Expand Down Expand Up @@ -748,13 +748,13 @@ impl TaskDb {
let query = statement
.query_map([], |row| {
Ok((
TaskDescriptor {
TaskDescriptor::SingleProof(ProofTaskDescriptor {
chain_id: row.get(0)?,
block_id: row.get(1)?,
blockhash: B256::from_slice(&row.get::<_, Vec<u8>>(1)?),
proof_system: row.get::<_, u8>(2)?.try_into().unwrap(),
prover: row.get(3)?,
},
}),
TaskStatus::from(row.get::<_, i32>(4)?),
))
})?
Expand Down Expand Up @@ -910,15 +910,15 @@ impl TaskManager for SqliteTaskManager {

async fn enqueue_task(
&mut self,
params: &TaskDescriptor,
params: &ProofTaskDescriptor,
) -> Result<TaskProvingStatusRecords, TaskManagerError> {
let task_db: tokio::sync::MutexGuard<'_, TaskDb> = self.arc_task_db.lock().await;
task_db.enqueue_task(params)
}

async fn update_task_progress(
&mut self,
key: TaskDescriptor,
key: ProofTaskDescriptor,
status: TaskStatus,
proof: Option<&[u8]>,
) -> TaskManagerResult<()> {
Expand All @@ -929,13 +929,13 @@ impl TaskManager for SqliteTaskManager {
/// Returns the latest triplet (submitter or fulfiller, status, last update time)
async fn get_task_proving_status(
&mut self,
key: &TaskDescriptor,
key: &ProofTaskDescriptor,
) -> TaskManagerResult<TaskProvingStatusRecords> {
let task_db = self.arc_task_db.lock().await;
task_db.get_task_proving_status(key)
}

async fn get_task_proof(&mut self, key: &TaskDescriptor) -> TaskManagerResult<Vec<u8>> {
async fn get_task_proof(&mut self, key: &ProofTaskDescriptor) -> TaskManagerResult<Vec<u8>> {
let task_db = self.arc_task_db.lock().await;
task_db.get_task_proof(key)
}
Expand Down
Loading

0 comments on commit 54291f3

Please sign in to comment.