Commit
Merge pull request #57 from jmprovencher/breaking/redis-bump
Bumping jedis to latest major.

jebeaudet authored Apr 22, 2022
2 parents 7f31500 + 0869c5b commit 7b72700
Showing 2 changed files with 20 additions and 17 deletions.
4 changes: 2 additions & 2 deletions pom.xml
@@ -5,7 +5,7 @@

<groupId>com.coveo</groupId>
<artifactId>spillway</artifactId>
- <version>2.1.2-SNAPSHOT</version>
+ <version>2.2.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>${project.artifactId}</name>
@@ -77,7 +77,7 @@
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
- <version>2.8.1</version>
+ <version>4.2.1</version>
</dependency>

<dependency>
33 changes: 18 additions & 15 deletions src/main/java/com/coveo/spillway/storage/RedisStorage.java
@@ -22,7 +22,6 @@
*/
package com.coveo.spillway.storage;

- import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
@@ -46,6 +45,7 @@
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
+ import redis.clients.jedis.Transaction;

/**
* Implementation of {@link LimitUsageStorage} using a Redis storage.
@@ -92,7 +92,7 @@ public Map<LimitKey, Integer> addAndGet(Collection<AddAndGetRequest> requests) {
try (Pipeline pipeline = jedis.pipelined()) {

for (AddAndGetRequest request : requests) {
- pipeline.multi();
+ Transaction transaction = jedis.multi();
LimitKey limitKey = LimitKey.fromRequest(request);
String redisKey =
Stream.of(
@@ -105,16 +105,16 @@ public Map<LimitKey, Integer> addAndGet(Collection<AddAndGetRequest> requests) {
.map(RedisStorage::clean)
.collect(Collectors.joining(KEY_SEPARATOR));

- responses.put(limitKey, pipeline.incrBy(redisKey, request.getCost()));
+ responses.put(limitKey, transaction.incrBy(redisKey, request.getCost()));
// We set the expire to twice the expiration period. The expiration is there to ensure that we don't fill the Redis cluster with
// useless keys. The actual expiration mechanism is handled by the bucketing mechanism.
- pipeline.expire(redisKey, (int) request.getExpiration().getSeconds() * 2);
- pipeline.exec();
+ transaction.expire(redisKey, request.getExpiration().getSeconds() * 2);
+ transaction.exec();
}

pipeline.sync();
- } catch (IOException e) {
- logger.error("Unable to close redis storage pipeline.", e);
+ } catch (Throwable e) {
+ logger.error("An exception occured while publishing limits to Redis.", e);
}
}

@@ -126,13 +126,14 @@ public Map<LimitKey, Integer> addAndGet(Collection<AddAndGetRequest> requests) {

@Override
public Map<LimitKey, Integer> addAndGetWithLimit(Collection<AddAndGetRequest> requests) {
- Map<LimitKey, Response<String>> responses = new LinkedHashMap<>();
+ Map<LimitKey, Response<Object>> responses = new LinkedHashMap<>();

try (Jedis jedis = jedisPool.getResource()) {
- try (Pipeline pipeline = jedis.pipelined()) {
+ Transaction transaction = jedis.multi();

requests.forEach(
request -> {
- pipeline.multi();
LimitKey limitKey = LimitKey.fromRequest(request);
String redisKey =
Stream.of(
@@ -147,25 +148,27 @@ public Map<LimitKey, Integer> addAndGetWithLimit(Collection<AddAndGetRequest> re

responses.put(
limitKey,
- pipeline.eval(
+ transaction.eval(
COUNTER_SCRIPT,
Collections.singletonList(redisKey),
Arrays.asList(
String.valueOf(request.getCost()), String.valueOf(request.getLimit()))));
- pipeline.expire(redisKey, (int) request.getExpiration().getSeconds() * 2);
- pipeline.exec();
+ transaction.expire(redisKey, request.getExpiration().getSeconds() * 2);
+ transaction.exec();
});

- pipeline.sync();
- } catch (IOException e) {
- logger.error("Unable to close redis storage pipeline.", e);
+ } catch (Throwable e) {
+ logger.error("An exception occured while publishing limits to Redis.", e);
}
}

return responses
.entrySet()
.stream()
.collect(
- Collectors.toMap(Map.Entry::getKey, kvp -> Integer.parseInt(kvp.getValue().get())));
+ Collectors.toMap(
+ Map.Entry::getKey, kvp -> Integer.parseInt(kvp.getValue().get().toString())));
}

@Override
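For context on the migration: Jedis 4.x no longer exposes multi() on a Pipeline. A MULTI block is instead obtained as a Transaction directly from the Jedis connection, expire() accepts the TTL as a long number of seconds, and queued commands are read back through Response placeholders once exec() has run. Below is a minimal sketch of that pattern, assuming a local Redis on localhost:6379 and an illustrative key name rather than Spillway's actual key layout.

import java.time.Duration;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Response;
import redis.clients.jedis.Transaction;

public class Jedis4TransactionSketch {
  public static void main(String[] args) {
    // Hypothetical connection details; Spillway normally receives an already configured JedisPool.
    try (JedisPool pool = new JedisPool("localhost", 6379);
        Jedis jedis = pool.getResource()) {

      Duration expiration = Duration.ofMinutes(1);
      String key = "example-counter"; // illustrative key, not Spillway's real key format

      // Jedis 4.x: the MULTI block is started from the connection and returns a Transaction.
      // Queued commands return Response placeholders that are only populated by exec().
      Transaction transaction = jedis.multi();
      Response<Long> counter = transaction.incrBy(key, 1L);
      // expire() now takes the TTL as a long number of seconds, so the (int) cast is no longer needed.
      transaction.expire(key, expiration.getSeconds() * 2);
      transaction.exec();

      System.out.println("Counter after increment: " + counter.get());
    }
  }
}

In the same spirit, eval() on a Jedis 4.x Transaction is typed as Response<Object>, which is why addAndGetWithLimit now converts the script result with toString() before parsing it as an integer.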
