diff --git a/swmmanywhere/metric_utilities.py b/swmmanywhere/metric_utilities.py index a03e438c..1760a8db 100644 --- a/swmmanywhere/metric_utilities.py +++ b/swmmanywhere/metric_utilities.py @@ -118,15 +118,64 @@ def extract_var(df: pd.DataFrame, df_.date.min()).dt.total_seconds() return df_ -def align_calc_nse(synthetic_results: pd.DataFrame, +def pbias(y: np.ndarray, + yhat: np.ndarray) -> float: + r"""PBIAS. + + Calculate the percent bias: + + $$ + pbias = \frac{{syn_{length} - real_{length}}}{{real_{length}}} + $$ + + where: + + - $syn_{length}$ is the synthetic length, + - $real_{length}$ is the real length. + """ + total_observed = y.sum() + if total_observed == 0: + return np.inf + return (yhat.sum() - total_observed) / total_observed + +def nse(y: np.ndarray, + yhat: np.ndarray) -> float: + """Calculate Nash-Sutcliffe efficiency (NSE).""" + if np.std(y) == 0: + return np.inf + return 1 - np.sum(np.square(y - yhat)) / np.sum(np.square(y - np.mean(y))) + +def kge(y: np.ndarray,yhat: np.ndarray) -> float: + """Calculate the Kling-Gupta Efficiency (KGE) between simulated and observed data. + + Parameters: + y (np.array): Observed data array. + yhat (np.array): Simulated data array. + + Returns: + float: The KGE value. + """ + if (np.std(y) == 0) | (np.mean(y) == 0): + return np.inf + if np.std(yhat) == 0: + r = 0 + else: + r = np.corrcoef(yhat, y)[0, 1] + alpha = np.std(yhat) / np.std(y) + beta = np.mean(yhat) / np.mean(y) + kge = 1 - np.sqrt((r - 1)**2 + (alpha - 1)**2 + (beta - 1)**2) + return kge + +def align_calc_coef(synthetic_results: pd.DataFrame, real_results: pd.DataFrame, variable: str, syn_ids: list, - real_ids: list) -> float | None: - """Align and calculate NSE. + real_ids: list, + coef_func: Callable = nse) -> float: + """Align and calculate coef_func. - Align the synthetic and real data and calculate the Nash-Sutcliffe - efficiency (NSE) of the variable over time. In cases where the synthetic + Align the synthetic and real data and calculate the coef_func metric + of the variable over time. In cases where the synthetic data is does not overlap the real data, the value is interpolated. """ synthetic_results = synthetic_results.copy() @@ -163,8 +212,8 @@ def align_calc_nse(synthetic_results: pd.DataFrame, df['value_syn'] = df.value_syn.interpolate().to_numpy() df = df.dropna(subset=['value_real']) - # Calculate NSE - return nse(df.value_real, df.value_syn) + # Calculate coef_func + return coef_func(df.value_real, df.value_syn) def create_subgraph(G: nx.Graph, nodes: list) -> nx.Graph: @@ -195,27 +244,22 @@ def create_subgraph(G: nx.Graph, SG.graph.update(G.graph) return SG -def nse(y: np.ndarray, - yhat: np.ndarray) -> float | None: - """Calculate Nash-Sutcliffe efficiency (NSE).""" - if np.std(y) == 0: - return np.inf - return 1 - np.sum((y - yhat)**2) / np.sum((y - np.mean(y))**2) - -def median_nse_by_group(results: pd.DataFrame, - gb_key: str) -> float | None: - """Median NSE by group. +def median_coef_by_group(results: pd.DataFrame, + gb_key: str, + coef_func: Callable = nse) -> float: + """Median coef_func value by group. - Calculate the median Nash-Sutcliffe efficiency (NSE) of a variable over time + Calculate the median coef_func value of a variable over time for each group in the results dataframe, and return the median of these values. Args: results (pd.DataFrame): The results dataframe. gb_key (str): The column to group by. + coef_func (Callable): The coefficient to calculate. Default is nse. Returns: - float: The median NSE. + float: The median coef_func value. """ val = ( results @@ -223,11 +267,10 @@ def median_nse_by_group(results: pd.DataFrame, .sum() .reset_index() .groupby(gb_key) - .apply(lambda x: nse(x.value_real, x.value_sim)) + .apply(lambda x: coef_func(x.value_real, x.value_sim)) .median() ) - if not np.isfinite(val): - return None + return val @@ -420,6 +463,193 @@ def create_grid(bbox: tuple, return gpd.GeoDataFrame(grid) +def subcatchment(synthetic_results: pd.DataFrame, + synthetic_subs: gpd.GeoDataFrame, + synthetic_G: nx.Graph, + real_results: pd.DataFrame, + real_subs: gpd.GeoDataFrame, + real_G: nx.Graph, + metric_evaluation: MetricEvaluation, + var: str, + coef_func: Callable, + ): + """Subcatchment scale metric. + + Calculate the coefficient (coef_func) of a variable over time for aggregated + to real subcatchment scale. The metric produced is the median coef_func + across all subcatchments. + """ + results = align_by_shape(var, + synthetic_results = synthetic_results, + real_results = real_results, + shapes = real_subs, + synthetic_G = synthetic_G, + real_G = real_G) + + return median_coef_by_group(results, 'sub_id', coef_func=coef_func) + +def grid(synthetic_results: pd.DataFrame, + synthetic_subs: gpd.GeoDataFrame, + synthetic_G: nx.Graph, + real_results: pd.DataFrame, + real_subs: gpd.GeoDataFrame, + real_G: nx.Graph, + metric_evaluation: MetricEvaluation, + var: str, + coef_func: Callable, + ): + """Grid scale metric. + + Classify synthetic nodes to a grid and calculate the coef_func of a variable over + time for each grid cell. The metric produced is the median coef_func across all + grid cells. + """ + # Create a grid (GeoDataFrame of polygons) + scale = metric_evaluation.grid_scale + grid = create_grid(real_subs.total_bounds, + scale) + grid.crs = real_subs.crs + + # Align results + results = align_by_shape(var, + synthetic_results = synthetic_results, + real_results = real_results, + shapes = grid, + synthetic_G = synthetic_G, + real_G = real_G) + # Calculate coefficient + return median_coef_by_group(results, 'sub_id', coef_func=coef_func) + +def outlet(synthetic_results: pd.DataFrame, + synthetic_subs: gpd.GeoDataFrame, + synthetic_G: nx.Graph, + real_results: pd.DataFrame, + real_subs: gpd.GeoDataFrame, + real_G: nx.Graph, + metric_evaluation: MetricEvaluation, + var: str, + coef_func: Callable, + ): + """Outlet scale metric. + + Calculate the coefficient of a variable for the subgraph that + drains to the dominant outlet node. The dominant outlet node of the 'real' + network is calculated by dominant_outlet, while the dominant outlet node of + the 'synthetic' network is calculated by best_outlet_match. + """ + # Identify synthetic and real arcs that flow into the best outlet node + sg_syn, syn_outlet = best_outlet_match(synthetic_G, real_subs) + sg_real, real_outlet = dominant_outlet(real_G, real_results) + + allowable_var = ['nmanholes', 'npipes', 'length', 'flow', 'flooding'] + if var not in allowable_var: + raise ValueError(f"Invalid variable {var}. Can be {allowable_var}") + + if var == 'nmanholes': + # Calculate the coefficient based on the number of manholes + return coef_func(np.atleast_1d(sg_real.number_of_nodes()), + np.atleast_1d(sg_syn.number_of_nodes())) + if var == 'npipes': + # Calculate the coefficient based on the number of pipes + return coef_func(np.atleast_1d(sg_real.number_of_edges()), + np.atleast_1d(sg_syn.number_of_edges())) + if var == 'length': + # Calculate the coefficient based on the total length of the pipes + return coef_func( + np.array( + list(nx.get_edge_attributes(sg_real, 'length').values()) + ), + np.array( + list(nx.get_edge_attributes(sg_syn, 'length').values()) + ) + ) + if var == 'flow': + # Identify synthetic and real arcs that flow into the best outlet node + syn_arc = [d['id'] for u,v,d in synthetic_G.edges(data=True) + if v == syn_outlet] + real_arc = [d['id'] for u,v,d in real_G.edges(data=True) + if v == real_outlet] + elif var == 'flooding': + # Use all nodes in the subgraphs + syn_arc = list(sg_syn.nodes) + real_arc = list(sg_real.nodes) + + # Calculate the coefficient + return align_calc_coef(synthetic_results, + real_results, + var, + syn_arc, + real_arc, + coef_func = coef_func) + +def metric_factory(name: str): + """Create a metric function. + + A factory function to create a metric function based on the name. The first + part of the name is the scale, the second part is the metric, and the third + part is the variable. For example, 'grid_nse_flooding' is a metric function + that calculates the NSE of flooding at the grid scale. + + Args: + name (str): The name of the metric. + + Returns: + Callable: The metric function. + """ + # Split the name + parts = name.split('_') + if len(parts) != 3: + raise ValueError("Invalid metric name. Expected 'scale_metric_variable'") + scale, metric, variable = parts + + # Get coefficient + coef_dict = {"nse": nse, "kge": kge, "pbias": pbias} + if metric not in coef_dict: + raise ValueError(f"Invalid coef {metric}. Can be {coef_dict.values()}") + coef_func = coef_dict[metric] + + # Get scale + func_dict = {"grid": grid, "subcatchment": subcatchment, "outlet": outlet} + if scale not in func_dict: + raise ValueError(f"Invalid scale {scale}. Can be {func_dict.keys()}") + func = func_dict[scale] + + # Further validation + design_variables = ['length', 'nmanholes', 'npipes'] + if variable in design_variables: + if scale != 'outlet': + raise ValueError(f"Variable {variable} only supported at the outlet scale") + if metric != 'pbias': + raise ValueError(f"Variable {variable} only valid with pbias metric") + + # Create the metric function + def new_metric(**kwargs): + return func(coef_func = coef_func, + var = variable, + **kwargs) + new_metric.__name__ = name + return new_metric + +metrics.register(metric_factory('outlet_nse_flow')) +metrics.register(metric_factory('outlet_kge_flow')) +metrics.register(metric_factory('outlet_pbias_flow')) + +metrics.register(metric_factory('outlet_pbias_length')) +metrics.register(metric_factory('outlet_pbias_npipes')) +metrics.register(metric_factory('outlet_pbias_nmanholes')) + +metrics.register(metric_factory('outlet_nse_flooding')) +metrics.register(metric_factory('outlet_kge_flooding')) +metrics.register(metric_factory('outlet_pbias_flooding')) + +metrics.register(metric_factory('grid_nse_flooding')) +metrics.register(metric_factory('grid_kge_flooding')) +metrics.register(metric_factory('grid_pbias_flooding')) + +metrics.register(metric_factory('subcatchment_nse_flooding')) +metrics.register(metric_factory('subcatchment_kge_flooding')) +metrics.register(metric_factory('subcatchment_pbias_flooding')) + @metrics.register def nc_deltacon0(synthetic_G: nx.Graph, real_G: nx.Graph, @@ -536,60 +766,6 @@ def kstest_betweenness( return stats.ks_2samp(list(syn_betweenness.values()), list(real_betweenness.values())).statistic -@metrics.register -def outlet_nse_flow(synthetic_G: nx.Graph, - synthetic_results: pd.DataFrame, - real_G: nx.Graph, - real_results: pd.DataFrame, - real_subs: gpd.GeoDataFrame, - **kwargs) -> float | None: - """Outlet NSE flow. - - Calculate the Nash-Sutcliffe efficiency (NSE) of flow over time, where flow - is measured as the total flow of all arcs that drain to the 'dominant' - outlet node. The dominant outlet node of the 'real' network is calculated by - dominant_outlet, while the dominant outlet node of the 'synthetic' network - is calculated by best_outlet_match. - """ - # Identify synthetic and real arcs that flow into the best outlet node - _, syn_outlet = best_outlet_match(synthetic_G, real_subs) - syn_arc = [d['id'] for u,v,d in synthetic_G.edges(data=True) - if v == syn_outlet] - _, real_outlet = dominant_outlet(real_G, real_results) - real_arc = [d['id'] for u,v,d in real_G.edges(data=True) - if v == real_outlet] - - return align_calc_nse(synthetic_results, - real_results, - 'flow', - syn_arc, - real_arc) - -@metrics.register -def outlet_nse_flooding(synthetic_G: nx.Graph, - synthetic_results: pd.DataFrame, - real_G: nx.Graph, - real_results: pd.DataFrame, - real_subs: gpd.GeoDataFrame, - **kwargs) -> float | None: - """Outlet NSE flooding. - - Calculate the Nash-Sutcliffe efficiency (NSE) of flooding over time, where - flooding is the total volume of flooded water across all nodes that drain - to the 'dominant' outlet node. The dominant outlet node of the 'real' - network is calculated by dominant_outlet, while the dominant outlet node of - the 'synthetic' network is calculated by best_outlet_match. - """ - # Identify synthetic and real outlet arcs - sg_syn, _ = best_outlet_match(synthetic_G, real_subs) - sg_real, _ = dominant_outlet(real_G, real_results) - - return align_calc_nse(synthetic_results, - real_results, - 'flooding', - list(sg_syn.nodes), - list(sg_real.nodes)) - @metrics.register def outlet_kstest_diameters(real_G: nx.Graph, synthetic_G: nx.Graph, @@ -612,153 +788,3 @@ def outlet_kstest_diameters(real_G: nx.Graph, real_diameters = nx.get_edge_attributes(sg_real, 'diameter') return stats.ks_2samp(list(syn_diameters.values()), list(real_diameters.values())).statistic - -@metrics.register -def outlet_pbias_length(real_G: nx.Graph, - synthetic_G: nx.Graph, - real_results: pd.DataFrame, - real_subs: gpd.GeoDataFrame, - **kwargs) -> float: - r"""Outlet PBIAS length. - - Calculate the percent bias of the total edge length in the subgraph that - drains to the dominant outlet node. The dominant outlet node of the 'real' - network is calculated by dominant_outlet, while the dominant outlet node of - the 'synthetic' network is calculated by best_outlet_match. - - The percentage bias is calculated as: - - $$ - pbias = \frac{{syn_{length} - real_{length}}}{{real_{length}}} - $$ - - where: - - - $syn_{length}$ is the synthetic length, - - $real_{length}$ is the real length. - - """ - # Identify synthetic and real outlet arcs - sg_syn, _ = best_outlet_match(synthetic_G, real_subs) - sg_real, _ = dominant_outlet(real_G, real_results) - - # Calculate the percent bias - syn_length = sum([d['length'] for u,v,d in sg_syn.edges(data=True)]) - real_length = sum([d['length'] for u,v,d in sg_real.edges(data=True)]) - return (syn_length - real_length) / real_length - -@metrics.register -def outlet_pbias_nmanholes(real_G: nx.Graph, - synthetic_G: nx.Graph, - real_results: pd.DataFrame, - real_subs: gpd.GeoDataFrame, - **kwargs) -> float: - r"""Outlet PBIAS number of manholes (nodes). - - Calculate the percent bias of the total number of nodes in the subgraph - that drains to the dominant outlet node. The dominant outlet node of the - 'real' network is calculated by dominant_outlet, while the dominant outlet - node of the 'synthetic' network is calculated by best_outlet_match. - - The percentage bias is calculated as: - - $$ - pbias = \frac{{syn_{nnodes} - real_{nnodes}}}{{real_{nnodes}}} - $$ - - where: - - - $syn_{nnodes}$ is the number of synthetic nodes, - - $real_{nnodes}$ is the real number of nodes. - - """ - # Identify synthetic and real outlet arcs - sg_syn, _ = best_outlet_match(synthetic_G, real_subs) - sg_real, _ = dominant_outlet(real_G, real_results) - - return (sg_syn.number_of_nodes() - sg_real.number_of_nodes()) \ - / sg_real.number_of_nodes() - -@metrics.register -def outlet_pbias_npipes(real_G: nx.Graph, - synthetic_G: nx.Graph, - real_results: pd.DataFrame, - real_subs: gpd.GeoDataFrame, - **kwargs) -> float: - r"""Outlet PBIAS number of pipes (edges). - - Calculate the percent bias of the total number of edges in the subgraph - that drains to the dominant outlet node. The dominant outlet node of the - 'real' network is calculated by dominant_outlet, while the dominant outlet - node of the 'synthetic' network is calculated by best_outlet_match. - - - The percentage bias is calculated as: - - $$ - pbias = \frac{{syn_{nedges} - real_{nedges}}}{{real_{nedges}}} - $$ - - where: - - - $syn_{nedges}$ is the number of synthetic edges, - - $real_{nedges}$ is the real number of edges. - - """ - # Identify synthetic and real outlet arcs - sg_syn, _ = best_outlet_match(synthetic_G, real_subs) - sg_real, _ = dominant_outlet(real_G, real_results) - - return (sg_syn.number_of_edges() - sg_real.number_of_edges()) \ - / sg_real.number_of_edges() - - -@metrics.register -def subcatchment_nse_flooding(synthetic_G: nx.Graph, - real_G: nx.Graph, - synthetic_results: pd.DataFrame, - real_results: pd.DataFrame, - real_subs: gpd.GeoDataFrame, - **kwargs) -> float | None: - """Subcatchment NSE flooding. - - Classify synthetic nodes to real subcatchments and calculate the NSE of - flooding over time for each subcatchment. The metric produced is the median - NSE across all subcatchments. - """ - results = align_by_shape('flooding', - synthetic_results = synthetic_results, - real_results = real_results, - shapes = real_subs, - synthetic_G = synthetic_G, - real_G = real_G) - - return median_nse_by_group(results, 'sub_id') - -@metrics.register -def grid_nse_flooding(synthetic_G: nx.Graph, - real_G: nx.Graph, - synthetic_results: pd.DataFrame, - real_results: pd.DataFrame, - real_subs: gpd.GeoDataFrame, - metric_evaluation: MetricEvaluation, - **kwargs) -> float | None: - """Grid NSE flooding. - - Classify synthetic nodes to a grid and calculate the NSE of - flooding over time for each grid cell. The metric produced is the median - NSE across all grid cells. - """ - scale = metric_evaluation.grid_scale - grid = create_grid(real_subs.total_bounds, - scale) - grid.crs = real_subs.crs - - results = align_by_shape('flooding', - synthetic_results = synthetic_results, - real_results = real_results, - shapes = grid, - synthetic_G = synthetic_G, - real_G = real_G) - - return median_nse_by_group(results, 'sub_id') diff --git a/tests/test_metric_utilities.py b/tests/test_metric_utilities.py index d5a6e1c2..01fce278 100644 --- a/tests/test_metric_utilities.py +++ b/tests/test_metric_utilities.py @@ -127,6 +127,31 @@ def test_nse(): yhat = np.array([3,3,3,3,3])) assert val == 0.0 +def test_kge(): + """Test the kge metric.""" + val = mu.kge(y = np.array([1,2,3,4,5]), + yhat = np.array([1,2,3,4,5])) + assert_close(val, 1.0) + + val = mu.kge(y = np.array([1,2,3,4,5]), + yhat = np.array([3,3,3,3,3])) + assert_close(val, (1-2**0.5)) + +def test_inf(): + """Test metrics handling of invalid coefficients.""" + val = mu.kge(y = np.array([3,3,3,3,3]), + yhat = np.array([1,2,3,4,5])) + assert val == np.inf + + val = mu.nse(y = np.array([3,3,3,3,3]), + yhat = np.array([1,2,3,4,5])) + assert val == np.inf + + val = mu.pbias(y = np.array([-3,-3,0,3,3]), + yhat = np.array([1,2,3,4,5])) + assert val == np.inf + + def test_outlet_nse_flow(): """Test the outlet_nse_flow metric.""" # Load data @@ -153,20 +178,24 @@ def test_outlet_nse_flow(): # Calculate NSE (perfect results) val = mu.metrics.outlet_nse_flow(synthetic_G = G, + synthetic_subs = None, synthetic_results = results, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 1.0 # Calculate NSE (mean results) results_ = results.copy() results_.loc[[0,2],'value'] = 7.5 val = mu.metrics.outlet_nse_flow(synthetic_G = G, + synthetic_subs = None, synthetic_results = results_, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 0.0 # Change the graph @@ -180,20 +209,24 @@ def test_outlet_nse_flow(): # Calculate NSE (mean results) val = mu.metrics.outlet_nse_flow(synthetic_G = G_, + synthetic_subs = None, synthetic_results = results_, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 0.0 # Test time interpolation results_.loc[2,'date'] = pd.to_datetime('2021-01-01 00:00:10') results_.loc[[0,2], 'value'] = [0,30] val = mu.metrics.outlet_nse_flow(synthetic_G = G_, + synthetic_subs = None, synthetic_results = results_, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == -15.0 def test_outlet_nse_flooding(): @@ -238,20 +271,24 @@ def test_outlet_nse_flooding(): # Calculate NSE (perfect results) val = mu.metrics.outlet_nse_flooding(synthetic_G = G, + synthetic_subs = None, synthetic_results = results, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 1.0 # Calculate NSE (mean results) results_ = results.copy() results_.loc[results_.id.isin([770549936, 25472468]),'value'] = [14.5 / 4] * 4 val = mu.metrics.outlet_nse_flooding(synthetic_G = G, + synthetic_subs = None, synthetic_results = results_, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 0.0 # Change the graph @@ -265,10 +302,12 @@ def test_outlet_nse_flooding(): # Calculate NSE (mean results) val = mu.metrics.outlet_nse_flooding(synthetic_G = G_, + synthetic_subs = None, synthetic_results = results_, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 0.0 # Test time interpolation @@ -277,9 +316,11 @@ def test_outlet_nse_flooding(): val = mu.metrics.outlet_nse_flooding(synthetic_G = G_, synthetic_results = results_, + synthetic_subs = None, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 0.0 def test_design_params(): @@ -430,10 +471,12 @@ def test_subcatchment_nse_flooding(): # Calculate NSE (perfect results) val = mu.metrics.subcatchment_nse_flooding(synthetic_G = G, + synthetic_subs = None, real_G = G, synthetic_results = results, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 1.0 # Calculate NSE (remapped node) @@ -450,14 +493,17 @@ def test_subcatchment_nse_flooding(): results_.id = results_.id.replace(mapping) val = mu.metrics.subcatchment_nse_flooding(synthetic_G = G_, + synthetic_subs = None, synthetic_results = results_, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 1.0 # Test gridded val = mu.metrics.grid_nse_flooding(synthetic_G = G_, + synthetic_subs = None, synthetic_results = results_, real_G = G, real_results = results,