Skip to content

Commit

Permalink
Handles the case where a partially buffered nested value is skipped; …
Browse files Browse the repository at this point in the history
…ensures that fully-buffered containers do not cause the cursor to enter unchecked mode unless stepped into.
  • Loading branch information
tgregg committed Jan 9, 2024
1 parent 140761a commit 6754ba1
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 17 deletions.
27 changes: 10 additions & 17 deletions src/main/java/com/amazon/ion/impl/IonCursorBinary.java
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ private boolean slowSeek(long numberOfBytes) {
throwAsIonException(e);
}
refillableState.totalDiscardedBytes += skipped;
shiftContainerEnds(skipped);
shortfall = unbufferedBytesToSkip - skipped;
unbufferedBytesToSkip = shortfall;
} while (shortfall > 0 && skipped > 0);
Expand Down Expand Up @@ -1154,12 +1155,6 @@ private boolean slowSeekPastNopPad(long valueLength, boolean isAnnotated) {
event = Event.NEEDS_DATA;
return true;
}
if (offset < (peekIndex + valueLength)) {
// Some of the bytes of the NOP pad had to be skipped without buffering to keep the buffer within its
// maximum size. Therefore, any parent container end indices need to be shifted by the number of bytes
// that were never buffered.
shiftContainerEnds(peekIndex + valueLength - offset);
}
peekIndex = offset;
setCheckpointBeforeUnannotatedTypeId();
if (parent != null) {
Expand Down Expand Up @@ -1323,9 +1318,6 @@ private boolean slowReadValueHeader(IonTypeID valueTid, boolean isAnnotated, Mar
validateAnnotationWrapperEndIndex(endIndex);
}
setMarker(endIndex, markerToSet);
if (event == Event.START_CONTAINER && endIndex > DELIMITED_MARKER && endIndex <= limit) {
refillableState.fillDepth = containerIndex + 1;
}
return false;
}

