Skip to content

Commit

Permalink
(3/x) Disable retries (for now) if this is an ExecuteStatement call
Browse files Browse the repository at this point in the history
I don't like using state for this. But thrift doesn't make it easy to pass
arbitrary flags to its execution methods. AFAIK thread pools are thread
local so this shouldn't cause race conditions.

This could affect our plans for async execution in in the future, though

Signed-off-by: Jesse Whitehouse <[email protected]>
  • Loading branch information
Jesse Whitehouse committed Jul 26, 2023
1 parent 6278365 commit 6a5635b
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
11 changes: 11 additions & 0 deletions src/databricks/sql/auth/thrift_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(
self.realhost = self.realport = self.proxy_auth = None

self.max_connections = max_connections
self._allow_retries = True
self.retry_policy = retry_policy

self.__wbuf = BytesIO()
Expand All @@ -117,6 +118,10 @@ def setCustomHeaders(self, headers: Dict[str, str]):
self._headers = headers
super().setCustomHeaders(headers)

def setAllowRetries(self, value: bool):
logger.info(f"urllib3 is allowed to retry: {value}")
self._allow_retries = value

def open(self):

# self.__pool replaces the self.__http used by the original THttpClient
Expand Down Expand Up @@ -184,6 +189,11 @@ def flush(self):
custom_headers = {key: val for key, val in self.__custom_headers.items()}
headers.update(**custom_headers)

if not self._allow_retries:
toggle_retries = {"retries": False}
else:
toggle_retries = {}

# HTTP request
self.__resp = self.__pool.request(
"POST",
Expand All @@ -192,6 +202,7 @@ def flush(self):
headers=headers,
preload_content=False,
timeout=self.__timeout,
**toggle_retries
)

# Get reply to flush the request
Expand Down
19 changes: 13 additions & 6 deletions src/databricks/sql/thrift_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,17 +342,24 @@ def attempt_request(attempt):

error, error_message, retry_delay = None, None, None
try:

# The MagicMocks in our unit tests have a `name` property instead of `__name__`.
logger.debug(
"Sending request: {}(<REDACTED>)".format(
getattr(
method, "__name__", getattr(method, "name", "UnknownMethod")
)
)
this_method_name = getattr(
method, "__name__", getattr(method, "name", "UnknownMethod")
)

logger.debug("Sending request: {}(<REDACTED>)".format(this_method_name))
unsafe_logger.debug("Sending request: {}".format(request))

# TODO: only allow retries when it's a 429 or a 503
if this_method_name == "ExecuteStatement":
self._transport.setAllowRetries(False)

response = method(request)

# Always default to retry after this request is complete
self._transport.setAllowRetries(True)

# Calling `close()` here releases the active HTTP connection back to the pool
self._transport.close()

Expand Down

0 comments on commit 6a5635b

Please sign in to comment.