diff --git a/src/system_data.jl b/src/system_data.jl
index d97e3b2b..c44c77d4 100644
--- a/src/system_data.jl
+++ b/src/system_data.jl
@@ -574,7 +574,11 @@ function _transform_single_time_series!(
     end
 
     try
-        add_metadata!(data.time_series_manager.metadata_store, components, all_metadata)
+        begin_time_series_update(data.time_series_manager) do
+            for (component, metadata) in zip(components, all_metadata)
+                add_metadata!(data.time_series_manager.metadata_store, component, metadata)
+            end
+        end
     catch
         # This shouldn't be needed, but just in case there is a bug, remove all
         # DeterministicSingleTimeSeries to keep our guarantee.
@@ -1225,7 +1229,7 @@ function fast_deepcopy_system(
     old_supplemental_attribute_manager = data.supplemental_attribute_manager
 
     new_time_series_manager = if skip_time_series
-        TimeSeriesManager(InMemoryTimeSeriesStorage(), TimeSeriesMetadataStore(), true)
+        TimeSeriesManager(InMemoryTimeSeriesStorage(), TimeSeriesMetadataStore(), true, nothing)
     else
         old_time_series_manager
     end
diff --git a/src/time_series_manager.jl b/src/time_series_manager.jl
index b28ab47f..26f78cbb 100644
--- a/src/time_series_manager.jl
+++ b/src/time_series_manager.jl
@@ -2,10 +2,15 @@
 # This strikes a balance in SQLite efficiency vs loading many time arrays into memory.
 const ADD_TIME_SERIES_BATCH_SIZE = 100
 
+mutable struct BulkUpdateTSCache
+    forecast_params::Union{Nothing, ForecastParameters}
+end
+
 mutable struct TimeSeriesManager <: InfrastructureSystemsType
     data_store::TimeSeriesStorage
     metadata_store::TimeSeriesMetadataStore
     read_only::Bool
+    bulk_update_cache::Union{Nothing, BulkUpdateTSCache}
 end
 
 function TimeSeriesManager(;
@@ -31,107 +36,127 @@ function TimeSeriesManager(;
             compression = compression,
         )
     end
-    return TimeSeriesManager(data_store, metadata_store, read_only)
+    return TimeSeriesManager(data_store, metadata_store, read_only, nothing)
 end
 
-function bulk_add_time_series!(
-    mgr::TimeSeriesManager,
-    associations;
-    batch_size = ADD_TIME_SERIES_BATCH_SIZE,
-)
-    ts_keys = TimeSeriesKey[]
-    batch = TimeSeriesAssociation[]
-    sizehint!(batch, batch_size)
-    open_store!(mgr.data_store, "r+") do
-        for association in associations
-            push!(batch, association)
-            if length(batch) >= batch_size
-                append!(ts_keys, add_time_series!(mgr, batch))
-                empty!(batch)
-            end
-        end
+_get_forecast_params(ts::Forecast) = make_time_series_parameters(ts)
+_get_forecast_params(::StaticTimeSeries) = nothing
+_get_forecast_params!(::TimeSeriesManager, ::StaticTimeSeries) = nothing
 
-        if !isempty(batch)
-            append!(ts_keys, add_time_series!(mgr, batch))
+function _get_forecast_params!(mgr::TimeSeriesManager, forecast::Forecast)
+    # Reading forecast parameters from the database can be slow, particularly when
+    # large numbers of StaticTimeSeries are stored.
+    # During a bulk update, cache the result.
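+    # The cache is seeded lazily: the first forecast added during the update
+    # supplies the parameters if the database does not have any yet, and all
+    # subsequent calls reuse the cached value.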
+ if isnothing(mgr.bulk_update_cache) + return get_forecast_parameters(mgr.metadata_store) + end + + if isnothing(mgr.bulk_update_cache.forecast_params) + mgr.bulk_update_cache.forecast_params = get_forecast_parameters(mgr.metadata_store) + if isnothing(mgr.bulk_update_cache.forecast_params) + mgr.bulk_update_cache.forecast_params = _get_forecast_params(forecast) end end - return ts_keys + return mgr.bulk_update_cache.forecast_params end -function add_time_series!(mgr::TimeSeriesManager, batch::Vector{TimeSeriesAssociation}) - _throw_if_read_only(mgr) - forecast_params = get_forecast_parameters(mgr.metadata_store) - sts_params = StaticTimeSeriesParameters() - num_metadata = length(batch) - all_metadata = Vector{TimeSeriesMetadata}(undef, num_metadata) - owners = Vector{TimeSeriesOwners}(undef, num_metadata) - ts_keys = Vector{TimeSeriesKey}(undef, num_metadata) - time_series_uuids = Dict{Base.UUID, TimeSeriesData}() - metadata_identifiers = Set{Tuple}() - TimerOutputs.@timeit_debug SYSTEM_TIMERS "add_time_series! in bulk" begin - for (i, item) in enumerate(batch) - throw_if_does_not_support_time_series(item.owner) - check_time_series_data(item.time_series) - metadata_type = time_series_data_to_metadata(typeof(item.time_series)) - metadata = metadata_type(item.time_series; item.features...) - identifier = make_unique_owner_metadata_identifer(item.owner, metadata) - if identifier in metadata_identifiers - throw(ArgumentError("$identifier is present multiple times")) +""" +Begin an update of time series. Use this function when adding many time series arrays +in order to improve performance. + +If an error occurs during the update, changes will be reverted. + +Using this function to remove time series is currently not supported. +""" +function begin_time_series_update( + func::Function, + mgr::TimeSeriesManager, +) + open_store!(mgr.data_store, "r+") do + original_ts_uuids = Set(list_existing_time_series_uuids(mgr.metadata_store)) + mgr.bulk_update_cache = BulkUpdateTSCache(nothing) + try + SQLite.transaction(mgr.metadata_store.db) do + func() end - push!(metadata_identifiers, identifier) - if isnothing(forecast_params) - forecast_params = _get_forecast_params(item.time_series) + catch + # If an error occurs, we can easily remove new time series data to ensure + # that the metadata database is consistent with the data. + # We currently can't restore time series data that was deleted. 
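+            # Any time series UUID present now but absent at the start was
+            # written by `func`; remove those arrays from the data store
+            # before rethrowing.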
+ new_ts_uuids = setdiff( + Set(list_existing_time_series_uuids(mgr.metadata_store)), + original_ts_uuids, + ) + for uuid in new_ts_uuids + remove_time_series!(mgr.data_store, uuid) end - check_params_compatibility(sts_params, forecast_params, item.time_series) - all_metadata[i] = metadata - owners[i] = item.owner - ts_keys[i] = make_time_series_key(metadata) - time_series_uuids[get_uuid(item.time_series)] = item.time_series + rethrow() + finally + mgr.bulk_update_cache = nothing end + end +end - uuids = keys(time_series_uuids) - existing_ts_uuids = if isempty(uuids) - Base.UUID[] - else - list_existing_time_series_uuids(mgr.metadata_store, uuids) - end - new_ts_uuids = setdiff(keys(time_series_uuids), existing_ts_uuids) - - existing_metadata = list_existing_metadata(mgr.metadata_store, owners, all_metadata) - if !isempty(existing_metadata) - throw( - ArgumentError( - "Time series data with duplicate attributes are already stored: " * - "$(existing_metadata)", - ), +function bulk_add_time_series!( + mgr::TimeSeriesManager, + associations; + kwargs..., +) + # TODO: deprecate this function if team agrees + ts_keys = TimeSeriesKey[] + begin_time_series_update(mgr) do + for association in associations + key = add_time_series!( + mgr, + association.owner, + association.time_series; association.features..., ) - end - for uuid in new_ts_uuids - serialize_time_series!(mgr.data_store, time_series_uuids[uuid]) - end - if length(all_metadata) == 1 - add_metadata!(mgr.metadata_store, owners[1], all_metadata[1]) - else - add_metadata!(mgr.metadata_store, owners, all_metadata) + push!(ts_keys, key) end end + return ts_keys end -_get_forecast_params(ts::Forecast) = make_time_series_parameters(ts) -_get_forecast_params(::StaticTimeSeries) = nothing - function add_time_series!( mgr::TimeSeriesManager, owner::TimeSeriesOwners, time_series::TimeSeriesData; features..., ) - return add_time_series!( - mgr, - [TimeSeriesAssociation(owner, time_series; features...)], - )[1] + _throw_if_read_only(mgr) + forecast_params = _get_forecast_params!(mgr, time_series) + sts_params = StaticTimeSeriesParameters() + throw_if_does_not_support_time_series(owner) + check_time_series_data(time_series) + metadata_type = time_series_data_to_metadata(typeof(time_series)) + metadata = metadata_type(time_series; features...) + ts_key = make_time_series_key(metadata) + check_params_compatibility(sts_params, forecast_params, time_series) + + if has_metadata( + mgr.metadata_store, + owner; + time_series_type = typeof(time_series), + name = get_name(metadata), + resolution = get_resolution(metadata), + features..., + ) + throw( + ArgumentError( + "Time series data with duplicate attributes are already stored: " * + "$(metadata)", + ), + ) + end + + if !has_metadata(mgr.metadata_store, get_uuid(time_series)) + serialize_time_series!(mgr.data_store, time_series) + end + + add_metadata!(mgr.metadata_store, owner, metadata) + return ts_key end function clear_time_series!(mgr::TimeSeriesManager) diff --git a/src/time_series_metadata_store.jl b/src/time_series_metadata_store.jl index e62a3661..94fba4a0 100644 --- a/src/time_series_metadata_store.jl +++ b/src/time_series_metadata_store.jl @@ -18,7 +18,7 @@ mutable struct TimeSeriesMetadataStore cached_statements::Dict{String, SQLite.Stmt} # This caching allows the code to skip some string interpolations. # It is experimental for PowerSimulations, which calls has_metadata frequently. - # It may not be necessary. Savings are less than 1 us. + # It may not be necessary. Savings are minimal. 
    has_metadata_statements::Dict{HasMetadataQueryKey, SQLite.Stmt}
     metadata_uuids::Dict{Base.UUID, TimeSeriesMetadata}
     # If you add any fields, ensure they are managed in deepcopy_internal below.
@@ -294,121 +294,38 @@ function add_metadata!(
     owner::TimeSeriesOwners,
     metadata::TimeSeriesMetadata,
 )
-    TimerOutputs.@timeit_debug SYSTEM_TIMERS "add time series metadata single" begin
-        owner_category = _get_owner_category(owner)
-        time_series_type = time_series_metadata_to_data(metadata)
-        ts_category = _get_time_series_category(time_series_type)
-        features = make_features_string(metadata.features)
-        vals = _create_row(
-            metadata,
-            owner,
-            owner_category,
-            _convert_ts_type_to_string(time_series_type),
-            ts_category,
-            features,
-        )
-        params = chop(repeat("?,", length(vals)))
-        _execute_cached(
-            store,
-            "INSERT INTO $ASSOCIATIONS_TABLE_NAME VALUES($params)",
-            vals,
-        )
-        metadata_uuid = get_uuid(metadata)
-        metadata_row =
-            (missing, string(metadata_uuid), JSON3.write(serialize(metadata)))
-        metadata_params = ("?,?,jsonb(?)")
-        TimerOutputs.@timeit_debug SYSTEM_TIMERS "add ts_metadata row" begin
-            _execute_cached(
-                store,
-                "INSERT OR IGNORE INTO $METADATA_TABLE_NAME VALUES($metadata_params)",
-                metadata_row,
-            )
-        end
-        if !haskey(store.metadata_uuids, metadata_uuid)
-            store.metadata_uuids[metadata_uuid] = metadata
-        end
-        @debug "Added metadata = $metadata to $(summary(owner))" _group =
-            LOG_GROUP_TIME_SERIES
-    end
-    return
-end
-
-function add_metadata!(
-    store::TimeSeriesMetadataStore,
-    owners::Vector{<:TimeSeriesOwners},
-    all_metadata::Vector{<:TimeSeriesMetadata},
-)
-    TimerOutputs.@timeit_debug SYSTEM_TIMERS "add time series metadata bulk" begin
-        @assert_op length(owners) == length(all_metadata)
-        columns = (
-            "id",
-            "time_series_uuid",
-            "time_series_type",
-            "time_series_category",
-            "initial_timestamp",
-            "resolution_ms",
-            "horizon_ms",
-            "interval_ms",
-            "window_count",
-            "length",
-            "name",
-            "owner_uuid",
-            "owner_type",
-            "owner_category",
-            "features",
-            "metadata_uuid",
-        )
-        num_rows = length(all_metadata)
-        num_columns = length(columns)
-        data = OrderedDict(x => Vector{Any}(undef, num_rows) for x in columns)
-        metadata_rows = Tuple[]
-        new_metadata = TimeSeriesMetadata[]
-        for i in 1:num_rows
-            owner = owners[i]
-            metadata = all_metadata[i]
-            metadata_uuid = get_uuid(metadata)
-            if !haskey(store.metadata_uuids, metadata_uuid)
-                push!(
-                    metadata_rows,
-                    (missing, string(metadata_uuid), JSON3.write(serialize(metadata))),
-                )
-                push!(new_metadata, metadata)
-            end
-            owner_category = _get_owner_category(owner)
-            time_series_type = time_series_metadata_to_data(metadata)
-            ts_category = _get_time_series_category(time_series_type)
-            features = make_features_string(metadata.features)
-            row = _create_row(
-                metadata,
-                owner,
-                owner_category,
-                _convert_ts_type_to_string(time_series_type),
-                ts_category,
-                features,
-            )
-            for (j, column) in enumerate(columns)
-                data[column][i] = row[j]
-            end
-        end
-
-        placeholder = chop(repeat("?,", num_columns))
-        SQLite.DBInterface.executemany(
-            store.db,
-            "INSERT INTO $ASSOCIATIONS_TABLE_NAME VALUES($placeholder)",
-            NamedTuple(Symbol(k) => v for (k, v) in data),
-        )
-        _add_rows!(
-            store,
-            metadata_rows,
-            ("id", "metadata_uuid", "metadata"),
-            METADATA_TABLE_NAME,
-        )
-        for metadata in new_metadata
-            store.metadata_uuids[get_uuid(metadata)] = metadata
-        end
-        @debug "Added $num_rows instances of time series metadata" _group =
-            LOG_GROUP_TIME_SERIES
+    owner_category = _get_owner_category(owner)
+    time_series_type = time_series_metadata_to_data(metadata)
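+    # One associations row is written per (owner, metadata) pair; the
+    # serialized metadata itself is inserted further below with INSERT OR
+    # IGNORE so that each metadata UUID is stored only once.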
+    ts_category = _get_time_series_category(time_series_type)
+    features = make_features_string(metadata.features)
+    vals = _create_row(
+        metadata,
+        owner,
+        owner_category,
+        _convert_ts_type_to_string(time_series_type),
+        ts_category,
+        features,
+    )
+    params = chop(repeat("?,", length(vals)))
+    _execute_cached(
+        store,
+        "INSERT INTO $ASSOCIATIONS_TABLE_NAME VALUES($params)",
+        vals,
+    )
+    metadata_uuid = get_uuid(metadata)
+    metadata_row =
+        (missing, string(metadata_uuid), JSON3.write(serialize(metadata)))
+    metadata_params = "?,?,jsonb(?)"
+    _execute_cached(
+        store,
+        "INSERT OR IGNORE INTO $METADATA_TABLE_NAME VALUES($metadata_params)",
+        metadata_row,
+    )
+    if !haskey(store.metadata_uuids, metadata_uuid)
+        store.metadata_uuids[metadata_uuid] = metadata
     end
+    @debug "Added metadata = $metadata to $(summary(owner))" _group =
+        LOG_GROUP_TIME_SERIES
     return
 end
@@ -531,19 +448,19 @@ function get_forecast_initial_times(store::TimeSeriesMetadataStore)
     return get_initial_times(params.initial_timestamp, params.count, params.interval)
 end
 
+const _GET_FORECAST_PARAMS = """
+    SELECT
+        horizon_ms
+        ,initial_timestamp
+        ,interval_ms
+        ,resolution_ms
+        ,window_count
+    FROM $ASSOCIATIONS_TABLE_NAME
+    WHERE horizon_ms IS NOT NULL
+    LIMIT 1
+"""
 function get_forecast_parameters(store::TimeSeriesMetadataStore)
-    query = """
-        SELECT
-            horizon_ms
-            ,initial_timestamp
-            ,interval_ms
-            ,resolution_ms
-            ,window_count
-        FROM $ASSOCIATIONS_TABLE_NAME
-        WHERE horizon_ms IS NOT NULL
-        LIMIT 1
-    """
-    table = Tables.rowtable(_execute_cached(store, query))
+    table = Tables.rowtable(_execute_cached(store, _GET_FORECAST_PARAMS))
     isempty(table) && return nothing
     row = table[1]
     return ForecastParameters(;
@@ -991,7 +908,14 @@ function list_existing_time_series_uuids(store::TimeSeriesMetadataStore, uuids)
         FROM $ASSOCIATIONS_TABLE_NAME
         WHERE time_series_uuid IN ($placeholder)
     """
-    table = Tables.columntable(_execute(store, query, uuids_str))
+    table = Tables.columntable(_execute_cached(store, query, uuids_str))
+    return Base.UUID.(table.time_series_uuid)
+end
+
+const _LIST_EXISTING_TS_UUIDS = "SELECT DISTINCT time_series_uuid FROM $ASSOCIATIONS_TABLE_NAME"
+
+function list_existing_time_series_uuids(store::TimeSeriesMetadataStore)
+    table = Tables.columntable(_execute_cached(store, _LIST_EXISTING_TS_UUIDS))
     return Base.UUID.(table.time_series_uuid)
 end
diff --git a/test/test_time_series.jl b/test/test_time_series.jl
index 9178d849..96a6f90b 100644
--- a/test/test_time_series.jl
+++ b/test/test_time_series.jl
@@ -2943,6 +2943,102 @@ end
     )
 end
 
+@testset "Test bulk addition of time series with transaction" begin
+    sys = IS.SystemData()
+    name = "Component1"
+    component = IS.TestComponent(name, 5)
+    IS.add_component!(sys, component)
+
+    initial_time = Dates.DateTime("2020-09-01")
+    resolution = Dates.Hour(1)
+
+    other_time = initial_time + resolution
+    name = "test"
+    horizon_count = 24
+
+    make_values(count, index) = ones(count) * index
+    IS.begin_time_series_update(sys.time_series_manager) do
+        for i in 1:5
+            forecast = IS.Deterministic(;
+                data = SortedDict(
+                    initial_time => make_values(horizon_count, i),
+                    other_time => make_values(horizon_count, i),
+                ),
+                name = "ts_$(i)", resolution = resolution,
+            )
+            IS.add_time_series!(sys, component, forecast; model_year = "high")
+        end
+    end
+    ts_keys = IS.get_time_series_keys(component)
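+    # All five forecasts were added in one transaction; each should have a
+    # key and be retrievable with its original data.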
+    @test length(ts_keys) == 5
+    actual_ts_data = Dict(IS.get_name(x) => x for x in ts_keys)
+    for i in 1:5
+        name = "ts_$(i)"
+        @test haskey(actual_ts_data, name)
+        key = actual_ts_data[name]
+        ts = IS.get_time_series(component, key)
+        @test ts isa IS.Deterministic
+        data = IS.get_data(ts)
+        @test !isempty(values(data))
+        for val in values(data)
+            @test val == make_values(horizon_count, i)
+        end
+    end
+end
+
+@testset "Test bulk addition of time series with transaction and error" begin
+    sys = IS.SystemData()
+    name = "Component1"
+    component = IS.TestComponent(name, 5)
+    IS.add_component!(sys, component)
+
+    initial_time = Dates.DateTime("2020-09-01")
+    resolution = Dates.Hour(1)
+
+    other_time = initial_time + resolution
+    name = "test"
+    horizon_count = 24
+
+    make_values(count, index) = ones(count) * index
+
+    bystander = IS.Deterministic(;
+        data = SortedDict(
+            initial_time => rand(horizon_count),
+            other_time => rand(horizon_count),
+        ),
+        name = "bystander", resolution = resolution,
+    )
+    IS.add_time_series!(sys, component, bystander)
+
+    @test_throws(
+        ArgumentError,
+        IS.begin_time_series_update(sys.time_series_manager) do
+            for i in 1:5
+                if i < 5
+                    name = "ts_$i"
+                else
+                    # Reuse the previous name to trigger a duplicate error on
+                    # the last iteration.
+                    name = "ts_$(i - 1)"
+                end
+                forecast = IS.Deterministic(;
+                    data = SortedDict(
+                        initial_time => make_values(horizon_count, i),
+                        other_time => make_values(horizon_count, i),
+                    ),
+                    name = name, resolution = resolution,
+                )
+                IS.add_time_series!(sys, component, forecast)
+            end
+        end,
+    )
+    ts_keys = IS.get_time_series_keys(component)
+    @test length(ts_keys) == 1
+    key = ts_keys[1]
+    @test IS.get_name(key) == "bystander"
+    res = IS.get_time_series(component, key)
+    @test res isa IS.Deterministic
+    @test IS.get_data(res) == IS.get_data(bystander)
+end
+
 @testset "Test list_existing_metadata" begin
     sys = IS.SystemData()
     initial_time = Dates.DateTime("2020-09-01")