Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor socket communication handling #79

Merged
merged 2 commits into from
Sep 13, 2024
Merged

Refactor socket communication handling #79

merged 2 commits into from
Sep 13, 2024

Conversation

meanmail
Copy link

YouTrack Issues:
#HSPC-

Check list

Description

Introduce a `contextmanager` to streamline socket attachment and closing. Extract socket processing into a new helper function to improve readability and maintainability of the `docker_communicate` function.
Changed the ruff linter command from 'ruff' to 'ruff check' in the GitHub Actions auto-format workflow. This aligns the command usage with best practices and ensures consistency in the linting process.
@meanmail meanmail merged commit 5a8b74e into master Sep 13, 2024
2 checks passed
@meanmail meanmail deleted the refactoring branch September 13, 2024 11:20
@meanmail
Copy link
Author

@CodiumAI-Agent /review

@CodiumAI-Agent
Copy link

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Possible Issue

The process_sock function raises an EOFError when receiving EOF from the container, but this might not always be an error condition. Ensure that this behavior aligns with the intended use case and doesn't cause unintended disruptions.

def process_sock(
    sock: socket.SocketIO, stdin: bytes | None, log: structlog.BoundLogger
) -> tuple[bytes, int]:
    """Process the socket IO.

    Read data from the socket if it is ready for reading.
    Write data to the socket if it is ready for writing.
    :returns: A tuple containing the received data and the number of bytes written.
    """
    ready_to_read, ready_to_write, _ = select.select([sock], [sock], [], 1)
    received_data: bytes = b""
    bytes_written = 0
    if ready_to_read:
        data = _socket_read(sock)
        if data is None:
            msg = "Received EOF from the container"
            raise EOFError(msg)
        received_data = data

    if ready_to_write and stdin:
        bytes_written = _socket_write(sock, stdin)
        if bytes_written >= len(stdin):
            log.debug(
                "All input data has been sent. "
                "Shut down the write half of the socket.",
            )
            sock._sock.shutdown(socket.SHUT_WR)  # type: ignore[attr-defined]

    if not ready_to_read and (not ready_to_write or not stdin):
        # Save CPU time by sleeping when there is no IO activity.
        time.sleep(0.05)

    return received_data, bytes_written
Resource Management

The attach_socket context manager ensures socket closure, but verify that all edge cases (e.g., exceptions during socket operations) are handled to prevent resource leaks.

@contextmanager
def attach_socket(
    docker_client: DockerClient,
    container: Container,
    params: dict[str, Any],
) -> Iterator[socket.SocketIO]:
    sock = docker_client.api.attach_socket(container.id, params=params)

    yield sock

    sock.close()
Timeout Handling

In docker_communicate, the timeout handling logic might raise a TimeoutError prematurely if the container is still processing. Ensure this behavior is consistent with expected container lifecycle management.

        "stdin": 1,
        "stdout": 1,
        "stderr": 1,
        "stream": 1,
        "logs": 0,
    }

    with attach_socket(docker_client, container, params) as sock:
        sock._sock.setblocking(False)  # type: ignore[attr-defined]  # Make socket non-blocking
        log.info(
            "Attached to the container",
            params=params,
            fd=sock.fileno(),
            timeout=timeout,
        )
        if not stdin:
            log.debug("There is no input data. Shut down the write half of the socket.")
            sock._sock.shutdown(socket.SHUT_WR)  # type: ignore[attr-defined]
        if start_container:
            container.start()
            log.info("Container started")

        stream_data = b""
        start_time = time.monotonic()
        while timeout is None or time.monotonic() - start_time < timeout:
            try:
                received_data, bytes_written = process_sock(sock, stdin, log)
            except ConnectionResetError:
                log.warning(
                    "Connection reset caught on reading the container "
                    "output stream. Break communication",
                )
                break
            except BrokenPipeError:
                # Broken pipe may happen when a container terminates quickly
                # (e.g. OOM Killer) and docker manages to close the socket
                # almost immediately before we're trying to write to stdin.
                log.warning(
                    "Broken pipe caught on writing to stdin. Break communication",
                )
                break
            except EOFError:
                log.debug("Container output reached EOF. Closing the socket")
                break

            if received_data:
                stream_data += received_data
            if stdin and bytes_written > 0:
                stdin = stdin[bytes_written:]
        else:
            msg = "Container didn't terminate after timeout seconds"
            raise TimeoutError(msg)

    return demultiplex_docker_stream(stream_data)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants