Skip to content

Commit

Permalink
Integration tests clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
AnatolyPopov committed Nov 12, 2024
1 parent 3bd8a17 commit 7152f73
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.aiven.kafka.connect.s3.source;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.io.File;
Expand All @@ -24,6 +25,7 @@
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -50,7 +52,6 @@
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -72,21 +73,18 @@ default AdminClient newAdminClient(final String bootstrapServers) {
return AdminClient.create(adminClientConfig);
}

static void extractConnectorPlugin(File pluginDir) throws IOException, InterruptedException {
static void extractConnectorPlugin(Path pluginDir) throws IOException, InterruptedException {
final File distFile = new File(System.getProperty("integration-test.distribution.file.path"));
assert distFile.exists();
assertThat(distFile).exists();

final String cmd = String.format("tar -xf %s --strip-components=1 -C %s", distFile, pluginDir.toString());
final Process process = Runtime.getRuntime().exec(cmd);
assert process.waitFor() == 0;
}

static File getPluginDir() throws IOException {
final File testDir = Files.createTempDirectory(S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST).toFile();

final File pluginDir = new File(testDir, PLUGINS_S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA);
assert pluginDir.mkdirs();
return pluginDir;
static Path getPluginDir() throws IOException {
final Path testDir = Files.createTempDirectory(S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST);
return Files.createDirectories(testDir.resolve(PLUGINS_S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA));
}

static String topicName(final TestInfo testInfo) {
Expand Down Expand Up @@ -156,11 +154,10 @@ static List<GenericRecord> consumeAvroMessages(final String topic, final int exp
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group-avro");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Assuming string
// key
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); // Avro
// deserializer
// for values
// Assuming string key
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Avro deserializer for values
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("schema.registry.url", schemaRegistryUrl); // URL of the schema registry
props.put("specific.avro.reader", "false"); // Use GenericRecord instead of specific Avro classes
Expand All @@ -186,11 +183,10 @@ static List<JsonNode> consumeJsonMessages(final String topic, final int expected
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group-json");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Assuming string
// key
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName()); // Json
// deserializer
// for values
// Assuming string key
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Json deserializer for values
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

try (KafkaConsumer<String, JsonNode> consumer = new KafkaConsumer<>(props)) {
Expand Down Expand Up @@ -219,21 +215,21 @@ static Map<String, Object> consumeOffsetStorageMessages(final String topic, fina
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
final Map<String, Object> messages = new HashMap<>();
consumer.subscribe(Collections.singletonList(topic));

// Poll messages from the topic
final Map<String, Object> messages = new HashMap<>();
while (messages.size() < expectedMessageCount) {
final ConsumerRecords<byte[], byte[]> records = consumer.poll(5L);
final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(5L));
for (final ConsumerRecord<byte[], byte[]> record : records) {
messages.putAll(OBJECT_MAPPER.readValue(new String(record.value(), StandardCharsets.UTF_8), // NOPMD
new TypeReference<>() { // NOPMD
}));
Map<String, Object> offsetRec = OBJECT_MAPPER.readValue(record.value(), new TypeReference<>() { // NOPMD
});
messages.putAll(offsetRec);
}
}
return messages;

} catch (JsonProcessingException e) {
} catch (IOException e) {
throw new ConnectException("Error while consuming messages " + e.getMessage());
}
}
Expand Down
Loading

0 comments on commit 7152f73

Please sign in to comment.