Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement bulk DLQ delete #11

Merged
merged 1 commit into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions tests/asyncio/test_dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,33 @@ async def test_dlq(client):
filter(lambda msg: msg["messageId"] == msg_id, all_messages_after_delete)
)
assert len(msg_deleted) == 0

@pytest.mark.asyncio
async def test_dlq_delete_many(client):
print("Publishing 5 messages to a failed endpoint")
msg_ids = []
for _ in range(5):
pub_res = await client.publish_json({"url": "http://httpstat.us/404", "retries": 0})
msg_ids.append(pub_res["messageId"])
assert len(msg_ids) == 5

print("Waiting 5 seconds for events to be delivered")
await asyncio.sleep(5)

print("Checking if messages are in DLQ")
dlq = client.dlq()
all_messages = (await dlq.list_messages())["messages"]
msg_sent = list(filter(lambda msg: msg["messageId"] in msg_ids, all_messages))
assert len(msg_sent) == 5

print("Deleting messages from DLQ")
dlq_ids = [msg["dlqId"] for msg in msg_sent]
await dlq.deleteMany({"dlq_ids": dlq_ids})

print("Checking if messages are deleted from DLQ")
all_messages_after_delete = (await dlq.list_messages())["messages"]
msg_deleted = list(
filter(lambda msg: msg["messageId"] in msg_ids, all_messages_after_delete)
)
assert len(msg_deleted) == 0
assert len(all_messages_after_delete) == len(all_messages) - 5
29 changes: 29 additions & 0 deletions tests/test_dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,32 @@ def test_dlq(client):
filter(lambda msg: msg["messageId"] == msg_id, all_messages_after_delete)
)
assert len(msg_deleted) == 0

def test_dlq_delete_many(client):
print("Publishing 5 messages to a failed endpoint")
msg_ids = []
for _ in range(5):
pub_res = client.publish_json({"url": "http://httpstat.us/404", "retries": 0})
msg_ids.append(pub_res["messageId"])
assert len(msg_ids) == 5

print("Waiting 5 seconds for events to be delivered")
time.sleep(5)

print("Checking if messages are in DLQ")
dlq = client.dlq()
all_messages = dlq.list_messages()["messages"]
msg_sent = list(filter(lambda msg: msg["messageId"] in msg_ids, all_messages))
assert len(msg_sent) == 5

print("Deleting messages from DLQ")
dlq_ids = [msg["dlqId"] for msg in msg_sent]
dlq.deleteMany({"dlq_ids": dlq_ids})

print("Checking if messages are deleted from DLQ")
all_messages_after_delete = dlq.list_messages()["messages"]
msg_deleted = list(
filter(lambda msg: msg["messageId"] in msg_ids, all_messages_after_delete)
)
assert len(msg_deleted) == 0
assert len(all_messages_after_delete) == len(all_messages) - 5
16 changes: 15 additions & 1 deletion upstash_qstash/asyncio/dlq.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import Optional
from upstash_qstash.upstash_http import HttpClient
from upstash_qstash.dlq import ListMessagesOpts, ListMessageResponse, DlqMessage
from upstash_qstash.dlq import ListMessagesOpts, ListMessageResponse, DlqMessage, BulkDeleteRequest, BulkDeleteResponse
from upstash_qstash.qstash_types import UpstashRequest
import json


class DLQ:
Expand Down Expand Up @@ -57,3 +58,16 @@ async def delete(self, dlq_message_id: str):
"parse_response_as_json": False,
}
)

async def deleteMany(self, req: BulkDeleteRequest) -> BulkDeleteResponse:
"""
Asynchronously remove many message from the DLQ
"""
return await self.http.request_async(
{
"path": ["v2", "dlq"],
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"dlqIds": req.get('dlq_ids')}),
"method": "DELETE",
}
)
29 changes: 29 additions & 0 deletions upstash_qstash/dlq.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional, TypedDict, List, Dict
from upstash_qstash.qstash_types import Method, UpstashRequest
from upstash_qstash.upstash_http import HttpClient
import json

DlqMessage = TypedDict(
"DlqMessage",
Expand Down Expand Up @@ -41,6 +42,21 @@
)


BulkDeleteRequest = TypedDict(
"BulkDeleteRequest",
{
"dlq_ids": List[str],
},
)

BulkDeleteResponse = TypedDict(
"BulkDeleteResponse",
{
"deleted": int,
},
)


class DLQ:
def __init__(self, http: HttpClient):
self.http = http
Expand Down Expand Up @@ -94,3 +110,16 @@ def delete(self, dlq_message_id: str):
"parse_response_as_json": False,
}
)

def deleteMany(self, req: BulkDeleteRequest) -> BulkDeleteResponse:
"""
Remove many message from the DLQ
"""
return self.http.request(
{
"path": ["v2", "dlq"],
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"dlqIds": req.get('dlq_ids')}),
"method": "DELETE",
}
)
Loading