diff --git a/src/main/java/com/amazon/ion/impl/IonCursorBinary.java b/src/main/java/com/amazon/ion/impl/IonCursorBinary.java index c32e23bf6a..4e94b2f132 100644 --- a/src/main/java/com/amazon/ion/impl/IonCursorBinary.java +++ b/src/main/java/com/amazon/ion/impl/IonCursorBinary.java @@ -712,6 +712,7 @@ private boolean slowSeek(long numberOfBytes) { throwAsIonException(e); } refillableState.totalDiscardedBytes += skipped; + shiftContainerEnds(skipped); shortfall = unbufferedBytesToSkip - skipped; unbufferedBytesToSkip = shortfall; } while (shortfall > 0 && skipped > 0); @@ -1154,12 +1155,6 @@ private boolean slowSeekPastNopPad(long valueLength, boolean isAnnotated) { event = Event.NEEDS_DATA; return true; } - if (offset < (peekIndex + valueLength)) { - // Some of the bytes of the NOP pad had to be skipped without buffering to keep the buffer within its - // maximum size. Therefore, any parent container end indices need to be shifted by the number of bytes - // that were never buffered. - shiftContainerEnds(peekIndex + valueLength - offset); - } peekIndex = offset; setCheckpointBeforeUnannotatedTypeId(); if (parent != null) { @@ -1323,9 +1318,6 @@ private boolean slowReadValueHeader(IonTypeID valueTid, boolean isAnnotated, Mar validateAnnotationWrapperEndIndex(endIndex); } setMarker(endIndex, markerToSet); - if (event == Event.START_CONTAINER && endIndex > DELIMITED_MARKER && endIndex <= limit) { - refillableState.fillDepth = containerIndex + 1; - } return false; } @@ -1403,7 +1395,11 @@ private Event slowStepIntoContainer() { public Event stepIntoContainer() { if (isSlowMode) { if (containerIndex != refillableState.fillDepth - 1) { - return slowStepIntoContainer(); + if (valueMarker.endIndex > DELIMITED_MARKER && valueMarker.endIndex <= limit) { + refillableState.fillDepth = containerIndex + 1; + } else { + return slowStepIntoContainer(); + } } isSlowMode = false; } @@ -1664,9 +1660,6 @@ private boolean slowSkipRemainingValueBytes() { } peekIndex = offset; valuePreHeaderIndex = peekIndex; - if (peekIndex < valueMarker.endIndex) { - shiftContainerEnds(valueMarker.endIndex - peekIndex); - } if (refillableState.fillDepth > containerIndex) { // This value was filled, but was skipped. Reset the fillDepth so that the reader does not think the // next value was filled immediately upon encountering it. @@ -1702,10 +1695,10 @@ private void seekPastOversizedValue() { refillableState.totalDiscardedBytes += refillableState.individualBytesSkippedWithoutBuffering; peekIndex = offset; // peekIndex now points at the first byte after the value. If any bytes were skipped directly from - // the input stream, peekIndex will be less than the value's pre-calculated endIndex. This requires - // the end indices for all parent containers to be shifted left by the number of bytes that were - // skipped without buffering. - shiftContainerEnds(valueMarker.endIndex - peekIndex); + // the input stream before the 'slowSeek', peekIndex will be less than the value's pre-calculated endIndex. + // This requires the end indices for all parent containers to be shifted left by the number of bytes that + // were skipped without buffering. + shiftContainerEnds(refillableState.individualBytesSkippedWithoutBuffering); setCheckpointBeforeUnannotatedTypeId(); } refillableState.isSkippingCurrentValue = false; diff --git a/src/test/java/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java b/src/test/java/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java index 228d26ca84..cb4d44ae34 100644 --- a/src/test/java/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java +++ b/src/test/java/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java @@ -3455,6 +3455,207 @@ public void shouldNotFailWhenAnInputStreamProvidesFewerBytesThanRequestedWithout assertEquals(0, oversizedCounter.get()); } + @ParameterizedTest(name = "incrementalReadingEnabled={0},throwOnEof={1}") + @CsvSource({ + "true, true", + "true, false", + "false, true", + "false, false" + }) + public void shouldNotFailWhenAnUnbufferedNestedContainerIsSkipped(boolean incrementalReadingEnabled, boolean throwOnEof) throws Exception { + readerBuilder = readerBuilder.withIncrementalReadingEnabled(incrementalReadingEnabled); + // The reader's buffer size is unbounded, but the nested container value is larger than the initial buffer size. + // Skipping the nested container therefore causes some of its bytes to be skipped without ever being buffered, + // by calling InputStream.skip(). + reader = boundedReaderFor( + new ThrottlingInputStream( + bytes( + 0xE0, 0x01, 0x00, 0xEA, // IVM + 0xB9, // List, length 9 + 0xB7, // List, length 7 + 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, // 7 int 0 values + 0x20, // int 0 value + 0x20 // Int 0 value + ), + throwOnEof + ), + 5, + Integer.MAX_VALUE, + byteAndOversizedValueCountingHandler + ); + assertSequence( + container(IonType.LIST, + next(IonType.LIST), + next(IonType.INT) + ), + next(IonType.INT), intValue(0), + next(null) + ); + reader.close(); + } + + @ParameterizedTest(name = "incrementalReadingEnabled={0},throwOnEof={1}") + @CsvSource({ + "true, true", + "true, false", + "false, true", + "false, false" + }) + public void shouldNotFailWhenAnUnbufferedNestedContainerIsSteppedOutEarly(boolean incrementalReadingEnabled, boolean throwOnEof) throws Exception { + readerBuilder = readerBuilder.withIncrementalReadingEnabled(incrementalReadingEnabled); + // The reader's buffer size is unbounded, but the nested container value is larger than the initial buffer size. + // Stepping out of the nested container early therefore causes some of its bytes to be skipped without ever + // being buffered, by calling InputStream.skip(). + reader = boundedReaderFor( + new ThrottlingInputStream( + bytes( + 0xE0, 0x01, 0x00, 0xEA, // IVM + 0xB9, // List, length 9 + 0xB7, // List, length 7 + 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, // 7 int 0 values + 0x20, // int 0 value + 0x20 // Int 0 value + ), + throwOnEof + ), + 5, + Integer.MAX_VALUE, + byteAndOversizedValueCountingHandler + ); + assertSequence( + container(IonType.LIST, + next(IonType.LIST), STEP_IN, STEP_OUT + ), + next(IonType.INT), intValue(0), + next(null) + ); + reader.close(); + } + + @Test + public void nestedNullContainerIsParsedSuccessfully() throws Exception { + readerBuilder = readerBuilder.withIncrementalReadingEnabled(false); + ResizingPipedInputStream pipe = new ResizingPipedInputStream(128); + reader = readerFor(pipe); + pipe.receive(bytes( + 0xE0, 0x01, 0x00, 0xEA, // IVM + 0xB5, // List, length 5 + 0xB3, // List, length 3 + 0xBF, 0xCF // Null list, null s-exp + )); + assertSequence( + next(IonType.LIST), STEP_IN, + next(IonType.LIST), STEP_IN, + next(IonType.LIST), + next(IonType.SEXP) + ); + pipe.receive(bytes( + 0xDF, // Null struct + 0x20 // Int 0 + )); + assertSequence( + next(IonType.STRUCT), + STEP_OUT, + next(IonType.INT), intValue(0), + STEP_OUT, + next(null) + ); + reader.close(); + } + + @Test + public void nestedEmptyContainerIsParsedSuccessfully() throws Exception { + readerBuilder = readerBuilder.withIncrementalReadingEnabled(false); + ResizingPipedInputStream pipe = new ResizingPipedInputStream(128); + reader = readerFor(pipe); + pipe.receive(bytes( + 0xE0, 0x01, 0x00, 0xEA, // IVM + 0xB5, // List, length 5 + 0xB3, // List, length 3 + 0xB0, 0xC0 // Empty list, empty s-exp + )); + assertSequence( + next(IonType.LIST), STEP_IN, + next(IonType.LIST), STEP_IN, + next(IonType.LIST), + next(IonType.SEXP) + ); + pipe.receive(bytes( + 0xD0, // empty struct + 0x20 // Int 0 + )); + assertSequence( + next(IonType.STRUCT), + STEP_OUT, + next(IonType.INT), intValue(0), + STEP_OUT, + next(null) + ); + reader.close(); + } + + @Test + public void nestedContainerIsSkippedSuccessfully() throws Exception { + readerBuilder = readerBuilder.withIncrementalReadingEnabled(false); + ResizingPipedInputStream pipe = new ResizingPipedInputStream(128); + reader = readerFor(pipe); + pipe.receive(bytes( + 0xE0, 0x01, 0x00, 0xEA, // IVM + 0xB5, // List, length 5 + 0xB3, // List, length 3 + 0xB2, // List, length 2 + 0xC0 // Empty s-exp + )); + assertSequence( + next(IonType.LIST), STEP_IN, + next(IonType.LIST), STEP_IN, + next(IonType.LIST) + ); + pipe.receive(bytes( + 0xD0, // Empty struct + 0x20 // Int 0 + )); + assertSequence( + next(null), // Skip the list + STEP_OUT, + next(IonType.INT), intValue(0), + STEP_OUT, + next(null) + ); + reader.close(); + } + + @Test + public void nestedContainerIsSteppedOutEarlySuccessfully() throws Exception { + readerBuilder = readerBuilder.withIncrementalReadingEnabled(false); + ResizingPipedInputStream pipe = new ResizingPipedInputStream(128); + reader = readerFor(pipe); + pipe.receive(bytes( + 0xE0, 0x01, 0x00, 0xEA, // IVM + 0xB5, // List, length 5 + 0xB3, // List, length 3 + 0xB2, // List, length 2 + 0xC0 // Empty s-exp + )); + assertSequence( + next(IonType.LIST), STEP_IN, + next(IonType.LIST), STEP_IN, + next(IonType.LIST), STEP_IN + ); + pipe.receive(bytes( + 0xD0, // Empty struct + 0x20 // Int 0 + )); + assertSequence( + STEP_OUT, // Step out early + STEP_OUT, // Step out early + next(IonType.INT), intValue(0), + STEP_OUT, + next(null) + ); + reader.close(); + } + @Test public void shouldNotFailWhenGZIPBoundaryIsEncounteredInStringValue() throws Exception { ResizingPipedInputStream pipe = new ResizingPipedInputStream(128);