Skip to content

Commit

Permalink
flowctl: fix flowctl raw connection (broken transport)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Jul 27, 2023
1 parent c4635db commit a72570d
Showing 1 changed file with 24 additions and 17 deletions.
41 changes: 24 additions & 17 deletions crates/flowctl/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,16 @@ pub fn docker_spawn(image: &str, args: &[&str]) -> anyhow::Result<(Child, TempDi
let child = Command::new("docker")
.args([&[
"run",
"--interactive",
"--init",
"--rm",
"--log-driver=none",
"--mount",
&format!("type=bind,source={host_inspect_str},target={target_inspect}"),
"--mount",
&format!("type=bind,source={host_connector_init_str},target={target_connector_init}"),
"--mount", &format!("type=bind,source={host_connector_init_str},target={target_connector_init}"),
"--entrypoint", target_connector_init,
"--env", "LOG_LEVEL=trace",
"--publish", &format!("0.0.0.0:{}:{}/tcp", port, CONNECTOR_INIT_PORT),
"--entrypoint",
target_connector_init,
"--mount", &format!("type=bind,source={host_inspect_str},target={target_inspect}"),
image,
&format!("--image-inspect-json-path={target_inspect}"),
&format!("--port={CONNECTOR_INIT_PORT}"),
], args].concat())
.stderr(Stdio::inherit())
.spawn()
.context("spawning docker run child")?;

Expand All @@ -65,25 +59,38 @@ pub fn docker_spawn(image: &str, args: &[&str]) -> anyhow::Result<(Child, TempDi
async fn connector_client(port: u16) -> anyhow::Result<ConnectorClient<tonic::transport::Channel>> {
loop {
match ConnectorClient::connect(format!("tcp://127.0.0.1:{port}")).await {
Ok(client) => return Ok(client),
Ok(client) => {
return Ok(client)
},
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(3000));
std::thread::sleep(std::time::Duration::from_millis(1000));
continue;
}
};
}
}

pub async fn docker_run(image: &str, req: Request) -> anyhow::Result<Response> {
pub async fn docker_run(image: &str, request: Request) -> anyhow::Result<Response> {
let (_child, _dir, port) = docker_spawn(image, &[])?;
std::thread::sleep(std::time::Duration::from_millis(1000));

let mut client = connector_client(port).await?;
loop {
let req = request.clone();
let mut client = connector_client(port).await?;

let mut response_stream = client.capture(stream::once(async { req })).await?;
let mut response_stream = match client.capture(stream::once(async { req })).await {
Ok(rs) => rs,
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(1000));
continue;
}
};

let response = response_stream.get_mut().message().await?;

let response = response_stream.get_mut().message().await?;
return response.ok_or(anyhow!("no response message"))
}

return response.ok_or(anyhow!("no response message"))
}

pub async fn docker_run_stream(image: &str, stream: Pin<Box<dyn Stream<Item = Request> + Send + Sync>>) -> anyhow::Result<Pin<Box<dyn TryStream<Item = anyhow::Result<Response>, Ok = Response, Error = anyhow::Error>>>> {
Expand Down

0 comments on commit a72570d

Please sign in to comment.