Skip to content

Commit

Permalink
Implement bulk DLQ delete (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
MeshanKhosla authored Mar 19, 2024
1 parent 1cd0201 commit 7329776
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 1 deletion.
30 changes: 30 additions & 0 deletions tests/asyncio/test_dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,33 @@ async def test_dlq(client):
filter(lambda msg: msg["messageId"] == msg_id, all_messages_after_delete)
)
assert len(msg_deleted) == 0

@pytest.mark.asyncio
async def test_dlq_delete_many(client):
print("Publishing 5 messages to a failed endpoint")
msg_ids = []
for _ in range(5):
pub_res = await client.publish_json({"url": "http://httpstat.us/404", "retries": 0})
msg_ids.append(pub_res["messageId"])
assert len(msg_ids) == 5

print("Waiting 5 seconds for events to be delivered")
await asyncio.sleep(5)

print("Checking if messages are in DLQ")
dlq = client.dlq()
all_messages = (await dlq.list_messages())["messages"]
msg_sent = list(filter(lambda msg: msg["messageId"] in msg_ids, all_messages))
assert len(msg_sent) == 5

print("Deleting messages from DLQ")
dlq_ids = [msg["dlqId"] for msg in msg_sent]
await dlq.deleteMany({"dlq_ids": dlq_ids})

print("Checking if messages are deleted from DLQ")
all_messages_after_delete = (await dlq.list_messages())["messages"]
msg_deleted = list(
filter(lambda msg: msg["messageId"] in msg_ids, all_messages_after_delete)
)
assert len(msg_deleted) == 0
assert len(all_messages_after_delete) == len(all_messages) - 5
29 changes: 29 additions & 0 deletions tests/test_dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,32 @@ def test_dlq(client):
filter(lambda msg: msg["messageId"] == msg_id, all_messages_after_delete)
)
assert len(msg_deleted) == 0

def test_dlq_delete_many(client):
print("Publishing 5 messages to a failed endpoint")
msg_ids = []
for _ in range(5):
pub_res = client.publish_json({"url": "http://httpstat.us/404", "retries": 0})
msg_ids.append(pub_res["messageId"])
assert len(msg_ids) == 5

print("Waiting 5 seconds for events to be delivered")
time.sleep(5)

print("Checking if messages are in DLQ")
dlq = client.dlq()
all_messages = dlq.list_messages()["messages"]
msg_sent = list(filter(lambda msg: msg["messageId"] in msg_ids, all_messages))
assert len(msg_sent) == 5

print("Deleting messages from DLQ")
dlq_ids = [msg["dlqId"] for msg in msg_sent]
dlq.deleteMany({"dlq_ids": dlq_ids})

print("Checking if messages are deleted from DLQ")
all_messages_after_delete = dlq.list_messages()["messages"]
msg_deleted = list(
filter(lambda msg: msg["messageId"] in msg_ids, all_messages_after_delete)
)
assert len(msg_deleted) == 0
assert len(all_messages_after_delete) == len(all_messages) - 5
16 changes: 15 additions & 1 deletion upstash_qstash/asyncio/dlq.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import Optional
from upstash_qstash.upstash_http import HttpClient
from upstash_qstash.dlq import ListMessagesOpts, ListMessageResponse, DlqMessage
from upstash_qstash.dlq import ListMessagesOpts, ListMessageResponse, DlqMessage, BulkDeleteRequest, BulkDeleteResponse
from upstash_qstash.qstash_types import UpstashRequest
import json


class DLQ:
Expand Down Expand Up @@ -57,3 +58,16 @@ async def delete(self, dlq_message_id: str):
"parse_response_as_json": False,
}
)

async def deleteMany(self, req: BulkDeleteRequest) -> BulkDeleteResponse:
"""
Asynchronously remove many message from the DLQ
"""
return await self.http.request_async(
{
"path": ["v2", "dlq"],
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"dlqIds": req.get('dlq_ids')}),
"method": "DELETE",
}
)
29 changes: 29 additions & 0 deletions upstash_qstash/dlq.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional, TypedDict, List, Dict
from upstash_qstash.qstash_types import Method, UpstashRequest
from upstash_qstash.upstash_http import HttpClient
import json

DlqMessage = TypedDict(
"DlqMessage",
Expand Down Expand Up @@ -41,6 +42,21 @@
)


BulkDeleteRequest = TypedDict(
"BulkDeleteRequest",
{
"dlq_ids": List[str],
},
)

BulkDeleteResponse = TypedDict(
"BulkDeleteResponse",
{
"deleted": int,
},
)


class DLQ:
def __init__(self, http: HttpClient):
self.http = http
Expand Down Expand Up @@ -94,3 +110,16 @@ def delete(self, dlq_message_id: str):
"parse_response_as_json": False,
}
)

def deleteMany(self, req: BulkDeleteRequest) -> BulkDeleteResponse:
"""
Remove many message from the DLQ
"""
return self.http.request(
{
"path": ["v2", "dlq"],
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"dlqIds": req.get('dlq_ids')}),
"method": "DELETE",
}
)

0 comments on commit 7329776

Please sign in to comment.