📊 Sort spikes. #2433

Open · wants to merge 15 commits into base: master

arbor/backends/event.hpp (12 changes: 7 additions & 5 deletions)
@@ -39,17 +39,19 @@ struct deliverable_event {
     target_handle handle;
 
     deliverable_event() = default;
-    constexpr deliverable_event(time_type time, target_handle handle, float weight) noexcept:
-        time(time), weight(weight), handle(handle) {}
+    constexpr deliverable_event(time_type time,
+                                target_handle handle,
+                                float weight) noexcept:
+        time(time), weight(weight), handle(std::move(handle)) {}
 
     ARB_SERDES_ENABLE(deliverable_event, time, weight, handle);
 };
 
 // Subset of event information required for mechanism delivery.
 struct deliverable_event_data {
-    cell_local_size_type mech_index; // same as target_handle::mech_index
-    float weight;
-    deliverable_event_data(cell_local_size_type idx, float w):
+    cell_local_size_type mech_index = 0; // same as target_handle::mech_index
+    float weight = 0;
+    deliverable_event_data(cell_local_size_type idx, float w) noexcept:
         mech_index(idx),
         weight(w) {}
     ARB_SERDES_ENABLE(deliverable_event_data,
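
As context for the `handle(std::move(handle))` change above: a minimal stand-alone sketch of the take-by-value-then-move constructor idiom. The types below (`handle_t`, `event_sketch`) are hypothetical stand-ins, not Arbor code; the point is that a caller can pass either an lvalue (one copy, then one move) or an rvalue (two moves, no copy), and the copy, if any, happens while binding the by-value parameter at the call site, outside the `noexcept` constructor body.

#include <iostream>
#include <utility>
#include <vector>

// Hypothetical stand-in for target_handle; copying is assumed to be the
// operation worth avoiding when the caller passes a temporary.
struct handle_t {
    std::vector<int> payload;
    handle_t(std::vector<int> p): payload(std::move(p)) {}
    handle_t(const handle_t& o): payload(o.payload) { std::cout << "copy\n"; }
    handle_t(handle_t&& o) noexcept: payload(std::move(o.payload)) { std::cout << "move\n"; }
};

struct event_sketch {
    double time;
    float weight;
    handle_t handle;

    // Take the handle by value, then move it into the member; this mirrors
    // the patch's `handle(std::move(handle))`.
    event_sketch(double time, handle_t handle, float weight) noexcept:
        time(time), weight(weight), handle(std::move(handle)) {}
};

int main() {
    handle_t h{{1, 2, 3}};
    event_sketch from_lvalue{0.125, h, 0.5f};            // prints: copy, move
    event_sketch from_rvalue{0.25, std::move(h), 0.5f};  // prints: move, move
}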

arbor/backends/event_stream_base.hpp (91 changes: 56 additions & 35 deletions)
@@ -8,7 +8,6 @@
 #include "backends/event_stream_state.hpp"
 #include "event_lane.hpp"
 #include "timestep_range.hpp"
-#include "util/partition.hpp"
 
 ARB_SERDES_ENABLE_EXT(arb_deliverable_event_data, mech_index, weight);
 
@@ -63,7 +62,28 @@ struct event_stream_base {
     virtual void init() = 0;
 };
 
-struct spike_event_stream_base : event_stream_base<deliverable_event> {
+struct spike_event_stream_base: event_stream_base<deliverable_event> {
+    // Take in one event lane per cell `gid` and reorganise into one stream per
+    // synapse mechanism `mech_id`.
+    //
+    // - Because the cell group coalesces multiple cells and their synapses into
+    //   one object, one `mech_id` can touch multiple lanes / `gid`s.
+    // - Conversely, two `mech_id`s can cover different, but overlapping, sets of `gid`s.
+    // - Multiple `mech_id`s can receive events from the same source.
+    //
+    // Pre:
+    // - Events in `lanes[ix]` for all `ix`
+    //   * are sorted by time
+    //   * `ix` maps to exactly one cell in the local cell group
+    // - `divs` partitions `handles` such that the target handles for cell `ix`
+    //   are located in `handles[divs[ix]..divs[ix + 1]]`
+    // - `handles` records the `(mech_id, index)` of a target s.t. `index` is the instance
+    //   within the set identified by `mech_id`, e.g. a single synapse placed on a multi-
+    //   location locset (plus the merging across cells by the cell group)
+    // Post:
+    // - `streams[mech_id]` contains a list of all events for mechanism `mech_id` s.t.
+    //   * the list is sorted by (time_step, mech_index, time)
+    //   * the list is partitioned by `time_step` via `ev_spans_`
     template<typename EventStream>
     friend void initialize(const event_lane_subrange& lanes,
                            const std::vector<target_handle>& handles,
@@ -74,54 +94,56 @@ struct spike_event_stream_base : event_stream_base<deliverable_event> {
 
         // reset streams and allocate sufficient space for temporaries
         auto n_steps = steps.size();
-        for (auto& [k, v]: streams) {
-            v.clear();
-            v.spike_counter_.clear();
-            v.spike_counter_.resize(steps.size(), 0);
-            v.spikes_.clear();
+        for (auto& [id, stream]: streams) {
+            stream.clear();
+            stream.ev_spans_.resize(steps.size() + 1, 0);
+            stream.spikes_.clear();
             // ev_data_ has been cleared during v.clear(), so we use its capacity
-            v.spikes_.reserve(v.ev_data_.capacity());
+            stream.spikes_.reserve(stream.ev_data_.capacity());
         }
 
         // loop over lanes: group events by mechanism and sort them by time
-        auto cell = 0;
+        arb_size_type cell = 0;
         for (const auto& lane: lanes) {
            auto div = divs[cell];
-            ++cell;
            arb_size_type step = 0;
            for (const auto& evt: lane) {
-                auto time = evt.time;
-                auto weight = evt.weight;
-                auto target = evt.target;
-                while(step < n_steps && time >= steps[step].t_end()) ++step;
+                step = std::lower_bound(steps.begin() + step,
+                                        steps.end(),
+                                        evt.time,
+                                        [](const auto& bucket, time_type time) { return bucket.t_end() <= time; })
+                       - steps.begin();
                // Events coinciding with epoch's upper boundary belong to next epoch
                if (step >= n_steps) break;
-                arb_assert(div + target < handles.size());
-                const auto& handle = handles[div + target];
+                arb_assert(div + evt.target < handles.size());
+                const auto& handle = handles[div + evt.target];
                auto& stream = streams[handle.mech_id];
-                stream.spikes_.push_back(spike_data{step, handle.mech_index, time, weight});
-                // insertion sort with last element as pivot
-                // ordering: first w.r.t. step, within a step: mech_index, within a mech_index: time
-                auto first = stream.spikes_.begin();
-                auto last = stream.spikes_.end();
-                auto pivot = std::prev(last, 1);
-                std::rotate(std::upper_bound(first, pivot, *pivot), pivot, last);
-                // increment count in current time interval
-                stream.spike_counter_[step]++;
+                stream.spikes_.emplace_back(spike_data{step, handle.mech_index, evt.time, evt.weight});
+                stream.ev_spans_[step + 1]++;
            }
+            ++cell;
        }
 
+        // TODO parallelise over streams, however, need a proper testcase/benchmark.
+        // auto tg = threading::task_group(ts.get());
        for (auto& [id, stream]: streams) {
-            // copy temporary deliverable_events into stream's ev_data_
-            stream.ev_data_.reserve(stream.spikes_.size());
-            std::transform(stream.spikes_.begin(), stream.spikes_.end(), std::back_inserter(stream.ev_data_),
-                           [](auto const& e) noexcept -> arb_deliverable_event_data {
-                               return {e.mech_index, e.weight}; });
-            // scan over spike_counter_ and written to ev_spans_
-            util::make_partition(stream.ev_spans_, stream.spike_counter_);
-            // delegate to derived class init: static cast necessary to access protected init()
-            static_cast<spike_event_stream_base&>(stream).init();
+            // tg.run([&stream=stream]() {
+            // scan to partition stream (aliasing is explicitly allowed)
+            std::inclusive_scan(stream.ev_spans_.begin(), stream.ev_spans_.end(),
+                                stream.ev_spans_.begin());
+            // This is made slightly faster by using pdqsort.
+            // Despite events being sorted by time in the partitions defined
+            // by the lane index here, they are not _totally_ sorted, thus
+            // sort is needed, merge not being strong enough :/
+            std::sort(stream.spikes_.begin(), stream.spikes_.end());
+            // copy temporary deliverable_events into stream's ev_data_
+            stream.ev_data_.reserve(stream.spikes_.size());
+            for (const auto& spike: stream.spikes_) stream.ev_data_.emplace_back(event_data_type{spike.mech_index, spike.weight});
+            // delegate to derived class init: static cast necessary to access protected init()
+            static_cast<spike_event_stream_base&>(stream).init();
+            // });
        }
+        // tg.wait();
    }
 
 protected: // members
@@ -133,7 +155,6 @@ struct spike_event_stream_base : event_stream_base<deliverable_event> {
        auto operator<=>(spike_data const&) const noexcept = default;
    };
    std::vector<spike_data> spikes_;
-    std::vector<std::size_t> spike_counter_;
 };
 
 struct sample_event_stream_base : event_stream_base<sample_event> {
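
To see the whole pipeline above outside of Arbor's types, here is a simplified, self-contained sketch of the same pattern; the names (`handle`, `event`, `spike`, `stream`, `spans`, `t_end`) and the toy data are invented for illustration and are not the library's API. It walks per-cell lanes, resolves each event's target through the CSR-style `divs`/`handles` layout from the pre-conditions, buckets it into a time step with `std::lower_bound`, counts per-step events in `spans[step + 1]`, and then, per mechanism, turns the counts into partition offsets with `std::inclusive_scan` and establishes the `(step, mech_index, time)` order with a single `std::sort`.

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <numeric>
#include <tuple>
#include <unordered_map>
#include <vector>

// Toy stand-ins, for illustration only (not Arbor's types).
struct handle { std::uint32_t mech_id; std::uint32_t mech_index; };
struct event  { std::uint32_t target; double time; float weight; };
struct spike {
    std::size_t step; std::uint32_t mech_index; double time; float weight;
    bool operator<(const spike& o) const {
        return std::tie(step, mech_index, time, weight) <
               std::tie(o.step, o.mech_index, o.time, o.weight);
    }
};
struct stream { std::vector<spike> spikes; std::vector<std::size_t> spans; };

int main() {
    // Time-step boundaries: steps [0,1), [1,2), [2,3).
    std::vector<double> t_end = {1.0, 2.0, 3.0};
    std::size_t n_steps = t_end.size();

    // Pre: handles[divs[c]..divs[c+1]] are the targets of cell c (CSR layout);
    // a handle names (mech_id, instance within that mechanism).
    std::vector<handle> handles = {{7, 0}, {9, 0}, {7, 1}}; // cell 0: two targets, cell 1: one
    std::vector<std::size_t> divs = {0, 2, 3};

    // Pre: one lane per cell, each lane sorted by time.
    std::vector<std::vector<event>> lanes = {
        {{1, 0.25, 1.f}, {0, 1.5, 1.f}},  // cell 0
        {{0, 0.5, 1.f}, {0, 2.75, 1.f}},  // cell 1
    };

    std::unordered_map<std::uint32_t, stream> streams;
    for (std::size_t cell = 0; cell < lanes.size(); ++cell) {
        auto div = divs[cell];
        std::size_t step = 0;
        for (const auto& ev: lanes[cell]) {
            // First step whose end lies strictly after the event time; events on
            // a step's upper boundary fall into the next step.
            step = std::size_t(std::lower_bound(t_end.begin() + step, t_end.end(), ev.time,
                                                [](double end, double t) { return end <= t; })
                               - t_end.begin());
            if (step >= n_steps) break;
            const auto& h = handles[div + ev.target];
            auto& s = streams[h.mech_id];
            if (s.spans.empty()) s.spans.resize(n_steps + 1, 0);
            s.spikes.push_back({step, h.mech_index, ev.time, ev.weight});
            ++s.spans[step + 1]; // count per step, shifted by one slot
        }
    }

    // Post: per mechanism, counts become partition offsets, and one sort yields
    // the (step, mech_index, time) order the partition relies on.
    for (auto& [id, s]: streams) {
        std::inclusive_scan(s.spans.begin(), s.spans.end(), s.spans.begin());
        std::sort(s.spikes.begin(), s.spikes.end());
        assert(s.spans.back() == s.spikes.size());
        for (std::size_t t = 0; t < n_steps; ++t)
            for (auto i = s.spans[t]; i < s.spans[t + 1]; ++i)
                assert(s.spikes[i].step == t);
    }
}

Compared with the insertion sort via `std::rotate` that the patch removes, this defers all ordering work to one sort per mechanism stream, which is the change the PR title refers to.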

scripts/run_cpp_examples.sh (9 changes: 4 additions & 5 deletions)
@@ -31,9 +31,8 @@ all_examples=(
     "brunel -G 100000"
     "brunel --n-excitatory 10000 --n-inhibitory 2500 --tfinal 10 --in-degree-prop 0.1 --dt 0.01 --lambda 1.5 -g 0.8 --delay 1.0 --weight 0.00010 -G 1 --use-cable-cells"
     "brunel --n-excitatory 10000 --n-inhibitory 2500 --tfinal 10 --in-degree-prop 0.1 --dt 0.01 --lambda 1.5 -g 0.8 --delay 1.0 --weight 0.00010 -G 10 --use-cable-cells"
-    # TODO These need to be enabled when the group size issue is fixed.
-    # "brunel --n-excitatory 10000 --n-inhibitory 2500 --tfinal 10 --in-degree-prop 0.1 --dt 0.01 --lambda 1.5 -g 0.8 --delay 1.0 --weight 0.00010 -G 100 --use-cable-cells"
-    # "brunel --n-excitatory 10000 --n-inhibitory 2500 --tfinal 10 --in-degree-prop 0.1 --dt 0.01 --lambda 1.5 -g 0.8 --delay 1.0 --weight 0.00010 -G 100000 --use-cable-cells"
+    "brunel --n-excitatory 10000 --n-inhibitory 2500 --tfinal 10 --in-degree-prop 0.1 --dt 0.01 --lambda 1.5 -g 0.8 --delay 1.0 --weight 0.00010 -G 100 --use-cable-cells"
+    "brunel --n-excitatory 10000 --n-inhibitory 2500 --tfinal 10 --in-degree-prop 0.1 --dt 0.01 --lambda 1.5 -g 0.8 --delay 1.0 --weight 0.00010 -G 100000 --use-cable-cells"
     "gap_junctions"
     "generators"
     "lfp"
@@ -62,8 +61,8 @@ expected_outputs=(
     6998
     38956
     38956
-    # 38956
-    # 38956
+    38956
+    38956
     "30"
     ""
     ""