Skip to content

Commit

Permalink
client side event deserialize=True option
Browse files Browse the repository at this point in the history
  • Loading branch information
Vignesh.Vaidyanathan committed Oct 5, 2024
1 parent a5f4bcb commit 9667baf
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions hololinked/client/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ async def async_write_multiple_properties(self, **properties) -> None:


def subscribe_event(self, name : str, callbacks : typing.Union[typing.List[typing.Callable], typing.Callable],
thread_callbacks : bool = False) -> None:
thread_callbacks : bool = False, deserialize = False) -> None:
"""
Subscribe to event specified by name. Events are listened in separate threads and supplied callbacks are
are also called in those threads.
Expand All @@ -489,7 +489,7 @@ def subscribe_event(self, name : str, callbacks : typing.Union[typing.List[typin
if event._subscribed:
event.add_callbacks(callbacks)
else:
event.subscribe(callbacks, thread_callbacks)
event.subscribe(callbacks, thread_callbacks, deserialize)


def unsubscribe_event(self, name : str):
Expand Down Expand Up @@ -756,7 +756,7 @@ def oneway_set(self, value : typing.Any) -> None:
class _Event:

__slots__ = ['_zmq_client', '_name', '_obj_name', '_unique_identifier', '_socket_address', '_callbacks', '_serialization_specific',
'_serializer', '_subscribed', '_thread', '_thread_callbacks', '_event_consumer', '_logger']
'_serializer', '_subscribed', '_thread', '_thread_callbacks', '_event_consumer', '_logger', '_deserialize']
# event subscription
# Dont add class doc otherwise __doc__ in slots will conflict with class variable

Expand All @@ -772,6 +772,7 @@ def __init__(self, client : SyncZMQClient, name : str, obj_name : str, unique_id
self._serializer = serializer
self._logger = logger
self._subscribed = False
self._deserialize = False

def add_callbacks(self, callbacks : typing.Union[typing.List[typing.Callable], typing.Callable]) -> None:
if not self._callbacks:
Expand All @@ -782,22 +783,23 @@ def add_callbacks(self, callbacks : typing.Union[typing.List[typing.Callable], t
self._callbacks.append(callbacks)

def subscribe(self, callbacks : typing.Union[typing.List[typing.Callable], typing.Callable],
thread_callbacks : bool = False):
thread_callbacks : bool = False, deserialize : bool = False):
self._event_consumer = EventConsumer(
'zmq-' + self._unique_identifier if self._serialization_specific else self._unique_identifier,
self._socket_address, f"{self._name}|RPCEvent|{uuid.uuid4()}", b'PROXY',
zmq_serializer=self._serializer, logger=self._logger
)
self.add_callbacks(callbacks)
self._subscribed = True
self._deserialize = deserialize
self._thread_callbacks = thread_callbacks
self._thread = threading.Thread(target=self.listen)
self._thread.start()

def listen(self):
while self._subscribed:
try:
data = self._event_consumer.receive()
data = self._event_consumer.receive(deserialize=self._deserialize)
if data == 'INTERRUPT':
break
for cb in self._callbacks:
Expand Down

0 comments on commit 9667baf

Please sign in to comment.