Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Front end rehaul #212

Merged
merged 19 commits into from
Jan 3, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ lazy val V = new {
val scalatest = "3.2.3"
val silencer = "1.7.1"
val zio = "1.0.3"
val `zio-s3` = "latest.integration"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coming soon? 🙂

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val `zio-s3` = "latest.integration"
val `zio-s3` = "0.2.5"

https://github.com/zio/zio-s3/releases/tag/v0.2.5

val `zio-interop` = "2.2.0.1"
val `zio-kafka` = "0.13.0"
}
Expand Down Expand Up @@ -78,7 +79,9 @@ lazy val D = new {
val zio = Seq(
"dev.zio" %% "zio-interop-cats" % V.`zio-interop`,
"dev.zio" %% "zio-kafka" % V.`zio-kafka`,
"dev.zio" %% "zio-streams" % V.zio
"dev.zio" %% "zio-s3" % V.`zio-s3`,
"dev.zio" %% "zio-streams" % V.zio,
"dev.zio" %% "zio-test" % V.zio
)
}

Expand Down Expand Up @@ -135,15 +138,15 @@ lazy val commonSettings = Seq(
licenses += "MIT" -> url("http://opensource.org/licenses/MIT"),
developers += Developer("sirocchj", "Julien Sirocchi", "[email protected]", url("https://github.com/sirocchj")),
scalacOptions ++= versionDependent(scalaVersion.value),
resolvers += "confluent" at "https://packages.confluent.io/maven/"
resolvers ++= Seq("confluent" at "https://packages.confluent.io/maven/", "snapshots" at "https://oss.sonatype.org/content/repositories/snapshots")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which dependency needs snapshots ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will remove this, already removed the zio-s3 dep

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This we can probably revert too here

)

lazy val tamer = project
.in(file("core"))
.settings(commonSettings)
.settings(
name := "tamer",
libraryDependencies ++= (D.cats ++ D.config ++ D.doobie ++ D.kafka ++ D.logs ++ D.refined ++ D.serialization ++ D.silencer ++ D.tests ++ D.zio)
libraryDependencies ++= (D.cats ++ D.config ++ D.kafka ++ D.logs ++ D.refined ++ D.serialization ++ D.silencer ++ D.tests ++ D.zio)
.map(_.withSources)
.map(_.withJavadoc),
libraryDependencies ++= D.avro,
Expand All @@ -152,10 +155,19 @@ lazy val tamer = project
Test / console / scalacOptions := (Compile / console / scalacOptions).value
)

lazy val doobie = project
.in(file("doobie"))
.dependsOn(tamer)
.settings(commonSettings)
.settings(
name := "doobie",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we probably want to prefix all modules' artifacts with tamer- now that we no longer have only one. This means

Suggested change
name := "doobie",
name := "tamer-doobie",

And likely tamer-core the other one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As opposed to just just tamer, because the module in itself doesn't do much without a supporting module to fetch data. Done.

libraryDependencies ++= D.doobie
)

