Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed next_endpoint from random to round robin #122

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "thegarii"
authors = ['[email protected]']
description = 'thegarii firehose service'
version = "0.1.0"
version = "0.1.1"
license = "GPL-3.0"
homepage = 'https://github.com/ChainSafe/the-garii'
repository = 'https://github.com/ChainSafe/the-garii'
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ FLAGS:
OPTIONS:
-B, --batch-blocks <batch-blocks> how many blocks polling at one time [default: 20]
-b, --block-time <block-time> time cost for producing a new block in arweave [default: 20000]
-c, --confirms <confirms> safe blocks against to reorg in polling [default: 20]
-c, --confirms <confirms> safe blocks against to reorg in polling [default: 50]
-e, --endpoints <endpoints>... client endpoints [default: https://arweave.net/]
-p, --ptr-path <ptr-path> block ptr file path
-r, --retry <retry> retry times when failed on http requests [default: 10]
Expand All @@ -41,7 +41,7 @@ OPTIONS:
|---------------|--------------------------|---------------------------------------------|
| ENDPOINTS | `"https://arweave.net"` | for multiple endpoints, split them with ',' |
| BATCH\_BLOCKS | `50` | how many blocks batch at one time |
| CONFIRMS | `20` | irreversibility condition |
| CONFIRMS | `50` | irreversibility condition |
| PTR\_PATH | `$APP_DATA/thegarii/ptr` | the file stores the block ptr for polling |
| retry | `10` | retry times when failed on http requests |
| timeout | `120_000` | timeout of http requests |
Expand Down
84 changes: 73 additions & 11 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,22 @@ pub struct Client {

impl Client {
/// get next endpoint
fn next_endpoint(&self) -> String {
self.endpoints[rand::thread_rng().gen_range(0..self.endpoints.len())].to_string()
fn next_endpoint(&self, already_used_endpoints: &[String]) -> String {
let mut endpoints = self.endpoints.clone();

// if all endpoints are already used, return random endpoint
if endpoints.len() == already_used_endpoints.len() {
return already_used_endpoints
[rand::thread_rng().gen_range(0..already_used_endpoints.len())]
.to_string();
}

// round robin endpoints that are not already used
let mut endpoint = endpoints.remove(0);
while already_used_endpoints.contains(&endpoint) {
endpoint = endpoints.remove(0);
}
endpoint
}

/// new arweave client
Expand Down Expand Up @@ -61,32 +75,44 @@ impl Client {
async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
let mut retried = 0;
let mut ms_between_retries = 10000;
let mut already_used_endpoints: Vec<String> = Vec::new();

loop {
match self
.client
.get(&format!("{}/{}", self.next_endpoint(), path))
.send()
.await
{
let endpoint = self.next_endpoint(&already_used_endpoints);
let url = format!("{}/{}", endpoint, path);

match self.client.get(&url).send().await {
Ok(r) => match r.status() {
StatusCode::OK => match r.json::<T>().await {
Ok(r) => return Ok(r),
Err(e) => return Err(e.into()),
},
_ => {
// If there's still endpoints not used yet, retry now with different endpoint
if self.endpoints.len() != already_used_endpoints.len() {
already_used_endpoints.push(endpoint.clone());
continue;
}

// If all endpoints are used, wait then retry with random endpoint
if retried < self.retry {
let duration = Duration::from_millis(ms_between_retries);
tokio::time::sleep(duration).await;
retried += 1;
ms_between_retries *= 2;
log::info!(
"retrying request in {} second(s), at attempt {}, attempts left {}",
log::warn!(
"{{ \"endpoint\": \"{}\", \"path\": \"{}\", \"status\": \"{}\", \"retry_in\": {}, \"attempts\": {}, \"attempts_left\": {} }}",
endpoint,
path,
r.status(),
Duration::as_secs(&duration),
retried,
self.retry - retried
);
continue;
}

// If all endpoints used and all retries done, return error
return Err(Error::RetriesReached);
}
},
Expand Down Expand Up @@ -198,7 +224,11 @@ impl Client {
pub async fn get_tx_data_by_id(&self, id: &str) -> Result<String> {
Ok(self
.client
.get(&format!("{}/tx/{}/data", self.next_endpoint(), id))
.get(&format!(
"{}/tx/{}/data",
self.next_endpoint(&self.endpoints),
id
))
.send()
.await?
.text()
Expand Down Expand Up @@ -237,6 +267,38 @@ impl Client {
Ok(firehose_block)
}

/// get and parse firehose blocks by hash
///
/// ```rust
/// let client = thegarii::Client::from_env().unwrap();
/// let rt = tokio::runtime::Runtime::new().unwrap();
///
/// { // block hash 5H-hJycMS_PnPOpobXu2CNobRlgqmw4yEMQSc5LeBfS7We63l8HjS-Ek3QaxK8ug - https://arweave.net/block/hash/5H-hJycMS_PnPOpobXu2CNobRlgqmw4yEMQSc5LeBfS7We63l8HjS-Ek3QaxK8ug
/// let firehose_block = rt.block_on(client.get_firehose_block_by_hash("5H-hJycMS_PnPOpobXu2CNobRlgqmw4yEMQSc5LeBfS7We63l8HjS-Ek3QaxK8ug")).unwrap();
///
/// let mut block_without_txs = firehose_block.clone();
/// block_without_txs.txs = vec![];
///
/// assert_eq!(block_without_txs, rt.block_on(client.get_block_by_hash("5H-hJycMS_PnPOpobXu2CNobRlgqmw4yEMQSc5LeBfS7We63l8HjS-Ek3QaxK8ug")).unwrap().into());
/// for (idx, tx) in firehose_block.txs.iter().map(|tx| tx.id.clone()).enumerate() {
/// assert_eq!(firehose_block.txs[idx], rt.block_on(client.get_tx_by_id(&tx)).unwrap());
/// }
/// }
/// ```
pub async fn get_firehose_block_by_hash(&self, hash: &str) -> Result<FirehoseBlock> {
log::info!("resolving firehose block {}", hash);

let block = self.get_block_by_hash(hash).await?;
let txs: Vec<Transaction> = join_all(block.txs.iter().map(|tx| self.get_tx_by_id(tx)))
.await
.into_iter()
.collect::<Result<Vec<Transaction>>>()?;

let mut firehose_block: FirehoseBlock = block.into();
firehose_block.txs = txs;
Ok(firehose_block)
}

/// poll blocks from iterator
///
/// ```rust
Expand Down
4 changes: 2 additions & 2 deletions src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const DEFAULT_BATCH_BLOCKS: u16 = 50;
const RETRY: &str = "RETRY";
const DEFAULT_RETRY: u8 = 10;
const CONFIRMS: &str = "CONFIRMS";
const DEFAULT_CONFIRMS: u64 = 20;
const DEFAULT_CONFIRMS: u64 = 50;
const TIMEOUT: &str = "TIMEOUT";
const DEFAULT_TIMEOUT: u64 = 120_000;
const PTR_FILE: &str = "PTR_FILE";
Expand All @@ -31,7 +31,7 @@ pub struct EnvArguments {
#[structopt(short, long, default_value = "60000")]
pub block_time: u64,
/// safe blocks against to reorg in polling
#[structopt(short, long, default_value = "20")]
#[structopt(short, long, default_value = "50")]
pub confirms: u64,
/// client endpoints
#[structopt(short, long, default_value = "https://arweave.net/")]
Expand Down
6 changes: 5 additions & 1 deletion src/polling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl Polling {
/// Firehose log to stdout
///
/// FIRE BLOCK <BLOCK_NUM> <BLOCK_HASH> <PARENT_NUM> <PARENT_HASH> <LIB> <TIMESTAMP> <ENCODED>
fn firehose_log(&self, b: FirehoseBlock) -> Result<()> {
fn firehose_log(&self, mut b: FirehoseBlock) -> Result<()> {
let block_num = b.height;
let block_hash = base64_url::decode(&b.indep_hash)
.with_context(|| format!("invalid base64url indep_hash on block {}", block_num))?;
Expand All @@ -170,6 +170,10 @@ impl Polling {
0
};

if block_num < 422250 {
b.tx_root = None;
}

let encoded: Block = b.try_into()?;
let block_payload = if self.quiet {
"<trimmed>".to_string()
Expand Down
Loading