Skip to content

Commit

Permalink
final final draft reviewed
Browse files Browse the repository at this point in the history
  • Loading branch information
gurghet committed Jan 3, 2021
1 parent 35b2fbe commit ba7ec60
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 25 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@ Add Tamer as a dependency to your project:
libraryDependencies += "io.laserdisc" %% "tamer" % version
```

See [here](example/src/main/scala/tamer/example/Main.scala) for a sample application that makes use of Tamer.
See [here](example/src/main/scala/tamer/example/DatabaseSimple.scala) for a sample application that makes use of Tamer.

## End to end testing

### Database module

Basic manual testing is available for the code in the example module `tamer.example.Main`.
Basic manual testing is available for the code in the example module `tamer.example.DatabaseSimple`
(and/or `tamer.example.DatabaseGeneralized`).
This code covers getting data from a synthetic Postgres database.

Make sure you have docker installed before proceeding.

From the `local` folder launch `docker-compose up` (you can enter `docker-compose down`
if you want to start from scratch). After that you should be able to access the kafka
gui from http://localhost:8000.
gui from [http://localhost:8000](http://localhost:8000).

Start the `runDb.sh` program which contains some example environment variables.
If tamer works you should see messages appearing in the kafka gui.
Expand Down
6 changes: 1 addition & 5 deletions core/src/main/scala/tamer/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,11 @@ import eu.timepit.refined.api.Refined
import eu.timepit.refined.boolean.{And, Or}
import eu.timepit.refined.collection.{Forall, NonEmpty}
import eu.timepit.refined.string.{IPv4, Uri, Url}
import zio.{Has, URIO, ZIO}
import zio.Has

package object config {
type HostList = List[String] Refined (NonEmpty And Forall[IPv4 Or Uri])
type Password = String
type UriString = String Refined Uri
type UrlString = String Refined Url

type KafkaConfig = Has[Config.Kafka]

val kafkaConfig: URIO[KafkaConfig, Config.Kafka] = ZIO.access(_.get)
}
6 changes: 3 additions & 3 deletions core/src/main/scala/tamer/kafka/Kafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import zio.blocking.Blocking
import zio.clock.Clock
import zio.duration._
import zio.kafka.consumer.Consumer.{AutoOffsetStrategy, OffsetRetrieval}
import zio.kafka.consumer.{CommittableRecord, Consumer, ConsumerSettings, Subscription}
import zio.kafka.consumer.{CommittableRecord, Consumer, ConsumerSettings, Offset, Subscription}
import zio.kafka.producer.{Producer, ProducerSettings}
import zio.stream.ZStream

final case class StateKey(stateKey: String, groupId: String)

object Kafka {
sealed trait Service {
trait Service {
def runLoop[K, V, State, R](setup: Setup[K, V, State])(
f: (State, Queue[(K, V)]) => ZIO[R, TamerError, State]
): ZIO[R with Blocking with Clock, TamerError, Unit]
Expand Down Expand Up @@ -78,7 +78,7 @@ object Kafka {
q: Queue[(K, V)],
sp: Producer.Service[Registry with Topic, StateKey, State],
layer: ULayer[Registry with Topic]
) =
): ZStream[R with ZEnv, Throwable, Offset] =
ZStream.fromEffect(logTask).tap(printSetup).flatMap { log =>
ZStream
.fromEffect(subscribe(sc))
Expand Down
16 changes: 7 additions & 9 deletions doobie/src/main/scala/tamer/db/ConfigDb.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package tamer.db
package tamer
package db

import ciris.{ConfigException, env}
import ciris.refined.refTypeConfigDecoder
import cats.implicits._
import eu.timepit.refined.api.Refined
import eu.timepit.refined.string.Uri
import eu.timepit.refined.types.numeric.PosInt
import eu.timepit.refined.types.string.NonEmptyString
import tamer.TamerError
import tamer.config.{Password, UriString}
import zio.interop.catz.{taskConcurrentInstance, zioContextShift}
import zio.{Has, Layer, Task, URIO, ZIO, ZLayer}

object ConfigDb {
type Password = String
type UriString = String Refined Uri
type DbConfig = Has[Db]
type QueryConfig = Has[Query]

Expand All @@ -31,12 +34,7 @@ object ConfigDb {

private[this] val configValue = (dbConfigValue, queryConfigValue).parMapN(DatabaseConfig.apply)

trait Service {
val dbConfig: URIO[DbConfig, Db]
val queryConfig: URIO[QueryConfig, Query]
}

val live: Layer[TamerError, Has[Db] with Has[Query]] = ZLayer.fromEffectMany {
val live: Layer[TamerError, DbConfig with QueryConfig] = ZLayer.fromEffectMany {
configValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)).map {
case DatabaseConfig(db, query) => Has(db) ++ Has(query)
}
Expand Down
8 changes: 4 additions & 4 deletions doobie/src/main/scala/tamer/db/Setup.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package tamer.db
package tamer
package db

import com.sksamuel.avro4s.{Decoder, Encoder, SchemaFor}
import doobie.Query0
import tamer.Serde
import zio.UIO

import java.time.{Duration, Instant}
Expand All @@ -27,7 +27,7 @@ trait HashableState {
final case class ResultMetadata(queryExecutionTime: Long)
final case class QueryResult[V](metadata: ResultMetadata, results: List[V])

case class Setup[
final case class Setup[
K <: Product: Encoder: Decoder: SchemaFor,
V <: Product: Encoder: Decoder: SchemaFor,
S <: Product with HashableState: Encoder: Decoder: SchemaFor
Expand All @@ -36,7 +36,7 @@ case class Setup[
override val defaultState: S,
keyExtract: V => K,
stateFoldM: S => QueryResult[V] => UIO[S]
) extends tamer.Setup[K, V, S](
) extends _root_.tamer.Setup[K, V, S](
Serde[K](isKey = true).serializer,
Serde[V]().serializer,
Serde[S]().serde,
Expand Down
2 changes: 1 addition & 1 deletion local/runDb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ export KAFKA_STATE_CLIENT_ID=state-client
SCRIPT_PATH=$(cd "$(dirname "${BASH_SOURCE[0]}")" || exit; pwd -P)
cd "$SCRIPT_PATH"/.. || exit

sbt "example/runMain tamer.example.Main" -jvm-debug 5005
sbt "example/runMain tamer.example.DatabaseSimple" -jvm-debug 5005

0 comments on commit ba7ec60

Please sign in to comment.