Skip to content

Commit

Permalink
added: Customizable publishers
Browse files Browse the repository at this point in the history
  • Loading branch information
felipeclopes committed Oct 20, 2015
1 parent b1f51c1 commit 4192a22
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 4 deletions.
90 changes: 90 additions & 0 deletions src/main/java/com/gnip/core/endpoint/GnipStreamingEndpoint.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.gnip.core.endpoint;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.twitter.hbc.core.HttpConstants;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;

import java.util.Map;
import java.util.concurrent.ConcurrentMap;

public abstract class GnipStreamingEndpoint implements StreamingEndpoint {
private static final String BASE_PATH = "/accounts/%s/publishers/%s/streams/%s/%s.json";
protected final String account;
protected final String publisher;
protected final String product;
protected final String label;
protected final ConcurrentMap<String, String> queryParameters = Maps.newConcurrentMap();

public GnipStreamingEndpoint(String account, String publisher, String product, String label) {
this(account, publisher, product, label, 0);
}

public GnipStreamingEndpoint(String account, String publisher, String product, String label, int clientId) {
this.account = Preconditions.checkNotNull(account);
this.publisher = Preconditions.checkNotNull(publisher);
this.product = Preconditions.checkNotNull(product);
this.label = Preconditions.checkNotNull(label);

if (clientId > 0) {
addQueryParameter("client", String.valueOf(clientId));
}
}

@Override
public String getURI() {
String uri = String.format(BASE_PATH, account.trim(), publisher.trim(), product.trim(), label.trim());

if (queryParameters.isEmpty()) {
return uri;
} else {
return uri + "?" + generateParamString(queryParameters);
}
}

protected String generateParamString(Map<String, String> params) {
return Joiner.on("&")
.withKeyValueSeparator("=")
.join(params);
}

@Override
public String getHttpMethod() {
return HttpConstants.HTTP_GET;
}

@Override
public String getPostParamString() {
return null;
}

@Override
public String getQueryParamString() {
return generateParamString(queryParameters);
}

@Override
public void addQueryParameter(String param, String value) {
queryParameters.put(param, value);
}

@Override
public void removeQueryParameter(String param) {
queryParameters.remove(param);
}

// These don't do anything
@Override
public void setBackfillCount(int count) { }

@Override
public void setApiVersion(String apiVersion) { }

@Override
public void addPostParameter(String param, String value) { }

@Override
public void removePostParameter(String param) { }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.gnip.core.endpoint;

public class RealTimeGnipStreamingEndpoint extends GnipStreamingEndpoint {

public RealTimeGnipStreamingEndpoint(String account, String publisher, String product, String label) {
super(account, publisher, product, label);
}

public RealTimeGnipStreamingEndpoint(String account, String publisher, String product, String label, int clientId) {
super(account, publisher, product, label, clientId);
}
}
9 changes: 5 additions & 4 deletions src/main/java/com/twitter/kinesis/ConnectorApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.gnip.core.endpoint.GnipStreamingEndpoint;
import com.gnip.core.endpoint.RealTimeGnipStreamingEndpoint;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.EnterpriseStreamingEndpoint;
import com.twitter.hbc.core.endpoint.RealTimeEnterpriseStreamingEndpoint;
import com.twitter.hbc.core.processor.LineStringProcessor;
import com.twitter.hbc.httpclient.auth.BasicAuth;
import com.twitter.kinesis.metrics.HBCStatsTrackerMetric;
Expand Down Expand Up @@ -88,10 +88,11 @@ private void start() throws InterruptedException {
client.connect();
}

private EnterpriseStreamingEndpoint endpoint() {
private GnipStreamingEndpoint endpoint() {
String account = this.environment.accountName();
String publisher = this.environment.publisher();
String label = this.environment.streamLabel();
String product = this.environment.product();
return new RealTimeEnterpriseStreamingEndpoint(account, product, label);
return new RealTimeGnipStreamingEndpoint(account, publisher, product, label);
}
}
1 change: 1 addition & 0 deletions src/main/resources/config.properties.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ gnip.user.name=YOUR_GNIP_USERNAME
gnip.user.password=YOUR_GNIP_PASSWORD
gnip.account.name=YOUR_GNPI_ACCOUNT_NAME
gnip.product=YOUR_GNIP_PRODUCT
gnip.publisher=YOUR_GNIP_PUBLISHER
gnip.stream.label=YOUR_GNIP_STREAM_LABEL

#AWS Account Information
Expand Down

0 comments on commit 4192a22

Please sign in to comment.