Skip to content

Commit

Permalink
FromInputStreamPublisher: avoid extra allocation of a buffer (#2965)
Browse files Browse the repository at this point in the history
Motivation:

In #2949 we optimized the case when `available()` is not implemented and always returns `0`. However, we de-optimized the use-case where it is implemented, because the last call to `available()` always returns 0, yet we still allocate a buffer of size `readChunkSize` that won't be used.

Modifications:
- Enhance `doNotFailOnInputStreamWithBrokenAvailableCall(...)` test before any changes for better test coverage.
- Remove the `byte[] buffer` class variable. It can be a local variable because it is never reused in practice. Only the last `buffer` won't be nullified, but we don't need it after that point.
- When `available()` returns `0`, try reading a single byte and then recheck availability instead of always falling back to
`readChunkSize`.
- Adjust the `doNotFailOnInputStreamWithBrokenAvailableCall()` test to account for the 2nd call to `available()`.
- Add a `singleReadTriggersMoreAvailability()` test to simulate the case when the 2nd call to `available()` returns a positive value.

Result:

1. No allocation of a `buffer` that won't be used at the EOF.
2. Account for new availability if it appears after a `read()`.
  • Loading branch information
idelpivnitskiy authored Jun 14, 2024
1 parent febb582 commit 82e256e
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 31 deletions.
4 changes: 4 additions & 0 deletions servicetalk-concurrent-api/gradle/checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@
<!-- mapOnError supports re-throwing a Throwable from onError(Throwable) -->
<suppress checks="IllegalThrowsCheck" files="io[\\/]servicetalk[\\/]concurrent[\\/]api[\\/]ScanWithMapper.java"/>
<suppress checks="IllegalThrowsCheck" files="io[\\/]servicetalk[\\/]concurrent[\\/]api[\\/]ScanMapper.java"/>

<!-- Extra whitespace makes it easier to read doNotFailOnInputStreamWithBrokenAvailableCall() stream inputs -->
<suppress checks="SingleSpaceSeparator"
files="io[\\/]servicetalk[\\/]concurrent[\\/]api[\\/]FromInputStreamPublisherTest.java"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection;
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;
Expand Down Expand Up @@ -132,8 +131,6 @@ private static final class InputStreamPublisherSubscription implements Subscript
* Subscription} are terminated.
*/
private long requested;
@Nullable
private byte[] buffer;
private int writeIdx;
private boolean ignoreRequests;

