diff --git a/zio-json/js/src/main/scala/java/io/BufferedWriter.scala b/zio-json/js/src/main/scala/java/io/BufferedWriter.scala new file mode 100644 index 000000000..0c1da6d59 --- /dev/null +++ b/zio-json/js/src/main/scala/java/io/BufferedWriter.scala @@ -0,0 +1,50 @@ +package java.io + +class BufferedWriter(out: Writer, sz: Int) extends Writer { + + if (sz <= 0) throw new IllegalArgumentException("Buffer size <= 0") + + def this(out: Writer) = this(out, 4096) + + private val buffer: Array[Char] = new Array[Char](sz) + private var pos: Int = 0 + private var closed: Boolean = false + + def close(): Unit = if (!closed) { + flush() + out.close() + closed = true + } + + def flush(): Unit = { + ensureOpen() + out.write(buffer, 0, pos) + out.flush() + pos = 0 + } + + def newLine(): Unit = + write(System.lineSeparator(), 0, System.lineSeparator().length) + + override def write(c: Int): Unit = + write(Array(c.toChar), 0, 1) + + override def write(s: String, off: Int, len: Int): Unit = + write(s.toCharArray, off, len) + + def write(cbuf: Array[Char], off: Int, len: Int): Unit = { + ensureOpen() + val available = sz - pos + if (available >= len) { + System.arraycopy(cbuf, off, buffer, pos, len) + pos += len + if (pos == sz) flush() + } else { + write(cbuf, off, available) + write(cbuf, off + available, len - available) + } + } + + private def ensureOpen(): Unit = + if (closed) throw new IOException("Stream closed") +} diff --git a/zio-json/js/src/main/scala/zio/json/JsonDecoderPlatformSpecific.scala b/zio-json/js/src/main/scala/zio/json/JsonDecoderPlatformSpecific.scala deleted file mode 100644 index d0fc4eb8e..000000000 --- a/zio-json/js/src/main/scala/zio/json/JsonDecoderPlatformSpecific.scala +++ /dev/null @@ -1,3 +0,0 @@ -package zio.json - -trait JsonDecoderPlatformSpecific[A] { self: JsonDecoder[A] => } diff --git a/zio-json/js/src/main/scala/zio/json/JsonEncoderPlatformSpecific.scala b/zio-json/js/src/main/scala/zio/json/JsonEncoderPlatformSpecific.scala deleted file 
mode 100644 index 57ca8ecb7..000000000 --- a/zio-json/js/src/main/scala/zio/json/JsonEncoderPlatformSpecific.scala +++ /dev/null @@ -1,3 +0,0 @@ -package zio.json - -trait JsonEncoderPlatformSpecific[A] { self: JsonEncoder[A] => } diff --git a/zio-json/jvm/src/test/scala-2/zio/json/DecoderPlatformSpecificSpec.scala b/zio-json/jvm/src/test/scala-2/zio/json/DecoderPlatformSpecificSpec.scala index eab496ca8..d42ec8646 100644 --- a/zio-json/jvm/src/test/scala-2/zio/json/DecoderPlatformSpecificSpec.scala +++ b/zio-json/jvm/src/test/scala-2/zio/json/DecoderPlatformSpecificSpec.scala @@ -115,122 +115,6 @@ object DecoderPlatformSpecificSpec extends ZIOSpecDefault { testAst("ugh10k") ), suite("ZIO Streams integration")( - test("decodes a stream of chars") { - for { - int <- JsonDecoder[Int].decodeJsonStream(ZStream('1', '2', '3')) - } yield { - assert(int)(equalTo(123)) - } - }, - test("decodes an encoded stream of bytes") { - for { - int <- JsonDecoder[Int].decodeJsonStreamInput(ZStream.fromIterable("123".getBytes(StandardCharsets.UTF_8))) - } yield assert(int)(equalTo(123)) - }, - suite("decodeJsonPipeline")( - suite("Newline delimited")( - test("decodes single elements") { - ZStream - .fromIterable("1001".toSeq) - .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline)) - .runCollect - .map { xs => - assert(xs)(equalTo(Chunk(1001))) - } - }, - test("decodes multiple elements") { - ZStream - .fromIterable("1001\n1002".toSeq) - .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline)) - .runCollect - .map { xs => - assert(xs)(equalTo(Chunk(1001, 1002))) - } - }, - test("decodes multiple elements when fed in smaller chunks") { - ZStream - .fromIterable("1001\n1002".toSeq) - .rechunk(1) - .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline)) - .runCollect - .map { xs => - assert(xs)(equalTo(Chunk(1001, 1002))) - } - }, - test("accepts trailing NL") { - ZStream - .fromIterable("1001\n1002\n".toSeq) - 
.via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline)) - .runCollect - .map { xs => - assert(xs)(equalTo(Chunk(1001, 1002))) - } - }, - test("errors") { - ZStream - .fromIterable("1\nfalse\n3".toSeq) - .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline)) - .runDrain - .exit - .map { exit => - assert(exit)(fails(anything)) - } - }, - test("is interruptible") { - (ZStream.fromIterable("1\n2\n3\n4") ++ ZStream.fromZIO(ZIO.interrupt)) - .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline)) - .runDrain - .exit - .map { exit => - assert(exit)(isInterrupted) - } - } @@ timeout(2.seconds) - ), - suite("Array delimited")( - test("decodes single elements") { - ZStream - .fromIterable("[1001]".toSeq) - .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Array)) - .runCollect - .map { xs => - assert(xs)(equalTo(Chunk(1001))) - } - }, - test("empty array") { - ZStream - .fromIterable("[]".toSeq) - .via(JsonDecoder[String].decodeJsonPipeline(JsonStreamDelimiter.Array)) - .runCollect - .map { xs => - assert(xs)(isEmpty) - } - }, - test("decodes multiple elements") { - ZStream - .fromIterable("[ 1001, 1002, 1003 ]".toSeq) - .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Array)) - .runCollect - .map { xs => - assert(xs)(equalTo(Chunk(1001, 1002, 1003))) - } - }, - test("handles whitespace leniently") { - val in = - """[ - 1001, 1002, - 1003 - ]""" - - ZStream - .fromIterable(in.toSeq) - .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Array)) - .runCollect - .map { xs => - assert(xs)(equalTo(Chunk(1001, 1002, 1003))) - } - } - ) - ), suite("helpers in zio.json")( test("readJsonLines reads from files") { import logEvent._ diff --git a/zio-json/jvm/src/test/scala-2/zio/json/EncoderPlatformSpecificSpec.scala b/zio-json/jvm/src/test/scala-2/zio/json/EncoderPlatformSpecificSpec.scala index 7479e378b..5d2ca227f 100644 --- a/zio-json/jvm/src/test/scala-2/zio/json/EncoderPlatformSpecificSpec.scala +++ 
b/zio-json/jvm/src/test/scala-2/zio/json/EncoderPlatformSpecificSpec.scala @@ -24,67 +24,6 @@ object EncoderPlatformSpecificSpec extends ZIOSpecDefault { testRoundTrip[List[Tweet]]("twitter_api_response"), testRoundTrip[GeoJSON]("che.geo") ), - suite("ZIO Streams integration")( - test("encodes into a ZStream of Char") { - val intEncoder = JsonEncoder[Int] - val value = 1234 - - for { - chars <- intEncoder.encodeJsonStream(value).runCollect - } yield { - assert(chars.mkString)(equalTo("1234")) - } - }, - test("encodes values that yield a result of length > DefaultChunkSize") { - val longString = List.fill(ZStream.DefaultChunkSize * 2)('x').mkString - - for { - chars <- JsonEncoder[String].encodeJsonStream(longString).runCollect - } yield { - assert(chars)(hasSize(equalTo(ZStream.DefaultChunkSize * 2 + 2))) && - assert(chars.mkString(""))(equalTo("\"" ++ longString ++ "\"")) - } - }, - test("encodeJsonLinesPipeline") { - val ints = ZStream(1, 2, 3, 4) - - for { - xs <- ints.via(JsonEncoder[Int].encodeJsonLinesPipeline).runCollect - } yield { - assert(xs.mkString)(equalTo("1\n2\n3\n4\n")) - } - }, - test("encodeJsonLinesPipeline handles elements which take up > DefaultChunkSize to encode") { - val longString = List.fill(5000)('x').mkString - - val ints = ZStream(longString, longString) - val encoder = JsonEncoder[String] - - for { - xs <- ints.via(encoder.encodeJsonLinesPipeline).runCollect - } yield { - // leading `"`, trailing `"` and `\n` = 3 - assert(xs.size)(equalTo((5000 + 3) * 2)) - } - }, - test("encodeJsonArrayPipeline XYZ") { - val ints = ZStream(1, 2, 3).map(n => Json.Obj(Chunk("id" -> Json.Num(BigDecimal(n).bigDecimal)))) - - for { - xs <- ints.via(JsonEncoder[Json].encodeJsonArrayPipeline).runCollect - } yield { - assert(xs.mkString)(equalTo("""[{"id":1},{"id":2},{"id":3}]""")) - } - }, - test("encodeJsonArrayPipeline, empty stream") { - val emptyArray = ZStream - .from(List()) - .via(JsonEncoder[String].encodeJsonArrayPipeline) - .run(ZSink.mkString) - - 
assertZIO(emptyArray)(equalTo("[]")) - } - ), suite("helpers in zio.json")( test("writeJsonLines writes JSON lines") { val path = Files.createTempFile("log", "json") diff --git a/zio-json/native/src/main/scala/zio/JsonPackagePlatformSpecific.scala b/zio-json/native/src/main/scala/zio/JsonPackagePlatformSpecific.scala index 846000c41..403b02e6d 100644 --- a/zio-json/native/src/main/scala/zio/JsonPackagePlatformSpecific.scala +++ b/zio-json/native/src/main/scala/zio/JsonPackagePlatformSpecific.scala @@ -1,3 +1,88 @@ package zio -trait JsonPackagePlatformSpecific {} +import zio.json.{ JsonDecoder, JsonEncoder, JsonStreamDelimiter, ast } +import zio.stream._ + +import java.io.{ File, IOException } +import java.net.URL +import java.nio.charset.StandardCharsets +import java.nio.file.{ Path, Paths } + +trait JsonPackagePlatformSpecific { + def readJsonAs(file: File): ZStream[Any, Throwable, ast.Json] = + readJsonLinesAs[ast.Json](file) + + def readJsonAs(path: Path): ZStream[Any, Throwable, ast.Json] = + readJsonLinesAs[ast.Json](path) + + def readJsonAs(path: String): ZStream[Any, Throwable, ast.Json] = + readJsonLinesAs[ast.Json](path) + + def readJsonAs(url: URL): ZStream[Any, Throwable, ast.Json] = + readJsonLinesAs[ast.Json](url) + + def readJsonLinesAs[A: JsonDecoder](file: File): ZStream[Any, Throwable, A] = + readJsonLinesAs(file.toPath) + + def readJsonLinesAs[A: JsonDecoder](path: Path): ZStream[Any, Throwable, A] = + ZStream + .fromPath(path) + .via( + ZPipeline.utf8Decode >>> + stringToChars >>> + JsonDecoder[A].decodeJsonPipeline(JsonStreamDelimiter.Newline) + ) + + def readJsonLinesAs[A: JsonDecoder](path: String): ZStream[Any, Throwable, A] = + readJsonLinesAs(Paths.get(path)) + + def readJsonLinesAs[A: JsonDecoder](url: URL): ZStream[Any, Throwable, A] = { + val scoped = ZIO + .fromAutoCloseable(ZIO.attempt(url.openStream())) + .refineToOrDie[IOException] + + ZStream + .fromInputStreamScoped(scoped) + .via( + ZPipeline.utf8Decode >>> + stringToChars >>> + 
JsonDecoder[A].decodeJsonPipeline(JsonStreamDelimiter.Newline) + ) + } + + def writeJsonLines[R](file: File, stream: ZStream[R, Throwable, ast.Json]): RIO[R, Unit] = + writeJsonLinesAs(file, stream) + + def writeJsonLines[R](path: Path, stream: ZStream[R, Throwable, ast.Json]): RIO[R, Unit] = + writeJsonLinesAs(path, stream) + + def writeJsonLines[R](path: String, stream: ZStream[R, Throwable, ast.Json]): RIO[R, Unit] = + writeJsonLinesAs(path, stream) + + def writeJsonLinesAs[R, A: JsonEncoder](file: File, stream: ZStream[R, Throwable, A]): RIO[R, Unit] = + writeJsonLinesAs(file.toPath, stream) + + def writeJsonLinesAs[R, A: JsonEncoder](path: Path, stream: ZStream[R, Throwable, A]): RIO[R, Unit] = + stream + .via( + JsonEncoder[A].encodeJsonLinesPipeline >>> + charsToUtf8 + ) + .run(ZSink.fromPath(path)) + .unit + + def writeJsonLinesAs[R, A: JsonEncoder](path: String, stream: ZStream[R, Throwable, A]): RIO[R, Unit] = + writeJsonLinesAs(Paths.get(path), stream) + + private def stringToChars: ZPipeline[Any, Nothing, String, Char] = + ZPipeline.mapChunks[String, Char](_.flatMap(_.toCharArray)) + + private def charsToUtf8: ZPipeline[Any, Nothing, Char, Byte] = + ZPipeline.mapChunksZIO[Any, Nothing, Char, Byte] { chunk => + ZIO.succeed { + Chunk.fromArray { + new String(chunk.toArray).getBytes(StandardCharsets.UTF_8) + } + } + } +} diff --git a/zio-json/native/src/main/scala/zio/json/JsonDecoderPlatformSpecific.scala b/zio-json/native/src/main/scala/zio/json/JsonDecoderPlatformSpecific.scala deleted file mode 100644 index d0fc4eb8e..000000000 --- a/zio-json/native/src/main/scala/zio/json/JsonDecoderPlatformSpecific.scala +++ /dev/null @@ -1,3 +0,0 @@ -package zio.json - -trait JsonDecoderPlatformSpecific[A] { self: JsonDecoder[A] => } diff --git a/zio-json/native/src/main/scala/zio/json/JsonEncoderPlatformSpecific.scala b/zio-json/native/src/main/scala/zio/json/JsonEncoderPlatformSpecific.scala deleted file mode 100644 index 57ca8ecb7..000000000 --- 
a/zio-json/native/src/main/scala/zio/json/JsonEncoderPlatformSpecific.scala +++ /dev/null @@ -1,3 +0,0 @@ -package zio.json - -trait JsonEncoderPlatformSpecific[A] { self: JsonEncoder[A] => } diff --git a/zio-json/shared/src/main/scala/zio/json/JsonDecoder.scala b/zio-json/shared/src/main/scala/zio/json/JsonDecoder.scala index ddf45c725..17d370abd 100644 --- a/zio-json/shared/src/main/scala/zio/json/JsonDecoder.scala +++ b/zio-json/shared/src/main/scala/zio/json/JsonDecoder.scala @@ -31,7 +31,7 @@ import scala.util.control.NoStackTrace * A `JsonDecoder[A]` instance has the ability to decode JSON to values of type `A`, potentially * failing with an error if the JSON content does not encode a value of the given type. */ -trait JsonDecoder[A] extends JsonDecoderPlatformSpecific[A] { +trait JsonDecoder[A] extends JsonStreamDecoder[A] { self => /** diff --git a/zio-json/shared/src/main/scala/zio/json/JsonEncoder.scala b/zio-json/shared/src/main/scala/zio/json/JsonEncoder.scala index 6b1dc40e6..eddc38b8a 100644 --- a/zio-json/shared/src/main/scala/zio/json/JsonEncoder.scala +++ b/zio-json/shared/src/main/scala/zio/json/JsonEncoder.scala @@ -25,7 +25,7 @@ import scala.annotation._ import scala.collection.{ immutable, mutable } import scala.reflect.ClassTag -trait JsonEncoder[A] extends JsonEncoderPlatformSpecific[A] { +trait JsonEncoder[A] extends JsonStreamEncoder[A] { self => /** diff --git a/zio-json/jvm/src/main/scala/zio/json/JsonDecoderPlatformSpecific.scala b/zio-json/shared/src/main/scala/zio/json/JsonStreamDecoder.scala similarity index 99% rename from zio-json/jvm/src/main/scala/zio/json/JsonDecoderPlatformSpecific.scala rename to zio-json/shared/src/main/scala/zio/json/JsonStreamDecoder.scala index 11da0dbfd..3f6ff4471 100644 --- a/zio-json/jvm/src/main/scala/zio/json/JsonDecoderPlatformSpecific.scala +++ b/zio-json/shared/src/main/scala/zio/json/JsonStreamDecoder.scala @@ -8,7 +8,7 @@ import zio.stream.{ Take, ZPipeline, ZStream } import java.nio.charset.{ 
Charset, StandardCharsets } import scala.annotation.tailrec -trait JsonDecoderPlatformSpecific[A] { self: JsonDecoder[A] => +trait JsonStreamDecoder[A] { self: JsonDecoder[A] => private def readAll(reader: java.io.Reader): ZIO[Any, Throwable, A] = ZIO.attemptBlocking { diff --git a/zio-json/jvm/src/main/scala/zio/json/JsonEncoderPlatformSpecific.scala b/zio-json/shared/src/main/scala/zio/json/JsonStreamEncoder.scala similarity index 97% rename from zio-json/jvm/src/main/scala/zio/json/JsonEncoderPlatformSpecific.scala rename to zio-json/shared/src/main/scala/zio/json/JsonStreamEncoder.scala index 3502c7270..fe35fbac2 100644 --- a/zio-json/jvm/src/main/scala/zio/json/JsonEncoderPlatformSpecific.scala +++ b/zio-json/shared/src/main/scala/zio/json/JsonStreamEncoder.scala @@ -4,7 +4,7 @@ import zio.json.internal.WriteWriter import zio.stream._ import zio.{ Chunk, Ref, Unsafe, ZIO } -trait JsonEncoderPlatformSpecific[A] { self: JsonEncoder[A] => +trait JsonStreamEncoder[A] { self: JsonEncoder[A] => /** * Encodes the specified value into a character stream. 
diff --git a/zio-json/shared/src/test/scala/zio/json/StreamDecoderSpec.scala b/zio-json/shared/src/test/scala/zio/json/StreamDecoderSpec.scala new file mode 100644 index 000000000..53fb033ac --- /dev/null +++ b/zio-json/shared/src/test/scala/zio/json/StreamDecoderSpec.scala @@ -0,0 +1,136 @@ +package testzio.json + +import zio._ +import zio.json._ +import zio.json.ast._ +import zio.stream.ZStream +import zio.test.Assertion._ +import zio.test.TestAspect._ +import zio.test._ + +import java.nio.charset.StandardCharsets + +object StreamDecoderSpec extends ZIOSpecDefault { + + val spec = + suite("Decoder")( + suite("ZIO Streams integration")( + test("decodes a stream of chars") { + for { + int <- JsonDecoder[Int].decodeJsonStream(ZStream('1', '2', '3')) + } yield { + assert(int)(equalTo(123)) + } + }, + test("decodes an encoded stream of bytes") { + for { + int <- JsonDecoder[Int].decodeJsonStreamInput(ZStream.fromIterable("123".getBytes(StandardCharsets.UTF_8))) + } yield assert(int)(equalTo(123)) + }, + suite("decodeJsonPipeline")( + suite("Newline delimited")( + test("decodes single elements") { + ZStream + .fromIterable("1001".toSeq) + .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline)) + .runCollect + .map { xs => + assert(xs)(equalTo(Chunk(1001))) + } + }, + test("decodes multiple elements") { + ZStream + .fromIterable("1001\n1002".toSeq) + .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline)) + .runCollect + .map { xs => + assert(xs)(equalTo(Chunk(1001, 1002))) + } + }, + test("decodes multiple elements when fed in smaller chunks") { + ZStream + .fromIterable("1001\n1002".toSeq) + .rechunk(1) + .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline)) + .runCollect + .map { xs => + assert(xs)(equalTo(Chunk(1001, 1002))) + } + }, + test("accepts trailing NL") { + ZStream + .fromIterable("1001\n1002\n".toSeq) + .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline)) + .runCollect + .map { xs => + 
assert(xs)(equalTo(Chunk(1001, 1002))) + } + }, + test("errors") { + ZStream + .fromIterable("1\nfalse\n3".toSeq) + .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline)) + .runDrain + .exit + .map { exit => + assert(exit)(fails(anything)) + } + }, + test("is interruptible") { + (ZStream.fromIterable("1\n2\n3\n4") ++ ZStream.fromZIO(ZIO.interrupt)) + .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline)) + .runDrain + .exit + .map { exit => + assert(exit)(isInterrupted) + } + } @@ timeout(2.seconds) + ), + suite("Array delimited")( + test("decodes single elements") { + ZStream + .fromIterable("[1001]".toSeq) + .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Array)) + .runCollect + .map { xs => + assert(xs)(equalTo(Chunk(1001))) + } + }, + test("empty array") { + ZStream + .fromIterable("[]".toSeq) + .via(JsonDecoder[String].decodeJsonPipeline(JsonStreamDelimiter.Array)) + .runCollect + .map { xs => + assert(xs)(isEmpty) + } + }, + test("decodes multiple elements") { + ZStream + .fromIterable("[ 1001, 1002, 1003 ]".toSeq) + .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Array)) + .runCollect + .map { xs => + assert(xs)(equalTo(Chunk(1001, 1002, 1003))) + } + }, + test("handles whitespace leniently") { + val in = + """[ + 1001, 1002, + 1003 + ]""" + + ZStream + .fromIterable(in.toSeq) + .via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Array)) + .runCollect + .map { xs => + assert(xs)(equalTo(Chunk(1001, 1002, 1003))) + } + } + ) + ) + ) + ) +} diff --git a/zio-json/shared/src/test/scala/zio/json/StreamEncoderSpec.scala b/zio-json/shared/src/test/scala/zio/json/StreamEncoderSpec.scala new file mode 100644 index 000000000..3574c5964 --- /dev/null +++ b/zio-json/shared/src/test/scala/zio/json/StreamEncoderSpec.scala @@ -0,0 +1,76 @@ +package zio.json + +import zio.Chunk +import zio.json.ast.Json +import zio.stream.{ ZSink, ZStream } +import zio.test.Assertion._ +import zio.test.{ ZIOSpecDefault, 
assert, _ } + +object StreamEncoderSpec extends ZIOSpecDefault { + + val spec = + suite("Encoder")( + suite("ZIO Streams integration")( + test("encodes into a ZStream of Char") { + val intEncoder = JsonEncoder[Int] + val value = 1234 + + for { + chars <- intEncoder.encodeJsonStream(value).runCollect + } yield { + assert(chars.mkString)(equalTo("1234")) + } + }, + test("encodes values that yield a result of length > DefaultChunkSize") { + val longString = List.fill(ZStream.DefaultChunkSize * 2)('x').mkString + + for { + chars <- JsonEncoder[String].encodeJsonStream(longString).runCollect + } yield { + assert(chars)(hasSize(equalTo(ZStream.DefaultChunkSize * 2 + 2))) && + assert(chars.mkString(""))(equalTo("\"" ++ longString ++ "\"")) + } + }, + test("encodeJsonLinesPipeline") { + val ints = ZStream(1, 2, 3, 4) + + for { + xs <- ints.via(JsonEncoder[Int].encodeJsonLinesPipeline).runCollect + } yield { + assert(xs.mkString)(equalTo("1\n2\n3\n4\n")) + } + }, + test("encodeJsonLinesPipeline handles elements which take up > DefaultChunkSize to encode") { + val longString = List.fill(5000)('x').mkString + + val ints = ZStream(longString, longString) + val encoder = JsonEncoder[String] + + for { + xs <- ints.via(encoder.encodeJsonLinesPipeline).runCollect + } yield { + // leading `"`, trailing `"` and `\n` = 3 + assert(xs.size)(equalTo((5000 + 3) * 2)) + } + }, + test("encodeJsonArrayPipeline") { + val ints = ZStream(1, 2, 3).map(n => Json.Obj(Chunk("id" -> Json.Num(BigDecimal(n).bigDecimal)))) + + for { + xs <- ints.via(JsonEncoder[Json].encodeJsonArrayPipeline).runCollect + } yield { + assert(xs.mkString)(equalTo("""[{"id":1},{"id":2},{"id":3}]""")) + } + }, + test("encodeJsonArrayPipeline, empty stream") { + val emptyArray = ZStream + .from(List()) + .via(JsonEncoder[String].encodeJsonArrayPipeline) + .run(ZSink.mkString) + + assertZIO(emptyArray)(equalTo("[]")) + } + ) + ) + +}