diff --git a/servicetalk-concurrent-api/gradle/checkstyle/suppressions.xml b/servicetalk-concurrent-api/gradle/checkstyle/suppressions.xml
index 8979886a5c..acf1734e7b 100644
--- a/servicetalk-concurrent-api/gradle/checkstyle/suppressions.xml
+++ b/servicetalk-concurrent-api/gradle/checkstyle/suppressions.xml
@@ -24,4 +24,8 @@
+
+
+
diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromInputStreamPublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromInputStreamPublisher.java
index e0fcdd61ec..e499266378 100644
--- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromInputStreamPublisher.java
+++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromInputStreamPublisher.java
@@ -24,7 +24,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import javax.annotation.Nullable;
import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection;
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;
@@ -132,8 +131,6 @@ private static final class InputStreamPublisherSubscription implements Subscript
* Subscription} are terminated.
*/
private long requested;
- @Nullable
- private byte[] buffer;
private int writeIdx;
private boolean ignoreRequests;
@@ -176,14 +173,29 @@ public void cancel() {
private void readAndDeliver(final Subscriber super byte[]> subscriber) {
try {
do {
+ // Initialize readByte with a negative value different from END_OF_FILE as an indicator that it was
+ // not initialized.
+ int readByte = Integer.MIN_VALUE;
// Can't fully trust available(), but it's a reasonable hint to mitigate blocking on read().
int available = stream.available();
if (available == 0) {
- // Work around InputStreams that don't strictly honor the 0 == EOF contract.
- available = buffer != null ? buffer.length : readChunkSize;
+ // This can be an indicator of EOF or a signal that no bytes are available to read without
+ // blocking. To avoid unnecessary allocation, we first probe for EOF:
+ readByte = stream.read();
+ if (readByte == END_OF_FILE) {
+ sendOnComplete(subscriber);
+ return;
+ }
+ // There is a chance a single read triggered availability of more bytes, let's check:
+ available = stream.available();
+ if (available == 0) {
+ // This InputStream either does not implement available() method at all, or does not honor
+ // the 0 == EOF contract, or does not prefetch data in larger chunks.
+ // In this case, we attempt to read based on the configured readChunkSize:
+ available = readChunkSize;
+ }
}
- available = fillBufferAvoidingBlocking(available);
- emitSingleBuffer(subscriber);
+ available = readAvailableAndEmit(available, readByte);
if (available == END_OF_FILE) {
sendOnComplete(subscriber);
return;
@@ -194,11 +206,21 @@ private void readAndDeliver(final Subscriber super byte[]> subscriber) {
}
}
- // This method honors the estimated available bytes that can be read without blocking
- private int fillBufferAvoidingBlocking(int available) throws IOException {
- if (buffer == null) {
+ private int readAvailableAndEmit(final int available, final int readByte) throws IOException {
+ final byte[] buffer;
+ if (readByte >= 0) {
+ buffer = new byte[available < readChunkSize ? available + 1 : readChunkSize];
+ buffer[writeIdx++] = (byte) readByte;
+ } else {
buffer = new byte[min(available, readChunkSize)];
}
+ final int remainingLength = fillBuffer(buffer, available);
+ emitSingleBuffer(subscriber, buffer, remainingLength);
+ return remainingLength;
+ }
+
+ // This method honors the estimated available bytes that can be read without blocking
+ private int fillBuffer(final byte[] buffer, int available) throws IOException {
while (writeIdx != buffer.length && available > 0) {
int len = min(buffer.length - writeIdx, available);
int readActual = stream.read(buffer, writeIdx, len); // may block if len > available
@@ -211,15 +233,17 @@ private int fillBufferAvoidingBlocking(int available) throws IOException {
return available;
}
- private void emitSingleBuffer(final Subscriber super byte[]> subscriber) {
+ private void emitSingleBuffer(final Subscriber super byte[]> subscriber,
+ final byte[] buffer, final int remainingLength) {
if (writeIdx < 1) {
+ assert remainingLength == END_OF_FILE :
+ "unexpected writeIdx == 0 while we still have some remaining data to read";
return;
}
- assert buffer != null : "should have a buffer when writeIdx > 0";
+ assert writeIdx <= buffer.length : "writeIdx can not be grater than buffer.length";
final byte[] b;
if (writeIdx == buffer.length) {
b = buffer;
- buffer = null;
} else {
// this extra copy is necessary when we read the last chunk and total number of bytes read before EOF
// is less than guesstimated buffer size
@@ -242,7 +266,7 @@ private void sendOnComplete(final Subscriber super byte[]> subscriber) {
}
}
- private void sendOnError(final Subscriber super byte[]> subscriber, final T t) {
+ private void sendOnError(final Subscriber super byte[]> subscriber, final Throwable t) {
if (trySetTerminalSent()) {
try {
subscriber.onError(t);
diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/FromInputStreamPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/FromInputStreamPublisherTest.java
index 219e7f3197..41a50e91a7 100644
--- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/FromInputStreamPublisherTest.java
+++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/FromInputStreamPublisherTest.java
@@ -368,37 +368,77 @@ void readsAllBytesWhenAvailableNotImplemented() throws Throwable {
}
@ParameterizedTest(name = "{displayName} [{index}] readChunkSize={0}")
- @ValueSource(ints = {7, 1024})
+ @ValueSource(ints = {4, 1024})
void doNotFailOnInputStreamWithBrokenAvailableCall(int readChunkSize) throws Throwable {
- initChunkedStream(bigBuff, of(5, 0, 0, 10, 5, 5, 1, 0),
- of(5, 7, 7, 10, 5, 5, 1, 0));
+ // We use double "0, 0" because FromInputStreamPublisher does two calls to available() now. For this test, both
+ // calls to "broken" available() should consistently return `0`.
+ initChunkedStream(bigBuff, of(3, 0, 0, 4, 0, 0, 5, 0, 0, 2, 0, 0, 0, 0, 4, 0),
+ of(3, 7, 4, 4, 5, 2, 2, 2, 1, 2, 1, 1, 1, 1, 1, 4, 0));
pub = new FromInputStreamPublisher(inputStream, readChunkSize);
if (readChunkSize > bigBuff.length) {
byte[][] items = {
- new byte[]{0, 1, 2, 3, 4},
+ new byte[]{0, 1, 2},
// avail == 0 -> override to readChunkSize
- new byte[]{5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27,
- 28, 29, 30, 31, 32, 33, 34, 35, 36},
+ new byte[]{3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26,
+ 27, 28, 29, 30, 31, 32, 33, 34, 35, 36},
};
verifySuccess(items);
} else {
byte[][] items = {
- new byte[]{0, 1, 2, 3, 4},
- // avail == 0 -> override to readChunkSize
- new byte[]{5, 6, 7, 8, 9, 10, 11},
- // avail == 0 -> override to readChunkSize
- new byte[]{12, 13, 14, 15, 16, 17, 18},
- // readChunkSize < available
- new byte[]{19, 20, 21, 22, 23, 24, 25},
- new byte[]{26, 27, 28, 29, 30},
- new byte[]{31, 32, 33, 34, 35},
- new byte[]{36},
+ // available < readChunkSize
+ new byte[]{0, 1, 2},
+ // available == 0 -> override to readChunkSize, actual chunk > readChunkSize
+ new byte[]{3, 4, 5, 6},
+ // available == readChunkSize
+ new byte[]{7, 8, 9, 10},
+ // available == 0 -> override to readChunkSize, actual chunk == readChunkSize
+ new byte[]{11, 12, 13, 14},
+ // available > readChunkSize -> limit by readChunkSize
+ new byte[]{15, 16, 17, 18},
+ // available == 0 -> override to readChunkSize, actual chunk < readChunkSize -> read twice
+ new byte[]{19, 20, 21, 22},
+ // available < readChunkSize
+ new byte[]{23, 24},
+ // available == 0 -> override to readChunkSize, actual chunk < readChunkSize -> read 3 times
+ new byte[]{25, 26, 27, 28},
+ // available == 0 -> override to readChunkSize, actual chunk < readChunkSize -> read 4 times
+ new byte[]{29, 30, 31, 32},
+ // available == readChunkSize
+ new byte[]{33, 34, 35, 36},
};
verifySuccess(items);
}
}
+ @Test
+ void singleReadTriggersMoreAvailability() throws Throwable {
+ // We simulate a case when a single stream.read() triggers a read of a larger chunk and then the next call to
+ // available() returns "chunk - 1". To accommodate mock behavior, if the 3rd in a row call to available()
+ // returns a non zero value, we should return a chunk value of "chunks[idx - 1] - number of read bytes".
+ initChunkedStream(bigBuff, of(0, 1, 0, 7, 0, 8, 1, 0, 17, 10, 2, 0),
+ of(2, 8, 9, 1, 18, 10, 2, 0));
+ pub = new FromInputStreamPublisher(inputStream, 8);
+
+ byte[][] items = {
+ // available < readChunkSize
+ new byte[]{0, 1},
+ // available == readChunkSize
+ new byte[]{2, 3, 4, 5, 6, 7, 8, 9},
+ // available > readChunkSize -> limit by readChunkSize
+ new byte[]{10, 11, 12, 13, 14, 15, 16, 17},
+ // available == 1 - unread remaining from the previous chunk of 9
+ new byte[]{18},
+ // available > 2x readChunkSize -> limit by readChunkSize
+ new byte[]{19, 20, 21, 22, 23, 24, 25, 26},
+ // available == 10 > readChunkSize - unread remaining from the previous chunk of 18
+ new byte[]{27, 28, 29, 30, 31, 32, 33, 34},
+ // available == 2 -> unread remaining from the previous chunk of 18
+ new byte[]{35, 36},
+ };
+ verifySuccess(items);
+ }
+
@ParameterizedTest(name = "{displayName} [{index}] chunkSize={0}")
@ValueSource(ints = {3, 5, 7})
void readChunkSizeRespectedWhenAvailableNotImplemented(int chunkSize) throws Throwable {
@@ -478,6 +518,7 @@ private IntStream ofAll(int i) {
private void initEmptyStream() throws IOException {
when(inputStream.available()).thenReturn(0);
+ when(inputStream.read()).thenReturn(-1);
when(inputStream.read(any(), anyInt(), anyInt())).thenReturn(-1);
}
@@ -491,13 +532,23 @@ private void initChunkedStream(final byte[] data, final IntStream avails, final
AtomicInteger readIdx = new AtomicInteger();
OfInt availSizes = avails.iterator();
OfInt chunkSizes = chunks.iterator();
+ AtomicBoolean readOneByte = new AtomicBoolean();
try {
when(inputStream.available()).then(inv -> availSizes.nextInt());
+ when(inputStream.read()).then(inv -> {
+ if (data.length == readIdx.get()) {
+ return -1;
+ }
+ readOneByte.set(true);
+ return (int) data[readIdx.getAndIncrement()];
+ });
when(inputStream.read(any(), anyInt(), anyInt())).then(inv -> {
byte[] b = inv.getArgument(0);
int pos = inv.getArgument(1);
int len = inv.getArgument(2);
- int read = min(min(len, data.length - readIdx.get()), chunkSizes.nextInt());
+ // subtract 1 byte from the next chunk if a single byte was already read
+ final int chunkSize = chunkSizes.nextInt() - (readOneByte.getAndSet(false) ? 1 : 0);
+ int read = min(min(len, data.length - readIdx.get()), chunkSize);
if (read == 0) {
return data.length == readIdx.get() ? -1 : 0;
}