Skip to content
This repository has been archived by the owner on Jul 29, 2024. It is now read-only.

Commit

Permalink
Use fs2 flow interop (#254)
Browse files Browse the repository at this point in the history
* Use fs2 flow interop

* Bump JVM version

* Regenerate github workflow
  • Loading branch information
peterneyens authored Oct 31, 2023
1 parent 8bd547e commit 5af0152
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 40 deletions.
109 changes: 82 additions & 27 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ jobs:
matrix:
os: [ubuntu-latest]
scala: [3, 2.13]
java: [temurin@8]
java: [temurin@11, temurin@17]
exclude:
- scala: 2.13
java: temurin@17
runs-on: ${{ matrix.os }}
timeout-minutes: 60
steps:
Expand All @@ -38,35 +41,48 @@ jobs:
with:
fetch-depth: 0

- name: Setup Java (temurin@8)
id: setup-java-temurin-8
if: matrix.java == 'temurin@8'
- name: Setup Java (temurin@11)
id: setup-java-temurin-11
if: matrix.java == 'temurin@11'
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 8
java-version: 11
cache: sbt

- name: sbt update
if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false'
if: matrix.java == 'temurin@11' && steps.setup-java-temurin-11.outputs.cache-hit == 'false'
run: sbt +update

- name: Setup Java (temurin@17)
id: setup-java-temurin-17
if: matrix.java == 'temurin@17'
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 17
cache: sbt

- name: sbt update
if: matrix.java == 'temurin@17' && steps.setup-java-temurin-17.outputs.cache-hit == 'false'
run: sbt +update

- name: Check that workflows are up to date
run: sbt githubWorkflowCheck

- name: Check headers and formatting
if: matrix.java == 'temurin@8' && matrix.os == 'ubuntu-latest'
if: matrix.java == 'temurin@11' && matrix.os == 'ubuntu-latest'
run: sbt '++ ${{ matrix.scala }}' headerCheckAll scalafmtCheckAll 'project /' scalafmtSbtCheck

- name: Test
run: sbt '++ ${{ matrix.scala }}' test

- name: Check binary compatibility
if: matrix.java == 'temurin@8' && matrix.os == 'ubuntu-latest'
if: matrix.java == 'temurin@11' && matrix.os == 'ubuntu-latest'
run: sbt '++ ${{ matrix.scala }}' mimaReportBinaryIssues

- name: Generate API documentation
if: matrix.java == 'temurin@8' && matrix.os == 'ubuntu-latest'
if: matrix.java == 'temurin@11' && matrix.os == 'ubuntu-latest'
run: sbt '++ ${{ matrix.scala }}' doc

- name: Make target directories
Expand All @@ -91,25 +107,38 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
java: [temurin@8]
java: [temurin@11]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout current branch (full)
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Setup Java (temurin@8)
id: setup-java-temurin-8
if: matrix.java == 'temurin@8'
- name: Setup Java (temurin@11)
id: setup-java-temurin-11
if: matrix.java == 'temurin@11'
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 11
cache: sbt

- name: sbt update
if: matrix.java == 'temurin@11' && steps.setup-java-temurin-11.outputs.cache-hit == 'false'
run: sbt +update

- name: Setup Java (temurin@17)
id: setup-java-temurin-17
if: matrix.java == 'temurin@17'
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 8
java-version: 17
cache: sbt

- name: sbt update
if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false'
if: matrix.java == 'temurin@17' && steps.setup-java-temurin-17.outputs.cache-hit == 'false'
run: sbt +update

- name: Download target directories (3)
Expand Down Expand Up @@ -163,25 +192,38 @@ jobs:
matrix:
os: [ubuntu-latest]
scala: [3.3.1]
java: [temurin@8]
java: [temurin@11]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout current branch (full)
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Setup Java (temurin@8)
id: setup-java-temurin-8
if: matrix.java == 'temurin@8'
- name: Setup Java (temurin@11)
id: setup-java-temurin-11
if: matrix.java == 'temurin@11'
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 8
java-version: 11
cache: sbt

- name: sbt update
if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false'
if: matrix.java == 'temurin@11' && steps.setup-java-temurin-11.outputs.cache-hit == 'false'
run: sbt +update

- name: Setup Java (temurin@17)
id: setup-java-temurin-17
if: matrix.java == 'temurin@17'
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 17
cache: sbt

- name: sbt update
if: matrix.java == 'temurin@17' && steps.setup-java-temurin-17.outputs.cache-hit == 'false'
run: sbt +update

- name: Submit Dependencies
Expand All @@ -195,25 +237,38 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
java: [temurin@8]
java: [temurin@11]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout current branch (full)
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Setup Java (temurin@8)
id: setup-java-temurin-8
if: matrix.java == 'temurin@8'
- name: Setup Java (temurin@11)
id: setup-java-temurin-11
if: matrix.java == 'temurin@11'
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 11
cache: sbt

- name: sbt update
if: matrix.java == 'temurin@11' && steps.setup-java-temurin-11.outputs.cache-hit == 'false'
run: sbt +update

- name: Setup Java (temurin@17)
id: setup-java-temurin-17
if: matrix.java == 'temurin@17'
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 8
java-version: 17
cache: sbt

- name: sbt update
if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false'
if: matrix.java == 'temurin@17' && steps.setup-java-temurin-17.outputs.cache-hit == 'false'
run: sbt +update

- name: Generate site
Expand Down
21 changes: 11 additions & 10 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ ThisBuild / tlSonatypeUseLegacyHost := true //https://oss.sonatype.org/ currentl
val scala3 = "3.3.1"
ThisBuild / crossScalaVersions := Seq(scala3, "2.13.11")
ThisBuild / scalaVersion := scala3
ThisBuild / githubWorkflowJavaVersions := List(JavaSpec.temurin("11"), JavaSpec.temurin("17"))

lazy val `cosmos4s` = project
.in(file("."))
Expand All @@ -65,16 +66,16 @@ lazy val core = project
// General Settings
lazy val commonSettings = Seq(
libraryDependencies ++= Seq(
"com.azure" % "azure-cosmos" % azureCosmosV,
"org.typelevel" %% "cats-core" % catsV,
"org.typelevel" %% "cats-effect" % catsEffectV,
"co.fs2" %% "fs2-reactive-streams" % fs2V,
"io.circe" %% "circe-core" % circeV,
"io.circe" %% "circe-parser" % circeV,
"io.circe" %% "circe-jackson210" % circeJackson210V,
"io.netty" % "netty-codec-http2" % nettyV % Runtime, // GHSA-xpw8-rcwv-8f8p
"org.scalameta" %% "munit" % munitV % Test,
"org.typelevel" %% "munit-cats-effect-3" % munitCatsEffectV % Test
"com.azure" % "azure-cosmos" % azureCosmosV,
"org.typelevel" %% "cats-core" % catsV,
"org.typelevel" %% "cats-effect" % catsEffectV,
"co.fs2" %% "fs2-core" % fs2V,
"io.circe" %% "circe-core" % circeV,
"io.circe" %% "circe-parser" % circeV,
"io.circe" %% "circe-jackson210" % circeJackson210V,
"io.netty" % "netty-codec-http2" % nettyV % Runtime, // GHSA-xpw8-rcwv-8f8p
"org.scalameta" %% "munit" % munitV % Test,
"org.typelevel" %% "munit-cats-effect-3" % munitCatsEffectV % Test
) ++
// format: off
(if (scalaVersion.value.startsWith("2"))
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/com/banno/cosmos4s/ReactorCore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ import cats.effect._
import cats.effect.syntax.all._
import cats.syntax.all._
import fs2.Stream
import fs2.interop.reactivestreams._
import fs2.interop.flow._
import reactor.core.publisher._
import reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher

object ReactorCore {
def monoToEffectOpt[F[_]: Async, A](m: F[Mono[A]]): F[Option[A]] =
Stream
.eval(m)
.flatMap(fromPublisher(_, 1))
.flatMap(m => fromPublisher(publisherToFlowPublisher(m), 1))
.compile
.last
.guarantee(Spawn[F].cede)
Expand All @@ -42,7 +43,7 @@ object ReactorCore {
def fluxToStream[F[_]: Async, A](m: F[Flux[A]]): fs2.Stream[F, A] =
Stream
.eval(m)
.flatMap(fromPublisher(_, 1))
.flatMap(f => fromPublisher(publisherToFlowPublisher(f), 1))
.chunks
.flatMap(chunk => Stream.eval(Spawn[F].cede).flatMap(_ => Stream.chunk(chunk).covary[F]))
}

0 comments on commit 5af0152

Please sign in to comment.