Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MessagePack item serializer and validator #628

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
8395a35
Add msgpack item serializer and validator
jarmuszz Aug 11, 2024
f051c31
Merge branch 'gnieh:main' into main
jarmuszz Aug 11, 2024
52b48d7
Fix one additional argument being passed
jarmuszz Aug 13, 2024
7b4246e
Add tests to cover all fmts for msgpack serializer
jarmuszz Aug 13, 2024
9bc1452
Remove debug code in serializer test
jarmuszz Aug 13, 2024
eed74a4
Add validation test cases
jarmuszz Aug 15, 2024
54d7d55
Make msgpack item serializer omit leading zeros
jarmuszz Aug 17, 2024
1ed1f73
Make Extension tests use `ByteVector.fill`
jarmuszz Aug 18, 2024
ac14528
Refine msgpack fixpoint test
jarmuszz Aug 18, 2024
671e693
Remove redundant `padLeft`s when size is known
jarmuszz Aug 18, 2024
b732583
Reformat ValidationSpec.scala
jarmuszz Aug 18, 2024
3f30e33
Remove scaladoc from an embedded function
jarmuszz Aug 18, 2024
aa8658a
Add benchmarks for msgpack item serializer
jarmuszz Aug 24, 2024
cd9782e
Merge msgpack serializers
jarmuszz Sep 5, 2024
cdc4894
Make `SerializerSpec` no longer extend `Checkers`
jarmuszz Sep 7, 2024
309569e
Make `msgpack.low` API similar to `cbor.low` API
jarmuszz Sep 7, 2024
3d717a3
Update msgpack serializer spec documentation
jarmuszz Sep 7, 2024
deede3f
Change `msgpack.low.toBinary` scaladoc
jarmuszz Sep 10, 2024
698f727
Fix msgpack doc generation
jarmuszz Sep 10, 2024
248fbc6
Add doc for `msgpack.low` public methods
jarmuszz Sep 10, 2024
4760221
Run prePR
jarmuszz Sep 10, 2024
041e135
Extract literals into constants
jarmuszz Sep 14, 2024
2be8831
Fix msgpack serialization test of negative fixint
jarmuszz Sep 14, 2024
fd845e8
Make msgpack Array and Map use Long for sizes
jarmuszz Sep 21, 2024
482bf9e
Make msgpack exceptions public
jarmuszz Sep 22, 2024
8d67768
Move Pull.pure(None) into a constant
jarmuszz Sep 22, 2024
05c4c1c
Use bit shifts instead of `Math.pow(2, n)`
jarmuszz Sep 22, 2024
fce1083
Use `.redeem` in msgpack validation spec
jarmuszz Sep 23, 2024
989ec8a
Use binary data in msgpack serializer benchmark
jarmuszz Oct 27, 2024
59d5e3f
Use binary data in msgpack parser benchmark
jarmuszz Oct 30, 2024
9e787e5
Merge branch 'gnieh:main' into main
jarmuszz Nov 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
1 change: 0 additions & 1 deletion benchmarks/src/main/resources/twitter_msgpack.txt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import org.openjdk.jmh.annotations._

import cats.effect.SyncIO

import scodec.bits._
import fs2._

@OutputTimeUnit(TimeUnit.MICROSECONDS)
Expand All @@ -32,23 +31,19 @@ import fs2._
@Warmup(iterations = 3, time = 2)
@Measurement(iterations = 10, time = 2)
class MsgPackItemParserBenchmarks {

// The file contains hex representation of the values so we have to convert it
val msgpackBytes: ByteVector = ByteVector
.fromHex(
fs2.io
.readClassLoaderResource[SyncIO]("twitter_msgpack.txt", 4096)
.through(fs2.text.utf8.decode)
.compile
.string
.unsafeRunSync()
)
.get
val msgpackBytes: Stream[SyncIO, Byte] =
fs2.io
.readClassLoaderResource[SyncIO]("twitter_msgpack.mp", 4096)
.chunks
.compile
.toList
.unsafeRunSync()
.map(Stream.chunk)
.fold(Stream.empty)(_ ++ _)

@Benchmark
def parseMsgpackItems() =
Stream
.chunk(Chunk.byteVector(msgpackBytes))
msgpackBytes
.through(fs2.data.msgpack.low.items[SyncIO])
.compile
.drain
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2024 fs2-data Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2
package data.benchmarks

import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._

import cats.effect.SyncIO

@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Array(Mode.AverageTime))
@State(org.openjdk.jmh.annotations.Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 3, time = 2)
@Measurement(iterations = 10, time = 2)
/** Benchmarks for turning a stream of low-level MessagePack items back into bytes,
  * through both `toNonValidatedBinary` and `toBinary`.
  */
