From 2a9874d91150fa1434c16b8315d63046a17c961a Mon Sep 17 00:00:00 2001 From: Sergei Gureev Date: Mon, 5 Feb 2024 16:15:54 +0200 Subject: [PATCH] feat: Allow overriding block_on_no_clients queue param In the current implementation, when a SUB provider is restarted, the library just hangs without detecting connection loss. This happens because on disconnect a client is kicked out of the queue, as since for all the socket queues block_on_no_clients is set to true, it hangs indefinitely. I'm unsure what would be the effect of setting block_on_no_clients to false globally (though it seems reasonable), so in this PR an ability to override this value is introduced via SocketOptions is added. --- src/dealer.rs | 2 +- src/lib.rs | 16 +++++++++++++++- src/pull.rs | 2 +- src/rep.rs | 2 +- src/router.rs | 2 +- src/sub.rs | 2 +- 6 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/dealer.rs b/src/dealer.rs index 04cfe74..e468127 100644 --- a/src/dealer.rs +++ b/src/dealer.rs @@ -31,7 +31,7 @@ impl Drop for DealerSocket { #[async_trait] impl Socket for DealerSocket { fn with_options(options: SocketOptions) -> Self { - let fair_queue = FairQueue::new(true); + let fair_queue = FairQueue::new(options.block_on_no_clients); Self { backend: Arc::new(GenericSocketBackend::with_options( Some(fair_queue.inner()), diff --git a/src/lib.rs b/src/lib.rs index e244d5f..c7f543a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -169,9 +169,19 @@ pub enum SocketEvent { Disconnected(PeerIdentity), } -#[derive(Default)] +#[derive(Debug, Clone)] pub struct SocketOptions { pub(crate) peer_id: Option, + pub(crate) block_on_no_clients: bool, +} + +impl Default for SocketOptions { + fn default() -> Self { + Self { + peer_id: Default::default(), + block_on_no_clients: true, + } + } } impl SocketOptions { @@ -179,6 +189,10 @@ impl SocketOptions { self.peer_id = Some(peer_id); self } + pub fn block_on_no_clients(&mut self, block_on_no_clients: bool) -> &mut Self { + self.block_on_no_clients = block_on_no_clients; + self + } } #[async_trait] diff --git a/src/pull.rs b/src/pull.rs index 0144869..ae2577d 100644 --- a/src/pull.rs +++ b/src/pull.rs @@ -25,7 +25,7 @@ pub struct PullSocket { #[async_trait] impl Socket for PullSocket { fn with_options(options: SocketOptions) -> Self { - let fair_queue = FairQueue::new(true); + let fair_queue = FairQueue::new(options.block_on_no_clients); Self { backend: Arc::new(GenericSocketBackend::with_options( Some(fair_queue.inner()), diff --git a/src/rep.rs b/src/rep.rs index 2243f69..9a927a6 100644 --- a/src/rep.rs +++ b/src/rep.rs @@ -43,7 +43,7 @@ impl Drop for RepSocket { #[async_trait] impl Socket for RepSocket { fn with_options(options: SocketOptions) -> Self { - let fair_queue = FairQueue::new(true); + let fair_queue = FairQueue::new(options.block_on_no_clients); Self { backend: Arc::new(RepSocketBackend { peers: DashMap::new(), diff --git a/src/router.rs b/src/router.rs index 4ffb4b0..159c226 100644 --- a/src/router.rs +++ b/src/router.rs @@ -32,7 +32,7 @@ impl Drop for RouterSocket { #[async_trait] impl Socket for RouterSocket { fn with_options(options: SocketOptions) -> Self { - let fair_queue = FairQueue::new(true); + let fair_queue = FairQueue::new(options.block_on_no_clients); Self { backend: Arc::new(GenericSocketBackend::with_options( Some(fair_queue.inner()), diff --git a/src/sub.rs b/src/sub.rs index 9e4818b..5813b39 100644 --- a/src/sub.rs +++ b/src/sub.rs @@ -156,7 +156,7 @@ impl SubSocket { #[async_trait] impl Socket for SubSocket { fn with_options(options: SocketOptions) -> Self { - let fair_queue = FairQueue::new(true); + let fair_queue = FairQueue::new(options.block_on_no_clients); Self { backend: Arc::new(SubSocketBackend::with_options( Some(fair_queue.inner()),