Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Complete futures on shutdown #746

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from google.cloud.pubsub_v1.subscriber._protocol import helper_threads
from google.cloud.pubsub_v1.subscriber._protocol import requests
from google.cloud.pubsub_v1.subscriber.exceptions import (
AcknowledgeError,
AcknowledgeStatus,
)

Expand Down Expand Up @@ -86,6 +87,7 @@ def __init__(self, manager: "StreamingPullManager", queue: "queue.Queue"):
self._queue = queue
self._thread: Optional[threading.Thread] = None
self._operational_lock = threading.Lock()
self._shutdown_mode = False

def start(self) -> None:
"""Start a thread to dispatch requests queued up by callbacks.
Expand All @@ -112,6 +114,7 @@ def start(self) -> None:
def stop(self) -> None:
with self._operational_lock:
if self._thread is not None:
self.enter_shutdown_mode()
# Signal the worker to stop by queueing a "poison pill"
self._queue.put(helper_threads.STOP)
self._thread.join()
Expand All @@ -125,6 +128,25 @@ def dispatch_callback(self, items: Sequence[RequestItem]) -> None:
items:
Queued requests to dispatch.
"""
exactly_once_delivery_enabled = self._manager._exactly_once_delivery_enabled()
if self._shutdown_mode:
for item in items:
if (
isinstance(item, requests.ModAckRequest)
or isinstance(item, requests.AckRequest)
or isinstance(item, requests.NackRequest)
) and item.future is not None:
if exactly_once_delivery_enabled:
item.future.set_exception(
AcknowledgeError(
AcknowledgeStatus.OTHER,
"Stream is being shutdown, request was not sent.",
)
)
else:
item.future.set_result(AcknowledgeStatus.SUCCESS)
return

lease_requests: List[requests.LeaseRequest] = []
modack_requests: List[requests.ModAckRequest] = []
ack_requests: List[requests.AckRequest] = []
Expand All @@ -136,7 +158,6 @@ def dispatch_callback(self, items: Sequence[RequestItem]) -> None:
ack_ids = set()
nack_ids = set()
drop_ids = set()
exactly_once_delivery_enabled = self._manager._exactly_once_delivery_enabled()

for item in items:
if isinstance(item, requests.LeaseRequest):
Expand Down Expand Up @@ -412,3 +433,6 @@ def nack(self, items: Sequence[requests.NackRequest]) -> None:
for item in items
]
)

