cleaned, update docs
gurghet committed Jan 1, 2021
1 parent 831def7 commit 6e6026c
Showing 9 changed files with 84 additions and 81 deletions.
16 changes: 16 additions & 0 deletions README.md
@@ -18,6 +18,22 @@ libraryDependencies += "io.laserdisc" %% "tamer" % version

See [here](example/src/main/scala/tamer/example/Main.scala) for a sample application that makes use of Tamer.

+## End to end testing
+
+### Database module
+
+Basic manual testing is available for the code in the example module `tamer.example.Main`.
+This code covers getting data from a synthetic Postgres database.
+
+Make sure you have Docker installed before proceeding.
+
+From the `local` folder launch `docker-compose up` (you can run `docker-compose down`
+if you want to start from scratch). After that you should be able to access the Kafka
+GUI at http://localhost:8000.
+
+Start the `runDb.sh` script, which sets some example environment variables.
+If Tamer is working you should see messages appearing in the Kafka GUI.
+
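As a quick reference, a minimal sketch of the full flow described above (assuming Docker and sbt are installed; the `-d` detached flag is an optional convenience, everything else follows the steps as written):

```bash
cd local
docker-compose up -d   # start the synthetic Postgres database, Kafka, and the Kafka GUI
bash runDb.sh          # export the example environment variables and run tamer.example.Main
# then open http://localhost:8000 and look for messages on the sink topic
```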
## License

Tamer is licensed under the **[MIT License](LICENSE)** (the "License"); you may not use this software except in
2 changes: 0 additions & 2 deletions build.sbt
@@ -19,7 +19,6 @@ lazy val V = new {
val scalatest = "3.2.3"
val silencer = "1.7.1"
val zio = "1.0.3"
-val `zio-s3` = "latest.integration"
val `zio-interop` = "2.2.0.1"
val `zio-kafka` = "0.13.0"
}
@@ -79,7 +78,6 @@ lazy val D = new {
val zio = Seq(
"dev.zio" %% "zio-interop-cats" % V.`zio-interop`,
"dev.zio" %% "zio-kafka" % V.`zio-kafka`,
"dev.zio" %% "zio-s3" % V.`zio-s3`,
"dev.zio" %% "zio-streams" % V.zio,
"dev.zio" %% "zio-test" % V.zio
)
4 changes: 3 additions & 1 deletion core/src/main/scala/tamer/Setup.scala
@@ -9,4 +9,6 @@ abstract class Setup[-K, -V, S](
val stateSerde: ZSerde[Registry with Topic, S],
val defaultState: S,
val stateKey: Int
-)
+) {
+  def show: String = "not available, please implement the show method to display setup"
+}
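The intent is for concrete setups to override `show` with a human-readable summary, as the doobie `Setup` later in this commit does. Below is a standalone sketch of the default-plus-override pattern, with hypothetical names (`SetupLike`, `CounterSetup`) that are not part of tamer's API:

```scala
// Hypothetical illustration of the pattern used above: the base class ships a
// fallback `show`, and concrete setups override it with something meaningful.
abstract class SetupLike[S](val defaultState: S, val stateKey: Int) {
  def show: String = "not available, please implement the show method to display setup"
}

final class CounterSetup(defaultState: Long)
    extends SetupLike[Long](defaultState, defaultState.hashCode) {
  override def show: String =
    s"default state: $defaultState, state key: ${stateKey.toHexString}"
}

object ShowDemo extends App {
  println(new CounterSetup(42L).show) // default state: 42, state key: 2a
}
```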
67 changes: 34 additions & 33 deletions core/src/main/scala/tamer/kafka/Kafka.scala
@@ -52,9 +52,12 @@ object Kafka {
val stateKeySerde = Serde[StateKey](isKey = true)
val stateConsumer = Consumer.make(cSettings)
val stateProducer = Producer.make(pSettings, stateKeySerde.serializer, setup.stateSerde)
+val stateKey = StateKey(setup.stateKey.toHexString, cfg.state.groupId)
val producer = Producer.make(pSettings, setup.keySerializer, setup.valueSerializer)
val queue = Managed.make(Queue.bounded[(K, V)](cfg.bufferSize))(_.shutdown)

+def printSetup(logWriter: LogWriter[Task]) = logWriter.info(s"initializing kafka loop with setup: \n${setup.show}")
+
def mkRegistry(src: SchemaRegistryClient, topic: String) = (ZLayer.succeed(src) >>> Registry.live) ++ (ZLayer.succeed(topic) >>> Topic.live)

def mkRecordChunk(kvs: List[(K, V)]) = Chunk.fromIterable(kvs.map { case (k, v) => new ProducerRecord(cfg.sink.topic, k, v) })
@@ -67,7 +70,6 @@
log.info(s"pushed ${kvs.size} messages to ${cfg.sink.topic}")
}
}
-
def mkRecord(k: StateKey, v: State) = new ProducerRecord(cfg.state.topic, k, v)
def waitAssignment(sc: Consumer.Service) = sc.assignment.withFilter(_.nonEmpty).retry(tenTimes)
def subscribe(sc: Consumer.Service) = sc.subscribe(stateTopicSub) *> waitAssignment(sc).flatMap(sc.endOffsets(_)).map(_.values.exists(_ > 0L))
@@ -77,41 +79,40 @@
sp: Producer.Service[Registry with Topic, StateKey, State],
layer: ULayer[Registry with Topic]
) =
-      ZStream.fromEffect(logTask <*> UIO(StateKey(setup.stateKey.toHexString, cfg.state.groupId))).flatMap { // TODO: no need for UIO, it's pure
-        case (log, stateKey) =>
-          ZStream
-            .fromEffect(subscribe(sc))
-            .flatMap {
-              case true => ZStream.fromEffect(log.info(s"consumer group ${cfg.state.groupId} resuming consumption from ${cfg.state.topic}"))
-              case false =>
-                ZStream.fromEffect {
-                  log.info(s"consumer group ${cfg.state.groupId} never consumed from ${cfg.state.topic}, setting offset to earliest") *>
-                    sp.produceAsync(mkRecord(stateKey, setup.defaultState))
-                      .provideSomeLayer[Blocking](layer)
-                      .flatten
-                      .flatMap(rm => log.info(s"pushed initial state ${setup.defaultState} to $rm"))
-                }
-            }
-            .drain ++
-            sc.plainStream(stateKeySerde.deserializer, setup.stateSerde)
-              .provideSomeLayer[Blocking with Clock](layer)
-              .mapM {
-                case CommittableRecord(record, offset) if record.key == stateKey =>
-                  log.debug(
-                    s"consumer group ${cfg.state.groupId} consumed state ${record.value} from ${offset.topicPartition}@${offset.offset}"
-                  ) *>
-                    f(record.value, q).flatMap { newState =>
-                      sp.produceAsync(mkRecord(stateKey, newState))
-                        .provideSomeLayer[Blocking](layer)
-                        .flatten
-                        .flatMap(rmd => log.debug(s"pushed state $newState to $rmd"))
-                        .as(offset)
-                    }
-                case CommittableRecord(_, offset) =>
-                  log.debug(
-                    s"consumer group ${cfg.state.groupId} ignored state (wrong key) from ${offset.topicPartition}@${offset.offset}"
-                  ) *> UIO(offset)
-              }
+      ZStream.fromEffect(logTask).tap(printSetup).flatMap { log =>
+        ZStream
+          .fromEffect(subscribe(sc))
+          .flatMap {
+            case true => ZStream.fromEffect(log.info(s"consumer group ${cfg.state.groupId} resuming consumption from ${cfg.state.topic}"))
+            case false =>
+              ZStream.fromEffect {
+                log.info(s"consumer group ${cfg.state.groupId} never consumed from ${cfg.state.topic}, setting offset to earliest") *>
+                  sp.produceAsync(mkRecord(stateKey, setup.defaultState))
+                    .provideSomeLayer[Blocking](layer)
+                    .flatten
+                    .flatMap(rm => log.info(s"pushed initial state ${setup.defaultState} to $rm"))
+              }
+          }
+          .drain ++
+          sc.plainStream(stateKeySerde.deserializer, setup.stateSerde)
+            .provideSomeLayer[Blocking with Clock](layer)
+            .mapM {
+              case CommittableRecord(record, offset) if record.key == stateKey =>
+                log.debug(
+                  s"consumer group ${cfg.state.groupId} consumed state ${record.value} from ${offset.topicPartition}@${offset.offset}"
+                ) *>
+                  f(record.value, q).flatMap { newState =>
+                    sp.produceAsync(mkRecord(stateKey, newState))
+                      .provideSomeLayer[Blocking](layer)
+                      .flatten
+                      .flatMap(rmd => log.debug(s"pushed state $newState to $rmd"))
+                      .as(offset)
+                  }
+              case CommittableRecord(_, offset) =>
+                log.debug(
+                  s"consumer group ${cfg.state.groupId} ignored state (wrong key) from ${offset.topicPartition}@${offset.offset}"
+                ) *> UIO(offset)
+            }
}

ZStream
4 changes: 2 additions & 2 deletions doobie/src/main/scala/tamer/db/ConfigDb.scala
@@ -29,15 +29,15 @@ object ConfigDb {
).parMapN(Db)
private[this] val queryConfigValue = env("QUERY_FETCH_CHUNK_SIZE").as[PosInt].map(Query)

-private[this] val deleteMeConfigValue = (dbConfigValue, queryConfigValue).parMapN(DatabaseConfig.apply)
+private[this] val configValue = (dbConfigValue, queryConfigValue).parMapN(DatabaseConfig.apply)

trait Service {
val dbConfig: URIO[DbConfig, Db]
val queryConfig: URIO[QueryConfig, Query]
}

val live: Layer[TamerError, Has[Db] with Has[Query]] = ZLayer.fromEffectMany {
-deleteMeConfigValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)).map {
+configValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)).map {
case DatabaseConfig(db, query) => Has(db) ++ Has(query)
}
}
14 changes: 9 additions & 5 deletions doobie/src/main/scala/tamer/db/Setup.scala
@@ -14,10 +14,6 @@ trait QueryBuilder[V, -S] {
}

trait HashableState {
-// TODO: Evaluate if this is less invasive as a typeclass, the main cons
-// TODO: is loss of expressivity, and since state is probably manually
-// TODO: provided by the user (as opposed to automatically generated
-// TODO: code) it should be easy to implement this.

/** It is required for this hash to be consistent even across executions
* for the same semantic state. This is in contrast with the built-in
@@ -44,4 +40,12 @@ case class Setup[
Serde[S]().serde,
defaultState,
queryBuilder.queryId + defaultState.stateHash
-)
+) {
+  override def show: String = s"""
+    |query: ${queryBuilder.query(defaultState).sql}
+    |query id: ${queryBuilder.queryId}
+    |default state: $defaultState
+    |default state id: ${defaultState.stateHash}
+    |default state key: $stateKey
+    |""".stripMargin.stripLeading()
+}
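A note on the formatting trick used here: the triple-quoted string opens with a newline, `stripMargin` removes everything up to and including each `|`, and `stripLeading()` (a JDK 11+ `String` method) drops that leading newline. A standalone sketch with made-up values:

```scala
// Standalone sketch (values are made up) of the string formatting used by `show` above.
object ShowFormatDemo extends App {
  val defaultState = "MyState(from=2020-11-01, to=2020-11-02)"
  val stateKey     = 0x2af8.toHexString
  val rendered = s"""
                    |default state: $defaultState
                    |default state key: $stateKey
                    |""".stripMargin.stripLeading()
  print(rendered)
  // default state: MyState(from=2020-11-01, to=2020-11-02)
  // default state key: 2af8
}
```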
6 changes: 0 additions & 6 deletions example/src/main/scala/tamer/example/Main.scala
@@ -9,7 +9,6 @@ import zio.blocking.Blocking
import zio._
import doobie.implicits.legacy.instant._
import doobie.syntax.string._
-import log.effect.zio.ZioLogWriter.log4sFromName
import tamer.db.ConfigDb.{DbConfig, QueryConfig}

import java.time.temporal.ChronoUnit._
@@ -21,8 +20,6 @@ final case class Value(id: String, name: String, description: Option[String], mo

object Main extends zio.App {
val program: ZIO[ZEnv, TamerError, Unit] = (for {
-log <- log4sFromName.provide("tamer.example")
-_ <- log.info("Starting tamer...")
boot <- UIO(Instant.now())
_ <- tamer.db.fetchWithTimeSegment(ts =>
sql"""SELECT id, name, description, modified_at FROM users WHERE modified_at > ${ts.from} AND modified_at <= ${ts.to}""".query[Value]
@@ -51,8 +48,6 @@ object MainGeneralized extends zio.App {
val queryConfigLayer: Layer[TamerError, DbConfig with QueryConfig] = ConfigDb.live
val myLayer: Layer[TamerError, DbTransactor with Kafka with QueryConfig] = transactorLayer ++ kafkaLayer ++ queryConfigLayer
val program: ZIO[Kafka with TamerDBConfig with ZEnv, TamerError, Unit] = (for {
-log <- log4sFromName.provide("tamer.example")
-_ <- log.info("Starting tamer...")
boot <- UIO(Instant.now())
earliest = boot.minus(60, DAYS)
setup = tamer.db.mkSetup((s: MyState) =>
@@ -67,7 +62,6 @@
mostRecent.plus(5, MINUTES).orNow.map(MyState(mostRecent, _))
}
)
_ <- log.info(s"Tamer initialized with setup $setup")
_ <- tamer.db.fetch(setup)
} yield ()).mapError(e => TamerError("Could not run tamer example", e))

32 changes: 0 additions & 32 deletions local/docker-compose.yml
@@ -121,37 +121,5 @@ services:
- SCHEMAREGISTRY_URL=http://schema-registry:8081
- PROXY=true

-# # TODO replace kafka-rest+kafka-ui and schema-registry-ui
-# kowl:
-#   container_name: local-kowl
-#   image: quay.io/cloudhut/kowl:v1.2.1
-#   depends_on:
-#     - kafka
-#     - schema-registry
-#   # waits for connections to be available
-#   restart: on-failure
-#   ports:
-#     - 8002:8080
-#   hostname: kowl
-#   networks:
-#     - local_kafka_network
-#   entrypoint:
-#     - /bin/ash
-#     - -c
-#     - |
-#       mkdir -p /etc/kowl
-#       cat > /etc/kowl/config.yaml <<EOF
-#       # See: https://github.com/cloudhut/kowl/tree/master/docs/config for reference config files.
-#       kafka:
-#         brokers:
-#           - kafka:29092
-#       schemaRegistry:
-#         enabled: true
-#         urls: ["http://schema-registry:8081"]
-#       server:
-#         listenPort: 8080
-#       EOF
-#       ./kowl --config.filepath=/etc/kowl/config.yaml
-
networks:
local_kafka_network:
20 changes: 20 additions & 0 deletions local/runDb.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+
+export DATABASE_DRIVER=org.postgresql.Driver;
+export DATABASE_URL=jdbc:postgresql://localhost:5432/postgres;
+export DATABASE_USERNAME=postgres;
+export DATABASE_PASSWORD=example;
+export QUERY_FETCH_CHUNK_SIZE=5;
+export KAFKA_BROKERS=localhost:9092;
+export KAFKA_SCHEMA_REGISTRY_URL=http://localhost:8081;
+export KAFKA_CLOSE_TIMEOUT=10s;
+export KAFKA_BUFFER_SIZE=50;
+export KAFKA_SINK_TOPIC=sink;
+export KAFKA_STATE_TOPIC=state;
+export KAFKA_STATE_GROUP_ID=state-group;
+export KAFKA_STATE_CLIENT_ID=state-client
+
+SCRIPT_PATH=$(cd "$(dirname "${BASH_SOURCE[0]}")" || exit; pwd -P)
+cd "$SCRIPT_PATH"/.. || exit
+
+sbt "example/runMain tamer.example.Main" -jvm-debug 5005
