From 06120ad0bc3514be52778910294e6277dbf38d6c Mon Sep 17 00:00:00 2001 From: Tyler Gregg Date: Tue, 7 Jan 2025 16:07:26 -0800 Subject: [PATCH] Adds support for detecting format headers from InputStreams that require multiple read() calls to produce enough bytes to fill a header. --- .../ion/impl/_Private_IonReaderBuilder.java | 47 ++++++++++++++----- .../ion/util/ZstdStreamInterceptorTest.java | 34 ++++++++++++++ 2 files changed, 68 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java b/src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java index 137d255f0..2a17b917d 100644 --- a/src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java +++ b/src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java @@ -12,7 +12,6 @@ import com.amazon.ion.util.IonStreamUtils; import java.io.ByteArrayInputStream; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.Reader; @@ -297,6 +296,38 @@ interface IonReaderFromInputStreamFactoryBinary { IonReader makeReader(_Private_IonReaderBuilder builder, InputStream source, byte[] alreadyRead, int alreadyReadOff, int alreadyReadLen); } + /** + * Reads from the given source into the given byte array, stopping once either + *
    + *
  1. `length` bytes have been read, or
  2. + *
  3. the end of the source stream has been reached, or
  4. + *
  5. the source stream throws an exception.
  6. + *
+ * @param source the source of the bytes to read. + * @param destination the destination for the bytes read. + * @param length the number of bytes to attempt to read. + * @return the number of bytes read into `destination`. + */ + private static int fillToLengthOrStreamEnd(InputStream source, byte[] destination, int length) { + int bytesRead = 0; + while (bytesRead < length) { + int bytesToRead = length - bytesRead; + int bytesReadThisIteration; + try { + bytesReadThisIteration = source.read(destination, bytesRead, bytesToRead); + } catch (IOException e) { + // Some InputStream implementations throw IOExceptions (e.g. EOFException) in certain cases to convey + // that the end of the stream has been reached. + break; + } + if (bytesReadThisIteration < 0) { // This indicates the end of the stream. + break; + } + bytesRead += bytesReadThisIteration; + } + return bytesRead; + } + static IonReader buildReader( _Private_IonReaderBuilder builder, InputStream source, @@ -318,12 +349,7 @@ static IonReader buildReader( // alternatives should be evaluated. byte[] possibleIVM = new byte[maxHeaderLength]; InputStream ionData = source; - int bytesRead; - try { - bytesRead = ionData.read(possibleIVM); - } catch (IOException e) { - throw new IonException(e); - } + int bytesRead = fillToLengthOrStreamEnd(ionData, possibleIVM, maxHeaderLength); // If the input stream is growing, it is possible that fewer than BINARY_VERSION_MARKER_SIZE bytes are // available yet. Simply check whether the stream *could* contain binary Ion based on the available bytes. // If it can't, fall back to text. @@ -342,12 +368,7 @@ static IonReader buildReader( ionData = streamInterceptor.newInputStream( new TwoElementSequenceInputStream(new ByteArrayInputStream(possibleIVM, 0, bytesRead), ionData) ); - try { - bytesRead = ionData.read(possibleIVM); - } catch (EOFException e) { - // Only a compression format header was available, so this may be a binary Ion stream. - bytesRead = 0; - } + bytesRead = fillToLengthOrStreamEnd(ionData, possibleIVM, _Private_IonConstants.BINARY_VERSION_MARKER_SIZE); } catch (IOException e) { throw new IonException(e); } diff --git a/src/test/java/com/amazon/ion/util/ZstdStreamInterceptorTest.java b/src/test/java/com/amazon/ion/util/ZstdStreamInterceptorTest.java index f459d83f5..65e023874 100644 --- a/src/test/java/com/amazon/ion/util/ZstdStreamInterceptorTest.java +++ b/src/test/java/com/amazon/ion/util/ZstdStreamInterceptorTest.java @@ -342,4 +342,38 @@ public void expectFailureWhenHeaderLengthIsInvalid(ZstdStream stream) { .addInputStreamInterceptor(new ZstdStreamInterceptor()); assertThrows(IonException.class, () -> stream.newReader(builder)); } + + private static class OneBytePerReadInputStream extends InputStream { + + private final InputStream delegate; + + OneBytePerReadInputStream(InputStream delegate) { + this.delegate = delegate; + } + + @Override + public int read() throws IOException { + return delegate.read(); + } + + @Override + public int read(byte[] bytes, int off, int len) throws IOException { + int b = delegate.read(); + if (b < 0) { + return -1; + } + bytes[off] = (byte) b; + return 1; + } + } + + @Test + public void headerRequiresMultipleInputStreamReads() throws IOException { + IonReaderBuilder builder = IonReaderBuilder.standard() + .addInputStreamInterceptor(new ZstdStreamInterceptor()); + try (IonReader reader = builder.build(new OneBytePerReadInputStream(new ByteArrayInputStream(ZstdStream.BINARY_BYTES)))) { + assertEquals(IonType.INT, reader.next()); + assertEquals(123, reader.intValue()); + } + } }