Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis module for cache #60

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
4 changes: 2 additions & 2 deletions Dockerfile.es
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM docker.elastic.co/elasticsearch/elasticsearch-oss:6.3.0 as builder
FROM docker.elastic.co/elasticsearch/elasticsearch:7.12.0 as builder

ADD https://raw.githubusercontent.com/vishnubob/wait-for-it/e1f115e4ca285c3c24e847c4dd4be955e0ed51c2/wait-for-it.sh /utils/wait-for-it.sh

Expand All @@ -9,6 +9,6 @@ RUN /usr/local/bin/docker-entrypoint.sh elasticsearch -p /tmp/epid & /bin/bash /
./bin/create-es-indices.sh ; \
kill $(cat /tmp/epid) && wait $(cat /tmp/epid); exit 0;

FROM docker.elastic.co/elasticsearch/elasticsearch-oss:6.3.0
FROM docker.elastic.co/elasticsearch/elasticsearch:7.12.0

COPY --from=builder /usr/share/elasticsearch/data /usr/share/elasticsearch/data
2 changes: 1 addition & 1 deletion administration-ui/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>crawling-framework</artifactId>
<groupId>lt.tokenmill.crawling</groupId>
<version>0.3.4-SNAPSHOT</version>
<version>0.3.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion analysis-ui/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>crawling-framework</artifactId>
<groupId>lt.tokenmill.crawling</groupId>
<version>0.3.4-SNAPSHOT</version>
<version>0.3.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
96 changes: 96 additions & 0 deletions cache/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>crawling-framework</artifactId>
<groupId>lt.tokenmill.crawling</groupId>
<version>0.3.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>cache</artifactId>

<properties>
<jackson.version>2.11.1</jackson.version>
</properties>

<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
<type>jar</type>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.3</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.jr</groupId>
<artifactId>jackson-jr-objects</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>

<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.15</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
<profile>
<id>release</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
</plugin>
</plugins>
</build>
</profile>
</profiles>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package lt.tokenmill.crawling.cache;