class MsgPackItemSerializerBenchmarks {

  /** Items parsed from the `twitter_msgpack.mp` class-loader resource.
    *
    * The resource is read and parsed eagerly when the benchmark state is created;
    * the resulting chunks are then re-assembled into a stream that the benchmark
    * methods consume.
    */
  val msgpackItems: Stream[SyncIO, fs2.data.msgpack.low.MsgpackItem] = {
    val parsedChunks =
      fs2.io
        .readClassLoaderResource[SyncIO]("twitter_msgpack.mp", 4096)
        .through(fs2.data.msgpack.low.items[SyncIO])
        .chunks
        .compile
        .toList
        .unsafeRunSync()

    // Concatenate the parsed chunks, in order, into a single stream.
    parsedChunks.foldLeft[Stream[SyncIO, fs2.data.msgpack.low.MsgpackItem]](Stream.empty) { (acc, chunk) =>
      acc ++ Stream.chunk(chunk)
    }
  }

  /** Serializes the items through `toNonValidatedBinary` and discards the output. */
  @Benchmark
  def serialize(): Unit = {
    val bytes = msgpackItems.through(fs2.data.msgpack.low.toNonValidatedBinary[SyncIO])
    bytes.compile.drain.unsafeRunSync()
  }

  /** Serializes the items through `toBinary` and discards the output. */
  @Benchmark
  def withValidation(): Unit = {
    val bytes = msgpackItems.through(fs2.data.msgpack.low.toBinary[SyncIO])
    bytes.compile.drain.unsafeRunSync()
  }
}
32 changes: 32 additions & 0 deletions msgpack/src/main/scala/fs2/data/msgpack/exceptions.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2024 fs2-data Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2
package data
package msgpack

/** Common parent of the exceptions defined by the msgpack module.
  *
  * @param msg text used as the exception message
  * @param cause underlying failure, or `null` when there is none
  */
abstract class MsgpackException(msg: String, cause: Throwable = null) extends Exception(msg, cause)

/** Reports a malformed MessagePack item.
  *
  * The exception message is `msg` alone when `position` is empty, and
  * `msg` followed by `at position <pos>` when it is defined, so the
  * description is never lost.
  *
  * @param msg description of what is wrong with the item
  * @param position position at which the problem was found, if known
  * @param inner underlying failure, or `null` when there is none
  */
case class MsgpackMalformedItemException(msg: String, position: Option[Long] = None, inner: Throwable = null)
    extends MsgpackException(position.fold(msg)(pos => s"$msg at position $pos"), inner)

/** Reports that the input ended before the expected data was complete.
  *
  * @param position position at which the incomplete data started, if known
  * @param inner underlying failure, or `null` when there is none
  */
case class MsgpackUnexpectedEndOfStreamException(position: Option[Long] = None, inner: Throwable = null)
    extends MsgpackException(
      position.fold("Unexpected end of stream")(pos => s"Unexpected end of stream starting at position $pos"),
      inner)

/** Reports bytes that do not form valid MessagePack data.
  *
  * @param msg description of the problem, used verbatim as the exception message
  * @param inner underlying failure, or `null` when there is none
  */
