Skip to content

Commit

Permalink
Scala 3 support
Browse files Browse the repository at this point in the history
  • Loading branch information
barryoneill committed May 15, 2024
1 parent f2f9800 commit f9d38f7
Show file tree
Hide file tree
Showing 17 changed files with 209 additions and 123 deletions.
Binary file added .DS_Store
Binary file not shown.
58 changes: 34 additions & 24 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,37 +1,47 @@
version = 3.4.0

runner.dialect = scala213
version=3.8.1

maxColumn = 140
style = default
align.preset = more
maxColumn = 140

rewrite.sortModifiers.order = [
"override"
"private"
"protected"
"implicit"
"final"
"sealed"
"abstract"
"lazy"
]
runner.dialect = scala213source3
fileOverride {
"glob:**/scala-3/**" { runner.dialect = scala3 }
}

danglingParentheses.preset = true
docstrings.wrap = "no"

rewrite.rules = [
SortModifiers
AvoidInfix
RedundantBraces
RedundantParens
AsciiSortImports
PreferCurlyFors
SortModifiers
Imports
]

spaces.inByNameTypes = false

rewrite.redundantBraces.stringInterpolation = true
rewrite.redundantBraces.includeUnitMethods = true
rewrite.neverInfix.excludeFilters = [until
to
by
eq
ne
"should.*"
"contain.*"
"must.*"
in
be
taggedAs
thrownBy
synchronized
have
when
size
theSameElementsAs
at
can
should]

newlines.afterCurlyLambda = preserve
newlines.neverInResultType = true
newlines.beforeMultilineDef = keep
newlines.sometimesBeforeColonInMethodReturnType = false
newlines.penalizeSingleSelectMultiArgList = false
rewrite.imports.sort = original
newlines.implicitParamListModifierPrefer = before
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.laserdisc.mysql.binlog.models

