Skip to content

Commit

Permalink
pub/sub callbacks / procs now run in Ruby thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
boazsegev committed Dec 15, 2024
1 parent e102951 commit c42c613
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 26 deletions.
12 changes: 6 additions & 6 deletions examples/config.nru
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,26 @@ module App


def self.on_open(e)
e.subscribe :broadcast
e.subscribe(:broadcast) # { |msg| puts "(#{Process.pid}:#{Thread.current}): #{msg.channel} : #{msg.message}"; e.write msg.message }
end
def self.on_message(e, m)
Iodine.publish :broadcast, m
Iodine.publish(:broadcast, m)
end
end


class LogPubSub < Iodine::PubSub::Engine
def subscribe ch
puts "(#{Process.pid}) Someone subscribed to #{ch}"
puts "(#{Process.pid}) #{ch} has listeners (created)"
end
def psubscribe ch
puts "(#{Process.pid}) Someone subscribed to #{ch} (pattern)"
puts "(#{Process.pid}) (pattern) #{ch} has listeners (created)"
end
def unsubscribe ch
puts "(#{Process.pid}) Someone unsubscribed to #{ch}"
puts "(#{Process.pid}) #{ch} has no more listeners (destroyed)"
end
def punsubscribe ch
puts "(#{Process.pid}) Someone unsubscribed to #{ch} (pattern)"
puts "(#{Process.pid}) (pattern) #{ch} has no more listeners (destroyed)"
end
def publish m
puts "(#{Process.pid}) Someone published to #{m.channel}"
Expand Down
47 changes: 30 additions & 17 deletions ext/iodine/fio-stl.h
Original file line number Diff line number Diff line change
Expand Up @@ -38011,6 +38011,8 @@ typedef struct {
void (*on_unsubscribe)(void *udata);
/** The opaque udata value is ignored and made available to the callbacks. */
void *udata;
/** The queue to which the callbacks should be routed. May be NULL. */
fio_queue_s *queue;
/** Replay cached messages (if any) since supplied time in milliseconds. */
uint64_t replay_since;
/**
Expand Down Expand Up @@ -38469,6 +38471,7 @@ typedef struct fio_subscription_s {
uint64_t replay_since;
fio_io_s *io;
fio_channel_s *channel;
fio_queue_s *queue;
void (*on_message)(fio_msg_s *msg);
void (*on_unsubscribe)(void *udata);
void *udata;
Expand All @@ -38478,7 +38481,8 @@ typedef struct fio_subscription_s {
* Reference counting: `fio_subscription_dup(sb)` / `fio_subscription_free(sb)`
*/
FIO_SFUNC void fio___pubsub_subscription_on_destroy(fio_subscription_s *sub);
#define FIO_REF_NAME fio_subscription
#define FIO_REF_NAME fio___subscription
#define FIO_REF_TYPE fio_subscription_s
#define FIO_REF_DESTROY(obj) fio___pubsub_subscription_on_destroy(&(obj))
#define FIO_REF_CONSTRUCTOR_ONLY 1
#define FIO___RECURSIVE_INCLUDE 1
Expand Down Expand Up @@ -39045,7 +39049,7 @@ FIO_IFUNC void fio___pubsub_unsubscribe_task(void *sub_, void *ignr_) {
sub->channel = NULL;

no_channel:
fio_subscription_free(sub);
fio___subscription_free(sub);
return;
(void)ignr_;
}
Expand All @@ -39071,10 +39075,11 @@ SFUNC void fio_subscribe FIO_NOOP(fio_subscribe_args_s args) {
} uptr;
if (args.channel.len > 0xFFFFUL)
goto sub_error;
s = fio_subscription_new();
s = fio___subscription_new();
if (!s)
goto sub_error;

if (!args.queue || !args.on_message)
args.queue = fio_io_queue();
*s = (fio_subscription_s){
.replay_since = args.replay_since,
.io = args.io,
Expand All @@ -39084,6 +39089,7 @@ SFUNC void fio_subscribe FIO_NOOP(fio_subscribe_args_s args) {
: fio___subscription_mock_cb)),
.on_unsubscribe = args.on_unsubscribe,
.udata = args.udata,
.queue = args.queue,
};
args.is_pattern = !!args.is_pattern; /* make sure this is either 1 or zero */
uptr.ls = &s->node;
Expand Down Expand Up @@ -39141,7 +39147,7 @@ SFUNC void fio_subscribe FIO_NOOP(fio_subscribe_args_s args) {
fio_bstr_free(uptr.str->buf);
s->node = FIO_LIST_INIT(s->node);
s->history = FIO_LIST_INIT(s->history);
fio_subscription_free(s);
fio___subscription_free(s);
FIO_LOG_WARNING(
"(%d) master-only subscription attempt on a non-master process: %.*s",
fio_io_pid(),
Expand Down Expand Up @@ -39211,6 +39217,11 @@ int fio_unsubscribe FIO_NOOP(fio_subscribe_args_s args) {
Pub/Sub Message Distribution (local process)
***************************************************************************** */

static void fio___subscription_after_on_message_task(void *s, void *m) {
fio___subscription_free((fio_subscription_s *)s);
fio___pubsub_message_free((fio___pubsub_message_s *)m);
}

/* performs the subscription callback */
FIO_IFUNC void fio___subscription_on_message_task(void *s_, void *m_) {
fio_subscription_s *s = (fio_subscription_s *)s_;
Expand All @@ -39230,11 +39241,13 @@ FIO_IFUNC void fio___subscription_on_message_task(void *s_, void *m_) {
s->udata = container.msg.udata;
if (container.flag)
goto reschedule;
fio_subscription_free(s);
fio___pubsub_message_free(m);
fio_queue_push(fio_io_queue(),
fio___subscription_after_on_message_task,
s,
m);
return;
reschedule:
fio_queue_push(fio_io_queue(), fio___subscription_on_message_task, s_, m_);
fio_queue_push(s->queue, fio___subscription_on_message_task, s_, m_);
}

/* returns the internal message object. */
Expand All @@ -39259,18 +39272,18 @@ FIO_SFUNC void fio___pubsub_channel_deliver_task(void *ch_, void *m_) {
FIO_LIST_EACH(fio_subscription_s, node, head, s) {
if (m->data.io != s->io && m->data.published >= s->replay_since)
fio_queue_push(
fio_io_queue(),
s->queue,
(void (*)(void *, void *))fio___subscription_on_message_task,
fio_subscription_dup(s),
fio___subscription_dup(s),
fio___pubsub_message_dup(m));
}
} else {
FIO_LIST_EACH(fio_subscription_s, node, head, s) {
if (m->data.io != s->io)
fio_queue_push(
fio_io_queue(),
s->queue,
(void (*)(void *, void *))fio___subscription_on_message_task,
fio_subscription_dup(s),
fio___subscription_dup(s),
fio___pubsub_message_dup(m));
}
}
Expand All @@ -39279,17 +39292,17 @@ FIO_SFUNC void fio___pubsub_channel_deliver_task(void *ch_, void *m_) {
FIO_LIST_EACH(fio_subscription_s, node, head, s) {
if (m->data.published >= s->replay_since)
fio_queue_push(
fio_io_queue(),
s->queue,
(void (*)(void *, void *))fio___subscription_on_message_task,
fio_subscription_dup(s),
fio___subscription_dup(s),
fio___pubsub_message_dup(m));
}
} else {
FIO_LIST_EACH(fio_subscription_s, node, head, s) {
fio_queue_push(
fio_io_queue(),
s->queue,
(void (*)(void *, void *))fio___subscription_on_message_task,
fio_subscription_dup(s),
fio___subscription_dup(s),
fio___pubsub_message_dup(m));
}
}
Expand Down Expand Up @@ -44963,7 +44976,7 @@ typedef struct fio_http_settings_s {
*/
size_t max_body_size;
/**
* The maximum websocket message size/buffer (in bytes) for Websocket
* The maximum WebSocket message size/buffer (in bytes) for Websocket
* connections. Defaults to FIO_HTTP_DEFAULT_WS_MAX_MSG_SIZE bytes.
*/
size_t ws_max_msg_size;
Expand Down
7 changes: 4 additions & 3 deletions ext/iodine/iodine_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -1711,7 +1711,7 @@ typedef struct {
uint8_t is_text;
} iodine_io_http_on_message_internal_s;

/** Called when a websocket / SSE message arrives. */
/** Called when a WebSocket / SSE message arrives. */
static void *iodine_io_http_on_message_internal(void *info) {
iodine_io_http_on_message_internal_s *i =
(iodine_io_http_on_message_internal_s *)info;
Expand All @@ -1731,7 +1731,7 @@ static void *iodine_io_http_on_message_internal(void *info) {
return NULL;
}

/** Called when a websocket / SSE message arrives. */
/** Called when a WebSocket / SSE message arrives. */
static void iodine_io_http_on_message(fio_http_s *h,
fio_buf_info_s msg,
uint8_t is_text) {
Expand Down Expand Up @@ -1770,13 +1770,13 @@ Subscription Helpers
static void *iodine_connection_on_pubsub_in_gvl(void *m_) {
fio_msg_s *m = (fio_msg_s *)m_;
VALUE msg = iodine_pubsub_msg_create(m);
/* TODO! move callback to async queue. */
iodine_ruby_call_inside((VALUE)m->udata, IODINE_CALL_ID, 1, &msg);
STORE.release(msg);
return m_;
}

static void iodine_connection_on_pubsub(fio_msg_s *m) {
/* TODO? move callback to outside queue? */
rb_thread_call_with_gvl(iodine_connection_on_pubsub_in_gvl, m);
}

Expand Down Expand Up @@ -1805,6 +1805,7 @@ FIO_IFUNC VALUE iodine_connection_subscribe_internal(fio_io_s *io,
.filter = (int16_t)filter,
.channel = channel,
.udata = (void *)proc,
.queue = fio_io_async_queue(&IODINE_THREAD_POOL),
.on_message =
(!proc || (proc == Qnil) ? NULL
: iodine_connection_on_pubsub),
Expand Down

0 comments on commit c42c613

Please sign in to comment.