Skip to content

Commit

Permalink
apply fix 1901
Browse files Browse the repository at this point in the history
  • Loading branch information
cqueiroz-pivotal committed Mar 28, 2016
1 parent 582da68 commit 6524e72
Showing 1 changed file with 77 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@
*/
package io.pivotal.spring.xd.jdbcgpfdist.gpfdist;

/**
* Created by cq on 27/3/16.
*/
import com.codahale.metrics.Meter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Processor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.scheduling.TaskScheduler;
Expand Down Expand Up @@ -58,7 +54,7 @@ public class GPFDistMessageHandler extends AbstractGPFDistMessageHandler {

private GreenplumLoad greenplumLoad;

private Processor<Buffer, Buffer> processor;
private RingBufferProcessor<Buffer> processor;

private GPFDistServer gpfdistServer;

Expand All @@ -67,7 +63,9 @@ public class GPFDistMessageHandler extends AbstractGPFDistMessageHandler {
private final TaskFuture taskFuture = new TaskFuture();

private int rateInterval = 0;
private Meter meter = null;

private Meter meter = null;

private int meterCount = 0;

public GPFDistMessageHandler(int port, int flushCount, int flushTime, int batchTimeout, int batchCount,
Expand All @@ -85,22 +83,23 @@ public GPFDistMessageHandler(int port, int flushCount, int flushTime, int batchT
@Override
protected void doWrite(Message<?> message) throws Exception {
Object payload = message.getPayload();

if (payload instanceof String) {
String data = (String)payload;
log.info("data:" + data);
String data = (String) payload;
if (delimiter != null) {
processor.onNext(Buffer.wrap(data+delimiter));
} else {
processor.onNext(Buffer.wrap(data + delimiter));
}
else {
processor.onNext(Buffer.wrap(data));
}
if (meter != null) {
if ((meterCount++ % rateInterval) == 0) {
meter.mark(rateInterval);
log.info("METER: 1 minute rate = " + meter.getOneMinuteRate() + " mean rate = " + meter.getMeanRate());
log.info("METER: 1 minute rate = " + meter.getOneMinuteRate() + " mean rate = "
+ meter.getMeanRate());
}
}
} else {
}
else {
throw new MessageHandlingException(message, "message not a String");
}
}
Expand All @@ -110,7 +109,6 @@ protected void onInit() throws Exception {
super.onInit();
Environment.initializeIfEmpty().assignErrorJournal();
processor = RingBufferProcessor.create(false);
log.info("onInit called!!");
}

@Override
Expand All @@ -120,7 +118,8 @@ protected void doStart() {
gpfdistServer = new GPFDistServer(processor, port, flushCount, flushTime, batchTimeout, batchCount);
gpfdistServer.start();
log.info("gpfdist protocol listener running on port=" + gpfdistServer.getLocalPort());
} catch (Exception e) {
}
catch (Exception e) {
throw new RuntimeException("Error starting protocol listener", e);
}

Expand All @@ -130,47 +129,94 @@ protected void doStart() {
final RuntimeContext context = new RuntimeContext();
context.addLocation(NetworkUtils.getGPFDistUri(gpfdistServer.getLocalPort()));

sqlTaskScheduler.schedule((new FutureTask<Void>(() -> {
boolean taskValue = true;
try {
while(!taskFuture.interrupted) {
try {
greenplumLoad.load(context);
} catch (Exception e) {
log.error("Error in load", e);
sqlTaskScheduler.schedule((new FutureTask<Void>(new Runnable() {

@Override
public void run() {
boolean taskValue = true;
try {
while (!taskFuture.interrupted) {
try {
greenplumLoad.load(context);
}
catch (Exception e) {
log.error("Error in load", e);
}
Thread.sleep(batchPeriod * 1000);
}
Thread.sleep(batchPeriod*1000);
}
} catch (Exception e) {
taskValue = false;
catch (Exception e) {
taskValue = false;
}
taskFuture.set(taskValue);
}
taskFuture.set(taskValue);
}, null)), new Date());

} else {
}
else {
log.info("Skipping gpload tasks because greenplumLoad is not set");
}
}

@Override
protected void doStop() {
boolean drained = false;
if (greenplumLoad != null) {

// xd waits 30s to shutdown module, so lets wait 25 to drain
long waitDrain = System.currentTimeMillis() + 25000l;

log.info("Trying to wait buffer to get drained");
while (System.currentTimeMillis() < waitDrain) {
long capacity = processor.getCapacity();
long availableCapacity = processor.getAvailableCapacity();
log.info("Buffer capacity " + capacity);
log.info("Buffer available capacity " + availableCapacity);
if (capacity != availableCapacity) {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
}
else {
log.info("Marking stream drained");
drained = true;
break;
}
}

// try to wait current load operation to finish.
taskFuture.interruptTask();
try {
long now = System.currentTimeMillis();
// wait a bit more than batch period
log.info("Cancelling loading task");
Boolean value = taskFuture.get(batchTimeout + batchPeriod + 2, TimeUnit.SECONDS);
log.info("Stopping, got future value " + value + " from task which took "
+ (System.currentTimeMillis() - now) + "ms");
} catch (Exception e) {
}
catch (Exception e) {
log.warn("Got error from task wait value which may indicate trouble", e);
}
}

try {
processor.onComplete();
if (drained) {
log.info("Sending onComplete to processor");
processor.onComplete();
}
else {
// if it looks like we didn't drain,
// force shutdown as onComplete will
// block otherwise.
log.info("Forcing processor shutdown");
processor.forceShutdown();
}
log.info("Shutting down protocol listener");
gpfdistServer.stop();
} catch (Exception e) {
}
catch (Exception e) {
log.warn("Error shutting down protocol listener", e);
}
}
Expand Down

0 comments on commit 6524e72

Please sign in to comment.