Skip to content

Commit

Permalink
Version 1.3.0 of the Amazon Kinesis Client Library
Browse files Browse the repository at this point in the history
A new metric called "MillisBehindLatest", which tracks how far
consumers are from real time, is now uploaded to CloudWatch.
  • Loading branch information
Dosani, Adnan committed May 22, 2015
1 parent 0fc90ff commit 1861f12
Show file tree
Hide file tree
Showing 13 changed files with 44 additions and 40 deletions.
4 changes: 2 additions & 2 deletions META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
Bundle-Version: 1.2.1
Bundle-Version: 1.3.0
Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
Expand All @@ -12,7 +12,7 @@ Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.3.0",
org.apache.httpcomponents.httpcore;bundle-version="4.3.2",
org.apache.httpcomponents.httpclient;bundle-version="4.3.4"
com.amazonaws.sdk;bundle-version="1.9.16",
com.amazonaws.sdk;bundle-version="1.9.37",
Export-Package: com.amazonaws.services.kinesis,
com.amazonaws.services.kinesis.clientlibrary,
com.amazonaws.services.kinesis.clientlibrary.config,
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.

## Release Notes
### Release 1.3.0 (May 22, 2015)
* A new metric called "MillisBehindLatest", which tracks how far consumers are from real time, is now uploaded to CloudWatch.

### Release 1.2.1 (January 26, 2015)
* **MultiLangDaemon** Changes to the MultiLangDaemon to make it easier to provide a custom worker.

Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name>
<version>1.2.1</version>
<version>1.3.0</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.</description>
<url>https://aws.amazon.com/kinesis</url>

Expand All @@ -23,7 +23,7 @@
</licenses>

