Skip to content

Commit

Permalink
Moved all getStreams tests to LocalJobsStreamsTest
Browse files Browse the repository at this point in the history
Removed debug prints

Added getStreams no job test

Fixes #11
Refs #19
  • Loading branch information
sverhoeven committed Jul 7, 2017
1 parent bc40f1a commit 69f043a
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public void run() {
while (true) {

int read = in.read(buffer);
System.err.println("Been here" + this.stdout);
if (read > 0) {
if (read > 0) {
XenonProto.JobOutputStreams response;

if (stdout) {
Expand All @@ -53,7 +52,6 @@ public void run() {
writeOut(response);

} else if (read == -1) {
System.err.println("Close" + this.stdout);
close();
return;
} }
Expand All @@ -77,7 +75,6 @@ private synchronized void writeOut(XenonProto.JobOutputStreams response) {

public synchronized void close() {
if (++streamsDone == 2) {
System.err.println("Closeall");
observer.onCompleted();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,16 +466,17 @@ public StreamObserver<XenonProto.JobInputStream> getStreams(StreamObserver<Xenon
public void onNext(XenonProto.JobInputStream value) {
try {
if (streams == null) {
if (XenonProto.Job.getDefaultInstance().equals(value.getJob())) {
throw Status.INVALID_ARGUMENT.augmentDescription("job value is required").asException();
}
Jobs jobs = singleton.getInstance().jobs();
Job job = getJob(value.getJob());
streams = jobs.getStreams(job);
forwarder = new JobOutputStreamsForwarder(responseObserver, streams.getStderr(), streams.getStdout());
}
// write incoming stdin to xenons stdin
System.err.println("stdint forward");
streams.getStdin().write(value.getStdin().toByteArray());
streams.getStdin().flush();
System.err.println("stdint forwarded");
} catch (XenonException | IOException e) {
responseObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asException());
} catch (StatusException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,95 @@

import static java.lang.Thread.sleep;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

import io.grpc.Status;
import io.grpc.StatusException;
import nl.esciencecenter.xenon.grpc.XenonProto;
import nl.esciencecenter.xenon.grpc.XenonSingleton;

import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentCaptor;

/**
* The LocalJobsServiceTestBase uses in-process server which added even more a-synchronicity
* This caused unpredictable behavior, so getStreams is tested directly against the service class.
*/
public class JobsServiceGetStreamsTest {
@Rule
public TemporaryFolder myfolder = new TemporaryFolder();
private XenonSingleton singleton;
private JobsService service;
private XenonProto.Scheduler scheduler;

@Before
public void setUp() {
singleton = new XenonSingleton();
service = new JobsService(singleton);
scheduler = getScheduler(service);
}

@After
public void tearDown() {
service.close(scheduler, new EmptyObserver());
singleton.close();
}

@Test
public void getStreams() throws Exception {
XenonSingleton singleton = new XenonSingleton();
JobsService service = new JobsService(singleton);
public void getStreams_singlerequestmessage() throws InterruptedException {
// submit job
XenonProto.JobDescription description = XenonProto.JobDescription.newBuilder()
.setExecutable("wc")
.setQueueName("multi")
.setWorkingDirectory(myfolder.getRoot().getAbsolutePath())
.setInteractive(true)
.build();
XenonProto.Job job = submit(service, scheduler, description);

// scheduler
XenonProto.Empty empty = XenonProto.Empty.getDefaultInstance();
LocalSchedulerObserver localSchedulerObserver = new LocalSchedulerObserver();
service.localScheduler(empty, localSchedulerObserver);
XenonProto.Scheduler scheduler = localSchedulerObserver.scheduler;
// getStreams
ResponseObserver responseObserver = new ResponseObserver();
StreamObserver<XenonProto.JobInputStream> requestWriter = service.getStreams(responseObserver);

// send
ByteString stdin = ByteString.copyFromUtf8("a piece of text");
XenonProto.JobInputStream request = XenonProto.JobInputStream.newBuilder()
.setJob(job)
.setStdin(stdin)
.build();

requestWriter.onNext(request);
requestWriter.onCompleted();

// allow wc and xenon to work
sleep(100);

// receive
XenonProto.JobOutputStreams expected = XenonProto.JobOutputStreams.newBuilder().setStdout(ByteString.copyFromUtf8(" 0 4 15\n")).build();
assertThat(responseObserver.value, equalTo(expected));

// response end
assertNull(responseObserver.error);
assertTrue(responseObserver.completed);
}

@Test
public void getStreams_sendrecievesendrecieve() throws Exception {
// submit job
XenonProto.JobDescription description = XenonProto.JobDescription.newBuilder()
.setExecutable("cat")
.setQueueName("multi")
.setWorkingDirectory(myfolder.getRoot().getAbsolutePath())
.setInteractive(true)
.build();
XenonProto.SubmitJobRequest jobRequest = XenonProto.SubmitJobRequest.newBuilder()
.setDescription(description)
.setScheduler(scheduler)
.build();

JobObserver jobResponseObserver = new JobObserver();
service.submitJob(jobRequest, jobResponseObserver);
XenonProto.Job job = jobResponseObserver.job;
XenonProto.Job job = submit(service, scheduler, description);

// getStreams
ResponseObserver responseObserver = new ResponseObserver();
Expand Down Expand Up @@ -76,7 +123,55 @@ public void getStreams() throws Exception {
XenonProto.JobOutputStreams expected2 = XenonProto.JobOutputStreams.newBuilder().setStdout(line2).build();
assertThat(responseObserver.value, equalTo(expected2));

singleton.close();
// response end
assertNull(responseObserver.error);
assertTrue(responseObserver.completed);
}

@Test
public void getStreams_nojobsend_responseerror() {
// submit job
XenonProto.JobDescription description = XenonProto.JobDescription.newBuilder()
.setExecutable("cat")
.setQueueName("multi")
.setWorkingDirectory(myfolder.getRoot().getAbsolutePath())
.setInteractive(true)
.build();
XenonProto.Job job = submit(service, scheduler, description);

// getStreams
ResponseObserver responseObserver = new ResponseObserver();
StreamObserver<XenonProto.JobInputStream> requestWriter = service.getStreams(responseObserver);

// send first message
XenonProto.JobInputStream.Builder builder = XenonProto.JobInputStream.newBuilder();
ByteString line1 = ByteString.copyFromUtf8("first line\n");
XenonProto.JobInputStream request1 = builder.setStdin(line1).build();
requestWriter.onNext(request1);

String expected = "INVALID_ARGUMENT: job value is required";
assertThat(responseObserver.error.getMessage(), equalTo(expected));
assertNull(responseObserver.value);
assertFalse(responseObserver.completed);
}

private XenonProto.Job submit(JobsService service, XenonProto.Scheduler scheduler, XenonProto.JobDescription description) {
XenonProto.SubmitJobRequest jobRequest = XenonProto.SubmitJobRequest.newBuilder()
.setDescription(description)
.setScheduler(scheduler)
.build();

JobObserver jobResponseObserver = new JobObserver();
service.submitJob(jobRequest, jobResponseObserver);
return jobResponseObserver.job;
}

private XenonProto.Scheduler getScheduler(JobsService service) {
// scheduler
XenonProto.Empty empty = XenonProto.Empty.getDefaultInstance();
LocalSchedulerObserver localSchedulerObserver = new LocalSchedulerObserver();
service.localScheduler(empty, localSchedulerObserver);
return localSchedulerObserver.scheduler;
}

private class LocalSchedulerObserver implements StreamObserver<XenonProto.Scheduler> {
Expand Down Expand Up @@ -118,13 +213,32 @@ public void onCompleted() {
}

private class ResponseObserver implements StreamObserver<XenonProto.JobOutputStreams> {
XenonProto.JobOutputStreams value;
XenonProto.JobOutputStreams value = null;
Throwable error = null;
boolean completed = false;

@Override
public void onNext(XenonProto.JobOutputStreams value) {
this.value = value;
}

@Override
public void onError(Throwable t) {
error = t;
}

@Override
public void onCompleted() {
completed = true;
}
}

private class EmptyObserver implements StreamObserver<XenonProto.Empty> {
@Override
public void onNext(XenonProto.Empty value) {

}

@Override
public void onError(Throwable t) {

Expand Down
Loading

0 comments on commit 69f043a

Please sign in to comment.