Expand Down Expand Up @@ -1403,7 +1395,11 @@ private Event slowStepIntoContainer() {
public Event stepIntoContainer() {
if (isSlowMode) {
if (containerIndex != refillableState.fillDepth - 1) {
return slowStepIntoContainer();
if (valueMarker.endIndex > DELIMITED_MARKER && valueMarker.endIndex <= limit) {
refillableState.fillDepth = containerIndex + 1;
} else {
return slowStepIntoContainer();
}
}
isSlowMode = false;
}
Expand Down Expand Up @@ -1664,9 +1660,6 @@ private boolean slowSkipRemainingValueBytes() {
}
peekIndex = offset;
valuePreHeaderIndex = peekIndex;
if (peekIndex < valueMarker.endIndex) {
shiftContainerEnds(valueMarker.endIndex - peekIndex);
}
if (refillableState.fillDepth > containerIndex) {
// This value was filled, but was skipped. Reset the fillDepth so that the reader does not think the
// next value was filled immediately upon encountering it.
Expand Down Expand Up @@ -1702,10 +1695,10 @@ private void seekPastOversizedValue() {
refillableState.totalDiscardedBytes += refillableState.individualBytesSkippedWithoutBuffering;
peekIndex = offset;
// peekIndex now points at the first byte after the value. If any bytes were skipped directly from
// the input stream, peekIndex will be less than the value's pre-calculated endIndex. This requires
// the end indices for all parent containers to be shifted left by the number of bytes that were
// skipped without buffering.
shiftContainerEnds(valueMarker.endIndex - peekIndex);
// the input stream before the 'slowSeek', peekIndex will be less than the value's pre-calculated endIndex.
// This requires the end indices for all parent containers to be shifted left by the number of bytes that
// were skipped without buffering.
shiftContainerEnds(refillableState.individualBytesSkippedWithoutBuffering);
setCheckpointBeforeUnannotatedTypeId();
}
refillableState.isSkippingCurrentValue = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3455,6 +3455,207 @@ public void shouldNotFailWhenAnInputStreamProvidesFewerBytesThanRequestedWithout
assertEquals(0, oversizedCounter.get());
}

/**
 * Verifies that a nested list which is larger than the reader's initial buffer can be skipped (advanced past
 * without stepping in) and that the values following it are still read correctly.
 *
 * @param incrementalReadingEnabled value passed to {@code withIncrementalReadingEnabled} on the reader builder.
 * @param throwOnEof value forwarded to the {@code ThrottlingInputStream} constructor.
 */
@ParameterizedTest(name = "incrementalReadingEnabled={0},throwOnEof={1}")
@CsvSource({
"true, true",
"true, false",
"false, true",
"false, false"
})
public void shouldNotFailWhenAnUnbufferedNestedContainerIsSkipped(boolean incrementalReadingEnabled, boolean throwOnEof) throws Exception {
readerBuilder = readerBuilder.withIncrementalReadingEnabled(incrementalReadingEnabled);
// The reader's buffer size is unbounded, but the nested container value is larger than the initial buffer size.
// Skipping the nested container therefore causes some of its bytes to be skipped without ever being buffered,
// by calling InputStream.skip().
reader = boundedReaderFor(
new ThrottlingInputStream(
bytes(
0xE0, 0x01, 0x00, 0xEA, // IVM
0xB9, // Outer list, length 9: the nested list (1 header byte + 7 body bytes) plus one int
0xB7, // Nested list, length 7
0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, // 7 int 0 values (the nested list's children)
0x20, // int 0 value: last child of the outer list
0x20 // int 0 value: top-level value that follows the outer list
),
throwOnEof
),
5, // presumably the initial buffer size referred to above -- confirm against boundedReaderFor
Integer.MAX_VALUE, // presumably the maximum buffer size (i.e. effectively unbounded) -- confirm
byteAndOversizedValueCountingHandler
);
// NOTE(review): container(...) presumably steps into the outer list, applies the nested expectations, and
// steps back out -- confirm against the helper.
assertSequence(
container(IonType.LIST,
next(IonType.LIST), // Position on the nested list without stepping into it.
next(IonType.INT) // Advancing to the following int skips the nested list.
),
next(IonType.INT), intValue(0), // The top-level int after the outer list.
next(null) // No further values are expected.
);
reader.close();
}

/**
 * Verifies that a nested list which is larger than the reader's initial buffer can be stepped into and then
 * immediately stepped out of (without reading its children), and that the values following it are still read
 * correctly.
 *
 * @param incrementalReadingEnabled value passed to {@code withIncrementalReadingEnabled} on the reader builder.
 * @param throwOnEof value forwarded to the {@code ThrottlingInputStream} constructor.
 */
@ParameterizedTest(name = "incrementalReadingEnabled={0},throwOnEof={1}")
@CsvSource({
"true, true",
"true, false",
"false, true",
"false, false"
})
public void shouldNotFailWhenAnUnbufferedNestedContainerIsSteppedOutEarly(boolean incrementalReadingEnabled, boolean throwOnEof) throws Exception {
readerBuilder = readerBuilder.withIncrementalReadingEnabled(incrementalReadingEnabled);
// The reader's buffer size is unbounded, but the nested container value is larger than the initial buffer size.
// Stepping out of the nested container early therefore causes some of its bytes to be skipped without ever
// being buffered, by calling InputStream.skip().
reader = boundedReaderFor(
new ThrottlingInputStream(
bytes(
0xE0, 0x01, 0x00, 0xEA, // IVM
0xB9, // Outer list, length 9: the nested list (1 header byte + 7 body bytes) plus one int
0xB7, // Nested list, length 7
0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, // 7 int 0 values (the nested list's children)
0x20, // int 0 value: last child of the outer list (never visited by this test)
0x20 // int 0 value: top-level value that follows the outer list
),
throwOnEof
),
5, // presumably the initial buffer size referred to above -- confirm against boundedReaderFor
Integer.MAX_VALUE, // presumably the maximum buffer size (i.e. effectively unbounded) -- confirm
byteAndOversizedValueCountingHandler
);
// NOTE(review): container(...) presumably steps into the outer list, applies the nested expectations, and
// steps back out -- confirm against the helper.
assertSequence(
container(IonType.LIST,
next(IonType.LIST), STEP_IN, STEP_OUT // Enter the nested list and leave it without reading any child.
),
next(IonType.INT), intValue(0), // The top-level int after the outer list.
next(null) // No further values are expected.
);
reader.close();
}

/**
 * Verifies that null containers nested two levels deep are read correctly when incremental reading is disabled
 * and the data arrives in two chunks, the first of which ends before the innermost list is complete.
 */
@Test
public void nestedNullContainerIsParsedSuccessfully() throws Exception {
readerBuilder = readerBuilder.withIncrementalReadingEnabled(false);
ResizingPipedInputStream pipe = new ResizingPipedInputStream(128);
reader = readerFor(pipe);
// First chunk: the inner list declares 3 bytes of content, but only 2 of them are supplied here.
pipe.receive(bytes(
0xE0, 0x01, 0x00, 0xEA, // IVM
0xB5, // Outer list, length 5: the inner list (1 header byte + 3 body bytes) plus one int
0xB3, // Inner list, length 3
0xBF, 0xCF // Null list, null s-exp
));
assertSequence(
next(IonType.LIST), STEP_IN,
next(IonType.LIST), STEP_IN,
next(IonType.LIST),
next(IonType.SEXP)
);
// Second chunk: the final byte of the inner list, followed by the last child of the outer list.
pipe.receive(bytes(
0xDF, // Null struct
0x20 // Int 0
));
assertSequence(
next(IonType.STRUCT),
STEP_OUT, // Leave the inner list.
next(IonType.INT), intValue(0),
STEP_OUT, // Leave the outer list.
next(null) // No further values are expected.
);
reader.close();
}

/**
 * Verifies that empty containers nested two levels deep are read correctly when incremental reading is
 * disabled and the data arrives in two chunks, the first of which ends before the innermost list is complete.
 */
@Test
public void nestedEmptyContainerIsParsedSuccessfully() throws Exception {
readerBuilder = readerBuilder.withIncrementalReadingEnabled(false);
ResizingPipedInputStream pipe = new ResizingPipedInputStream(128);
reader = readerFor(pipe);
// First chunk: the inner list declares 3 bytes of content, but only 2 of them are supplied here.
pipe.receive(bytes(
0xE0, 0x01, 0x00, 0xEA, // IVM
0xB5, // Outer list, length 5: the inner list (1 header byte + 3 body bytes) plus one int
0xB3, // Inner list, length 3
0xB0, 0xC0 // Empty list, empty s-exp
));
assertSequence(
next(IonType.LIST), STEP_IN,
next(IonType.LIST), STEP_IN,
next(IonType.LIST),
next(IonType.SEXP)
);
// Second chunk: the final byte of the inner list, followed by the last child of the outer list.
pipe.receive(bytes(
0xD0, // Empty struct
0x20 // Int 0
));
assertSequence(
next(IonType.STRUCT),
STEP_OUT, // Leave the inner list.
next(IonType.INT), intValue(0),
STEP_OUT, // Leave the outer list.
next(null) // No further values are expected.
);
reader.close();
}

/**
 * Verifies that a list nested three levels deep can be skipped (advanced past without stepping in) when
 * incremental reading is disabled and the remainder of that list only arrives after the reader has been
 * positioned on it.
 */
@Test
public void nestedContainerIsSkippedSuccessfully() throws Exception {
readerBuilder = readerBuilder.withIncrementalReadingEnabled(false);
ResizingPipedInputStream pipe = new ResizingPipedInputStream(128);
reader = readerFor(pipe);
// First chunk: the innermost list declares 2 bytes of content, but only 1 of them is supplied here.
pipe.receive(bytes(
0xE0, 0x01, 0x00, 0xEA, // IVM
0xB5, // Outer list, length 5: the middle list (1 header byte + 3 body bytes) plus one int
0xB3, // Middle list, length 3: the innermost list (1 header byte + 2 body bytes)
0xB2, // Innermost list, length 2
0xC0 // Empty s-exp
));
assertSequence(
next(IonType.LIST), STEP_IN,
next(IonType.LIST), STEP_IN,
next(IonType.LIST) // Positioned on the innermost list; it is never stepped into.
);
// Second chunk: the final byte of the innermost list, followed by the last child of the outer list.
pipe.receive(bytes(
0xD0, // Empty struct
0x20 // Int 0
));
assertSequence(
next(null), // Skip the list; it is the middle list's only child, so no value follows it at this depth
STEP_OUT, // Leave the middle list.
next(IonType.INT), intValue(0),
STEP_OUT, // Leave the outer list.
next(null) // No further values are expected.
);
reader.close();
}

/**
 * Verifies that a list nested three levels deep can be stepped into and then stepped out of without reading
 * its children, when incremental reading is disabled and the remainder of that list only arrives after the
 * reader has stepped into it.
 */
@Test
public void nestedContainerIsSteppedOutEarlySuccessfully() throws Exception {
readerBuilder = readerBuilder.withIncrementalReadingEnabled(false);
ResizingPipedInputStream pipe = new ResizingPipedInputStream(128);
reader = readerFor(pipe);
// First chunk: the innermost list declares 2 bytes of content, but only 1 of them is supplied here.
pipe.receive(bytes(
0xE0, 0x01, 0x00, 0xEA, // IVM
0xB5, // Outer list, length 5: the middle list (1 header byte + 3 body bytes) plus one int
0xB3, // Middle list, length 3: the innermost list (1 header byte + 2 body bytes)
0xB2, // Innermost list, length 2
0xC0 // Empty s-exp
));
assertSequence(
next(IonType.LIST), STEP_IN,
next(IonType.LIST), STEP_IN,
next(IonType.LIST), STEP_IN // Now inside the innermost list, before any of its children has been read.
);
// Second chunk: the final byte of the innermost list, followed by the last child of the outer list.
pipe.receive(bytes(
0xD0, // Empty struct
0x20 // Int 0
));
assertSequence(
STEP_OUT, // Step out early (leaves the innermost list without visiting its children)
STEP_OUT, // Step out early (leaves the middle list)
next(IonType.INT), intValue(0),
STEP_OUT, // Leave the outer list.
next(null) // No further values are expected.
);
reader.close();
}

@Test
public void shouldNotFailWhenGZIPBoundaryIsEncounteredInStringValue() throws Exception {
ResizingPipedInputStream pipe = new ResizingPipedInputStream(128);
Expand Down

0 comments on commit 6754ba1

Please sign in to comment.