From 4192a228196e294a140360378a39ea09908481be Mon Sep 17 00:00:00 2001 From: felipeclopes Date: Mon, 19 Oct 2015 18:37:06 -0700 Subject: [PATCH] added: Customizable publishers --- .../core/endpoint/GnipStreamingEndpoint.java | 90 +++++++++++++++++++ .../RealTimeGnipStreamingEndpoint.java | 12 +++ .../twitter/kinesis/ConnectorApplication.java | 9 +- src/main/resources/config.properties.example | 1 + 4 files changed, 108 insertions(+), 4 deletions(-) create mode 100644 src/main/java/com/gnip/core/endpoint/GnipStreamingEndpoint.java create mode 100644 src/main/java/com/gnip/core/endpoint/RealTimeGnipStreamingEndpoint.java diff --git a/src/main/java/com/gnip/core/endpoint/GnipStreamingEndpoint.java b/src/main/java/com/gnip/core/endpoint/GnipStreamingEndpoint.java new file mode 100644 index 0000000..01c732f --- /dev/null +++ b/src/main/java/com/gnip/core/endpoint/GnipStreamingEndpoint.java @@ -0,0 +1,90 @@ +package com.gnip.core.endpoint; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.twitter.hbc.core.HttpConstants; +import com.twitter.hbc.core.endpoint.StreamingEndpoint; + +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +public abstract class GnipStreamingEndpoint implements StreamingEndpoint { + private static final String BASE_PATH = "/accounts/%s/publishers/%s/streams/%s/%s.json"; + protected final String account; + protected final String publisher; + protected final String product; + protected final String label; + protected final ConcurrentMap queryParameters = Maps.newConcurrentMap(); + + public GnipStreamingEndpoint(String account, String publisher, String product, String label) { + this(account, publisher, product, label, 0); + } + + public GnipStreamingEndpoint(String account, String publisher, String product, String label, int clientId) { + this.account = Preconditions.checkNotNull(account); + this.publisher = Preconditions.checkNotNull(publisher); + this.product = Preconditions.checkNotNull(product); + this.label = Preconditions.checkNotNull(label); + + if (clientId > 0) { + addQueryParameter("client", String.valueOf(clientId)); + } + } + + @Override + public String getURI() { + String uri = String.format(BASE_PATH, account.trim(), publisher.trim(), product.trim(), label.trim()); + + if (queryParameters.isEmpty()) { + return uri; + } else { + return uri + "?" + generateParamString(queryParameters); + } + } + + protected String generateParamString(Map params) { + return Joiner.on("&") + .withKeyValueSeparator("=") + .join(params); + } + + @Override + public String getHttpMethod() { + return HttpConstants.HTTP_GET; + } + + @Override + public String getPostParamString() { + return null; + } + + @Override + public String getQueryParamString() { + return generateParamString(queryParameters); + } + + @Override + public void addQueryParameter(String param, String value) { + queryParameters.put(param, value); + } + + @Override + public void removeQueryParameter(String param) { + queryParameters.remove(param); + } + + // These don't do anything + @Override + public void setBackfillCount(int count) { } + + @Override + public void setApiVersion(String apiVersion) { } + + @Override + public void addPostParameter(String param, String value) { } + + @Override + public void removePostParameter(String param) { } + +} \ No newline at end of file diff --git a/src/main/java/com/gnip/core/endpoint/RealTimeGnipStreamingEndpoint.java b/src/main/java/com/gnip/core/endpoint/RealTimeGnipStreamingEndpoint.java new file mode 100644 index 0000000..0041a75 --- /dev/null +++ b/src/main/java/com/gnip/core/endpoint/RealTimeGnipStreamingEndpoint.java @@ -0,0 +1,12 @@ +package com.gnip.core.endpoint; + +public class RealTimeGnipStreamingEndpoint extends GnipStreamingEndpoint { + + public RealTimeGnipStreamingEndpoint(String account, String publisher, String product, String label) { + super(account, publisher, product, label); + } + + public RealTimeGnipStreamingEndpoint(String account, String publisher, String product, String label, int clientId) { + super(account, publisher, product, label, clientId); + } +} \ No newline at end of file diff --git a/src/main/java/com/twitter/kinesis/ConnectorApplication.java b/src/main/java/com/twitter/kinesis/ConnectorApplication.java index 4e4dc18..f7a25d5 100644 --- a/src/main/java/com/twitter/kinesis/ConnectorApplication.java +++ b/src/main/java/com/twitter/kinesis/ConnectorApplication.java @@ -4,11 +4,11 @@ import com.amazonaws.auth.AWSCredentialsProviderChain; import com.amazonaws.auth.InstanceProfileCredentialsProvider; import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.gnip.core.endpoint.GnipStreamingEndpoint; +import com.gnip.core.endpoint.RealTimeGnipStreamingEndpoint; import com.twitter.hbc.ClientBuilder; import com.twitter.hbc.core.Client; import com.twitter.hbc.core.Constants; -import com.twitter.hbc.core.endpoint.EnterpriseStreamingEndpoint; -import com.twitter.hbc.core.endpoint.RealTimeEnterpriseStreamingEndpoint; import com.twitter.hbc.core.processor.LineStringProcessor; import com.twitter.hbc.httpclient.auth.BasicAuth; import com.twitter.kinesis.metrics.HBCStatsTrackerMetric; @@ -88,10 +88,11 @@ private void start() throws InterruptedException { client.connect(); } - private EnterpriseStreamingEndpoint endpoint() { + private GnipStreamingEndpoint endpoint() { String account = this.environment.accountName(); + String publisher = this.environment.publisher(); String label = this.environment.streamLabel(); String product = this.environment.product(); - return new RealTimeEnterpriseStreamingEndpoint(account, product, label); + return new RealTimeGnipStreamingEndpoint(account, publisher, product, label); } } \ No newline at end of file diff --git a/src/main/resources/config.properties.example b/src/main/resources/config.properties.example index 39fd19c..56bc12d 100644 --- a/src/main/resources/config.properties.example +++ b/src/main/resources/config.properties.example @@ -3,6 +3,7 @@ gnip.user.name=YOUR_GNIP_USERNAME gnip.user.password=YOUR_GNIP_PASSWORD gnip.account.name=YOUR_GNPI_ACCOUNT_NAME gnip.product=YOUR_GNIP_PRODUCT +gnip.publisher=YOUR_GNIP_PUBLISHER gnip.stream.label=YOUR_GNIP_STREAM_LABEL #AWS Account Information