Skip to content

Commit

Permalink
feat: Allow overriding block_on_no_clients queue param
Browse files Browse the repository at this point in the history
In the current implementation, when a SUB provider is restarted, the
library just hangs without detecting connection loss.
This happens because on disconnect a client is kicked out of the queue,
as since for all the socket queues block_on_no_clients is set to true,
it hangs indefinitely.

I'm unsure what would be the effect of setting block_on_no_clients to
false globally (though it seems reasonable), so in this PR an ability
to override this value is introduced via SocketOptions is added.
  • Loading branch information
bemyak authored and rgbkrk committed Dec 13, 2024
1 parent 07c65dc commit 2a9874d
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/dealer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Drop for DealerSocket {
#[async_trait]
impl Socket for DealerSocket {
fn with_options(options: SocketOptions) -> Self {
let fair_queue = FairQueue::new(true);
let fair_queue = FairQueue::new(options.block_on_no_clients);
Self {
backend: Arc::new(GenericSocketBackend::with_options(
Some(fair_queue.inner()),
Expand Down
16 changes: 15 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,30 @@ pub enum SocketEvent {
Disconnected(PeerIdentity),
}

#[derive(Default)]
#[derive(Debug, Clone)]
pub struct SocketOptions {
pub(crate) peer_id: Option<PeerIdentity>,
pub(crate) block_on_no_clients: bool,
}

impl Default for SocketOptions {
fn default() -> Self {
Self {
peer_id: Default::default(),
block_on_no_clients: true,
}
}
}

impl SocketOptions {
pub fn peer_identity(&mut self, peer_id: PeerIdentity) -> &mut Self {
self.peer_id = Some(peer_id);
self
}
pub fn block_on_no_clients(&mut self, block_on_no_clients: bool) -> &mut Self {
self.block_on_no_clients = block_on_no_clients;
self
}
}

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct PullSocket {
#[async_trait]
impl Socket for PullSocket {
fn with_options(options: SocketOptions) -> Self {
let fair_queue = FairQueue::new(true);
let fair_queue = FairQueue::new(options.block_on_no_clients);
Self {
backend: Arc::new(GenericSocketBackend::with_options(
Some(fair_queue.inner()),
Expand Down
2 changes: 1 addition & 1 deletion src/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Drop for RepSocket {
#[async_trait]
impl Socket for RepSocket {
fn with_options(options: SocketOptions) -> Self {
let fair_queue = FairQueue::new(true);
let fair_queue = FairQueue::new(options.block_on_no_clients);
Self {
backend: Arc::new(RepSocketBackend {
peers: DashMap::new(),
Expand Down
2 changes: 1 addition & 1 deletion src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl Drop for RouterSocket {
#[async_trait]
impl Socket for RouterSocket {
fn with_options(options: SocketOptions) -> Self {
let fair_queue = FairQueue::new(true);
let fair_queue = FairQueue::new(options.block_on_no_clients);
Self {
backend: Arc::new(GenericSocketBackend::with_options(
Some(fair_queue.inner()),
Expand Down
2 changes: 1 addition & 1 deletion src/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl SubSocket {
#[async_trait]
impl Socket for SubSocket {
fn with_options(options: SocketOptions) -> Self {
let fair_queue = FairQueue::new(true);
let fair_queue = FairQueue::new(options.block_on_no_clients);
Self {
backend: Arc::new(SubSocketBackend::with_options(
Some(fair_queue.inner()),
Expand Down

0 comments on commit 2a9874d

Please sign in to comment.