Skip to content

Commit

Permalink
Handles the case where the binary cursor's InputStream provides fewer…
Browse files Browse the repository at this point in the history
… bytes than requested before reaching EOF.
  • Loading branch information
tgregg committed Oct 27, 2023
1 parent 5ddd95a commit 72c1962
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 17 deletions.
43 changes: 26 additions & 17 deletions src/com/amazon/ion/impl/IonCursorBinary.java
Original file line number Diff line number Diff line change
Expand Up @@ -524,8 +524,7 @@ private boolean fillAt(long index, long numberOfBytes) {
refillableState.bytesRequested = numberOfBytes + (index - offset);
if (ensureCapacity(refillableState.bytesRequested)) {
// Fill all the free space, not just the shortfall; this reduces I/O.
refill(freeSpaceAt(limit));
shortfall = refillableState.bytesRequested - availableAt(offset);
shortfall = refill(freeSpaceAt(limit), refillableState.bytesRequested);
} else {
// The request cannot be satisfied, but not because data was unavailable. Return normally; it is the
// caller's responsibility to recover.
Expand Down Expand Up @@ -646,24 +645,34 @@ private void shiftIndicesLeft(int shiftAmount) {
}

/**
* Fills the buffer with up to the requested number of additional bytes. It is the caller's responsibility to
* ensure that there is space in the buffer.
* Attempts to fill the buffer with up to the requested number of additional bytes. It is the caller's
* responsibility to ensure that there is space in the buffer.
* @param numberOfBytesToFill the number of additional bytes to attempt to add to the buffer.
* @param minimumNumberOfBytesRequired the minimum number of bytes requested to fill the current value.
* @return the shortfall between the number of bytes that were filled and the minimum number requested. If less than
* 1, then at least `minimumNumberOfBytesRequired` were filled.
*/
private void refill(long numberOfBytesToFill) {
private long refill(long numberOfBytesToFill, long minimumNumberOfBytesRequired) {
int numberOfBytesFilled = -1;
try {
numberOfBytesFilled = refillableState.inputStream.read(buffer, (int) limit, (int) numberOfBytesToFill);
} catch (EOFException e) {
// Certain InputStream implementations (e.g. GZIPInputStream) throw EOFException if more bytes are requested
// to read than are currently available (e.g. if a header or trailer is incomplete).
} catch (IOException e) {
throwAsIonException(e);
}
if (numberOfBytesFilled < 0) {
return;
}
limit += numberOfBytesFilled;
long shortfall;
// Sometimes an InputStream implementation will return fewer than the number of bytes requested even
// if the stream is not at EOF. If this happens and there is still a shortfall, keep requesting bytes
// until either the shortfall is filled or EOF is reached.
do {
try {
numberOfBytesFilled = refillableState.inputStream.read(buffer, (int) limit, (int) numberOfBytesToFill);
} catch (EOFException e) {
// Certain InputStream implementations (e.g. GZIPInputStream) throw EOFException if more bytes are requested
// to read than are currently available (e.g. if a header or trailer is incomplete).
} catch (IOException e) {
throwAsIonException(e);
}
if (numberOfBytesFilled > 0) {
limit += numberOfBytesFilled;
}
shortfall = minimumNumberOfBytesRequired - availableAt(offset);
} while (shortfall > 0 && numberOfBytesFilled >= 0);
return shortfall;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3196,6 +3196,94 @@ public void skipWithoutEnoughDataNonIncrementalFails() throws Exception {
reader.close();
}

/**
 * An InputStream over a fixed byte array that returns fewer bytes than requested from bulk reads
 * and skips, even though more data is available. It is used to verify that readers tolerate short
 * reads that do not indicate EOF.
 *
 * <p>End of data is reported as {@code -1} from both {@code read} variants and as {@code 0} from
 * {@code skip}, and {@code skip} never moves the position backwards.
 */
private static class ThrottlingInputStream extends InputStream {

    // The complete content served by this stream.
    private final byte[] data;
    // Index into `data` of the next byte to be returned.
    private int offset = 0;

    /**
     * @param data the bytes to serve. The array is not copied.
     */
    protected ThrottlingInputStream(byte[] data) {
        this.data = data;
    }

    /**
     * Reads a single byte.
     * @return the next byte as an unsigned value, or -1 if all of the data has been consumed.
     */
    @Override
    public int read() {
        if (offset >= data.length) {
            // Report EOF instead of indexing past the end of the array.
            return -1;
        }
        return data[offset++] & 0xFF;
    }

    /**
     * Determines how many bytes a bulk operation should provide.
     * @param numberOfBytesRequested the number of bytes requested by the caller.
     * @return -1 if no data remains; otherwise, one fewer than the smaller of the requested and available
     *  counts when both exceed 1, or the smaller of the requested and available counts when either is 1
     *  or less.
     */
    private int calculateNumberOfBytesToReturn(int numberOfBytesRequested) {
        int available = data.length - offset;
        int numberOfBytesToReturn;
        if (available > 1 && numberOfBytesRequested > 1) {
            // Return fewer bytes than requested and fewer than are available, avoiding EOF.
            numberOfBytesToReturn = Math.min(available - 1, numberOfBytesRequested - 1);
        } else if (available <= 0) {
            return -1; // EOF
        } else {
            // Either only 1 byte is available or at most 1 byte was requested, so there is no room to throttle.
            numberOfBytesToReturn = Math.min(numberOfBytesRequested, available);
        }
        return numberOfBytesToReturn;
    }

    /**
     * Copies up to {@code len} bytes into {@code b} starting at {@code off}, deliberately providing fewer
     * bytes than requested whenever more than one byte is both requested and available.
     * @return the number of bytes copied, or -1 if no data remains.
     */
    @Override
    public int read(byte[] b, int off, int len) {
        int numberOfBytesToReturn = calculateNumberOfBytesToReturn(len);
        if (numberOfBytesToReturn < 0) {
            return -1;
        }
        System.arraycopy(data, offset, b, off, numberOfBytesToReturn);
        offset += numberOfBytesToReturn;
        return numberOfBytesToReturn;
    }

    /**
     * Skips up to {@code len} bytes, throttled in the same way as bulk reads.
     * @return the number of bytes actually skipped; 0 if {@code len} is not positive or no data remains.
     */
    @Override
    public long skip(long len) {
        if (len <= 0) {
            // A non-positive request must never move the position backwards.
            return 0;
        }
        // Clamp before narrowing so that very large requests do not wrap to a negative int.
        int numberOfBytesRequested = (int) Math.min(len, (long) Integer.MAX_VALUE);
        int numberOfBytesToSkip = calculateNumberOfBytesToReturn(numberOfBytesRequested);
        if (numberOfBytesToSkip <= 0) {
            // The helper signals EOF with -1; skip() reports that as zero bytes skipped.
            return 0;
        }
        offset += numberOfBytesToSkip;
        return numberOfBytesToSkip;
    }
}

@ParameterizedTest(name = "incrementalReadingEnabled={0}")
@ValueSource(booleans = {true, false})
public void shouldNotFailWhenAnInputStreamProvidesFewerBytesThanRequestedWithoutReachingEof(boolean incrementalReadingEnabled) throws Exception {
    // Verifies that a string value is read in full, in both incremental and non-incremental mode, when
    // the underlying stream hands back fewer bytes than requested on each bulk read.
    readerBuilder = readerBuilder.withIncrementalReadingEnabled(incrementalReadingEnabled)
        .withBufferConfiguration(
            IonBufferConfiguration.Builder.standard()
                .withInitialBufferSize(8) // Smaller than the 14 bytes of input below.
                .build()
        );
    // Binary Ion: the four-byte version marker followed by a nine-byte string, "abcdefghi".
    byte[] ionData = bytes(
        0xE0, 0x01, 0x00, 0xEA,
        0x89, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'
    );
    ThrottlingInputStream throttledInput = new ThrottlingInputStream(ionData);
    reader = readerFor(throttledInput);
    assertSequence(
        next(IonType.STRING), stringValue("abcdefghi"),
        next(null)
    );
    reader.close();
}

@Test
public void shouldNotFailWhenAnInputStreamProvidesFewerBytesThanRequestedWithoutReachingEofAndTheReaderSkipsTheValue() throws Exception {
    // Verifies that a value reported as oversized is skipped cleanly, and the value after it is still
    // read, when the underlying stream returns fewer bytes than requested.
    // Binary Ion: the four-byte version marker, a nine-byte string, then the int 0.
    byte[] ionData = bytes(
        0xE0, 0x01, 0x00, 0xEA,
        0x89, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
        0x20
    );
    ThrottlingInputStream throttledInput = new ThrottlingInputStream(ionData);
    // NOTE(review): the two 8s are presumably the initial and maximum buffer sizes, which would make the
    // string too large to buffer -- confirm against boundedReaderFor.
    reader = boundedReaderFor(throttledInput, 8, 8, byteAndOversizedValueCountingHandler);
    assertSequence(
        next(IonType.INT), intValue(0),
        next(null)
    );
    reader.close();
    // Exactly one value (the string) must have been reported as oversized.
    assertEquals(1, oversizedCounter.get());
}

@Test
public void shouldNotFailWhenGZIPBoundaryIsEncounteredInStringValue() throws Exception {
    // Verifies that a string value is read in full when it spans two concatenated GZIP payloads.
    ResizingPipedInputStream pipe = new ResizingPipedInputStream(128);
    // The following lines create a GZIP payload boundary (trailer/header) in the middle of an Ion string value.
    pipe.receive(gzippedBytes(0xE0, 0x01, 0x00, 0xEA, 0x89, 'a', 'b'));
    pipe.receive(gzippedBytes('c', 'd', 'e', 'f', 'g', 'h', 'i'));
    reader = readerFor(new GZIPInputStream(pipe));
    assertSequence(
        next(IonType.STRING), stringValue("abcdefghi"),
        next(null)
    );
    // Close the reader, as the sibling tests do, so that it and the wrapped GZIPInputStream are released.
    reader.close();
}

@Test
public void concatenatedAfterGZIPHeader() throws Exception {
// Tests that a stream that initially contains only a GZIP header can be read successfully if more data
Expand Down

0 comments on commit 72c1962

Please sign in to comment.