diff --git a/examples/config.nru b/examples/config.nru index 721a642..ba8f6c8 100755 --- a/examples/config.nru +++ b/examples/config.nru @@ -80,26 +80,26 @@ module App def self.on_open(e) - e.subscribe :broadcast + e.subscribe(:broadcast) # { |msg| puts "(#{Process.pid}:#{Thread.current}): #{msg.channel} : #{msg.message}"; e.write msg.message } end def self.on_message(e, m) - Iodine.publish :broadcast, m + Iodine.publish(:broadcast, m) end end class LogPubSub < Iodine::PubSub::Engine def subscribe ch - puts "(#{Process.pid}) Someone subscribed to #{ch}" + puts "(#{Process.pid}) #{ch} has listeners (created)" end def psubscribe ch - puts "(#{Process.pid}) Someone subscribed to #{ch} (pattern)" + puts "(#{Process.pid}) (pattern) #{ch} has listeners (created)" end def unsubscribe ch - puts "(#{Process.pid}) Someone unsubscribed to #{ch}" + puts "(#{Process.pid}) #{ch} has no more listeners (destroyed)" end def punsubscribe ch - puts "(#{Process.pid}) Someone unsubscribed to #{ch} (pattern)" + puts "(#{Process.pid}) (pattern) #{ch} has no more listeners (destroyed)" end def publish m puts "(#{Process.pid}) Someone published to #{m.channel}" diff --git a/ext/iodine/fio-stl.h b/ext/iodine/fio-stl.h index d92d93a..97eb3b3 100644 --- a/ext/iodine/fio-stl.h +++ b/ext/iodine/fio-stl.h @@ -38011,6 +38011,8 @@ typedef struct { void (*on_unsubscribe)(void *udata); /** The opaque udata value is ignored and made available to the callbacks. */ void *udata; + /** The queue to which the callbacks should be routed. May be NULL. */ + fio_queue_s *queue; /** Replay cached messages (if any) since supplied time in milliseconds. */ uint64_t replay_since; /** @@ -38469,6 +38471,7 @@ typedef struct fio_subscription_s { uint64_t replay_since; fio_io_s *io; fio_channel_s *channel; + fio_queue_s *queue; void (*on_message)(fio_msg_s *msg); void (*on_unsubscribe)(void *udata); void *udata; @@ -38478,7 +38481,8 @@ typedef struct fio_subscription_s { * Reference counting: `fio_subscription_dup(sb)` / `fio_subscription_free(sb)` */ FIO_SFUNC void fio___pubsub_subscription_on_destroy(fio_subscription_s *sub); -#define FIO_REF_NAME fio_subscription +#define FIO_REF_NAME fio___subscription +#define FIO_REF_TYPE fio_subscription_s #define FIO_REF_DESTROY(obj) fio___pubsub_subscription_on_destroy(&(obj)) #define FIO_REF_CONSTRUCTOR_ONLY 1 #define FIO___RECURSIVE_INCLUDE 1 @@ -39045,7 +39049,7 @@ FIO_IFUNC void fio___pubsub_unsubscribe_task(void *sub_, void *ignr_) { sub->channel = NULL; no_channel: - fio_subscription_free(sub); + fio___subscription_free(sub); return; (void)ignr_; } @@ -39071,10 +39075,11 @@ SFUNC void fio_subscribe FIO_NOOP(fio_subscribe_args_s args) { } uptr; if (args.channel.len > 0xFFFFUL) goto sub_error; - s = fio_subscription_new(); + s = fio___subscription_new(); if (!s) goto sub_error; - + if (!args.queue || !args.on_message) + args.queue = fio_io_queue(); *s = (fio_subscription_s){ .replay_since = args.replay_since, .io = args.io, @@ -39084,6 +39089,7 @@ SFUNC void fio_subscribe FIO_NOOP(fio_subscribe_args_s args) { : fio___subscription_mock_cb)), .on_unsubscribe = args.on_unsubscribe, .udata = args.udata, + .queue = args.queue, }; args.is_pattern = !!args.is_pattern; /* make sure this is either 1 or zero */ uptr.ls = &s->node; @@ -39141,7 +39147,7 @@ SFUNC void fio_subscribe FIO_NOOP(fio_subscribe_args_s args) { fio_bstr_free(uptr.str->buf); s->node = FIO_LIST_INIT(s->node); s->history = FIO_LIST_INIT(s->history); - fio_subscription_free(s); + fio___subscription_free(s); FIO_LOG_WARNING( "(%d) master-only subscription attempt on a non-master process: %.*s", fio_io_pid(), @@ -39211,6 +39217,11 @@ int fio_unsubscribe FIO_NOOP(fio_subscribe_args_s args) { Pub/Sub Message Distribution (local process) ***************************************************************************** */ +static void fio___subscription_after_on_message_task(void *s, void *m) { + fio___subscription_free((fio_subscription_s *)s); + fio___pubsub_message_free((fio___pubsub_message_s *)m); +} + /* performs the subscription callback */ FIO_IFUNC void fio___subscription_on_message_task(void *s_, void *m_) { fio_subscription_s *s = (fio_subscription_s *)s_; @@ -39230,11 +39241,13 @@ FIO_IFUNC void fio___subscription_on_message_task(void *s_, void *m_) { s->udata = container.msg.udata; if (container.flag) goto reschedule; - fio_subscription_free(s); - fio___pubsub_message_free(m); + fio_queue_push(fio_io_queue(), + fio___subscription_after_on_message_task, + s, + m); return; reschedule: - fio_queue_push(fio_io_queue(), fio___subscription_on_message_task, s_, m_); + fio_queue_push(s->queue, fio___subscription_on_message_task, s_, m_); } /* returns the internal message object. */ @@ -39259,18 +39272,18 @@ FIO_SFUNC void fio___pubsub_channel_deliver_task(void *ch_, void *m_) { FIO_LIST_EACH(fio_subscription_s, node, head, s) { if (m->data.io != s->io && m->data.published >= s->replay_since) fio_queue_push( - fio_io_queue(), + s->queue, (void (*)(void *, void *))fio___subscription_on_message_task, - fio_subscription_dup(s), + fio___subscription_dup(s), fio___pubsub_message_dup(m)); } } else { FIO_LIST_EACH(fio_subscription_s, node, head, s) { if (m->data.io != s->io) fio_queue_push( - fio_io_queue(), + s->queue, (void (*)(void *, void *))fio___subscription_on_message_task, - fio_subscription_dup(s), + fio___subscription_dup(s), fio___pubsub_message_dup(m)); } } @@ -39279,17 +39292,17 @@ FIO_SFUNC void fio___pubsub_channel_deliver_task(void *ch_, void *m_) { FIO_LIST_EACH(fio_subscription_s, node, head, s) { if (m->data.published >= s->replay_since) fio_queue_push( - fio_io_queue(), + s->queue, (void (*)(void *, void *))fio___subscription_on_message_task, - fio_subscription_dup(s), + fio___subscription_dup(s), fio___pubsub_message_dup(m)); } } else { FIO_LIST_EACH(fio_subscription_s, node, head, s) { fio_queue_push( - fio_io_queue(), + s->queue, (void (*)(void *, void *))fio___subscription_on_message_task, - fio_subscription_dup(s), + fio___subscription_dup(s), fio___pubsub_message_dup(m)); } } @@ -44963,7 +44976,7 @@ typedef struct fio_http_settings_s { */ size_t max_body_size; /** - * The maximum websocket message size/buffer (in bytes) for Websocket + * The maximum WebSocket message size/buffer (in bytes) for Websocket * connections. Defaults to FIO_HTTP_DEFAULT_WS_MAX_MSG_SIZE bytes. */ size_t ws_max_msg_size; diff --git a/ext/iodine/iodine_connection.h b/ext/iodine/iodine_connection.h index cf67ebd..6e8c0f9 100644 --- a/ext/iodine/iodine_connection.h +++ b/ext/iodine/iodine_connection.h @@ -1711,7 +1711,7 @@ typedef struct { uint8_t is_text; } iodine_io_http_on_message_internal_s; -/** Called when a websocket / SSE message arrives. */ +/** Called when a WebSocket / SSE message arrives. */ static void *iodine_io_http_on_message_internal(void *info) { iodine_io_http_on_message_internal_s *i = (iodine_io_http_on_message_internal_s *)info; @@ -1731,7 +1731,7 @@ static void *iodine_io_http_on_message_internal(void *info) { return NULL; } -/** Called when a websocket / SSE message arrives. */ +/** Called when a WebSocket / SSE message arrives. */ static void iodine_io_http_on_message(fio_http_s *h, fio_buf_info_s msg, uint8_t is_text) { @@ -1770,13 +1770,13 @@ Subscription Helpers static void *iodine_connection_on_pubsub_in_gvl(void *m_) { fio_msg_s *m = (fio_msg_s *)m_; VALUE msg = iodine_pubsub_msg_create(m); + /* TODO! move callback to async queue. */ iodine_ruby_call_inside((VALUE)m->udata, IODINE_CALL_ID, 1, &msg); STORE.release(msg); return m_; } static void iodine_connection_on_pubsub(fio_msg_s *m) { - /* TODO? move callback to outside queue? */ rb_thread_call_with_gvl(iodine_connection_on_pubsub_in_gvl, m); } @@ -1805,6 +1805,7 @@ FIO_IFUNC VALUE iodine_connection_subscribe_internal(fio_io_s *io, .filter = (int16_t)filter, .channel = channel, .udata = (void *)proc, + .queue = fio_io_async_queue(&IODINE_THREAD_POOL), .on_message = (!proc || (proc == Qnil) ? NULL : iodine_connection_on_pubsub),