Expand Down Expand Up @@ -176,14 +173,29 @@ public void cancel() {
private void readAndDeliver(final Subscriber<? super byte[]> subscriber) {
try {
do {
// Initialize readByte with a negative value different from END_OF_FILE as an indicator that it was
// not initialized.
int readByte = Integer.MIN_VALUE;
// Can't fully trust available(), but it's a reasonable hint to mitigate blocking on read().
int available = stream.available();
if (available == 0) {
// Work around InputStreams that don't strictly honor the 0 == EOF contract.
available = buffer != null ? buffer.length : readChunkSize;
// This can be an indicator of EOF or a signal that no bytes are available to read without
// blocking. To avoid unnecessary allocation, we first probe for EOF:
readByte = stream.read();
if (readByte == END_OF_FILE) {
sendOnComplete(subscriber);
return;
}
// There is a chance a single read triggered availability of more bytes, let's check:
available = stream.available();
if (available == 0) {
// This InputStream either does not implement available() method at all, or does not honor
// the 0 == EOF contract, or does not prefetch data in larger chunks.
// In this case, we attempt to read based on the configured readChunkSize:
available = readChunkSize;
}
}
available = fillBufferAvoidingBlocking(available);
emitSingleBuffer(subscriber);
available = readAvailableAndEmit(available, readByte);
if (available == END_OF_FILE) {
sendOnComplete(subscriber);
return;
Expand All @@ -194,11 +206,21 @@ private void readAndDeliver(final Subscriber<? super byte[]> subscriber) {
}
}

// This method honors the estimated available bytes that can be read without blocking
private int fillBufferAvoidingBlocking(int available) throws IOException {
if (buffer == null) {
private int readAvailableAndEmit(final int available, final int readByte) throws IOException {
final byte[] buffer;
if (readByte >= 0) {
buffer = new byte[available < readChunkSize ? available + 1 : readChunkSize];
buffer[writeIdx++] = (byte) readByte;
} else {
buffer = new byte[min(available, readChunkSize)];
}
final int remainingLength = fillBuffer(buffer, available);
emitSingleBuffer(subscriber, buffer, remainingLength);
return remainingLength;
}

// This method honors the estimated available bytes that can be read without blocking
private int fillBuffer(final byte[] buffer, int available) throws IOException {
while (writeIdx != buffer.length && available > 0) {
int len = min(buffer.length - writeIdx, available);
int readActual = stream.read(buffer, writeIdx, len); // may block if len > available
Expand All @@ -211,15 +233,17 @@ private int fillBufferAvoidingBlocking(int available) throws IOException {
return available;
}

private void emitSingleBuffer(final Subscriber<? super byte[]> subscriber) {
private void emitSingleBuffer(final Subscriber<? super byte[]> subscriber,
final byte[] buffer, final int remainingLength) {
if (writeIdx < 1) {
assert remainingLength == END_OF_FILE :
"unexpected writeIdx == 0 while we still have some remaining data to read";
return;
}
assert buffer != null : "should have a buffer when writeIdx > 0";
assert writeIdx <= buffer.length : "writeIdx can not be greater than buffer.length";
final byte[] b;
if (writeIdx == buffer.length) {
b = buffer;
buffer = null;
} else {
// this extra copy is necessary when we read the last chunk and total number of bytes read before EOF
// is less than guesstimated buffer size
Expand All @@ -242,7 +266,7 @@ private void sendOnComplete(final Subscriber<? super byte[]> subscriber) {
}
}

private <T extends Throwable> void sendOnError(final Subscriber<? super byte[]> subscriber, final T t) {
private void sendOnError(final Subscriber<? super byte[]> subscriber, final Throwable t) {
if (trySetTerminalSent()) {
try {
subscriber.onError(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,37 +368,77 @@ void readsAllBytesWhenAvailableNotImplemented() throws Throwable {
}

@ParameterizedTest(name = "{displayName} [{index}] readChunkSize={0}")
@ValueSource(ints = {7, 1024})
@ValueSource(ints = {4, 1024})
void doNotFailOnInputStreamWithBrokenAvailableCall(int readChunkSize) throws Throwable {
initChunkedStream(bigBuff, of(5, 0, 0, 10, 5, 5, 1, 0),
of(5, 7, 7, 10, 5, 5, 1, 0));
// We use double "0, 0" because FromInputStreamPublisher does two calls to available() now. For this test, both
// calls to "broken" available() should consistently return `0`.
initChunkedStream(bigBuff, of(3, 0, 0, 4, 0, 0, 5, 0, 0, 2, 0, 0, 0, 0, 4, 0),
of(3, 7, 4, 4, 5, 2, 2, 2, 1, 2, 1, 1, 1, 1, 1, 4, 0));
pub = new FromInputStreamPublisher(inputStream, readChunkSize);

if (readChunkSize > bigBuff.length) {
byte[][] items = {
new byte[]{0, 1, 2, 3, 4},
new byte[]{0, 1, 2},
// avail == 0 -> override to readChunkSize
new byte[]{5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27,
28, 29, 30, 31, 32, 33, 34, 35, 36},
new byte[]{3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26,
27, 28, 29, 30, 31, 32, 33, 34, 35, 36},
};
verifySuccess(items);
} else {
byte[][] items = {
new byte[]{0, 1, 2, 3, 4},
// avail == 0 -> override to readChunkSize
new byte[]{5, 6, 7, 8, 9, 10, 11},
// avail == 0 -> override to readChunkSize
new byte[]{12, 13, 14, 15, 16, 17, 18},
// readChunkSize < available
new byte[]{19, 20, 21, 22, 23, 24, 25},
new byte[]{26, 27, 28, 29, 30},
new byte[]{31, 32, 33, 34, 35},
new byte[]{36},
// available < readChunkSize
new byte[]{0, 1, 2},
// available == 0 -> override to readChunkSize, actual chunk > readChunkSize
new byte[]{3, 4, 5, 6},
// available == readChunkSize
new byte[]{7, 8, 9, 10},
// available == 0 -> override to readChunkSize, actual chunk == readChunkSize
new byte[]{11, 12, 13, 14},
// available > readChunkSize -> limit by readChunkSize
new byte[]{15, 16, 17, 18},
// available == 0 -> override to readChunkSize, actual chunk < readChunkSize -> read twice
new byte[]{19, 20, 21, 22},
// available < readChunkSize
new byte[]{23, 24},
// available == 0 -> override to readChunkSize, actual chunk < readChunkSize -> read 3 times
new byte[]{25, 26, 27, 28},
// available == 0 -> override to readChunkSize, actual chunk < readChunkSize -> read 4 times
new byte[]{29, 30, 31, 32},
// available == readChunkSize
new byte[]{33, 34, 35, 36},
};
verifySuccess(items);
}
}

@Test
void singleReadTriggersMoreAvailability() throws Throwable {
// We simulate a case when a single stream.read() triggers a read of a larger chunk and then the next call to
// available() returns "chunk - 1". To accommodate mock behavior, if the 3rd consecutive call to available()
// returns a non-zero value, we should return a chunk value of "chunks[idx - 1] - number of read bytes".
initChunkedStream(bigBuff, of(0, 1, 0, 7, 0, 8, 1, 0, 17, 10, 2, 0),
of(2, 8, 9, 1, 18, 10, 2, 0));
pub = new FromInputStreamPublisher(inputStream, 8);

byte[][] items = {
// available < readChunkSize
new byte[]{0, 1},
// available == readChunkSize
new byte[]{2, 3, 4, 5, 6, 7, 8, 9},
// available > readChunkSize -> limit by readChunkSize
new byte[]{10, 11, 12, 13, 14, 15, 16, 17},
// available == 1 - unread remaining from the previous chunk of 9
new byte[]{18},
// available > 2x readChunkSize -> limit by readChunkSize
new byte[]{19, 20, 21, 22, 23, 24, 25, 26},
// available == 10 > readChunkSize - unread remaining from the previous chunk of 18
new byte[]{27, 28, 29, 30, 31, 32, 33, 34},
// available == 2 -> unread remaining from the previous chunk of 18
new byte[]{35, 36},
};
verifySuccess(items);
}

@ParameterizedTest(name = "{displayName} [{index}] chunkSize={0}")
@ValueSource(ints = {3, 5, 7})
void readChunkSizeRespectedWhenAvailableNotImplemented(int chunkSize) throws Throwable {
Expand Down Expand Up @@ -478,6 +518,7 @@ private IntStream ofAll(int i) {

private void initEmptyStream() throws IOException {
when(inputStream.available()).thenReturn(0);
when(inputStream.read()).thenReturn(-1);
when(inputStream.read(any(), anyInt(), anyInt())).thenReturn(-1);
}

Expand All @@ -491,13 +532,23 @@ private void initChunkedStream(final byte[] data, final IntStream avails, final
AtomicInteger readIdx = new AtomicInteger();
OfInt availSizes = avails.iterator();
OfInt chunkSizes = chunks.iterator();
AtomicBoolean readOneByte = new AtomicBoolean();
try {
when(inputStream.available()).then(inv -> availSizes.nextInt());
when(inputStream.read()).then(inv -> {
if (data.length == readIdx.get()) {
return -1;
}
readOneByte.set(true);
return (int) data[readIdx.getAndIncrement()];
});
when(inputStream.read(any(), anyInt(), anyInt())).then(inv -> {
byte[] b = inv.getArgument(0);
int pos = inv.getArgument(1);
int len = inv.getArgument(2);
int read = min(min(len, data.length - readIdx.get()), chunkSizes.nextInt());
// subtract 1 byte from the next chunk if a single byte was already read
final int chunkSize = chunkSizes.nextInt() - (readOneByte.getAndSet(false) ? 1 : 0);
int read = min(min(len, data.length - readIdx.get()), chunkSize);
if (read == 0) {
return data.length == readIdx.get() ? -1 : 0;
}
Expand Down

0 comments on commit 82e256e

Please sign in to comment.