Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made websocketsd more efficient #57

Merged
merged 2 commits into from
Apr 28, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 80 additions & 13 deletions chain/websocketd.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,96 @@

logger.info("websocketd.py started")

zmq_socks = set()
zmq_socks_tag = {}
tag_zmq_sock = {}
tag_subscribers = {}

@websockets.route('/<tag>')
def site_socket(ws, tag):
logger.info('ws client connected for tag "%s"' % tag)
zmq_sock = zmq_ctx.socket(zmq.SUB)
zmq_sock.connect(ZMQ_PASSTHROUGH_URL_PUB)
# note that flask gives us tag as a unicode string
zmq_sock.setsockopt_string(zmq.SUBSCRIBE, tag)
if tag in tag_zmq_sock:
tag_subscribers[tag].add(ws)
else:
zmq_sock = zmq_ctx.socket(zmq.SUB)
zmq_sock.connect(ZMQ_PASSTHROUGH_URL_PUB)
# note that flask gives us tag as a unicode string
zmq_sock.setsockopt_string(zmq.SUBSCRIBE, tag)
zmq_socks.add(zmq_sock)
zmq_socks_tag[zmq_sock] = tag
tag_zmq_sock[tag] = zmq_sock
tag_subscribers[tag] = {ws}
while True:
in_data = zmq_sock.recv()
# in case we later subscribe to multiple topic, get the topic from the
# incoming message
msg_tag, _, msg = in_data.partition(' ')
logger.debug('Received on tag "%s": %s' % (msg_tag, msg))
try:
ws.send(msg)
# Read from ws socket continuously.
# XXX: This builds up a lot of threads. Is there ANY way we could do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually spawn system threads, or do you mean greenlets? As I understand it flask-sockets is based on gevent-websocket, which should spawn a greenlet per connection (which will be "green-blocked" on ws.receive()) Under the hood I'd expect it do be doing something like select

# this for many connected ws clients in a non-blocking fashion (like select).
# NOTE that ws dies if site_socket function exits...
received = ws.receive()
logger.info("Received from ws client: %s" % received)
except Exception as e:
logger.info('Caught Error sending to client: %s' % e)
tag_subscribers[tag].remove(ws)
if len(tag_subscribers) == 0:
close_socket(tag_zmq_sock[tag])
break
logger.info('Disconnecting ZMQ Socket for tag "%s"' % tag)
zmq_sock.disconnect(ZMQ_PASSTHROUGH_URL_PUB)
# return an empty response so Flask doesn't complain
return ''

def close_socket(zmq_sock):
tag = zmq_socks_tag[zmq_sock]
logger.info('Disconnecting ZMQ Socket for tag "%s"' % tag)
try:
zmq_sock.disconnect(ZMQ_PASSTHROUGH_URL_PUB)
except Exception as e:
logger.info(str(e))
zmq_socks.remove(zmq_sock)
del zmq_socks_tag[zmq_sock]
del tag_zmq_sock[tag]
subscribers = tag_subscribers[tag]
del tag_subscribers[tag]
for ws in tag_subscribers:
try:
ws.close()
except Exception:
logger.info('Could not close ws socket cleanly.')

def select_zmq_socks():
logger.info("Starting select loop over ZMQ sockets")
while True:
gevent.sleep(seconds=0.0625, ref=True)
while len(zmq_socks) == 0:
gevent.sleep(seconds=0.0625, ref=True)
# It's important to have a timeout for the select loop, because
# if more sockets are added, they won't be slected for until
# the next iteration of the while loop
socks_list = list(zmq_socks)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another way to do this without the quick-spin loop would be to add an internal ZMQ socket called select_wake or something and add it to the list of sockets we're selecting. Then whenever you wanted to kick the select process you could just write a byte to that socket and it would wake up and pick up any new sockets.

Not a blocking issue though.

rlist, wlist, xlist = zmq.select(socks_list, [], socks_list, timeout=0.0625)
for zmq_sock in xlist:
tag = zmq_socks_tag[zmq_sock]
logger.info('Error on ZMQ socket on tag "%s".' % tag)
close_socket(zmq_sock)
for zmq_sock in rlist:
tag = zmq_socks_tag[zmq_sock]
logger.info('Reading from socket on tag "%s".' % tag)
msg_tag, _, msg = zmq_sock.recv().partition(" ")
logger.info('Received on tag "%s": %s' % (msg_tag, msg))
to_remove = set()
for ws in tag_subscribers[tag]:
try:
ws.send(msg)
except Exception as e:
logger.info('Caught Error sending to client: %s' % e)
try:
ws.close()
except Exception:
logger.info('Could not close ws socket cleanly.')
to_remove.add(ws)
tag_subscribers[tag].difference_update(to_remove)
if len(tag_subscribers[tag]) == 0:
close_socket(zmq_sock)

logger.info("Starting gevent processess...")
# Start the passthrough process:
gevent.spawn(passthrough, zmq_context=zmq_ctx)
# Start the zmq socket selection loop:
gevent.spawn(select_zmq_socks)