import cats.effect.IO
import com.dimafeng.testcontainers.ForAllTestContainer
import com.dimafeng.testcontainers.{Container, ForAllTestContainer}
import db.MySqlContainerTest
import doobie.util.transactor.Transactor
import doobie.util.transactor.Transactor.Aux
Expand All @@ -16,13 +16,13 @@ class SchemaMetadataTest extends AnyWordSpec with ForAllTestContainer with MySql
"restore schema state from DB" in {

val props = new java.util.Properties()
props.put("user", mySqlContainer.getUsername)
props.put("password", mySqlContainer.getPassword)
props.put("user", container.username)
props.put("password", container.username)

implicit val testTransactor: Aux[IO, Unit] =
Transactor.fromDriverManager[IO](
mySqlContainer.getDriverClassName,
s"${mySqlContainer.getJdbcUrl}?useSSL=false",
mySQLContainer.driverClassName,
s"${mySQLContainer.jdbcUrl}?useSSL=false",
props,
None
)
Expand Down
Binary file added binlog-stream/src/.DS_Store
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,13 @@ object TransactionState {

def mapRawToMeta(table: TableMetadata): ((ColumnMetadata, Option[Serializable])) => (String, Json) = {
case (column, Some(value)) =>
/** At some point, Scala will drop ClassTag, so we should be prepared to either drop cross compilation for 2.13,
* else add an alternative implementation that uses TypeTest. At the current time, I don't want to introduce
* any potential bugs into _this_ at the same time that I'm introducing Scala 3..
*/
@annotation.nowarn(
"msg=Use of `scala.reflect.ClassTag` for type testing may be unsound. Consider using `scala.reflect.TypeTest` instead."
)
def unsafeCast[T](implicit m: ClassTag[T]): T =
try
value match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import cats.effect.unsafe.implicits.global
import com.dimafeng.testcontainers.ForAllTestContainer
import com.github.shyiko.mysql.binlog.BinaryLogClient
import db.MySqlContainerTest
import doobie.implicits._
import doobie.implicits.*
import io.circe.optics.JsonPath.root
import io.laserdisc.mysql.binlog.database
import io.laserdisc.mysql.binlog.config.BinLogConfigOps
import io.laserdisc.mysql.binlog.event.EventMessage
import io.laserdisc.mysql.binlog.models.SchemaMetadata
import org.scalatest.matchers.should.Matchers
Expand All @@ -28,8 +29,8 @@ class MultiSchemaTest extends AnyWordSpec with ForAllTestContainer with MySqlCon

def runScenarioForSchema(schema: String): List[EventMessage] = {

val client = containerBinlogConfig.mkBinaryLogClient()
val xaResource = database.transactor[IO](containerBinlogConfig)
val client = binlogConfig.mkBinaryLogClient()
val xaResource = database.transactor[IO](binlogConfig)

client.registerLifecycleListener(new BinaryLogClient.AbstractLifecycleListener {
override def onConnect(client: BinaryLogClient): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import com.github.shyiko.mysql.binlog.BinaryLogClient
import com.github.shyiko.mysql.binlog.event.{EventHeaderV4, EventType}
import db.MySqlContainerTest
import doobie.hikari.HikariTransactor
import doobie.implicits._
import doobie.implicits.*
import io.laserdisc.mysql.binlog.config.BinLogConfigOps
import io.laserdisc.mysql.binlog.database
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
Expand All @@ -18,26 +19,19 @@ import scala.language.reflectiveCalls

class MysqlBinlogStreamTest extends AnyWordSpec with ForAllTestContainer with MySqlContainerTest with Matchers {

def fixture =
new {

val testTransactor: Resource[IO, HikariTransactor[IO]] =
database.transactor[IO](containerBinlogConfig)

val client: BinaryLogClient = containerBinlogConfig.mkBinaryLogClient()

}

implicit val logger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO]

"Binlog stream" should {

"emit events from mysql" in {

val (client, xaResource) = (fixture.client, fixture.testTransactor)
val testTransactor: Resource[IO, HikariTransactor[IO]] = database.transactor[IO](binlogConfig)

val client: BinaryLogClient = binlogConfig.mkBinaryLogClient()

client.registerLifecycleListener(new BinaryLogClient.AbstractLifecycleListener {
override def onConnect(client: BinaryLogClient): Unit =
xaResource
testTransactor
.use(xa =>
Sku
.insertMany(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,36 @@ package io.laserdisc.mysql.binlog.stream

import cats.effect.unsafe.implicits.global
import cats.effect.{IO, Resource}
import cats.implicits._
import cats.implicits.*
import com.dimafeng.testcontainers.ForAllTestContainer
import com.github.shyiko.mysql.binlog.BinaryLogClient
import db.MySqlContainerTest
import doobie.hikari.HikariTransactor
import doobie.implicits._
import doobie.implicits.*
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import io.laserdisc.mysql.binlog.database
import io.laserdisc.mysql.binlog.models.SchemaMetadata
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import io.laserdisc.mysql.binlog.config.BinLogConfigOps

import scala.language.reflectiveCalls
import scala.util.control

class PipesTest extends AnyWordSpec with Matchers with ForAllTestContainer with MySqlContainerTest {

def fixture =
new {

val cfg = containerBinlogConfig

val testTransactor: Resource[IO, HikariTransactor[IO]] = database.transactor[IO](cfg)

val client = cfg.mkBinaryLogClient()
}

"Binlog Events Stream" should {
"handle truncate table as separate transaction" in {
val (client, xaResource) = (fixture.client, fixture.testTransactor)

val testTransactor: Resource[IO, HikariTransactor[IO]] = database.transactor[IO](binlogConfig)

val client: BinaryLogClient = binlogConfig.mkBinaryLogClient()

client.registerLifecycleListener(new BinaryLogClient.AbstractLifecycleListener {
override def onConnect(client: BinaryLogClient): Unit =
control.Exception.allCatch.opt {
xaResource
testTransactor
.use { xa =>
for {
_ <- (Sku.truncate.run *> Sku.insert(5, "sku5").run *>
Expand All @@ -57,11 +51,11 @@ class PipesTest extends AnyWordSpec with Matchers with ForAllTestContainer with
}
})

val events = xaResource
val events = testTransactor
.use { implicit xa =>
implicit val logger: Logger[IO] = Slf4jLogger.getLoggerFromName[IO]("testing")
for {
implicit0(logger: Logger[IO]) <- Slf4jLogger.fromName[IO]("testing")
schemaMetadata <- SchemaMetadata.buildSchemaMetadata("test")
schemaMetadata <- SchemaMetadata.buildSchemaMetadata("test")
transactionState <- TransactionState
.createTransactionState[IO](schemaMetadata, client)
actions <- MysqlBinlogStream
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.laserdisc.mysql.binlog.stream

import doobie.ConnectionIO
import doobie.implicits._
import doobie.implicits.*
import doobie.util.update.Update

case class Sku(id: Int, sku: String)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.laserdisc.mysql.binlog.stream

import _root_.io.circe.optics.JsonPath._
import _root_.io.circe.optics.JsonPath.*
import _root_.io.circe.Json
import _root_.io.laserdisc.mysql.binlog._
import com.github.shyiko.mysql.binlog.event._
import _root_.io.laserdisc.mysql.binlog.*
import com.github.shyiko.mysql.binlog.event.*
import org.scalatest.OptionValues
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
Expand Down
10 changes: 5 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@

organization := "io.laserdisc"
name := "mysql-binlog-stream"

lazy val scala213 = "2.13.14"
lazy val scala3 = "3.4.1"
lazy val scala3 = "3.3.3"
lazy val supportedScalaVersions = List(scala213, scala3)

ThisBuild / crossScalaVersions := supportedScalaVersions
Expand All @@ -16,9 +15,10 @@ lazy val commonSettings = Seq(
Developer("barryoneill", "Barry O'Neill", "", url("https://github.com/barryoneill"))
),
licenses ++= Seq(("MIT", url("http://opensource.org/licenses/MIT"))),
homepage := Some(url("https://github.com/laserdisc-io/fs2-aws")),
Test / parallelExecution := false,
Test / fork := true,
homepage := Some(url("https://github.com/laserdisc-io/fs2-aws")),
Test / parallelExecution := false,
Test / fork := true,
Test / scalacOptions ++= BuildOptions.scalacTestOptions,
scalacOptions ++= BuildOptions.scalacOptions(scalaVersion.value),
libraryDependencies ++= BuildOptions.compilerPlugins(scalaVersion.value)
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package main

import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import ciris._
import ciris.refined._
import eu.timepit.refined.auto._
import cats.implicits.*
import ciris.*
import ciris.refined.*
import eu.timepit.refined.auto.*
import eu.timepit.refined.types.string.TrimmedString
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import io.laserdisc.mysql.binlog.config.BinLogConfig
import io.laserdisc.mysql.binlog.models.SchemaMetadata
import io.laserdisc.mysql.binlog.stream.{MysqlBinlogStream, TransactionState, streamEvents}
import io.laserdisc.mysql.binlog.{client, database}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

object BinLogListener extends IOApp {

Expand Down Expand Up @@ -40,23 +40,24 @@ object BinLogListener extends IOApp {
)
}.load[IO]

conf.flatMap { config =>
database.transactor[IO](config).use { implicit xa =>
for {
implicit0(logger: Logger[IO]) <- Slf4jLogger.fromName[IO]("application")
// Here we do not provide binlog offset, client will be initialized with default file and offset
binlogClient <- client.createBinLogClient[IO](config)
schemaMetadata <- SchemaMetadata.buildSchemaMetadata(config.schema)
transactionState <- TransactionState
.createTransactionState[IO](schemaMetadata, binlogClient)
_ <- MysqlBinlogStream
.rawEvents[IO](binlogClient)
.through(streamEvents[IO](transactionState, config.schema))
.evalTap(msg => logger.info(s"received $msg"))
// Here you should do the checkpoint
.compile
.drain
} yield (ExitCode.Success)
Slf4jLogger.fromName[IO]("application").flatMap { implicit logger =>
conf.flatMap { config =>
database.transactor[IO](config).use { implicit xa =>
for {
// Here we do not provide binlog offset, client will be initialized with default file and offset
binlogClient <- client.createBinLogClient[IO](config)
schemaMetadata <- SchemaMetadata.buildSchemaMetadata(config.schema)
transactionState <- TransactionState
.createTransactionState[IO](schemaMetadata, binlogClient)
_ <- MysqlBinlogStream
.rawEvents[IO](binlogClient)
.through(streamEvents[IO](transactionState, config.schema))
.evalTap(msg => summon[Logger[IO]].info(s"received $msg"))
// Here you should do the checkpoint
.compile
.drain
} yield ExitCode.Success
}
}
}
}
Expand Down
Loading

0 comments on commit f9d38f7

Please sign in to comment.