diff --git a/lib/core/src/main/java/com/fluxninja/aperture/sdk/ApertureSDK.java b/lib/core/src/main/java/com/fluxninja/aperture/sdk/ApertureSDK.java index a892c38..22ef98d 100644 --- a/lib/core/src/main/java/com/fluxninja/aperture/sdk/ApertureSDK.java +++ b/lib/core/src/main/java/com/fluxninja/aperture/sdk/ApertureSDK.java @@ -4,6 +4,7 @@ import static com.fluxninja.aperture.sdk.Constants.SOURCE_LABEL; import static com.fluxninja.aperture.sdk.Constants.WORKLOAD_START_TIMESTAMP_LABEL; +import com.fluxninja.generated.aperture.flowcontrol.check.v1.CacheLookupRequest; import com.fluxninja.generated.aperture.flowcontrol.check.v1.CheckRequest; import com.fluxninja.generated.aperture.flowcontrol.check.v1.CheckResponse; import com.fluxninja.generated.aperture.flowcontrol.check.v1.FlowControlServiceGrpc; @@ -85,7 +86,15 @@ public Flow startFlow(FeatureFlowParameters parameters) { } catch (java.io.UnsupportedEncodingException e) { // This should never happen, as `StandardCharsets.UTF_8.name()` is a valid // encoding - throw new RuntimeException(e); + return new Flow( + null, + null, + false, + parameters.getRampMode(), + this.flowControlClient, + parameters.getResultCacheKey(), + parameters.getControlPoint(), + e); } labels.put(entry.getKey(), value); } @@ -99,6 +108,11 @@ public Flow startFlow(FeatureFlowParameters parameters) { .setControlPoint(parameters.getControlPoint()) .putAllLabels(labels) .setRampMode(parameters.getRampMode()) + .setCacheLookupRequest( + CacheLookupRequest.newBuilder() + .addAllGlobalCacheKeys(parameters.getGlobalCacheKeys()) + .setResultCacheKey(parameters.getResultCacheKey()) + .build()) .build(); Span span = @@ -121,10 +135,27 @@ public Flow startFlow(FeatureFlowParameters parameters) { } } catch (StatusRuntimeException e) { // deadline exceeded or couldn't reach agent - request should not be blocked + return new Flow( + res, + span, + false, + parameters.getRampMode(), + this.flowControlClient, + parameters.getResultCacheKey(), + parameters.getControlPoint(), + e); } span.setAttribute(WORKLOAD_START_TIMESTAMP_LABEL, Utils.getCurrentEpochNanos()); - return new Flow(res, span, false, parameters.getRampMode()); + return new Flow( + res, + span, + false, + parameters.getRampMode(), + this.flowControlClient, + parameters.getResultCacheKey(), + parameters.getControlPoint(), + null); } /** diff --git a/lib/core/src/main/java/com/fluxninja/aperture/sdk/FeatureFlowParameters.java b/lib/core/src/main/java/com/fluxninja/aperture/sdk/FeatureFlowParameters.java index 4fada4f..73f1b81 100644 --- a/lib/core/src/main/java/com/fluxninja/aperture/sdk/FeatureFlowParameters.java +++ b/lib/core/src/main/java/com/fluxninja/aperture/sdk/FeatureFlowParameters.java @@ -1,7 +1,9 @@ package com.fluxninja.aperture.sdk; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; public class FeatureFlowParameters { @@ -9,6 +11,9 @@ public class FeatureFlowParameters { private Map explicitLabels; private Boolean rampMode; private Duration flowTimeout; + private String resultCacheKey; + + private List globalCacheKeys; public static Builder newBuilder(String controlPoint) { return new Builder(controlPoint); @@ -34,6 +39,14 @@ public Duration getFlowTimeout() { return flowTimeout; } + public String getResultCacheKey() { + return resultCacheKey; + } + + public List getGlobalCacheKeys() { + return globalCacheKeys; + } + public static class Builder { private final FeatureFlowParameters params; @@ -44,6 +57,8 @@ public Builder(String controlPoint) { params.explicitLabels = new HashMap<>(); params.rampMode = false; params.flowTimeout = Constants.DEFAULT_RPC_TIMEOUT; + params.resultCacheKey = ""; + params.globalCacheKeys = Collections.emptyList(); } /** @@ -80,6 +95,39 @@ public Builder setFlowTimeout(Duration flowTimeout) { return this; } + /** + * Set the result cache key for result cache request + * + * @param resultCacheKey The result cache key + * @return This builder for method chaining + */ + public Builder setResultCacheKey(String resultCacheKey) { + params.resultCacheKey = resultCacheKey; + return this; + } + + /** + * Add global cache keys for global cache request + * + * @param globalCacheKeys The global cache keys to add + * @return This builder for method chaining + */ + public Builder addGlobalCacheKeys(List globalCacheKeys) { + params.globalCacheKeys.addAll(globalCacheKeys); + return this; + } + + /** + * Add a global cache key for global cache request + * + * @param globalCacheKey The global cache key to add + * @return This builder for method chaining + */ + public Builder addGlobalCacheKey(String globalCacheKey) { + params.globalCacheKeys.add(globalCacheKey); + return this; + } + /** * Build the FeatureFlowParameters object with the provided parameters. * diff --git a/lib/core/src/main/java/com/fluxninja/aperture/sdk/Flow.java b/lib/core/src/main/java/com/fluxninja/aperture/sdk/Flow.java index 6007cf9..fff3aeb 100644 --- a/lib/core/src/main/java/com/fluxninja/aperture/sdk/Flow.java +++ b/lib/core/src/main/java/com/fluxninja/aperture/sdk/Flow.java @@ -4,9 +4,15 @@ import static com.fluxninja.aperture.sdk.Constants.FLOW_STATUS_LABEL; import static com.fluxninja.aperture.sdk.Constants.FLOW_STOP_TIMESTAMP_LABEL; -import com.fluxninja.generated.aperture.flowcontrol.check.v1.CheckResponse; +import com.fluxninja.aperture.sdk.cache.KeyDeleteResponse; +import com.fluxninja.aperture.sdk.cache.KeyLookupResponse; +import com.fluxninja.aperture.sdk.cache.KeyUpsertResponse; +import com.fluxninja.aperture.sdk.cache.LookupStatus; +import com.fluxninja.generated.aperture.flowcontrol.check.v1.*; +import com.google.protobuf.ByteString; import com.google.protobuf.util.JsonFormat; import io.opentelemetry.api.trace.Span; +import java.time.Duration; import org.apache.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,15 +24,31 @@ public final class Flow { private boolean ended; private boolean rampMode; private FlowStatus flowStatus; + private FlowControlServiceGrpc.FlowControlServiceBlockingStub flowControlClient; + private String cacheKey; + private String controlPoint; + private Exception error; private static final Logger logger = LoggerFactory.getLogger(Flow.class); - Flow(CheckResponse checkResponse, Span span, boolean ended, boolean rampMode) { + Flow( + CheckResponse checkResponse, + Span span, + boolean ended, + boolean rampMode, + FlowControlServiceGrpc.FlowControlServiceBlockingStub fcs, + String resultCacheKey, + String controlPoint, + Exception error) { this.checkResponse = checkResponse; this.span = span; this.ended = ended; this.rampMode = rampMode; this.flowStatus = FlowStatus.OK; + this.flowControlClient = fcs; + this.cacheKey = resultCacheKey; + this.controlPoint = controlPoint; + this.error = error; } /** @@ -116,6 +138,225 @@ public void setStatus(FlowStatus status) { this.flowStatus = status; } + /** + * Set the result cache entry for the flow. + * + * @param value entry value + * @param ttl time-to-live of the entry + * @return upsert grpc response + */ + public KeyUpsertResponse setResultCache(byte[] value, Duration ttl) { + if (this.cacheKey == null) { + return new KeyUpsertResponse(new IllegalArgumentException("Cache key not set")); + } + com.google.protobuf.Duration ttl_duration = + com.google.protobuf.Duration.newBuilder().setSeconds(ttl.getSeconds()).build(); + CacheEntry entry = + CacheEntry.newBuilder() + .setTtl(ttl_duration) + .setValue(ByteString.copyFrom(value)) + .setKey(this.cacheKey) + .build(); + CacheUpsertRequest cacheUpsertRequest = + CacheUpsertRequest.newBuilder() + .setControlPoint(this.controlPoint) + .setResultCacheEntry(entry) + .build(); + + CacheUpsertResponse res; + try { + res = this.flowControlClient.cacheUpsert(cacheUpsertRequest); + } catch (Exception e) { + logger.debug("Aperture gRPC call failed", e); + return new KeyUpsertResponse(e); + } + + if (!res.hasResultCacheResponse()) { + return new KeyUpsertResponse(new IllegalArgumentException("No cache upsert response")); + } + + return new KeyUpsertResponse( + com.fluxninja.aperture.sdk.cache.Utils.convertCacheError( + res.getResultCacheResponse().getError())); + } + + /** + * Delete the result cache entry for the flow. + * + * @return delete grpc response + */ + public KeyDeleteResponse deleteResultCache() { + if (this.cacheKey == null) { + return new KeyDeleteResponse(new IllegalArgumentException("Cache key not set")); + } + + CacheDeleteRequest cacheDeleteRequest = + CacheDeleteRequest.newBuilder() + .setControlPoint(this.controlPoint) + .setResultCacheKey(this.cacheKey) + .build(); + CacheDeleteResponse res; + try { + res = this.flowControlClient.cacheDelete(cacheDeleteRequest); + } catch (Exception e) { + logger.debug("Aperture gRPC call failed", e); + return new KeyDeleteResponse(e); + } + + if (!res.hasResultCacheResponse()) { + return new KeyDeleteResponse(new IllegalArgumentException("No cache upsert response")); + } + + return new KeyDeleteResponse( + com.fluxninja.aperture.sdk.cache.Utils.convertCacheError( + res.getResultCacheResponse().getError())); + } + + /** + * Retrieve the result cache entry for the flow. + * + * @return cache entry for the flow + */ + public KeyLookupResponse resultCache() { + if (this.error != null) { + return new KeyLookupResponse(null, LookupStatus.MISS, this.error); + } + + if (this.checkResponse == null + || !this.checkResponse.hasCacheLookupResponse() + || !this.checkResponse.getCacheLookupResponse().hasResultCacheResponse()) { + return new KeyLookupResponse( + null, + LookupStatus.MISS, + new IllegalArgumentException("No cache lookup response")); + } + + com.fluxninja.generated.aperture.flowcontrol.check.v1.KeyLookupResponse lookupResponse = + this.checkResponse.getCacheLookupResponse().getResultCacheResponse(); + + return new KeyLookupResponse( + lookupResponse.getValue(), + com.fluxninja.aperture.sdk.cache.Utils.convertCacheLookupStatus( + lookupResponse.getLookupStatus()), + com.fluxninja.aperture.sdk.cache.Utils.convertCacheError( + lookupResponse.getError())); + } + + /** + * Set the global cache entry for the given key. + * + * @param key entry key + * @param value entry value + * @param ttl time-to-live of the entry + * @return upsert grpc response + */ + public KeyUpsertResponse setGlobalCache(String key, byte[] value, Duration ttl) { + com.google.protobuf.Duration ttl_duration = + com.google.protobuf.Duration.newBuilder().setSeconds(ttl.getSeconds()).build(); + CacheEntry entry = + CacheEntry.newBuilder() + .setTtl(ttl_duration) + .setValue(ByteString.copyFrom(value)) + .build(); + CacheUpsertRequest cacheUpsertRequest = + CacheUpsertRequest.newBuilder().putGlobalCacheEntries(key, entry).build(); + + CacheUpsertResponse res; + try { + res = this.flowControlClient.cacheUpsert(cacheUpsertRequest); + } catch (Exception e) { + logger.debug("Aperture gRPC call failed", e); + return new KeyUpsertResponse(e); + } + + if (res.getGlobalCacheResponsesCount() == 0) { + return new KeyUpsertResponse(new IllegalArgumentException("No cache upsert responses")); + } + + if (!res.containsGlobalCacheResponses(key)) { + return new KeyUpsertResponse( + new IllegalArgumentException("Key missing from global cache response")); + } + + return new KeyUpsertResponse( + com.fluxninja.aperture.sdk.cache.Utils.convertCacheError( + res.getGlobalCacheResponsesOrThrow(key).getError())); + } + + /** + * Delete the global cache entry for the given key. + * + * @param key entry key + * @return delete grpc response + */ + public KeyDeleteResponse deleteGlobalCache(String key) { + CacheDeleteRequest cacheDeleteRequest = + CacheDeleteRequest.newBuilder().addGlobalCacheKeys(key).build(); + CacheDeleteResponse res; + try { + res = this.flowControlClient.cacheDelete(cacheDeleteRequest); + } catch (Exception e) { + logger.debug("Aperture gRPC call failed", e); + return new KeyDeleteResponse(e); + } + + if (res.getGlobalCacheResponsesCount() == 0) { + return new KeyDeleteResponse(new IllegalArgumentException("No cache upsert response")); + } + + if (!res.containsGlobalCacheResponses(key)) { + return new KeyDeleteResponse( + new IllegalArgumentException("Key missing from global cache response")); + } + + return new KeyDeleteResponse( + com.fluxninja.aperture.sdk.cache.Utils.convertCacheError( + res.getGlobalCacheResponsesOrThrow(key).getError())); + } + + /** + * Retrieve the global cache entry for the given key. + * + * @param key entry key + * @return cache entry for the flow + */ + public KeyLookupResponse globalCache(String key) { + if (this.error != null) { + return new KeyLookupResponse(null, LookupStatus.MISS, this.error); + } + + if (this.checkResponse == null + || !this.checkResponse.hasCacheLookupResponse() + || this.checkResponse.getCacheLookupResponse().getGlobalCacheResponsesCount() + == 0) { + return new KeyLookupResponse( + null, + LookupStatus.MISS, + new IllegalArgumentException("No cache lookup response")); + } + + if (!this.checkResponse.getCacheLookupResponse().containsGlobalCacheResponses(key)) { + return new KeyLookupResponse( + null, + LookupStatus.MISS, + new IllegalArgumentException("Key missing from global cache response")); + } + + com.fluxninja.generated.aperture.flowcontrol.check.v1.KeyLookupResponse lookupResponse = + this.checkResponse.getCacheLookupResponse().getGlobalCacheResponsesOrThrow(key); + + return new KeyLookupResponse( + lookupResponse.getValue(), + com.fluxninja.aperture.sdk.cache.Utils.convertCacheLookupStatus( + lookupResponse.getLookupStatus()), + com.fluxninja.aperture.sdk.cache.Utils.convertCacheError( + lookupResponse.getError())); + } + + public Exception getError() { + return this.error; + } + /** * Ends the flow, notifying the Aperture Agent whether it succeeded. Flow's Status is assumed to * be "OK" and can be set using {@link #setStatus}. diff --git a/lib/core/src/main/java/com/fluxninja/aperture/sdk/cache/KeyDeleteResponse.java b/lib/core/src/main/java/com/fluxninja/aperture/sdk/cache/KeyDeleteResponse.java new file mode 100644 index 0000000..45f0e25 --- /dev/null +++ b/lib/core/src/main/java/com/fluxninja/aperture/sdk/cache/KeyDeleteResponse.java @@ -0,0 +1,13 @@ +package com.fluxninja.aperture.sdk.cache; + +public class KeyDeleteResponse { + private final Exception error; + + public KeyDeleteResponse(Exception error) { + this.error = error; + } + + public Exception getError() { + return error; + } +} diff --git a/lib/core/src/main/java/com/fluxninja/aperture/sdk/cache/KeyLookupResponse.java b/lib/core/src/main/java/com/fluxninja/aperture/sdk/cache/KeyLookupResponse.java new file mode 100644 index 0000000..539dc9b --- /dev/null +++ b/lib/core/src/main/java/com/fluxninja/aperture/sdk/cache/KeyLookupResponse.java @@ -0,0 +1,25 @@ +package com.fluxninja.aperture.sdk.cache; + +public class KeyLookupResponse { + private final Object value; + private final LookupStatus lookupStatus; + private final Exception error; + + public KeyLookupResponse(Object value, LookupStatus lookupStatus, Exception error) { + this.value = value; + this.lookupStatus = lookupStatus; + this.error = error; + } + + public Object getValue() { + return value; + } + + public LookupStatus getLookupStatus() { + return lookupStatus; + } + + public Exception getError() { + return error; + } +} diff --git a/lib/core/src/main/java/com/fluxninja/aperture/sdk/cache/KeyUpsertResponse.java b/lib/core/src/main/java/com/fluxninja/aperture/sdk/cache/KeyUpsertResponse.java new file mode 100644 index 0000000..a420254 --- /dev/null +++ b/lib/core/src/main/java/com/fluxninja/aperture/sdk/cache/KeyUpsertResponse.java @@ -0,0 +1,13 @@ +package com.fluxninja.aperture.sdk.cache; + +public class KeyUpsertResponse { + private final Exception error; + + public KeyUpsertResponse(Exception error) { + this.error = error; + } + + public Exception getError() { + return error; + } +} diff --git a/lib/core/src/main/java/com/fluxninja/aperture/sdk/cache/LookupStatus.java b/lib/core/src/main/java/com/fluxninja/aperture/sdk/cache/LookupStatus.java new file mode 100644 index 0000000..7de1e27 --- /dev/null +++ b/lib/core/src/main/java/com/fluxninja/aperture/sdk/cache/LookupStatus.java @@ -0,0 +1,16 @@ +package com.fluxninja.aperture.sdk.cache; + +public enum LookupStatus { + HIT("HIT"), + MISS("MISS"); + + private final String value; + + LookupStatus(String value) { + this.value = value; + } + + public String getValue() { + return value; + } +} diff --git a/lib/core/src/main/java/com/fluxninja/aperture/sdk/cache/Utils.java b/lib/core/src/main/java/com/fluxninja/aperture/sdk/cache/Utils.java new file mode 100644 index 0000000..0494381 --- /dev/null +++ b/lib/core/src/main/java/com/fluxninja/aperture/sdk/cache/Utils.java @@ -0,0 +1,15 @@ +package com.fluxninja.aperture.sdk.cache; + +import com.fluxninja.generated.aperture.flowcontrol.check.v1.CacheLookupStatus; + +public class Utils { + public static LookupStatus convertCacheLookupStatus(CacheLookupStatus status) { + return (status == CacheLookupStatus.HIT) ? LookupStatus.HIT : LookupStatus.MISS; + } + + public static Exception convertCacheError(String errorMessage) { + return (errorMessage == null || errorMessage.isEmpty()) + ? null + : new Exception(errorMessage); + } +}