Skip to content
This repository has been archived by the owner on Jul 29, 2024. It is now read-only.

Use fs2 flow interop #254

Merged
merged 3 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 82 additions & 27 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ jobs:
matrix:
os: [ubuntu-latest]
scala: [3, 2.13]
java: [temurin@8]
java: [temurin@11, temurin@17]
exclude:
- scala: 2.13
java: temurin@17
runs-on: ${{ matrix.os }}
timeout-minutes: 60
steps:
Expand All @@ -38,35 +41,48 @@ jobs:
with:
fetch-depth: 0

- name: Setup Java (temurin@8)
id: setup-java-temurin-8
if: matrix.java == 'temurin@8'
- name: Setup Java (temurin@11)
id: setup-java-temurin-11
if: matrix.java == 'temurin@11'
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 8
java-version: 11
cache: sbt

- name: sbt update
if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false'
if: matrix.java == 'temurin@11' && steps.setup-java-temurin-11.outputs.cache-hit == 'false'
run: sbt +update

- name: Setup Java (temurin@17)
id: setup-java-temurin-17
if: matrix.java == 'temurin@17'
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 17
cache: sbt

- name: sbt update
if: matrix.java == 'temurin@17' && steps.setup-java-temurin-17.outputs.cache-hit == 'false'
run: sbt +update

- name: Check that workflows are up to date
run: sbt githubWorkflowCheck

- name: Check headers and formatting
if: matrix.java == 'temurin@8' && matrix.os == 'ubuntu-latest'
if: matrix.java == 'temurin@11' && matrix.os == 'ubuntu-latest'
run: sbt '++ ${{ matrix.scala }}' headerCheckAll scalafmtCheckAll 'project /' scalafmtSbtCheck

- name: Test
run: sbt '++ ${{ matrix.scala }}' test

- name: Check binary compatibility
if: matrix.java == 'temurin@8' && matrix.os == 'ubuntu-latest'
if: matrix.java == 'temurin@11' && matrix.os == 'ubuntu-latest'
run: sbt '++ ${{ matrix.scala }}' mimaReportBinaryIssues

- name: Generate API documentation
if: matrix.java == 'temurin@8' && matrix.os == 'ubuntu-latest'
if: matrix.java == 'temurin@11' && matrix.os == 'ubuntu-latest'
run: sbt '++ ${{ matrix.scala }}' doc

- name: Make target directories
Expand All @@ -91,25 +107,38 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
java: [temurin@8]
java: [temurin@11]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout current branch (full)
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Setup Java (temurin@8)
id: setup-java-temurin-8
if: matrix.java == 'temurin@8'
- name: Setup Java (temurin@11)
id: setup-java-temurin-11
if: matrix.java == 'temurin@11'
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 11
cache: sbt

- name: sbt update
if: matrix.java == 'temurin@11' && steps.setup-java-temurin-11.outputs.cache-hit == 'false'
run: sbt +update

- name: Setup Java (temurin@17)
id: setup-java-temurin-17
if: matrix.java == 'temurin@17'
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 8
java-version: 17
cache: sbt

- name: sbt update
if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false'
if: matrix.java == 'temurin@17' && steps.setup-java-temurin-17.outputs.cache-hit == 'false'
run: sbt +update

- name: Download target directories (3)
Expand Down Expand Up @@ -163,25 +192,38 @@ jobs:
matrix:
os: [ubuntu-latest]
scala: [3.3.1]
java: [temurin@8]
java: [temurin@11]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout current branch (full)
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Setup Java (temurin@8)
id: setup-java-temurin-8
if: matrix.java == 'temurin@8'
- name: Setup Java (temurin@11)
id: setup-java-temurin-11
if: matrix.java == 'temurin@11'
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 8
java-version: 11
cache: sbt

- name: sbt update
if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false'
if: matrix.java == 'temurin@11' && steps.setup-java-temurin-11.outputs.cache-hit == 'false'
run: sbt +update

- name: Setup Java (temurin@17)
id: setup-java-temurin-17
if: matrix.java == 'temurin@17'
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 17
cache: sbt

- name: sbt update
if: matrix.java == 'temurin@17' && steps.setup-java-temurin-17.outputs.cache-hit == 'false'
run: sbt +update

