Skip to content

Commit

Permalink
bundle replay params (#3376)
Browse files Browse the repository at this point in the history
  • Loading branch information
bw-solana authored Oct 30, 2024
1 parent ffd261e commit c80a411
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 78 deletions.
121 changes: 79 additions & 42 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,24 +251,54 @@ pub struct ReplayStageConfig {
pub vote_account: Pubkey,
pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
pub exit: Arc<AtomicBool>,
pub rpc_subscriptions: Arc<RpcSubscriptions>,
pub slot_status_notifier: Option<SlotStatusNotifier>,
pub leader_schedule_cache: Arc<LeaderScheduleCache>,
pub accounts_background_request_sender: AbsRequestSender,
pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_recorder_sender: Option<RewardsRecorderSender>,
pub cache_block_meta_sender: Option<CacheBlockMetaSender>,
pub entry_notification_sender: Option<EntryNotifierSender>,
pub bank_notification_sender: Option<BankNotificationSenderConfig>,
pub wait_for_vote_to_start_leader: bool,
pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender,
pub tower_storage: Arc<dyn TowerStorage>,
// Stops voting until this slot has been reached. Should be used to avoid
// duplicate voting which can lead to slashing.
pub wait_to_vote_slot: Option<Slot>,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub blockstore: Arc<Blockstore>,
pub bank_forks: Arc<RwLock<BankForks>>,
pub cluster_info: Arc<ClusterInfo>,
pub poh_recorder: Arc<RwLock<PohRecorder>>,
pub tower: Tower,
pub vote_tracker: Arc<VoteTracker>,
pub cluster_slots: Arc<ClusterSlots>,
pub log_messages_bytes_limit: Option<usize>,
pub prioritization_fee_cache: Arc<PrioritizationFeeCache>,
pub banking_tracer: Arc<BankingTracer>,
}

pub struct ReplaySenders {
pub rpc_subscriptions: Arc<RpcSubscriptions>,
pub slot_status_notifier: Option<SlotStatusNotifier>,
pub accounts_background_request_sender: AbsRequestSender,
pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_recorder_sender: Option<RewardsRecorderSender>,
pub cache_block_meta_sender: Option<CacheBlockMetaSender>,
pub entry_notification_sender: Option<EntryNotifierSender>,
pub bank_notification_sender: Option<BankNotificationSenderConfig>,
pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender,
pub retransmit_slots_sender: Sender<u64>,
pub replay_vote_sender: ReplayVoteSender,
pub cluster_slots_update_sender: Sender<Vec<u64>>,
pub cost_update_sender: Sender<CostUpdate>,
pub voting_sender: Sender<VoteOp>,
pub drop_bank_sender: Sender<Vec<BankWithScheduler>>,
pub block_metadata_notifier: Option<BlockMetadataNotifierArc>,
pub dumped_slots_sender: Sender<Vec<(u64, Hash)>>,
}

pub struct ReplayReceivers {
pub ledger_signal_receiver: Receiver<bool>,
pub duplicate_slots_receiver: Receiver<u64>,
pub ancestor_duplicate_slots_receiver: Receiver<AncestorDuplicateSlotToRepair>,
pub duplicate_confirmed_slots_receiver: Receiver<Vec<(u64, Hash)>>,
pub gossip_verified_vote_hash_receiver: Receiver<(Pubkey, u64, Hash)>,
pub popular_pruned_forks_receiver: Receiver<Vec<u64>>,
}

/// Timing information for the ReplayStage main processing loop
Expand Down Expand Up @@ -506,55 +536,62 @@ pub struct ReplayStage {
}

impl ReplayStage {
#[allow(clippy::too_many_arguments)]
pub fn new(
config: ReplayStageConfig,
blockstore: Arc<Blockstore>,
bank_forks: Arc<RwLock<BankForks>>,
cluster_info: Arc<ClusterInfo>,
ledger_signal_receiver: Receiver<bool>,
duplicate_slots_receiver: DuplicateSlotReceiver,
poh_recorder: Arc<RwLock<PohRecorder>>,
mut tower: Tower,
vote_tracker: Arc<VoteTracker>,
cluster_slots: Arc<ClusterSlots>,
retransmit_slots_sender: Sender<Slot>,
ancestor_duplicate_slots_receiver: AncestorDuplicateSlotsReceiver,
replay_vote_sender: ReplayVoteSender,
duplicate_confirmed_slots_receiver: DuplicateConfirmedSlotsReceiver,
gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
cluster_slots_update_sender: ClusterSlotsUpdateSender,
cost_update_sender: Sender<CostUpdate>,
voting_sender: Sender<VoteOp>,
drop_bank_sender: Sender<Vec<BankWithScheduler>>,
block_metadata_notifier: Option<BlockMetadataNotifierArc>,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
dumped_slots_sender: DumpedSlotsSender,
banking_tracer: Arc<BankingTracer>,
popular_pruned_forks_receiver: PopularPrunedForksReceiver,
senders: ReplaySenders,
receivers: ReplayReceivers,
) -> Result<Self, String> {
let ReplayStageConfig {
vote_account,
authorized_voter_keypairs,
exit,
leader_schedule_cache,
block_commitment_cache,
wait_for_vote_to_start_leader,
tower_storage,
wait_to_vote_slot,
replay_forks_threads,
replay_transactions_threads,
blockstore,
bank_forks,
cluster_info,
poh_recorder,
mut tower,
vote_tracker,
cluster_slots,
log_messages_bytes_limit,
prioritization_fee_cache,
banking_tracer,
} = config;

let ReplaySenders {
rpc_subscriptions,
slot_status_notifier,
leader_schedule_cache,
accounts_background_request_sender,
block_commitment_cache,
transaction_status_sender,
rewards_recorder_sender,
cache_block_meta_sender,
entry_notification_sender,
bank_notification_sender,
wait_for_vote_to_start_leader,
ancestor_hashes_replay_update_sender,
tower_storage,
wait_to_vote_slot,
replay_forks_threads,
replay_transactions_threads,
} = config;
retransmit_slots_sender,
replay_vote_sender,
cluster_slots_update_sender,
cost_update_sender,
voting_sender,
drop_bank_sender,
block_metadata_notifier,
dumped_slots_sender,
} = senders;

let ReplayReceivers {
ledger_signal_receiver,
duplicate_slots_receiver,
ancestor_duplicate_slots_receiver,
duplicate_confirmed_slots_receiver,
gossip_verified_vote_hash_receiver,
popular_pruned_forks_receiver,
} = receivers;

trace!("replay stage");
// Start the replay stage loop
Expand Down
80 changes: 44 additions & 36 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use {
cost_update_service::CostUpdateService,
drop_bank_service::DropBankService,
repair::repair_service::{OutstandingShredRepairs, RepairInfo},
replay_stage::{ReplayStage, ReplayStageConfig},
replay_stage::{ReplayReceivers, ReplaySenders, ReplayStage, ReplayStageConfig},
rewards_recorder_service::RewardsRecorderSender,
shred_fetch_stage::ShredFetchStage,
voting_service::VotingService,
Expand Down Expand Up @@ -269,29 +269,62 @@ impl Tvu {
exit.clone(),
);

let replay_stage_config = ReplayStageConfig {
vote_account: *vote_account,
authorized_voter_keypairs,
exit: exit.clone(),
let (cost_update_sender, cost_update_receiver) = unbounded();
let (drop_bank_sender, drop_bank_receiver) = unbounded();
let (voting_sender, voting_receiver) = unbounded();

let replay_senders = ReplaySenders {
rpc_subscriptions: rpc_subscriptions.clone(),
slot_status_notifier,
leader_schedule_cache: leader_schedule_cache.clone(),
accounts_background_request_sender,
block_commitment_cache,
transaction_status_sender,
rewards_recorder_sender,
cache_block_meta_sender,
entry_notification_sender,
bank_notification_sender,
wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
ancestor_hashes_replay_update_sender,
retransmit_slots_sender,
replay_vote_sender,
cluster_slots_update_sender,
cost_update_sender,
voting_sender,
drop_bank_sender,
block_metadata_notifier,
dumped_slots_sender,
};

let replay_receivers = ReplayReceivers {
ledger_signal_receiver,
duplicate_slots_receiver,
ancestor_duplicate_slots_receiver,
duplicate_confirmed_slots_receiver,
gossip_verified_vote_hash_receiver,
popular_pruned_forks_receiver,
};

let replay_stage_config = ReplayStageConfig {
vote_account: *vote_account,
authorized_voter_keypairs,
exit: exit.clone(),
leader_schedule_cache: leader_schedule_cache.clone(),
block_commitment_cache,
wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
tower_storage: tower_storage.clone(),
wait_to_vote_slot,
replay_forks_threads: tvu_config.replay_forks_threads,
replay_transactions_threads: tvu_config.replay_transactions_threads,
blockstore: blockstore.clone(),
bank_forks: bank_forks.clone(),
cluster_info: cluster_info.clone(),
poh_recorder: poh_recorder.clone(),
tower,
vote_tracker,
cluster_slots,
log_messages_bytes_limit,
prioritization_fee_cache: prioritization_fee_cache.clone(),
banking_tracer,
};

let (voting_sender, voting_receiver) = unbounded();
let voting_service = VotingService::new(
voting_receiver,
cluster_info.clone(),
Expand All @@ -312,42 +345,17 @@ impl Tvu {
}
});

let (cost_update_sender, cost_update_receiver) = unbounded();
let cost_update_service = CostUpdateService::new(blockstore.clone(), cost_update_receiver);

let (drop_bank_sender, drop_bank_receiver) = unbounded();

let drop_bank_service = DropBankService::new(drop_bank_receiver);

let replay_stage = if in_wen_restart {
None
} else {
Some(ReplayStage::new(
replay_stage_config,
blockstore.clone(),
bank_forks.clone(),
cluster_info.clone(),
ledger_signal_receiver,
duplicate_slots_receiver,
poh_recorder.clone(),
tower,
vote_tracker,
cluster_slots,
retransmit_slots_sender,
ancestor_duplicate_slots_receiver,
replay_vote_sender,
duplicate_confirmed_slots_receiver,
gossip_verified_vote_hash_receiver,
cluster_slots_update_sender,
cost_update_sender,
voting_sender,
drop_bank_sender,
block_metadata_notifier,
log_messages_bytes_limit,
prioritization_fee_cache.clone(),
dumped_slots_sender,
banking_tracer,
popular_pruned_forks_receiver,
replay_senders,
replay_receivers,
)?)
};

Expand Down

0 comments on commit c80a411

Please sign in to comment.