<properties>
<aws-java-sdk.version>1.9.16</aws-java-sdk.version>
<aws-java-sdk.version>1.9.37</aws-java-sdk.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class KinesisClientLibConfiguration {
/**
* User agent set when Amazon Kinesis Client Library makes AWS requests.
*/
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.2.1";
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.3.0";

/**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
Expand Down Expand Up @@ -57,17 +54,15 @@ public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo) {
* @param maxRecords Max records to fetch
* @return list of records of up to maxRecords size
*/
public List<Record> getRecords(int maxRecords) {
public GetRecordsResult getRecords(int maxRecords) {
if (!isInitialized) {
throw new IllegalArgumentException("KinesisDataFetcher.getRecords called before initialization.");
}

List<Record> records = null;
GetRecordsResult response = null;
if (nextIterator != null) {
try {
response = kinesisProxy.get(nextIterator, maxRecords);
records = response.getRecords();
nextIterator = response.getNextShardIterator();
} catch (ResourceNotFoundException e) {
LOG.info("Caught ResourceNotFoundException when fetching records for shard " + shardId);
Expand All @@ -80,7 +75,7 @@ public List<Record> getRecords(int maxRecords) {
isShardEndReached = true;
}

return records;
return response;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.commons.logging.LogFactory;

import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
Expand All @@ -37,6 +38,8 @@ class ProcessTask implements ITask {
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed";
private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";
private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest";

private static final Log LOG = LogFactory.getLog(ProcessTask.class);

private final ShardInfo shardInfo;
Expand Down Expand Up @@ -93,7 +96,14 @@ public TaskResult call() {
boolean shardEndReached = true;
return new TaskResult(null, shardEndReached);
}
List<Record> records = getRecords();
final GetRecordsResult getRecordsResult = getRecords();

if (getRecordsResult.getMillisBehindLatest() != null) {
scope.addData(MILLIS_BEHIND_LATEST_METRIC, getRecordsResult.getMillisBehindLatest(),
StandardUnit.Milliseconds);
}

final List<Record> records = getRecordsResult.getRecords();

if (records.isEmpty()) {
LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId());
Expand Down Expand Up @@ -180,7 +190,7 @@ private String getMaxSequenceNumber(IMetricsScope scope, List<Record> records) {
* @throws KinesisClientLibException if reading checkpoints fails in the edge case where we haven't passed any
* records to the client code yet
*/
private List<Record> getRecords() throws KinesisClientLibException {
private GetRecordsResult getRecords() throws KinesisClientLibException {
int maxRecords = streamConfig.getMaxRecords();
try {
return dataFetcher.getRecords(maxRecords);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public <T extends Lease> void update(T other) {
}
KinesisClientLease casted = (KinesisClientLease) other;

// Do not update ownerSwitchesSinceCheckpoint here - that field is maintained by the leasing library.
setOwnerSwitchesSinceCheckpoint(casted.ownerSwitchesSinceCheckpoint);
setCheckpoint(casted.checkpoint);
setParentShardIds(casted.parentShardIds);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,12 @@ public Map<String, AttributeValueUpdate> getDynamoLeaseCounterUpdate(KinesisClie
public Map<String, AttributeValueUpdate> getDynamoTakeLeaseUpdate(KinesisClientLease lease, String newOwner) {
Map<String, AttributeValueUpdate> result = baseSerializer.getDynamoTakeLeaseUpdate(lease, newOwner);

Long ownerSwitchesSinceCheckpoint = lease.getOwnerSwitchesSinceCheckpoint();
String oldOwner = lease.getLeaseOwner();
if (oldOwner != null && !oldOwner.equals(newOwner)) {
ownerSwitchesSinceCheckpoint++;
result.put(OWNER_SWITCHES_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(1L),
AttributeAction.ADD));
}

result.put(OWNER_SWITCHES_KEY,
new AttributeValueUpdate(DynamoUtils.createAttributeValue(ownerSwitchesSinceCheckpoint),
AttributeAction.PUT));

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.amazonaws.services.kinesis.leases.impl;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

import com.amazonaws.util.json.JSONObject;

Expand All @@ -31,11 +32,8 @@ public class Lease {
*
* Sometimes System.nanoTime's return values will wrap due to overflow. When they do, the difference between two
* values will be very large. We will consider leases to be expired if they are more than a year old.
*
* 365 days per year * 24 hours per day * 60 minutes per hour * 60 seconds per minute * 1000000000
* nanoseconds/second
*/
private static final long MAX_ABS_AGE_NANOS = 365 * 24 * 60 * 60 * 1000000000L;
private static final long MAX_ABS_AGE_NANOS = TimeUnit.DAYS.toNanos(365);

private String leaseKey;
private String leaseOwner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -160,14 +161,14 @@ public boolean leaseTableExists() throws DependencyException {

@Override
public boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException {
long sleepTimeRemaining = timeoutSeconds * 1000;
long sleepTimeRemaining = TimeUnit.SECONDS.toMillis(timeoutSeconds);

while (!leaseTableExists()) {
if (sleepTimeRemaining <= 0) {
return false;
}

long timeToSleepMillis = Math.min(1000 * secondsBetweenPolls, sleepTimeRemaining);
long timeToSleepMillis = Math.min(TimeUnit.SECONDS.toMillis(secondsBetweenPolls), sleepTimeRemaining);

sleepTimeRemaining -= sleep(timeToSleepMillis);
}
Expand Down Expand Up @@ -385,7 +386,7 @@ public boolean takeLease(T lease, String owner)
verifyNotNull(owner, "owner cannot be null");

if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Taking lease with shardId %s from %s to %s",
LOG.debug(String.format("Taking lease with leaseKey %s from %s to %s",
lease.getLeaseKey(),
lease.getLeaseOwner() == null ? "nobody" : lease.getLeaseOwner(),
owner));
Expand Down Expand Up @@ -428,7 +429,7 @@ public boolean evictLease(T lease)
verifyNotNull(lease, "lease cannot be null");

if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Voiding lease with shardId %s owned by %s",
LOG.debug(String.format("Evicting lease with leaseKey %s owned by %s",
lease.getLeaseKey(),
lease.getLeaseOwner()));
}
Expand Down Expand Up @@ -485,7 +486,7 @@ public void deleteLease(T lease) throws DependencyException, InvalidStateExcepti
verifyNotNull(lease, "lease cannot be null");

if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Deleting lease with shardId %s", lease.getLeaseKey()));
LOG.debug(String.format("Deleting lease with leaseKey %s", lease.getLeaseKey()));
}

DeleteItemRequest deleteRequest = new DeleteItemRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -57,7 +58,7 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
public LeaseRenewer(ILeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis) {
this.leaseManager = leaseManager;
this.workerIdentifier = workerIdentifier;
this.leaseDurationNanos = leaseDurationMillis * 1000000L;
this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -68,7 +69,7 @@ public Long call() {
public LeaseTaker(ILeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis) {
this.leaseManager = leaseManager;
this.workerIdentifier = workerIdentifier;
this.leaseDurationNanos = leaseDurationMillis * 1000000;
this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,16 @@ public static void addSuccessAndLatency(long startTimeMillis, boolean success) {
public static void addSuccessAndLatency(String prefix, long startTimeMillis, boolean success) {
addSuccessAndLatencyPerShard(null, prefix, startTimeMillis, success);
}

public static void addSuccessAndLatencyPerShard (
String shardId,
String prefix,
long startTimeMillis,
String shardId,
String prefix,
long startTimeMillis,
boolean success) {
IMetricsScope scope = getMetricsScope();

String realPrefix = prefix == null ? "" : prefix + SEP;

if (shardId != null) {
scope.addDimension("ShardId", shardId);
}
Expand All @@ -103,10 +103,9 @@ public static void addSuccessAndLatencyPerShard (
public static void endScope() {
IMetricsScope scope = getMetricsScope();
if (scope != null) {
Integer refCount = referenceCount.get();
refCount--;
referenceCount.set(referenceCount.get() - 1);

if (refCount == 0) {
if (referenceCount.get() == 0) {
scope.end();
currentScope.remove();
}
Expand Down

0 comments on commit 1861f12

Please sign in to comment.