diff --git a/swmmanywhere/metric_utilities.py b/swmmanywhere/metric_utilities.py index fb2406a6..402c4dbd 100644 --- a/swmmanywhere/metric_utilities.py +++ b/swmmanywhere/metric_utilities.py @@ -67,8 +67,6 @@ def __getattr__(self, name): return self[name] except KeyError: raise AttributeError(f"{name} NOT in the metric registry!") - - metrics = MetricRegistry() def iterate_metrics(synthetic_results: pd.DataFrame, @@ -118,16 +116,187 @@ def extract_var(df: pd.DataFrame, df_.date.min()).dt.total_seconds() return df_ -def align_calc_nse(synthetic_results: pd.DataFrame, +# Restriction registry +restriction_registry = {} + +def register_restriction(restriction_func: Callable): + """Register a restriction function. + + Register a restriction function to the restriction_registry. A restriction + allows for the restriction of certain combinations of variables within the + metric_factory. The function should take three arguments, 'scale', 'metric', + and 'variable', and should raise a ValueError if the combination is not + allowed. The function should be registered with the '@register_restriction'. + + Args: + restriction_func (Callable): The restriction function to register. + """ + name = restriction_func.__name__ + + # Check if the function is already registered + if name in restriction_registry: + raise ValueError(f"Restriction function '{name}' already registered.") + + # Validate the restriction + args = list(get_type_hints(restriction_func).keys()) + if args != ['scale', 'metric', 'variable']: + raise ValueError(f"""Restriction {restriction_func.__name__} requires + args ('scale', 'metric', 'variable').""") + + # Add the function to the registry + restriction_registry[name] = restriction_func + return restriction_func + +@register_restriction +def restriction_on_scale(scale: str, + metric: str, + variable: str): + """Restriction on scale. + + Restrict the design variables to the outlet scale if the metric is 'pbias'. + + Args: + scale (str): The scale of the metric. + metric (str): The metric. + variable (str): The variable. + """ + if variable in ('length', 'nmanholes', 'npipes') and scale != 'outlet': + raise ValueError(f"Variable {variable} only supported at the outlet scale") + +@register_restriction +def restriction_on_metric(scale: str, + metric: str, + variable: str): + """Restriction on metric. + + Restrict the variable to 'flow' if the metric is 'pbias'. + + Args: + scale (str): The scale of the metric. + metric (str): The metric. + variable (str): The variable. + """ + if variable in ('length', 'nmanholes', 'npipes') and metric != 'pbias': + raise ValueError(f"Variable {variable} only valid with pbias metric") + + +# Coefficient Registry +coef_registry = {} + +def register_coef(coef_func: Callable): + """Register a coefficient function. + + Register a coefficient function to the coef_registry. The function should + take two arguments, 'y' and 'yhat', and return a float. The function should + be registered with the '@register_coef' decorator. + + Args: + coef_func (Callable): The coefficient function to register. + """ + name = coef_func.__name__ + + # Check if the function is already registered + if name in coef_registry: + raise ValueError(f"Coefficient function '{name}' already registered.") + + # Validate the function + args = list(get_type_hints(coef_func).keys()) + if 'y' != args[0] or 'yhat' != args[1]: + raise ValueError(f"Coef {coef_func.__name__} requires args ('y', 'yhat').") + + # Add the function to the registry + coef_registry[name] = coef_func + return coef_func + +@register_coef +def pbias(y: np.ndarray, + yhat: np.ndarray) -> float: + r"""Percentage bias, PBIAS. + + Calculate the percent bias: + + .. math:: + + pbias = \\frac{{\sum(synthetic) - \sum(real)}}{{\sum(real)}} + + where: + - :math:`synthetic` is the synthetic data, + - :math:`real` is the real data. + + Args: + y (np.ndarray): The real data. + yhat (np.ndarray): The synthetic data. + + Returns: + float: The PBIAS value. + """ + total_observed = y.sum() + if total_observed == 0: + return np.inf + return (yhat.sum() - total_observed) / total_observed + +@register_coef +def nse(y: np.ndarray, + yhat: np.ndarray) -> float: + """Calculate Nash-Sutcliffe efficiency (NSE). + + Args: + y (np.array): Observed data array. + yhat (np.array): Simulated data array. + + Returns: + float: The NSE value. + """ + if np.std(y) == 0: + return np.inf + return 1 - np.sum(np.square(y - yhat)) / np.sum(np.square(y - np.mean(y))) + +@register_coef +def kge(y: np.ndarray,yhat: np.ndarray) -> float: + """Calculate the Kling-Gupta Efficiency (KGE) between simulated and observed data. + + Args: + y (np.array): Observed data array. + yhat (np.array): Simulated data array. + + Returns: + float: The KGE value. + """ + if (np.std(y) == 0) | (np.mean(y) == 0): + return np.inf + if np.std(yhat) == 0: + r = 0 + else: + r = np.corrcoef(yhat, y)[0, 1] + alpha = np.std(yhat) / np.std(y) + beta = np.mean(yhat) / np.mean(y) + kge = 1 - np.sqrt((r - 1)**2 + (alpha - 1)**2 + (beta - 1)**2) + return kge + +def align_calc_coef(synthetic_results: pd.DataFrame, real_results: pd.DataFrame, variable: str, syn_ids: list, - real_ids: list) -> float | None: - """Align and calculate NSE. + real_ids: list, + coef_func: Callable = nse) -> float: + """Align and calculate coef_func. - Align the synthetic and real data and calculate the Nash-Sutcliffe - efficiency (NSE) of the variable over time. In cases where the synthetic + Aggregate synthetic and real results by date for specifics ids. + Align the synthetic and real dates and calculate the coef_func metric + of the variable over time. In cases where the synthetic data is does not overlap the real data, the value is interpolated. + + Args: + synthetic_results (pd.DataFrame): The synthetic results. + real_results (pd.DataFrame): The real results. + variable (str): The variable to align and calculate coef_func for. + syn_ids (list): The ids of the synthetic data to subselect for. + real_ids (list): The ids of the real data to subselect for. + coef_func (Callable, optional): The coefficient to calculate. + Defaults to nse. + + Returns: + float: The coef_func value. """ synthetic_results = synthetic_results.copy() real_results = real_results.copy() @@ -163,8 +332,8 @@ def align_calc_nse(synthetic_results: pd.DataFrame, df['value_syn'] = df.value_syn.interpolate().to_numpy() df = df.dropna(subset=['value_real']) - # Calculate NSE - return nse(df.value_real, df.value_syn) + # Calculate coef_func + return coef_func(df.value_real, df.value_syn) def create_subgraph(G: nx.Graph, nodes: list) -> nx.Graph: @@ -195,27 +364,22 @@ def create_subgraph(G: nx.Graph, SG.graph.update(G.graph) return SG -def nse(y: np.ndarray, - yhat: np.ndarray) -> float | None: - """Calculate Nash-Sutcliffe efficiency (NSE).""" - if np.std(y) == 0: - return np.inf - return 1 - np.sum((y - yhat)**2) / np.sum((y - np.mean(y))**2) - -def median_nse_by_group(results: pd.DataFrame, - gb_key: str) -> float | None: - """Median NSE by group. +def median_coef_by_group(results: pd.DataFrame, + gb_key: str, + coef_func: Callable = nse) -> float: + """Median coef_func value by group. - Calculate the median Nash-Sutcliffe efficiency (NSE) of a variable over time + Calculate the median coef_func value of a variable over time for each group in the results dataframe, and return the median of these values. Args: results (pd.DataFrame): The results dataframe. gb_key (str): The column to group by. + coef_func (Callable): The coefficient to calculate. Default is nse. Returns: - float: The median NSE. + float: The median coef_func value. """ val = ( results @@ -223,11 +387,10 @@ def median_nse_by_group(results: pd.DataFrame, .sum() .reset_index() .groupby(gb_key) - .apply(lambda x: nse(x.value_real, x.value_sim)) + .apply(lambda x: coef_func(x.value_real, x.value_sim)) .median() ) - if not np.isfinite(val): - return None + return val @@ -420,6 +583,262 @@ def create_grid(bbox: tuple, return gpd.GeoDataFrame(grid) +scale_registry = {} +def register_scale(scale_func: Callable): + """Register a scale function. + + Register a scale function to the scale_registry. The function should + take the same arguments as the scale functions and return a float. The + function should be registered with the '@register_scale' decorator. A scale + function is called as a metric, but with some additional arguments provided + (i.e., the variable name and the coefficient function to use). The function + should return a float. + + Args: + scale_func (Callable): The scale function to register. + """ + name = scale_func.__name__ + + # Check if the function is already registered + if name in scale_registry: + raise ValueError(f"Scale function '{name}' already registered.") + + # Validate the function + args = list(get_type_hints(scale_func).keys()) + if args != ['synthetic_results', 'synthetic_subs', 'synthetic_G', + 'real_results', 'real_subs', 'real_G', 'metric_evaluation', + 'var', 'coef_func']: + raise ValueError(f"""Scale {scale_func.__name__} requires args + ('synthetic_results', 'synthetic_subs', 'synthetic_G', + 'real_results', 'real_subs', 'real_G', + 'metric_evaluation', 'var', 'coef_func').""") + + # Add the function to the registry + scale_registry[name] = scale_func + return scale_func + +@register_scale +def subcatchment(synthetic_results: pd.DataFrame, + synthetic_subs: gpd.GeoDataFrame, + synthetic_G: nx.Graph, + real_results: pd.DataFrame, + real_subs: gpd.GeoDataFrame, + real_G: nx.Graph, + metric_evaluation: MetricEvaluation, + var: str, + coef_func: Callable, + ): + """Subcatchment scale metric. + + Calculate the coefficient (coef_func) of a variable over time for aggregated + to real subcatchment scale. The metric produced is the median coef_func + across all subcatchments. + + Args: + synthetic_results (pd.DataFrame): The synthetic results. + synthetic_subs (gpd.GeoDataFrame): The synthetic subcatchments. + synthetic_G (nx.Graph): The synthetic graph. + real_results (pd.DataFrame): The real results. + real_subs (gpd.GeoDataFrame): The real subcatchments. + real_G (nx.Graph): The real graph. + metric_evaluation (MetricEvaluation): The metric evaluation parameters. + var (str): The variable to calculate the coefficient for. + coef_func (Callable): The coefficient to calculate. + + Returns: + float: The median coef_func value. + """ + results = align_by_shape(var, + synthetic_results = synthetic_results, + real_results = real_results, + shapes = real_subs, + synthetic_G = synthetic_G, + real_G = real_G) + + return median_coef_by_group(results, 'sub_id', coef_func=coef_func) + +@register_scale +def grid(synthetic_results: pd.DataFrame, + synthetic_subs: gpd.GeoDataFrame, + synthetic_G: nx.Graph, + real_results: pd.DataFrame, + real_subs: gpd.GeoDataFrame, + real_G: nx.Graph, + metric_evaluation: MetricEvaluation, + var: str, + coef_func: Callable, + ): + """Grid scale metric. + + Classify synthetic nodes to a grid and calculate the coef_func of a variable over + time for each grid cell. The metric produced is the median coef_func across all + grid cells. + + Args: + synthetic_results (pd.DataFrame): The synthetic results. + synthetic_subs (gpd.GeoDataFrame): The synthetic subcatchments. + synthetic_G (nx.Graph): The synthetic graph. + real_results (pd.DataFrame): The real results. + real_subs (gpd.GeoDataFrame): The real subcatchments. + real_G (nx.Graph): The real graph. + metric_evaluation (MetricEvaluation): The metric evaluation parameters. + var (str): The variable to calculate the coefficient for. + coef_func (Callable): The coefficient to calculate. + + Returns: + float: The median coef_func value. + """ + # Create a grid (GeoDataFrame of polygons) + scale = metric_evaluation.grid_scale + grid = create_grid(real_subs.total_bounds, + scale) + grid.crs = real_subs.crs + + # Align results + results = align_by_shape(var, + synthetic_results = synthetic_results, + real_results = real_results, + shapes = grid, + synthetic_G = synthetic_G, + real_G = real_G) + # Calculate coefficient + return median_coef_by_group(results, 'sub_id', coef_func=coef_func) + +@register_scale +def outlet(synthetic_results: pd.DataFrame, + synthetic_subs: gpd.GeoDataFrame, + synthetic_G: nx.Graph, + real_results: pd.DataFrame, + real_subs: gpd.GeoDataFrame, + real_G: nx.Graph, + metric_evaluation: MetricEvaluation, + var: str, + coef_func: Callable, + ): + """Outlet scale metric. + + Calculate the coefficient of a variable for the subgraph that + drains to the dominant outlet node. The dominant outlet node of the 'real' + network is calculated by dominant_outlet, while the dominant outlet node of + the 'synthetic' network is calculated by best_outlet_match. + + Args: + synthetic_results (pd.DataFrame): The synthetic results. + synthetic_subs (gpd.GeoDataFrame): The synthetic subcatchments. + synthetic_G (nx.Graph): The synthetic graph. + real_results (pd.DataFrame): The real results. + real_subs (gpd.GeoDataFrame): The real subcatchments. + real_G (nx.Graph): The real graph. + metric_evaluation (MetricEvaluation): The metric evaluation parameters. + var (str): The variable to calculate the coefficient for. + coef_func (Callable): The coefficient to calculate. + + Returns: + float: The median coef_func value. + """ + # Identify synthetic and real arcs that flow into the best outlet node + sg_syn, syn_outlet = best_outlet_match(synthetic_G, real_subs) + sg_real, real_outlet = dominant_outlet(real_G, real_results) + + allowable_var = ['nmanholes', 'npipes', 'length', 'flow', 'flooding'] + if var not in allowable_var: + raise ValueError(f"Invalid variable {var}. Can be {allowable_var}") + + if var == 'nmanholes': + # Calculate the coefficient based on the number of manholes + return coef_func(np.atleast_1d(sg_real.number_of_nodes()), + np.atleast_1d(sg_syn.number_of_nodes())) + if var == 'npipes': + # Calculate the coefficient based on the number of pipes + return coef_func(np.atleast_1d(sg_real.number_of_edges()), + np.atleast_1d(sg_syn.number_of_edges())) + if var == 'length': + # Calculate the coefficient based on the total length of the pipes + return coef_func( + np.array( + list(nx.get_edge_attributes(sg_real, 'length').values()) + ), + np.array( + list(nx.get_edge_attributes(sg_syn, 'length').values()) + ) + ) + if var == 'flow': + # Identify synthetic and real arcs that flow into the best outlet node + syn_arc = [d['id'] for u,v,d in synthetic_G.edges(data=True) + if v == syn_outlet] + real_arc = [d['id'] for u,v,d in real_G.edges(data=True) + if v == real_outlet] + elif var == 'flooding': + # Use all nodes in the subgraphs + syn_arc = list(sg_syn.nodes) + real_arc = list(sg_real.nodes) + + # Calculate the coefficient + return align_calc_coef(synthetic_results, + real_results, + var, + syn_arc, + real_arc, + coef_func = coef_func) + +def metric_factory(name: str): + """Create a metric function. + + A factory function to create a metric function based on the name. The first + part of the name is the scale, the second part is the metric, and the third + part is the variable. For example, 'grid_nse_flooding' is a metric function + that calculates the NSE of flooding at the grid scale. + + Args: + name (str): The name of the metric. + + Returns: + Callable: The metric function. + """ + # Split the name + parts = name.split('_') + if len(parts) != 3: + raise ValueError("Invalid metric name. Expected 'scale_metric_variable'") + scale, metric, variable = parts + + # Get coefficient + coef_func = coef_registry[metric] + + # Get scale + func = scale_registry[scale] + + # Validate the metric + for restriction in restriction_registry.values(): + restriction(scale, metric, variable) + + # Create the metric function + def new_metric(**kwargs): + return func(coef_func = coef_func, + var = variable, + **kwargs) + new_metric.__name__ = name + return new_metric + +metrics.register(metric_factory('outlet_nse_flow')) +metrics.register(metric_factory('outlet_kge_flow')) +metrics.register(metric_factory('outlet_pbias_flow')) + +metrics.register(metric_factory('outlet_pbias_length')) +metrics.register(metric_factory('outlet_pbias_npipes')) +metrics.register(metric_factory('outlet_pbias_nmanholes')) + +metrics.register(metric_factory('outlet_nse_flooding')) +metrics.register(metric_factory('outlet_kge_flooding')) +metrics.register(metric_factory('outlet_pbias_flooding')) + +metrics.register(metric_factory('grid_nse_flooding')) +metrics.register(metric_factory('grid_kge_flooding')) +metrics.register(metric_factory('grid_pbias_flooding')) + +metrics.register(metric_factory('subcatchment_nse_flooding')) +metrics.register(metric_factory('subcatchment_kge_flooding')) +metrics.register(metric_factory('subcatchment_pbias_flooding')) + @metrics.register def nc_deltacon0(synthetic_G: nx.Graph, real_G: nx.Graph, @@ -536,60 +955,6 @@ def kstest_betweenness( return stats.ks_2samp(list(syn_betweenness.values()), list(real_betweenness.values())).statistic -@metrics.register -def outlet_nse_flow(synthetic_G: nx.Graph, - synthetic_results: pd.DataFrame, - real_G: nx.Graph, - real_results: pd.DataFrame, - real_subs: gpd.GeoDataFrame, - **kwargs) -> float | None: - """Outlet NSE flow. - - Calculate the Nash-Sutcliffe efficiency (NSE) of flow over time, where flow - is measured as the total flow of all arcs that drain to the 'dominant' - outlet node. The dominant outlet node of the 'real' network is calculated by - dominant_outlet, while the dominant outlet node of the 'synthetic' network - is calculated by best_outlet_match. - """ - # Identify synthetic and real arcs that flow into the best outlet node - _, syn_outlet = best_outlet_match(synthetic_G, real_subs) - syn_arc = [d['id'] for u,v,d in synthetic_G.edges(data=True) - if v == syn_outlet] - _, real_outlet = dominant_outlet(real_G, real_results) - real_arc = [d['id'] for u,v,d in real_G.edges(data=True) - if v == real_outlet] - - return align_calc_nse(synthetic_results, - real_results, - 'flow', - syn_arc, - real_arc) - -@metrics.register -def outlet_nse_flooding(synthetic_G: nx.Graph, - synthetic_results: pd.DataFrame, - real_G: nx.Graph, - real_results: pd.DataFrame, - real_subs: gpd.GeoDataFrame, - **kwargs) -> float | None: - """Outlet NSE flooding. - - Calculate the Nash-Sutcliffe efficiency (NSE) of flooding over time, where - flooding is the total volume of flooded water across all nodes that drain - to the 'dominant' outlet node. The dominant outlet node of the 'real' - network is calculated by dominant_outlet, while the dominant outlet node of - the 'synthetic' network is calculated by best_outlet_match. - """ - # Identify synthetic and real outlet arcs - sg_syn, _ = best_outlet_match(synthetic_G, real_subs) - sg_real, _ = dominant_outlet(real_G, real_results) - - return align_calc_nse(synthetic_results, - real_results, - 'flooding', - list(sg_syn.nodes), - list(sg_real.nodes)) - @metrics.register def outlet_kstest_diameters(real_G: nx.Graph, synthetic_G: nx.Graph, @@ -611,148 +976,4 @@ def outlet_kstest_diameters(real_G: nx.Graph, syn_diameters = nx.get_edge_attributes(sg_syn, 'diameter') real_diameters = nx.get_edge_attributes(sg_real, 'diameter') return stats.ks_2samp(list(syn_diameters.values()), - list(real_diameters.values())).statistic - -@metrics.register -def outlet_pbias_length(real_G: nx.Graph, - synthetic_G: nx.Graph, - real_results: pd.DataFrame, - real_subs: gpd.GeoDataFrame, - **kwargs) -> float: - r"""Outlet PBIAS length. - - Calculate the percent bias of the total edge length in the subgraph that - drains to the dominant outlet node. The dominant outlet node of the 'real' - network is calculated by dominant_outlet, while the dominant outlet node of - the 'synthetic' network is calculated by best_outlet_match. - - The percentage bias is calculated as: - - .. math:: - - pbias = \\frac{{syn\_length - real\_length}}{{real\_length}} - - where: - - :math:`syn\_length` is the synthetic length, - - :math:`real\_length` is the real length. - """ - # Identify synthetic and real outlet arcs - sg_syn, _ = best_outlet_match(synthetic_G, real_subs) - sg_real, _ = dominant_outlet(real_G, real_results) - - # Calculate the percent bias - syn_length = sum([d['length'] for u,v,d in sg_syn.edges(data=True)]) - real_length = sum([d['length'] for u,v,d in sg_real.edges(data=True)]) - return (syn_length - real_length) / real_length - -@metrics.register -def outlet_pbias_nmanholes(real_G: nx.Graph, - synthetic_G: nx.Graph, - real_results: pd.DataFrame, - real_subs: gpd.GeoDataFrame, - **kwargs) -> float: - r"""Outlet PBIAS number of manholes (nodes). - - Calculate the percent bias of the total number of nodes in the subgraph - that drains to the dominant outlet node. The dominant outlet node of the - 'real' network is calculated by dominant_outlet, while the dominant outlet - node of the 'synthetic' network is calculated by best_outlet_match. - - The percentage bias is calculated as: - - .. math:: - - pbias = \\frac{{syn\_nnodes - real\_nnodes}}{{real\_nnodes}} - - where: - - :math:`syn\_nnodes` is the number of synthetic nodes, - - :math:`real\_nnodes` is the real number of nodes. - """ - # Identify synthetic and real outlet arcs - sg_syn, _ = best_outlet_match(synthetic_G, real_subs) - sg_real, _ = dominant_outlet(real_G, real_results) - - return (sg_syn.number_of_nodes() - sg_real.number_of_nodes()) \ - / sg_real.number_of_nodes() - -@metrics.register -def outlet_pbias_npipes(real_G: nx.Graph, - synthetic_G: nx.Graph, - real_results: pd.DataFrame, - real_subs: gpd.GeoDataFrame, - **kwargs) -> float: - r"""Outlet PBIAS number of pipes (edges). - - Calculate the percent bias of the total number of edges in the subgraph - that drains to the dominant outlet node. The dominant outlet node of the - 'real' network is calculated by dominant_outlet, while the dominant outlet - node of the 'synthetic' network is calculated by best_outlet_match. - - - The percentage bias is calculated as: - - .. math:: - - pbias = \\frac{{syn\_nedges - real\_nedges}}{{real\_nedges}} - - where: - - :math:`syn\_nedges` is the number of synthetic edges, - - :math:`real\_nedges` is the real number of edges. - """ - # Identify synthetic and real outlet arcs - sg_syn, _ = best_outlet_match(synthetic_G, real_subs) - sg_real, _ = dominant_outlet(real_G, real_results) - - return (sg_syn.number_of_edges() - sg_real.number_of_edges()) \ - / sg_real.number_of_edges() - - -@metrics.register -def subcatchment_nse_flooding(synthetic_G: nx.Graph, - real_G: nx.Graph, - synthetic_results: pd.DataFrame, - real_results: pd.DataFrame, - real_subs: gpd.GeoDataFrame, - **kwargs) -> float | None: - """Subcatchment NSE flooding. - - Classify synthetic nodes to real subcatchments and calculate the NSE of - flooding over time for each subcatchment. The metric produced is the median - NSE across all subcatchments. - """ - results = align_by_shape('flooding', - synthetic_results = synthetic_results, - real_results = real_results, - shapes = real_subs, - synthetic_G = synthetic_G, - real_G = real_G) - - return median_nse_by_group(results, 'sub_id') - -@metrics.register -def grid_nse_flooding(synthetic_G: nx.Graph, - real_G: nx.Graph, - synthetic_results: pd.DataFrame, - real_results: pd.DataFrame, - real_subs: gpd.GeoDataFrame, - metric_evaluation: MetricEvaluation, - **kwargs) -> float | None: - """Grid NSE flooding. - - Classify synthetic nodes to a grid and calculate the NSE of - flooding over time for each grid cell. The metric produced is the median - NSE across all grid cells. - """ - scale = metric_evaluation.grid_scale - grid = create_grid(real_subs.total_bounds, - scale) - grid.crs = real_subs.crs - - results = align_by_shape('flooding', - synthetic_results = synthetic_results, - real_results = real_results, - shapes = grid, - synthetic_G = synthetic_G, - real_G = real_G) - - return median_nse_by_group(results, 'sub_id') + list(real_diameters.values())).statistic \ No newline at end of file diff --git a/swmmanywhere/paper/experimenter.py b/swmmanywhere/paper/experimenter.py index 88760139..69543854 100644 --- a/swmmanywhere/paper/experimenter.py +++ b/swmmanywhere/paper/experimenter.py @@ -12,7 +12,6 @@ from pathlib import Path import pandas as pd -import toolz as tlz from SALib.sample import sobol # Set the number of threads to 1 to avoid conflicts with parallel processing @@ -137,17 +136,14 @@ def process_parameters(jobid: int, df = pd.DataFrame(X) gb = df.groupby('iter') - + n_iter = len(gb) flooding_results = {} - nproc = nproc if nproc is not None else len(X) + nproc = nproc if nproc is not None else n_iter # Assign jobs based on jobid - job_iter = tlz.partition_all(nproc, range(len(X))) - for _ in range(jobid + 1): - job_idx = next(job_iter, None) - - if job_idx is None: - raise ValueError(f"Jobid {jobid} is required.") + if jobid >= nproc: + raise ValueError("Jobid should be less than the number of processors.") + job_idx = range(jobid, n_iter, nproc) config = config_base.copy() diff --git a/swmmanywhere/paper/submit_icl_example b/swmmanywhere/paper/submit_icl_example index 212f0880..ca3fe1c7 100644 --- a/swmmanywhere/paper/submit_icl_example +++ b/swmmanywhere/paper/submit_icl_example @@ -19,4 +19,4 @@ cd $PBS_O_WORKDIR # Run program, passing the index of this subjob within the array -python experimenter.py $PBS_ARRAY_INDEX 7000 /rds/general/user/bdobson/ephemeral/swmmanywhere/cranbrook/cranbrook_hpc.yml +python experimenter.py --jobid=$PBS_ARRAY_INDEX --nproc=7000 --config_path=/rds/general/user/bdobson/ephemeral/swmmanywhere/cranbrook/cranbrook_hpc.yml diff --git a/tests/test_metric_utilities.py b/tests/test_metric_utilities.py index d5a6e1c2..e81d5eef 100644 --- a/tests/test_metric_utilities.py +++ b/tests/test_metric_utilities.py @@ -6,6 +6,7 @@ import networkx as nx import numpy as np import pandas as pd +import pytest import shapely from swmmanywhere import metric_utilities as mu @@ -127,6 +128,31 @@ def test_nse(): yhat = np.array([3,3,3,3,3])) assert val == 0.0 +def test_kge(): + """Test the kge metric.""" + val = mu.kge(y = np.array([1,2,3,4,5]), + yhat = np.array([1,2,3,4,5])) + assert_close(val, 1.0) + + val = mu.kge(y = np.array([1,2,3,4,5]), + yhat = np.array([3,3,3,3,3])) + assert_close(val, (1-2**0.5)) + +def test_inf(): + """Test metrics handling of invalid coefficients.""" + val = mu.kge(y = np.array([3,3,3,3,3]), + yhat = np.array([1,2,3,4,5])) + assert val == np.inf + + val = mu.nse(y = np.array([3,3,3,3,3]), + yhat = np.array([1,2,3,4,5])) + assert val == np.inf + + val = mu.pbias(y = np.array([-3,-3,0,3,3]), + yhat = np.array([1,2,3,4,5])) + assert val == np.inf + + def test_outlet_nse_flow(): """Test the outlet_nse_flow metric.""" # Load data @@ -153,20 +179,24 @@ def test_outlet_nse_flow(): # Calculate NSE (perfect results) val = mu.metrics.outlet_nse_flow(synthetic_G = G, + synthetic_subs = None, synthetic_results = results, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 1.0 # Calculate NSE (mean results) results_ = results.copy() results_.loc[[0,2],'value'] = 7.5 val = mu.metrics.outlet_nse_flow(synthetic_G = G, + synthetic_subs = None, synthetic_results = results_, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 0.0 # Change the graph @@ -180,20 +210,24 @@ def test_outlet_nse_flow(): # Calculate NSE (mean results) val = mu.metrics.outlet_nse_flow(synthetic_G = G_, + synthetic_subs = None, synthetic_results = results_, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 0.0 # Test time interpolation results_.loc[2,'date'] = pd.to_datetime('2021-01-01 00:00:10') results_.loc[[0,2], 'value'] = [0,30] val = mu.metrics.outlet_nse_flow(synthetic_G = G_, + synthetic_subs = None, synthetic_results = results_, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == -15.0 def test_outlet_nse_flooding(): @@ -238,20 +272,24 @@ def test_outlet_nse_flooding(): # Calculate NSE (perfect results) val = mu.metrics.outlet_nse_flooding(synthetic_G = G, + synthetic_subs = None, synthetic_results = results, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 1.0 # Calculate NSE (mean results) results_ = results.copy() results_.loc[results_.id.isin([770549936, 25472468]),'value'] = [14.5 / 4] * 4 val = mu.metrics.outlet_nse_flooding(synthetic_G = G, + synthetic_subs = None, synthetic_results = results_, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 0.0 # Change the graph @@ -265,10 +303,12 @@ def test_outlet_nse_flooding(): # Calculate NSE (mean results) val = mu.metrics.outlet_nse_flooding(synthetic_G = G_, + synthetic_subs = None, synthetic_results = results_, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 0.0 # Test time interpolation @@ -277,9 +317,11 @@ def test_outlet_nse_flooding(): val = mu.metrics.outlet_nse_flooding(synthetic_G = G_, synthetic_results = results_, + synthetic_subs = None, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 0.0 def test_design_params(): @@ -430,10 +472,12 @@ def test_subcatchment_nse_flooding(): # Calculate NSE (perfect results) val = mu.metrics.subcatchment_nse_flooding(synthetic_G = G, + synthetic_subs = None, real_G = G, synthetic_results = results, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 1.0 # Calculate NSE (remapped node) @@ -450,14 +494,17 @@ def test_subcatchment_nse_flooding(): results_.id = results_.id.replace(mapping) val = mu.metrics.subcatchment_nse_flooding(synthetic_G = G_, + synthetic_subs = None, synthetic_results = results_, real_G = G, real_results = results, - real_subs = subs) + real_subs = subs, + metric_evaluation = None) assert val == 1.0 # Test gridded val = mu.metrics.grid_nse_flooding(synthetic_G = G_, + synthetic_subs = None, synthetic_results = results_, real_G = G, real_results = results, @@ -469,4 +516,14 @@ def test_create_grid(): """Test the create_grid function.""" grid = mu.create_grid((0,0,1,1), 1/3 - 0.001) assert grid.shape[0] == 16 - assert set(grid.columns) == {'sub_id','geometry'} \ No newline at end of file + assert set(grid.columns) == {'sub_id','geometry'} + +def test_restirctions(): + """Test the restriction register by generating an invalid metric.""" + # Invalid because length can't be calculated at grid scale + with pytest.raises(ValueError): + mu.metric_factory('grid_pbias_length') + + # Invalid because nmanholes can't be evaluated with nse + with pytest.raises(ValueError): + mu.metric_factory('outlet_nse_nmanholes') \ No newline at end of file