-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Front end rehaul #212
Front end rehaul #212
Changes from 16 commits
5704ca2
b981d3a
8b5c151
75d47b6
d99eefc
bc40298
71bf64a
2095253
c6e4d6d
d3e9874
e8f6b91
035745e
4674617
831def7
6e6026c
532c69b
6578e7d
35b2fbe
c1097e0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -18,6 +18,22 @@ libraryDependencies += "io.laserdisc" %% "tamer" % version | |||||
|
||||||
See [here](example/src/main/scala/tamer/example/Main.scala) for a sample application that makes use of Tamer. | ||||||
|
||||||
## End to end testing | ||||||
|
||||||
### Database module | ||||||
|
||||||
Basic manual testing is available for the code in the example module `tamer.example.Main`. | ||||||
This code covers getting data from a synthetic Postgres database. | ||||||
|
||||||
Make sure you have docker installed before proceeding. | ||||||
|
||||||
From the `local` folder launch `docker-compose up` (you can enter `docker-compose down` | ||||||
if you want to start from scratch). After that you should be able to access the kafka | ||||||
gui from http://localhost:8000. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
Start the `runDb.sh` program which contains some example environment variables. | ||||||
sirocchj marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
If tamer works you should see messages appearing in the kafka gui. | ||||||
|
||||||
## License | ||||||
|
||||||
Tamer is licensed under the **[MIT License](LICENSE)** (the "License"); you may not use this software except in | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -78,7 +78,8 @@ lazy val D = new { | |||||
val zio = Seq( | ||||||
"dev.zio" %% "zio-interop-cats" % V.`zio-interop`, | ||||||
"dev.zio" %% "zio-kafka" % V.`zio-kafka`, | ||||||
"dev.zio" %% "zio-streams" % V.zio | ||||||
"dev.zio" %% "zio-streams" % V.zio, | ||||||
"dev.zio" %% "zio-test" % V.zio | ||||||
) | ||||||
} | ||||||
|
||||||
|
@@ -135,15 +136,15 @@ lazy val commonSettings = Seq( | |||||
licenses += "MIT" -> url("http://opensource.org/licenses/MIT"), | ||||||
developers += Developer("sirocchj", "Julien Sirocchi", "[email protected]", url("https://github.com/sirocchj")), | ||||||
scalacOptions ++= versionDependent(scalaVersion.value), | ||||||
resolvers += "confluent" at "https://packages.confluent.io/maven/" | ||||||
resolvers ++= Seq("confluent" at "https://packages.confluent.io/maven/", "snapshots" at "https://oss.sonatype.org/content/repositories/snapshots") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. which dependency needs snapshots ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will remove this, already removed the zio-s3 dep There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This we can probably revert too here |
||||||
) | ||||||
|
||||||
lazy val tamer = project | ||||||
.in(file("core")) | ||||||
.settings(commonSettings) | ||||||
.settings( | ||||||
name := "tamer", | ||||||
libraryDependencies ++= (D.cats ++ D.config ++ D.doobie ++ D.kafka ++ D.logs ++ D.refined ++ D.serialization ++ D.silencer ++ D.tests ++ D.zio) | ||||||
libraryDependencies ++= (D.cats ++ D.config ++ D.kafka ++ D.logs ++ D.refined ++ D.serialization ++ D.silencer ++ D.tests ++ D.zio) | ||||||
.map(_.withSources) | ||||||
.map(_.withJavadoc), | ||||||
libraryDependencies ++= D.avro, | ||||||
|
@@ -152,10 +153,19 @@ lazy val tamer = project | |||||
Test / console / scalacOptions := (Compile / console / scalacOptions).value | ||||||
) | ||||||
|
||||||
lazy val doobie = project | ||||||
.in(file("doobie")) | ||||||
.dependsOn(tamer) | ||||||
.settings(commonSettings) | ||||||
.settings( | ||||||
name := "doobie", | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, we probably want to prefix all modules' artifacts with
Suggested change
And likely There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As opposed to just just |
||||||
libraryDependencies ++= D.doobie | ||||||
) | ||||||
|
||||||
lazy val example = project | ||||||
.in(file("example")) | ||||||
.enablePlugins(JavaAppPackaging) | ||||||
.dependsOn(tamer) | ||||||
.dependsOn(tamer, doobie) | ||||||
.settings(commonSettings) | ||||||
.settings( | ||||||
libraryDependencies ++= D.postgres, | ||||||
|
@@ -164,7 +174,7 @@ lazy val example = project | |||||
|
||||||
lazy val root = project | ||||||
.in(file(".")) | ||||||
.aggregate(tamer, example) | ||||||
.aggregate(tamer, example, doobie) | ||||||
.settings(commonSettings) | ||||||
.settings( | ||||||
publish / skip := true, | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,48 +1,14 @@ | ||||||
package tamer | ||||||
|
||||||
import com.sksamuel.avro4s._ | ||||||
import doobie.util.query.Query0 | ||||||
import tamer.registry.{Registry, Topic} | ||||||
import zio.UIO | ||||||
import zio.kafka.serde.Serializer | ||||||
|
||||||
final case class ResultMetadata(queryExecutionTime: Long) | ||||||
final case class QueryResult[V](metadata: ResultMetadata, results: List[V]) | ||||||
|
||||||
final case class Setup[K, V, State]( | ||||||
keySerializer: Serializer[Registry with Topic, K], | ||||||
valueSerializer: Serializer[Registry with Topic, V], | ||||||
stateSerde: ZSerde[Registry with Topic, State], | ||||||
valueToKey: V => K, | ||||||
defaultState: State, | ||||||
buildQuery: State => Query0[V], | ||||||
stateFoldM: State => QueryResult[V] => UIO[State] | ||||||
) | ||||||
|
||||||
object Setup { | ||||||
final def avro[K <: Product: Decoder: Encoder: SchemaFor, V <: Product: Decoder: Encoder: SchemaFor, State <: Product: Decoder: Encoder: SchemaFor]( | ||||||
defaultState: State | ||||||
)( | ||||||
buildQuery: State => Query0[V] | ||||||
)( | ||||||
valueToKey: V => K, | ||||||
stateFoldM: State => QueryResult[V] => UIO[State] | ||||||
): Setup[K, V, State] = | ||||||
Setup(Serde[K](isKey = true).serializer, Serde[V]().serializer, Serde[State]().serde, valueToKey, defaultState, buildQuery, stateFoldM) | ||||||
|
||||||
final def avroSimple[K <: Product: Decoder: Encoder: SchemaFor, V <: Product: Decoder: Encoder: SchemaFor]( | ||||||
defaultState: V | ||||||
)( | ||||||
buildQuery: V => Query0[V], | ||||||
valueToKey: V => K | ||||||
): Setup[K, V, V] = | ||||||
Setup( | ||||||
Serde[K](isKey = true).serializer, | ||||||
Serde[V]().serializer, | ||||||
Serde[V]().serde, | ||||||
valueToKey, | ||||||
defaultState, | ||||||
buildQuery, | ||||||
_ => r => UIO(r.results.last) | ||||||
) | ||||||
abstract class Setup[-K, -V, S]( | ||||||
val keySerializer: Serializer[Registry with Topic, K], | ||||||
val valueSerializer: Serializer[Registry with Topic, V], | ||||||
val stateSerde: ZSerde[Registry with Topic, S], | ||||||
val defaultState: S, | ||||||
val stateKey: Int | ||||||
) { | ||||||
def show: String = "not available, please implement the show method to display setup" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not ?
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with @barambani here, can we remove the default message and leave |
||||||
} |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,8 +13,6 @@ import zio.interop.catz._ | |
import scala.concurrent.duration.FiniteDuration | ||
|
||
object Config { | ||
final case class Db(driver: NonEmptyString, uri: UriString, username: NonEmptyString, password: Password) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like there's two refinement types left in |
||
final case class Query(fetchChunkSize: PosInt) | ||
final case class KafkaSink(topic: NonEmptyString) | ||
final case class KafkaState(topic: NonEmptyString, groupId: NonEmptyString, clientId: NonEmptyString) | ||
final case class Kafka( | ||
|
@@ -25,20 +23,10 @@ object Config { | |
sink: KafkaSink, | ||
state: KafkaState | ||
) | ||
final case class Tamer(db: Db, query: Query, kafka: Kafka) | ||
|
||
private[this] implicit final val hostListConfigDecoder: ConfigDecoder[String, HostList] = | ||
ConfigDecoder.identity[String].map(_.split(",").toList.map(_.trim)).mapEither(ConfigDecoder[List[String], HostList].decode) | ||
|
||
private[this] val dbConfigValue = ( | ||
env("DATABASE_DRIVER").as[NonEmptyString], | ||
env("DATABASE_URL").as[UriString], | ||
env("DATABASE_USERNAME").as[NonEmptyString], | ||
env("DATABASE_PASSWORD").as[Password].redacted | ||
).parMapN(Db) | ||
|
||
private[this] val queryConfigValue = env("QUERY_FETCH_CHUNK_SIZE").as[PosInt].map(Query) | ||
|
||
private[this] val kafkaSinkConfigValue = env("KAFKA_SINK_TOPIC").as[NonEmptyString].map(KafkaSink) | ||
private[this] val kafkaStateConfigValue = ( | ||
env("KAFKA_STATE_TOPIC").as[NonEmptyString], | ||
|
@@ -53,17 +41,12 @@ object Config { | |
kafkaSinkConfigValue, | ||
kafkaStateConfigValue | ||
).parMapN(Kafka) | ||
private[this] val tamerConfigValue: ConfigValue[Tamer] = (dbConfigValue, queryConfigValue, kafkaConfigValue).parMapN(Tamer.apply) | ||
|
||
trait Service { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Who's using this ? is it needed ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably hereditary of when this was a module, will remove |
||
val dbConfig: URIO[DbConfig, Db] | ||
val queryConfig: URIO[QueryConfig, Query] | ||
val kafkaConfig: URIO[KafkaConfig, Kafka] | ||
} | ||
|
||
val live: Layer[TamerError, TamerConfig] = ZLayer.fromEffectMany { | ||
tamerConfigValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)).map { | ||
case Tamer(db, query, kafka) => Has(db) ++ Has(query) ++ Has(kafka) | ||
} | ||
val live: Layer[TamerError, KafkaConfig] = ZLayer.fromEffect { | ||
kafkaConfigValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,12 +12,7 @@ package object config { | |
type UriString = String Refined Uri | ||
type UrlString = String Refined Url | ||
|
||
type DbConfig = Has[Config.Db] | ||
type QueryConfig = Has[Config.Query] | ||
type KafkaConfig = Has[Config.Kafka] | ||
type TamerConfig = DbConfig with QueryConfig with KafkaConfig | ||
|
||
val dbConfig: URIO[DbConfig, Config.Db] = ZIO.access(_.get) | ||
val queryConfig: URIO[QueryConfig, Config.Query] = ZIO.access(_.get) | ||
val kafkaConfig: URIO[KafkaConfig, Config.Kafka] = ZIO.access(_.get) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this used anywhere ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. agree with @barambani here too. I think I had added this at the time and there's places where it could be used but... it's not, so we might as well remove it |
||
} |
This file was deleted.
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.