diff --git a/README.md b/README.md index 803bad9d..77eb0ca3 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,22 @@ libraryDependencies += "io.laserdisc" %% "tamer" % version See [here](example/src/main/scala/tamer/example/Main.scala) for a sample application that makes use of Tamer. +## End to end testing + +### Database module + +Basic manual testing is available for the code in the example module `tamer.example.Main`. +Thi code covers getting data from a synthetic Postgres database. + +Make sure you have docker installed before proceeding. + +From the `local` folder launch `docker-compose up` (you can enter `docker-compose down` +if you want to start from scratch). After that you should be able to access the kafka +gui from http://localhost:8000. + +Start the `runDb.sh` program which contains some example environment variables. +If tamer works you should see messages appearing in the kafka gui. + ## License Tamer is licensed under the **[MIT License](LICENSE)** (the "License"); you may not use this software except in diff --git a/build.sbt b/build.sbt index 18238051..6997928a 100644 --- a/build.sbt +++ b/build.sbt @@ -19,7 +19,6 @@ lazy val V = new { val scalatest = "3.2.3" val silencer = "1.7.1" val zio = "1.0.3" - val `zio-s3` = "latest.integration" val `zio-interop` = "2.2.0.1" val `zio-kafka` = "0.13.0" } @@ -79,7 +78,6 @@ lazy val D = new { val zio = Seq( "dev.zio" %% "zio-interop-cats" % V.`zio-interop`, "dev.zio" %% "zio-kafka" % V.`zio-kafka`, - "dev.zio" %% "zio-s3" % V.`zio-s3`, "dev.zio" %% "zio-streams" % V.zio, "dev.zio" %% "zio-test" % V.zio ) diff --git a/core/src/main/scala/tamer/Setup.scala b/core/src/main/scala/tamer/Setup.scala index 7594cc94..0578dc93 100644 --- a/core/src/main/scala/tamer/Setup.scala +++ b/core/src/main/scala/tamer/Setup.scala @@ -9,4 +9,6 @@ abstract class Setup[-K, -V, S]( val stateSerde: ZSerde[Registry with Topic, S], val defaultState: S, val stateKey: Int -) +) { + def show: String = "not available, please implement the show method to display setup" +} diff --git a/core/src/main/scala/tamer/kafka/Kafka.scala b/core/src/main/scala/tamer/kafka/Kafka.scala index e50b9358..3aacd4de 100644 --- a/core/src/main/scala/tamer/kafka/Kafka.scala +++ b/core/src/main/scala/tamer/kafka/Kafka.scala @@ -52,9 +52,12 @@ object Kafka { val stateKeySerde = Serde[StateKey](isKey = true) val stateConsumer = Consumer.make(cSettings) val stateProducer = Producer.make(pSettings, stateKeySerde.serializer, setup.stateSerde) + val stateKey = StateKey(setup.stateKey.toHexString, cfg.state.groupId) val producer = Producer.make(pSettings, setup.keySerializer, setup.valueSerializer) val queue = Managed.make(Queue.bounded[(K, V)](cfg.bufferSize))(_.shutdown) + def printSetup(logWriter: LogWriter[Task]) = logWriter.info(s"initializing kafka loop with setup: \n${setup.show}") + def mkRegistry(src: SchemaRegistryClient, topic: String) = (ZLayer.succeed(src) >>> Registry.live) ++ (ZLayer.succeed(topic) >>> Topic.live) def mkRecordChunk(kvs: List[(K, V)]) = Chunk.fromIterable(kvs.map { case (k, v) => new ProducerRecord(cfg.sink.topic, k, v) }) @@ -67,7 +70,6 @@ object Kafka { log.info(s"pushed ${kvs.size} messages to ${cfg.sink.topic}") } } - def mkRecord(k: StateKey, v: State) = new ProducerRecord(cfg.state.topic, k, v) def waitAssignment(sc: Consumer.Service) = sc.assignment.withFilter(_.nonEmpty).retry(tenTimes) def subscribe(sc: Consumer.Service) = sc.subscribe(stateTopicSub) *> waitAssignment(sc).flatMap(sc.endOffsets(_)).map(_.values.exists(_ > 0L)) @@ -77,41 +79,40 @@ object Kafka { sp: Producer.Service[Registry with Topic, StateKey, State], layer: ULayer[Registry with Topic] ) = - ZStream.fromEffect(logTask <*> UIO(StateKey(setup.stateKey.toHexString, cfg.state.groupId))).flatMap { // TODO: no need for UIO, it's pure - case (log, stateKey) => - ZStream - .fromEffect(subscribe(sc)) - .flatMap { - case true => ZStream.fromEffect(log.info(s"consumer group ${cfg.state.groupId} resuming consumption from ${cfg.state.topic}")) - case false => - ZStream.fromEffect { - log.info(s"consumer group ${cfg.state.groupId} never consumed from ${cfg.state.topic}, setting offset to earliest") *> - sp.produceAsync(mkRecord(stateKey, setup.defaultState)) + ZStream.fromEffect(logTask).tap(printSetup).flatMap { log => + ZStream + .fromEffect(subscribe(sc)) + .flatMap { + case true => ZStream.fromEffect(log.info(s"consumer group ${cfg.state.groupId} resuming consumption from ${cfg.state.topic}")) + case false => + ZStream.fromEffect { + log.info(s"consumer group ${cfg.state.groupId} never consumed from ${cfg.state.topic}, setting offset to earliest") *> + sp.produceAsync(mkRecord(stateKey, setup.defaultState)) + .provideSomeLayer[Blocking](layer) + .flatten + .flatMap(rm => log.info(s"pushed initial state ${setup.defaultState} to $rm")) + } + } + .drain ++ + sc.plainStream(stateKeySerde.deserializer, setup.stateSerde) + .provideSomeLayer[Blocking with Clock](layer) + .mapM { + case CommittableRecord(record, offset) if record.key == stateKey => + log.debug( + s"consumer group ${cfg.state.groupId} consumed state ${record.value} from ${offset.topicPartition}@${offset.offset}" + ) *> + f(record.value, q).flatMap { newState => + sp.produceAsync(mkRecord(stateKey, newState)) .provideSomeLayer[Blocking](layer) .flatten - .flatMap(rm => log.info(s"pushed initial state ${setup.defaultState} to $rm")) - } + .flatMap(rmd => log.debug(s"pushed state $newState to $rmd")) + .as(offset) + } + case CommittableRecord(_, offset) => + log.debug( + s"consumer group ${cfg.state.groupId} ignored state (wrong key) from ${offset.topicPartition}@${offset.offset}" + ) *> UIO(offset) } - .drain ++ - sc.plainStream(stateKeySerde.deserializer, setup.stateSerde) - .provideSomeLayer[Blocking with Clock](layer) - .mapM { - case CommittableRecord(record, offset) if record.key == stateKey => - log.debug( - s"consumer group ${cfg.state.groupId} consumed state ${record.value} from ${offset.topicPartition}@${offset.offset}" - ) *> - f(record.value, q).flatMap { newState => - sp.produceAsync(mkRecord(stateKey, newState)) - .provideSomeLayer[Blocking](layer) - .flatten - .flatMap(rmd => log.debug(s"pushed state $newState to $rmd")) - .as(offset) - } - case CommittableRecord(_, offset) => - log.debug( - s"consumer group ${cfg.state.groupId} ignored state (wrong key) from ${offset.topicPartition}@${offset.offset}" - ) *> UIO(offset) - } } ZStream diff --git a/doobie/src/main/scala/tamer/db/ConfigDb.scala b/doobie/src/main/scala/tamer/db/ConfigDb.scala index 3f53d0b8..ca7fd451 100644 --- a/doobie/src/main/scala/tamer/db/ConfigDb.scala +++ b/doobie/src/main/scala/tamer/db/ConfigDb.scala @@ -29,7 +29,7 @@ object ConfigDb { ).parMapN(Db) private[this] val queryConfigValue = env("QUERY_FETCH_CHUNK_SIZE").as[PosInt].map(Query) - private[this] val deleteMeConfigValue = (dbConfigValue, queryConfigValue).parMapN(DatabaseConfig.apply) + private[this] val configValue = (dbConfigValue, queryConfigValue).parMapN(DatabaseConfig.apply) trait Service { val dbConfig: URIO[DbConfig, Db] @@ -37,7 +37,7 @@ object ConfigDb { } val live: Layer[TamerError, Has[Db] with Has[Query]] = ZLayer.fromEffectMany { - deleteMeConfigValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)).map { + configValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)).map { case DatabaseConfig(db, query) => Has(db) ++ Has(query) } } diff --git a/doobie/src/main/scala/tamer/db/Setup.scala b/doobie/src/main/scala/tamer/db/Setup.scala index 739f8f2f..6da80543 100644 --- a/doobie/src/main/scala/tamer/db/Setup.scala +++ b/doobie/src/main/scala/tamer/db/Setup.scala @@ -14,10 +14,6 @@ trait QueryBuilder[V, -S] { } trait HashableState { - // TODO: Evaluate if this is less invasive as a typeclass, the main cons - // TODO: is loss of expressivity, and since state is probably manually - // TODO: provided by the user (as opposed to automatically generated - // TODO: code) it should be easy to implement this. /** It is required for this hash to be consistent even across executions * for the same semantic state. This is in contrast with the built-in @@ -44,4 +40,12 @@ case class Setup[ Serde[S]().serde, defaultState, queryBuilder.queryId + defaultState.stateHash - ) + ) { + override def show: String = s""" + |query: ${queryBuilder.query(defaultState).sql} + |query id: ${queryBuilder.queryId} + |default state: $defaultState + |default state id: ${defaultState.stateHash} + |default state key: $stateKey + |""".stripMargin.stripLeading() +} diff --git a/example/src/main/scala/tamer/example/Main.scala b/example/src/main/scala/tamer/example/Main.scala index a10179b5..db7e0d6e 100644 --- a/example/src/main/scala/tamer/example/Main.scala +++ b/example/src/main/scala/tamer/example/Main.scala @@ -9,7 +9,6 @@ import zio.blocking.Blocking import zio._ import doobie.implicits.legacy.instant._ import doobie.syntax.string._ -import log.effect.zio.ZioLogWriter.log4sFromName import tamer.db.ConfigDb.{DbConfig, QueryConfig} import java.time.temporal.ChronoUnit._ @@ -21,8 +20,6 @@ final case class Value(id: String, name: String, description: Option[String], mo object Main extends zio.App { val program: ZIO[ZEnv, TamerError, Unit] = (for { - log <- log4sFromName.provide("tamer.example") - _ <- log.info("Starting tamer...") boot <- UIO(Instant.now()) _ <- tamer.db.fetchWithTimeSegment(ts => sql"""SELECT id, name, description, modified_at FROM users WHERE modified_at > ${ts.from} AND modified_at <= ${ts.to}""".query[Value] @@ -51,8 +48,6 @@ object MainGeneralized extends zio.App { val queryConfigLayer: Layer[TamerError, DbConfig with QueryConfig] = ConfigDb.live val myLayer: Layer[TamerError, DbTransactor with Kafka with QueryConfig] = transactorLayer ++ kafkaLayer ++ queryConfigLayer val program: ZIO[Kafka with TamerDBConfig with ZEnv, TamerError, Unit] = (for { - log <- log4sFromName.provide("tamer.example") - _ <- log.info("Starting tamer...") boot <- UIO(Instant.now()) earliest = boot.minus(60, DAYS) setup = tamer.db.mkSetup((s: MyState) => @@ -67,7 +62,6 @@ object MainGeneralized extends zio.App { mostRecent.plus(5, MINUTES).orNow.map(MyState(mostRecent, _)) } ) - _ <- log.info(s"Tamer initialized with setup $setup") _ <- tamer.db.fetch(setup) } yield ()).mapError(e => TamerError("Could not run tamer example", e)) diff --git a/local/docker-compose.yml b/local/docker-compose.yml index 362b2117..0ed5a88a 100644 --- a/local/docker-compose.yml +++ b/local/docker-compose.yml @@ -121,37 +121,5 @@ services: - SCHEMAREGISTRY_URL=http://schema-registry:8081 - PROXY=true -# # TODO replace kafka-rest+kafka-ui and schema-registry-ui -# kowl: -# container_name: local-kowl -# image: quay.io/cloudhut/kowl:v1.2.1 -# depends_on: -# - kafka -# - schema-registry -# # waits for connections to be available -# restart: on-failure -# ports: -# - 8002:8080 -# hostname: kowl -# networks: -# - local_kafka_network -# entrypoint: -# - /bin/ash -# - -c -# - | -# mkdir -p /etc/kowl -# cat > /etc/kowl/config.yaml <