Skip to content

Commit

Permalink
Merge pull request #1 from gyst/master
Browse files Browse the repository at this point in the history
Threaded socketio client
  • Loading branch information
kingel committed Jul 16, 2012
2 parents fc47bb5 + 6c8546e commit 6e62606
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 0 deletions.
133 changes: 133 additions & 0 deletions amitu/socketio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,136 @@ def ontimeout(self):
else:
self.sock.close()
exit(1)

import time
import socket
import threading
from Queue import Queue
from amitu.websocket_client import FRAME_START, FRAME_END
from amitu.websocket_client import WebSocketError
import logging
logger = logging.getLogger(__name__)


class ThreadedSocketIOClient(SocketIOClient):
"""The upstream amitu socket client can only send one message,
and then shuts down the connection.
This threaded client can handle a sequential conversation consisting
of multiple messages.
Example usage:
rcvd = []
def myfunc(msg):
rcvd.append(msg)
# do something more useful
sockio = ThreadedSocketIOClient(server, port)
# first message
socketio('5:::{"foo":"bar"}', myfunc)
# second message
socketio('5:::{"bar":"baz"}', myfunc)
# wait for callbacks
while len(rcvd) < 2:
time.sleep(1)
# shutdown
sockio.close()
"""

def __init__(self, server, port, protocol="ws", *args, **kwargs):
self._q = Queue()
self.msg = None
self._callback = None
self._t = None
super(ThreadedSocketIOClient, self
).__init__(server, port, protocol, *args, **kwargs)

def __call__(self, msg, callback):
logger.debug("%s.__call__::%s, %s",
self.__class__.__name__, msg, callback)
self._q.put((msg, callback))
if self._t is None:
self.runloop()

def callback(self, msg):
logger.debug("%s.callback::calling %s with msg=%s",
self.__class__.__name__, self._callback, msg)
if self._callback is not None:
self._callback(msg)
# re-loop
self.runloop()
else:
raise AttributeError("No callback to handle message::%s" % msg)

def runloop(self):
logger.debug("%s.runloop",
self.__class__.__name__)
# blocks until next message or terminator
self.msg, self._callback = self._q.get()
logger.debug("%s.runloop::callback set to %s",
self.__class__.__name__, self._callback)
# initial loop
if self._t is None:
self._t = threading.Thread(target=self._run)
self._t.start()
# terminator
elif self.msg is None:
self._close()
else:
self.send_message(self.msg)

def _run(self):
self.on("connect", self.my_connect)
self.on("message", self.my_message)
self.on("disconnect", self.my_disconnect)
self.on("error", self.my_error)
self.on("timeout", self.my_timeout)
# fixes connection reset by peer errors
time.sleep(0.001)
self.run()

def my_error(self, error):
self.my_disconnect('dikke error %s ik kap ermee ait' % error)

def my_timeout(self):
self.my_disconnect('timeout yo, ik kap ermee')

def my_connect(self):
self.send_message(self.msg)

def send_message(self, msg):
logger.debug("%s.send_message::%s",
self.__class__.__name__, msg)
self.send(msg)

def my_message(self, msg):
logger.debug("%s.my_message::> %s",
self.__class__.__name__, msg)
message = msg.split(':')
if message[0] == "5":
my_msg = json.loads(':'.join(message[3:]))
self.callback(my_msg)

def my_disconnect(self, msg=None):
self.close()

def close(self):
logger.debug("%s.close",
self.__class__.__name__)
self._q.put((None, None))

def _close(self):
self.sock.settimeout(1)
self.sock.shutdown(socket.SHUT_RDWR)
# no sys.exit!

def on_server(data):
pass

def onclose(self):
logger.debug("%s.onclose" %
(self.__class__.__name__))
super(ThreadedSocketIOClient, self).onclose()
5 changes: 5 additions & 0 deletions amitu/websocket_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ def _receive_handshake(self):
def _consume_frames(self, buf):
while FRAME_END in buf:
frame, buf = buf.split(FRAME_END, 1)

# don't choke on empty frames
if frame == '' and buf == FRAME_START:
continue

if frame[0] != FRAME_START:
raise WebSocketError("Invalid frame %s)" % (buf))
self.onmessage(frame[1:])
Expand Down

0 comments on commit 6e62606

Please sign in to comment.