Skip to content

Commit

Permalink
fwd working minus server pt interface
Browse files Browse the repository at this point in the history
  • Loading branch information
jmwample committed Mar 23, 2024
1 parent 634369f commit 39edc22
Showing 1 changed file with 72 additions and 27 deletions.
99 changes: 72 additions & 27 deletions crates/obfs4/src/bin/fwd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ use fast_socks5::{
AuthenticationMethod,
};
use safelog::sensitive;
use tokio::task::JoinSet;
use tokio::{
io::{copy_bidirectional, AsyncRead, AsyncWrite, AsyncWriteExt},
net::{TcpListener, TcpStream},
signal::unix::SignalKind,
sync::oneshot,
};
use tokio::{net::ToSocketAddrs, task::JoinSet};
use tokio_util::sync::CancellationToken;
// use tokio_stream::StreamExt;
// use tor_chanmgr::transport::proxied::{settings_to_protocol, Protocol};
Expand All @@ -37,19 +37,22 @@ use tokio_util::sync::CancellationToken;
// };
// use tor_rtcompat::PreferredRuntime;
// use tor_socksproto::{SocksAuth, SocksVersion};
use tracing::{error, info, debug, warn, Level};
use tracing::{debug, error, info, warn, Level};
use tracing_subscriber::{filter::LevelFilter, prelude::*};

use std::{
env,
net::{Ipv4Addr, SocketAddr},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
str::FromStr,
sync::Arc,
};

/// Client Socks address to listen on.
const CLIENT_SOCKS_ADDR: &str = "127.0.0.1:0";

const U4: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
const U6: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 9000);

