Skip to content

Commit

Permalink
Merge pull request #388 from laserdisc-io/serverId
Browse files Browse the repository at this point in the history
Upgrades, add ServerId
  • Loading branch information
barryoneill authored Sep 14, 2023
2 parents 2b7ede9 + 8bb6622 commit 34f8d02
Show file tree
Hide file tree
Showing 13 changed files with 49 additions and 84 deletions.
25 changes: 0 additions & 25 deletions binlog-stream-models/test_resources/logback-test.xml

This file was deleted.

12 changes: 12 additions & 0 deletions binlog-stream/app/io/laserdisc/mysql/binlog/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.CompatibilityMode
import com.github.shyiko.mysql.binlog.network.SSLMode
import io.laserdisc.mysql.binlog.checkpoint.BinlogOffset
import org.slf4j.LoggerFactory

package object config {

val logger = LoggerFactory.getLogger("BinLogConfigOps")

implicit class BinLogConfigOps(val v: BinLogConfig) extends AnyVal {

def mkBinaryLogClient(offset: Option[BinlogOffset] = None): BinaryLogClient = {
Expand All @@ -25,6 +28,15 @@ package object config {

blc.setSSLMode(if (v.useSSL) SSLMode.VERIFY_IDENTITY else SSLMode.DISABLED)

// ServerID should be set, (see mysql-binlog-connector-java / BinaryLogClient.setServerId for documentation)
v.serverId match {
case Some(sid) => blc.setServerId(sid)
case None =>
logger.warn(
s"ServerID is not provided, so ${blc.getServerId} will be the default. This will cause issues if running multiple binlog services with this value!"
)
}

offset match {
case Some(o) =>
blc.setBinlogFilename(o.fileName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ object MysqlBinlogStream {
client: BinaryLogClient
): Stream[F, Event] =
for {
d <- Stream.resource(Dispatcher[F])
d <- Stream.resource(Dispatcher.parallel[F])
q <- Stream.eval(Queue.bounded[F, Option[Event]](10000))
proc = new MysSqlBinlogEventProcessor[F](client, q, d)
/* some difficulties here during the cats3 migration. Basically, we would have used:
Expand Down
26 changes: 0 additions & 26 deletions binlog-stream/test_resources/logback-test.xml

This file was deleted.

2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import sbt.Keys.scalaSource
organization := "io.laserdisc"
name := "mysql-binlog-stream"

ThisBuild / scalaVersion := "2.13.10"
ThisBuild / scalaVersion := "2.13.12"

lazy val commonSettings = Seq(
organization := "io.laserdisc",
Expand Down
8 changes: 5 additions & 3 deletions mysql-binlog-stream-examples/app/main/BinLogListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ object BinLogListener extends IOApp {
env("DB_PASSWORD"),
env("DB_URL").option,
env("DB_SCHEMA"),
env("USE_SSL").as[Boolean]
).parMapN { case (host, port, user, password, url, schema, useSSL) =>
env("USE_SSL").as[Boolean],
env("SERVER_ID").as[Int]
).parMapN { case (host, port, user, password, url, schema, useSSL, sid) =>
BinLogConfig(
host,
port,
Expand All @@ -34,7 +35,8 @@ object BinLogListener extends IOApp {
schema,
poolSize = 1,
useSSL = useSSL,
urlOverride = url
urlOverride = url,
serverId = Some(sid)
)
}.load[IO]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ case class BinLogConfig(
useSSL: Boolean = true,
driverClass: String = "com.mysql.cj.jdbc.Driver",
urlOverride: Option[String] = None,
poolSize: Int
poolSize: Int,
serverId: Option[Int] = None
) {
def connectionURL: String =
urlOverride.getOrElse(s"jdbc:mysql://$host:$port/$schema${if (useSSL) "?useSSL=true" else ""}")

override def toString: String =
s"BinLogConfig(host=$host,port=$port,user=$user,password=**redacted**,schema=$schema,useSSL=$useSSL,driverClass=$driverClass,urlOverride=$urlOverride,poolSize=$poolSize)"
s"BinLogConfig(host=$host,port=$port,user=$user,password=**redacted**,schema=$schema,useSSL=$useSSL,driverClass=$driverClass,urlOverride=$urlOverride,poolSize=$poolSize,serverId=$serverId)"
}
3 changes: 2 additions & 1 deletion mysql-binlog-stream-shared/test/db/MySqlContainer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ trait MySqlContainer {
mySqlContainer.getPassword,
mySqlContainer.getDatabaseName,
useSSL = false,
poolSize = 3
poolSize = 3,
serverId = Some(1234)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ class BinLogConfigSpec extends AnyWordSpec with Matchers {
useSSL = true,
driverClass = "com.made.up.TestDriver",
urlOverride = None,
poolSize = 3
poolSize = 3,
serverId = Some(222)
)

"build correct connection URL" in {
Expand Down
2 changes: 1 addition & 1 deletion mysql-binlog-stream-shared/test_resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<appender-ref ref="STDOUT" />
</appender>

<root level="INFO">
<root level="WARN">
<appender-ref ref="ASYNCSTDOUT" />
</root>

Expand Down
41 changes: 20 additions & 21 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,35 @@ import sbt.Keys.{libraryDependencies, _}
import sbt._

object Dependencies {
val cirisVersion = "2.4.0"
val cirisVersion = "3.2.0"
val doobieVersion = "1.0.0-RC2"
val circeVersion = "0.14.1"
val catsEffectVersion = "3.3.14"
val circeVersion = "0.14.6"
val catsEffectVersion = "3.5.1"

val TestLib = Seq(
libraryDependencies ++= Seq(
"org.scalamock" %% "scalamock" % "5.2.0" % Test,
"org.scalatest" %% "scalatest" % "3.2.14" % Test,
"com.dimafeng" %% "testcontainers-scala" % "0.40.11" % Test,
"org.testcontainers" % "mysql" % "1.17.5" % Test,
"org.testcontainers" % "testcontainers" % "1.17.5" % Test
"org.scalamock" %% "scalamock" % "5.2.0" % Test,
"org.scalatest" %% "scalatest" % "3.2.17" % Test,
"com.dimafeng" %% "testcontainers-scala" % "0.41.0" % Test,
"org.testcontainers" % "mysql" % "1.19.0" % Test,
"org.testcontainers" % "testcontainers" % "1.19.0" % Test
)
)

val Config = Seq(
libraryDependencies ++= Seq(
"is.cir" %% "ciris-enumeratum" % cirisVersion,
"is.cir" %% "ciris-refined" % cirisVersion,
"eu.timepit" %% "refined" % "0.10.1"
"eu.timepit" %% "refined" % "0.11.0"
)
)

val Logging = Seq(
libraryDependencies ++= Seq(
"ch.qos.logback" % "logback-classic" % "1.4.4",
"ch.qos.logback" % "logback-core" % "1.4.4",
"org.slf4j" % "jcl-over-slf4j" % "2.0.3",
"org.slf4j" % "jul-to-slf4j" % "2.0.3",
"ch.qos.logback" % "logback-classic" % "1.4.11",
"ch.qos.logback" % "logback-core" % "1.4.11",
"org.slf4j" % "jcl-over-slf4j" % "2.0.9",
"org.slf4j" % "jul-to-slf4j" % "2.0.9",
"org.typelevel" %% "log4cats-slf4j" % "2.5.0"
)
)
Expand All @@ -41,17 +41,16 @@ object Dependencies {
"org.tpolecat" %% "doobie-hikari" % doobieVersion, // HikariCP transactor.
"org.tpolecat" %% "doobie-refined" % doobieVersion, // Postgres driver 42.1.4 + type mappings.
"org.tpolecat" %% "doobie-scalatest" % doobieVersion % Test, // Support for doobie scalatest
"mysql" % "mysql-connector-java" % "8.0.30",
"com.zendesk" % "mysql-binlog-connector-java" % "0.27.3"
"mysql" % "mysql-connector-java" % "8.0.33",
"com.zendesk" % "mysql-binlog-connector-java" % "0.28.1"
)
)

val Circe = Seq(
libraryDependencies ++= Seq(
"io.circe" %% "circe-core" % circeVersion,
"io.circe" %% "circe-generic" % circeVersion,
"io.circe" %% "circe-parser" % circeVersion,
"io.circe" %% "circe-optics" % circeVersion % Test
"io.circe" %% "circe-core" % circeVersion,
"io.circe" %% "circe-parser" % circeVersion,
"io.circe" %% "circe-optics" % "0.14.1" % Test
)
)

Expand All @@ -64,8 +63,8 @@ object Dependencies {
val XML = Seq(
libraryDependencies ++= Seq(
"javax.xml.bind" % "jaxb-api" % "2.3.1",
"com.sun.xml.bind" % "jaxb-impl" % "4.0.1",
"com.sun.xml.bind" % "jaxb-core" % "4.0.1",
"com.sun.xml.bind" % "jaxb-impl" % "4.0.3",
"com.sun.xml.bind" % "jaxb-core" % "4.0.3",
"javax.activation" % "activation" % "1.1.1"
)
)
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.7.2
sbt.version=1.9.4
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.2")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.3")
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.8.1")
Expand Down

0 comments on commit 34f8d02

Please sign in to comment.