lazy val example = project
.in(file("example"))
.enablePlugins(JavaAppPackaging)
.dependsOn(tamer)
.dependsOn(tamer, doobie)
.settings(commonSettings)
.settings(
libraryDependencies ++= D.postgres,
Expand All @@ -164,7 +176,7 @@ lazy val example = project

lazy val root = project
.in(file("."))
.aggregate(tamer, example)
.aggregate(tamer, example, doobie)
.settings(commonSettings)
.settings(
publish / skip := true,
Expand Down
48 changes: 6 additions & 42 deletions core/src/main/scala/tamer/Setup.scala
Original file line number Diff line number Diff line change
@@ -1,48 +1,12 @@
package tamer

import com.sksamuel.avro4s._
import doobie.util.query.Query0
import tamer.registry.{Registry, Topic}
import zio.UIO
import zio.kafka.serde.Serializer

final case class ResultMetadata(queryExecutionTime: Long)
final case class QueryResult[V](metadata: ResultMetadata, results: List[V])

final case class Setup[K, V, State](
keySerializer: Serializer[Registry with Topic, K],
valueSerializer: Serializer[Registry with Topic, V],
stateSerde: ZSerde[Registry with Topic, State],
valueToKey: V => K,
defaultState: State,
buildQuery: State => Query0[V],
stateFoldM: State => QueryResult[V] => UIO[State]
abstract class Setup[-K, -V, S](
val keySerializer: Serializer[Registry with Topic, K],
val valueSerializer: Serializer[Registry with Topic, V],
val stateSerde: ZSerde[Registry with Topic, S],
val defaultState: S,
val stateKey: Int
)

object Setup {
final def avro[K <: Product: Decoder: Encoder: SchemaFor, V <: Product: Decoder: Encoder: SchemaFor, State <: Product: Decoder: Encoder: SchemaFor](
defaultState: State
)(
buildQuery: State => Query0[V]
)(
valueToKey: V => K,
stateFoldM: State => QueryResult[V] => UIO[State]
): Setup[K, V, State] =
Setup(Serde[K](isKey = true).serializer, Serde[V]().serializer, Serde[State]().serde, valueToKey, defaultState, buildQuery, stateFoldM)

final def avroSimple[K <: Product: Decoder: Encoder: SchemaFor, V <: Product: Decoder: Encoder: SchemaFor](
defaultState: V
)(
buildQuery: V => Query0[V],
valueToKey: V => K
): Setup[K, V, V] =
Setup(
Serde[K](isKey = true).serializer,
Serde[V]().serializer,
Serde[V]().serde,
valueToKey,
defaultState,
buildQuery,
_ => r => UIO(r.results.last)
)
}
28 changes: 0 additions & 28 deletions core/src/main/scala/tamer/TamerApp.scala

This file was deleted.

21 changes: 2 additions & 19 deletions core/src/main/scala/tamer/config/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import zio.interop.catz._
import scala.concurrent.duration.FiniteDuration

object Config {
final case class Db(driver: NonEmptyString, uri: UriString, username: NonEmptyString, password: Password)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like there's two refinement types left in config's package object that should be moved to the doobie module in the db package object: UriString and Password

final case class Query(fetchChunkSize: PosInt)
final case class KafkaSink(topic: NonEmptyString)
final case class KafkaState(topic: NonEmptyString, groupId: NonEmptyString, clientId: NonEmptyString)
final case class Kafka(
Expand All @@ -25,20 +23,10 @@ object Config {
sink: KafkaSink,
state: KafkaState
)
final case class Tamer(db: Db, query: Query, kafka: Kafka)

private[this] implicit final val hostListConfigDecoder: ConfigDecoder[String, HostList] =
ConfigDecoder.identity[String].map(_.split(",").toList.map(_.trim)).mapEither(ConfigDecoder[List[String], HostList].decode)

private[this] val dbConfigValue = (
env("DATABASE_DRIVER").as[NonEmptyString],
env("DATABASE_URL").as[UriString],
env("DATABASE_USERNAME").as[NonEmptyString],
env("DATABASE_PASSWORD").as[Password].redacted
).parMapN(Db)

private[this] val queryConfigValue = env("QUERY_FETCH_CHUNK_SIZE").as[PosInt].map(Query)

private[this] val kafkaSinkConfigValue = env("KAFKA_SINK_TOPIC").as[NonEmptyString].map(KafkaSink)
private[this] val kafkaStateConfigValue = (
env("KAFKA_STATE_TOPIC").as[NonEmptyString],
Expand All @@ -53,17 +41,12 @@ object Config {
kafkaSinkConfigValue,
kafkaStateConfigValue
).parMapN(Kafka)
private[this] val tamerConfigValue: ConfigValue[Tamer] = (dbConfigValue, queryConfigValue, kafkaConfigValue).parMapN(Tamer.apply)

trait Service {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who's using this ? is it needed ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably hereditary of when this was a module, will remove

val dbConfig: URIO[DbConfig, Db]
val queryConfig: URIO[QueryConfig, Query]
val kafkaConfig: URIO[KafkaConfig, Kafka]
}

val live: Layer[TamerError, TamerConfig] = ZLayer.fromEffectMany {
tamerConfigValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)).map {
case Tamer(db, query, kafka) => Has(db) ++ Has(query) ++ Has(kafka)
}
val live: Layer[TamerError, KafkaConfig] = ZLayer.fromEffect {
kafkaConfigValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce))
}
}
5 changes: 0 additions & 5 deletions core/src/main/scala/tamer/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@ package object config {
type UriString = String Refined Uri
type UrlString = String Refined Url

type DbConfig = Has[Config.Db]
type QueryConfig = Has[Config.Query]
type KafkaConfig = Has[Config.Kafka]
type TamerConfig = DbConfig with QueryConfig with KafkaConfig

val dbConfig: URIO[DbConfig, Config.Db] = ZIO.access(_.get)
val queryConfig: URIO[QueryConfig, Config.Query] = ZIO.access(_.get)
val kafkaConfig: URIO[KafkaConfig, Config.Kafka] = ZIO.access(_.get)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this used anywhere ?

Copy link
Member

@sirocchj sirocchj Jan 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree with @barambani here too. I think I had added this at the time and there's places where it could be used but... it's not, so we might as well remove it

}
80 changes: 0 additions & 80 deletions core/src/main/scala/tamer/db/Db.scala

This file was deleted.

12 changes: 0 additions & 12 deletions core/src/main/scala/tamer/db/package.scala

This file was deleted.

12 changes: 3 additions & 9 deletions core/src/main/scala/tamer/kafka/Kafka.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package tamer
package kafka

import java.security.MessageDigest

import eu.timepit.refined.auto._
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import log.effect.LogWriter
Expand All @@ -20,7 +18,7 @@ import zio.kafka.consumer.{CommittableRecord, Consumer, ConsumerSettings, Subscr
import zio.kafka.producer.{Producer, ProducerSettings}
import zio.stream.ZStream

final case class StateKey(queryHash: String, groupId: String)
final case class StateKey(stateKey: String, groupId: String)

object Kafka {
trait Service {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
trait Service {
sealed trait Service {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do, should work fine, but I don't think it's idiomatic

Expand All @@ -37,10 +35,6 @@ object Kafka {
val live: URLayer[KafkaConfig, Kafka] = ZLayer.fromService { cfg =>
new Service {
private[this] val logTask: Task[LogWriter[Task]] = log4sFromName.provide("tamer.kafka")
private[this] def stateKeyTask[A](a: A, s: String)(f: A => String): Task[StateKey] =
Task(MessageDigest.getInstance("SHA-1")).map { md =>
StateKey(md.digest(f(a).getBytes).take(7).map(b => f"$b%02x").mkString, s)
}

override final def runLoop[K, V, State, R](setup: Setup[K, V, State])(
f: (State, Queue[(K, V)]) => ZIO[R, TamerError, State]
Expand All @@ -67,7 +61,7 @@ object Kafka {
def sink(q: Queue[(K, V)], p: Producer.Service[Registry with Topic, K, V], layer: ULayer[Registry with Topic]) =
logTask.flatMap { log =>
q.takeAll.flatMap {
case Nil => log.trace("no data to push") *> ZIO.unit
case Nil => log.trace("no data to push").unit
case kvs =>
p.produceChunkAsync(mkRecordChunk(kvs)).provideSomeLayer[Blocking](layer).retry(tenTimes).flatten.unit <*
log.info(s"pushed ${kvs.size} messages to ${cfg.sink.topic}")
Expand All @@ -83,7 +77,7 @@ object Kafka {
sp: Producer.Service[Registry with Topic, StateKey, State],
layer: ULayer[Registry with Topic]
) =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with not having it everywhere, but here the type annotation might help reading it

Suggested change
) =
): ZStream[Blocking with R with Clock, Throwable, Offset] =

ZStream.fromEffect(logTask <*> stateKeyTask(setup.defaultState, cfg.state.groupId)(setup.buildQuery(_).sql)).flatMap {
ZStream.fromEffect(logTask <*> UIO(StateKey(setup.stateKey.toHexString, cfg.state.groupId))).flatMap { // TODO: no need for UIO, it's pure
case (log, stateKey) =>
ZStream
.fromEffect(subscribe(sc))
Expand Down
44 changes: 44 additions & 0 deletions doobie/src/main/scala/tamer/db/ConfigDb.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package tamer.db
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
package tamer.db
package tamer
package db

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've seen this done often, but, doesn't this actually go against the principle of least privilege?


import ciris.{ConfigException, env}
import ciris.refined.refTypeConfigDecoder
import cats.implicits._
import eu.timepit.refined.types.numeric.PosInt
import eu.timepit.refined.types.string.NonEmptyString
import tamer.TamerError
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import tamer.TamerError

import tamer.config.{Password, UriString}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these would be part of the package object db

Suggested change
import tamer.config.{Password, UriString}

import zio.interop.catz.{taskConcurrentInstance, zioContextShift}
import zio.{Has, Layer, Task, URIO, ZIO, ZLayer}

object ConfigDb {
type DbConfig = Has[Db]
type QueryConfig = Has[Query]

val dbConfig: URIO[DbConfig, Db] = ZIO.service
val queryConfig: URIO[QueryConfig, Query] = ZIO.service

final case class Db(driver: NonEmptyString, uri: UriString, username: NonEmptyString, password: Password)
final case class Query(fetchChunkSize: PosInt)
final case class DatabaseConfig(db: Db, query: Query)

private[this] val dbConfigValue = (
env("DATABASE_DRIVER").as[NonEmptyString],
env("DATABASE_URL").as[UriString],
env("DATABASE_USERNAME").as[NonEmptyString],
env("DATABASE_PASSWORD").as[Password].redacted
).parMapN(Db)
private[this] val queryConfigValue = env("QUERY_FETCH_CHUNK_SIZE").as[PosInt].map(Query)

private[this] val deleteMeConfigValue = (dbConfigValue, queryConfigValue).parMapN(DatabaseConfig.apply)

trait Service {
val dbConfig: URIO[DbConfig, Db]
val queryConfig: URIO[QueryConfig, Query]
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can all go, right?

Suggested change
trait Service {
val dbConfig: URIO[DbConfig, Db]
val queryConfig: URIO[QueryConfig, Query]
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically yes, but maybe it's not the best form, anyways we can refactor later to decide who deserves to be a service and who doesn't


val live: Layer[TamerError, Has[Db] with Has[Query]] = ZLayer.fromEffectMany {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val live: Layer[TamerError, Has[Db] with Has[Query]] = ZLayer.fromEffectMany {
val live: Layer[TamerError, DbConfig with QueryConfig] = ZLayer.fromEffectMany {

deleteMeConfigValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)).map {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was to be deleted? Maybe just folding the two things into one config will do too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, I eventually renamed it but the IDE refactoring algo messed up here. I'll rename it for now. The db module is on it's own now so it can be refactored freely in another PR.

case DatabaseConfig(db, query) => Has(db) ++ Has(query)
}
}
}
Loading