/// Error defined to denote a failure to get the bridge line
#[derive(Debug, thiserror::Error)]
#[error("Error while obtaining bridge line data")]
Expand All @@ -66,11 +69,11 @@ struct Args {
/// server
dst: String,

/// Listen address, defaults to ":9000" for client, ":9001" for server
/// Listen address, defaults to "[::]:9000" for client, "[::]:9001" for server
laddr: Option<String>,

/// Log Level (ERROR/WARN/INFO/DEBUG/TRACE)
#[arg(short, long, default_value_t=String::from("ERROR"))]
#[arg(short, long, default_value_t=String::from("INFO"))]
log_level: String,

/// Disable the address scrubber on logging
Expand All @@ -90,7 +93,6 @@ enum Mode {
/// tracing / tracing_subscriber libraries
// TODO: GeoIP. Json for file log writer.
fn init_logging_recvr(unsafe_logging: bool, level_str: &str) -> Result<safelog::Guard> {

let log_lvl = LevelFilter::from_str(level_str)?;

let console_layer = tracing_subscriber::fmt::layer()
Expand All @@ -100,10 +102,10 @@ fn init_logging_recvr(unsafe_logging: bool, level_str: &str) -> Result<safelog::
tracing_subscriber::registry()
.with(console_layer.boxed())
.init();
warn!("log level set to {level_str}");
info!("log level set to {level_str}");

if unsafe_logging {
info!("⚠️ ⚠️ enabling unsafe logging ⚠️ ⚠️ ");
info!("⚠️ ⚠️ unsafe logging enabled ⚠️ ⚠️ ");
safelog::disable_safe_logging().context("failed to get safelog Guard")
} else {
safelog::enforce_safe_logging().context("failed to get safelog Guard")
Expand All @@ -115,17 +117,26 @@ fn init_logging_recvr(unsafe_logging: bool, level_str: &str) -> Result<safelog::
async fn main() -> Result<()> {
let args = Args::parse();

println!("unsafe_logging: {}",args.unsafe_logging);
println!("unsafe_logging: {}", args.unsafe_logging);
// launch tracing subscriber with filter level
let _guard = init_logging_recvr(args.unsafe_logging, &args.log_level)?;

let dst_addr = SocketAddr::from_str(&args.dst)?;
let (dst_addr, echo) = match args.dst.as_str() {
"echo" => {
if args.mode == Mode::Client {
error!("echo destination is invalid while running as client");
return Ok(());
}
(U4, true)
}
d => (SocketAddr::from_str(d)?, false),
};

let listen_addr = match args.laddr {
Some(a) => SocketAddr::from_str(&a)?,
Some(a) => a,
None => match args.mode {
Mode::Client => SocketAddr::from((Ipv4Addr::UNSPECIFIED, 9000)),
Mode::Server => SocketAddr::from((Ipv4Addr::UNSPECIFIED, 9001)),
Mode::Client => String::from("[::]:9000"),
Mode::Server => String::from("[::]:9001"),
},
};

Expand All @@ -139,7 +150,7 @@ async fn main() -> Result<()> {
}
Mode::Server => {
// running as SERVER
server_setup(listen_addr, dst_addr, cancel_token.clone()).await?
server_setup(listen_addr, dst_addr, echo, cancel_token.clone()).await?
}
};

Expand Down Expand Up @@ -184,8 +195,8 @@ async fn main() -> Result<()> {
// Client //
// ================================================================ //

async fn client_setup(
listen_addr: SocketAddr,
async fn client_setup<A: ToSocketAddrs>(
listen_addrs: A,
remote_addr: SocketAddr,
cancel_token: CancellationToken,
) -> Result<oneshot::Receiver<bool>> {
Expand All @@ -195,7 +206,7 @@ async fn client_setup(
let mut listeners = Vec::new();

let builder = Obfs4PT::client_builder();
let listener = tokio::net::TcpListener::bind(CLIENT_SOCKS_ADDR).await?;
let listener = tokio::net::TcpListener::bind(listen_addrs).await?;

listeners.push(client_accept_loop(
listener,
Expand Down Expand Up @@ -415,9 +426,10 @@ where
// Server //
// ================================================================ //

async fn server_setup(
listen_addr: SocketAddr,
async fn server_setup<A: ToSocketAddrs>(
listen_addrs: A,
remote_addr: SocketAddr,
echo: bool,
cancel_token: CancellationToken,
) -> Result<oneshot::Receiver<bool>> {
let obfs4_name = Obfs4PT::name();
Expand All @@ -439,11 +451,12 @@ async fn server_setup(
// .options(bind_addr.options)?
// .build();

let listener = tokio::net::TcpListener::bind(listen_addr).await?;
let listener = tokio::net::TcpListener::bind(listen_addrs).await?;
listeners.push(server_listen_loop::<TcpStream, _>(
listener,
server,
remote_addr,
echo,
cancel_token.clone(),
));

Expand Down Expand Up @@ -478,6 +491,7 @@ async fn server_listen_loop<In, S>(
listener: TcpListener,
server: S,
remote_addr: SocketAddr,
echo: bool,
cancel_token: CancellationToken,
) -> Result<()>
where
Expand Down Expand Up @@ -508,12 +522,20 @@ where
Ok(c) => c,
};
debug!("accepted new connection -> {}:{}", sensitive(client_addr.ip()), client_addr.port());
tokio::spawn(server_handle_connection(
conn,
server.clone(),
remote_addr,
client_addr,
));
if echo {
tokio::spawn(server_echo_connection(
conn,
server.clone(),
client_addr,
));
} else {
tokio::spawn(server_handle_connection(
conn,
server.clone(),
remote_addr,
client_addr,
));
}
}
}
}
Expand All @@ -536,7 +558,6 @@ where
{
// let mut conn_pt = server.reveal(conn).await.context("server handshake failed {client_addr}")?;

// let mut conn_or = server.connect_to_or().await?;
let mut remote_conn = TcpStream::connect(remote_addr).await?;

if let Err(e) = copy_bidirectional(&mut conn, &mut remote_conn).await {
Expand All @@ -548,3 +569,27 @@ where

Ok(())
}
async fn server_echo_connection<In, S>(
mut conn: In,
server: Arc<S>,
client_addr: SocketAddr,
) -> Result<()>
where
// the provided In must be usable as a connection in an async context
In: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
// The provided S must be usable as a Pluggable Transport Server.
S: ptrs::ServerTransport<In> + Send + Sync + ptrs::ServerTransport<TcpStream>,
<S as ptrs::ServerTransport<In>>::OutErr: 'static,
{
// let mut conn_pt = server.reveal(conn).await.context("server handshake failed {client_addr}")?;

let (mut r, mut w) = tokio::io::split(conn);
if let Err(e) = tokio::io::copy(&mut r, &mut w).await {
warn!(
address = sensitive(client_addr).to_string(),
"tunnel closed with error {e:#?}"
)
}

Ok(())
}

0 comments on commit 39edc22

Please sign in to comment.