case class MsgpackMalformedByteStreamException(msg: String, inner: Throwable = null)
    extends MsgpackException(msg, inner)
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ private[internal] object FormatParsers {
def parseArray[F[_]](length: Int, ctx: ParserContext[F])(implicit
F: RaiseThrowable[F]): Pull[F, MsgpackItem, ParserContext[F]] = {
requireBytes(length, ctx).map { res =>
res.accumulate(v => MsgpackItem.Array(v.toInt(false, ByteOrdering.BigEndian)))
res.accumulate(v => MsgpackItem.Array(v.toLong(false)))
}
}

def parseMap[F[_]](length: Int, ctx: ParserContext[F])(implicit
F: RaiseThrowable[F]): Pull[F, MsgpackItem, ParserContext[F]] = {
requireBytes(length, ctx).map { res =>
res.accumulate(v => MsgpackItem.Map(v.toInt(false, ByteOrdering.BigEndian)))
res.accumulate(v => MsgpackItem.Map(v.toLong(false)))
}
}

Expand All @@ -63,7 +63,7 @@ private[internal] object FormatParsers {
res <- requireBytes(8, res.toContext)
seconds = res.result.toLong(false)
} yield res.toContext.prepend(MsgpackItem.Timestamp96(nanosec, seconds))
case _ => Pull.raiseError(new MsgpackParsingException(s"Invalid timestamp length: ${length}"))
case _ => Pull.raiseError(MsgpackMalformedByteStreamException(s"Invalid timestamp length: ${length}"))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package internal
import scodec.bits.ByteVector

private[internal] object Helpers {
case class MsgpackParsingException(str: String) extends Exception

/** @param chunk Current chunk
* @param idx Index of the current [[Byte]] in `chunk`
Expand Down Expand Up @@ -67,7 +66,7 @@ private[internal] object Helpers {
// Inbounds chunk access is guaranteed by `ensureChunk`
Pull.pure(ctx.next.toResult(ctx.chunk(ctx.idx)))
} {
Pull.raiseError(new MsgpackParsingException("Unexpected end of input"))
Pull.raiseError(MsgpackUnexpectedEndOfStreamException())
}
}

Expand All @@ -93,7 +92,7 @@ private[internal] object Helpers {
go(count - available, ParserContext(chunk, slice.size, rest, acc), newBytes)
}
} {
Pull.raiseError(new MsgpackParsingException("Unexpected end of input"))
Pull.raiseError(MsgpackUnexpectedEndOfStreamException())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private[low] object ItemParser {

((byte & 0xff): @switch) match {
case Headers.Nil => Pull.pure(ctx.prepend(MsgpackItem.Nil))
case Headers.NeverUsed => Pull.raiseError(new MsgpackParsingException("Reserved value 0xc1 used"))
case Headers.NeverUsed => Pull.raiseError(MsgpackMalformedByteStreamException("Reserved value 0xc1 used"))
case Headers.False => Pull.pure(ctx.prepend(MsgpackItem.False))
case Headers.True => Pull.pure(ctx.prepend(MsgpackItem.True))
case Headers.Bin8 => parseBin(1, ctx)
Expand Down Expand Up @@ -77,13 +77,13 @@ private[low] object ItemParser {
// fixmap
else if ((byte & 0xf0) == 0x80) {
val length = byte & 0x0f // 0x8f- 0x80
Pull.pure(ctx.prepend(MsgpackItem.Map(length)))
Pull.pure(ctx.prepend(MsgpackItem.Map(length.toLong)))
}

// fixarray
else if ((byte & 0xf0) == 0x90) {
val length = byte & 0x0f // 0x9f- 0x90
Pull.pure(ctx.prepend(MsgpackItem.Array(length)))
Pull.pure(ctx.prepend(MsgpackItem.Array(length.toLong)))
}

// fixstr
Expand All @@ -98,7 +98,7 @@ private[low] object ItemParser {
else if ((byte & 0xe0) == 0xe0) {
Pull.pure(ctx.prepend(MsgpackItem.SignedInt(ByteVector(byte))))
} else {
Pull.raiseError(new MsgpackParsingException(s"Invalid type ${byte}"))
Pull.raiseError(MsgpackMalformedByteStreamException(s"Invalid type ${byte}"))
}
}
}
Expand Down
Loading
Loading