Skip to content

Commit

Permalink
Front-end overhaul (#212)
Browse files Browse the repository at this point in the history
* compiles

* config extracted barely compiles

* working (apparently)

* feature parity

* address review

* just shove the layers under the carpet

* reorder main

* Dateables don't need the user to define ordering anymore

* dateable => timestamped and some scaladoc

* Update core/src/main/scala/tamer/Setup.scala

Co-authored-by: Julien Jean Paul Sirocchi <[email protected]>

* address pr review

* cleaned, update docs

* Update README.md

Co-authored-by: Julien Jean Paul Sirocchi <[email protected]>

* final draft after review

* final final draft reviewed

Co-authored-by: Julien Jean Paul Sirocchi <[email protected]>
  • Loading branch information
gurghet and sirocchj authored Jan 3, 2021
1 parent 2ab1962 commit 7e037d9
Show file tree
Hide file tree
Showing 19 changed files with 1,573 additions and 286 deletions.
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,24 @@ Add Tamer as a dependency to your project:
libraryDependencies += "io.laserdisc" %% "tamer" % version
```

See [here](example/src/main/scala/tamer/example/Main.scala) for a sample application that makes use of Tamer.
See [here](example/src/main/scala/tamer/example/DatabaseSimple.scala) for a sample application that makes use of Tamer.

## End to end testing

### Database module

Basic manual testing is available for the code in the example module `tamer.example.DatabaseSimple`
(and/or `tamer.example.DatabaseGeneralized`).
This code covers getting data from a synthetic Postgres database.

Make sure you have docker installed before proceeding.

From the `local` folder, launch `docker-compose up` (you can run `docker-compose down`
if you want to start from scratch). After that you should be able to access the Kafka
GUI at [http://localhost:8000](http://localhost:8000).

Run the `runDb.sh` script, which sets some example environment variables.
If Tamer is working you should see messages appearing in the Kafka GUI.

## License

Expand Down
22 changes: 16 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ lazy val D = new {
val zio = Seq(
"dev.zio" %% "zio-interop-cats" % V.`zio-interop`,
"dev.zio" %% "zio-kafka" % V.`zio-kafka`,
"dev.zio" %% "zio-streams" % V.zio
"dev.zio" %% "zio-streams" % V.zio,
"dev.zio" %% "zio-test" % V.zio
)
}

Expand Down Expand Up @@ -135,15 +136,15 @@ lazy val commonSettings = Seq(
licenses += "MIT" -> url("http://opensource.org/licenses/MIT"),
developers += Developer("sirocchj", "Julien Sirocchi", "[email protected]", url("https://github.com/sirocchj")),
scalacOptions ++= versionDependent(scalaVersion.value),
resolvers += "confluent" at "https://packages.confluent.io/maven/"
resolvers ++= Seq("confluent" at "https://packages.confluent.io/maven/")
)

lazy val tamer = project
.in(file("core"))
.settings(commonSettings)
.settings(
name := "tamer",
libraryDependencies ++= (D.cats ++ D.config ++ D.doobie ++ D.kafka ++ D.logs ++ D.refined ++ D.serialization ++ D.silencer ++ D.tests ++ D.zio)
name := "tamer-core",
libraryDependencies ++= (D.cats ++ D.config ++ D.kafka ++ D.logs ++ D.refined ++ D.serialization ++ D.silencer ++ D.tests ++ D.zio)
.map(_.withSources)
.map(_.withJavadoc),
libraryDependencies ++= D.avro,
Expand All @@ -152,10 +153,19 @@ lazy val tamer = project
Test / console / scalacOptions := (Compile / console / scalacOptions).value
)

// Doobie integration module, published as "tamer-doobie".
// Lives in the `doobie` directory, depends on the core `tamer` project,
// and adds only the Doobie dependencies on top of the shared settings.
lazy val doobie = project
  .in(file("doobie"))
  .dependsOn(tamer)
  .settings(commonSettings)
  .settings(
    name := "tamer-doobie",
    libraryDependencies ++= D.doobie
  )

lazy val example = project
.in(file("example"))
.enablePlugins(JavaAppPackaging)
.dependsOn(tamer)
.dependsOn(tamer, doobie)
.settings(commonSettings)
.settings(
libraryDependencies ++= D.postgres,
Expand All @@ -164,7 +174,7 @@ lazy val example = project

lazy val root = project
.in(file("."))
.aggregate(tamer, example)
.aggregate(tamer, example, doobie)
.settings(commonSettings)
.settings(
publish / skip := true,
Expand Down
50 changes: 8 additions & 42 deletions core/src/main/scala/tamer/Setup.scala
Original file line number Diff line number Diff line change
@@ -1,48 +1,14 @@
package tamer

import com.sksamuel.avro4s._
import doobie.util.query.Query0
import tamer.registry.{Registry, Topic}
import zio.UIO
import zio.kafka.serde.Serializer

final case class ResultMetadata(queryExecutionTime: Long)
final case class QueryResult[V](metadata: ResultMetadata, results: List[V])

final case class Setup[K, V, State](
keySerializer: Serializer[Registry with Topic, K],
valueSerializer: Serializer[Registry with Topic, V],
stateSerde: ZSerde[Registry with Topic, State],
valueToKey: V => K,
defaultState: State,
buildQuery: State => Query0[V],
stateFoldM: State => QueryResult[V] => UIO[State]
)

object Setup {
final def avro[K <: Product: Decoder: Encoder: SchemaFor, V <: Product: Decoder: Encoder: SchemaFor, State <: Product: Decoder: Encoder: SchemaFor](
defaultState: State
)(
buildQuery: State => Query0[V]
)(
valueToKey: V => K,
stateFoldM: State => QueryResult[V] => UIO[State]
): Setup[K, V, State] =
Setup(Serde[K](isKey = true).serializer, Serde[V]().serializer, Serde[State]().serde, valueToKey, defaultState, buildQuery, stateFoldM)

final def avroSimple[K <: Product: Decoder: Encoder: SchemaFor, V <: Product: Decoder: Encoder: SchemaFor](
defaultState: V
)(
buildQuery: V => Query0[V],
valueToKey: V => K
): Setup[K, V, V] =
Setup(
Serde[K](isKey = true).serializer,
Serde[V]().serializer,
Serde[V]().serde,
valueToKey,
defaultState,
buildQuery,
_ => r => UIO(r.results.last)
)
/** Base configuration every Tamer pipeline must provide.
  *
  * Bundles the serialization machinery for keys, values and state together with
  * the initial state of the pipeline.
  *
  * @param keySerializer serializer for the record key, resolved against the schema `Registry` and `Topic` environment
  * @param valueSerializer serializer for the record value, resolved against the schema `Registry` and `Topic` environment
  * @param stateSerde serde used to read/write the pipeline state of type `S`
  * @param defaultState state to start from when no previously persisted state exists
  * @param stateKey key associated with the persisted state
  *                 NOTE(review): exact keying/partitioning scheme is not visible here — confirm with the state consumer
  * @tparam K the key type (contravariant: a `Setup` for a supertype of `K` is usable where one for `K` is expected)
  * @tparam V the value type (contravariant, same reasoning as `K`)
  * @tparam S the state type (invariant: appears in both input and output positions of the serde)
  */
abstract class Setup[-K, -V, S](
    val keySerializer: Serializer[Registry with Topic, K],
    val valueSerializer: Serializer[Registry with Topic, V],
    val stateSerde: ZSerde[Registry with Topic, S],
    val defaultState: S,
    val stateKey: Int
) {
  // Human-readable description of this setup; concrete subclasses are expected to override it.
  def show: String = "not available, please implement the show method to display setup"
}
28 changes: 0 additions & 28 deletions core/src/main/scala/tamer/TamerApp.scala

This file was deleted.

25 changes: 2 additions & 23 deletions core/src/main/scala/tamer/config/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import zio.interop.catz._
import scala.concurrent.duration.FiniteDuration

object Config {
final case class Db(driver: NonEmptyString, uri: UriString, username: NonEmptyString, password: Password)
final case class Query(fetchChunkSize: PosInt)
final case class KafkaSink(topic: NonEmptyString)
final case class KafkaState(topic: NonEmptyString, groupId: NonEmptyString, clientId: NonEmptyString)
final case class Kafka(
Expand All @@ -25,20 +23,10 @@ object Config {
sink: KafkaSink,
state: KafkaState
)
final case class Tamer(db: Db, query: Query, kafka: Kafka)

private[this] implicit final val hostListConfigDecoder: ConfigDecoder[String, HostList] =
ConfigDecoder.identity[String].map(_.split(",").toList.map(_.trim)).mapEither(ConfigDecoder[List[String], HostList].decode)

private[this] val dbConfigValue = (
env("DATABASE_DRIVER").as[NonEmptyString],
env("DATABASE_URL").as[UriString],
env("DATABASE_USERNAME").as[NonEmptyString],
env("DATABASE_PASSWORD").as[Password].redacted
).parMapN(Db)

private[this] val queryConfigValue = env("QUERY_FETCH_CHUNK_SIZE").as[PosInt].map(Query)

private[this] val kafkaSinkConfigValue = env("KAFKA_SINK_TOPIC").as[NonEmptyString].map(KafkaSink)
private[this] val kafkaStateConfigValue = (
env("KAFKA_STATE_TOPIC").as[NonEmptyString],
Expand All @@ -53,17 +41,8 @@ object Config {
kafkaSinkConfigValue,
kafkaStateConfigValue
).parMapN(Kafka)
private[this] val tamerConfigValue: ConfigValue[Tamer] = (dbConfigValue, queryConfigValue, kafkaConfigValue).parMapN(Tamer.apply)

trait Service {
val dbConfig: URIO[DbConfig, Db]
val queryConfig: URIO[QueryConfig, Query]
val kafkaConfig: URIO[KafkaConfig, Kafka]
}

val live: Layer[TamerError, TamerConfig] = ZLayer.fromEffectMany {
tamerConfigValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)).map {
case Tamer(db, query, kafka) => Has(db) ++ Has(query) ++ Has(kafka)
}
val live: Layer[TamerError, KafkaConfig] = ZLayer.fromEffect {
kafkaConfigValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce))
}
}
11 changes: 1 addition & 10 deletions core/src/main/scala/tamer/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,11 @@ import eu.timepit.refined.api.Refined
import eu.timepit.refined.boolean.{And, Or}
import eu.timepit.refined.collection.{Forall, NonEmpty}
import eu.timepit.refined.string.{IPv4, Uri, Url}
import zio.{Has, URIO, ZIO}
import zio.Has

package object config {
type HostList = List[String] Refined (NonEmpty And Forall[IPv4 Or Uri])
type Password = String
type UriString = String Refined Uri
type UrlString = String Refined Url

type DbConfig = Has[Config.Db]
type QueryConfig = Has[Config.Query]
type KafkaConfig = Has[Config.Kafka]
type TamerConfig = DbConfig with QueryConfig with KafkaConfig

val dbConfig: URIO[DbConfig, Config.Db] = ZIO.access(_.get)
val queryConfig: URIO[QueryConfig, Config.Query] = ZIO.access(_.get)
val kafkaConfig: URIO[KafkaConfig, Config.Kafka] = ZIO.access(_.get)
}
80 changes: 0 additions & 80 deletions core/src/main/scala/tamer/db/Db.scala

This file was deleted.

12 changes: 0 additions & 12 deletions core/src/main/scala/tamer/db/package.scala

This file was deleted.

Loading

0 comments on commit 7e037d9

Please sign in to comment.