Skip to content

Commit

Permalink
Added type to service discovered message
Browse files Browse the repository at this point in the history
  • Loading branch information
ClemensElflein committed Sep 10, 2024
1 parent dd87dc5 commit b643e04
Showing 1 changed file with 21 additions and 39 deletions.
60 changes: 21 additions & 39 deletions libxbot-service-interface/src/ServiceDiscoveryImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ std::vector<ServiceDiscoveryCallbacks *> registered_callbacks_{};

ServiceDiscoveryImpl *instance_ = nullptr;

bool ServiceDiscoveryImpl::GetEndpoint(const std::string &uid, uint32_t &ip,
uint16_t &port) {
bool ServiceDiscoveryImpl::GetEndpoint(const std::string &uid, uint32_t &ip, uint16_t &port) {
std::unique_lock lk(sd_mutex_);
if (discovered_services_.contains(uid)) {
auto &service_info = discovered_services_.at(uid);
Expand All @@ -57,28 +56,24 @@ bool ServiceDiscoveryImpl::Start() {
return true;
}

void ServiceDiscoveryImpl::RegisterCallbacks(
ServiceDiscoveryCallbacks *callbacks) {
void ServiceDiscoveryImpl::RegisterCallbacks(ServiceDiscoveryCallbacks *callbacks) {
if (callbacks == nullptr) {
return;
}
std::unique_lock lk(sd_mutex_);
const auto &it = std::find(registered_callbacks_.begin(),
registered_callbacks_.end(), callbacks);
const auto &it = std::find(registered_callbacks_.begin(), registered_callbacks_.end(), callbacks);
if (it == registered_callbacks_.end()) {
registered_callbacks_.push_back(callbacks);
}
}
void ServiceDiscoveryImpl::UnregisterCallbacks(
ServiceDiscoveryCallbacks *callbacks) {
void ServiceDiscoveryImpl::UnregisterCallbacks(ServiceDiscoveryCallbacks *callbacks) {
if (callbacks == nullptr) {
return;
}
std::unique_lock lk(sd_mutex_);

while (true) {
const auto &it = std::find(registered_callbacks_.begin(),
registered_callbacks_.end(), callbacks);
const auto &it = std::find(registered_callbacks_.begin(), registered_callbacks_.end(), callbacks);
if (it != registered_callbacks_.end()) {
registered_callbacks_.erase(it);
} else {
Expand All @@ -87,20 +82,17 @@ void ServiceDiscoveryImpl::UnregisterCallbacks(
}
}

std::unique_ptr<ServiceInfo> ServiceDiscoveryImpl::GetServiceInfo(
const std::string &uid) {
std::unique_ptr<ServiceInfo> ServiceDiscoveryImpl::GetServiceInfo(const std::string &uid) {
std::unique_lock lk(sd_mutex_);
if (!discovered_services_.contains(uid)) {
return nullptr;
}

return std::make_unique<ServiceInfo>(discovered_services_.at(uid));
}
std::unique_ptr<std::map<std::string, ServiceInfo>>
ServiceDiscoveryImpl::GetAllServices() {
std::unique_ptr<std::map<std::string, ServiceInfo>> ServiceDiscoveryImpl::GetAllServices() {
std::unique_lock lk(sd_mutex_);
return std::make_unique<std::map<std::string, ServiceInfo>>(
discovered_services_);
return std::make_unique<std::map<std::string, ServiceInfo>>(discovered_services_);
}
bool ServiceDiscoveryImpl::DropService(const std::string &uid) {
std::unique_lock lk(sd_mutex_);
Expand Down Expand Up @@ -143,22 +135,18 @@ void Run() {
if (sd_socket_.ReceivePacket(sender_ip, sender_port, packet)) {
// Check, if packet has at least enough space for our header
if (packet.size() >= sizeof(datatypes::XbotHeader)) {
const auto header =
reinterpret_cast<datatypes::XbotHeader *>(packet.data());
const auto header = reinterpret_cast<datatypes::XbotHeader *>(packet.data());

if (header->message_type !=
datatypes::MessageType::SERVICE_ADVERTISEMENT) {
spdlog::warn(
"Service Discovery socket got non-service discovery message");
if (header->message_type != datatypes::MessageType::SERVICE_ADVERTISEMENT) {
spdlog::warn("Service Discovery socket got non-service discovery message");
continue;
}

// Validate reported length
if (packet.size() ==
header->payload_size + sizeof(datatypes::XbotHeader)) {
if (packet.size() == header->payload_size + sizeof(datatypes::XbotHeader)) {
try {
const auto json = nlohmann::json::from_cbor(
packet.begin() + sizeof(datatypes::XbotHeader), packet.end());
const auto json =
nlohmann::json::from_cbor(packet.begin() + sizeof(datatypes::XbotHeader), packet.end());

// Build the ServiceInfo object from the received data.
ServiceInfo info = json;
Expand All @@ -179,12 +167,9 @@ void Run() {
if (discovered_services_.contains(info.unique_id_)) {
// Check, if service endpoint was updated
// (every thing else is constant) and update
if (auto &old_service_info =
discovered_services_.at(info.unique_id_);
old_service_info.ip != info.ip ||
old_service_info.port != info.port) {
spdlog::info("Endpoint updated (ID: {}, new endpoint: {})",
info.unique_id_,
if (auto &old_service_info = discovered_services_.at(info.unique_id_);
old_service_info.ip != info.ip || old_service_info.port != info.port) {
spdlog::info("Endpoint updated (ID: {}, new endpoint: {})", info.unique_id_,
EndpointIntToString(info.ip, info.port));
// Backup the old infos, so that we can pass them to the
// callback
Expand All @@ -197,14 +182,12 @@ void Run() {

// Notify callbacks
for (const auto &callback : registered_callbacks_) {
callback->OnEndpointChanged(info.unique_id_, old_ip,
old_port, info.ip, info.port);
callback->OnEndpointChanged(info.unique_id_, old_ip, old_port, info.ip, info.port);
}
}
} else {
spdlog::info("Found new service (ID: {}, endpoint: {})",
info.unique_id_,
EndpointIntToString(info.ip, info.port));
spdlog::info("Found new service (Type: {}, ID: {}, endpoint: {})", info.description.type,
info.unique_id_, EndpointIntToString(info.ip, info.port));
discovered_services_.emplace(info.unique_id_, info);
// Notify callbacks
for (const auto &callback : registered_callbacks_) {
Expand All @@ -213,8 +196,7 @@ void Run() {
}
}
} catch (std::exception &e) {
spdlog::error("Got exception during service discovery: {}",
e.what());
spdlog::error("Got exception during service discovery: {}", e.what());
}
}
}
Expand Down

0 comments on commit b643e04

Please sign in to comment.