Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mypy #4

Merged
merged 9 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pytest = "^7.4.3"
black = "^23.12.1"
python-dotenv = "^1.0.0"
pytest-asyncio = "^0.23.3"
mypy = "^1.8.0"

[build-system]
requires = ["poetry-core"]
Expand Down
2 changes: 1 addition & 1 deletion upstash_qstash/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(
Asynchronous QStash client.
To use the blocking version, use the upstash_qstash client instead.
"""
self.http = HttpClient(token, retry, base_url)
self.http = HttpClient(token, retry, base_url or DEFAULT_BASE_URL)

async def publish(self, req: PublishRequest):
"""
Expand Down
17 changes: 9 additions & 8 deletions upstash_qstash/asyncio/dlq.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional
from upstash_qstash.upstash_http import HttpClient
from upstash_qstash.dlq import ListMessagesOpts, ListMessageResponse
from upstash_qstash.qstash_types import UpstashRequest


class DLQ:
Expand All @@ -13,14 +14,14 @@ async def list_messages(
"""
Asynchronously list messages in the dlq
"""
cursor = opts.get("cursor") if opts else None
return await self.http.request_async(
{
"path": ["v2", "dlq"],
"method": "GET",
"query": {"cursor": cursor},
}
)
req: UpstashRequest = {
"path": ["v2", "dlq"],
"method": "GET",
}
if opts is not None and opts.get("cursor") is not None:
req["query"] = {"cursor": opts["cursor"]}

return await self.http.request_async(req)

async def delete(self, dlq_message_id: str):
"""
Expand Down
4 changes: 2 additions & 2 deletions upstash_qstash/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ async def get(
"""
Asynchronously retrieve logs.
"""
query: Dict[str, int] = {}
query = {}
if req is not None and req.get("cursor") is not None and req["cursor"] > 0:
query["cursor"] = req["cursor"]
query["cursor"] = str(req["cursor"])

return await http.request_async(
{
Expand Down
2 changes: 1 addition & 1 deletion upstash_qstash/asyncio/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async def publish_async(

return await http.request_async(
{
"path": ["v2", "publish", req.get("url") or req.get("topic")],
"path": ["v2", "publish", req.get("url") or req["topic"]],
"body": req.get("body"),
"headers": headers,
"method": "POST",
Expand Down
3 changes: 2 additions & 1 deletion upstash_qstash/asyncio/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ async def create(self, req: CreateScheduleRequest) -> CreateScheduleResponse:
:return: A dictionary containing the 'scheduleId' of the created schedule.
:raises UpstashError: If required headers are missing.
"""
SyncSchedules._validate_schedule_request(req)
headers = SyncSchedules._prepare_headers(req)

return await self.http.request_async(
{
"path": ["v2", "schedules", req.get("destination")],
"path": ["v2", "schedules", req["destination"]],
"body": req.get("body"),
"headers": headers,
"method": "POST",
Expand Down
3 changes: 1 addition & 2 deletions upstash_qstash/asyncio/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
Topic,
Topics as SyncTopics,
)
from upstash_qstash.error import QstashException
import json


Expand Down Expand Up @@ -37,7 +36,7 @@ async def remove_endpoints(self, req: RemoveEndpointsRequest):

:param req: An instance of RemoveEndpointsRequest containing the name and endpoints
"""
self._validate_topic_request(req)
SyncTopics._validate_topic_request(req)
await self.http.request_async(
{
"path": ["v2", "topics", req["name"], "endpoints"],
Expand Down
2 changes: 1 addition & 1 deletion upstash_qstash/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(
Synchronous QStash client.
To use the blocking version, use the upstash_qstash client instead.
"""
self.http = HttpClient(token, retry, base_url)
self.http = HttpClient(token, retry, base_url or DEFAULT_BASE_URL)

def publish(self, req: PublishRequest):
"""
Expand Down
20 changes: 10 additions & 10 deletions upstash_qstash/dlq.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import Optional, TypedDict, List, Dict
from upstash_qstash.qstash_types import Method
from upstash_qstash.qstash_types import Method, UpstashRequest
from upstash_qstash.upstash_http import HttpClient

DlqMessage = TypedDict(
Expand All @@ -23,7 +23,7 @@
ListMessagesOpts = TypedDict(
"ListMessagesOpts",
{
"cursor": Optional[str],
"cursor": str,
},
)

Expand All @@ -46,14 +46,14 @@ def list_messages(
"""
List messages in the dlq
"""
cursor = opts.get("cursor") if opts else None
return self.http.request(
{
"path": ["v2", "dlq"],
"method": "GET",
"query": {"cursor": cursor},
}
)
req: UpstashRequest = {
"path": ["v2", "dlq"],
"method": "GET",
}
if opts is not None and opts.get("cursor") is not None:
req["query"] = {"cursor": opts["cursor"]}

return self.http.request(req)

def delete(self, dlq_message_id: str):
"""
Expand Down
6 changes: 3 additions & 3 deletions upstash_qstash/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class State(Enum):
EventsRequest = TypedDict(
"EventsRequest",
{
"cursor": str,
"cursor": int,
},
)

Expand All @@ -48,9 +48,9 @@ def get(http: HttpClient, req: Optional[EventsRequest] = None) -> GetEventsRespo
"""
Retrieve logs.
"""
query: Dict[str, int] = {}
query = {}
if req is not None and req.get("cursor") is not None and req["cursor"] > 0:
query["cursor"] = req["cursor"]
query["cursor"] = str(req["cursor"])

return http.request(
{
Expand Down
29 changes: 15 additions & 14 deletions upstash_qstash/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,22 @@
{
"url": str,
"body": Any,
"headers": Optional[Dict[Any, Any]],
"delay": Optional[int],
"not_before": Optional[int],
"deduplication_id": Optional[str],
"content_based_deduplication": Optional[bool],
"retries": Optional[int],
"callback": Optional[str],
"failure_callback": Optional[str],
"method": Optional[Method],
"topic": Optional[str],
"headers": Dict[Any, Any],
"delay": int,
"not_before": int,
"deduplication_id": str,
"content_based_deduplication": bool,
"retries": int,
"callback": str,
"failure_callback": str,
"method": Method,
"topic": str,
},
total=False,
)

PublishToUrlResponse = TypedDict(
"PublishResponse",
"PublishToUrlResponse",
{"messageId": str, "deduplicated": Optional[bool]},
)

Expand Down Expand Up @@ -54,8 +55,8 @@ def _prepare_headers(req: PublishRequest) -> UpstashHeaders:
"""
Prepare and return headers for the publish request.
"""
headers: UpstashHeaders = req.get("headers") or {}
prefix_headers(headers)
init_headers = req.get("headers") or {}
headers = prefix_headers(init_headers)

headers["Upstash-Method"] = req.get("method") or "POST"

Expand Down Expand Up @@ -94,7 +95,7 @@ def publish(

return http.request(
{
"path": ["v2", "publish", req.get("url") or req.get("topic")],
"path": ["v2", "publish", req.get("url") or req["topic"]],
"body": req.get("body"),
"headers": headers,
"method": "POST",
Expand Down
55 changes: 25 additions & 30 deletions upstash_qstash/qstash_types.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,43 @@
from typing import Optional, Dict, Any, List, TypedDict
from enum import Enum
from typing import Optional, Dict, Any, List, TypedDict, Callable, Literal, Union


class Method(Enum):
GET = "GET"
POST = "POST"
PUT = "PUT"
DELETE = "DELETE"
PATCH = "PATCH"
Method = Union[
Literal["GET"], Literal["POST"], Literal["PUT"], Literal["DELETE"], Literal["PATCH"]
]

HeaderKey = Union[
Literal["Upstash-Method"],
Literal["Upstash-Delay"],
Literal["Upstash-Not-Before"],
Literal["Upstash-Deduplication-Id"],
Literal["Upstash-Content-Based-Deduplication"],
Literal["Upstash-Retries"],
Literal["Upstash-Callback"],
Literal["Upstash-Failure-Callback"],
Literal["Upstash-Cron"],
str, # This allows for any other string as a key (Upstash-Forward-*)
]

UpstashHeaders = TypedDict(
"UpstashHeaders",
{
"Upstash-Method": Method,
"Upstash-Delay": Optional[str],
"Upstash-Not-Before": Optional[str],
"Upstash-Deduplication-Id": Optional[str],
"Upstash-Content-Based-Deduplication": Optional[str],
"Upstash-Retries": Optional[str],
"Upstash-Callback": Optional[str],
"Upstash-Failure-Callback": Optional[str],
"Upstash-Cron": Optional[str],
# Other headers are prefixed with Upstash-Forward-
},
)
UpstashHeaders = Dict[HeaderKey, Optional[Union[str, Method]]]

UpstashRequest = TypedDict(
"UpstashRequest",
{
"path": List[str],
"body": Optional[Any],
"headers": Optional[UpstashHeaders],
"keepalive": Optional[bool],
"method": Optional[Method],
"query": Optional[Dict[str, str]],
"parse_response_as_json": Optional[bool],
"body": Any,
"headers": UpstashHeaders,
"keepalive": bool,
"method": Method,
"query": Dict[str, str],
"parse_response_as_json": bool,
},
total=False,
)

RetryConfig = TypedDict(
"RetryConfig",
{
"attempts": int,
"backoff": callable,
"backoff": Callable,
},
)
17 changes: 13 additions & 4 deletions upstash_qstash/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from upstash_qstash.qstash_types import Method, UpstashHeaders
from upstash_qstash.upstash_http import HttpClient
from upstash_qstash.utils import prefix_headers
from upstash_qstash.error import QstashException

Schedule = TypedDict(
"Schedule",
Expand Down Expand Up @@ -50,8 +51,8 @@ def __init__(self, http: HttpClient):

@staticmethod
def _prepare_headers(req: CreateScheduleRequest) -> UpstashHeaders:
headers: UpstashHeaders = req.get("headers") or {}
prefix_headers(headers)
init_headers = req.get("headers") or {}
headers = prefix_headers(init_headers)

headers["Upstash-Method"] = req.get("method") or "POST"

Expand All @@ -76,22 +77,30 @@ def _prepare_headers(req: CreateScheduleRequest) -> UpstashHeaders:
headers["Upstash-Callback"] = req.get("callback")

if req.get("failure_callback") is not None:
headers["Upstash-Failure-Callback"] = req.get("failureCallback")
headers["Upstash-Failure-Callback"] = req.get("failure_callback")

return headers

@staticmethod
def _validate_schedule_request(req: CreateScheduleRequest) -> None:
if req.get("cron") is None:
raise QstashException("Cron is required")
if req.get("destination") is None:
raise QstashException("Destination is required")

def create(self, req: CreateScheduleRequest) -> CreateScheduleResponse:
"""
Create a new schedule with the specified parameters.
:param req: A dictionary with the details of the schedule to create.
:return: A dictionary containing the 'scheduleId' of the created schedule.
:raises UpstashError: If required headers are missing.
"""
Schedules._validate_schedule_request(req)
headers = Schedules._prepare_headers(req)

return self.http.request(
{
"path": ["v2", "schedules", req.get("destination")],
"path": ["v2", "schedules", req["destination"]],
"body": req.get("body"),
"headers": headers,
"method": "POST",
Expand Down
4 changes: 3 additions & 1 deletion upstash_qstash/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ def delete(self, name: str):
)

@staticmethod
def _validate_topic_request(req: dict):
def _validate_topic_request(
req: Union[AddEndpointsRequest, RemoveEndpointsRequest]
):
"""
Ensure that the request contains a valid topic name and valid endpoints
"""
Expand Down
10 changes: 6 additions & 4 deletions upstash_qstash/upstash_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
import aiohttp
import asyncio
from typing import Union
from typing import Union, Optional
from urllib.parse import urlencode
from upstash_qstash.qstash_types import UpstashRequest, RetryConfig, UpstashHeaders
from upstash_qstash.error import QstashException, QstashRateLimitException
Expand All @@ -16,7 +16,9 @@


class HttpClient:
def __init__(self, token: str, retry: Union[RetryConfig, bool], base_url: str):
def __init__(
self, token: str, retry: Optional[Union[RetryConfig, bool]], base_url: str
):
"""
Initializes the HttpClient.

Expand Down Expand Up @@ -66,7 +68,7 @@ def request(self, req: UpstashRequest):
for i in range(self.retry["attempts"]):
try:
res = requests.request(
method=req.get("method"),
method=req["method"],
url=url,
headers=headers,
stream=req.get("keepalive", False),
Expand Down Expand Up @@ -113,7 +115,7 @@ async def request_async(self, req: UpstashRequest):
try:
async with aiohttp.ClientSession() as session:
async with session.request(
method=req.get("method"),
method=req["method"],
url=url,
headers=headers,
data=req.get("body"),
Expand Down
Loading
Loading