From e5619a5b5728e3bbe18d44b40e546b16c3337f51 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Mon, 30 Oct 2023 18:29:56 +0200 Subject: [PATCH 1/3] refactor: move sensor provider to common package Make it available for storage layer --- .../java/io/aiven/kafka/tieredstorage/metrics/SensorProvider.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {core => commons}/src/main/java/io/aiven/kafka/tieredstorage/metrics/SensorProvider.java (100%) diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/SensorProvider.java b/commons/src/main/java/io/aiven/kafka/tieredstorage/metrics/SensorProvider.java similarity index 100% rename from core/src/main/java/io/aiven/kafka/tieredstorage/metrics/SensorProvider.java rename to commons/src/main/java/io/aiven/kafka/tieredstorage/metrics/SensorProvider.java From c55aa7ec9785709d7eb4e3bfd72cc9dc61faea3d Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Mon, 30 Oct 2023 18:30:50 +0200 Subject: [PATCH 2/3] feat: add metrics to azure blob storage --- .../azure/AzureBlobStorageMetricsTest.java | 206 ++++++++++++++++++ .../storage/azure/AzureBlobStorage.java | 14 +- .../storage/azure/MetricCollector.java | 148 +++++++++++++ .../storage/azure/MetricCollectorTest.java | 54 +++++ 4 files changed, 420 insertions(+), 2 deletions(-) create mode 100644 storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageMetricsTest.java create mode 100644 storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/MetricCollector.java create mode 100644 storage/azure/src/test/java/io/aiven/kafka/tieredstorage/storage/azure/MetricCollectorTest.java diff --git a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageMetricsTest.java b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageMetricsTest.java new file mode 100644 index 000000000..cbd1127a4 --- /dev/null +++ b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageMetricsTest.java @@ -0,0 +1,206 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage.azure; + +import javax.management.JMException; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.management.ManagementFactory; +import java.util.Map; +import java.util.stream.Stream; + +import io.aiven.kafka.tieredstorage.storage.BytesRange; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; +import io.aiven.kafka.tieredstorage.storage.StorageBackend; +import io.aiven.kafka.tieredstorage.storage.StorageBackendException; +import io.aiven.kafka.tieredstorage.storage.TestObjectKey; + +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Named; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.InstanceOfAssertFactories.DOUBLE; + +@Testcontainers +public class AzureBlobStorageMetricsTest { + static final MBeanServer MBEAN_SERVER = ManagementFactory.getPlatformMBeanServer(); + private static final int UPLOAD_BLOCK_SIZE = 256 * 1024; + private static final int BLOB_STORAGE_PORT = 10000; + @Container + static final GenericContainer AZURITE_SERVER = + new GenericContainer<>(DockerImageName.parse("mcr.microsoft.com/azure-storage/azurite")) + .withCopyFileToContainer( + MountableFile.forClasspathResource("/azurite-cert.pem"), + "/opt/azurite/azurite-cert.pem") + .withCopyFileToContainer( + MountableFile.forClasspathResource("/azurite-key.pem"), + "/opt/azurite/azurite-key.pem") + .withExposedPorts(BLOB_STORAGE_PORT) + .withCommand("azurite-blob --blobHost 0.0.0.0 " + + "--cert /opt/azurite/azurite-cert.pem --key /opt/azurite/azurite-key.pem"); + + static BlobServiceClient blobServiceClient; + + protected String azureContainerName; + + protected static String endpoint() { + return "https://127.0.0.1:" + AZURITE_SERVER.getMappedPort(BLOB_STORAGE_PORT) + "/devstoreaccount1"; + } + + protected static String connectionString() { + // The well-known Azurite connection string. + return "DefaultEndpointsProtocol=https;" + + "AccountName=devstoreaccount1;" + + "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;" + + "BlobEndpoint=" + endpoint() + ";"; + } + + @BeforeAll + static void setUpClass() { + // Generally setting JVM-wide trust store needed only for one test may be not OK, + // but it's not conflicting with any other test now and this is the most straightforward way + // to make the self-signed certificate work. + System.setProperty("javax.net.ssl.trustStore", + AzureBlobStorageMetricsTest.class.getResource("/azurite-cacerts.jks").getPath()); + System.setProperty("javax.net.ssl.trustStorePassword", "changeit"); + blobServiceClient = new BlobServiceClientBuilder() + .connectionString(connectionString()) + .buildClient(); + } + + @BeforeEach + void setUp(final TestInfo testInfo) { + azureContainerName = testInfo.getDisplayName() + .toLowerCase() + .replace(" ", "") + .replace(",", "-") + .replace("(", "") + .replace(")", "") + .replace("[", "") + .replace("]", ""); + while (azureContainerName.length() < 3) { + azureContainerName += azureContainerName; + } + blobServiceClient.createBlobContainer(azureContainerName); + } + + StorageBackend storage() { + final AzureBlobStorage azureBlobStorage = new AzureBlobStorage(); + // The well-known Azurite account name and key. + final String accountName = "devstoreaccount1"; + final String accountKey = + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="; + final Map configs = Map.of( + "azure.container.name", azureContainerName, + "azure.account.name", accountName, + "azure.account.key", accountKey, + "azure.endpoint.url", endpoint(), + "azure.upload.block.size", UPLOAD_BLOCK_SIZE + ); + azureBlobStorage.configure(configs); + return azureBlobStorage; + } + + static Stream metricsShouldBeReported() { + return Stream.of( + Arguments.of( + Named.of("smaller-than-block-size-payload", UPLOAD_BLOCK_SIZE - 1), + 1, 0, 0), + Arguments.of( + Named.of("larger-than-block-size-payload", UPLOAD_BLOCK_SIZE + 1), + 0, 2, 1) + ); + } + + @ParameterizedTest + @MethodSource + void metricsShouldBeReported( + final int uploadBlockSize, + final double expectedPutBlob, + final double expectedPutBlock, + final double expectedPutBlockList + ) throws StorageBackendException, IOException, JMException { + final byte[] data = new byte[uploadBlockSize]; + + final ObjectKey key = new TestObjectKey("x"); + + final var storage = storage(); + + storage.upload(new ByteArrayInputStream(data), key); + try (final InputStream fetch = storage.fetch(key)) { + fetch.readAllBytes(); + } + try (final InputStream fetch = storage.fetch(key, BytesRange.of(0, 1))) { + fetch.readAllBytes(); + } + storage.delete(key); + + final ObjectName objectName = + ObjectName.getInstance("aiven.kafka.server.tieredstorage.azure:type=azure-blob-metrics"); + assertThat(MBEAN_SERVER.getAttribute(objectName, "blob-get-rate")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); + assertThat(MBEAN_SERVER.getAttribute(objectName, "blob-get-total")) + .isEqualTo(2.0); + + if (expectedPutBlob > 0) { + assertThat(MBEAN_SERVER.getAttribute(objectName, "blob-upload-rate")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); + } + assertThat(MBEAN_SERVER.getAttribute(objectName, "blob-upload-total")) + .isEqualTo(expectedPutBlob); + + if (expectedPutBlock > 0) { + assertThat(MBEAN_SERVER.getAttribute(objectName, "block-upload-rate")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); + } + assertThat(MBEAN_SERVER.getAttribute(objectName, "block-upload-total")) + .isEqualTo(expectedPutBlock); + + if (expectedPutBlockList > 0) { + assertThat(MBEAN_SERVER.getAttribute(objectName, "block-list-upload-rate")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); + } + assertThat(MBEAN_SERVER.getAttribute(objectName, "block-list-upload-total")) + .isEqualTo(expectedPutBlockList); + + assertThat(MBEAN_SERVER.getAttribute(objectName, "blob-delete-rate")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); + assertThat(MBEAN_SERVER.getAttribute(objectName, "blob-delete-total")) + .isEqualTo(1.0); + } +} diff --git a/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorage.java b/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorage.java index c7a49693e..7b7c1711a 100644 --- a/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorage.java +++ b/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorage.java @@ -43,6 +43,7 @@ public class AzureBlobStorage implements StorageBackend { private AzureBlobStorageConfig config; private BlobContainerClient blobContainerClient; + private MetricCollector metricsPolicy; @Override public void configure(final Map configs) { @@ -65,7 +66,11 @@ public void configure(final Map configs) { } } - blobContainerClient = blobServiceClientBuilder.buildClient() + metricsPolicy = new MetricCollector(config); + + blobContainerClient = blobServiceClientBuilder + .addPolicy(metricsPolicy.policy()) + .buildClient() .getBlobContainerClient(config.containerName()); } @@ -96,6 +101,7 @@ public long upload(final InputStream inputStream, final ObjectKey key) throws St } } final BlockBlobClient blockBlobClient = specializedBlobClientBuilder + .addPolicy(metricsPolicy.policy()) .containerName(config.containerName()) .blobName(key.value()) .buildBlockBlobClient(); @@ -108,6 +114,9 @@ public long upload(final InputStream inputStream, final ObjectKey key) throws St parallelTransferOptions.setMaxSingleUploadSizeLong(blockSizeLong); final BlockBlobOutputStreamOptions options = new BlockBlobOutputStreamOptions() .setParallelTransferOptions(parallelTransferOptions); + // Be aware that metrics instrumentation is based on PutBlob (single upload), PutBlock (upload part), + // and PutBlockList (complete upload) used by this call. + // If upload changes, change metrics instrumentation accordingly. try (OutputStream os = new BufferedOutputStream( blockBlobClient.getBlobOutputStream(options), config.uploadBlockSize())) { return inputStream.transferTo(os); @@ -119,7 +128,8 @@ public long upload(final InputStream inputStream, final ObjectKey key) throws St @Override public InputStream fetch(final ObjectKey key) throws StorageBackendException { try { - return blobContainerClient.getBlobClient(key.value()).openInputStream(); + return blobContainerClient.getBlobClient(key.value()) + .openInputStream(); } catch (final BlobStorageException e) { if (e.getStatusCode() == 404) { throw new KeyNotFoundException(this, key, e); diff --git a/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/MetricCollector.java b/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/MetricCollector.java new file mode 100644 index 000000000..a545dd92c --- /dev/null +++ b/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/MetricCollector.java @@ -0,0 +1,148 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage.azure; + +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.metrics.KafkaMetricsContext; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.CumulativeCount; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.utils.Time; + +import io.aiven.kafka.tieredstorage.metrics.SensorProvider; + +import com.azure.core.http.HttpPipelineCallContext; +import com.azure.core.http.HttpPipelineNextPolicy; +import com.azure.core.http.HttpPipelineNextSyncPolicy; +import com.azure.core.http.HttpResponse; +import com.azure.core.http.policy.HttpPipelinePolicy; +import reactor.core.publisher.Mono; + +public class MetricCollector { + private final org.apache.kafka.common.metrics.Metrics metrics; + + private static final String METRIC_GROUP = "azure-blob-metrics"; + + final AzureBlobStorageConfig config; + + MetricCollector(final AzureBlobStorageConfig config) { + this.config = config; + + final JmxReporter reporter = new JmxReporter(); + + metrics = new org.apache.kafka.common.metrics.Metrics( + new MetricConfig(), List.of(reporter), Time.SYSTEM, + new KafkaMetricsContext("aiven.kafka.server.tieredstorage.azure") + ); + } + + Pattern pathPattern() { + // account is in the hostname when on azure, but included on the path when testing on Azurite + final var maybeAccountName = "(/" + config.accountName() + ")?"; + final var exp = "^" + maybeAccountName + "/" + config.containerName() + "/" + "([^/])"; + return Pattern.compile(exp); + } + + MetricsPolicy policy() { + return new MetricsPolicy(metrics, pathPattern()); + } + + static class MetricsPolicy implements HttpPipelinePolicy { + + static final Pattern UPLOAD_QUERY_PATTERN = Pattern.compile("comp=(?[^&]+)"); + + private final Sensor deleteBlobRequests; + private final Sensor uploadBlobRequests; + private final Sensor uploadBlockRequests; + private final Sensor uploadBlockListRequests; + private final Sensor getBlobRequests; + + private final Metrics metrics; + private final Pattern pathPattern; + + MetricsPolicy(final Metrics metrics, final Pattern pathPattern) { + this.metrics = metrics; + this.pathPattern = pathPattern; + this.deleteBlobRequests = createSensor("blob-delete"); + this.uploadBlobRequests = createSensor("blob-upload"); + this.uploadBlockRequests = createSensor("block-upload"); + this.uploadBlockListRequests = createSensor("block-list-upload"); + this.getBlobRequests = createSensor("blob-get"); + } + + private Sensor createSensor(final String name) { + return new SensorProvider(metrics, name) + .with(new MetricNameTemplate(name + "-rate", METRIC_GROUP, ""), new Rate()) + .with(new MetricNameTemplate(name + "-total", METRIC_GROUP, ""), new CumulativeCount()) + .get(); + } + + @Override + public Mono process(final HttpPipelineCallContext context, final HttpPipelineNextPolicy next) { + processMetrics(context); + return next.process(); + } + + @Override + public HttpResponse processSync(final HttpPipelineCallContext context, final HttpPipelineNextSyncPolicy next) { + processMetrics(context); + return next.processSync(); + } + + void processMetrics(final HttpPipelineCallContext context) { + final var httpRequest = context.getHttpRequest(); + final var path = httpRequest.getUrl().getPath(); + if (pathPattern.matcher(path).matches()) { + switch (httpRequest.getHttpMethod()) { + case GET: + getBlobRequests.record(); + break; + case PUT: + final var q = httpRequest.getUrl().getQuery(); + if (q == null) { + uploadBlobRequests.record(); + break; + } + final var matcher = UPLOAD_QUERY_PATTERN.matcher(q); + if (matcher.find()) { + final var comp = matcher.group("comp"); + switch (comp) { + case "block": + uploadBlockRequests.record(); + break; + case "blocklist": + uploadBlockListRequests.record(); + break; + default: + } + } + break; + case DELETE: + deleteBlobRequests.record(); + break; + default: + } + } + } + } +} diff --git a/storage/azure/src/test/java/io/aiven/kafka/tieredstorage/storage/azure/MetricCollectorTest.java b/storage/azure/src/test/java/io/aiven/kafka/tieredstorage/storage/azure/MetricCollectorTest.java new file mode 100644 index 000000000..1196605b5 --- /dev/null +++ b/storage/azure/src/test/java/io/aiven/kafka/tieredstorage/storage/azure/MetricCollectorTest.java @@ -0,0 +1,54 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage.azure; + +import java.util.Map; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.assertj.core.api.Assertions.assertThat; + +class MetricCollectorTest { + + @Test + void pathInDevWithAccountName() { + final var props = Map.of("azure.account.name", "test-account", + "azure.container.name", "cont1"); + final var metrics = new MetricCollector(new AzureBlobStorageConfig(props)); + final var matcher = metrics.pathPattern().matcher("/test-account/cont1/x"); + assertThat(matcher).matches(); + } + + @Test + void pathInProdWithoutAccountName() { + final var props = Map.of("azure.account.name", "test-account", + "azure.container.name", "cont1"); + final var metrics = new MetricCollector(new AzureBlobStorageConfig(props)); + final var matcher = metrics.pathPattern().matcher("/cont1/x"); + assertThat(matcher).matches(); + } + + @ParameterizedTest + @ValueSource(strings = {"comp=test", "comp=test&post=val", "pre=val&comp=test", "pre=val&comp=test&post=val"}) + void uploadQueryWithComp(final String query) { + final var matcher = MetricCollector.MetricsPolicy.UPLOAD_QUERY_PATTERN.matcher(query); + assertThat(matcher.find()).isTrue(); + assertThat(matcher.group("comp")).isEqualTo("test"); + } +} From 4688b0df741b8552f53b98d8fc791ef0d19af2ab Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 7 Nov 2023 15:07:59 +0200 Subject: [PATCH 3/3] refactor: move azurite utils to a separate class for reuse --- .../azure/AzureBlobStorageAccountKeyTest.java | 4 +- .../AzureBlobStorageConnectionStringTest.java | 4 +- .../azure/AzureBlobStorageMetricsTest.java | 33 +++--------- .../azure/AzureBlobStorageSasTokenTest.java | 4 +- .../storage/azure/AzureBlobStorageTest.java | 32 ++---------- .../azure/AzuriteBlobStorageUtils.java | 50 +++++++++++++++++++ 6 files changed, 70 insertions(+), 57 deletions(-) create mode 100644 storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzuriteBlobStorageUtils.java diff --git a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageAccountKeyTest.java b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageAccountKeyTest.java index f1a6dfdfa..695526aa2 100644 --- a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageAccountKeyTest.java +++ b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageAccountKeyTest.java @@ -20,6 +20,8 @@ import io.aiven.kafka.tieredstorage.storage.StorageBackend; +import static io.aiven.kafka.tieredstorage.storage.azure.AzuriteBlobStorageUtils.endpoint; + class AzureBlobStorageAccountKeyTest extends AzureBlobStorageTest { @Override protected StorageBackend storage() { @@ -32,7 +34,7 @@ protected StorageBackend storage() { "azure.container.name", azureContainerName, "azure.account.name", accountName, "azure.account.key", accountKey, - "azure.endpoint.url", endpoint() + "azure.endpoint.url", endpoint(AZURITE_SERVER, BLOB_STORAGE_PORT) ); azureBlobStorage.configure(configs); return azureBlobStorage; diff --git a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConnectionStringTest.java b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConnectionStringTest.java index 6ab5e2b73..f043245f8 100644 --- a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConnectionStringTest.java +++ b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConnectionStringTest.java @@ -20,13 +20,15 @@ import io.aiven.kafka.tieredstorage.storage.StorageBackend; +import static io.aiven.kafka.tieredstorage.storage.azure.AzuriteBlobStorageUtils.connectionString; + class AzureBlobStorageConnectionStringTest extends AzureBlobStorageTest { @Override protected StorageBackend storage() { final AzureBlobStorage azureBlobStorage = new AzureBlobStorage(); final Map configs = Map.of( "azure.container.name", azureContainerName, - "azure.connection.string", connectionString() + "azure.connection.string", connectionString(AZURITE_SERVER, BLOB_STORAGE_PORT) ); azureBlobStorage.configure(configs); return azureBlobStorage; diff --git a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageMetricsTest.java b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageMetricsTest.java index cbd1127a4..68cff6f5d 100644 --- a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageMetricsTest.java +++ b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageMetricsTest.java @@ -45,9 +45,10 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; -import org.testcontainers.utility.MountableFile; +import static io.aiven.kafka.tieredstorage.storage.azure.AzuriteBlobStorageUtils.azuriteContainer; +import static io.aiven.kafka.tieredstorage.storage.azure.AzuriteBlobStorageUtils.connectionString; +import static io.aiven.kafka.tieredstorage.storage.azure.AzuriteBlobStorageUtils.endpoint; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.InstanceOfAssertFactories.DOUBLE; @@ -57,34 +58,12 @@ public class AzureBlobStorageMetricsTest { private static final int UPLOAD_BLOCK_SIZE = 256 * 1024; private static final int BLOB_STORAGE_PORT = 10000; @Container - static final GenericContainer AZURITE_SERVER = - new GenericContainer<>(DockerImageName.parse("mcr.microsoft.com/azure-storage/azurite")) - .withCopyFileToContainer( - MountableFile.forClasspathResource("/azurite-cert.pem"), - "/opt/azurite/azurite-cert.pem") - .withCopyFileToContainer( - MountableFile.forClasspathResource("/azurite-key.pem"), - "/opt/azurite/azurite-key.pem") - .withExposedPorts(BLOB_STORAGE_PORT) - .withCommand("azurite-blob --blobHost 0.0.0.0 " - + "--cert /opt/azurite/azurite-cert.pem --key /opt/azurite/azurite-key.pem"); + static final GenericContainer AZURITE_SERVER = azuriteContainer(BLOB_STORAGE_PORT); static BlobServiceClient blobServiceClient; protected String azureContainerName; - protected static String endpoint() { - return "https://127.0.0.1:" + AZURITE_SERVER.getMappedPort(BLOB_STORAGE_PORT) + "/devstoreaccount1"; - } - - protected static String connectionString() { - // The well-known Azurite connection string. - return "DefaultEndpointsProtocol=https;" - + "AccountName=devstoreaccount1;" - + "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;" - + "BlobEndpoint=" + endpoint() + ";"; - } - @BeforeAll static void setUpClass() { // Generally setting JVM-wide trust store needed only for one test may be not OK, @@ -94,7 +73,7 @@ static void setUpClass() { AzureBlobStorageMetricsTest.class.getResource("/azurite-cacerts.jks").getPath()); System.setProperty("javax.net.ssl.trustStorePassword", "changeit"); blobServiceClient = new BlobServiceClientBuilder() - .connectionString(connectionString()) + .connectionString(connectionString(AZURITE_SERVER, BLOB_STORAGE_PORT)) .buildClient(); } @@ -124,7 +103,7 @@ StorageBackend storage() { "azure.container.name", azureContainerName, "azure.account.name", accountName, "azure.account.key", accountKey, - "azure.endpoint.url", endpoint(), + "azure.endpoint.url", endpoint(AZURITE_SERVER, BLOB_STORAGE_PORT), "azure.upload.block.size", UPLOAD_BLOCK_SIZE ); azureBlobStorage.configure(configs); diff --git a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageSasTokenTest.java b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageSasTokenTest.java index 3d5ecba0e..7d1acf1a9 100644 --- a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageSasTokenTest.java +++ b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageSasTokenTest.java @@ -24,6 +24,8 @@ import com.azure.storage.blob.sas.BlobContainerSasPermission; import com.azure.storage.blob.sas.BlobServiceSasSignatureValues; +import static io.aiven.kafka.tieredstorage.storage.azure.AzuriteBlobStorageUtils.endpoint; + class AzureBlobStorageSasTokenTest extends AzureBlobStorageTest { @Override protected StorageBackend storage() { @@ -42,7 +44,7 @@ protected StorageBackend storage() { final Map configs = Map.of( "azure.container.name", azureContainerName, "azure.sas.token", sasToken, - "azure.endpoint.url", endpoint() + "azure.endpoint.url", endpoint(AZURITE_SERVER, BLOB_STORAGE_PORT) ); azureBlobStorage.configure(configs); return azureBlobStorage; diff --git a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageTest.java b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageTest.java index 3a6723255..85e123654 100644 --- a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageTest.java +++ b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageTest.java @@ -31,43 +31,21 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; -import org.testcontainers.utility.MountableFile; +import static io.aiven.kafka.tieredstorage.storage.azure.AzuriteBlobStorageUtils.azuriteContainer; +import static io.aiven.kafka.tieredstorage.storage.azure.AzuriteBlobStorageUtils.connectionString; import static org.assertj.core.api.Assertions.assertThatThrownBy; @Testcontainers abstract class AzureBlobStorageTest extends BaseStorageTest { - private static final int BLOB_STORAGE_PORT = 10000; + static final int BLOB_STORAGE_PORT = 10000; @Container - static final GenericContainer AZURITE_SERVER = - new GenericContainer<>(DockerImageName.parse("mcr.microsoft.com/azure-storage/azurite")) - .withCopyFileToContainer( - MountableFile.forClasspathResource("/azurite-cert.pem"), - "/opt/azurite/azurite-cert.pem") - .withCopyFileToContainer( - MountableFile.forClasspathResource("/azurite-key.pem"), - "/opt/azurite/azurite-key.pem") - .withExposedPorts(BLOB_STORAGE_PORT) - .withCommand("azurite-blob --blobHost 0.0.0.0 " - + "--cert /opt/azurite/azurite-cert.pem --key /opt/azurite/azurite-key.pem"); + static final GenericContainer AZURITE_SERVER = azuriteContainer(BLOB_STORAGE_PORT); static BlobServiceClient blobServiceClient; protected String azureContainerName; - protected static String endpoint() { - return "https://127.0.0.1:" + AZURITE_SERVER.getMappedPort(BLOB_STORAGE_PORT) + "/devstoreaccount1"; - } - - protected static String connectionString() { - // The well-known Azurite connection string. - return "DefaultEndpointsProtocol=https;" - + "AccountName=devstoreaccount1;" - + "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;" - + "BlobEndpoint=" + endpoint() + ";"; - } - @BeforeAll static void setUpClass() { // Generally setting JVM-wide trust store needed only for one test may be not OK, @@ -77,7 +55,7 @@ static void setUpClass() { AzureBlobStorageTest.class.getResource("/azurite-cacerts.jks").getPath()); System.setProperty("javax.net.ssl.trustStorePassword", "changeit"); blobServiceClient = new BlobServiceClientBuilder() - .connectionString(connectionString()) + .connectionString(connectionString(AZURITE_SERVER, BLOB_STORAGE_PORT)) .buildClient(); } diff --git a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzuriteBlobStorageUtils.java b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzuriteBlobStorageUtils.java new file mode 100644 index 000000000..bbcaacf2d --- /dev/null +++ b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzuriteBlobStorageUtils.java @@ -0,0 +1,50 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage.azure; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +public class AzuriteBlobStorageUtils { + static GenericContainer azuriteContainer(final int port) { + return + new GenericContainer<>(DockerImageName.parse("mcr.microsoft.com/azure-storage/azurite")) + .withCopyFileToContainer( + MountableFile.forClasspathResource("/azurite-cert.pem"), + "/opt/azurite/azurite-cert.pem") + .withCopyFileToContainer( + MountableFile.forClasspathResource("/azurite-key.pem"), + "/opt/azurite/azurite-key.pem") + .withExposedPorts(port) + .withCommand("azurite-blob --blobHost 0.0.0.0 " + + "--cert /opt/azurite/azurite-cert.pem --key /opt/azurite/azurite-key.pem"); + } + + + static String endpoint(final GenericContainer azuriteContainer, final int port) { + return "https://127.0.0.1:" + azuriteContainer.getMappedPort(port) + "/devstoreaccount1"; + } + + static String connectionString(final GenericContainer azuriteContainer, final int port) { + // The well-known Azurite connection string. + return "DefaultEndpointsProtocol=https;" + + "AccountName=devstoreaccount1;" + + "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;" + + "BlobEndpoint=" + endpoint(azuriteContainer, port) + ";"; + } +}