Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Airdrop in batches #12

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub struct CacheOpts {
#[derive(clap::Args)]
pub struct ConcurrencyOpts {
/// Limit the number of concurrently-running jobs
#[arg(short = 'j', long = "jobs", default_value_t = 4)]
#[arg(short = 'j', long = "jobs", default_value_t = 2)]
pub jobs: u16,
}

Expand Down Expand Up @@ -151,6 +151,10 @@ pub struct Airdrop {
#[arg(short = 'n', long, default_value_t = 1)]
pub mints_per_wallet: u32,

/// Number of NFTs to mint to each wallet specified
#[arg(short = 'b', long, default_value_t = 25)]
pub batch_size: usize,

/// Path to one or more files containing newline-separated wallet addresses
/// to mint to
///
Expand Down
239 changes: 142 additions & 97 deletions src/commands/airdrop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
io::{prelude::*, BufReader},
path::Path,
sync::Arc,
thread,
time::Duration,
};

Expand All @@ -17,8 +18,9 @@ use tokio::task::JoinHandle;
use url::Url;
use uuid::Uuid;

use self::mint_random_queued_to_drop::{
MintRandomQueuedInput, MintRandomQueuedToDropMintRandomQueuedToDropCollectionMint,
use self::mint_random_queued_to_drop_batched::{
MintRandomQueuedBatchedInput,
MintRandomQueuedToDropBatchedMintRandomQueuedToDropBatchedCollectionMints,
};
use crate::{
cache::{AirdropId, AirdropWalletsCache, Cache, CacheConfig, CreationStatus, MintRandomQueued},
Expand All @@ -36,7 +38,7 @@ use crate::{
query_path = "src/queries/mint-random.graphql",
response_derives = "Debug"
)]
struct MintRandomQueuedToDrop;
struct MintRandomQueuedToDropBatched;

#[derive(GraphQLQuery)]
#[graphql(
Expand All @@ -59,6 +61,7 @@ struct Params<'a> {
drop_id: Uuid,
compressed: bool,
mints_per_wallet: u32,
batch_size: usize,
tx: &'a Sender<Job>,
backoff: ExponentialBuilder,
}
Expand All @@ -68,27 +71,46 @@ fn read_file<N: fmt::Debug, R: BufRead>(name: N, reader: R, params: Params) -> R
drop_id,
compressed,
mints_per_wallet,
batch_size,
tx,
backoff,
} = params;

let mut batch = Vec::new();

for line in reader.lines() {
let wallet: Pubkey = line
.map_err(anyhow::Error::new)
.and_then(|l| l.trim().parse().map_err(Into::into))
.with_context(|| format!("Error parsing wallets file {name:?}"))?;

for nft_number in 1..=mints_per_wallet {
tx.send(Job::Mint(MintRandomQueuedJob {
airdrop_id: AirdropId { wallet, nft_number },
drop_id,
compressed,
backoff: backoff.build(),
}))
.context("Error seeding initial job queue")?;
batch.push(AirdropId { wallet, nft_number });

if batch.len() == batch_size {
tx.send(Job::Mint(MintRandomQueuedBatchJob {
airdrop_ids: batch,
drop_id,
compressed,
backoff: backoff.clone(),
}))
.context("Error seeding initial job queue")?;

batch = Vec::new();
}
}
}

if !batch.is_empty() {
tx.send(Job::Mint(MintRandomQueuedBatchJob {
airdrop_ids: batch,
drop_id,
compressed,
backoff,
}))
.context("Error seeding final job queue")?;
}

Ok(())
}

Expand All @@ -98,6 +120,7 @@ pub fn run(config: &Config, cache: CacheConfig, args: Airdrop) -> Result<()> {
drop_id,
no_compressed,
mints_per_wallet,
batch_size,
wallets,
} = args;

