Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Jul 18, 2024
1 parent 5ddba43 commit c7adecf
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 53 deletions.
12 changes: 1 addition & 11 deletions include/cactus_rt/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class App {
//
// TODO: investigate into a weak pointer.
std::list<std::shared_ptr<tracing::ThreadTracer>> thread_tracers_;
std::unique_ptr<tracing::TraceAggregator> trace_aggregator_ = nullptr;
std::shared_ptr<tracing::TraceAggregator> trace_aggregator_ = nullptr;
std::mutex aggregator_mutex_;

void SetDefaultLogFormat(quill::Config& cfg) {
Expand Down Expand Up @@ -148,16 +148,6 @@ class App {
void StartQuill();

private:
/**
* @brief Register a thread tracer. Should only be called from Thread::RunThread.
*/
void RegisterThreadTracer(std::shared_ptr<tracing::ThreadTracer> thread_tracer) noexcept;

/**
* @brief Remove a thread tracer. Should only be called from Thread::~Thread().
*/
void DeregisterThreadTracer(const std::shared_ptr<tracing::ThreadTracer>& thread_tracer) noexcept;

void CreateAndStartTraceAggregator(std::shared_ptr<tracing::Sink> sink) noexcept;

void StopTraceAggregator() noexcept;
Expand Down
8 changes: 3 additions & 5 deletions include/cactus_rt/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "config.h"
#include "quill/Quill.h"
#include "tracing/thread_tracer.h"
#include "tracing/trace_aggregator.h"

namespace cactus_rt {

Expand Down Expand Up @@ -43,18 +44,15 @@ class Thread {

friend class App;

// Non-owning App pointer. Used only for notifying that the thread has
// started/stopped for tracing purposes. Set by Thread::Start and read at
// the beginning of Thread::RunThread.
App* app_ = nullptr;
std::weak_ptr<tracing::TraceAggregator> trace_aggregator_;

/**
* Starts the thread in the background.
*
* @param start_monotonic_time_ns should be the start time in nanoseconds for the monotonic clock.
* @param app The application that started this thread.
*/
void Start(int64_t start_monotonic_time_ns, App* app);
void Start(int64_t start_monotonic_time_ns, std::weak_ptr<tracing::TraceAggregator> trace_aggregator);

public:
/**
Expand Down
26 changes: 2 additions & 24 deletions src/cactus_rt/app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void App::Start() {

auto start_monotonic_time_ns = NowNs();
for (auto& thread : threads_) {
thread->Start(start_monotonic_time_ns, this);
thread->Start(start_monotonic_time_ns, trace_aggregator_);
}
}

Expand Down Expand Up @@ -100,28 +100,6 @@ bool App::StopTraceSession() noexcept {
return true;
}

void App::RegisterThreadTracer(std::shared_ptr<tracing::ThreadTracer> thread_tracer) noexcept {
const std::scoped_lock lock(aggregator_mutex_);

thread_tracers_.push_back(thread_tracer);

if (trace_aggregator_ != nullptr) {
trace_aggregator_->RegisterThreadTracer(thread_tracer);
}
}

void App::DeregisterThreadTracer(const std::shared_ptr<tracing::ThreadTracer>& thread_tracer) noexcept {
const std::scoped_lock lock(aggregator_mutex_);

thread_tracers_.remove_if([thread_tracer](const std::shared_ptr<tracing::ThreadTracer>& t) {
return t == thread_tracer;
});

if (trace_aggregator_ != nullptr) {
trace_aggregator_->DeregisterThreadTracer(thread_tracer);
}
}

void App::LockMemory() const {
// See https://lwn.net/Articles/837019/

Expand Down Expand Up @@ -196,7 +174,7 @@ void App::CreateAndStartTraceAggregator(std::shared_ptr<tracing::Sink> sink) noe
return;
}

trace_aggregator_ = std::make_unique<tracing::TraceAggregator>(name_, tracer_config_.trace_aggregator_cpu_affinity);
trace_aggregator_ = std::make_shared<tracing::TraceAggregator>(name_, tracer_config_.trace_aggregator_cpu_affinity);
for (auto tracer : thread_tracers_) {
trace_aggregator_->RegisterThreadTracer(tracer);
}
Expand Down
19 changes: 11 additions & 8 deletions src/cactus_rt/thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
#include <cerrno>
#include <cstring>
#include <ctime>
#include <memory>
#include <stdexcept>

#include "cactus_rt/app.h"
#include "cactus_rt/config.h"
#include "cactus_rt/tracing/thread_tracer.h"

namespace cactus_rt {

Expand All @@ -17,10 +18,11 @@ void* Thread::RunThread(void* data) {
thread->config_.scheduler->SetSchedAttr();

thread->tracer_->SetTid();
if (thread->app_ != nullptr) {
thread->app_->RegisterThreadTracer(thread->tracer_);
auto trace_aggregator = thread->trace_aggregator_.lock();
if (trace_aggregator) {
trace_aggregator->RegisterThreadTracer(thread->tracer_);
} else {
LOG_WARNING(thread->Logger(), "thread {} does not have trace_aggregator_ and tracing is disabled. Did you all App::RegisterThread?", thread->name_);
LOG_WARNING(thread->Logger(), "thread {} does not have trace_aggregator_ and tracing is enabled. Did you all App::RegisterThread?", thread->name_);
}
quill::preallocate(); // Pre-allocates thread-local data to avoid the need to allocate on the first log message

Expand All @@ -30,9 +32,9 @@ void* Thread::RunThread(void* data) {
return nullptr;
}

void Thread::Start(int64_t start_monotonic_time_ns, App* app) {
void Thread::Start(int64_t start_monotonic_time_ns, std::weak_ptr<tracing::TraceAggregator> trace_aggregator) {
start_monotonic_time_ns_ = start_monotonic_time_ns;
app_ = app;
trace_aggregator_ = std::move(trace_aggregator);

pthread_attr_t attr;

Expand Down Expand Up @@ -80,8 +82,9 @@ int Thread::Join() const {
}

Thread::~Thread() {
if (app_ != nullptr) {
app_->DeregisterThreadTracer(tracer_);
auto trace_aggregator = trace_aggregator_.lock();
if (trace_aggregator) {
trace_aggregator->DeregisterThreadTracer(tracer_);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/cactus_rt/tracing/trace_aggregator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void TraceAggregator::DeregisterThreadTracer(const std::shared_ptr<ThreadTracer>
const std::scoped_lock lock(mutex_);

tracers_.remove_if([tracer](const std::shared_ptr<tracing::ThreadTracer>& t) {
return t == tracer;
return t.get() == tracer.get();
});
}

Expand Down
2 changes: 0 additions & 2 deletions tests/tracing/helpers/assert_helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
#include <google/protobuf/util/json_util.h>
#include <gtest/gtest.h>

#include <utility>

std::string ProtoToJson(const google::protobuf::Message& proto) {
std::string json;
google::protobuf::util::MessageToJsonString(proto, &json);
Expand Down
4 changes: 2 additions & 2 deletions tests/tracing/single_threaded_test.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <cactus_rt/rt.h>
#include <google/protobuf/text_format.h>
#include <gtest/gtest.h>
#include <quill/detail/LogManager.h>

Expand Down Expand Up @@ -266,6 +267,7 @@ TEST_F(SingleThreadTracingTest, StopTracingAndNoEventsAreRecorded) {

auto traces = sink_->LoggedTraces();
auto packets = GetPacketsFromTraces(traces);

ASSERT_EQ(packets.size(), 1);

AssertIsProcessTrackDescriptor(*packets[0], kAppName);
Expand Down Expand Up @@ -310,8 +312,6 @@ TEST_F(SingleThreadTracingTest, RestartTracingStartsNewSession) {
AssertIsThreadTrackDescriptor(*packets2[1], kRegularThreadName, process_track_uuid);
auto thread_track_uuid = packets2[1]->track_descriptor().uuid();

std::cout << "packets2: " << packets2[2]->ShortDebugString() << "\n";

// Event1 is emitted as interned data because that thread is still active and the event name got interned previously.
auto event_names = GetInternedEventNames(*packets2[2]);
ASSERT_EQ(event_names.size(), 1);
Expand Down

0 comments on commit c7adecf

Please sign in to comment.