From 6a5635b1c7419dc083efb62471cfb6748b8679df Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Wed, 26 Jul 2023 15:59:10 -0400 Subject: [PATCH] (3/x) Disable retries (for now) if this is an ExecuteStatement call I don't like using state for this. But thrift doesn't make it easy to pass arbitrary flags to its execution methods. AFAIK thread pools are thread local so this shouldn't cause race conditions. This could affect our plans for async execution in in the future, though Signed-off-by: Jesse Whitehouse --- src/databricks/sql/auth/thrift_http_client.py | 11 +++++++++++ src/databricks/sql/thrift_backend.py | 19 +++++++++++++------ 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/src/databricks/sql/auth/thrift_http_client.py b/src/databricks/sql/auth/thrift_http_client.py index 24ec21a9..7a0c7225 100644 --- a/src/databricks/sql/auth/thrift_http_client.py +++ b/src/databricks/sql/auth/thrift_http_client.py @@ -104,6 +104,7 @@ def __init__( self.realhost = self.realport = self.proxy_auth = None self.max_connections = max_connections + self._allow_retries = True self.retry_policy = retry_policy self.__wbuf = BytesIO() @@ -117,6 +118,10 @@ def setCustomHeaders(self, headers: Dict[str, str]): self._headers = headers super().setCustomHeaders(headers) + def setAllowRetries(self, value: bool): + logger.info(f"urllib3 is allowed to retry: {value}") + self._allow_retries = value + def open(self): # self.__pool replaces the self.__http used by the original THttpClient @@ -184,6 +189,11 @@ def flush(self): custom_headers = {key: val for key, val in self.__custom_headers.items()} headers.update(**custom_headers) + if not self._allow_retries: + toggle_retries = {"retries": False} + else: + toggle_retries = {} + # HTTP request self.__resp = self.__pool.request( "POST", @@ -192,6 +202,7 @@ def flush(self): headers=headers, preload_content=False, timeout=self.__timeout, + **toggle_retries ) # Get reply to flush the request diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 2ab9ef66..3ad305e5 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -342,17 +342,24 @@ def attempt_request(attempt): error, error_message, retry_delay = None, None, None try: + # The MagicMocks in our unit tests have a `name` property instead of `__name__`. - logger.debug( - "Sending request: {}()".format( - getattr( - method, "__name__", getattr(method, "name", "UnknownMethod") - ) - ) + this_method_name = getattr( + method, "__name__", getattr(method, "name", "UnknownMethod") ) + + logger.debug("Sending request: {}()".format(this_method_name)) unsafe_logger.debug("Sending request: {}".format(request)) + + # TODO: only allow retries when it's a 429 or a 503 + if this_method_name == "ExecuteStatement": + self._transport.setAllowRetries(False) + response = method(request) + # Always default to retry after this request is complete + self._transport.setAllowRetries(True) + # Calling `close()` here releases the active HTTP connection back to the pool self._transport.close()