From c1097e0e317dd519ba81e46b4eaf240bcd5ab38e Mon Sep 17 00:00:00 2001 From: Andrea Passaglia Date: Sun, 3 Jan 2021 16:56:28 +0000 Subject: [PATCH] final final draft reviewed --- README.md | 7 ++++--- core/src/main/scala/tamer/config/package.scala | 6 +----- core/src/main/scala/tamer/kafka/Kafka.scala | 6 +++--- doobie/src/main/scala/tamer/db/ConfigDb.scala | 16 +++++++--------- doobie/src/main/scala/tamer/db/Setup.scala | 8 ++++---- local/runDb.sh | 2 +- 6 files changed, 20 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 372ec30a..b630dd47 100644 --- a/README.md +++ b/README.md @@ -16,20 +16,21 @@ Add Tamer as a dependency to your project: libraryDependencies += "io.laserdisc" %% "tamer" % version ``` -See [here](example/src/main/scala/tamer/example/Main.scala) for a sample application that makes use of Tamer. +See [here](example/src/main/scala/tamer/example/DatabaseSimple.scala) for a sample application that makes use of Tamer. ## End to end testing ### Database module -Basic manual testing is available for the code in the example module `tamer.example.Main`. +Basic manual testing is available for the code in the example module `tamer.example.DatabaseSimple` +(and/or `tamer.example.DatabaseGeneralized`). This code covers getting data from a synthetic Postgres database. Make sure you have docker installed before proceeding. From the `local` folder launch `docker-compose up` (you can enter `docker-compose down` if you want to start from scratch). After that you should be able to access the kafka -gui from http://localhost:8000. +gui from [http://localhost:8000](http://localhost:8000). Start the `runDb.sh` program which contains some example environment variables. If tamer works you should see messages appearing in the kafka gui. diff --git a/core/src/main/scala/tamer/config/package.scala b/core/src/main/scala/tamer/config/package.scala index 0e8abd55..b4b479c6 100644 --- a/core/src/main/scala/tamer/config/package.scala +++ b/core/src/main/scala/tamer/config/package.scala @@ -4,15 +4,11 @@ import eu.timepit.refined.api.Refined import eu.timepit.refined.boolean.{And, Or} import eu.timepit.refined.collection.{Forall, NonEmpty} import eu.timepit.refined.string.{IPv4, Uri, Url} -import zio.{Has, URIO, ZIO} +import zio.Has package object config { type HostList = List[String] Refined (NonEmpty And Forall[IPv4 Or Uri]) - type Password = String - type UriString = String Refined Uri type UrlString = String Refined Url type KafkaConfig = Has[Config.Kafka] - - val kafkaConfig: URIO[KafkaConfig, Config.Kafka] = ZIO.access(_.get) } diff --git a/core/src/main/scala/tamer/kafka/Kafka.scala b/core/src/main/scala/tamer/kafka/Kafka.scala index d2e2bbd5..a3d7ee5c 100644 --- a/core/src/main/scala/tamer/kafka/Kafka.scala +++ b/core/src/main/scala/tamer/kafka/Kafka.scala @@ -14,14 +14,14 @@ import zio.blocking.Blocking import zio.clock.Clock import zio.duration._ import zio.kafka.consumer.Consumer.{AutoOffsetStrategy, OffsetRetrieval} -import zio.kafka.consumer.{CommittableRecord, Consumer, ConsumerSettings, Subscription} +import zio.kafka.consumer.{CommittableRecord, Consumer, ConsumerSettings, Offset, Subscription} import zio.kafka.producer.{Producer, ProducerSettings} import zio.stream.ZStream final case class StateKey(stateKey: String, groupId: String) object Kafka { - sealed trait Service { + trait Service { def runLoop[K, V, State, R](setup: Setup[K, V, State])( f: (State, Queue[(K, V)]) => ZIO[R, TamerError, State] ): ZIO[R with Blocking with Clock, TamerError, Unit] @@ -78,7 +78,7 @@ object Kafka { q: Queue[(K, V)], sp: Producer.Service[Registry with Topic, StateKey, State], layer: ULayer[Registry with Topic] - ) = + ): ZStream[R with Blocking with Clock, Throwable, Offset] = ZStream.fromEffect(logTask).tap(printSetup).flatMap { log => ZStream .fromEffect(subscribe(sc)) diff --git a/doobie/src/main/scala/tamer/db/ConfigDb.scala b/doobie/src/main/scala/tamer/db/ConfigDb.scala index ca7fd451..7056669e 100644 --- a/doobie/src/main/scala/tamer/db/ConfigDb.scala +++ b/doobie/src/main/scala/tamer/db/ConfigDb.scala @@ -1,16 +1,19 @@ -package tamer.db +package tamer +package db import ciris.{ConfigException, env} import ciris.refined.refTypeConfigDecoder import cats.implicits._ +import eu.timepit.refined.api.Refined +import eu.timepit.refined.string.Uri import eu.timepit.refined.types.numeric.PosInt import eu.timepit.refined.types.string.NonEmptyString -import tamer.TamerError -import tamer.config.{Password, UriString} import zio.interop.catz.{taskConcurrentInstance, zioContextShift} import zio.{Has, Layer, Task, URIO, ZIO, ZLayer} object ConfigDb { + type Password = String + type UriString = String Refined Uri type DbConfig = Has[Db] type QueryConfig = Has[Query] @@ -31,12 +34,7 @@ object ConfigDb { private[this] val configValue = (dbConfigValue, queryConfigValue).parMapN(DatabaseConfig.apply) - trait Service { - val dbConfig: URIO[DbConfig, Db] - val queryConfig: URIO[QueryConfig, Query] - } - - val live: Layer[TamerError, Has[Db] with Has[Query]] = ZLayer.fromEffectMany { + val live: Layer[TamerError, DbConfig with QueryConfig] = ZLayer.fromEffectMany { configValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)).map { case DatabaseConfig(db, query) => Has(db) ++ Has(query) } diff --git a/doobie/src/main/scala/tamer/db/Setup.scala b/doobie/src/main/scala/tamer/db/Setup.scala index 84582068..f7941441 100644 --- a/doobie/src/main/scala/tamer/db/Setup.scala +++ b/doobie/src/main/scala/tamer/db/Setup.scala @@ -1,8 +1,8 @@ -package tamer.db +package tamer +package db import com.sksamuel.avro4s.{Decoder, Encoder, SchemaFor} import doobie.Query0 -import tamer.Serde import zio.UIO import java.time.{Duration, Instant} @@ -27,7 +27,7 @@ trait HashableState { final case class ResultMetadata(queryExecutionTime: Long) final case class QueryResult[V](metadata: ResultMetadata, results: List[V]) -case class Setup[ +final case class Setup[ K <: Product: Encoder: Decoder: SchemaFor, V <: Product: Encoder: Decoder: SchemaFor, S <: Product with HashableState: Encoder: Decoder: SchemaFor @@ -36,7 +36,7 @@ case class Setup[ override val defaultState: S, keyExtract: V => K, stateFoldM: S => QueryResult[V] => UIO[S] -) extends tamer.Setup[K, V, S]( +) extends _root_.tamer.Setup[K, V, S]( Serde[K](isKey = true).serializer, Serde[V]().serializer, Serde[S]().serde, diff --git a/local/runDb.sh b/local/runDb.sh index a6d908cd..7cfde510 100644 --- a/local/runDb.sh +++ b/local/runDb.sh @@ -17,4 +17,4 @@ export KAFKA_STATE_CLIENT_ID=state-client SCRIPT_PATH=$(cd "$(dirname "${BASH_SOURCE[0]}")" || exit; pwd -P) cd "$SCRIPT_PATH"/.. || exit -sbt "example/runMain tamer.example.Main" -jvm-debug 5005 \ No newline at end of file +sbt "example/runMain tamer.example.DatabaseSimple" -jvm-debug 5005 \ No newline at end of file