From db9317b6c6936cefd079d4b0f891f52eea35b202 Mon Sep 17 00:00:00 2001 From: Meshan Khosla Date: Sat, 16 Mar 2024 11:07:55 -0700 Subject: [PATCH] Implement bulk DLQ delete --- tests/asyncio/test_dlq.py | 30 ++++++++++++++++++++++++++++++ tests/test_dlq.py | 29 +++++++++++++++++++++++++++++ upstash_qstash/asyncio/dlq.py | 16 +++++++++++++++- upstash_qstash/dlq.py | 29 +++++++++++++++++++++++++++++ 4 files changed, 103 insertions(+), 1 deletion(-) diff --git a/tests/asyncio/test_dlq.py b/tests/asyncio/test_dlq.py index 9ff545c..ace84ce 100644 --- a/tests/asyncio/test_dlq.py +++ b/tests/asyncio/test_dlq.py @@ -46,3 +46,33 @@ async def test_dlq(client): filter(lambda msg: msg["messageId"] == msg_id, all_messages_after_delete) ) assert len(msg_deleted) == 0 + +@pytest.mark.asyncio +async def test_dlq_delete_many(client): + print("Publishing 5 messages to a failed endpoint") + msg_ids = [] + for _ in range(5): + pub_res = await client.publish_json({"url": "http://httpstat.us/404", "retries": 0}) + msg_ids.append(pub_res["messageId"]) + assert len(msg_ids) == 5 + + print("Waiting 5 seconds for events to be delivered") + await asyncio.sleep(5) + + print("Checking if messages are in DLQ") + dlq = client.dlq() + all_messages = (await dlq.list_messages())["messages"] + msg_sent = list(filter(lambda msg: msg["messageId"] in msg_ids, all_messages)) + assert len(msg_sent) == 5 + + print("Deleting messages from DLQ") + dlq_ids = [msg["dlqId"] for msg in msg_sent] + await dlq.deleteMany({"dlq_ids": dlq_ids}) + + print("Checking if messages are deleted from DLQ") + all_messages_after_delete = (await dlq.list_messages())["messages"] + msg_deleted = list( + filter(lambda msg: msg["messageId"] in msg_ids, all_messages_after_delete) + ) + assert len(msg_deleted) == 0 + assert len(all_messages_after_delete) == len(all_messages) - 5 \ No newline at end of file diff --git a/tests/test_dlq.py b/tests/test_dlq.py index 10c6cca..8cad008 100644 --- a/tests/test_dlq.py +++ b/tests/test_dlq.py @@ -43,3 +43,32 @@ def test_dlq(client): filter(lambda msg: msg["messageId"] == msg_id, all_messages_after_delete) ) assert len(msg_deleted) == 0 + +def test_dlq_delete_many(client): + print("Publishing 5 messages to a failed endpoint") + msg_ids = [] + for _ in range(5): + pub_res = client.publish_json({"url": "http://httpstat.us/404", "retries": 0}) + msg_ids.append(pub_res["messageId"]) + assert len(msg_ids) == 5 + + print("Waiting 5 seconds for events to be delivered") + time.sleep(5) + + print("Checking if messages are in DLQ") + dlq = client.dlq() + all_messages = dlq.list_messages()["messages"] + msg_sent = list(filter(lambda msg: msg["messageId"] in msg_ids, all_messages)) + assert len(msg_sent) == 5 + + print("Deleting messages from DLQ") + dlq_ids = [msg["dlqId"] for msg in msg_sent] + dlq.deleteMany({"dlq_ids": dlq_ids}) + + print("Checking if messages are deleted from DLQ") + all_messages_after_delete = dlq.list_messages()["messages"] + msg_deleted = list( + filter(lambda msg: msg["messageId"] in msg_ids, all_messages_after_delete) + ) + assert len(msg_deleted) == 0 + assert len(all_messages_after_delete) == len(all_messages) - 5 \ No newline at end of file diff --git a/upstash_qstash/asyncio/dlq.py b/upstash_qstash/asyncio/dlq.py index bfba3cf..cab15b1 100644 --- a/upstash_qstash/asyncio/dlq.py +++ b/upstash_qstash/asyncio/dlq.py @@ -1,7 +1,8 @@ from typing import Optional from upstash_qstash.upstash_http import HttpClient -from upstash_qstash.dlq import ListMessagesOpts, ListMessageResponse, DlqMessage +from upstash_qstash.dlq import ListMessagesOpts, ListMessageResponse, DlqMessage, BulkDeleteRequest, BulkDeleteResponse from upstash_qstash.qstash_types import UpstashRequest +import json class DLQ: @@ -57,3 +58,16 @@ async def delete(self, dlq_message_id: str): "parse_response_as_json": False, } ) + + async def deleteMany(self, req: BulkDeleteRequest) -> BulkDeleteResponse: + """ + Asynchronously remove many message from the DLQ + """ + return await self.http.request_async( + { + "path": ["v2", "dlq"], + "headers": {"Content-Type": "application/json"}, + "body": json.dumps({"dlqIds": req.get('dlq_ids')}), + "method": "DELETE", + } + ) diff --git a/upstash_qstash/dlq.py b/upstash_qstash/dlq.py index fd3182b..8d6a4ee 100644 --- a/upstash_qstash/dlq.py +++ b/upstash_qstash/dlq.py @@ -1,6 +1,7 @@ from typing import Optional, TypedDict, List, Dict from upstash_qstash.qstash_types import Method, UpstashRequest from upstash_qstash.upstash_http import HttpClient +import json DlqMessage = TypedDict( "DlqMessage", @@ -41,6 +42,21 @@ ) +BulkDeleteRequest = TypedDict( + "BulkDeleteRequest", + { + "dlq_ids": List[str], + }, +) + +BulkDeleteResponse = TypedDict( + "BulkDeleteResponse", + { + "deleted": int, + }, +) + + class DLQ: def __init__(self, http: HttpClient): self.http = http @@ -94,3 +110,16 @@ def delete(self, dlq_message_id: str): "parse_response_as_json": False, } ) + + def deleteMany(self, req: BulkDeleteRequest) -> BulkDeleteResponse: + """ + Remove many message from the DLQ + """ + return self.http.request( + { + "path": ["v2", "dlq"], + "headers": {"Content-Type": "application/json"}, + "body": json.dumps({"dlqIds": req.get('dlq_ids')}), + "method": "DELETE", + } + )