diff --git a/src/main/java/nl/esciencecenter/xenon/grpc/jobs/JobOutputStreamsForwarder.java b/src/main/java/nl/esciencecenter/xenon/grpc/jobs/JobOutputStreamsForwarder.java index 326119c..a7a3daa 100644 --- a/src/main/java/nl/esciencecenter/xenon/grpc/jobs/JobOutputStreamsForwarder.java +++ b/src/main/java/nl/esciencecenter/xenon/grpc/jobs/JobOutputStreamsForwarder.java @@ -38,8 +38,9 @@ class StreamForwarder extends Thread { public void run() { try { while (true) { - int read = in.read(buffer); + int read = in.read(buffer); + System.err.println("Been here" + this.stdout); if (read > 0) { XenonProto.JobOutputStreams response; @@ -52,7 +53,8 @@ public void run() { writeOut(response); } else if (read == -1) { - close(); + System.err.println("Close" + this.stdout); + close(); return; } } } catch (IOException e) { @@ -69,12 +71,13 @@ public void run() { new StreamForwarder(stderr, false).start(); } - private synchronized void writeOut(XenonProto.JobOutputStreams response) { + private void writeOut(XenonProto.JobOutputStreams response) { observer.onNext(response); } public synchronized void close() { - if (++streamsDone == 2) { + if (++streamsDone == 2) { + System.err.println("Closeall"); observer.onCompleted(); } } diff --git a/src/main/java/nl/esciencecenter/xenon/grpc/jobs/JobsService.java b/src/main/java/nl/esciencecenter/xenon/grpc/jobs/JobsService.java index 61ba0bc..aba5a04 100644 --- a/src/main/java/nl/esciencecenter/xenon/grpc/jobs/JobsService.java +++ b/src/main/java/nl/esciencecenter/xenon/grpc/jobs/JobsService.java @@ -472,9 +472,10 @@ public void onNext(XenonProto.JobInputStream value) { forwarder = new JobOutputStreamsForwarder(responseObserver, streams.getStderr(), streams.getStdout()); } // write incoming stdin to xenons stdin - + System.err.println("stdint forward"); streams.getStdin().write(value.getStdin().toByteArray()); streams.getStdin().flush(); + System.err.println("stdint forwarded"); } catch (XenonException | IOException e) { responseObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asException()); } catch (StatusException e) { diff --git a/src/test/java/nl/esciencecenter/xenon/grpc/jobs/JobsServiceGetStreamsTest.java b/src/test/java/nl/esciencecenter/xenon/grpc/jobs/JobsServiceGetStreamsTest.java new file mode 100644 index 0000000..2a88940 --- /dev/null +++ b/src/test/java/nl/esciencecenter/xenon/grpc/jobs/JobsServiceGetStreamsTest.java @@ -0,0 +1,138 @@ +package nl.esciencecenter.xenon.grpc.jobs; + +import static java.lang.Thread.sleep; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +import nl.esciencecenter.xenon.grpc.XenonProto; +import nl.esciencecenter.xenon.grpc.XenonSingleton; + +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class JobsServiceGetStreamsTest { + @Rule + public TemporaryFolder myfolder = new TemporaryFolder(); + + @Test + public void getStreams() throws Exception { + XenonSingleton singleton = new XenonSingleton(); + JobsService service = new JobsService(singleton); + + // scheduler + XenonProto.Empty empty = XenonProto.Empty.getDefaultInstance(); + LocalSchedulerObserver localSchedulerObserver = new LocalSchedulerObserver(); + service.localScheduler(empty, localSchedulerObserver); + XenonProto.Scheduler scheduler = localSchedulerObserver.scheduler; + + + // submit job + XenonProto.JobDescription description = XenonProto.JobDescription.newBuilder() + .setExecutable("cat") + .setQueueName("multi") + .setWorkingDirectory(myfolder.getRoot().getAbsolutePath()) + .setInteractive(true) + .build(); + XenonProto.SubmitJobRequest jobRequest = XenonProto.SubmitJobRequest.newBuilder() + .setDescription(description) + .setScheduler(scheduler) + .build(); + + JobObserver jobResponseObserver = new JobObserver(); + service.submitJob(jobRequest, jobResponseObserver); + XenonProto.Job job = jobResponseObserver.job; + + // getStreams + ResponseObserver responseObserver = new ResponseObserver(); + StreamObserver requestWriter = service.getStreams(responseObserver); + + // send first message + XenonProto.JobInputStream.Builder builder = XenonProto.JobInputStream.newBuilder() + .setJob(job); + ByteString line1 = ByteString.copyFromUtf8("first line\n"); + XenonProto.JobInputStream request1 = builder.setStdin(line1).build(); + requestWriter.onNext(request1); + + // allow cat and xenon to work + sleep(100); + + // receive first message + XenonProto.JobOutputStreams expected1 = XenonProto.JobOutputStreams.newBuilder().setStdout(line1).build(); + assertThat(responseObserver.value, equalTo(expected1)); + + // send second and last message + ByteString line2 = ByteString.copyFromUtf8("second line\n"); + XenonProto.JobInputStream request2 = builder.setStdin(line2).build(); + requestWriter.onNext(request2); + requestWriter.onCompleted(); + + // allow cat and xenon to work + sleep(100); + + // receive first message + XenonProto.JobOutputStreams expected2 = XenonProto.JobOutputStreams.newBuilder().setStdout(line2).build(); + assertThat(responseObserver.value, equalTo(expected2)); + + singleton.close(); + } + + private class LocalSchedulerObserver implements StreamObserver { + XenonProto.Scheduler scheduler; + + @Override + public void onNext(XenonProto.Scheduler value) { + scheduler = value; + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onCompleted() { + + } + } + + private class JobObserver implements StreamObserver { + XenonProto.Job job; + + @Override + public void onNext(XenonProto.Job value) { + job = value; + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onCompleted() { + + } + } + + private class ResponseObserver implements StreamObserver { + XenonProto.JobOutputStreams value; + + @Override + public void onNext(XenonProto.JobOutputStreams value) { + this.value = value; + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onCompleted() { + + } + } +} \ No newline at end of file diff --git a/src/test/java/nl/esciencecenter/xenon/grpc/jobs/LocalJobsServiceTestBase.java b/src/test/java/nl/esciencecenter/xenon/grpc/jobs/LocalJobsServiceTestBase.java index 9c86e72..9277f2d 100644 --- a/src/test/java/nl/esciencecenter/xenon/grpc/jobs/LocalJobsServiceTestBase.java +++ b/src/test/java/nl/esciencecenter/xenon/grpc/jobs/LocalJobsServiceTestBase.java @@ -3,7 +3,6 @@ import java.io.IOException; import nl.esciencecenter.xenon.XenonException; -import nl.esciencecenter.xenon.XenonFactory; import nl.esciencecenter.xenon.grpc.XenonJobsGrpc; import nl.esciencecenter.xenon.grpc.XenonProto; import nl.esciencecenter.xenon.grpc.XenonSingleton; diff --git a/src/test/java/nl/esciencecenter/xenon/grpc/jobs/LocalJobsStreamsTest.java b/src/test/java/nl/esciencecenter/xenon/grpc/jobs/LocalJobsStreamsTest.java index 24ac60d..d3f7a6a 100644 --- a/src/test/java/nl/esciencecenter/xenon/grpc/jobs/LocalJobsStreamsTest.java +++ b/src/test/java/nl/esciencecenter/xenon/grpc/jobs/LocalJobsStreamsTest.java @@ -1,22 +1,27 @@ package nl.esciencecenter.xenon.grpc.jobs; -import com.google.protobuf.ByteString; -import io.grpc.stub.StreamObserver; +import static java.lang.Thread.sleep; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +import java.io.IOException; + import nl.esciencecenter.xenon.grpc.XenonJobsGrpc; import nl.esciencecenter.xenon.grpc.XenonProto; import nl.esciencecenter.xenon.grpc.XenonProto.JobOutputStreams; + +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; -import java.io.IOException; - -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; - public class LocalJobsStreamsTest extends LocalJobsServiceTestBase { private XenonJobsGrpc.XenonJobsStub aclient; @@ -77,9 +82,8 @@ public void getStreams_wc() { verify(responseObserver, never()).onError(any(Throwable.class)); } - //@Ignore("request.onCompleted is called too soon causing failure") @Test - public void getStreams_cat_multiline() { + public void getStreams_cat_multiline() throws InterruptedException { // submit job XenonProto.JobDescription description = XenonProto.JobDescription.newBuilder() .setExecutable("cat") @@ -97,6 +101,23 @@ public void getStreams_cat_multiline() { // mock receiver @SuppressWarnings("unchecked") StreamObserver responseObserver = mock(StreamObserver.class); +// StreamObserver responseObserver = new StreamObserver() { +// @Override +// public void onNext(JobOutputStreams value) { +// // System.err.println(value.getStderr().toString(Charset.defaultCharset())); +// // System.err.println(value.getStdout().toString(Charset.defaultCharset())); +// } +// +// @Override +// public void onError(Throwable t) { +// System.err.println("onError"); +// } +// +// @Override +// public void onCompleted() { +// System.err.println("onClose"); +// } +// }; ArgumentCaptor responseCapturer = ArgumentCaptor.forClass(JobOutputStreams.class); InOrder inorder = inOrder(responseObserver); // call method under test @@ -109,23 +130,30 @@ public void getStreams_cat_multiline() { XenonProto.JobInputStream request1 = builder.setStdin(line1).build(); requestWriter.onNext(request1); + sleep(100); + // receive first message inorder.verify(responseObserver, timeout(100)).onNext(responseCapturer.capture()); JobOutputStreams expected1 = JobOutputStreams.newBuilder().setStdout(line1).build(); assertEquals(expected1, responseCapturer.getValue()); + sleep(100); + // send second message ByteString line2 = ByteString.copyFromUtf8("second line\n"); XenonProto.JobInputStream request2 = builder.setStdin(line2).build(); requestWriter.onNext(request2); + requestWriter.onCompleted(); + sleep(100); // receive second message inorder.verify(responseObserver, timeout(100)).onNext(responseCapturer.capture()); + System.err.println("Been here"); JobOutputStreams expected2 = JobOutputStreams.newBuilder().setStdout(line2).build(); assertEquals(expected2, responseCapturer.getValue()); - requestWriter.onCompleted(); + System.err.println("Been here2"); // no surprises verify(responseObserver, timeout(100)).onCompleted(); verify(responseObserver, never()).onError(any(Throwable.class));