Skip to content

Commit

Permalink
validate Worker hash
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyandrews committed Aug 17, 2022
1 parent 263ff23 commit 13984fd
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 34 deletions.
76 changes: 59 additions & 17 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ impl ControllerCommand {
},
ControllerCommand::WorkerConnect => ControllerCommandDetails {
help: None,
regex: r"^WORKER-CONNECT$",
regex: r"^(WORKER-CONNECT) (\d+)$",
process_response: Box::new(|response| {
if let ControllerResponseMessage::Bool(true) = response {
Ok("worker connected".to_string())
Expand Down Expand Up @@ -1193,32 +1193,58 @@ impl GooseAttack {
// Verify running in Manager mode.
if self.configuration.manager {
// Verify expecting more Workers to connect.
// @TODO: Validate connection before sending to the Manager.
if goose_attack_run_state.gaggle_workers
< self.configuration.expect_workers.unwrap_or(0)
{
if let Some(manager) = goose_attack_run_state.manager.as_ref() {
goose_attack_run_state.gaggle_workers += 1;
info!(
"Worker {} of {} connected.",
goose_attack_run_state.gaggle_workers,
self.configuration.expect_workers.unwrap_or(0)
);
if let Some(ControllerValue::Socket(socket)) =
if let Some(ControllerValue::Socket(worker_connection)) =
message.request.value
{
// Pass the Telnet socket to the Manager thread.
let _ = manager.tx.send(ManagerMessage {
command: ManagerCommand::WorkerJoinRequest,
value: Some(socket),
});
// Use expect() as Controller uses regex to validate this is an integer.
let worker_hash =
u64::from_str(&worker_connection.hash)
.expect("failed to convert string to usize");
if worker_hash != self.metrics.hash
&& !self.configuration.no_hash_check
{
/*
self.reply_to_controller(
message,
ControllerResponseMessage::Bool(false),
);
*/
warn!("WorkerConnect request ignored, Worker hash {} does not match Manager hash {}, enable --no-hash-check to ignore.", worker_hash, self.metrics.hash)
} else {
if worker_hash != self.metrics.hash {
warn!("Ignoring that Worker hash {} does not match Manager hash {} because --no-hash-check is enabled.", worker_hash, self.metrics.hash)
} else {
warn!("Valid hash: {}", worker_hash);
}
goose_attack_run_state.gaggle_workers += 1;
info!(
"Worker {} of {} connected.",
goose_attack_run_state.gaggle_workers,
self.configuration.expect_workers.unwrap_or(0)
);
// Pass the Telnet socket to the Manager thread.
let _ = manager.tx.send(ManagerMessage {
command: ManagerCommand::WorkerJoinRequest,
value: Some(worker_connection.socket),
});
}
} else {
panic!("WorkerConnect falure, failed to move telnet socket.");
warn!("Whoops !?");
//panic!("Whoops!?");
}
} else {
panic!("WorkerConnect failure, failed to reference manager_tx.")
}
} else {
// @TODO: Can we return a helpful error?
self.reply_to_controller(
message,
ControllerResponseMessage::Bool(false),
);
warn!("WorkerConnect request ignored, all expected Workers already connected.")
}
} else {
Expand Down Expand Up @@ -1416,11 +1442,17 @@ pub(crate) struct ControllerRequestMessage {
pub value: Option<ControllerValue>,
}

#[derive(Debug)]
pub(crate) struct WorkerConnection {
hash: String,
socket: tokio::net::TcpStream,
}

/// Allows multiple types to be sent to the parent process.
#[derive(Debug)]
pub(crate) enum ControllerValue {
Text(String),
Socket(tokio::net::TcpStream),
Socket(WorkerConnection),
}

/// An enumeration of all messages the parent can reply back to the controller thread.
Expand Down Expand Up @@ -1585,6 +1617,14 @@ impl ControllerState {
if let Ok(command_string) = self.get_command_string(buf).await {
// Extract the command and value in a generic way.
if let Ok(request_message) = self.get_match(command_string.trim()).await {
let hash = if let Some(ControllerValue::Text(hash)) =
request_message.value.as_ref()
{
// Clone the value.
hash.to_string()
} else {
unreachable!("Hash must exist, enforced by regex");
};
// Workers using Telnet socket to connect to the Manager.
if request_message.command == ControllerCommand::WorkerConnect {
info!("Worker instance connecting ...");
Expand All @@ -1597,7 +1637,9 @@ impl ControllerState {
client_id: self.thread_id,
request: ControllerRequestMessage {
command: ControllerCommand::WorkerConnect,
value: Some(ControllerValue::Socket(socket)),
value: Some(ControllerValue::Socket(
WorkerConnection { hash, socket },
)),
},
})
.is_err()
Expand Down
22 changes: 18 additions & 4 deletions src/gaggle/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ enum ManagerPhase {
/// Workers are connecting to the Manager, Gaggle can not be reconfigured.
WaitForWorkers,
/// All Workers are connected and the load test is ready.
_Active,
Active,
}

impl GooseConfiguration {
Expand Down Expand Up @@ -347,8 +347,12 @@ impl GooseConfiguration {
manager_run_state.idle_status_displayed = true;
}
}
ManagerPhase::WaitForWorkers => {}
ManagerPhase::_Active => {}
ManagerPhase::WaitForWorkers => {
// @TODO: Keepalive? Timeout?
}
ManagerPhase::Active => {
// @TODO: Actually start the load test.
}
}

// Process messages received from parent or Controller thread.
Expand All @@ -358,7 +362,7 @@ impl GooseConfiguration {
ManagerCommand::WaitForWorkers => {
let expect_workers = self.expect_workers.unwrap_or(0);
if expect_workers == 1 {
info!("Manager is waiting for {} Worker.", expect_workers);
info!("Manager is waiting for 1 Worker.");
} else {
info!("Manager is waiting for {} Workers.", expect_workers);
}
Expand All @@ -371,6 +375,16 @@ impl GooseConfiguration {
}
// Store Worker socket for ongoing communications.
manager_run_state.workers.push(socket);

if let Some(expect_workers) = self.expect_workers {
if manager_run_state.workers.len() == self.expect_workers.unwrap() {
info!(
"All {} Workers have connected, starting the load test.",
expect_workers
);
manager_run_state.phase = ManagerPhase::Active;
}
}
}
ManagerCommand::_Exit => {
info!("Manager is exiting.");
Expand Down
83 changes: 71 additions & 12 deletions src/gaggle/worker.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio::time::{sleep, Duration};
//use tokio::io::AsyncWriteExt;
use std::io;

use crate::config::{GooseConfigure, GooseValue};
use crate::metrics::GooseCoordinatedOmissionMitigation;
Expand Down Expand Up @@ -40,15 +40,25 @@ pub(crate) struct WorkerConnection {
pub(crate) tx: WorkerTx,
}

enum ConnectionState {
WaitForPrompt,
WaitForOk,
Connected,
}

struct WorkerRunState {
/// Whether or not a message has been displayed indicating the Worker is currently idle.
idle_status_displayed: bool,
/// Whether or Worker has successfully connected to Manager instance.
connected_to_manager: bool,
/// @TODO How many times
/// @TODO: Connection status
connection_state: Option<ConnectionState>,
/// A counter tracking how many times the Worker has attempted to connect to the Manager.
connection_attempts: u8,
/// Which phase the Worker is currently operating in.
phase: WorkerPhase,
/// Whether or not a message has been displayed indicating the Worker is ready and waiting.
waiting_status_displayed: bool,
/// This variable accounts for time spent doing things which is then subtracted from
/// the time sleeping to avoid an unintentional drift in events that are supposed to
/// happen regularly.
Expand All @@ -63,8 +73,10 @@ impl WorkerRunState {
WorkerRunState {
idle_status_displayed: false,
connected_to_manager: false,
connection_state: None,
connection_attempts: 0,
phase: WorkerPhase::Idle,
waiting_status_displayed: false,
drift_timer: tokio::time::Instant::now(),
controller_rx,
stream: None,
Expand All @@ -78,7 +90,7 @@ enum WorkerPhase {
/// Trying to connect to the Manager instance.
ConnectingToManager,
/// Connected to Manager instance, waiting for the go-ahead to start load test.
_WaitingForManager,
WaitingForManager,
/// Active load test.
_Active,
Exit,
Expand Down Expand Up @@ -317,7 +329,7 @@ impl GooseConfiguration {
}

// Spawn a Worker thread, provide a channel so it can be controlled by parent and/or Control;er thread.
pub(crate) async fn setup_worker(&mut self) -> Option<(WorkerJoinHandle, WorkerTx)> {
pub(crate) async fn setup_worker(&mut self, hash: u64) -> Option<(WorkerJoinHandle, WorkerTx)> {
// There's no setup necessary if Worker mode is not enabled.
if !self.worker {
return None;
Expand All @@ -328,7 +340,8 @@ impl GooseConfiguration {
flume::unbounded();

let configuration = self.clone();
let worker_handle = tokio::spawn(async move { configuration.worker_main(worker_rx).await });
let worker_handle =
tokio::spawn(async move { configuration.worker_main(worker_rx, hash).await });

// Return worker_tx thread for the (optional) controller thread.
Some((worker_handle, worker_tx))
Expand All @@ -338,6 +351,7 @@ impl GooseConfiguration {
pub(crate) async fn worker_main(
self: GooseConfiguration,
receiver: flume::Receiver<WorkerMessage>,
hash: u64,
) -> Result<(), GooseError> {
// Initialze the Worker run state, used for the lifetime of this Worker instance.
let mut worker_run_state = WorkerRunState::new(receiver);
Expand All @@ -348,6 +362,9 @@ impl GooseConfiguration {
loop {
debug!("top of worker loop...");

// @TODO: How to detect that the socket is dropped?
// @TODO: Add a timeout.

match worker_run_state.phase {
// Display message when entering WorkerPhase::Idle, otherwise sleep waiting for a
// message from Parent or Controller thread.
Expand All @@ -362,7 +379,10 @@ impl GooseConfiguration {
if worker_run_state.connection_attempts == 0
|| worker_run_state.connection_attempts % 5 == 0
{
info!("Worker connecting to {{}}.");
info!(
"Worker connecting to {}:{}.",
self.manager_host, self.manager_port
);
}

if worker_run_state.connection_attempts >= MAX_CONNECTION_ATTEMPTS {
Expand All @@ -383,6 +403,8 @@ impl GooseConfiguration {
{
Ok(s) => {
worker_run_state.connected_to_manager = true;
worker_run_state.connection_state =
Some(ConnectionState::WaitForPrompt);
Some(s)
}
Err(e) => {
Expand All @@ -398,16 +420,53 @@ impl GooseConfiguration {
}
};
}
if let Some(stream) = worker_run_state.stream.as_ref() {
if let Some(stream) = worker_run_state.stream.as_mut() {
if let Ok(Some(message)) = read_buffer(stream) {
if message.starts_with("goose>") {
info!("Got `goose>` prompt.");
if let Some(connection_state) =
worker_run_state.connection_state.as_ref()
{
match connection_state {
ConnectionState::WaitForPrompt => {
if message.starts_with("goose>") {
info!("Got `goose>` prompt.");
worker_run_state.connection_state =
Some(ConnectionState::WaitForOk);
stream
.write_all(
format!("WORKER-CONNECT {}\n", hash).as_bytes(),
)
.await?;
} else {
panic!("Failed to get `goose>` prompt: @TODO: handle this more gracefully.");
}
}
ConnectionState::WaitForOk => {
if message.starts_with("OK") {
info!("Got OK.");
worker_run_state.connection_state =
Some(ConnectionState::Connected);
worker_run_state.phase = WorkerPhase::WaitingForManager;
} else {
panic!("Failed to get OK: @TODO: handle this more gracefully.");
}
}
_ => {
unreachable!("We should not be here.");
}
}
}
}
};
}
WorkerPhase::_WaitingForManager => {}
WorkerPhase::_Active => {}
WorkerPhase::WaitingForManager => {
if !worker_run_state.waiting_status_displayed {
info!("Standing by, waiting for Manager to start the load test...");
worker_run_state.waiting_status_displayed = true;
}
}
WorkerPhase::_Active => {
info!("Let's get this party started!");
}
WorkerPhase::Exit => {
info!("Worker is exiting.");
break;
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,7 @@ impl GooseAttack {
};

// Launch worker thread if enabled.
let worker = match self.configuration.setup_worker().await {
let worker = match self.configuration.setup_worker(self.metrics.hash).await {
Some((h, t)) => {
self.gaggle_phase = Some(GagglePhase::WaitingForWorkers);
Some(WorkerConnection {
Expand Down

0 comments on commit 13984fd

Please sign in to comment.