From 89f5108c6e2b343b6af161a8f59791009902ff64 Mon Sep 17 00:00:00 2001 From: Taewan Park Date: Sat, 26 Oct 2024 19:49:21 +0900 Subject: [PATCH 1/5] Update ktor library to 3.0 and update SSE method to native --- .../gptmobile/data/network/AnthropicAPIImpl.kt | 18 ++++++++++++++++++ .../gptmobile/data/network/NetworkClient.kt | 3 +++ gradle/libs.versions.toml | 2 +- 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/app/src/main/kotlin/dev/chungjungsoo/gptmobile/data/network/AnthropicAPIImpl.kt b/app/src/main/kotlin/dev/chungjungsoo/gptmobile/data/network/AnthropicAPIImpl.kt index a0e1488..ad7c7c4 100644 --- a/app/src/main/kotlin/dev/chungjungsoo/gptmobile/data/network/AnthropicAPIImpl.kt +++ b/app/src/main/kotlin/dev/chungjungsoo/gptmobile/data/network/AnthropicAPIImpl.kt @@ -6,6 +6,7 @@ import dev.chungjungsoo.gptmobile.data.dto.anthropic.response.ErrorDetail import dev.chungjungsoo.gptmobile.data.dto.anthropic.response.ErrorResponseChunk import dev.chungjungsoo.gptmobile.data.dto.anthropic.response.MessageResponseChunk import io.ktor.client.call.body +import io.ktor.client.plugins.sse.sse import io.ktor.client.request.HttpRequestBuilder import io.ktor.client.request.accept import io.ktor.client.request.headers @@ -25,6 +26,7 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.flow.flow import kotlinx.coroutines.isActive +import kotlinx.coroutines.runBlocking import kotlinx.serialization.json.Json import kotlinx.serialization.json.encodeToJsonElement @@ -58,6 +60,22 @@ class AnthropicAPIImpl @Inject constructor( } } + runBlocking { + networkClient().sse( + host = apiUrl, + path = if (apiUrl.endsWith("/")) "v1/messages" else "/v1/messages" + ) { + incoming.collect { event -> + val line = event.data + val value = when { + line?.startsWith(STREAM_END_TOKEN) == true -> break + line?.startsWith(STREAM_PREFIX) == true -> Json.decodeFromString(line.removePrefix(STREAM_PREFIX)) + else -> continue + } + } + } + } + return flow { try { HttpStatement(builder = builder, client = networkClient()).execute { diff --git a/app/src/main/kotlin/dev/chungjungsoo/gptmobile/data/network/NetworkClient.kt b/app/src/main/kotlin/dev/chungjungsoo/gptmobile/data/network/NetworkClient.kt index f51fd5d..4819e9f 100644 --- a/app/src/main/kotlin/dev/chungjungsoo/gptmobile/data/network/NetworkClient.kt +++ b/app/src/main/kotlin/dev/chungjungsoo/gptmobile/data/network/NetworkClient.kt @@ -9,6 +9,7 @@ import io.ktor.client.plugins.logging.DEFAULT import io.ktor.client.plugins.logging.LogLevel import io.ktor.client.plugins.logging.Logger import io.ktor.client.plugins.logging.Logging +import io.ktor.client.plugins.sse.SSE import io.ktor.client.request.header import io.ktor.http.ContentType import io.ktor.http.HttpHeaders @@ -37,6 +38,8 @@ class NetworkClient @Inject constructor( ) } + install(SSE) + install(HttpTimeout) { requestTimeoutMillis = TIMEOUT.toLong() } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index dc9bfef..e373b51 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -13,7 +13,7 @@ datastore = "1.1.1" gemini = "0.9.0" hilt = "2.52" ksp = "2.0.20-1.0.25" # Also change with Kotlin version -ktor = "2.3.12" +ktor = "3.0.0" androidxHilt = "1.2.0" navigation = "2.8.3" markdown = "0.5.4" From 0dffff5e6422bf359eb02c8839603f0cd356ba81 Mon Sep 17 00:00:00 2001 From: Taewan Park Date: Sat, 23 Nov 2024 19:52:24 +0900 Subject: [PATCH 2/5] Replace SSE function to library native function --- .../data/network/AnthropicAPIImpl.kt | 53 ++++++------------- 1 file changed, 16 insertions(+), 37 deletions(-) diff --git a/app/src/main/kotlin/dev/chungjungsoo/gptmobile/data/network/AnthropicAPIImpl.kt b/app/src/main/kotlin/dev/chungjungsoo/gptmobile/data/network/AnthropicAPIImpl.kt index ad7c7c4..adb1235 100644 --- a/app/src/main/kotlin/dev/chungjungsoo/gptmobile/data/network/AnthropicAPIImpl.kt +++ b/app/src/main/kotlin/dev/chungjungsoo/gptmobile/data/network/AnthropicAPIImpl.kt @@ -7,16 +7,12 @@ import dev.chungjungsoo.gptmobile.data.dto.anthropic.response.ErrorResponseChunk import dev.chungjungsoo.gptmobile.data.dto.anthropic.response.MessageResponseChunk import io.ktor.client.call.body import io.ktor.client.plugins.sse.sse -import io.ktor.client.request.HttpRequestBuilder import io.ktor.client.request.accept import io.ktor.client.request.headers import io.ktor.client.request.setBody -import io.ktor.client.request.url import io.ktor.client.statement.HttpResponse -import io.ktor.client.statement.HttpStatement import io.ktor.http.ContentType import io.ktor.http.HttpMethod -import io.ktor.http.contentType import io.ktor.utils.io.ByteReadChannel import io.ktor.utils.io.cancel import io.ktor.utils.io.readUTF8Line @@ -26,7 +22,6 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.flow.flow import kotlinx.coroutines.isActive -import kotlinx.coroutines.runBlocking import kotlinx.serialization.json.Json import kotlinx.serialization.json.encodeToJsonElement @@ -48,39 +43,23 @@ class AnthropicAPIImpl @Inject constructor( override fun streamChatMessage(messageRequest: MessageRequest): Flow { val body = Json.encodeToJsonElement(messageRequest) - val builder = HttpRequestBuilder().apply { - method = HttpMethod.Post - if (apiUrl.endsWith("/")) url("${apiUrl}v1/messages") else url("$apiUrl/v1/messages") - contentType(ContentType.Application.Json) - setBody(body) - accept(ContentType.Text.EventStream) - headers { - append(API_KEY_HEADER, token ?: "") - append(VERSION_HEADER, ANTHROPIC_VERSION) - } - } - - runBlocking { - networkClient().sse( - host = apiUrl, - path = if (apiUrl.endsWith("/")) "v1/messages" else "/v1/messages" - ) { - incoming.collect { event -> - val line = event.data - val value = when { - line?.startsWith(STREAM_END_TOKEN) == true -> break - line?.startsWith(STREAM_PREFIX) == true -> Json.decodeFromString(line.removePrefix(STREAM_PREFIX)) - else -> continue - } - } - } - } - - return flow { + return flow { try { - HttpStatement(builder = builder, client = networkClient()).execute { - streamEventsFrom(it) - } + networkClient() + .sse( + urlString = if (apiUrl.endsWith("/")) "${apiUrl}v1/messages" else "$apiUrl/v1/messages", + request = { + method = HttpMethod.Post + setBody(body) + accept(ContentType.Text.EventStream) + headers { + append(API_KEY_HEADER, token ?: "") + append(VERSION_HEADER, ANTHROPIC_VERSION) + } + } + ) { + incoming.collect { event -> event.data?.let { line -> emit(Json.decodeFromString(line)) } } + } } catch (e: Exception) { emit(ErrorResponseChunk(error = ErrorDetail(type = "network_error", message = e.message ?: ""))) } From dea38f21a6c6806666654cd023958d6345e55b92 Mon Sep 17 00:00:00 2001 From: Taewan Park Date: Sat, 23 Nov 2024 19:54:25 +0900 Subject: [PATCH 3/5] Remove unused function --- .../data/network/AnthropicAPIImpl.kt | 63 +++++-------------- 1 file changed, 16 insertions(+), 47 deletions(-) diff --git a/app/src/main/kotlin/dev/chungjungsoo/gptmobile/data/network/AnthropicAPIImpl.kt b/app/src/main/kotlin/dev/chungjungsoo/gptmobile/data/network/AnthropicAPIImpl.kt index adb1235..49fb56f 100644 --- a/app/src/main/kotlin/dev/chungjungsoo/gptmobile/data/network/AnthropicAPIImpl.kt +++ b/app/src/main/kotlin/dev/chungjungsoo/gptmobile/data/network/AnthropicAPIImpl.kt @@ -5,23 +5,15 @@ import dev.chungjungsoo.gptmobile.data.dto.anthropic.request.MessageRequest import dev.chungjungsoo.gptmobile.data.dto.anthropic.response.ErrorDetail import dev.chungjungsoo.gptmobile.data.dto.anthropic.response.ErrorResponseChunk import dev.chungjungsoo.gptmobile.data.dto.anthropic.response.MessageResponseChunk -import io.ktor.client.call.body import io.ktor.client.plugins.sse.sse import io.ktor.client.request.accept import io.ktor.client.request.headers import io.ktor.client.request.setBody -import io.ktor.client.statement.HttpResponse import io.ktor.http.ContentType import io.ktor.http.HttpMethod -import io.ktor.utils.io.ByteReadChannel -import io.ktor.utils.io.cancel -import io.ktor.utils.io.readUTF8Line import javax.inject.Inject -import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.isActive import kotlinx.serialization.json.Json import kotlinx.serialization.json.encodeToJsonElement @@ -40,52 +32,29 @@ class AnthropicAPIImpl @Inject constructor( this.apiUrl = url } - override fun streamChatMessage(messageRequest: MessageRequest): Flow { - val body = Json.encodeToJsonElement(messageRequest) - - return flow { - try { - networkClient() - .sse( - urlString = if (apiUrl.endsWith("/")) "${apiUrl}v1/messages" else "$apiUrl/v1/messages", - request = { - method = HttpMethod.Post - setBody(body) - accept(ContentType.Text.EventStream) - headers { - append(API_KEY_HEADER, token ?: "") - append(VERSION_HEADER, ANTHROPIC_VERSION) - } + override fun streamChatMessage(messageRequest: MessageRequest): Flow = flow { + try { + networkClient() + .sse( + urlString = if (apiUrl.endsWith("/")) "${apiUrl}v1/messages" else "$apiUrl/v1/messages", + request = { + method = HttpMethod.Post + setBody(Json.encodeToJsonElement(messageRequest)) + accept(ContentType.Text.EventStream) + headers { + append(API_KEY_HEADER, token ?: "") + append(VERSION_HEADER, ANTHROPIC_VERSION) } - ) { - incoming.collect { event -> event.data?.let { line -> emit(Json.decodeFromString(line)) } } } - } catch (e: Exception) { - emit(ErrorResponseChunk(error = ErrorDetail(type = "network_error", message = e.message ?: ""))) - } - } - } - - private suspend inline fun FlowCollector.streamEventsFrom(response: HttpResponse) { - val channel: ByteReadChannel = response.body() - try { - while (currentCoroutineContext().isActive && !channel.isClosedForRead) { - val line = channel.readUTF8Line() ?: continue - val value: T = when { - line.startsWith(STREAM_END_TOKEN) -> break - line.startsWith(STREAM_PREFIX) -> Json.decodeFromString(line.removePrefix(STREAM_PREFIX)) - else -> continue + ) { + incoming.collect { event -> event.data?.let { line -> emit(Json.decodeFromString(line)) } } } - emit(value) - } - } finally { - channel.cancel() + } catch (e: Exception) { + emit(ErrorResponseChunk(error = ErrorDetail(type = "network_error", message = e.message ?: ""))) } } companion object { - private const val STREAM_PREFIX = "data:" - private const val STREAM_END_TOKEN = "event: message_stop" private const val API_KEY_HEADER = "x-api-key" private const val VERSION_HEADER = "anthropic-version" private const val ANTHROPIC_VERSION = "2023-06-01" From 3be0660a940acd2c76b407af737ee6fb6066b169 Mon Sep 17 00:00:00 2001 From: Taewan Park Date: Sat, 23 Nov 2024 20:04:14 +0900 Subject: [PATCH 4/5] Update Ktor to latest version --- gradle/libs.versions.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 02586bd..d2498bf 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -13,7 +13,7 @@ datastore = "1.1.1" gemini = "0.9.0" hilt = "2.52" ksp = "2.0.20-1.0.25" # Also change with Kotlin version -ktor = "3.0.0" +ktor = "3.0.1" androidxHilt = "1.2.0" navigation = "2.8.4" markdown = "0.5.4" From 70b806f639dbd1c0844c1636850e4497a799ba6e Mon Sep 17 00:00:00 2001 From: Taewan Park Date: Sat, 23 Nov 2024 20:11:41 +0900 Subject: [PATCH 5/5] Update OpenAI library version --- gradle/libs.versions.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index d2498bf..e344396 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -17,7 +17,7 @@ ktor = "3.0.1" androidxHilt = "1.2.0" navigation = "2.8.4" markdown = "0.5.4" -openai = "3.8.2" +openai = "4.0.0-beta01" serialization = "1.7.3" # Should not update due to kotlin version splashscreen = "1.0.1" room = "2.6.1"