Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StreamingFast fork merged into graphprotocol #112

Merged
merged 21 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
arweave.ptr
target
/thegarii
/thegarii
/firehose-data
11 changes: 9 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ structopt = "0.3.26"
thiserror = "1.0.30"
tokio = { version = "1", features = ["full"] }
uint = "0.9"
base64 = "0.21.7"

[features]
default = [ ]
Expand Down
34 changes: 24 additions & 10 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
};
use futures::future::join_all;
use rand::Rng;
use reqwest::{Client as ReqwestClient, ClientBuilder};
use reqwest::{Client as ReqwestClient, ClientBuilder, StatusCode};
use serde::de::DeserializeOwned;
use std::time::Duration;

Expand Down Expand Up @@ -60,23 +60,37 @@ impl Client {
/// http get request with base url
async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
let mut retried = 0;
let mut ms_between_retries = 10000;
loop {
match self
.client
.get(&format!("{}/{}", self.next_endpoint(), path))
.send()
.await?
.json()
.await
{
Ok(r) => return Ok(r),
Err(e) => {
if retried < self.retry {
tokio::time::sleep(Duration::from_millis(1000)).await;
retried += 1;
continue;
Ok(r) => match r.status() {
StatusCode::OK => match r.json::<T>().await {
Ok(r) => return Ok(r),
Err(e) => return Err(e.into()),
},
_ => {
if retried < self.retry {
let duration = Duration::from_millis(ms_between_retries);
tokio::time::sleep(duration).await;
retried += 1;
ms_between_retries *= 2;
log::info!(
"retrying request in {} second(s), at attempt {}, attempts left {}",
Duration::as_secs(&duration),
retried,
self.retry - retried
);
continue;
}
return Err(Error::RetriesReached);
}

},
Err(e) => {
return Err(e.into());
}
}
Expand Down
81 changes: 62 additions & 19 deletions src/polling.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::types::FirehoseBlock;
// Copyright 2021 ChainSafe Systems
// SPDX-License-Identifier: LGPL-3.0-only
use crate::{client::Client, env::Env, pb::Block, Error, Result};
use anyhow::Context;
use base64::{engine::general_purpose, Engine as _};
use futures::stream;
use futures::StreamExt;
use futures::TryFutureExt;
use prost::Message;

use std::path::{Path, PathBuf};
use std::{fs, time::Duration};

Expand Down Expand Up @@ -57,7 +59,7 @@
forever,
latest: 0,
ptr: 0,
quiet: quiet,

Check failure on line 62 in src/polling.rs

View workflow job for this annotation

GitHub Actions / Clippy

redundant field names in struct initialization
};

poller.initialize_start_ptr(ptr).await?;
Expand Down Expand Up @@ -111,7 +113,7 @@
async fn start_ptr_from_last_irreversible(&self) -> Result<u64> {
log::info!("user requested 'live' block, retrieving it from endpoint");

self.latest_irreversible_block_num()

Check failure on line 116 in src/polling.rs

View workflow job for this annotation

GitHub Actions / Clippy

using `Result.and_then(|x| Ok(y))`, which is more succinctly expressed as `map(|x| y)`
.await
.and_then(|live_block| {
log::info!(
Expand All @@ -124,7 +126,7 @@
}

async fn start_ptr_from_flag_value(&self, value: &String) -> Result<u64> {
value

Check failure on line 129 in src/polling.rs

View workflow job for this annotation

GitHub Actions / Clippy

using `Result.and_then(|x| Ok(y))`, which is more succinctly expressed as `map(|x| y)`
.parse::<u64>()
.and_then(|value| {
log::info!(
Expand All @@ -138,16 +140,59 @@
.map_err(Into::into)
}

/// Firehose init log to stdout
///
/// FIRE INIT <VERSION> <BLOCK_TYPE_URL>
fn firehose_init(&self) {
println!("FIRE INIT 1.0 sf.arweave.type.v1.Block");
}

/// Firehose log to stdout
///
/// FIRE BLOCK <HEIGHT> <ENCODED>
fn firehose_log(&self, b: &(u64, Vec<u8>)) -> Result<()> {
let height = b.0;
/// FIRE BLOCK <BLOCK_NUM> <BLOCK_HASH> <PARENT_NUM> <PARENT_HASH> <LIB> <TIMESTAMP> <ENCODED>
fn firehose_log(&self, b: FirehoseBlock) -> Result<()> {
let block_num = b.height;
let block_hash = base64_url::decode(&b.indep_hash)
.with_context(|| format!("invalid base64url indep_hash on block {}", block_num))?;
let parent_hash = base64_url::decode(&b.previous_block)
.with_context(|| format!("invalid base64url previous_block on block {}", block_num))?;
let timestamp = b.timestamp;

let parent_num = if b.previous_block.is_empty() {
0
} else {
block_num - 1
};

let lib = if block_num > self.confirms {
block_num - self.confirms
} else {
0
};

let encoded: Block = b.try_into()?;

if self.quiet {
println!("FIRE BLOCK {} <quiet-mode>", height);
println!(
"FIRE BLOCK {} {} {} {} {} {}",
block_num,
hex::encode(block_hash),
parent_num,
hex::encode(parent_hash),
lib,
timestamp
);
} else {
println!("FIRE BLOCK {} {}", height, hex::encode(&b.1));
println!(
"FIRE BLOCK {} {} {} {} {} {} {}",
block_num,
hex::encode(block_hash),
parent_num,
hex::encode(parent_hash),
lib,
timestamp,
general_purpose::STANDARD.encode(encoded.encode_to_vec())
);
}

Ok(())
Expand All @@ -166,31 +211,27 @@
blocks.last().expect("non-empty")
);

let mut tasks = stream::iter(blocks.into_iter().map(|block| {
self.client
.get_firehose_block_by_height(block)
.and_then(|block| async {
let height = block.height;
let proto: Block = block.try_into()?;

Ok((height, proto.encode_to_vec()))
})
}))
let mut tasks = stream::iter(
blocks
.into_iter()
.map(|block| self.client.get_firehose_block_by_height(block)),
)
.buffered(self.batch);

while let Some(item) = tasks.next().await {
let block = item?;
let height = block.height;

self.firehose_log(&block)?;
self.firehose_log(block)?;
// # Safty
//
// only update ptr after firehose_log has been emitted
self.ptr = block.0 + 1;
self.ptr = height + 1;

self.write_ptr().await?;

if let Some(end) = self.end {
if block.0 == end {
if height == end {
return Err(Error::StopBlockReached);
}
}
Expand Down Expand Up @@ -236,6 +277,8 @@

/// start polling service
pub async fn start(&mut self) -> Result<()> {
self.firehose_init();

loop {
// restart when network error occurs
let result = self.track_head().await;
Expand Down
6 changes: 4 additions & 2 deletions src/result.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2021 ChainSafe Systems
// SPDX-License-Identifier: LGPL-3.0-only

//! the graii results
//! the garii results

use std::{convert::Infallible, env::VarError, net::AddrParseError, num::ParseIntError};

/// the graii errors
/// the garii errors
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("block {0} not found")]
Expand All @@ -30,6 +30,8 @@ pub enum Error {
ParseBlockPtrFailed,
#[error("stop block reached")]
StopBlockReached,
#[error("retries reached")]
RetriesReached,
#[error(transparent)]
AddrParseError(#[from] AddrParseError),
#[error(transparent)]
Expand Down
Loading