Skip to content

Commit

Permalink
Merge branch '2.0.0' into 'master'
Browse files Browse the repository at this point in the history
2.0.0 feat(add ws reconnection)

See merge request exchange/code/sdk/bitmart-python-sdk-api!15
  • Loading branch information
ricci committed Sep 13, 2024
2 parents 0939f66 + f726a00 commit 80ca0b0
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 10 deletions.
42 changes: 38 additions & 4 deletions bitmart/websocket/futures_socket_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(
on_pong=None,
logger=None,
timeout=None,
reconnection=True,
ping_interval=10,
api_key=None,
api_secret_key=None,
Expand All @@ -43,56 +44,89 @@ def __init__(
on_pong=on_pong,
logger=logger,
timeout=timeout,
on_reconnect=self.reconnect,
)
self.stream_url = stream_url
self.reconnection = reconnection
self.reconnectionUseLogin = False
self.reconnectionChannel = []
self.API_KEY = api_key
self.API_SECRET_KEY = api_secret_key
self.API_MEMO = api_memo

# start the thread
self.socket_manager.start()
self.logger.debug("BitMart Spot WebSocket Client started.")
self.logger.debug("BitMart Futures WebSocket Client started.")

# Start the ping timer
self.ping_interval = ping_interval
# self.start_send_ping()
self.start_send_ping()

def start_send_ping(self):
self.ping()
ping_timer = threading.Timer(self.ping_interval, self.start_send_ping)
ping_timer.start()

def send(self, message: dict):
self.socket_manager.send_message(json.dumps(message))
self.__send(json.dumps(message))

def subscribe(self, args):
if single_stream(args):
args = [args]
json_msg = json.dumps({"action": "subscribe", "args": args})
self.socket_manager.send_message(json_msg)
self.__send(json_msg)

def unsubscribe(self, args):
if single_stream(args):
args = [args]
json_msg = json.dumps({"action": "unsubscribe", "args": args})
self.__send(json_msg)

def __send(self, json_msg):
if self.reconnection:
if json_msg not in self.reconnectionChannel:
self.reconnectionChannel.append(json_msg)
self.socket_manager.send_message(json_msg)

def login(self, timeout=5):
if not self.API_KEY:
self.stop()
raise ValueError("Invalid API KEY")
if not self.API_SECRET_KEY:
self.stop()
raise ValueError("Invalid API SECRET KEY")
if not self.API_MEMO:
self.stop()
raise ValueError("Invalid API MEMO")
timestamp = cloud_utils.get_timestamp()
sign = cloud_utils.sign(cloud_utils.pre_substring(
timestamp, self.API_MEMO, 'bitmart.WebSocket'), self.API_SECRET_KEY)
self.socket_manager.send_message(json.dumps({"action": "access", "args": [self.API_KEY, timestamp, sign, "web"]}))
self.reconnectionUseLogin = True
# timeout
time.sleep(timeout)

def ping(self):
self.socket_manager.ping()

def stop(self):
self.reconnection = False
self.socket_manager.close()
self.socket_manager.join()

def reconnect(self):
if not self.reconnection:
return

self.logger.debug(
f"WebSocket Client Reconnection to: {self.stream_url}",
)

# time.sleep(2)
self.socket_manager.reconnect()
if self.reconnectionUseLogin:
self.login()

for msg in self.reconnectionChannel:
time.sleep(1)
self.socket_manager.send_message(msg)
33 changes: 29 additions & 4 deletions bitmart/websocket/socket_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
import threading
import time

from websocket import (
ABNF,
create_connection,
Expand All @@ -21,6 +23,7 @@ def __init__(
on_error=None,
on_ping=None,
on_pong=None,
on_reconnect=None,
logger=None,
timeout=None,
):
Expand All @@ -36,23 +39,43 @@ def __init__(
self.on_ping = on_ping
self.on_pong = on_pong
self.on_error = on_error
self.on_reconnect = on_reconnect
self.retryReconnectTimes = 5
self.timeout = timeout

self.create_ws_connection()

def create_ws_connection(self):
self.logger.debug(
f"Creating connection with WebSocket Server: {self.stream_url}",
f"WebSocket Client Connection to: {self.stream_url}",
)

self.ws = create_connection(
self.stream_url, timeout=self.timeout
)
self.logger.debug(
f"WebSocket connection has been established: {self.stream_url}",
f"WebSocket Client has been established: {self.stream_url}",
)
self._callback(self.on_open)

def reconnect(self):
self.retryReconnectTimes -= 1
if self.retryReconnectTimes < 0:
self.logger.error(f"Reconnection failed: Retry Max 5 times")
return

if self.ws:
self.ws.close()

time.sleep(5 - self.retryReconnectTimes) # Optional delay before attempting to reconnect

try:
self.create_ws_connection() # Try to create a new connection
self.retryReconnectTimes = 5
except WebSocketException as e:
self.logger.error(f"Reconnection failed: {e}")
self.reconnect() # Recursive reconnection attempt on failure

def run(self):
self.read_data()

Expand All @@ -76,7 +99,9 @@ def read_data(self):
self.logger.error("Websocket connection timeout")
else:
self.logger.error("Websocket exception: {}".format(e))
raise e
self._callback(self.on_close, "Close Reason: {}".format(e))
self.on_reconnect()
continue
except Exception as e:
self.logger.error("Exception in read_data: {}".format(e))
raise e
Expand All @@ -87,7 +112,7 @@ def read_data(self):
self.logger.warning(
"CLOSE frame received, closing websocket connection"
)
self._callback(self.on_close)
self._callback(self.on_close, "CLOSE frame received, closing websocket connection")
break

def _handle_data(self, op_code, frame):
Expand Down
37 changes: 35 additions & 2 deletions bitmart/websocket/spot_socket_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(
on_pong=None,
logger=None,
timeout=None,
reconnection=True,
ping_interval=10,
api_key=None,
api_secret_key=None,
Expand All @@ -43,7 +44,12 @@ def __init__(
on_pong=on_pong,
logger=logger,
timeout=timeout,
on_reconnect=self.reconnect,
)
self.stream_url = stream_url
self.reconnection = reconnection
self.reconnectionUseLogin = False
self.reconnectionChannel = []
self.API_KEY = api_key
self.API_SECRET_KEY = api_secret_key
self.API_MEMO = api_memo
Expand All @@ -62,37 +68,64 @@ def start_send_ping(self):
ping_timer.start()

def send(self, message: dict):
self.socket_manager.send_message(json.dumps(message))
self.__send(json.dumps(message))

def subscribe(self, args):
if single_stream(args):
args = [args]
json_msg = json.dumps({"op": "subscribe", "args": args})
self.socket_manager.send_message(json_msg)
self.__send(json_msg)

def unsubscribe(self, args):
if single_stream(args):
args = [args]
json_msg = json.dumps({"op": "unsubscribe", "args": args})
self.__send(json_msg)

def __send(self, json_msg):
if self.reconnection:
if json_msg not in self.reconnectionChannel:
self.reconnectionChannel.append(json_msg)
self.socket_manager.send_message(json_msg)

def login(self, timeout=5):
if not self.API_KEY:
self.stop()
raise ValueError("Invalid API KEY")
if not self.API_SECRET_KEY:
self.stop()
raise ValueError("Invalid API SECRET KEY")
if not self.API_MEMO:
self.stop()
raise ValueError("Invalid API MEMO")
timestamp = cloud_utils.get_timestamp()
sign = cloud_utils.sign(cloud_utils.pre_substring(
timestamp, self.API_MEMO, 'bitmart.WebSocket'), self.API_SECRET_KEY)
self.socket_manager.send_message(json.dumps({"op": "login", "args": [self.API_KEY, timestamp, sign]}))
self.reconnectionUseLogin = True
# timeout
time.sleep(timeout)

def ping(self):
self.socket_manager.ping()

def stop(self):
self.reconnection = False
self.socket_manager.close()
self.socket_manager.join()

def reconnect(self):
if not self.reconnection:
return

self.logger.debug(
f"WebSocket Client Reconnection to: {self.stream_url}",
)

self.socket_manager.reconnect()
if self.reconnectionUseLogin:
self.login()

for msg in self.reconnectionChannel:
time.sleep(1)
self.socket_manager.send_message(msg)
42 changes: 42 additions & 0 deletions examples/websocket/futures/websocket_stream/reconnection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import logging
import time

from bitmart.lib.cloud_consts import SPOT_PUBLIC_WS_URL, SPOT_PRIVATE_WS_URL, FUTURES_PUBLIC_WS_URL, \
FUTURES_PRIVATE_WS_URL
from bitmart.lib.cloud_utils import config_logging
from bitmart.websocket.futures_socket_client import FuturesSocketClient
from bitmart.websocket.spot_socket_client import SpotSocketClient

config_logging(logging, logging.DEBUG)


def open_handler(_, ):
logging.info("open")


def message_handler(_, message):
logging.info(message)


def close_handler(_, message):
logging.info(message)


my_client = FuturesSocketClient(stream_url=FUTURES_PRIVATE_WS_URL,
on_open=open_handler,
on_message=message_handler,
on_close=close_handler,
reconnection=True,
api_memo='your_api_memo',
api_key='your_api_key',
api_secret_key='your_secret_key'
)
my_client.login()

my_client.subscribe(args="futures/asset:USDT")


# my_client.subscribe(args="futures/ticker")

# time.sleep(30)

36 changes: 36 additions & 0 deletions examples/websocket/spot/websocket_stream/reconnection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import logging
import time

from bitmart.lib.cloud_consts import SPOT_PUBLIC_WS_URL, SPOT_PRIVATE_WS_URL
from bitmart.lib.cloud_utils import config_logging
from bitmart.websocket.spot_socket_client import SpotSocketClient

config_logging(logging, logging.DEBUG)


def open_handler(_, ):
logging.info("open")


def message_handler(_, message):
logging.info(message)


def close_handler(_, message):
logging.info(message)


my_client = SpotSocketClient(stream_url=SPOT_PRIVATE_WS_URL,
on_open=open_handler,
on_message=message_handler,
on_close=close_handler,
reconnection=True)


my_client.subscribe(args="spot/user/balance:BALANCE_UPDATE")

# my_client.subscribe(args="spot/kline1m:BTC_USDT")

# time.sleep(30)

# my_client.subscribe(args="spot/ticker:BTC_USDT")

0 comments on commit 80ca0b0

Please sign in to comment.