From b1d26c3766eedc2aa3a834c83663b6a7c17504ad Mon Sep 17 00:00:00 2001 From: Tyler Gregg Date: Thu, 15 Feb 2024 16:38:12 -0800 Subject: [PATCH] Adds support for reading binary Ion 1.1 delimited containers. --- .../com/amazon/ion/impl/IonCursorBinary.java | 298 +++++++++++- ...IonReaderContinuableApplicationBinary.java | 13 +- .../java/com/amazon/ion/impl/IonTypeID.java | 5 + .../amazon/ion/impl/IonCursorBinaryTest.java | 177 +++++++- ...onReaderContinuableTopLevelBinaryTest.java | 427 +++++++++++++++++- 5 files changed, 891 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/amazon/ion/impl/IonCursorBinary.java b/src/main/java/com/amazon/ion/impl/IonCursorBinary.java index 696a411093..d672187cf2 100644 --- a/src/main/java/com/amazon/ion/impl/IonCursorBinary.java +++ b/src/main/java/com/amazon/ion/impl/IonCursorBinary.java @@ -11,7 +11,6 @@ import com.amazon.ion.IvmNotificationConsumer; import com.amazon.ion.SystemSymbols; import com.amazon.ion.impl.bin.FlexInt; -import com.amazon.ion.impl.bin.Ion_1_1_Constants; import com.amazon.ion.impl.bin.OpCodes; import java.io.ByteArrayInputStream; @@ -20,6 +19,7 @@ import java.io.InputStream; import java.nio.ByteBuffer; +import static com.amazon.ion.impl.IonTypeID.DELIMITED_END_ID; import static com.amazon.ion.impl.IonTypeID.ONE_ANNOTATION_FLEX_SYM_LOWER_NIBBLE_1_1; import static com.amazon.ion.impl.IonTypeID.ONE_ANNOTATION_SID_LOWER_NIBBLE_1_1; import static com.amazon.ion.impl.IonTypeID.TWO_ANNOTATION_FLEX_SYMS_LOWER_NIBBLE_1_1; @@ -112,11 +112,38 @@ private static class RefillableState { */ final int maximumBufferSize; + /** + * The number of bytes shifted left in the buffer during the current operation to make room for more bytes. This + * is needed when rewinding to a previous location, as any saved indices at that location will need to be + * shifted by this amount. + */ + long pendingShift = 0; + /** * The source of data, for refillable streams. 
*/ final InputStream inputStream; + /** + * Index of the first "pinned" byte in the buffer. Pinned bytes must be preserved in the buffer until un-pinned. + */ + long pinOffset = -1; + + /** + * True if the first byte of a special FlexSym in field name position was skipped due to the delimited struct + * being oversize. This is necessary because the only way to end a delimited struct is with a two-byte + * sequence. If the second byte in the sequence is not yet available, this flag reminds the cursor that the + * previous byte, which could not be buffered, began the special two-byte sequence. + * TODO: handling this case introduces some complexity; alternative solutions should be considered. + */ + boolean isSpecialFlexSymPartiallyRead = false; + + /** + * The target depth to which the reader should seek. This is used when a container is determined to be oversize + * while buffering one of its children. + */ + int targetSeekDepth = -1; + /** * Handler invoked when a single value would exceed `maximumBufferSize`. 
*/ @@ -520,6 +547,10 @@ private boolean ensureCapacity(long minimumNumberOfBytesRequired) { } int maximumFreeSpace = refillableState.maximumBufferSize; int startOffset = (int) offset; + if (refillableState.pinOffset > -1) { + maximumFreeSpace -= (int) (offset - refillableState.pinOffset); + startOffset = (int) refillableState.pinOffset; + } if (minimumNumberOfBytesRequired > maximumFreeSpace) { refillableState.isSkippingCurrentValue = true; return false; @@ -581,6 +612,9 @@ private void moveBytesToStartOfBuffer(byte[] destinationBuffer, int fromIndex) { shiftIndicesLeft(fromIndex); } offset = 0; + if (refillableState.pinOffset > 0) { + refillableState.pinOffset = 0; + } limit = size; } @@ -678,6 +712,7 @@ private void shiftIndicesLeft(int shiftAmount) { } } shiftContainerEnds(shiftAmount); + refillableState.pendingShift = shiftAmount; refillableState.totalDiscardedBytes += shiftAmount; } @@ -1278,9 +1313,11 @@ private long uncheckedReadFlexSym_1_1(Marker markerToSet) { // Inline symbol with zero length. markerToSet.startIndex = peekIndex; markerToSet.endIndex = peekIndex; + return -1; } else if (nextByte != OpCodes.DELIMITED_END_MARKER) { throw new IonException("FlexSym 0 may only precede symbol zero, empty string, or delimited end."); } + markerToSet.typeId = IonTypeID.DELIMITED_END_ID; return -1; } else if (result < 0) { markerToSet.startIndex = peekIndex; @@ -1349,9 +1386,11 @@ private long slowReadFlexSym_1_1(Marker markerToSet) { // Inline symbol with zero length. markerToSet.startIndex = peekIndex; markerToSet.endIndex = peekIndex; + return -1; } else if (nextByte != OpCodes.DELIMITED_END_MARKER) { throw new IonException("FlexSyms may only wrap symbol zero, empty string, or delimited end."); } + markerToSet.typeId = DELIMITED_END_ID; return -1; } else if (result < 0) { markerToSet.startIndex = peekIndex; @@ -1396,36 +1435,248 @@ private boolean slowReadFieldName_1_1() { } } + /** + * Determines whether the current delimited container has reached its end. 
+ * @return true if the container is at its end; otherwise, false. + */ private boolean uncheckedIsDelimitedEnd_1_1() { - throw new UnsupportedOperationException(); + if (parent.typeId.type == IonType.STRUCT) { + uncheckedReadFieldName_1_1(); + if (fieldSid < 0 && fieldTextMarker.typeId != null && fieldTextMarker.typeId.lowerNibble == OpCodes.DELIMITED_END_MARKER) { + parent.endIndex = peekIndex; + event = Event.END_CONTAINER; + return true; + } + } else if (buffer[(int) peekIndex] == OpCodes.DELIMITED_END_MARKER) { + peekIndex++; + parent.endIndex = peekIndex; + event = Event.END_CONTAINER; + return true; + } + return false; } + /** + * Determines whether the cursor is at the end of a delimited struct. + * @param currentByte the byte on which the cursor is currently positioned. + * @return true if the struct is at its end or if not enough data is available; otherwise, false. + */ + private boolean slowIsDelimitedStructEnd_1_1(int currentByte) { + if (refillableState.isSpecialFlexSymPartiallyRead) { + // The delimited struct is oversized, and the first byte in a special FlexSym in field position (0x01) + // was already skipped. If the next byte is DELIMITED_END_MARKER, then the struct is at its end. + if (currentByte == (OpCodes.DELIMITED_END_MARKER & SINGLE_BYTE_MASK)) { + event = Event.END_CONTAINER; + fieldSid = -1; + return true; + } + refillableState.isSpecialFlexSymPartiallyRead = false; + } else if (currentByte == FlexInt.ZERO) { + // This is a special FlexSym in field position. Determine whether the next byte is DELIMITED_END_MARKER. + currentByte = slowReadByte(); + if (currentByte < 0) { + // If the struct is being skipped due to being oversized, then the first byte in the special FlexSym + // was skipped and is not present in the buffer. This needs to be recorded so that the struct can + // still terminate if additional bytes become available. 
+ refillableState.isSpecialFlexSymPartiallyRead = refillableState.isSkippingCurrentValue; + return true; + } + if (currentByte == (OpCodes.DELIMITED_END_MARKER & SINGLE_BYTE_MASK)) { + event = Event.END_CONTAINER; + valueTid = null; + fieldSid = -1; + return true; + } + // Note: slowReadByte() increments the peekIndex, but if the delimiter end is not found, the byte + // needs to remain available. + peekIndex--; + } + return false; + } + + /** + * Determines whether the current delimited container has reached its end, ensuring enough bytes are available + * in the stream. + * @return true if the container is at its end or if not enough data is available; otherwise, false. + */ private boolean slowIsDelimitedEnd_1_1() { - throw new UnsupportedOperationException(); + int b = slowReadByte(); + if (b < 0) { + return true; + } + if (parent.typeId.type == IonType.STRUCT && slowIsDelimitedStructEnd_1_1(b)) { + parent.endIndex = peekIndex; + return true; + } else if (b == (OpCodes.DELIMITED_END_MARKER & SINGLE_BYTE_MASK)) { + parent.endIndex = peekIndex; + event = Event.END_CONTAINER; + valueTid = null; + fieldSid = -1; + return true; + } + // Note: slowReadByte() increments the peekIndex, but if the delimiter end is not found, the byte + // needs to remain available. + peekIndex--; + return false; } + /** + * Skips past the remaining elements of the current delimited container. + * @return true if the end of the stream was reached before skipping past all remaining elements; otherwise, false. + */ boolean skipRemainingDelimitedContainerElements_1_1() { - throw new UnsupportedOperationException(); + while (event != Event.END_CONTAINER) { + nextValue(); + if (event == Event.NEEDS_DATA) { + return true; + } + } + return false; } + /** + * Skips past the remaining elements of the current delimited container, ensuring enough bytes are available in + * the stream. 
+ * @return true if the end of the stream was reached before skipping past all remaining elements; otherwise, false. + */ + private boolean slowSkipRemainingDelimitedContainerElements_1_1() { + while (event != Event.END_CONTAINER) { + slowNextToken(); + if (event == Event.START_CONTAINER && valueMarker.endIndex == DELIMITED_MARKER) { + seekPastDelimitedContainer_1_1(); + } + if (event == Event.NEEDS_DATA) { + return true; + } + } + return false; + } + + /** + * Seek past a delimited container that was never stepped into. + */ private void seekPastDelimitedContainer_1_1() { - throw new UnsupportedOperationException(); + stepIntoContainer(); + stepOutOfContainer(); } + /** + * Locates the end of the delimited container on which the reader is currently positioned. + * @return true if the end of the container was found; otherwise, false. + */ private boolean slowFindDelimitedEnd_1_1() { - throw new UnsupportedOperationException(); + // Save the cursor's current state so that it can return to this position after finding the delimited end. 
+ long savedPeekIndex = peekIndex; + long savedStartIndex = valueMarker.startIndex; + long savedEndIndex = valueMarker.endIndex; + int savedFieldSid = fieldSid; + IonTypeID savedFieldTid = fieldTextMarker.typeId; + long savedFieldTextStartIndex = fieldTextMarker.startIndex; + long savedFieldTextEndIndex = fieldTextMarker.endIndex; + IonTypeID savedValueTid = valueMarker.typeId; + IonTypeID savedAnnotationTid = annotationSequenceMarker.typeId; + long savedAnnotationStartIndex = annotationSequenceMarker.startIndex; + long savedAnnotationsEndIndex = annotationSequenceMarker.endIndex; + CheckpointLocation savedCheckpointLocation = checkpointLocation; + long savedCheckpoint = checkpoint; + int savedContainerIndex = containerIndex; + Marker savedParent = parent; + // ------------ + + // TODO performance: the following line causes the end indexes of any child delimited containers that are not + // contained within a length-prefixed container to be calculated. Currently these are thrown away, but storing + // them in case those containers are later accessed could make them faster to skip. This would require some + // additional complexity. + seekPastDelimitedContainer_1_1(); + + boolean isReady = event != Event.NEEDS_DATA; + if (refillableState.isSkippingCurrentValue) { + // This delimited container is oversized. The cursor must seek past it. + refillableState.state = State.SEEK_DELIMITED; + refillableState.targetSeekDepth = savedContainerIndex; + refillableState.pendingShift = 0; + return isReady; + } + + // Restore the state of the cursor at the start of the delimited container. + long pendingShift = refillableState.pendingShift; + valueMarker.startIndex = savedStartIndex - pendingShift; + valueMarker.endIndex = (savedEndIndex == DELIMITED_MARKER) ? 
DELIMITED_MARKER : (savedEndIndex - pendingShift); + fieldSid = savedFieldSid; + valueMarker.typeId = savedValueTid; + valueTid = savedValueTid; + annotationSequenceMarker.typeId = savedAnnotationTid; + annotationSequenceMarker.startIndex = savedAnnotationStartIndex - pendingShift; + annotationSequenceMarker.endIndex = savedAnnotationsEndIndex - pendingShift; + fieldTextMarker.typeId = savedFieldTid; + fieldTextMarker.startIndex = savedFieldTextStartIndex - pendingShift; + fieldTextMarker.endIndex = savedFieldTextEndIndex - pendingShift; + checkpointLocation = savedCheckpointLocation; + checkpoint = savedCheckpoint - pendingShift; + containerIndex = savedContainerIndex; + + savedPeekIndex -= pendingShift; + parent = savedParent; + if (parent == null) { + // At depth zero, there can not be any more upward recursive calls to which the shift needs to be + // conveyed. + refillableState.pendingShift = 0; + } + if (isReady) { + // Record the endIndex so that it does not need to be calculated repetitively. + valueMarker.endIndex = peekIndex; + event = Event.START_CONTAINER; + refillableState.state = State.READY; + } else { + // The fill is not complete, but there is currently no more data. The cursor will have to resume the fill + // before processing the next request. + refillableState.state = State.FILL_DELIMITED; + } + + peekIndex = savedPeekIndex; + return isReady; } + /** + * Seeks to the end of the delimited container at `refillableState.targetSeekDepth`. + * @return true if the end of the container was reached; otherwise, false. 
+ */ private boolean slowSeekToDelimitedEnd_1_1() { - throw new UnsupportedOperationException(); + refillableState.state = State.READY; + refillableState.isSkippingCurrentValue = true; + while (containerIndex > refillableState.targetSeekDepth) { + stepOutOfContainer(); + if (event == Event.NEEDS_DATA) { + refillableState.state = State.SEEK_DELIMITED; + refillableState.isSkippingCurrentValue = false; + return false; + } + } + // The end of the container has been reached. Report the number of bytes skipped and exit seek mode. + if (dataHandler != null) { + reportSkippedData(); + } + refillableState.totalDiscardedBytes += refillableState.individualBytesSkippedWithoutBuffering; + refillableState.individualBytesSkippedWithoutBuffering = 0; + refillableState.isSkippingCurrentValue = false; + event = Event.NEEDS_INSTRUCTION; + return true; } + /** + * Fills all bytes in the delimited container on which the cursor is currently positioned. + * @return true if not enough data was available in the stream; otherwise, false. + */ private boolean slowFillDelimitedContainer_1_1() { - throw new UnsupportedOperationException(); - } - - private boolean slowSkipRemainingDelimitedContainerElements_1_1() { - throw new UnsupportedOperationException(); + // Pin the current buffer offset so that all bytes encountered while finding the end of the delimited container + // are buffered. 
+ refillableState.pinOffset = offset; + slowFindDelimitedEnd_1_1(); + if (event == Event.NEEDS_DATA) { + return true; + } + refillableState.pinOffset = -1; + return false; } /* ---- End: version-dependent parsing methods ---- */ @@ -1529,6 +1780,10 @@ private void reset() { valueMarker.endIndex = -1; fieldSid = -1; hasAnnotations = false; + if (refillableState != null) { + refillableState.isSpecialFlexSymPartiallyRead = false; + refillableState.pendingShift = 0; + } } /** @@ -1979,6 +2234,15 @@ private void reportConsumedData() { lastReportedByteTotal = totalNumberOfBytesRead; } + /** + * Reports the total number of bytes skipped without buffering since the last report. + */ + private void reportSkippedData() { + long totalNumberOfBytesRead = getTotalOffset() + refillableState.individualBytesSkippedWithoutBuffering; + dataHandler.onData((int) (totalNumberOfBytesRead - lastReportedByteTotal)); + lastReportedByteTotal = totalNumberOfBytesRead; + } + /** * Advances to the next token, seeking past the previous value if necessary. After return `event` will convey * the result (e.g. START_SCALAR, END_CONTAINER) @@ -2140,7 +2404,15 @@ private Event slowOverflowableNextToken() { */ private void seekPastOversizedValue() { refillableState.oversizedValueHandler.onOversizedValue(); - if (refillableState.state != State.TERMINATED) { + if (refillableState.state == State.SEEK_DELIMITED) { + // Discard all buffered bytes. 
+ slowSeek(availableAt(offset)); + refillableState.pinOffset = -1; + refillableState.totalDiscardedBytes += refillableState.individualBytesSkippedWithoutBuffering; + refillableState.state = State.SEEK_DELIMITED; + peekIndex = offset; + shiftContainerEnds(refillableState.individualBytesSkippedWithoutBuffering); + } else if (refillableState.state != State.TERMINATED) { slowSeek(valueMarker.endIndex - offset - refillableState.individualBytesSkippedWithoutBuffering); refillableState.totalDiscardedBytes += refillableState.individualBytesSkippedWithoutBuffering; peekIndex = offset; diff --git a/src/main/java/com/amazon/ion/impl/IonReaderContinuableApplicationBinary.java b/src/main/java/com/amazon/ion/impl/IonReaderContinuableApplicationBinary.java index 7ca95d000c..52ebebc71b 100644 --- a/src/main/java/com/amazon/ion/impl/IonReaderContinuableApplicationBinary.java +++ b/src/main/java/com/amazon/ion/impl/IonReaderContinuableApplicationBinary.java @@ -945,11 +945,14 @@ private enum State { * false. */ boolean startsWithIonSymbolTable() { - long savedPeekIndex = peekIndex; - peekIndex = annotationSequenceMarker.startIndex; - int sid = minorVersion == 0 ? 
readVarUInt_1_0() : (int) readFlexUInt_1_1(); - peekIndex = savedPeekIndex; - return ION_SYMBOL_TABLE_SID == sid; + if (minorVersion == 0 || annotationTokenMarkers.isEmpty()) { + long savedPeekIndex = peekIndex; + peekIndex = annotationSequenceMarker.startIndex; + int sid = readVarUInt_1_0(); + peekIndex = savedPeekIndex; + return ION_SYMBOL_TABLE_SID == sid; + } + return ION_SYMBOL_TABLE_SID == annotationTokenMarkers.get(0).endIndex; } /** diff --git a/src/main/java/com/amazon/ion/impl/IonTypeID.java b/src/main/java/com/amazon/ion/impl/IonTypeID.java index 002ed31a2a..fa34dcd238 100644 --- a/src/main/java/com/amazon/ion/impl/IonTypeID.java +++ b/src/main/java/com/amazon/ion/impl/IonTypeID.java @@ -85,6 +85,7 @@ final class IonTypeID { static final IonTypeID[] TYPE_IDS_1_1; static final IonTypeID[] NULL_TYPE_IDS_1_1; static final IonTypeID STRUCT_WITH_FLEX_SYMS_ID; + static final IonTypeID DELIMITED_END_ID; static { TYPE_IDS_NO_IVM = new IonTypeID[NUMBER_OF_BYTES]; TYPE_IDS_1_0 = new IonTypeID[NUMBER_OF_BYTES]; @@ -116,6 +117,10 @@ final class IonTypeID { // This is used as a dummy ID when a struct switches to using FlexSym field names in the middle. The key // here is that the type is STRUCT and the isInlineable flag is true. STRUCT_WITH_FLEX_SYMS_ID = TYPE_IDS_1_1[VARIABLE_LENGTH_STRUCT_WITH_FLEX_SYMS & 0xFF]; + + // This is used as a dummy ID when a delimited container reaches its end. The key here is that the type ID's + // lower nibble is OpCodes.DELIMITED_END_MARKER. 
+ DELIMITED_END_ID = TYPE_IDS_1_1[DELIMITED_END_MARKER & 0xFF]; } final IonType type; diff --git a/src/test/java/com/amazon/ion/impl/IonCursorBinaryTest.java b/src/test/java/com/amazon/ion/impl/IonCursorBinaryTest.java index 581289a78e..d311be6231 100644 --- a/src/test/java/com/amazon/ion/impl/IonCursorBinaryTest.java +++ b/src/test/java/com/amazon/ion/impl/IonCursorBinaryTest.java @@ -6,14 +6,18 @@ import com.amazon.ion.IonBufferConfiguration; import com.amazon.ion.IonCursor; import com.amazon.ion.IonException; +import com.amazon.ion.IonType; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import java.io.ByteArrayInputStream; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import java.util.function.Supplier; import static com.amazon.ion.BitUtils.bytes; +import static com.amazon.ion.IonCursor.Event.NEEDS_INSTRUCTION; import static com.amazon.ion.IonCursor.Event.VALUE_READY; import static com.amazon.ion.IonCursor.Event.START_CONTAINER; import static com.amazon.ion.IonCursor.Event.START_SCALAR; @@ -35,23 +39,27 @@ public class IonCursorBinaryTest { - private static IonCursorBinary initializeCursor(boolean constructFromBytes, int... data) { + private static IonCursorBinary initializeCursor(IonBufferConfiguration configuration, boolean constructFromBytes, int... 
data) { IonCursorBinary cursor; if (constructFromBytes) { - cursor = new IonCursorBinary(STANDARD_BUFFER_CONFIGURATION, bytes(data), 0, data.length); + cursor = new IonCursorBinary(configuration, bytes(data), 0, data.length); } else { cursor = new IonCursorBinary( - STANDARD_BUFFER_CONFIGURATION, + configuration, new ByteArrayInputStream(bytes(data)), null, 0, 0 ); } - cursor.registerOversizedValueHandler(STANDARD_BUFFER_CONFIGURATION.getOversizedValueHandler()); + cursor.registerOversizedValueHandler(configuration.getOversizedValueHandler()); return cursor; } + private static IonCursorBinary initializeCursor(boolean constructFromBytes, int... data) { + return initializeCursor(STANDARD_BUFFER_CONFIGURATION, constructFromBytes, data); + } + /** * Provides Expectations that verify that advancing the cursor to the next value results in the given event, and * filling that value results in a Marker with the given start and end indices. @@ -69,6 +77,33 @@ private static ExpectationProvider fill(IonCursor.Event expecte )); } + /** + * Provides Expectations that verify that advancing the cursor to the next value results in the given event, and + * attempting to fill that value results in NEEDS_INSTRUCTION, indicating that the value could not be filled due + * to being oversize. + */ + private static ExpectationProvider fillIsOversize(IonCursor.Event expectedEvent, Supplier oversizeCounter) { + return consumer -> consumer.accept(new Expectation<>( + String.format("fillOversized(%s)", expectedEvent), + cursor -> { + assertEquals(expectedEvent, cursor.nextValue()); + assertEquals(NEEDS_INSTRUCTION, cursor.fillValue()); + assertEquals(1, oversizeCounter.get()); + } + )); + } + + /** + * Provides an Expectation that verifies that the value on which the cursor is currently positioned has the given + * type. 
+ */ + static ExpectationProvider type(IonType expectedType) { + return consumer -> consumer.accept(new Expectation<>( + String.format("type(%s)", expectedType), + cursor -> assertEquals(expectedType, cursor.getValueMarker().typeId.type)) + ); + } + /** * Provides Expectations that verify that advancing the cursor positions it on a scalar, and filling that scalar * results in a Marker with the given start and end indices. @@ -256,6 +291,36 @@ public void fillContainerAtDepth0(boolean constructFromBytes) { ); } + @ParameterizedTest(name = "constructFromBytes={0}") + @ValueSource(booleans = {true, false}) + public void fillDelimitedContainerAtDepth0(boolean constructFromBytes) { + IonCursorBinary cursor = initializeCursor( + constructFromBytes, + 0xE0, 0x01, 0x01, 0xEA, + 0xF3, // Delimited struct + 0x07, // Field SID 3 + 0xF1, // Delimited list, contents start at index 7 + 0x5A, // Float length 0 + 0xF0, // End delimited list + 0x09, // Field SID 4 + 0x51, 0x01, // Int length 1, starting at byte index 11 + 0x01, 0xF0 // End delimited struct + ); + assertSequence( + cursor, + // When reading from a fixed-size input source, the cursor does not need to peek ahead to find the end of + // the delimited container during fill, so it remains -1 in that case. Otherwise, fill looks ahead to + // find the end index and stores it so that it does not need to be repetitively calculated. + fillContainer(5, constructFromBytes ? 
-1 : 14, + container( + scalar() + ), + fillScalar(11, 12) + ), + endStream() + ); + } + @ParameterizedTest(name = "constructFromBytes={0}") @ValueSource(booleans = {true, false}) public void fillContainerAtDepth1(boolean constructFromBytes) { @@ -280,6 +345,78 @@ public void fillContainerAtDepth1(boolean constructFromBytes) { ); } + @ParameterizedTest(name = "constructFromBytes={0}") + @ValueSource(booleans = {true, false}) + public void fillDelimitedContainerAtDepth1(boolean constructFromBytes) { + IonCursorBinary cursor = initializeCursor( + constructFromBytes, + 0xE0, 0x01, 0x01, 0xEA, + 0xF3, // Delimited struct + 0x07, // Field SID 3 + 0xF1, // Delimited list, contents start at index 7 + 0x5A, // Float length 0 + 0xF0, // End delimited list + 0x09, // Field SID 4 + 0x51, 0x01, // Int length 1, starting at byte index 11 + 0x01, 0xF0 // End delimited struct + ); + assertSequence( + cursor, + container( + // When reading from a fixed-size input source, the cursor does not need to peek ahead to find the end of + // the delimited container during fill, so it remains -1 in that case. Otherwise, fill looks ahead to + // find the end index and stores it so that it does not need to be repetitively calculated. + fillContainer(7, constructFromBytes ? 
-1 : 9, + scalar(), + endContainer() + ) + ) + ); + } + + @Test + public void skipOversizeDelimitedContainerAtDepth1() { + AtomicInteger oversizeValueCounter = new AtomicInteger(0); + AtomicInteger oversizeSymbolTableCounter = new AtomicInteger(0); + AtomicInteger byteCounter = new AtomicInteger(0); + int[] data = new int[] { + 0xE0, 0x01, 0x01, 0xEA, + 0xF3, // Delimited struct + 0x07, // Field SID 3 + 0xF1, // Delimited list, contents start at index 7 + 0x5A, 0x5A, 0x5A, 0x5A, 0x5A, 0x5A, // Six floats 0e0 + 0xF0, // End delimited list + 0x09, // Field SID 4 + 0x51, 0x01, // Int length 1, starting at byte index 16 + 0x01, 0xF0 // End delimited struct + }; + IonCursorBinary cursor = initializeCursor( + IonBufferConfiguration.Builder.standard() + .withInitialBufferSize(5) + .withMaximumBufferSize(5) + .onData(byteCounter::addAndGet) + .onOversizedValue(oversizeValueCounter::incrementAndGet) + .onOversizedSymbolTable(oversizeSymbolTableCounter::incrementAndGet) + .build(), + false, + data + ); + assertSequence( + cursor, + container( + // The oversize delimited list is skipped. 
+ fillIsOversize(START_CONTAINER, oversizeValueCounter::get), + scalar(), type(IonType.INT), + endContainer() + ), + endStream() + ); + cursor.close(); + assertEquals(1, oversizeValueCounter.get()); + assertEquals(0, oversizeSymbolTableCounter.get()); + assertEquals(data.length, byteCounter.get()); + } + @ParameterizedTest(name = "constructFromBytes={0}") @ValueSource(booleans = {true, false}) public void fillContainerThenSkip(boolean constructFromBytes) { @@ -306,6 +443,38 @@ public void fillContainerThenSkip(boolean constructFromBytes) { ); } + @ParameterizedTest(name = "constructFromBytes={0}") + @ValueSource(booleans = {true, false}) + public void fillDelimitedContainerThenSkip(boolean constructFromBytes) { + IonCursorBinary cursor = initializeCursor( + constructFromBytes, + 0xE0, 0x01, 0x01, 0xEA, + 0xF3, // Delimited struct + 0x07, // Field SID 3 + 0xF1, // Delimited list, contents start at index 7 + 0x5A, // Float length 0 + 0xF0, // End delimited list + 0x09, // Field SID 4 + 0x51, 0x01, // Int length 1, starting at byte index 11 + 0x01, 0xF0, // End delimited struct + 0xF3, // Delimited struct + 0x09, // Field SID 4 + 0x50, // Int length 0, at byte index 17 + 0x01, 0xF0 // End delimited struct + ); + assertSequence( + cursor, + // When reading from a fixed-size input source, the cursor does not need to peek ahead to find the end of + // the delimited container during fill, so it remains -1 in that case. Otherwise, fill looks ahead to + // find the end index and stores it so that it does not need to be repetitively calculated. + fill(START_CONTAINER, 5, constructFromBytes ? -1 : 14), + container( + fillScalar(17, 17) + ), + endStream() + ); + } + @Test public void expectMalformedListHeaderToFailCleanly() { // The following test is expected to fail because the VarUInt length would extend beyond the end of the buffer. 
diff --git a/src/test/java/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java b/src/test/java/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java index 6797e11c9d..0278fdcd26 100644 --- a/src/test/java/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java +++ b/src/test/java/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java @@ -4093,6 +4093,16 @@ public void corruptEachByteThrowsIonException(boolean constructFromBytes) { corruptEachByteThrowsIonException(constructFromBytes, false, false); } + /** + * Returns the given data prepended with an IVM for the requested 1.x minor version. + * @param minorVersion the IVM version to prepend. + * @param data the data. + * @return the data with an IVM prepended. + */ + private static byte[] withIvm(int minorVersion, byte[] data) throws Exception { + return new TestUtils.BinaryIonAppender(minorVersion).append(data).toByteArray(); + } + /** * Creates an IonReader over the given data, which will be prepended with a binary Ion 1.1 IVM. * @param data the data to read. @@ -4100,8 +4110,7 @@ public void corruptEachByteThrowsIonException(boolean constructFromBytes) { * @return a new reader. 
*/ private IonReader readerForIon11(byte[] data, boolean constructFromBytes) throws Exception { - byte[] inputBytes = new TestUtils.BinaryIonAppender(1).append(data).toByteArray(); - reader = readerFor(readerBuilder, constructFromBytes, inputBytes); + reader = readerFor(readerBuilder, constructFromBytes, withIvm(1, data)); byteCounter.set(0); return reader; } @@ -4118,6 +4127,8 @@ private void assertNullCorrectlyParsed(boolean constructFromBytes, IonType expec closeAndCount(); } + // TODO byte-by-byte incremental mode testing for all Ion 1.1 tests + @ParameterizedTest @CsvSource({ " NULL, EA", @@ -4965,6 +4976,13 @@ private void assertSimpleStructCorrectlyParsed(boolean constructFromBytes, Strin "5E | true \n" + "0D | SID 6 \n " + "5F | false", + // Delimited + "F3 | Delimited struct \n" + + "F9 6E 61 6D 65 | name \n" + + "5E | true \n" + + "0D | SID 6 \n" + + "5F | false \n" + + "01 F0 | End delimited struct", // FlexSym field names using SID type ID "FC | Variable Length SID struct \n" + "21 | Length = 16 \n" + @@ -4994,7 +5012,6 @@ private void assertSimpleStructCorrectlyParsed(boolean constructFromBytes, Strin "5E | true \n" + "F3 69 6D 70 6F 72 74 73 | imports \n " + "5F | false", - // TODO delimited }) public void readStruct_1_1(String inputBytes) throws Exception { assertSimpleStructCorrectlyParsed(true, inputBytes); @@ -5079,8 +5096,18 @@ private void assertStructWithSymbolZeroFieldNamesCorrectlyParsed(boolean constru "09 | FlexSym SID 4 (name) \n" + "5E | true \n" + "01 90 | FlexSym SID 0 \n" + - "5E | true" - // TODO delimited + "5E | true", + // SID 0 in delimited struct + "F3 | Delimited struct \n" + + "01 90 | FlexSym SID 0 \n" + + "5E | true \n" + + "01 90 | FlexSym SID 0 \n" + + "5E | true \n" + + "09 | FlexSym SID 4 (name) \n" + + "5E | true \n" + + "01 90 | FlexSym SID 0 \n" + + "5E | true \n" + + "01 F0 | End delimited struct" }) public void readStructWithSymbolZeroFieldNames_1_1(String inputBytes) throws Exception { 
assertStructWithSymbolZeroFieldNamesCorrectlyParsed(true, inputBytes); @@ -5146,7 +5173,11 @@ public void assertStructWithEmptyInlineFieldNamesCorrectlyParsed(boolean constru "07 | Length = 3 \n" + "01 80 | FlexSym empty text \n" + "5F | false", - // TODO delimited + // Empty field name in delimited struct + "F3 | Delimited struct \n" + + "01 80 | FlexSym empty text \n" + + "5F | false \n" + + "01 F0 | End delimited struct" }) public void readStructWithEmptyInlineFieldName_1_1(String inputBytes) throws Exception { assertStructWithEmptyInlineFieldNamesCorrectlyParsed(true, inputBytes); @@ -5191,5 +5222,387 @@ public void readMultipleNestedListsAndSexps_1_1(boolean constructFromBytes) thro closeAndCount(); } - // TODO add tests for incrementally reading Ion 1.1 containers, including oversize values. + // TODO oversized Ion 1.1 annotation wrappers + + @ParameterizedTest(name = "constructFromBytes={0}") + @ValueSource(booleans = {true, false}) + public void delimitedListNestedWithinDelimitedStruct(boolean constructFromBytes) throws Exception { + byte[] input = hexStringToByteArray(cleanCommentedHexBytes( + "F3 | Delimited struct\n" + + "09 | Field SID 4 (name)\n" + + "F1 | Delimited list\n" + + "F0 | Delimited end marker\n" + + "01 | Special FlexSym 0 in field name position\n" + + "F0 | Delimited end marker\n" + )); + reader = readerForIon11(input, constructFromBytes); + assertSequence( + container(IonType.STRUCT, + next("name", IonType.LIST), STEP_IN, + next(null), + STEP_OUT + ), + next(null) + ); + closeAndCount(); + } + + + @ParameterizedTest(name = "constructFromBytes={0}") + @ValueSource(booleans = {true, false}) + public void topLevelStepOverDelimitedListNestedWithinDelimitedStruct(boolean constructFromBytes) throws Exception { + byte[] input = hexStringToByteArray(cleanCommentedHexBytes( + "F3 | Delimited struct\n" + + "09 | Field SID 4 (name)\n" + + "F1 | Delimited list\n" + + "F0 | Delimited end marker\n" + + "01 | Special FlexSym 0 in field name position\n" 
+ + "F0 | Delimited end marker\n" + + "50 | Int 0\n" + )); + reader = readerForIon11(input, constructFromBytes); + assertSequence( + next(IonType.STRUCT), + next(IonType.INT), intValue(0), + next(null) + ); + closeAndCount(); + } + + @ParameterizedTest(name = "constructFromBytes={0}") + @ValueSource(booleans = {true, false}) + public void topLevelStepOverDelimitedListNestedWithinDelimitedStructNonIncremental(boolean constructFromBytes) throws Exception { + byte[] input = withIvm(1, hexStringToByteArray(cleanCommentedHexBytes( + "F3 | Delimited struct\n" + + "09 | Field SID 4 (name)\n" + + "F1 | Delimited list\n" + + "F0 | Delimited end marker\n" + + "01 | Special FlexSym 0 in field name position\n" + + "F0 | Delimited end marker\n" + + "50 | Int 0\n" + ))); + reader = readerFor(readerBuilder.withIncrementalReadingEnabled(false), constructFromBytes, input); + assertSequence( + next(IonType.STRUCT), + next(IonType.INT), intValue(0), + next(null) + ); + closeAndCount(); + } + + @ParameterizedTest(name = "constructFromBytes={0}") + @ValueSource(booleans = {true, false}) + public void prefixedStructNestedWithinDelimitedSexp(boolean constructFromBytes) throws Exception { + byte[] input = hexStringToByteArray(cleanCommentedHexBytes( + "F2 | Delimited s-expression\n" + + "C2 | Prefixed struct, length 2\n" + + "09 | Field SID 4 (name)\n" + + "50 | Int 0\n" + + "F0 | Delimited end marker\n" + )); + reader = readerForIon11(input, constructFromBytes); + assertSequence( + container(IonType.SEXP, + container(IonType.STRUCT, + next("name", IonType.INT), intValue(0), + next(null) + ), + next(null) + ), + next(null) + ); + closeAndCount(); + } + + @ParameterizedTest(name = "constructFromBytes={0}") + @ValueSource(booleans = {true, false}) + public void stepOverPrefixedStructNestedWithinDelimitedSexp(boolean constructFromBytes) throws Exception { + byte[] input = hexStringToByteArray(cleanCommentedHexBytes( + "F2 | Delimited s-expression\n" + + "C2 | Prefixed struct, length 2\n" + + 
"09 | Field SID 4 (name)\n" + + "50 | Int 0\n" + + "F0 | Delimited end marker\n" + )); + reader = readerForIon11(input, constructFromBytes); + assertSequence( + container(IonType.SEXP, + next(IonType.STRUCT), + // The nested struct is skipped. + next(null) + ), + next(null) + ); + closeAndCount(); + } + + @ParameterizedTest(name = "constructFromBytes={0}") + @ValueSource(booleans = {true, false}) + public void delimitedSexpNestedWithinPrefixedList(boolean constructFromBytes) throws Exception { + byte[] input = hexStringToByteArray(cleanCommentedHexBytes( + "A4 | Prefixed list, length 4\n" + + "F2 | Delimited s-expression\n" + + "50 | Int 0\n" + + "F0 | Delimited end marker\n" + + "50 | Int 0\n" + )); + reader = readerForIon11(input, constructFromBytes); + assertSequence( + container(IonType.LIST, + container(IonType.SEXP, + next(IonType.INT), intValue(0), + next(null) + ), + next(IonType.INT), intValue(0), + next(null) + ), + next(null) + ); + closeAndCount(); + } + + + @ParameterizedTest(name = "constructFromBytes={0}") + @ValueSource(booleans = {true, false}) + public void stepOverDelimitedSexpNestedWithinPrefixedList(boolean constructFromBytes) throws Exception { + byte[] input = hexStringToByteArray(cleanCommentedHexBytes( + "A4 | Prefixed list, length 4\n" + + "F2 | Delimited s-expression\n" + + "50 | Int 0\n" + + "F0 | Delimited end marker\n" + + "50 | Int 0\n" + )); + reader = readerForIon11(input, constructFromBytes); + assertSequence( + container(IonType.LIST, + next(IonType.SEXP), + // The nested s-expression is skipped. + next(IonType.INT), intValue(0), + next(null) + ), + next(null) + ); + closeAndCount(); + } + + @Test + public void oversizeDelimitedContainer() throws Exception { + // The outer struct is determined to be oversize after the nested delimited list is processed. 
+ byte[] input = withIvm(1, hexStringToByteArray(cleanCommentedHexBytes( + "F3 | Delimited struct\n" + + "09 | Field SID 4 (name)\n" + + "F1 | Delimited list\n" + + "50 | Int 0\n" + + "F0 | Delimited end marker\n" + + "01 | Special FlexSym 0 in field name position\n" + + "F0 | Delimited end marker\n" + + "50 | Int 0\n" + ))); + reader = boundedReaderFor(false, input, 5, 5, byteAndOversizedValueCountingHandler); + assertSequence( + // The oversize delimited struct is skipped. + next(IonType.INT), intValue(0), + next(null) + ); + expectOversized(1); + closeAndCount(); + } + + private byte[] delimitedListNestedWithinDelimitedStructFollowedByFloatZero() throws Exception { + byte[] input = withIvm(1, hexStringToByteArray(cleanCommentedHexBytes( + "F3 | Delimited struct\n" + + "09 | Field SID 4 (name)\n" + + "F1 | Delimited list\n" + + "50 | Int 0\n" + + "50 | Int 0\n" + + "50 | Int 0\n" + + "F0 | Delimited end marker\n" + + "01 | Special FlexSym 0 in field name position\n" + + "F0 | Delimited end marker\n" + + "5A | Float 0e0\n" + ))); + totalBytesInStream = input.length; + return input; + } + + @Test + public void oversizeNestedDelimitedContainer() throws Exception { + // This test differs from the previous one in that the outer struct is determined to be oversize in the + // middle of the nested delimited list. + byte[] input = delimitedListNestedWithinDelimitedStructFollowedByFloatZero(); + reader = boundedReaderFor(false, input, 5, 5, byteAndOversizedValueCountingHandler); + assertSequence( + // The oversize delimited struct is skipped. 
+ next(IonType.FLOAT), doubleValue(0e0), + next(null) + ); + expectOversized(1); + closeAndCount(); + } + + @Test + public void oversizeNestedDelimitedContainerIncremental() throws Exception { + byte[] input = delimitedListNestedWithinDelimitedStructFollowedByFloatZero(); + ResizingPipedInputStream pipe = new ResizingPipedInputStream((int) totalBytesInStream); + reader = boundedReaderFor(pipe, 5, 5, byteAndOversizedValueCountingHandler); + feedBytesOneByOne(input, pipe, reader); + assertSequence( + // The oversize delimited struct is skipped. + next(IonType.FLOAT), doubleValue(0e0), + next(null) + ); + expectOversized(1); + closeAndCount(); + } + + @Test + public void skipDelimitedContainerIncremental() throws Exception { + byte[] input = delimitedListNestedWithinDelimitedStructFollowedByFloatZero(); + ResizingPipedInputStream pipe = new ResizingPipedInputStream((int) totalBytesInStream); + reader = readerFor(pipe); + for (int i = 0; i < input.length - 1; i++) { + nextExpect(null); + pipe.receive(input[i]); + } + nextExpect(IonType.STRUCT); + pipe.receive(input[input.length - 1]); + assertSequence( + // The delimited struct is skipped. 
+ next(IonType.FLOAT), doubleValue(0e0), + next(null) + ); + closeAndCount(); + } + + @Test + public void skipNestedDelimitedContainerIncremental() throws Exception { + byte[] input = delimitedListNestedWithinDelimitedStructFollowedByFloatZero(); + ResizingPipedInputStream pipe = new ResizingPipedInputStream((int) totalBytesInStream); + reader = readerFor(pipe); + for (int i = 0; i < input.length - 1; i++) { + nextExpect(null); + pipe.receive(input[i]); + } + assertSequence( + container(IonType.STRUCT, + next("name", IonType.LIST), STEP_IN, + next(IonType.INT), intValue(0), + STEP_OUT // Skips the last two ints + ), + next(null) + ); + pipe.receive(input[input.length - 1]); + assertSequence( + next(IonType.FLOAT), doubleValue(0e0), + next(null) + ); + closeAndCount(); + } + + @Test + public void nestedDelimitedContainerIncremental() throws Exception { + byte[] input = delimitedListNestedWithinDelimitedStructFollowedByFloatZero(); + ResizingPipedInputStream pipe = new ResizingPipedInputStream((int) totalBytesInStream); + reader = readerFor(pipe); + for (int i = 0; i < input.length - 1; i++) { + nextExpect(null); + pipe.receive(input[i]); + } + assertSequence( + container(IonType.STRUCT, + next("name", IonType.LIST), STEP_IN, + next(IonType.INT), intValue(0), + next(IonType.INT), intValue(0), + next(IonType.INT), intValue(0), + next(null), + STEP_OUT + ), + next(null) + ); + pipe.receive(input[input.length - 1]); + assertSequence( + next(IonType.FLOAT), doubleValue(0e0), + next(null) + ); + closeAndCount(); + } + + @Test + public void nestedDelimitedContainerInlineFieldNamesIncremental() throws Exception { + byte[] input = withIvm(1, hexStringToByteArray(cleanCommentedHexBytes( + "F3 | Delimited struct\n" + + "F9 | Inline field name, length 4\n" + + "6E 61 6D 65 | name\n" + + "F1 | Delimited list\n" + + "50 | Int 0\n" + + "50 | Int 0\n" + + "50 | Int 0\n" + + "F0 | Delimited end marker\n" + + "01 | Special FlexSym 0 in field name position\n" + + "F0 | Delimited end 
marker\n" + + "5A | Float 0e0\n" + ))); + totalBytesInStream = input.length; + ResizingPipedInputStream pipe = new ResizingPipedInputStream((int) totalBytesInStream); + reader = readerFor(pipe); + for (int i = 0; i < input.length - 1; i++) { + nextExpect(null); + pipe.receive(input[i]); + } + assertSequence( + container(IonType.STRUCT, + next("name", IonType.LIST), STEP_IN, + next(IonType.INT), intValue(0), + next(IonType.INT), intValue(0), + next(IonType.INT), intValue(0), + next(null), + STEP_OUT + ), + next(null) + ); + pipe.receive(input[input.length - 1]); + assertSequence( + next(IonType.FLOAT), doubleValue(0e0), + next(null) + ); + closeAndCount(); + } + + private byte[] delimitedSymbolTable() throws Exception { + byte[] input = withIvm(1, hexStringToByteArray(cleanCommentedHexBytes( + "E4 07 | Annotation symbol ID 3 ($ion_symbol_table)\n" + + "F3 | Delimited struct\n" + + "0F | FlexSym SID 7 (symbols)\n" + + "F1 | Delimited list\n" + + "86 66 6F 6F 62 61 72 | string foobar\n" + + "F0 | End delimited list\n" + + "01 F0 | End delimited struct\n" + + "E1 0A | Symbol ID 10" + ))); + totalBytesInStream = input.length; + return input; + } + + @ParameterizedTest(name = "constructFromBytes={0}") + @ValueSource(booleans = {true, false}) + public void delimitedSymbolTable(boolean constructFromBytes) throws Exception { + reader = readerFor(readerBuilder, constructFromBytes, delimitedSymbolTable()); + assertSequence( + // Note: this will fail if the Ion 1.1 system symbol table changes because SID 10 will point to something + // else. If that happens, change the input data to point to the first Ion 1.1 local symbol ID. 
+ next(IonType.SYMBOL), symbolValue("foobar"), + next(null) + ); + closeAndCount(); + } + + @Test + public void oversizeDelimitedSymbolTableFailsCleanly() throws Exception { + reader = boundedReaderFor(false, delimitedSymbolTable(), 5, 5, byteAndOversizedSymbolTableCountingHandler); + assertNull(reader.next()); + expectOversized(1); + reader.close(); + } + + // TODO Ion 1.1 symbol tables with all kinds of annotation encodings (opcodes E4 - E9, inline and SID) }