Expand All @@ -107,6 +130,7 @@ pub fn run(config: &Config, cache: CacheConfig, args: Airdrop) -> Result<()> {
drop_id,
compressed: !no_compressed,
mints_per_wallet,
batch_size,
tx: &tx,
backoff: ExponentialBuilder::default()
.with_jitter()
Expand Down Expand Up @@ -182,7 +206,7 @@ struct Context {

#[derive(Debug)]
enum Job {
Mint(MintRandomQueuedJob),
Mint(MintRandomQueuedBatchJob),
CheckStatus(CheckMintStatusJob),
}

Expand All @@ -197,104 +221,121 @@ impl Job {
}

#[derive(Debug)]
struct MintRandomQueuedJob {
airdrop_id: AirdropId,
struct MintRandomQueuedBatchJob {
airdrop_ids: Vec<AirdropId>,
drop_id: Uuid,
compressed: bool,
backoff: ExponentialBackoff,
backoff: ExponentialBuilder,
}

impl MintRandomQueuedJob {
impl MintRandomQueuedBatchJob {
fn run(self, ctx: Context) -> JoinHandle<Result<()>> {
tokio::task::spawn(async move {
let Self {
airdrop_id,
airdrop_ids,
drop_id,
compressed,
backoff,
} = self;
let AirdropId {
wallet,
nft_number: _,
} = airdrop_id;

let cache: AirdropWalletsCache = ctx.cache.get().await?;
let mut ids = Vec::new();

let record = cache.get(airdrop_id).await?;
if let Some(r) = record {
let status = CreationStatus::try_from(r.status)
.with_context(|| format!("Missing creation status for {airdrop_id:?}"))?;
for airdrop_id in airdrop_ids {
let AirdropId {
wallet,
nft_number: _,
} = airdrop_id;

match status {
CreationStatus::Created => {
info!("Mint {:?} already airdropped to {wallet:?}", r.mint_id,);
return Ok(());
},
CreationStatus::Failed => {
// TODO: retry here

warn!("Mint {:?} failed.", r.mint_id);
},
CreationStatus::Pending => {
info!("Mint {:?} is pending. Checking status again...", r.mint_id);
ctx.q
.send(Job::CheckStatus(CheckMintStatusJob {
airdrop_id,
mint_id: r.mint_id.parse().with_context(|| {
format!("Invalid mint ID for {airdrop_id:?}")
})?,
backoff,
}))
.context("Error submitting pending mint status check job")?;
},
let record = cache.get(airdrop_id).await?;

if let Some(r) = record {
let status = CreationStatus::try_from(r.status)
.with_context(|| format!("Missing creation status for {airdrop_id:?}"))?;

match status {
CreationStatus::Created => {
info!("Mint {:?} already airdropped to {wallet:?}", r.mint_id,);
return Ok(());
},
CreationStatus::Failed => {
// TODO: retry here

warn!("Mint {:?} failed.", r.mint_id);
},
CreationStatus::Pending => {
info!("Mint {:?} is pending. Checking status again...", r.mint_id);
ctx.q
.send(Job::CheckStatus(CheckMintStatusJob {
airdrop_id,
mint_id: r.mint_id.parse().with_context(|| {
format!("Invalid mint ID for {airdrop_id:?}")
})?,
backoff: backoff.build(),
}))
.context("Error submitting pending mint status check job")?;
},
}
} else {
ids.push(airdrop_id);
ctx.stats.queued_mints.increment();
}
} else {
let input = mint_random_queued_to_drop::Variables {
in_: MintRandomQueuedInput {
drop: drop_id,
recipient: wallet.to_string(),
compressed,
},
};

let res = ctx
.client
.graphql::<MintRandomQueuedToDrop>()
.post(ctx.graphql_endpoint, input, || {
format!("mintRandomQueuedToDrop mutation for {airdrop_id:?}")
})
.await?;
}

if ids.is_empty() {
return Ok(());
}

let mint_random_queued_to_drop::ResponseData {
mint_random_queued_to_drop:
mint_random_queued_to_drop::MintRandomQueuedToDropMintRandomQueuedToDrop {
collection_mint:
MintRandomQueuedToDropMintRandomQueuedToDropCollectionMint {
id, ..
},
let input = mint_random_queued_to_drop_batched::Variables {
in_: MintRandomQueuedBatchedInput {
drop: drop_id,
recipients: ids.iter().map(|r| r.wallet.to_string()).collect(),
compressed,
},
};

let res = ctx
.client
.graphql::<MintRandomQueuedToDropBatched>()
.post(ctx.graphql_endpoint, input, || {
format!("mintRandomQueuedToDropBatched mutation")
})
.await?;

let mint_random_queued_to_drop_batched::ResponseData {
mint_random_queued_to_drop_batched:
mint_random_queued_to_drop_batched::MintRandomQueuedToDropBatchedMintRandomQueuedToDropBatched {
collection_mints
},
} = res.data;

ctx.stats.queued_mints.increment();
for (mint, airdrop_id) in collection_mints.iter().zip(ids.into_iter()) {
let MintRandomQueuedToDropBatchedMintRandomQueuedToDropBatchedCollectionMints {
id,
..
} = mint;
info!("Pending for wallet {:?}", airdrop_id.wallet);

cache
.set(airdrop_id, MintRandomQueued {
mint_id: id.to_string(),
mint_address: None,
status: CreationStatus::Pending.into(),
})
.await?;

ctx.q
.send(Job::CheckStatus(CheckMintStatusJob {
.set(
airdrop_id,
mint_id: id,
backoff,
}))
.context("Error submitting mint status check job")?;
MintRandomQueued {
mint_id: id.to_string(),
mint_address: None,
status: CreationStatus::Pending.into(),
},
)
.await?;

info!("Pending for wallet {wallet:?}");
// ctx.q
// .send(Job::CheckStatus(CheckMintStatusJob {
// airdrop_id,
// mint_id: mint.id,
// backoff: backoff.build(),
// }))
// .context("Error submitting mint status check job")?;
}

thread::sleep(Duration::from_secs(15));
Ok(())
})
}
Expand Down Expand Up @@ -343,15 +384,16 @@ impl CheckMintStatusJob {
match creation_status {
mint_status::CreationStatus::CREATED => {
info!("Mint {mint_id:?} airdropped for {airdrop_id:?}");

ctx.stats.created_mints.increment();

cache
.set(airdrop_id, MintRandomQueued {
mint_id: id.to_string(),
mint_address: None,
status: CreationStatus::Created.into(),
})
.set(
airdrop_id,
MintRandomQueued {
mint_id: id.to_string(),
mint_address: None,
status: CreationStatus::Created.into(),
},
)
.await?;
},
mint_status::CreationStatus::PENDING => {
Expand Down Expand Up @@ -382,11 +424,14 @@ impl CheckMintStatusJob {
tokio::time::sleep(dur).await;

cache
.set(airdrop_id, MintRandomQueued {
mint_id: id.to_string(),
mint_address: None,
status: CreationStatus::Failed.into(),
})
.set(
airdrop_id,
MintRandomQueued {
mint_id: id.to_string(),
mint_address: None,
status: CreationStatus::Failed.into(),
},
)
.await
.context("Error submitting mint retry job")?;
},
Expand Down
8 changes: 6 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ impl Config {
let text = match fs::read_to_string(&location.path) {
Ok(s) => s,
Err(e)
if e.kind() == ErrorKind::NotFound && !matches!(location.mode, ConfigMode::ExplicitRead) =>
if e.kind() == ErrorKind::NotFound
&& !matches!(location.mode, ConfigMode::ExplicitRead) =>
{
return Ok(Self::default());
},
Expand Down Expand Up @@ -291,7 +292,10 @@ impl Config {
}

pub fn save(&mut self, location: &ConfigLocation) -> Result<()> {
assert!(location.mode.writable(), "Config::save called with a non-writable location!");
assert!(
location.mode.writable(),
"Config::save called with a non-writable location!"
);

if matches!(location.mode, ConfigMode::Implicit(_)) {
if let Some(dir) = location.path.parent() {
Expand Down
Loading