def enter_shutdown_mode(self) -> None:
self._shutdown_mode = True
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ def send_unary_ack(
# Futures may be present even with exactly-once delivery
# disabled, in transition periods after the setting is changed on
# the subscription.
if req.future:
if req.future and req.future.running:
if exactly_once_delivery_enabled:
e = AcknowledgeError(
AcknowledgeStatus.OTHER, "RetryError while sending ack RPC."
Expand Down Expand Up @@ -679,7 +679,7 @@ def send_unary_ack(
# Futures may be present even with exactly-once delivery
# disabled, in transition periods after the setting is changed on
# the subscription.
if req.future:
if req.future and req.future.running:
req.future.set_result(AcknowledgeStatus.SUCCESS)
requests_completed.append(req)

Expand Down Expand Up @@ -726,7 +726,7 @@ def send_unary_modack(
# Futures may be present even with exactly-once delivery
# disabled, in transition periods after the setting is changed on
# the subscription.
if req.future:
if req.future and req.future.running:
if exactly_once_delivery_enabled:
e = AcknowledgeError(
AcknowledgeStatus.OTHER,
Expand Down Expand Up @@ -759,7 +759,7 @@ def send_unary_modack(
# Futures may be present even with exactly-once delivery
# disabled, in transition periods after the setting is changed on
# the subscription.
if req.future:
if req.future and req.future.running:
req.future.set_result(AcknowledgeStatus.SUCCESS)
requests_completed.append(req)

Expand Down
178 changes: 178 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import mock
import pytest
from google.cloud.pubsub_v1.subscriber.exceptions import (
AcknowledgeError,
AcknowledgeStatus,
)

Expand Down Expand Up @@ -195,6 +196,182 @@ def test_dispatch_duplicate_items_callback_active_manager_with_futures_no_eod(
assert items[1].future.result() == AcknowledgeStatus.SUCCESS


@pytest.mark.parametrize(
"items,method_name",
[
(
[
requests.AckRequest("0", 0, 0, "", None),
requests.AckRequest("0", 0, 1, "", None),
],
"ack",
),
(
[
requests.DropRequest("0", 0, ""),
requests.DropRequest("0", 1, ""),
],
"drop",
),
(
[
requests.LeaseRequest("0", 0, ""),
requests.LeaseRequest("0", 1, ""),
],
"lease",
),
(
[
requests.ModAckRequest("0", 0, None),
requests.ModAckRequest("0", 1, None),
],
"modify_ack_deadline",
),
(
[
requests.NackRequest("0", 0, "", None),
requests.NackRequest("0", 1, "", None),
],
"nack",
),
],
)
def test_dispatch_in_shutdown_mode_no_futures(items, method_name):
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
)
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)

dispatcher_._shutdown_mode = True

manager._exactly_once_delivery_enabled.return_value = False
with mock.patch.object(dispatcher_, method_name) as method:
dispatcher_.dispatch_callback(items)

method.assert_not_called()
manager._exactly_once_delivery_enabled.assert_called()


@pytest.mark.parametrize(
"items,method_name",
[
(
[
requests.AckRequest("0", 0, 0, "", None),
requests.AckRequest("0", 0, 1, "", futures.Future()),
],
"ack",
),
(
[
requests.DropRequest("0", 0, ""),
requests.DropRequest("0", 1, ""),
],
"drop",
),
(
[
requests.LeaseRequest("0", 0, ""),
requests.LeaseRequest("0", 1, ""),
],
"lease",
),
(
[
requests.ModAckRequest("0", 0, None),
requests.ModAckRequest("0", 1, futures.Future()),
],
"modify_ack_deadline",
),
(
[
requests.NackRequest("0", 0, "", None),
requests.NackRequest("0", 1, "", futures.Future()),
],
"nack",
),
],
)
def test_dispatch_in_shutdown_mode_no_eod(items, method_name):
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
)
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)

dispatcher_._shutdown_mode = True

manager._exactly_once_delivery_enabled.return_value = False
with mock.patch.object(dispatcher_, method_name) as method:
dispatcher_.dispatch_callback(items)

method.assert_not_called()
manager._exactly_once_delivery_enabled.assert_called()

if method_name != "drop" and method_name != "lease":
assert items[1].future.result() == AcknowledgeStatus.SUCCESS


@pytest.mark.parametrize(
"items,method_name",
[
(
[
requests.AckRequest("0", 0, 0, "", None),
requests.AckRequest("0", 0, 1, "", futures.Future()),
],
"ack",
),
(
[
requests.DropRequest("0", 0, ""),
requests.DropRequest("0", 1, ""),
],
"drop",
),
(
[
requests.LeaseRequest("0", 0, ""),
requests.LeaseRequest("0", 1, ""),
],
"lease",
),
(
[
requests.ModAckRequest("0", 0, None),
requests.ModAckRequest("0", 1, futures.Future()),
],
"modify_ack_deadline",
),
(
[
requests.NackRequest("0", 0, "", None),
requests.NackRequest("0", 1, "", futures.Future()),
],
"nack",
),
],
)
def test_dispatch_in_shutdown_mode_with_eod(items, method_name):
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
)
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)

dispatcher_._shutdown_mode = True

manager._exactly_once_delivery_enabled.return_value = True
with mock.patch.object(dispatcher_, method_name) as method:
dispatcher_.dispatch_callback(items)

method.assert_not_called()
manager._exactly_once_delivery_enabled.assert_called()

if method_name != "drop" and method_name != "lease":
with pytest.raises(AcknowledgeError) as e:
items[1].future.result()
assert e.value.error_code == AcknowledgeStatus.OTHER


@pytest.mark.parametrize(
"items,method_name",
[
Expand Down Expand Up @@ -400,6 +577,7 @@ def test_ack_no_time():
future=None,
)
]

manager.send_unary_ack.return_value = (items, [])
dispatcher_.ack(items)

Expand Down