- name: Submit Dependencies
Expand All @@ -195,25 +237,38 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
java: [temurin@8]
java: [temurin@11]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout current branch (full)
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Setup Java (temurin@8)
id: setup-java-temurin-8
if: matrix.java == 'temurin@8'
- name: Setup Java (temurin@11)
id: setup-java-temurin-11
if: matrix.java == 'temurin@11'
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 11
cache: sbt

- name: sbt update
if: matrix.java == 'temurin@11' && steps.setup-java-temurin-11.outputs.cache-hit == 'false'
run: sbt +update

- name: Setup Java (temurin@17)
id: setup-java-temurin-17
if: matrix.java == 'temurin@17'
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 8
java-version: 17
cache: sbt

- name: sbt update
if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false'
if: matrix.java == 'temurin@17' && steps.setup-java-temurin-17.outputs.cache-hit == 'false'
run: sbt +update

- name: Generate site
Expand Down
21 changes: 11 additions & 10 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ ThisBuild / tlSonatypeUseLegacyHost := true //https://oss.sonatype.org/ currentl
val scala3 = "3.3.1"
ThisBuild / crossScalaVersions := Seq(scala3, "2.13.11")
ThisBuild / scalaVersion := scala3
ThisBuild / githubWorkflowJavaVersions := List(JavaSpec.temurin("11"), JavaSpec.temurin("17"))

lazy val `cosmos4s` = project
.in(file("."))
Expand All @@ -65,16 +66,16 @@ lazy val core = project
// General Settings
lazy val commonSettings = Seq(
libraryDependencies ++= Seq(
"com.azure" % "azure-cosmos" % azureCosmosV,
"org.typelevel" %% "cats-core" % catsV,
"org.typelevel" %% "cats-effect" % catsEffectV,
"co.fs2" %% "fs2-reactive-streams" % fs2V,
"io.circe" %% "circe-core" % circeV,
"io.circe" %% "circe-parser" % circeV,
"io.circe" %% "circe-jackson210" % circeJackson210V,
"io.netty" % "netty-codec-http2" % nettyV % Runtime, // GHSA-xpw8-rcwv-8f8p
"org.scalameta" %% "munit" % munitV % Test,
"org.typelevel" %% "munit-cats-effect-3" % munitCatsEffectV % Test
"com.azure" % "azure-cosmos" % azureCosmosV,
"org.typelevel" %% "cats-core" % catsV,
"org.typelevel" %% "cats-effect" % catsEffectV,
"co.fs2" %% "fs2-core" % fs2V,
"io.circe" %% "circe-core" % circeV,
"io.circe" %% "circe-parser" % circeV,
"io.circe" %% "circe-jackson210" % circeJackson210V,
"io.netty" % "netty-codec-http2" % nettyV % Runtime, // GHSA-xpw8-rcwv-8f8p
"org.scalameta" %% "munit" % munitV % Test,
"org.typelevel" %% "munit-cats-effect-3" % munitCatsEffectV % Test
) ++
// format: off
(if (scalaVersion.value.startsWith("2"))
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/com/banno/cosmos4s/ReactorCore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ import cats.effect._
import cats.effect.syntax.all._
import cats.syntax.all._
import fs2.Stream
import fs2.interop.reactivestreams._
import fs2.interop.flow._
import reactor.core.publisher._
import reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher

object ReactorCore {
def monoToEffectOpt[F[_]: Async, A](m: F[Mono[A]]): F[Option[A]] =
Stream
.eval(m)
.flatMap(fromPublisher(_, 1))
.flatMap(m => fromPublisher(publisherToFlowPublisher(m), 1))
.compile
.last
.guarantee(Spawn[F].cede)
Expand All @@ -42,7 +43,7 @@ object ReactorCore {
def fluxToStream[F[_]: Async, A](m: F[Flux[A]]): fs2.Stream[F, A] =
Stream
.eval(m)
.flatMap(fromPublisher(_, 1))
.flatMap(f => fromPublisher(publisherToFlowPublisher(f), 1))
.chunks
.flatMap(chunk => Stream.eval(Spawn[F].cede).flatMap(_ => Stream.chunk(chunk).covary[F]))
}