Skip to content
This repository has been archived by the owner on Apr 5, 2022. It is now read-only.

XD-3751 Fix gpfdist processor shutdown #1901

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015 the original author or authors.
* Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.xd.greenplum.gpfdist;

import java.util.Date;
Expand All @@ -21,7 +22,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Processor;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.scheduling.TaskScheduler;
Expand All @@ -31,12 +32,12 @@
import org.springframework.xd.greenplum.support.NetworkUtils;
import org.springframework.xd.greenplum.support.RuntimeContext;

import com.codahale.metrics.Meter;

import reactor.Environment;
import reactor.core.processor.RingBufferProcessor;
import reactor.io.buffer.Buffer;

import com.codahale.metrics.Meter;

public class GPFDistMessageHandler extends AbstractGPFDistMessageHandler {

private final Log log = LogFactory.getLog(GPFDistMessageHandler.class);
Expand All @@ -57,7 +58,7 @@ public class GPFDistMessageHandler extends AbstractGPFDistMessageHandler {

private GreenplumLoad greenplumLoad;

private Processor<Buffer, Buffer> processor;
private RingBufferProcessor<Buffer> processor;

private GPFDistServer gpfdistServer;

Expand All @@ -66,7 +67,9 @@ public class GPFDistMessageHandler extends AbstractGPFDistMessageHandler {
private final TaskFuture taskFuture = new TaskFuture();

private int rateInterval = 0;
private Meter meter = null;

private Meter meter = null;

private int meterCount = 0;

public GPFDistMessageHandler(int port, int flushCount, int flushTime, int batchTimeout, int batchCount,
Expand All @@ -85,19 +88,22 @@ public GPFDistMessageHandler(int port, int flushCount, int flushTime, int batchT
protected void doWrite(Message<?> message) throws Exception {
Object payload = message.getPayload();
if (payload instanceof String) {
String data = (String)payload;
String data = (String) payload;
if (delimiter != null) {
processor.onNext(Buffer.wrap(data+delimiter));
} else {
processor.onNext(Buffer.wrap(data + delimiter));
}
else {
processor.onNext(Buffer.wrap(data));
}
if (meter != null) {
if ((meterCount++ % rateInterval) == 0) {
meter.mark(rateInterval);
log.info("METER: 1 minute rate = " + meter.getOneMinuteRate() + " mean rate = " + meter.getMeanRate());
log.info("METER: 1 minute rate = " + meter.getOneMinuteRate() + " mean rate = "
+ meter.getMeanRate());
}
}
} else {
}
else {
throw new MessageHandlingException(message, "message not a String");
}
}
Expand All @@ -116,7 +122,8 @@ protected void doStart() {
gpfdistServer = new GPFDistServer(processor, port, flushCount, flushTime, batchTimeout, batchCount);
gpfdistServer.start();
log.info("gpfdist protocol listener running on port=" + gpfdistServer.getLocalPort());
} catch (Exception e) {
}
catch (Exception e) {
throw new RuntimeException("Error starting protocol listener", e);
}

Expand All @@ -127,49 +134,93 @@ protected void doStart() {
context.addLocation(NetworkUtils.getGPFDistUri(gpfdistServer.getLocalPort()));

sqlTaskScheduler.schedule((new FutureTask<Void>(new Runnable() {

@Override
public void run() {
boolean taskValue = true;
try {
while(!taskFuture.interrupted) {
while (!taskFuture.interrupted) {
try {
greenplumLoad.load(context);
} catch (Exception e) {
}
catch (Exception e) {
log.error("Error in load", e);
}
Thread.sleep(batchPeriod*1000);
Thread.sleep(batchPeriod * 1000);
}
} catch (Exception e) {
}
catch (Exception e) {
taskValue = false;
}
taskFuture.set(taskValue);
}
}, null)), new Date());

} else {
}
else {
log.info("Skipping gpload tasks because greenplumLoad is not set");
}
}

@Override
protected void doStop() {
boolean drained = false;
if (greenplumLoad != null) {

// xd waits 30s to shutdown module, so lets wait 25 to drain
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain this a bit more - sounds like some sort of race condition, shouldn't we let the entire buffer drain since the messages in the buffer have been ack'd (say in rabbit) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This deadlock within a reactor is something which exists in 2.0.x, although it fixed in 2.5. Effectively when trying to shutdown a processor, signal is sent into a downstream indicating its complete but if there's existing messages in a ringbuffer, that terminate signal never reach a correct component in a reactor because we already stopped draining. Module shutdown timeout is afaik hardcoded to 30 secs in XD and after that things go a bit haywire if module is not actually properly closed.

This was a workaround I came out with discussion with stephane. It rely on a fact that we try to keep the load operations running little less time when XD would throw errors that it's unable to shutdown a module. We're hoping that these load operations will eventually drain the buffers and allows terminate signal to go down stream, thus allowing processor to shutdown and thus allow clean shutdown of a module.

long waitDrain = System.currentTimeMillis() + 25000l;

log.info("Trying to wait buffer to get drained");
while (System.currentTimeMillis() < waitDrain) {
long capacity = processor.getCapacity();
long availableCapacity = processor.getAvailableCapacity();
log.info("Buffer capacity " + capacity);
log.info("Buffer available capacity " + availableCapacity);
if (capacity != availableCapacity) {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
}
else {
log.info("Marking stream drained");
drained = true;
break;
}
}

// try to wait current load operation to finish.
taskFuture.interruptTask();
try {
long now = System.currentTimeMillis();
// wait a bit more than batch period
log.info("Cancelling loading task");
Boolean value = taskFuture.get(batchTimeout + batchPeriod + 2, TimeUnit.SECONDS);
log.info("Stopping, got future value " + value + " from task which took "
+ (System.currentTimeMillis() - now) + "ms");
} catch (Exception e) {
}
catch (Exception e) {
log.warn("Got error from task wait value which may indicate trouble", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just seeing this now, but I'd probably rip my hair out if I had to read this error message in production. What is the trouble?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to add a logging here instead of just ignoring if we get an exception from a TaskScheduler when cancelling Future. I'm not entirely sure when that exception would be thrown but wanted to add it to logs in case shutdown somehow still fails and we're getting exception here.

}
}

try {
processor.onComplete();
if (drained) {
log.info("Sending onComplete to processor");
processor.onComplete();
}
else {
// if it looks like we didn't drain,
// force shutdown as onComplete will
// block otherwise.
log.info("Forcing processor shutdown");
processor.forceShutdown();
}
log.info("Shutting down protocol listener");
gpfdistServer.stop();
} catch (Exception e) {
}
catch (Exception e) {
log.warn("Error shutting down protocol listener", e);
}
}
Expand Down