Skip to content

Commit

Permalink
LocalJobsStreamsTest.getStreams_cat_multiline fails sometimes.
Browse files Browse the repository at this point in the history
Debugging why.

Added test to run directly on service instead of via inprocess server.
  • Loading branch information
sverhoeven committed Jun 26, 2017
1 parent 24c8019 commit 35613de
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ class StreamForwarder extends Thread {
public void run() {
try {
while (true) {
int read = in.read(buffer);

int read = in.read(buffer);
System.err.println("Been here" + this.stdout);
if (read > 0) {
XenonProto.JobOutputStreams response;

Expand All @@ -52,7 +53,8 @@ public void run() {
writeOut(response);

} else if (read == -1) {
close();
System.err.println("Close" + this.stdout);
close();
return;
} }
} catch (IOException e) {
Expand All @@ -69,12 +71,13 @@ public void run() {
new StreamForwarder(stderr, false).start();
}

private synchronized void writeOut(XenonProto.JobOutputStreams response) {
private void writeOut(XenonProto.JobOutputStreams response) {
observer.onNext(response);
}

public synchronized void close() {
if (++streamsDone == 2) {
if (++streamsDone == 2) {
System.err.println("Closeall");
observer.onCompleted();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,10 @@ public void onNext(XenonProto.JobInputStream value) {
forwarder = new JobOutputStreamsForwarder(responseObserver, streams.getStderr(), streams.getStdout());
}
// write incoming stdin to xenons stdin

System.err.println("stdint forward");
streams.getStdin().write(value.getStdin().toByteArray());
streams.getStdin().flush();
System.err.println("stdint forwarded");
} catch (XenonException | IOException e) {
responseObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asException());
} catch (StatusException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package nl.esciencecenter.xenon.grpc.jobs;

import static java.lang.Thread.sleep;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;

import nl.esciencecenter.xenon.grpc.XenonProto;
import nl.esciencecenter.xenon.grpc.XenonSingleton;

import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class JobsServiceGetStreamsTest {
@Rule
public TemporaryFolder myfolder = new TemporaryFolder();

@Test
public void getStreams() throws Exception {
XenonSingleton singleton = new XenonSingleton();
JobsService service = new JobsService(singleton);

// scheduler
XenonProto.Empty empty = XenonProto.Empty.getDefaultInstance();
LocalSchedulerObserver localSchedulerObserver = new LocalSchedulerObserver();
service.localScheduler(empty, localSchedulerObserver);
XenonProto.Scheduler scheduler = localSchedulerObserver.scheduler;


// submit job
XenonProto.JobDescription description = XenonProto.JobDescription.newBuilder()
.setExecutable("cat")
.setQueueName("multi")
.setWorkingDirectory(myfolder.getRoot().getAbsolutePath())
.setInteractive(true)
.build();
XenonProto.SubmitJobRequest jobRequest = XenonProto.SubmitJobRequest.newBuilder()
.setDescription(description)
.setScheduler(scheduler)
.build();

JobObserver jobResponseObserver = new JobObserver();
service.submitJob(jobRequest, jobResponseObserver);
XenonProto.Job job = jobResponseObserver.job;

// getStreams
ResponseObserver responseObserver = new ResponseObserver();
StreamObserver<XenonProto.JobInputStream> requestWriter = service.getStreams(responseObserver);

// send first message
XenonProto.JobInputStream.Builder builder = XenonProto.JobInputStream.newBuilder()
.setJob(job);
ByteString line1 = ByteString.copyFromUtf8("first line\n");
XenonProto.JobInputStream request1 = builder.setStdin(line1).build();
requestWriter.onNext(request1);

// allow cat and xenon to work
sleep(100);

// receive first message
XenonProto.JobOutputStreams expected1 = XenonProto.JobOutputStreams.newBuilder().setStdout(line1).build();
assertThat(responseObserver.value, equalTo(expected1));

// send second and last message
ByteString line2 = ByteString.copyFromUtf8("second line\n");
XenonProto.JobInputStream request2 = builder.setStdin(line2).build();
requestWriter.onNext(request2);
requestWriter.onCompleted();

// allow cat and xenon to work
sleep(100);

// receive first message
XenonProto.JobOutputStreams expected2 = XenonProto.JobOutputStreams.newBuilder().setStdout(line2).build();
assertThat(responseObserver.value, equalTo(expected2));

singleton.close();
}

private class LocalSchedulerObserver implements StreamObserver<XenonProto.Scheduler> {
XenonProto.Scheduler scheduler;

@Override
public void onNext(XenonProto.Scheduler value) {
scheduler = value;
}

@Override
public void onError(Throwable t) {

}

@Override
public void onCompleted() {

}
}

private class JobObserver implements StreamObserver<XenonProto.Job> {
XenonProto.Job job;

@Override
public void onNext(XenonProto.Job value) {
job = value;
}

@Override
public void onError(Throwable t) {

}

@Override
public void onCompleted() {

}
}

private class ResponseObserver implements StreamObserver<XenonProto.JobOutputStreams> {
XenonProto.JobOutputStreams value;

@Override
public void onNext(XenonProto.JobOutputStreams value) {
this.value = value;
}

@Override
public void onError(Throwable t) {

}

@Override
public void onCompleted() {

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.io.IOException;

import nl.esciencecenter.xenon.XenonException;
import nl.esciencecenter.xenon.XenonFactory;
import nl.esciencecenter.xenon.grpc.XenonJobsGrpc;
import nl.esciencecenter.xenon.grpc.XenonProto;
import nl.esciencecenter.xenon.grpc.XenonSingleton;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
package nl.esciencecenter.xenon.grpc.jobs;

import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import java.io.IOException;

import nl.esciencecenter.xenon.grpc.XenonJobsGrpc;
import nl.esciencecenter.xenon.grpc.XenonProto;
import nl.esciencecenter.xenon.grpc.XenonProto.JobOutputStreams;

import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;

import java.io.IOException;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

public class LocalJobsStreamsTest extends LocalJobsServiceTestBase {
private XenonJobsGrpc.XenonJobsStub aclient;

Expand Down Expand Up @@ -77,9 +82,8 @@ public void getStreams_wc() {
verify(responseObserver, never()).onError(any(Throwable.class));
}

//@Ignore("request.onCompleted is called too soon causing failure")
@Test
public void getStreams_cat_multiline() {
public void getStreams_cat_multiline() throws InterruptedException {
// submit job
XenonProto.JobDescription description = XenonProto.JobDescription.newBuilder()
.setExecutable("cat")
Expand All @@ -97,6 +101,23 @@ public void getStreams_cat_multiline() {
// mock receiver
@SuppressWarnings("unchecked")
StreamObserver<JobOutputStreams> responseObserver = mock(StreamObserver.class);
// StreamObserver<JobOutputStreams> responseObserver = new StreamObserver<JobOutputStreams>() {
// @Override
// public void onNext(JobOutputStreams value) {
// // System.err.println(value.getStderr().toString(Charset.defaultCharset()));
// // System.err.println(value.getStdout().toString(Charset.defaultCharset()));
// }
//
// @Override
// public void onError(Throwable t) {
// System.err.println("onError");
// }
//
// @Override
// public void onCompleted() {
// System.err.println("onClose");
// }
// };
ArgumentCaptor<JobOutputStreams> responseCapturer = ArgumentCaptor.forClass(JobOutputStreams.class);
InOrder inorder = inOrder(responseObserver);
// call method under test
Expand All @@ -109,23 +130,30 @@ public void getStreams_cat_multiline() {
XenonProto.JobInputStream request1 = builder.setStdin(line1).build();
requestWriter.onNext(request1);

sleep(100);

// receive first message
inorder.verify(responseObserver, timeout(100)).onNext(responseCapturer.capture());
JobOutputStreams expected1 = JobOutputStreams.newBuilder().setStdout(line1).build();
assertEquals(expected1, responseCapturer.getValue());

sleep(100);

// send second message
ByteString line2 = ByteString.copyFromUtf8("second line\n");
XenonProto.JobInputStream request2 = builder.setStdin(line2).build();
requestWriter.onNext(request2);
requestWriter.onCompleted();

sleep(100);

// receive second message
inorder.verify(responseObserver, timeout(100)).onNext(responseCapturer.capture());
System.err.println("Been here");
JobOutputStreams expected2 = JobOutputStreams.newBuilder().setStdout(line2).build();
assertEquals(expected2, responseCapturer.getValue());

requestWriter.onCompleted();
System.err.println("Been here2");
// no surprises
verify(responseObserver, timeout(100)).onCompleted();
verify(responseObserver, never()).onError(any(Throwable.class));
Expand Down

0 comments on commit 35613de

Please sign in to comment.