From 489bdc9ba2ebab1e85cb15f614fe54516184c3bc Mon Sep 17 00:00:00 2001 From: Anatolii Popov Date: Wed, 13 Nov 2024 17:48:56 +0200 Subject: [PATCH] Flaky integration tests fix --- .github/workflows/main_push_workflow.yml | 2 +- .../kafka/connect/s3/source/IntegrationBase.java | 14 +++++++++++--- .../kafka/connect/s3/source/IntegrationTest.java | 5 +++-- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/.github/workflows/main_push_workflow.yml b/.github/workflows/main_push_workflow.yml index 8244ec08..39353484 100644 --- a/.github/workflows/main_push_workflow.yml +++ b/.github/workflows/main_push_workflow.yml @@ -34,4 +34,4 @@ jobs: run: ./gradlew build test - name: Build in Linux if: runner.os == 'Linux' - run: ./gradlew build check test integrationTest + run: ./gradlew build check test integrationTest -i diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java index c7200cad..fad594b8 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java @@ -26,6 +26,7 @@ import java.nio.file.Files; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -118,9 +119,16 @@ static LocalStackContainer createS3Container() { .withServices(LocalStackContainer.Service.S3); } - static int getRandomPort() throws IOException { - try (ServerSocket socket = new ServerSocket(0)) { - return socket.getLocalPort(); + /** + * Finds 2 simultaneously free port for Kafka listeners + * + * @return list of 2 ports + * @throws IOException + * when port allocation failure happens + */ + static List getKafkaListenerPorts() throws IOException { + try (ServerSocket socket = new ServerSocket(0); ServerSocket socket2 = new ServerSocket(0)) { + return Arrays.asList(socket.getLocalPort(), socket2.getLocalPort()); } catch (IOException e) { throw new IOException("Failed to allocate port for test", e); } diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index 8ae632c2..3c310d2b 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -123,8 +123,9 @@ void setUp(final TestInfo testInfo) throws Exception { testBucketAccessor.createBucket(); connectRunner = new ConnectRunner(OFFSET_FLUSH_INTERVAL_MS); - final int localListenerPort = IntegrationBase.getRandomPort(); - final int containerListenerPort = IntegrationBase.getRandomPort(); + final List ports = IntegrationBase.getKafkaListenerPorts(); + final int localListenerPort = ports.get(0); + final int containerListenerPort = ports.get(1); connectRunner.startConnectCluster(CONNECTOR_NAME, localListenerPort, containerListenerPort); adminClient = newAdminClient(connectRunner.getBootstrapServers());