public class CacheConstants {
public static final String REDIS_HOST = "cache.redis.host";
public static final String REDIS_PORT = "cache.redis.port";
public static final String REDIS_AUTH = "cache.redis.auth";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package lt.tokenmill.crawling.cache;

import lt.tokenmill.crawling.cache.providers.CacheProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static lt.tokenmill.crawling.cache.utils.FutureUtils.tryGet;
import static lt.tokenmill.crawling.cache.utils.FutureUtils.waitFor;
import static lt.tokenmill.crawling.cache.utils.HashUtils.hashKey;

public class UrlProcessingCache {
private static final Logger LOG = LoggerFactory.getLogger(UrlProcessingCache.class);
private final CacheProvider provider;

public UrlProcessingCache(CacheProvider provider){
this.provider = provider;
}

public static String parseDomain(String url){
try {
URL u = new URL(url);
return u.getHost();
} catch (MalformedURLException e) {
e.printStackTrace();
return null;
}
}

private String globalNamespace(String domain){
return String.format("global:%s", hashKey(domain));
}

public void addUrl(String url){
String domain = parseDomain(url);
waitFor(provider.addKey(globalNamespace(domain), url));
}

public Set<String> filterUrls(String domain, Collection<String> urls) {
return filterUrls(domain, urls.stream());
}

public Set<String> filterUrls(String domain, Stream<String> urls) {
Set<String> keys = tryGet(provider.keysInNamespace(globalNamespace(domain)), () -> Collections.emptySet());
return urls.filter(k -> !keys.contains(hashKey(k))).collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package lt.tokenmill.crawling.cache.datamodel;

public class Pair<T, U> {
private final T key;
private final U value;
public Pair(T key, U value){
this.key = key;
this.value = value;
}

public T getKey(){
return key;
}

public U getValue(){
return value;
}

@Override
public boolean equals(Object other) {
if(other instanceof Pair){
Pair<T, U> otherPair = (Pair<T, U>)other;

return key.equals(otherPair.getKey()) && value.equals(otherPair.getValue());
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package lt.tokenmill.crawling.cache.providers;

import lt.tokenmill.crawling.cache.datamodel.Pair;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public interface CacheProvider {
<T> CompletableFuture<Boolean> set(String namespace, String key, T value);
<T> CompletableFuture<Boolean> setMultiple(String namespace, Set<Pair<String, T>> pairs);

CompletableFuture<Boolean> addKey(String namespace, String key);
CompletableFuture<Boolean> addKeyMultiple(String namespace, Collection<String> keys);
CompletableFuture<Boolean> removeKey(String namespace, String key);
CompletableFuture<Boolean> moveKey(String source, String dest, String key);
CompletableFuture<String> findKey(String namespace, String key);

<T> CompletableFuture<T> get(Class<T> klass, String namespace, String key);
<T> CompletableFuture<Set<T>> getMultiple(Class<T> klass, String namespace, String... keys);
<T> CompletableFuture<Set<T>> getMultiple(Class<T> klass, String namespace, Collection<String> keys);

CompletableFuture<Boolean> contains(String key);
CompletableFuture<Boolean> contains(String namespace, String key);

CompletableFuture<Set<String>> keysInNamespace(String namespace);

void cleanup();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package lt.tokenmill.crawling.cache.providers.redis;

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Protocol;

import java.util.Optional;

public class Builder{
private String host = "localhost";
private int port = 6379;
private Optional<String> auth = Optional.empty();
private int timeout = Protocol.DEFAULT_TIMEOUT;

public Builder withHost(String host){
this.host = host;
return this;
}

public Builder withPort(int port){
this.port = port;
return this;
}

public Builder withAuth(String auth){
if(auth != null) {
this.auth = Optional.of(auth);
}
return this;
}

public Builder withTimeoutMillis(int millis){
this.timeout = millis;
return this;
}

public RedisProvider build(){
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(100);
JedisPool pool;
if(auth.isPresent()){
pool = new JedisPool(config, host, port, timeout, auth.get());
}
else{
pool = new JedisPool(config, host, port, timeout);
}

return new RedisProvider(pool);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package lt.tokenmill.crawling.cache.providers.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import lt.tokenmill.crawling.cache.datamodel.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Pipeline;

import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class RedisKVProvider {
private static final Logger LOG = LoggerFactory.getLogger(RedisKVProvider.class);
private final RedisResourceProvider resource;
private static ObjectMapper objectMapper = new ObjectMapper();

public RedisKVProvider(RedisResourceProvider resourceProvider){
this.resource = resourceProvider;
}

public CompletableFuture<Boolean> contains(String namespace, String key) {
return resource.withJedis(redis -> redis.hexists(namespace, key));
}

public <T> CompletableFuture<Boolean> set(String namespace, String key, T value) {
return resource.withJedis(redis -> {
try {
redis.hset(namespace, key, objectMapper.writeValueAsString(value));
return true;
}
catch (Exception ex){
LOG.error("Failed to set value", ex);
return false;
}
});
}

public <T> CompletableFuture<Boolean> setMultiple(String namespace, Set<Pair<String, T>> pairs){
return resource.withJedis(redis -> {
Pipeline pipe = redis.pipelined();
pipe.multi();
for(Pair<String, T> pair : pairs){
try {
pipe.hset(namespace, pair.getKey(), objectMapper.writeValueAsString(pair.getValue()));
}
catch (Exception ex){
LOG.error("Failed to set value", ex);
}
}

pipe.sync();
pipe.exec();
return true;
});
}

public <T> CompletableFuture<T> get(Class<T> klass, String namespace, String key) {
return resource.withJedis(redis -> {
String data = redis.hget(namespace, key);
return parseObj(data, klass);
});
}

private<T> T parseObj(String data, Class<T> klass){
try {
return objectMapper.readValue(data, klass);
}
catch(Exception ex){
return null;
}
}

public <T> CompletableFuture<Set<T>> getMultiple(Class<T> klass, String namespace, Set<String> keys) {
return resource
.withPipeline(pipe -> keys.stream().map(k -> pipe.hget(namespace, k)))
.thenApply(responses -> {
return responses
.map(data -> parseObj(data, klass))
.filter(Objects::nonNull)
.collect(Collectors.toSet());
});
}
}
Loading