Skip to content

Commit

Permalink
Adds support for detecting format headers from InputStreams that requ…
Browse files Browse the repository at this point in the history
…ire multiple read() calls to produce enough bytes to fill a header.
  • Loading branch information
tgregg committed Jan 8, 2025
1 parent cd166b6 commit 06120ad
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 13 deletions.
47 changes: 34 additions & 13 deletions src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.amazon.ion.util.IonStreamUtils;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
Expand Down Expand Up @@ -297,6 +296,38 @@ interface IonReaderFromInputStreamFactoryBinary {
IonReader makeReader(_Private_IonReaderBuilder builder, InputStream source, byte[] alreadyRead, int alreadyReadOff, int alreadyReadLen);
}

/**
* Reads from the given source into the given byte array, stopping once either
* <ol>
* <li>`length` bytes have been read, or</li>
* <li>the end of the source stream has been reached, or</li>
* <li>the source stream throws an exception.</li>
* </ol>
* @param source the source of the bytes to read.
* @param destination the destination for the bytes read.
* @param length the number of bytes to attempt to read.
* @return the number of bytes read into `destination`.
*/
private static int fillToLengthOrStreamEnd(InputStream source, byte[] destination, int length) {
int bytesRead = 0;
while (bytesRead < length) {
int bytesToRead = length - bytesRead;
int bytesReadThisIteration;
try {
bytesReadThisIteration = source.read(destination, bytesRead, bytesToRead);
} catch (IOException e) {
// Some InputStream implementations throw IOExceptions (e.g. EOFException) in certain cases to convey
// that the end of the stream has been reached.
break;
}
if (bytesReadThisIteration < 0) { // This indicates the end of the stream.
break;
}
bytesRead += bytesReadThisIteration;
}
return bytesRead;
}

static IonReader buildReader(
_Private_IonReaderBuilder builder,
InputStream source,
Expand All @@ -318,12 +349,7 @@ static IonReader buildReader(
// alternatives should be evaluated.
byte[] possibleIVM = new byte[maxHeaderLength];
InputStream ionData = source;
int bytesRead;
try {
bytesRead = ionData.read(possibleIVM);
} catch (IOException e) {
throw new IonException(e);
}
int bytesRead = fillToLengthOrStreamEnd(ionData, possibleIVM, maxHeaderLength);
// If the input stream is growing, it is possible that fewer than BINARY_VERSION_MARKER_SIZE bytes are
// available yet. Simply check whether the stream *could* contain binary Ion based on the available bytes.
// If it can't, fall back to text.
Expand All @@ -342,12 +368,7 @@ static IonReader buildReader(
ionData = streamInterceptor.newInputStream(
new TwoElementSequenceInputStream(new ByteArrayInputStream(possibleIVM, 0, bytesRead), ionData)
);
try {
bytesRead = ionData.read(possibleIVM);
} catch (EOFException e) {
// Only a compression format header was available, so this may be a binary Ion stream.
bytesRead = 0;
}
bytesRead = fillToLengthOrStreamEnd(ionData, possibleIVM, _Private_IonConstants.BINARY_VERSION_MARKER_SIZE);
} catch (IOException e) {
throw new IonException(e);
}
Expand Down
34 changes: 34 additions & 0 deletions src/test/java/com/amazon/ion/util/ZstdStreamInterceptorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,4 +342,38 @@ public void expectFailureWhenHeaderLengthIsInvalid(ZstdStream stream) {
.addInputStreamInterceptor(new ZstdStreamInterceptor());
assertThrows(IonException.class, () -> stream.newReader(builder));
}

private static class OneBytePerReadInputStream extends InputStream {

private final InputStream delegate;

OneBytePerReadInputStream(InputStream delegate) {
this.delegate = delegate;
}

@Override
public int read() throws IOException {
return delegate.read();
}

@Override
public int read(byte[] bytes, int off, int len) throws IOException {
int b = delegate.read();
if (b < 0) {
return -1;
}
bytes[off] = (byte) b;
return 1;
}
}

@Test
public void headerRequiresMultipleInputStreamReads() throws IOException {
IonReaderBuilder builder = IonReaderBuilder.standard()
.addInputStreamInterceptor(new ZstdStreamInterceptor());
try (IonReader reader = builder.build(new OneBytePerReadInputStream(new ByteArrayInputStream(ZstdStream.BINARY_BYTES)))) {
assertEquals(IonType.INT, reader.next());
assertEquals(123, reader.intValue());
}
}
}

0 comments on commit 06120ad

Please sign in to comment.