Skip to content

Commit

Permalink
Handles the case where the binary cursor's InputStream provides fewer…
Browse files Browse the repository at this point in the history
… bytes than requested before reaching EOF.
  • Loading branch information
tgregg committed Oct 24, 2023
1 parent 5ddd95a commit cf6ef90
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 6 deletions.
19 changes: 13 additions & 6 deletions src/com/amazon/ion/impl/IonCursorBinary.java
Original file line number Diff line number Diff line change
Expand Up @@ -524,8 +524,14 @@ private boolean fillAt(long index, long numberOfBytes) {
refillableState.bytesRequested = numberOfBytes + (index - offset);
if (ensureCapacity(refillableState.bytesRequested)) {
// Fill all the free space, not just the shortfall; this reduces I/O.
refill(freeSpaceAt(limit));
shortfall = refillableState.bytesRequested - availableAt(offset);
// Sometimes an InputStream implementation will return fewer than the number of bytes requested even
// if the stream is not at EOF. If this happens and there is still a shortfall, keep requesting bytes
// until either the shortfall is filled or EOF is reached.
int numberOfBytesFilled = 0;
while (shortfall > 0 && numberOfBytesFilled >= 0) {
numberOfBytesFilled = refill(freeSpaceAt(limit));
shortfall = refillableState.bytesRequested - availableAt(offset);
}
} else {
// The request cannot be satisfied, but not because data was unavailable. Return normally; it is the
// caller's responsibility to recover.
Expand Down Expand Up @@ -649,8 +655,9 @@ private void shiftIndicesLeft(int shiftAmount) {
* Fills the buffer with up to the requested number of additional bytes. It is the caller's responsibility to
* ensure that there is space in the buffer.
* @param numberOfBytesToFill the number of additional bytes to attempt to add to the buffer.
* @return the number of bytes filled.
*/
private void refill(long numberOfBytesToFill) {
private int refill(long numberOfBytesToFill) {
int numberOfBytesFilled = -1;
try {
numberOfBytesFilled = refillableState.inputStream.read(buffer, (int) limit, (int) numberOfBytesToFill);
Expand All @@ -660,10 +667,10 @@ private void refill(long numberOfBytesToFill) {
} catch (IOException e) {
throwAsIonException(e);
}
if (numberOfBytesFilled < 0) {
return;
if (numberOfBytesFilled > 0) {
limit += numberOfBytesFilled;
}
limit += numberOfBytesFilled;
return numberOfBytesFilled;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3196,6 +3196,67 @@ public void skipWithoutEnoughDataNonIncrementalFails() throws Exception {
reader.close();
}

/**
 * An InputStream over a fixed byte array whose bulk reads return fewer bytes than were requested
 * (and fewer than remain) whenever that is possible, while still honoring the InputStream contract:
 * a bulk read of a non-zero length always delivers at least one byte until the data is exhausted,
 * and EOF is reported as -1 by both read methods.
 */
private static class ThrottlingInputStream extends InputStream {

    private final byte[] data;
    private int offset = 0;

    /**
     * @param data the bytes to serve; the array is used directly, not copied.
     */
    protected ThrottlingInputStream(byte[] data) {
        this.data = data;
    }

    /**
     * Reads a single byte.
     * @return the next byte as an unsigned value in the range 0-255, or -1 once all data has been consumed.
     */
    @Override
    public int read() {
        if (offset >= data.length) {
            return -1; // EOF
        }
        return data[offset++] & 0xFF;
    }

    /**
     * Copies up to one byte fewer than {@code min(len, remaining)} into {@code b}, but never fewer than one
     * byte when {@code len > 0} and data remains.
     * @param b the destination buffer.
     * @param off the index in {@code b} at which to start writing.
     * @param len the maximum number of bytes the caller is willing to accept.
     * @return the number of bytes copied, 0 if {@code len} is 0, or -1 at EOF.
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or the range exceeds {@code b}.
     */
    @Override
    public int read(byte[] b, int off, int len) {
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            // A zero-length request must not consume data or signal EOF.
            return 0;
        }
        int available = data.length - offset;
        if (available <= 0) {
            return -1; // EOF
        }
        // Hold back one byte relative to what could have been delivered so that the caller sees a short read
        // that is not EOF. The floor of 1 guarantees progress; returning 0 for a non-zero request would make
        // callers that retry until EOF loop forever.
        int numberOfBytesToReturn = Math.max(1, Math.min(available, len) - 1);
        System.arraycopy(data, offset, b, off, numberOfBytesToReturn);
        offset += numberOfBytesToReturn;
        return numberOfBytesToReturn;
    }
}

@ParameterizedTest(name = "incrementalReadingEnabled={0}")
@ValueSource(booleans = {true, false})
public void shouldNotFailWhenAnInputStreamProvidesFewerBytesThanRequestedWithoutReachingEof(boolean incrementalReadingEnabled) {
    // Start with an 8-byte buffer; the 14-byte payload below cannot be buffered without further reads.
    readerBuilder = readerBuilder
        .withIncrementalReadingEnabled(incrementalReadingEnabled)
        .withBufferConfiguration(
            IonBufferConfiguration.Builder.standard()
                .withInitialBufferSize(8)
                .build()
        );
    // Four leading bytes, then 0x89 followed by the nine characters of the expected string value.
    byte[] payload = bytes(0xE0, 0x01, 0x00, 0xEA, 0x89, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i');
    // The throttling stream hands back short bulk reads, so the reader has to keep asking for the remainder.
    reader = readerFor(new ThrottlingInputStream(payload));
    assertSequence(
        next(IonType.STRING),
        stringValue("abcdefghi"),
        next(null)
    );
}

@Test
public void shouldNotFailWhenGZIPBoundaryIsEncounteredInStringValue() throws Exception {
    ResizingPipedInputStream compressedSource = new ResizingPipedInputStream(128);
    // Two independently gzipped payloads are appended back to back, which places a GZIP trailer/header
    // boundary in the middle of the Ion string value: the first payload ends after 'b' and the second
    // resumes at 'c'.
    compressedSource.receive(gzippedBytes(0xE0, 0x01, 0x00, 0xEA, 0x89, 'a', 'b'));
    compressedSource.receive(gzippedBytes('c', 'd', 'e', 'f', 'g', 'h', 'i'));
    reader = readerFor(new GZIPInputStream(compressedSource));
    // The value must be read as one complete string despite the boundary.
    assertSequence(
        next(IonType.STRING),
        stringValue("abcdefghi"),
        next(null)
    );
}

@Test
public void concatenatedAfterGZIPHeader() throws Exception {
// Tests that a stream that initially contains only a GZIP header can be read successfully if more data
Expand Down

0 comments on commit cf6ef90

Please sign in to comment.