Skip to content

Commit

Permalink
kafka protocol proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
shortishly committed Aug 23, 2024
1 parent f42fc32 commit 4e44b00
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 2 deletions.
11 changes: 9 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ test-topic-describe:
kafka-topics --bootstrap-server 127.0.0.1:9092 --describe --topic test

test-topic-create:
kafka-topics --bootstrap-server 127.0.0.1:9092 --config cleanup.policy=compact --partitions=1 --replication-factor=1 --create --topic test
kafka-topics --bootstrap-server 127.0.0.1:9092 --config cleanup.policy=compact --partitions=3 --replication-factor=1 --create --topic test

test-topic-produce:
echo "h1:pqr,h2:jkl,h3:uio qwerty poiuy" | kafka-console-producer --bootstrap-server localhost:9092 --topic test --property parse.headers=true --property parse.key=true
echo "h1:pqr,h2:jkl,h3:uio qwerty poiuy\nh1:def,h2:lmn,h3:xyz asdfgh lkj\nh1:stu,h2:fgh,h3:ijk zxcvbn mnbvc" | kafka-console-producer --bootstrap-server localhost:9092 --topic test --property parse.headers=true --property parse.key=true

test-topic-consume:
kafka-console-consumer --consumer.config /usr/local/etc/kafka/consumer.properties --bootstrap-server localhost:9092 --topic test --from-beginning --property print.timestamp=true --property print.key=true --property print.offset=true --property print.partition=true --property print.headers=true --property print.value=true
Expand All @@ -35,4 +35,11 @@ tansu-1:
--raft-peer-url tcp://localhost:4569/ \
--work-dir work-dir/tansu-1

kafka-proxy:
docker run -d -p 19092:9092 apache/kafka:3.8.0

proxy:
./target/debug/tansu-proxy


all: test miri
15 changes: 15 additions & 0 deletions tansu-proxy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "tansu-proxy"
edition.workspace = true
version.workspace = true

[dependencies]
bytes.workspace = true
clap.workspace = true
tansu-kafka-model = { path = "../tansu-kafka-model" }
tansu-kafka-sans-io = { path = "../tansu-kafka-sans-io" }
thiserror.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
url.workspace = true
155 changes: 155 additions & 0 deletions tansu-proxy/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright ⓒ 2024 Peter Morgan <[email protected]>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::{
fmt,
io::{self, ErrorKind},
result,
sync::Arc,
};
use tansu_kafka_sans_io::{Frame, Header};
use thiserror::Error;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
};
use tracing::{debug, error, info};
use url::Url;

pub type Result<T, E = Error> = result::Result<T, E>;

#[derive(Error, Debug)]
pub enum Error {
Io(Arc<io::Error>),
Protocol(#[from] tansu_kafka_sans_io::Error),
}

impl From<io::Error> for Error {
fn from(value: io::Error) -> Self {
Self::Io(Arc::new(value))
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}

#[derive(Clone, Debug)]
pub struct Proxy {
listener: Url,
origin: Url,
}

impl Proxy {
pub fn new(listener: Url, origin: Url) -> Self {
Self { listener, origin }
}

pub async fn listen(&self) -> Result<()> {
debug!("listener: {}", self.listener.as_str());

let listener = TcpListener::bind(format!(
"{}:{}",
self.listener.host_str().unwrap(),
self.listener.port().unwrap()
))
.await?;

loop {
let (stream, addr) = listener.accept().await?;
info!(?addr);

let mut connection = Connection::open(&self.origin, stream).await?;

_ = tokio::spawn(async move {
match connection.stream_handler().await {
Err(ref error @ Error::Io(ref io)) if io.kind() == ErrorKind::UnexpectedEof => {
info!(?error);
}

Err(error) => {
error!(?error);
}

Ok(_) => {}
}
});
}
}
}

struct Connection {
proxy: TcpStream,
origin: TcpStream,
}

impl Connection {
async fn open(origin: &Url, proxy: TcpStream) -> Result<Self> {
TcpStream::connect(format!(
"{}:{}",
origin.host_str().unwrap(),
origin.port().unwrap()
))
.await
.map(|origin| Self { proxy, origin })
.map_err(Into::into)
}

async fn stream_handler(&mut self) -> Result<()> {
let mut size = [0u8; 4];

loop {
_ = self.proxy.read_exact(&mut size).await?;

let mut buffer: Vec<u8> = vec![0u8; i32::from_be_bytes(size) as usize + size.len()];
buffer[0..4].copy_from_slice(&size[..]);
_ = self.proxy.read_exact(&mut buffer[4..]).await?;

let request = Frame::request_from_bytes(&buffer)?;
debug!(?request);

match request {
Frame {
header:
Header::Request {
api_key,
api_version,
..
},
..
} => {
self.origin.write_all(&buffer).await?;

_ = self.origin.read_exact(&mut size).await?;

let mut buffer: Vec<u8> =
vec![0u8; i32::from_be_bytes(size) as usize + size.len()];
buffer[0..4].copy_from_slice(&size[..]);
_ = self.origin.read_exact(&mut buffer[4..]).await?;

let response = Frame::response_from_bytes(&buffer, api_key, api_version)?;

debug!(?response);

self.proxy.write_all(&buffer).await?;
}

_ => unreachable!(),
};
}
}
}
66 changes: 66 additions & 0 deletions tansu-proxy/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright ⓒ 2024 Peter Morgan <[email protected]>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use clap::Parser;
use tansu_proxy::Result;
use tokio::task::JoinSet;
use tracing::{debug, Level};
use tracing_subscriber::{filter::Targets, fmt::format::FmtSpan, prelude::*};
use url::Url;

#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Cli {
#[arg(long, default_value = "tcp://localhost:9092")]
listener_url: Url,

#[arg(long, default_value = "tcp://localhost:19092")]
origin_url: Url,
}

#[tokio::main]
async fn main() -> Result<()> {
let filter = Targets::new().with_target("tansu_proxy", Level::DEBUG);

tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer()
.pretty()
.with_line_number(true)
.with_span_events(FmtSpan::ACTIVE)
.with_thread_ids(true),
)
.with(filter)
.init();

let args = Cli::parse();

let mut set = JoinSet::new();

{
let proxy = tansu_proxy::Proxy::new(args.listener_url, args.origin_url);
debug!(?proxy);

set.spawn(async move { proxy.listen().await.unwrap() });
}

loop {
if set.join_next().await.is_none() {
break;
}
}

Ok(())
}

0 comments on commit 4e44b00

Please sign in to comment.