Skip to content

Commit

Permalink
Advanced pub sub (#854)
Browse files Browse the repository at this point in the history
* wip

* updated zenoh; added z_advanced_subscriber_detect_publishers

* updated zenoh

* deprecate ze_querying_subscriber and ze_publication_cache

* clippy

* remove traces of pub_cache and querying_subscriber from cmake

* docs update

* switch zenoh branch to main

* fix advanced pub

* add background advanced subscriber

* fix function names

* rename _settings_t -> _options_t

* fix cmd help message in examples

* fix docs

* update zenoh branch

* add test for advanced pubsub

* restore querying_subscriber-pub_cache test

* fix docs

* add delay to test
  • Loading branch information
DenisBiryukov91 authored Dec 12, 2024
1 parent 1339219 commit ceec66b
Show file tree
Hide file tree
Showing 33 changed files with 2,651 additions and 350 deletions.
55 changes: 51 additions & 4 deletions build-resources/opaque-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use zenoh::{
config::Config,
handlers::{FifoChannelHandler, RingChannelHandler},
key_expr::KeyExpr,
liveliness::LivelinessToken,
pubsub::{Publisher, Subscriber},
query::{Query, Queryable, Reply, ReplyError},
sample::Sample,
Expand All @@ -23,10 +24,8 @@ use zenoh::{
};
#[cfg(feature = "unstable")]
use zenoh::{
matching::MatchingListener, query::Querier, sample::SourceInfo,
session::EntityGlobalId,
matching::MatchingListener, query::Querier, sample::SourceInfo, session::EntityGlobalId,
};
use zenoh::liveliness::LivelinessToken;
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
use zenoh::{
shm::zshm, shm::zshmmut, shm::AllocLayout, shm::ChunkAllocResult, shm::ChunkDescriptor,
Expand Down Expand Up @@ -159,6 +158,51 @@ get_opaque_type_data!(
ze_loaned_querying_subscriber_t
);

#[cfg(feature = "unstable")]
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief An owned Zenoh advanced subscriber.
///
/// In addition to receiving the data it is subscribed to,
/// it is also able to receive notifications regarding missed samples and/or automatically recover them.
get_opaque_type_data!(
Option<zenoh_ext::AdvancedSubscriber<()>>,
ze_owned_advanced_subscriber_t
);
#[cfg(feature = "unstable")]
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief A loaned Zenoh advanced subscriber.
get_opaque_type_data!(
zenoh_ext::AdvancedSubscriber<()>,
ze_loaned_advanced_subscriber_t
);
#[cfg(feature = "unstable")]
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief An owned Zenoh sample miss listener. Missed samples can only be detected from advanced publishers, enabling sample miss detection.
///
/// A listener that sends notification when the advanced subscriber misses a sample .
/// Dropping the corresponding subscriber, also drops the listener.
get_opaque_type_data!(
Option<zenoh_ext::SampleMissListener<()>>,
ze_owned_sample_miss_listener_t
);

#[cfg(feature = "unstable")]
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief An owned Zenoh advanced publisher.
///
/// In addition to publishing the data,
/// it also maintains the storage, allowing matching subscribers to retrive missed samples.
get_opaque_type_data!(
Option<zenoh_ext::AdvancedPublisher<'static>>,
ze_owned_advanced_publisher_t
);
#[cfg(feature = "unstable")]
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief A loaned Zenoh advanced publisher.
get_opaque_type_data!(
zenoh_ext::AdvancedPublisher<'static>,
ze_loaned_advanced_publisher_t
);
/// A Zenoh-allocated <a href="https://zenoh.io/docs/manual/abstractions/#key-expression"> key expression </a>.
///
/// Key expressions can identify a single key or a set of keys.
Expand Down Expand Up @@ -197,7 +241,10 @@ get_opaque_type_data!(Session, z_loaned_session_t);

#[cfg(feature = "unstable")]
/// An owned Close handle
get_opaque_type_data!(Option<tokio::task::JoinHandle<zenoh::Result<()>>>, zc_owned_concurrent_close_handle_t);
get_opaque_type_data!(
Option<tokio::task::JoinHandle<zenoh::Result<()>>>,
zc_owned_concurrent_close_handle_t
);

/// An owned Zenoh configuration.
get_opaque_type_data!(Option<Config>, z_owned_config_t);
Expand Down
Loading

0 comments on commit ceec66b

Please sign in to comment.