Skip to content

Commit

Permalink
Queue (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
MeshanKhosla authored Apr 25, 2024
1 parent 1915632 commit 95ce5dd
Show file tree
Hide file tree
Showing 6 changed files with 467 additions and 0 deletions.
95 changes: 95 additions & 0 deletions tests/asyncio/test_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import pytest
from upstash_qstash.asyncio import Client
from qstash_tokens import QSTASH_TOKEN


@pytest.fixture
def client():
return Client(QSTASH_TOKEN)


@pytest.mark.asyncio
async def test_queue_upsert(client):
queue_name = "test_queue"
queue = client.queue({"queue_name": queue_name})

print("Creating queue with parallelism 1")
await queue.upsert({"parallelism": 1})

print("Verifying queue details")
queue_details = await queue.get()
assert queue_details["name"] == queue_name
assert queue_details["parallelism"] == 1

print("Updating queue parallelism to 2")
await queue.upsert({"parallelism": 2})

print("Verifying queue details")
queue_details = await queue.get()
assert queue_details["parallelism"] == 2

print("Making sure queue list returns the queue")
all_queues = await queue.list()
assert queue_name in map(lambda q: q["name"], all_queues)

print("Deleting queue")
await queue.delete()

print("Making sure queue list does not return the queue")
all_queues = await queue.list()
assert queue_name not in map(lambda q: q["name"], all_queues)


@pytest.mark.asyncio
async def test_no_queue_name(client):
queue = client.queue()

print("Trying to upsert without a queue name")
with pytest.raises(ValueError):
await queue.upsert({"parallelism": 1})

print("Trying to get without a queue name")
with pytest.raises(ValueError):
await queue.get()

print("Trying to delete without a queue name")
with pytest.raises(ValueError):
await queue.delete()

print("Trying to enqueue without a queue name")
with pytest.raises(ValueError):
await queue.enqueue({"url": "https://example.com"})

with pytest.raises(ValueError):
await queue.enqueue_json({"url": "https://example.com"})

print("Should be able to list queues without a queue name")
all_queues = await queue.list()
print(all_queues)
assert isinstance(all_queues, list)


@pytest.mark.asyncio
async def test_enqueue(client):
queue_name = "test_queue"
queue = client.queue({"queue_name": queue_name})

print("Creating queue with parallelism 1")
await queue.upsert({"parallelism": 1})

print("Enqueueing message to the queue")
res = await queue.enqueue_json(
{
"body": {"ex_key": "ex_value"},
"url": "https://meshan456.requestcatcher.com/test",
"headers": {
"test-header": "test-value",
},
}
)

print("Verifying enqueue response")
assert res["messageId"] is not None

print("Deleting queue")
await queue.delete()
92 changes: 92 additions & 0 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import pytest
from upstash_qstash import Client
from qstash_tokens import QSTASH_TOKEN


@pytest.fixture
def client():
return Client(QSTASH_TOKEN)


def test_queue_upsert(client):
queue_name = "test_queue"
queue = client.queue({"queue_name": queue_name})

print("Creating queue with parallelism 1")
queue.upsert({"parallelism": 1})

print("Verifying queue details")
queue_details = queue.get()
assert queue_details["name"] == queue_name
assert queue_details["parallelism"] == 1

print("Updating queue parallelism to 2")
queue.upsert({"parallelism": 2})

print("Verifying queue details")
queue_details = queue.get()
assert queue_details["parallelism"] == 2

print("Making sure queue list returns the queue")
all_queues = queue.list()
assert queue_name in map(lambda q: q["name"], all_queues)

print("Deleting queue")
queue.delete()

print("Making sure queue list does not return the queue")
all_queues = queue.list()
assert queue_name not in map(lambda q: q["name"], all_queues)


def test_no_queue_name(client):
queue = client.queue()

print("Trying to upsert without a queue name")
with pytest.raises(ValueError):
queue.upsert({"parallelism": 1})

print("Trying to get without a queue name")
with pytest.raises(ValueError):
queue.get()

print("Trying to delete without a queue name")
with pytest.raises(ValueError):
queue.delete()

print("Trying to enqueue without a queue name")
with pytest.raises(ValueError):
queue.enqueue({"url": "https://example.com"})

with pytest.raises(ValueError):
queue.enqueue_json({"url": "https://example.com"})

print("Should be able to list queues without a queue name")
all_queues = queue.list()
print(all_queues)
assert isinstance(all_queues, list)


def test_enqueue(client):
queue_name = "test_queue"
queue = client.queue({"queue_name": queue_name})

print("Creating queue with parallelism 1")
queue.upsert({"parallelism": 1})

print("Enqueueing message to the queue")
res = queue.enqueue_json(
{
"body": {"ex_key": "ex_value"},
"url": "https://meshan456.requestcatcher.com/test",
"headers": {
"test-header": "test-value",
},
}
)

print("Verifying enqueue response")
assert res["messageId"] is not None

print("Deleting queue")
queue.delete()
10 changes: 10 additions & 0 deletions upstash_qstash/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from upstash_qstash.asyncio.messages import Messages
from upstash_qstash.asyncio.topics import Topics
from upstash_qstash.asyncio.dlq import DLQ
from upstash_qstash.asyncio.queue import Queue, QueueOpts
from upstash_qstash.asyncio.schedules import Schedules
from upstash_qstash.asyncio.events import Events, EventsRequest, GetEventsResponse
from upstash_qstash.asyncio.keys import Keys
Expand Down Expand Up @@ -92,6 +93,15 @@ def dlq(self):
"""
return DLQ(self.http)

def queue(self, queue_opts: Optional[QueueOpts] = None):
"""
Access the queue API.
Create, read, update, or delete queues. Also allows for asynchronous message enqueueing.
You must provide a queue name to queue, unless you are only using the list method.
"""
return Queue(self.http, queue_opts)

def schedules(self):
"""
Access the schedules API.
Expand Down
119 changes: 119 additions & 0 deletions upstash_qstash/asyncio/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import json
from typing import List, Union, Optional
from upstash_qstash.queue import (
UpsertQueueRequest,
QueueResponse,
EnqueueRequest,
QueueOpts,
)
from upstash_qstash.upstash_http import HttpClient
from upstash_qstash.publish import (
Publish,
PublishToUrlResponse,
PublishToTopicResponse,
)


class Queue:
def __init__(self, http: HttpClient, queue_opts: Optional[QueueOpts] = None):
self.http = http
self.queue_name = queue_opts.get("queue_name") if queue_opts else None

async def upsert(self, req: UpsertQueueRequest):
"""
Asynchronously creates or updates a queue with the given name and parallelism
"""
if not self.queue_name:
raise ValueError("Please provide a queue name to the Queue constructor")

body = {"queueName": self.queue_name, "parallelism": req["parallelism"]}

await self.http.request_async(
{
"method": "POST",
"path": ["v2", "queues"],
"headers": {
"Content-Type": "application/json",
},
"body": json.dumps(body),
"parse_response_as_json": False,
}
)

async def get(self) -> QueueResponse:
"""
Asynchronously get the queue details
"""
if not self.queue_name:
raise ValueError("Please provide a queue name to the Queue constructor")

return await self.http.request_async(
{
"method": "GET",
"path": ["v2", "queues", self.queue_name],
}
)

async def list(self) -> List[QueueResponse]:
"""
Asynchronously list all queues
"""

return await self.http.request_async(
{
"method": "GET",
"path": ["v2", "queues"],
}
)

async def delete(self):
"""
Asynchronously delete the queue
"""
if not self.queue_name:
raise ValueError("Please provide a queue name to the Queue constructor")

return await self.http.request_async(
{
"method": "DELETE",
"path": ["v2", "queues", self.queue_name],
"parse_response_as_json": False,
}
)

async def enqueue(
self, req: EnqueueRequest
) -> Union[PublishToUrlResponse, PublishToTopicResponse]:
"""
Asynchronously enqueue a message to the queue
"""
if not self.queue_name:
raise ValueError("Please provide a queue name to the Queue constructor")

Publish._validate_request(req)
headers = Publish._prepare_headers(req)

return await self.http.request_async(
{
"path": [
"v2",
"enqueue",
self.queue_name,
req.get("url") or req["topic"],
],
"body": req.get("body"),
"headers": headers,
"method": "POST",
}
)

async def enqueue_json(self, req: EnqueueRequest):
"""
Asynchronously enqueue a message to the queue with the body as JSON
"""
if "body" in req:
req["body"] = json.dumps(req["body"])

req.setdefault("headers", {}).update({"Content-Type": "application/json"})

return await self.enqueue(req)
10 changes: 10 additions & 0 deletions upstash_qstash/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from upstash_qstash.messages import Messages
from upstash_qstash.topics import Topics
from upstash_qstash.dlq import DLQ
from upstash_qstash.queue import Queue, QueueOpts
from upstash_qstash.events import Events, EventsRequest, GetEventsResponse
from upstash_qstash.schedules import Schedules
from upstash_qstash.keys import Keys
Expand Down Expand Up @@ -94,6 +95,15 @@ def dlq(self):
"""
return DLQ(self.http)

def queue(self, queue_opts: Optional[QueueOpts] = None):
"""
Access the queue API.
Create, read, update, or delete queues. Also allows for message enqueueing.
You must provide a queue name to queue, unless you are only using the list method.
"""
return Queue(self.http, queue_opts)

def schedules(self):
"""
Access the schedules API.
Expand Down
Loading

0 comments on commit 95ce5dd

Please sign in to comment.