Skip to content

Commit

Permalink
Formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
MeshanKhosla committed Mar 20, 2024
1 parent 7329776 commit f4b6fd6
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 12 deletions.
13 changes: 8 additions & 5 deletions tests/asyncio/test_dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,27 @@ async def test_dlq(client):
)
assert len(msg_deleted) == 0


@pytest.mark.asyncio
async def test_dlq_delete_many(client):
print("Publishing 5 messages to a failed endpoint")
msg_ids = []
for _ in range(5):
pub_res = await client.publish_json({"url": "http://httpstat.us/404", "retries": 0})
pub_res = await client.publish_json(
{"url": "http://httpstat.us/404", "retries": 0}
)
msg_ids.append(pub_res["messageId"])
assert len(msg_ids) == 5

print("Waiting 5 seconds for events to be delivered")
await asyncio.sleep(5)

print("Checking if messages are in DLQ")
dlq = client.dlq()
all_messages = (await dlq.list_messages())["messages"]
msg_sent = list(filter(lambda msg: msg["messageId"] in msg_ids, all_messages))
assert len(msg_sent) == 5

print("Deleting messages from DLQ")
dlq_ids = [msg["dlqId"] for msg in msg_sent]
await dlq.deleteMany({"dlq_ids": dlq_ids})
Expand All @@ -75,4 +78,4 @@ async def test_dlq_delete_many(client):
filter(lambda msg: msg["messageId"] in msg_ids, all_messages_after_delete)
)
assert len(msg_deleted) == 0
assert len(all_messages_after_delete) == len(all_messages) - 5
assert len(all_messages_after_delete) == len(all_messages) - 5
9 changes: 5 additions & 4 deletions tests/test_dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,24 @@ def test_dlq(client):
)
assert len(msg_deleted) == 0


def test_dlq_delete_many(client):
print("Publishing 5 messages to a failed endpoint")
msg_ids = []
for _ in range(5):
pub_res = client.publish_json({"url": "http://httpstat.us/404", "retries": 0})
msg_ids.append(pub_res["messageId"])
assert len(msg_ids) == 5

print("Waiting 5 seconds for events to be delivered")
time.sleep(5)

print("Checking if messages are in DLQ")
dlq = client.dlq()
all_messages = dlq.list_messages()["messages"]
msg_sent = list(filter(lambda msg: msg["messageId"] in msg_ids, all_messages))
assert len(msg_sent) == 5

print("Deleting messages from DLQ")
dlq_ids = [msg["dlqId"] for msg in msg_sent]
dlq.deleteMany({"dlq_ids": dlq_ids})
Expand All @@ -71,4 +72,4 @@ def test_dlq_delete_many(client):
filter(lambda msg: msg["messageId"] in msg_ids, all_messages_after_delete)
)
assert len(msg_deleted) == 0
assert len(all_messages_after_delete) == len(all_messages) - 5
assert len(all_messages_after_delete) == len(all_messages) - 5
10 changes: 8 additions & 2 deletions upstash_qstash/asyncio/dlq.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
from typing import Optional
from upstash_qstash.upstash_http import HttpClient
from upstash_qstash.dlq import ListMessagesOpts, ListMessageResponse, DlqMessage, BulkDeleteRequest, BulkDeleteResponse
from upstash_qstash.dlq import (
ListMessagesOpts,
ListMessageResponse,
DlqMessage,
BulkDeleteRequest,
BulkDeleteResponse,
)
from upstash_qstash.qstash_types import UpstashRequest
import json

Expand Down Expand Up @@ -67,7 +73,7 @@ async def deleteMany(self, req: BulkDeleteRequest) -> BulkDeleteResponse:
{
"path": ["v2", "dlq"],
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"dlqIds": req.get('dlq_ids')}),
"body": json.dumps({"dlqIds": req.get("dlq_ids")}),
"method": "DELETE",
}
)
2 changes: 1 addition & 1 deletion upstash_qstash/dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def deleteMany(self, req: BulkDeleteRequest) -> BulkDeleteResponse:
{
"path": ["v2", "dlq"],
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"dlqIds": req.get('dlq_ids')}),
"body": json.dumps({"dlqIds": req.get("dlq_ids")}),
"method": "DELETE",
}
)

0 comments on commit f4b6fd6

Please sign in to comment.