Skip to content

Commit

Permalink
Add begin_time_series_update for bulk additions
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-thom committed Jan 10, 2025
1 parent 4e2deb8 commit 6eee783
Show file tree
Hide file tree
Showing 5 changed files with 575 additions and 530 deletions.
6 changes: 5 additions & 1 deletion src/system_data.jl
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,11 @@ function _transform_single_time_series!(
end

try
add_metadata!(data.time_series_manager.metadata_store, components, all_metadata)
begin_time_series_update(data.time_series_manager) do
for (component, metadata) in zip(components, all_metadata)
add_metadata!(data.time_series_manager.metadata_store, component, metadata)
end
end
catch
# This shouldn't be needed, but just in case there is a bug, remove all
# DeterministicSingleTimeSeries to keep our guarantee.
Expand Down
174 changes: 97 additions & 77 deletions src/time_series_manager.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@
# This strikes a balance in SQLite efficiency vs loading many time arrays into memory.
const ADD_TIME_SERIES_BATCH_SIZE = 100

mutable struct BulkUpdateTSCache
forecast_params::Union{Nothing, ForecastParameters}
end

mutable struct TimeSeriesManager <: InfrastructureSystemsType
data_store::TimeSeriesStorage
metadata_store::TimeSeriesMetadataStore
read_only::Bool
bulk_update_cache::Union{Nothing, BulkUpdateTSCache}
end

function TimeSeriesManager(;
Expand All @@ -31,107 +36,122 @@ function TimeSeriesManager(;
compression = compression,
)
end
return TimeSeriesManager(data_store, metadata_store, read_only)
return TimeSeriesManager(data_store, metadata_store, read_only, nothing)
end

function bulk_add_time_series!(
mgr::TimeSeriesManager,
associations;
batch_size = ADD_TIME_SERIES_BATCH_SIZE,
)
ts_keys = TimeSeriesKey[]
batch = TimeSeriesAssociation[]
sizehint!(batch, batch_size)
open_store!(mgr.data_store, "r+") do
for association in associations
push!(batch, association)
if length(batch) >= batch_size
append!(ts_keys, add_time_series!(mgr, batch))
empty!(batch)
end
end
_get_forecast_params(ts::Forecast) = make_time_series_parameters(ts)
_get_forecast_params(::StaticTimeSeries) = nothing
_get_forecast_params(::TimeSeriesManager, ::StaticTimeSeries) = nothing

if !isempty(batch)
append!(ts_keys, add_time_series!(mgr, batch))
function _get_forecast_params(mgr::TimeSeriesManager, forecast::Forecast)
if isnothing(mgr.bulk_update_cache)
return get_forecast_parameters(mgr.metadata_store)
end

if isnothing(mgr.bulk_update_cache.forecast_params)
mgr.bulk_update_cache.forecast_params = get_forecast_parameters(mgr.metadata_store)
if isnothing(mgr.bulk_update_cache.forecast_params)
mgr.bulk_update_cache.forecast_params = _get_forecast_params(forecast)
end
end

return ts_keys
return mgr.bulk_update_cache.forecast_params
end

function add_time_series!(mgr::TimeSeriesManager, batch::Vector{TimeSeriesAssociation})
_throw_if_read_only(mgr)
forecast_params = get_forecast_parameters(mgr.metadata_store)
sts_params = StaticTimeSeriesParameters()
num_metadata = length(batch)
all_metadata = Vector{TimeSeriesMetadata}(undef, num_metadata)
owners = Vector{TimeSeriesOwners}(undef, num_metadata)
ts_keys = Vector{TimeSeriesKey}(undef, num_metadata)
time_series_uuids = Dict{Base.UUID, TimeSeriesData}()
metadata_identifiers = Set{Tuple}()
TimerOutputs.@timeit_debug SYSTEM_TIMERS "add_time_series! in bulk" begin
for (i, item) in enumerate(batch)
throw_if_does_not_support_time_series(item.owner)
check_time_series_data(item.time_series)
metadata_type = time_series_data_to_metadata(typeof(item.time_series))
metadata = metadata_type(item.time_series; item.features...)
identifier = make_unique_owner_metadata_identifer(item.owner, metadata)
if identifier in metadata_identifiers
throw(ArgumentError("$identifier is present multiple times"))
"""
Begin an update of time series. Use this function when adding many time series arrays
in order to improve performance.
Using this function to remove time series is currently not supported.
"""
function begin_time_series_update(
func::Function,
mgr::TimeSeriesManager,
)
open_store!(mgr.data_store, "r+") do
original_ts_uuids = Set(list_existing_time_series_uuids(mgr.metadata_store))
mgr.bulk_update_cache = BulkUpdateTSCache(nothing)
try
SQLite.transaction(mgr.metadata_store.db) do
func()
end
push!(metadata_identifiers, identifier)
if isnothing(forecast_params)
forecast_params = _get_forecast_params(item.time_series)
catch
# If an error occurs, we can easily remove new time series data to ensure
# that the metadata database is consistent with the data.
# We currently can't restore time series data that was deleted.
new_ts_uuids = setdiff(
Set(list_existing_time_series_uuids(mgr.metadata_store)),
original_ts_uuids,
)
for uuid in new_ts_uuids
remove_time_series!(mgr.data_store, uuid)
end
check_params_compatibility(sts_params, forecast_params, item.time_series)
all_metadata[i] = metadata
owners[i] = item.owner
ts_keys[i] = make_time_series_key(metadata)
time_series_uuids[get_uuid(item.time_series)] = item.time_series
rethrow()
finally
mgr.bulk_update_cache = nothing
end
end
end

uuids = keys(time_series_uuids)
existing_ts_uuids = if isempty(uuids)
Base.UUID[]
else
list_existing_time_series_uuids(mgr.metadata_store, uuids)
end
new_ts_uuids = setdiff(keys(time_series_uuids), existing_ts_uuids)

existing_metadata = list_existing_metadata(mgr.metadata_store, owners, all_metadata)
if !isempty(existing_metadata)
throw(
ArgumentError(
"Time series data with duplicate attributes are already stored: " *
"$(existing_metadata)",
),
function bulk_add_time_series!(
mgr::TimeSeriesManager,
associations;
kwargs...,
)
# TODO: deprecate this function if team agrees
ts_keys = TimeSeriesKey[]
begin_time_series_update(mgr) do
for association in associations
key = add_time_series!(
mgr,
association.owner,
association.time_series; association.features...,
)
end
for uuid in new_ts_uuids
serialize_time_series!(mgr.data_store, time_series_uuids[uuid])
end
if length(all_metadata) == 1
add_metadata!(mgr.metadata_store, owners[1], all_metadata[1])
else
add_metadata!(mgr.metadata_store, owners, all_metadata)
push!(ts_keys, key)
end
end

return ts_keys
end

_get_forecast_params(ts::Forecast) = make_time_series_parameters(ts)
_get_forecast_params(::StaticTimeSeries) = nothing

function add_time_series!(
mgr::TimeSeriesManager,
owner::TimeSeriesOwners,
time_series::TimeSeriesData;
features...,
)
return add_time_series!(
mgr,
[TimeSeriesAssociation(owner, time_series; features...)],
)[1]
_throw_if_read_only(mgr)
forecast_params = _get_forecast_params(mgr, time_series)
sts_params = StaticTimeSeriesParameters()
throw_if_does_not_support_time_series(owner)
check_time_series_data(time_series)
metadata_type = time_series_data_to_metadata(typeof(time_series))
metadata = metadata_type(time_series; features...)
ts_key = make_time_series_key(metadata)
check_params_compatibility(sts_params, forecast_params, time_series)

if has_metadata(
mgr.metadata_store,
owner;
time_series_type = typeof(time_series),
name = get_name(metadata),
resolution = get_resolution(metadata),
features...,
)
throw(
ArgumentError(
"Time series data with duplicate attributes are already stored: " *
"$(metadata)",
),
)
end

if !has_metadata(mgr.metadata_store, get_uuid(time_series))
serialize_time_series!(mgr.data_store, time_series)
end

add_metadata!(mgr.metadata_store, owner, metadata)
return ts_key
end

function clear_time_series!(mgr::TimeSeriesManager)
Expand Down
Loading

0 comments on commit 6eee783

Please sign in to comment.