Skip to content

Commit

Permalink
Update runtime::container::start() to take a new allow_local flag
Browse files Browse the repository at this point in the history
This should fix an issue encountered by people running a local Flow stack under WSL. In that setup, Docker containers run in a separate VM from the one hosting the WSL Linux environment, meaning that the container IP address exposed via `docker inspect` is not accessible from the "host". The solution here is to ask Docker to bind a port on the host to the corresponding port inside the container.

Previously, in order to avoid binding every task to a host port in production (which runs Linux), we ended up turning off host-port mapping on Linux entirely, because we didn't have a better way to distinguish local from production environments.

We have since introduced an `allow_local` flag, and can condition host-port mapping on that, enabling Flow to run inside WSL.
  • Loading branch information
jshearer committed Jan 31, 2024
1 parent 8e7e460 commit 7dde484
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 43 deletions.
1 change: 1 addition & 0 deletions crates/runtime/src/capture/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub async fn start<L: LogHandler>(
start_rpc,
&runtime.task_name,
ops::TaskType::Capture,
runtime.allow_local,
)
.await?
.boxed()
Expand Down
99 changes: 56 additions & 43 deletions crates/runtime/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub async fn start(
network: &str,
task_name: &str,
task_type: ops::TaskType,
allow_local: bool,
) -> anyhow::Result<(runtime::Container, tonic::transport::Channel, Guard)> {
// Many operational contexts only allow for docker volume mounts
// from certain locations:
Expand Down Expand Up @@ -90,54 +91,64 @@ pub async fn start(
// Generate a unique name for this container instance.
let name = unique_container_name();

let mut process: async_process::Child = async_process::Command::new("docker")
.args([
"run",
// Remove the docker container upon its exit.
"--rm",
// Addressable name of this connector.
&format!("--name={name}"),
// Network to which the container should attach.
&format!("--network={}", network),
// The entrypoint into a connector is always flow-connector-init,
// which will delegate to the actual entrypoint of the connector.
"--entrypoint=/flow-connector-init",
// Mount the flow-connector-init binary and `docker inspect` output.
&format!(
"--mount=type=bind,source={},target=/flow-connector-init",
tmp_connector_init.to_string_lossy()
),
&format!(
"--mount=type=bind,source={},target=/image-inspect.json",
tmp_docker_inspect.to_string_lossy(),
),
// Thread-through the logging configuration of the connector.
"--env=LOG_FORMAT=json",
&format!("--env=LOG_LEVEL={}", log_level.as_str_name()),
// Cgroup memory / CPU resource limits.
// TODO(johnny): we intend to tighten these down further, over time.
"--memory=1g",
"--cpus=2",
// For now, we support only Linux amd64 connectors.
"--platform=linux/amd64",
// Attach labels that let us group connector resource usage under a few dimensions.
&format!("--label=image={}", image),
&format!("--label=task-name={}", task_name),
&format!("--label=task-type={}", task_type.as_str_name()),
let mut docker_args = vec![
"run".to_string(),
// Remove the docker container upon its exit.
"--rm".to_string(),
// Addressable name of this connector.
format!("--name={name}"),
// Network to which the container should attach.
format!("--network={}", network),
// The entrypoint into a connector is always flow-connector-init,
// which will delegate to the actual entrypoint of the connector.
"--entrypoint=/flow-connector-init".to_string(),
// Mount the flow-connector-init binary and `docker inspect` output.
format!(
"--mount=type=bind,source={},target=/flow-connector-init",
tmp_connector_init.to_string_lossy()
),
format!(
"--mount=type=bind,source={},target=/image-inspect.json",
tmp_docker_inspect.to_string_lossy(),
),
// Thread-through the logging configuration of the connector.
"--env=LOG_FORMAT=json".to_string(),
format!("--env=LOG_LEVEL={}", log_level.as_str_name()),
// Cgroup memory / CPU resource limits.
// TODO(johnny): we intend to tighten these down further, over time.
"--memory=1g".to_string(),
"--cpus=2".to_string(),
// For now, we support only Linux amd64 connectors.
"--platform=linux/amd64".to_string(),
// Attach labels that let us group connector resource usage under a few dimensions.
format!("--label=image={}", image),
format!("--label=task-name={}", task_name),
format!("--label=task-type={}", task_type.as_str_name()),
];

if allow_local {
docker_args.append(&mut vec![
// Support Docker Desktop in non-production contexts (for example, `flowctl`)
// where the container IP is not directly addressable. As an alternative,
// we ask Docker to provide mapped host ports that are then advertised
// in the attached runtime::Container description.
#[cfg(not(target_os = "linux"))]
&format!("--publish=0.0.0.0:0:{CONNECTOR_INIT_PORT}"),
#[cfg(not(target_os = "linux"))]
"--publish-all",
// Image to run.
&image,
// The following are arguments of flow-connector-init, not docker.
"--image-inspect-json-path=/image-inspect.json",
&format!("--port={CONNECTOR_INIT_PORT}"),
format!("--publish=0.0.0.0:0:{CONNECTOR_INIT_PORT}"),
"--publish-all".to_string(),
])
}

docker_args.append(&mut vec![
// Image to run.
image.to_string(),
// The following are arguments of flow-connector-init, not docker.
"--image-inspect-json-path=/image-inspect.json".to_string(),
format!("--port={CONNECTOR_INIT_PORT}"),
]);

tracing::debug!(docker_args=?docker_args, "invoking docker");

let mut process: async_process::Child = async_process::Command::new("docker")
.args(docker_args)
.stdin(async_process::Stdio::null())
.stdout(async_process::Stdio::null())
.stderr(async_process::Stdio::piped())
Expand Down Expand Up @@ -491,6 +502,7 @@ mod test {
"",
"a-task-name",
proto_flow::ops::TaskType::Capture,
true,
)
.await
.unwrap();
Expand Down Expand Up @@ -545,6 +557,7 @@ mod test {
"",
"a-task-name",
proto_flow::ops::TaskType::Capture,
true,
)
.await
else {
Expand Down
1 change: 1 addition & 0 deletions crates/runtime/src/derive/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub async fn start<L: LogHandler>(
start_rpc,
&runtime.task_name,
ops::TaskType::Derivation,
runtime.allow_local,
)
.await?
.boxed()
Expand Down
2 changes: 2 additions & 0 deletions crates/runtime/src/image_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub async fn serve<Request, Response, StartRpc, Attach>(
start_rpc: StartRpc, // Begins RPC over a started container channel.
task_name: &str, // Name of this task, used to label container.
task_type: ops::TaskType, // Type of this task, for labeling container.
allow_local: bool, // Whether we're running in local dev or not.
) -> anyhow::Result<impl Stream<Item = anyhow::Result<Response>> + Send>
where
Request: serde::Serialize + Send + 'static,
Expand All @@ -36,6 +37,7 @@ where
&network,
&task_name,
task_type,
allow_local,
)
.await?;

Expand Down
1 change: 1 addition & 0 deletions crates/runtime/src/materialize/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub async fn start<L: LogHandler>(
start_rpc,
&runtime.task_name,
ops::TaskType::Materialization,
runtime.allow_local,
)
.await?
.boxed()
Expand Down

0 comments on commit 7dde484

Please sign in to comment.