diff --git a/.env.example b/.env.example index 4f452c8..c1c4ce7 100644 --- a/.env.example +++ b/.env.example @@ -1,3 +1,4 @@ QSTASH_TOKEN="YOUR_TOKEN" QSTASH_CURRENT_SIGNING_KEY="" -QSTASH_NEXT_SIGNING_KEY="" \ No newline at end of file +QSTASH_NEXT_SIGNING_KEY="" +OPENAI_API_KEY = "" diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index da63b65..4afc629 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -47,4 +47,5 @@ jobs: export QSTASH_TOKEN="${{ secrets.QSTASH_TOKEN }}" export QSTASH_CURRENT_SIGNING_KEY="${{ secrets.QSTASH_CURRENT_SIGNING_KEY }}" export QSTASH_NEXT_SIGNING_KEY="${{ secrets.QSTASH_NEXT_SIGNING_KEY }}" - poetry run pytest \ No newline at end of file + export OPENAI_API_KEY="${{ secrets.OPENAI_API_KEY }}" + poetry run pytest diff --git a/README.md b/README.md index 274b6ea..9003bf2 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ > [!NOTE] > **This project is in GA Stage.** > -> The Upstash Professional Support fully covers this project. It receives regular updates, and bug fixes. +> The Upstash Professional Support fully covers this project. It receives regular updates, and bug fixes. > The Upstash team is committed to maintaining and improving its functionality. **QStash** is an HTTP based messaging and scheduling solution for serverless and edge runtimes. @@ -73,11 +73,13 @@ receiver.verify( ```python from upstash_qstash import QStash +from upstash_qstash.chat import upstash qstash = QStash("") res = qstash.chat.create( model="meta-llama/Meta-Llama-3-8B-Instruct", + provider=upstash(), messages=[ { "role": "user", @@ -89,6 +91,78 @@ res = qstash.chat.create( print(res.choices[0].message.content) ``` +#### Create Chat Completions Using Custom Providers + +```python +from upstash_qstash import QStash +from upstash_qstash.chat import openai + +qstash = QStash("") + +res = qstash.chat.create( + model="gpt-3.5-turbo", + provider=openai(""), + messages=[ + { + "role": "user", + "content": "What is the capital of Turkey?", + } + ], +) + +print(res.choices[0].message.content) +``` + +#### Publish a JSON message to LLM + +```python +from upstash_qstash import QStash +from upstash_qstash.chat import upstash + +qstash = QStash("") + +res = qstash.message.publish_json( + api={"name": "llm", "provider": upstash()}, + body={ + "model": "meta-llama/Meta-Llama-3-8B-Instruct", + "messages": [ + { + "role": "user", + "content": "What is the capital of Turkey?", + } + ], + }, + callback="https://example-cb.com", +) + +print(res.message_id) +``` + +#### Publish a JSON message to LLM Using Custom Providers + +```python +from upstash_qstash import QStash +from upstash_qstash.chat import openai + +qstash = QStash("") + +res = qstash.message.publish_json( + api={"name": "llm", "provider": openai("")}, + body={ + "model": "gpt-3.5-turbo", + "messages": [ + { + "role": "user", + "content": "What is the capital of Turkey?", + } + ], + }, + callback="https://example-cb.com", +) + +print(res.message_id) +``` + #### Additional configuration ```python diff --git a/examples/llm.py b/examples/llm.py index 9f94ea8..4615635 100644 --- a/examples/llm.py +++ b/examples/llm.py @@ -6,6 +6,7 @@ """ from upstash_qstash import QStash +from upstash_qstash.chat import upstash def main(): @@ -14,7 +15,7 @@ def main(): ) qstash.message.publish_json( - api="llm", + api={"name": "llm", "provider": upstash()}, body={ "model": "meta-llama/Meta-Llama-3-8B-Instruct", "messages": [ diff --git a/tests/__init__.py b/tests/__init__.py index 819c7a2..2382952 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -20,6 +20,11 @@ dotenv.dotenv_values().get("QSTASH_NEXT_SIGNING_KEY"), ) +OPENAI_API_KEY = os.environ.get( + "OPENAI_API_KEY", + dotenv.dotenv_values().get("OPENAI_API_KEY"), +) + def assert_eventually( assertion: Callable[[], None], diff --git a/tests/asyncio/test_chat.py b/tests/asyncio/test_chat.py index cf3d0ae..56e51c1 100644 --- a/tests/asyncio/test_chat.py +++ b/tests/asyncio/test_chat.py @@ -1,14 +1,16 @@ import pytest + +from tests import OPENAI_API_KEY from upstash_qstash import AsyncQStash from upstash_qstash.asyncio.chat import AsyncChatCompletionChunkStream -from upstash_qstash.chat import ChatCompletion +from upstash_qstash.chat import ChatCompletion, upstash, openai @pytest.mark.asyncio async def test_chat_async(async_qstash: AsyncQStash) -> None: res = await async_qstash.chat.create( model="meta-llama/Meta-Llama-3-8B-Instruct", - messages=[{"role": "user", "content": "hello"}], + messages=[{"role": "user", "content": "just say hello"}], ) assert isinstance(res, ChatCompletion) @@ -21,7 +23,7 @@ async def test_chat_async(async_qstash: AsyncQStash) -> None: async def test_chat_streaming_async(async_qstash: AsyncQStash) -> None: res = await async_qstash.chat.create( model="meta-llama/Meta-Llama-3-8B-Instruct", - messages=[{"role": "user", "content": "hello"}], + messages=[{"role": "user", "content": "just say hello"}], stream=True, ) @@ -41,7 +43,7 @@ async def test_chat_streaming_async(async_qstash: AsyncQStash) -> None: async def test_prompt_async(async_qstash: AsyncQStash) -> None: res = await async_qstash.chat.prompt( model="meta-llama/Meta-Llama-3-8B-Instruct", - user="hello", + user="just say hello", ) assert isinstance(res, ChatCompletion) @@ -54,7 +56,44 @@ async def test_prompt_async(async_qstash: AsyncQStash) -> None: async def test_prompt_streaming_async(async_qstash: AsyncQStash) -> None: res = await async_qstash.chat.prompt( model="meta-llama/Meta-Llama-3-8B-Instruct", - user="hello", + user="just say hello", + stream=True, + ) + + assert isinstance(res, AsyncChatCompletionChunkStream) + + i = 0 + async for r in res: + if i == 0: + assert r.choices[0].delta.role is not None + else: + assert r.choices[0].delta.content is not None + + i += 1 + + +@pytest.mark.asyncio +async def test_chat_explicit_upstash_provider_async(async_qstash: AsyncQStash) -> None: + res = await async_qstash.chat.create( + model="meta-llama/Meta-Llama-3-8B-Instruct", + messages=[{"role": "user", "content": "just say hello"}], + provider=upstash(), + ) + + assert isinstance(res, ChatCompletion) + + assert len(res.choices[0].message.content) > 0 + assert res.choices[0].message.role == "assistant" + + +@pytest.mark.asyncio +async def test_chat_explicit_upstash_provider_streaming_async( + async_qstash: AsyncQStash, +) -> None: + res = await async_qstash.chat.create( + model="meta-llama/Meta-Llama-3-8B-Instruct", + messages=[{"role": "user", "content": "just say hello"}], + provider=upstash(), stream=True, ) @@ -68,3 +107,120 @@ async def test_prompt_streaming_async(async_qstash: AsyncQStash) -> None: assert r.choices[0].delta.content is not None i += 1 + + +@pytest.mark.asyncio +async def test_prompt_explicit_upstash_provider_async( + async_qstash: AsyncQStash, +) -> None: + res = await async_qstash.chat.prompt( + model="meta-llama/Meta-Llama-3-8B-Instruct", + user="just say hello", + provider=upstash(), + ) + + assert isinstance(res, ChatCompletion) + + assert len(res.choices[0].message.content) > 0 + assert res.choices[0].message.role == "assistant" + + +@pytest.mark.asyncio +async def test_prompt_explicit_upstash_provider_streaming_async( + async_qstash: AsyncQStash, +) -> None: + res = await async_qstash.chat.prompt( + model="meta-llama/Meta-Llama-3-8B-Instruct", + user="just say hello", + provider=upstash(), + stream=True, + ) + + assert isinstance(res, AsyncChatCompletionChunkStream) + + i = 0 + async for r in res: + if i == 0: + assert r.choices[0].delta.role is not None + else: + assert r.choices[0].delta.content is not None + + i += 1 + + +@pytest.mark.asyncio +async def test_chat_custom_provider_async(async_qstash: AsyncQStash) -> None: + res = await async_qstash.chat.create( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "just say hello"}], + provider=openai(token=OPENAI_API_KEY), # type:ignore[arg-type] + ) + + assert isinstance(res, ChatCompletion) + + assert len(res.choices[0].message.content) > 0 + assert res.choices[0].message.role == "assistant" + + +@pytest.mark.asyncio +async def test_chat_custom_provider_streaming_async(async_qstash: AsyncQStash) -> None: + res = await async_qstash.chat.create( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "just say hello"}], + provider=openai(token=OPENAI_API_KEY), # type:ignore[arg-type] + stream=True, + ) + + assert isinstance(res, AsyncChatCompletionChunkStream) + + i = 0 + async for r in res: + if i == 0: + assert r.choices[0].delta.role is not None + else: + assert ( + r.choices[0].delta.content is not None + or r.choices[0].finish_reason is not None + ) + + i += 1 + + +@pytest.mark.asyncio +async def test_prompt_custom_provider_async(async_qstash: AsyncQStash) -> None: + res = await async_qstash.chat.prompt( + model="gpt-3.5-turbo", + user="just say hello", + provider=openai(token=OPENAI_API_KEY), # type:ignore[arg-type] + ) + + assert isinstance(res, ChatCompletion) + + assert len(res.choices[0].message.content) > 0 + assert res.choices[0].message.role == "assistant" + + +@pytest.mark.asyncio +async def test_prompt_custom_provider_streaming_async( + async_qstash: AsyncQStash, +) -> None: + res = await async_qstash.chat.prompt( + model="gpt-3.5-turbo", + user="just say hello", + provider=openai(token=OPENAI_API_KEY), # type:ignore[arg-type] + stream=True, + ) + + assert isinstance(res, AsyncChatCompletionChunkStream) + + i = 0 + async for r in res: + if i == 0: + assert r.choices[0].delta.role is not None + else: + assert ( + r.choices[0].delta.content is not None + or r.choices[0].finish_reason is not None + ) + + i += 1 diff --git a/tests/asyncio/test_message.py b/tests/asyncio/test_message.py index f0bdc07..28a7d18 100644 --- a/tests/asyncio/test_message.py +++ b/tests/asyncio/test_message.py @@ -2,8 +2,9 @@ import pytest -from tests import assert_eventually_async +from tests import assert_eventually_async, OPENAI_API_KEY from upstash_qstash import AsyncQStash +from upstash_qstash.chat import upstash, openai from upstash_qstash.errors import QStashError from upstash_qstash.event import EventState from upstash_qstash.message import ( @@ -81,13 +82,13 @@ async def test_disallow_multiple_destinations_async(async_qstash: AsyncQStash) - with pytest.raises(QStashError): await async_qstash.message.publish_json( url="https://httpstat.us/200", - api="llm", + api={"name": "llm", "provider": upstash()}, ) with pytest.raises(QStashError): await async_qstash.message.publish_json( url_group="test-url-group", - api="llm", + api={"name": "llm", "provider": upstash()}, ) @@ -145,13 +146,13 @@ async def test_batch_json_async(async_qstash: AsyncQStash) -> None: @pytest.mark.asyncio async def test_publish_to_api_llm_async(async_qstash: AsyncQStash) -> None: res = await async_qstash.message.publish_json( - api="llm", + api={"name": "llm", "provider": upstash()}, body={ "model": "meta-llama/Meta-Llama-3-8B-Instruct", "messages": [ { "role": "user", - "content": "hello", + "content": "just say hello", } ], }, @@ -169,27 +170,47 @@ async def test_batch_api_llm_async(async_qstash: AsyncQStash) -> None: res = await async_qstash.message.batch_json( [ { - "api": "llm", + "api": {"name": "llm", "provider": upstash()}, "body": { "model": "meta-llama/Meta-Llama-3-8B-Instruct", "messages": [ { "role": "user", - "content": "hello", + "content": "just say hello", } ], }, "callback": "https://httpstat.us/200", - } + }, + { + "api": { + "name": "llm", + "provider": openai(OPENAI_API_KEY), # type:ignore[arg-type] + }, + "body": { + "model": "gpt-3.5-turbo", + "messages": [ + { + "role": "user", + "content": "just say hello", + } + ], + }, + "callback": "https://httpstat.us/200", + }, ] ) - assert len(res) == 1 + assert len(res) == 2 assert isinstance(res[0], BatchResponse) assert len(res[0].message_id) > 0 + assert isinstance(res[1], BatchResponse) + assert len(res[1].message_id) > 0 + await assert_delivered_eventually_async(async_qstash, res[0].message_id) + await assert_delivered_eventually_async(async_qstash, res[1].message_id) @pytest.mark.asyncio @@ -251,11 +272,11 @@ async def test_enqueue_api_llm_async( "messages": [ { "role": "user", - "content": "hello", + "content": "just say hello", } ], }, - api="llm", + api={"name": "llm", "provider": upstash()}, callback="https://httpstat.us/200", ) @@ -345,3 +366,61 @@ async def test_cancel_all_async(async_qstash: AsyncQStash) -> None: cancelled = await async_qstash.message.cancel_all() assert cancelled >= 2 + + +@pytest.mark.asyncio +async def test_publish_to_api_llm_custom_provider_async( + async_qstash: AsyncQStash, +) -> None: + res = await async_qstash.message.publish_json( + api={ + "name": "llm", + "provider": openai(OPENAI_API_KEY), # type:ignore[arg-type] + }, + body={ + "model": "gpt-3.5-turbo", + "messages": [ + { + "role": "user", + "content": "just say hello", + } + ], + }, + callback="https://httpstat.us/200", + ) + + assert isinstance(res, PublishResponse) + assert len(res.message_id) > 0 + + await assert_delivered_eventually_async(async_qstash, res.message_id) + + +@pytest.mark.asyncio +async def test_enqueue_api_llm_custom_provider_async( + async_qstash: AsyncQStash, + cleanup_queue: Callable[[AsyncQStash, str], None], +) -> None: + name = "test_queue" + cleanup_queue(async_qstash, name) + + res = await async_qstash.message.enqueue_json( + queue=name, + body={ + "model": "gpt-3.5-turbo", + "messages": [ + { + "role": "user", + "content": "just say hello", + } + ], + }, + api={ + "name": "llm", + "provider": openai(OPENAI_API_KEY), # type:ignore[arg-type] + }, + callback="https://httpstat.us/200", + ) + + assert isinstance(res, EnqueueResponse) + + assert len(res.message_id) > 0 diff --git a/tests/test_chat.py b/tests/test_chat.py index 5b85205..da0ca50 100644 --- a/tests/test_chat.py +++ b/tests/test_chat.py @@ -1,11 +1,17 @@ +from tests import OPENAI_API_KEY from upstash_qstash import QStash -from upstash_qstash.chat import ChatCompletion, ChatCompletionChunkStream +from upstash_qstash.chat import ( + ChatCompletion, + ChatCompletionChunkStream, + upstash, + openai, +) def test_chat(qstash: QStash) -> None: res = qstash.chat.create( model="meta-llama/Meta-Llama-3-8B-Instruct", - messages=[{"role": "user", "content": "hello"}], + messages=[{"role": "user", "content": "just say hello"}], ) assert isinstance(res, ChatCompletion) @@ -17,7 +23,7 @@ def test_chat(qstash: QStash) -> None: def test_chat_streaming(qstash: QStash) -> None: res = qstash.chat.create( model="meta-llama/Meta-Llama-3-8B-Instruct", - messages=[{"role": "user", "content": "hello"}], + messages=[{"role": "user", "content": "just say hello"}], stream=True, ) @@ -33,7 +39,7 @@ def test_chat_streaming(qstash: QStash) -> None: def test_prompt(qstash: QStash) -> None: res = qstash.chat.prompt( model="meta-llama/Meta-Llama-3-8B-Instruct", - user="hello", + user="just say hello", ) assert isinstance(res, ChatCompletion) @@ -45,7 +51,7 @@ def test_prompt(qstash: QStash) -> None: def test_prompt_streaming(qstash: QStash) -> None: res = qstash.chat.prompt( model="meta-llama/Meta-Llama-3-8B-Instruct", - user="hello", + user="just say hello", stream=True, ) @@ -56,3 +62,129 @@ def test_prompt_streaming(qstash: QStash) -> None: assert r.choices[0].delta.role is not None else: assert r.choices[0].delta.content is not None + + +def test_chat_explicit_upstash_provider(qstash: QStash) -> None: + res = qstash.chat.create( + model="meta-llama/Meta-Llama-3-8B-Instruct", + messages=[{"role": "user", "content": "just say hello"}], + provider=upstash(), + ) + + assert isinstance(res, ChatCompletion) + + assert len(res.choices[0].message.content) > 0 + assert res.choices[0].message.role == "assistant" + + +def test_chat_explicit_upstash_provider_streaming(qstash: QStash) -> None: + res = qstash.chat.create( + model="meta-llama/Meta-Llama-3-8B-Instruct", + messages=[{"role": "user", "content": "just say hello"}], + provider=upstash(), + stream=True, + ) + + assert isinstance(res, ChatCompletionChunkStream) + + for i, r in enumerate(res): + if i == 0: + assert r.choices[0].delta.role is not None + else: + assert r.choices[0].delta.content is not None + + +def test_prompt_explicit_upstash_provider(qstash: QStash) -> None: + res = qstash.chat.prompt( + model="meta-llama/Meta-Llama-3-8B-Instruct", + user="just say hello", + provider=upstash(), + ) + + assert isinstance(res, ChatCompletion) + + assert len(res.choices[0].message.content) > 0 + assert res.choices[0].message.role == "assistant" + + +def test_prompt_explicit_upstash_provider_streaming(qstash: QStash) -> None: + res = qstash.chat.prompt( + model="meta-llama/Meta-Llama-3-8B-Instruct", + user="just say hello", + provider=upstash(), + stream=True, + ) + + assert isinstance(res, ChatCompletionChunkStream) + + for i, r in enumerate(res): + if i == 0: + assert r.choices[0].delta.role is not None + else: + assert r.choices[0].delta.content is not None + + +def test_chat_custom_provider(qstash: QStash) -> None: + res = qstash.chat.create( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "just say hello"}], + provider=openai(token=OPENAI_API_KEY), # type:ignore[arg-type] + ) + + assert isinstance(res, ChatCompletion) + + assert len(res.choices[0].message.content) > 0 + assert res.choices[0].message.role == "assistant" + + +def test_chat_custom_provider_streaming(qstash: QStash) -> None: + res = qstash.chat.create( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "just say hello"}], + provider=openai(token=OPENAI_API_KEY), # type:ignore[arg-type] + stream=True, + ) + + assert isinstance(res, ChatCompletionChunkStream) + + for i, r in enumerate(res): + if i == 0: + assert r.choices[0].delta.role is not None + else: + assert ( + r.choices[0].delta.content is not None + or r.choices[0].finish_reason is not None + ) + + +def test_prompt_custom_provider(qstash: QStash) -> None: + res = qstash.chat.prompt( + model="gpt-3.5-turbo", + user="just say hello", + provider=openai(token=OPENAI_API_KEY), # type:ignore[arg-type] + ) + + assert isinstance(res, ChatCompletion) + + assert len(res.choices[0].message.content) > 0 + assert res.choices[0].message.role == "assistant" + + +def test_prompt_custom_provider_streaming(qstash: QStash) -> None: + res = qstash.chat.prompt( + model="gpt-3.5-turbo", + user="just say hello", + provider=openai(token=OPENAI_API_KEY), # type:ignore[arg-type] + stream=True, + ) + + assert isinstance(res, ChatCompletionChunkStream) + + for i, r in enumerate(res): + if i == 0: + assert r.choices[0].delta.role is not None + else: + assert ( + r.choices[0].delta.content is not None + or r.choices[0].finish_reason is not None + ) diff --git a/tests/test_message.py b/tests/test_message.py index a1682be..74960a7 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -2,8 +2,9 @@ import pytest -from tests import assert_eventually +from tests import assert_eventually, OPENAI_API_KEY from upstash_qstash import QStash +from upstash_qstash.chat import upstash, openai from upstash_qstash.errors import QStashError from upstash_qstash.event import EventState from upstash_qstash.message import ( @@ -74,13 +75,13 @@ def test_disallow_multiple_destinations(qstash: QStash) -> None: with pytest.raises(QStashError): qstash.message.publish_json( url="https://httpstat.us/200", - api="llm", + api={"name": "llm", "provider": upstash()}, ) with pytest.raises(QStashError): qstash.message.publish_json( url_group="test-url-group", - api="llm", + api={"name": "llm", "provider": upstash()}, ) @@ -135,13 +136,13 @@ def test_batch_json(qstash: QStash) -> None: def test_publish_to_api_llm(qstash: QStash) -> None: res = qstash.message.publish_json( - api="llm", + api={"name": "llm", "provider": upstash()}, body={ "model": "meta-llama/Meta-Llama-3-8B-Instruct", "messages": [ { "role": "user", - "content": "hello", + "content": "just say hello", } ], }, @@ -158,27 +159,47 @@ def test_batch_api_llm(qstash: QStash) -> None: res = qstash.message.batch_json( [ { - "api": "llm", + "api": {"name": "llm", "provider": upstash()}, "body": { "model": "meta-llama/Meta-Llama-3-8B-Instruct", "messages": [ { "role": "user", - "content": "hello", + "content": "just say hello", } ], }, "callback": "https://httpstat.us/200", - } + }, + { + "api": { + "name": "llm", + "provider": openai(OPENAI_API_KEY), # type:ignore[arg-type] + }, # type:ignore[arg-type] + "body": { + "model": "gpt-3.5-turbo", + "messages": [ + { + "role": "user", + "content": "just say hello", + } + ], + }, + "callback": "https://httpstat.us/200", + }, ] ) - assert len(res) == 1 + assert len(res) == 2 assert isinstance(res[0], BatchResponse) assert len(res[0].message_id) > 0 + assert isinstance(res[1], BatchResponse) + assert len(res[1].message_id) > 0 + assert_delivered_eventually(qstash, res[0].message_id) + assert_delivered_eventually(qstash, res[1].message_id) def test_enqueue( @@ -237,11 +258,11 @@ def test_enqueue_api_llm( "messages": [ { "role": "user", - "content": "hello", + "content": "just say hello", } ], }, - api="llm", + api={"name": "llm", "provider": upstash()}, callback="https://httpstat.us/200", ) @@ -325,3 +346,57 @@ def test_cancel_all(qstash: QStash) -> None: cancelled = qstash.message.cancel_all() assert cancelled >= 2 + + +def test_publish_to_api_llm_custom_provider(qstash: QStash) -> None: + res = qstash.message.publish_json( + api={ + "name": "llm", + "provider": openai(OPENAI_API_KEY), # type:ignore[arg-type] + }, + body={ + "model": "gpt-3.5-turbo", + "messages": [ + { + "role": "user", + "content": "just say hello", + } + ], + }, + callback="https://httpstat.us/200", + ) + + assert isinstance(res, PublishResponse) + assert len(res.message_id) > 0 + + assert_delivered_eventually(qstash, res.message_id) + + +def test_enqueue_api_llm_custom_provider( + qstash: QStash, + cleanup_queue: Callable[[QStash, str], None], +) -> None: + name = "test_queue" + cleanup_queue(qstash, name) + + res = qstash.message.enqueue_json( + queue=name, + body={ + "model": "gpt-3.5-turbo", + "messages": [ + { + "role": "user", + "content": "just say hello", + } + ], + }, + api={ + "name": "llm", + "provider": openai(OPENAI_API_KEY), # type:ignore[arg-type] + }, + callback="https://httpstat.us/200", + ) + + assert isinstance(res, EnqueueResponse) + + assert len(res.message_id) > 0 diff --git a/upstash_qstash/asyncio/chat.py b/upstash_qstash/asyncio/chat.py index 65dda72..ead750d 100644 --- a/upstash_qstash/asyncio/chat.py +++ b/upstash_qstash/asyncio/chat.py @@ -15,6 +15,8 @@ parse_chat_completion_chunk_response, parse_chat_completion_response, prepare_chat_request_body, + LlmProvider, + UPSTASH_LLM_PROVIDER, ) @@ -102,6 +104,7 @@ async def create( *, messages: List[ChatCompletionMessage], model: ChatModel, + provider: Optional[LlmProvider] = None, frequency_penalty: Optional[float] = None, logit_bias: Optional[Dict[str, int]] = None, logprobs: Optional[bool] = None, @@ -128,6 +131,8 @@ async def create( :param messages: One or more chat messages. :param model: Name of the model. + :param provider: LLM provider for the chat completion request. By default, + Upstash will be used. :param frequency_penalty: Number between `-2.0` and `2.0`. Positive values penalize new tokens based on their existing frequency in the text so far, decreasing the model's likelihood @@ -210,9 +215,18 @@ async def create( top_p=top_p, ) + base_url = None + token = None + path = "/llm/v1/chat/completions" + + if provider is not None and provider.name != UPSTASH_LLM_PROVIDER.name: + base_url = provider.base_url + token = f"Bearer {provider.token}" + path = "/v1/chat/completions" + if stream: stream_response = await self._http.stream( - path="/llm/v1/chat/completions", + path=path, method="POST", headers={ "Content-Type": "application/json", @@ -221,15 +235,19 @@ async def create( "Cache-Control": "no-cache", }, body=body, + base_url=base_url, + token=token, ) return AsyncChatCompletionChunkStream(stream_response) response = await self._http.request( - path="/llm/v1/chat/completions", + path=path, method="POST", headers={"Content-Type": "application/json"}, body=body, + base_url=base_url, + token=token, ) return parse_chat_completion_response(response) @@ -240,6 +258,7 @@ async def prompt( user: str, system: Optional[str] = None, model: ChatModel, + provider: Optional[LlmProvider] = None, frequency_penalty: Optional[float] = None, logit_bias: Optional[Dict[str, int]] = None, logprobs: Optional[bool] = None, @@ -271,6 +290,8 @@ async def prompt( :param user: User prompt. :param system: System prompt. :param model: Name of the model. + :param provider: LLM provider for the chat completion request. By default, + Upstash will be used. :param frequency_penalty: Number between `-2.0` and `2.0`. Positive values penalize new tokens based on their existing frequency in the text so far, decreasing the model's likelihood @@ -338,6 +359,7 @@ async def prompt( return await self.create( messages=convert_to_chat_messages(user, system), model=model, + provider=provider, frequency_penalty=frequency_penalty, logit_bias=logit_bias, logprobs=logprobs, diff --git a/upstash_qstash/asyncio/http.py b/upstash_qstash/asyncio/http.py index e2fde29..e4e6b03 100644 --- a/upstash_qstash/asyncio/http.py +++ b/upstash_qstash/asyncio/http.py @@ -42,9 +42,14 @@ async def request( body: Optional[Union[str, bytes]] = None, params: Optional[Dict[str, str]] = None, parse_response: bool = True, + base_url: Optional[str] = None, + token: Optional[str] = None, ) -> Any: - url = BASE_URL + path - headers = {"Authorization": self._token, **(headers or {})} + base_url = base_url or BASE_URL + token = token or self._token + + url = base_url + path + headers = {"Authorization": token, **(headers or {})} max_attempts = 1 + max(0, self._retry["retries"]) last_error = None @@ -83,9 +88,14 @@ async def stream( headers: Optional[Dict[str, str]] = None, body: Optional[Union[str, bytes]] = None, params: Optional[Dict[str, str]] = None, + base_url: Optional[str] = None, + token: Optional[str] = None, ) -> httpx.Response: - url = BASE_URL + path - headers = {"Authorization": self._token, **(headers or {})} + base_url = base_url or BASE_URL + token = token or self._token + + url = base_url + path + headers = {"Authorization": token, **(headers or {})} max_attempts = 1 + max(0, self._retry["retries"]) last_error = None diff --git a/upstash_qstash/asyncio/message.py b/upstash_qstash/asyncio/message.py index 03b9c07..29e8a19 100644 --- a/upstash_qstash/asyncio/message.py +++ b/upstash_qstash/asyncio/message.py @@ -85,7 +85,13 @@ async def publish( value permitted by the QStash plan. It is useful in scenarios, where a message should be delivered with a shorter timeout. """ - destination = get_destination(url=url, url_group=url_group, api=api) + headers = headers or {} + destination = get_destination( + url=url, + url_group=url_group, + api=api, + headers=headers, + ) req_headers = prepare_headers( content_type=content_type, @@ -243,7 +249,13 @@ async def enqueue( value permitted by the QStash plan. It is useful in scenarios, where a message should be delivered with a shorter timeout. """ - destination = get_destination(url=url, url_group=url_group, api=api) + headers = headers or {} + destination = get_destination( + url=url, + url_group=url_group, + api=api, + headers=headers, + ) req_headers = prepare_headers( content_type=content_type, diff --git a/upstash_qstash/chat.py b/upstash_qstash/chat.py index c79047b..b0a8d49 100644 --- a/upstash_qstash/chat.py +++ b/upstash_qstash/chat.py @@ -1,5 +1,6 @@ import dataclasses import json +import re from types import TracebackType from typing import ( Any, @@ -19,6 +20,51 @@ from upstash_qstash.http import HttpClient +@dataclasses.dataclass +class LlmProvider: + name: str + """Name of the LLM provider.""" + + base_url: str + """Base URL of the provider.""" + + token: str + """ + The token for the provider. + + The provided key will be passed to the + endpoint as a bearer token. + """ + + +def openai(token: str) -> LlmProvider: + return LlmProvider( + name="OpenAI", + base_url="https://api.openai.com", + token=token, + ) + + +UPSTASH_LLM_PROVIDER = LlmProvider( + name="Upstash", + base_url="", + token="", +) + + +def upstash() -> LlmProvider: + return UPSTASH_LLM_PROVIDER + + +def custom(base_url: str, token: str) -> LlmProvider: + base_url = re.sub("/(v1/)?chat/completions$", "", base_url) + return LlmProvider( + name="custom", + base_url=base_url, + token=token, + ) + + class ChatCompletionMessage(TypedDict): role: Literal["system", "assistant", "user"] """The role of the message author.""" @@ -27,9 +73,12 @@ class ChatCompletionMessage(TypedDict): """The content of the message.""" -ChatModel = Literal[ - "meta-llama/Meta-Llama-3-8B-Instruct", - "mistralai/Mistral-7B-Instruct-v0.2", +ChatModel = Union[ + Literal[ + "meta-llama/Meta-Llama-3-8B-Instruct", + "mistralai/Mistral-7B-Instruct-v0.2", + ], + str, ] @@ -527,6 +576,7 @@ def create( *, messages: List[ChatCompletionMessage], model: ChatModel, + provider: Optional[LlmProvider] = None, frequency_penalty: Optional[float] = None, logit_bias: Optional[Dict[str, int]] = None, logprobs: Optional[bool] = None, @@ -557,6 +607,8 @@ def create( Positive values penalize new tokens based on their existing frequency in the text so far, decreasing the model's likelihood to repeat the same line verbatim. + :param provider: LLM provider for the chat completion request. By default, + Upstash will be used. :param logit_bias: Modify the likelihood of specified tokens appearing in the completion. Accepts a dictionary that maps tokens (specified by their token ID in the tokenizer) to an associated bias value @@ -635,9 +687,18 @@ def create( top_p=top_p, ) + base_url = None + token = None + path = "/llm/v1/chat/completions" + + if provider is not None and provider.name != UPSTASH_LLM_PROVIDER.name: + base_url = provider.base_url + token = f"Bearer {provider.token}" + path = "/v1/chat/completions" + if stream: stream_response = self._http.stream( - path="/llm/v1/chat/completions", + path=path, method="POST", headers={ "Content-Type": "application/json", @@ -646,15 +707,19 @@ def create( "Cache-Control": "no-cache", }, body=body, + base_url=base_url, + token=token, ) return ChatCompletionChunkStream(stream_response) response = self._http.request( - path="/llm/v1/chat/completions", + path=path, method="POST", headers={"Content-Type": "application/json"}, body=body, + base_url=base_url, + token=token, ) return parse_chat_completion_response(response) @@ -665,6 +730,7 @@ def prompt( user: str, system: Optional[str] = None, model: ChatModel, + provider: Optional[LlmProvider] = None, frequency_penalty: Optional[float] = None, logit_bias: Optional[Dict[str, int]] = None, logprobs: Optional[bool] = None, @@ -696,6 +762,8 @@ def prompt( :param user: User prompt. :param system: System prompt. :param model: Name of the model. + :param provider: LLM provider for the chat completion request. By default, + Upstash will be used. :param frequency_penalty: Number between `-2.0` and `2.0`. Positive values penalize new tokens based on their existing frequency in the text so far, decreasing the model's likelihood @@ -763,6 +831,7 @@ def prompt( return self.create( messages=convert_to_chat_messages(user, system), model=model, + provider=provider, frequency_penalty=frequency_penalty, logit_bias=logit_bias, logprobs=logprobs, diff --git a/upstash_qstash/errors.py b/upstash_qstash/errors.py index 7e583bf..dae5bdd 100644 --- a/upstash_qstash/errors.py +++ b/upstash_qstash/errors.py @@ -13,7 +13,7 @@ class RateLimitExceededError(QStashError): def __init__( self, limit: Optional[str], remaining: Optional[str], reset: Optional[str] ): - super( + super().__init__( f"Exceeded rate limit: Limit: {limit}, remaining: {remaining}, reset: {reset}" ) self.limit = limit @@ -31,7 +31,7 @@ def __init__( reset_requests: Optional[str], reset_tokens: Optional[str], ): - super( + super().__init__( f"Exceeded chat rate limit: " f"Request limit: {limit_requests}, remaining: {remaining_requests}, reset: {reset_requests}; " f"token limit: {limit_tokens}, remaining: {remaining_tokens}, reset: {reset_tokens}" diff --git a/upstash_qstash/http.py b/upstash_qstash/http.py index 73088e1..f1aefe3 100644 --- a/upstash_qstash/http.py +++ b/upstash_qstash/http.py @@ -103,9 +103,14 @@ def request( body: Optional[Union[str, bytes]] = None, params: Optional[Dict[str, str]] = None, parse_response: bool = True, + base_url: Optional[str] = None, + token: Optional[str] = None, ) -> Any: - url = BASE_URL + path - headers = {"Authorization": self._token, **(headers or {})} + base_url = base_url or BASE_URL + token = token or self._token + + url = base_url + path + headers = {"Authorization": token, **(headers or {})} max_attempts = 1 + max(0, self._retry["retries"]) last_error = None @@ -144,9 +149,14 @@ def stream( headers: Optional[Dict[str, str]] = None, body: Optional[Union[str, bytes]] = None, params: Optional[Dict[str, str]] = None, + base_url: Optional[str] = None, + token: Optional[str] = None, ) -> httpx.Response: - url = BASE_URL + path - headers = {"Authorization": self._token, **(headers or {})} + base_url = base_url or BASE_URL + token = token or self._token + + url = base_url + path + headers = {"Authorization": token, **(headers or {})} max_attempts = 1 + max(0, self._retry["retries"]) last_error = None diff --git a/upstash_qstash/message.py b/upstash_qstash/message.py index c989d81..b24905a 100644 --- a/upstash_qstash/message.py +++ b/upstash_qstash/message.py @@ -10,10 +10,22 @@ TypedDict, ) +from upstash_qstash.chat import LlmProvider, UPSTASH_LLM_PROVIDER from upstash_qstash.errors import QStashError from upstash_qstash.http import HttpClient, HttpMethod -ApiT = Literal["llm"] + +class LlmApi(TypedDict): + name: Literal["llm"] + """The name of the API type.""" + + provider: LlmProvider + """ + The LLM provider for the API. + """ + + +ApiT = LlmApi # In the future, this can be union of different API types @dataclasses.dataclass @@ -234,6 +246,14 @@ class BatchJsonRequest(TypedDict, total=False): an integer, which will be interpreted as timeout in seconds. """ + provider: LlmProvider + """ + LLM provider to use. + + When specified, destination and headers will be + set according to the LLM provider. + """ + @dataclasses.dataclass class Message: @@ -303,6 +323,7 @@ def get_destination( url: Optional[str], url_group: Optional[str], api: Optional[ApiT], + headers: Dict[str, str], ) -> str: destination = None count = 0 @@ -315,7 +336,13 @@ def get_destination( count += 1 if api is not None: - destination = f"api/{api}" + provider = api["provider"] + if provider.name == UPSTASH_LLM_PROVIDER.name: + destination = "api/llm" + else: + destination = provider.base_url + "/v1/chat/completions" + headers["Authorization"] = f"Bearer {provider.token}" + count += 1 if count != 1: @@ -437,16 +464,18 @@ def prepare_batch_message_body(messages: List[BatchRequest]) -> str: batch_messages = [] for msg in messages: + user_headers = msg.get("headers") or {} destination = get_destination( url=msg.get("url"), url_group=msg.get("url_group"), api=msg.get("api"), + headers=user_headers, ) headers = prepare_headers( content_type=msg.get("content_type"), method=msg.get("method"), - headers=msg.get("headers"), + headers=user_headers, retries=msg.get("retries"), callback=msg.get("callback"), failure_callback=msg.get("failure_callback"), @@ -639,7 +668,13 @@ def publish( value permitted by the QStash plan. It is useful in scenarios, where a message should be delivered with a shorter timeout. """ - destination = get_destination(url=url, url_group=url_group, api=api) + headers = headers or {} + destination = get_destination( + url=url, + url_group=url_group, + api=api, + headers=headers, + ) req_headers = prepare_headers( content_type=content_type, @@ -797,7 +832,13 @@ def enqueue( value permitted by the QStash plan. It is useful in scenarios, where a message should be delivered with a shorter timeout. """ - destination = get_destination(url=url, url_group=url_group, api=api) + headers = headers or {} + destination = get_destination( + url=url, + url_group=url_group, + api=api, + headers=headers, + ) req_headers = prepare_headers( content_type=content_type,