diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index b9fd5f1..39435d4 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -9,7 +9,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: ["3.9", "3.10", "3.11", "3.12"]
+        python-version: ["3.9", "3.10"]
     steps:
     - name: Checkout repository and submodules
       uses: actions/checkout@v4
diff --git a/environment.yml b/environment.yml
index 3a54c15..a70b750 100644
--- a/environment.yml
+++ b/environment.yml
@@ -9,5 +9,11 @@ dependencies:
 - jinja2
 - click
 - psutil
+- birdhouse-birdy
+- tigramite
+- tefs
+- pandas
+- scikit-learn
+- numpy
 # tests
 - pytest
diff --git a/hawk/analysis/__init__.py b/hawk/analysis/__init__.py
new file mode 100644
index 0000000..a18465f
--- /dev/null
+++ b/hawk/analysis/__init__.py
@@ -0,0 +1 @@
+from .main import CausalAnalysis  # noqa
\ No newline at end of file
diff --git a/hawk/analysis/file_management.py b/hawk/analysis/file_management.py
new file mode 100644
index 0000000..80cf141
--- /dev/null
+++ b/hawk/analysis/file_management.py
@@ -0,0 +1,28 @@
+import os
+import pickle
+from typing import Any
+
+
+def save_to_pkl_file(target_file: str, data: Any, overwrite: bool = True) -> None:
+    # Refuse to overwrite an existing file unless explicitly allowed
+    if os.path.exists(target_file) and not overwrite:
+        raise FileExistsError(f"File {target_file} already exists.")
+
+    # Create the directory and parent directories if they don't exist
+    # (skip when the path has no directory component)
+    target_dir = os.path.dirname(target_file)
+    if target_dir:
+        os.makedirs(target_dir, exist_ok=True)
+
+    # Save the data to the file
+    with open(target_file, "wb") as f:
+        pickle.dump(data, f)
+
+
+def load_from_pkl_file(source_file: str) -> Any:
+    # Check if the file exists
+    if not os.path.exists(source_file):
+        raise FileNotFoundError(f"File {source_file} does not exist.")
+
+    # Load the data from the file
+    with open(source_file, "rb") as f:
+        data = pickle.load(f)
+
+    return data
diff --git a/hawk/analysis/main.py b/hawk/analysis/main.py
new file mode 100644
index 0000000..b311416
--- /dev/null
+++ b/hawk/analysis/main.py
@@ -0,0 +1,265 @@
+import itertools
+import os
+
+import pandas as pd
+from tigramite.independence_tests.cmiknn import CMIknn
+from tigramite.independence_tests.parcorr import ParCorr
+
+from .file_management import save_to_pkl_file
+from .metrics import regression_analysis
+from .pcmci_tools import initialize_tigramite_df
+from .postprocessing import (
+    run_postprocessing_pcmci,
+    run_postprocessing_tefs,
+    run_postprocessing_tefs_wrapper,
+)
+from .simulation import run_simulation_pcmci, run_simulation_tefs
+
+
+class CausalAnalysis:
+    def __init__(
+        self,
+        df_train,
+        df_test,
+        target_column_name,
+        pcmci_test_choice,
+        pcmci_max_lag,
+        tefs_direction,
+        tefs_use_contemporary_features,
+        tefs_max_lag_features,
+        tefs_max_lag_target,
+        workdir,
+        response=None,
+    ):
+        self.response = response
+
+        # Move the target column to the last position, since get_connected_variables
+        # assumes the target is the last variable (TODO: handle this inside that function instead)
+        df_train = df_train[[col for col in df_train.columns if col != target_column_name] + [target_column_name]]
+        df_test = df_test[[col for col in df_test.columns if col != target_column_name] + [target_column_name]]
+
+        df_full = pd.concat([df_train, df_test], axis=0).reset_index(drop=True)
+        df_full_tigramite = initialize_tigramite_df(df_full)
+
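+        # Bundle the two views of the same data: the tigramite DataFrame for PCMCI
+        # and the plain pandas frames (full/train/test) for TEFS and the baselines.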
+        self.datasets = {
+            "normal": {
+                "full_tigramite": df_full_tigramite,
+                "full": df_full,
+                "train": df_train,
+                "test": df_test,
+                "var_names": df_train.columns.tolist(),
+            },
+        }
+
+        self.target_column_name = target_column_name
+        self.pcmci_test_choice = pcmci_test_choice
+        self.pcmci_max_lag = pcmci_max_lag
+        self.tefs_direction = tefs_direction
+        self.tefs_use_contemporary_features = tefs_use_contemporary_features
+        self.tefs_max_lag_features = tefs_max_lag_features
+        self.tefs_max_lag_target = tefs_max_lag_target
+        self.workdir = workdir
+
+        self.tefs_features_lags = []
+        if self.tefs_use_contemporary_features:
+            self.tefs_features_lags.append(0)
+        self.tefs_features_lags.extend(list(range(1, self.tefs_max_lag_features + 1)))
+
+        self.tefs_target_lags = list(range(1, self.tefs_max_lag_target + 1))
+
+        self.pcmci_features_lags = list(range(0, self.pcmci_max_lag + 1))
+
+        self.baseline = None
+        self.plot_pcmci = None
+        self.details_pcmci = None
+        self.plot_tefs = None
+        self.details_tefs = None
+        self.plot_tefs_wrapper = None
+        self.details_tefs_wrapper = None
+
+    def run_baseline_analysis(self):
+        baseline = {}
+
+        # Exclude the target itself from the feature set
+        # (its lag-0 value would trivially leak the answer)
+        features_names = [col for col in self.datasets["normal"]["var_names"] if col != self.target_column_name]
+
+        configs = []
+
+        # Autoregressive baselines AR(1), ..., AR(max target lag)
+        for i in range(1, self.tefs_max_lag_target + 1):
+            configs.append((f"AR({i})", {self.target_column_name: list(range(1, i + 1))}))
+
+        # With all features
+        configs.append(
+            (
+                "All features",
+                {feature: self.tefs_features_lags for feature in features_names},
+            )
+        )
+
+        for label, inputs_names_lags in configs:
+            baseline[label] = {
+                "inputs": inputs_names_lags,
+                "r2": regression_analysis(
+                    inputs_names_lags=inputs_names_lags,
+                    target_name=self.target_column_name,
+                    df_train=self.datasets["normal"]["train"],
+                    df_test=self.datasets["normal"]["test"],
+                ),
+            }
+
+        target_file = os.path.join(self.workdir, "baseline.pkl")
+        save_to_pkl_file(target_file, baseline)
+
+        return target_file
+
+    def run_tefs_analysis(
+        self,
+        k=10,
+        threshold_forward=float("inf"),
+        threshold_backward=0,
+    ):
+        # Grid of options
+
+        lagtarget_options = [self.tefs_target_lags[: i + 1] for i in range(len(self.tefs_target_lags))]
+
+        lagfeatures_options = [self.tefs_features_lags[: i + 1] for i in range(len(self.tefs_features_lags))]
+
+        if self.tefs_direction == "both":
+            directions = ["forward", "backward"]
+        else:
+            directions = [self.tefs_direction]
+
+        dataset_names = [
+            "normal",
+        ]
+
+        # Create the configurations
+        configurations = []
+
+        for lagfeatures, lagtarget, direction, dataset_name in itertools.product(
+            lagfeatures_options, lagtarget_options, directions, dataset_names
+        ):
+            threshold = threshold_forward if direction == "forward" else threshold_backward
+            configuration = {
+                "params": {
+                    "lagfeatures": lagfeatures,
+                    "lagtarget": lagtarget,
+                    "direction": direction,
+                    "threshold": threshold,
+                    "k": k,
+                },
+                "dataset_name": dataset_name,
+            }
+            configurations.append(configuration)
+
+        # Run the analysis
+        results = []
+        for config in configurations:
+            results.append(
+                run_simulation_tefs(
+                    datasets=self.datasets,
+                    target_column_name=self.target_column_name,
+                    config=config,
+                )
+            )
+
+        return results
+
+    def run_pcmci_analysis(
+        self,
+    ):
+        lag_options = self.pcmci_features_lags  # each option is used as the max lag
+
+        # Define the tests
+        parcorr = ParCorr(significance="analytic")
+        cmiknn = CMIknn(
+            significance="shuffle_test",
+            knn=0.1,
+            shuffle_neighbors=5,
+            transform="ranks",
+            sig_samples=200,
+        )
+
+        # Create the dictionary of tests
+        independence_tests = {
+            "parcorr": parcorr,
+            "cmiknn": cmiknn,
+        }
+
+        if self.pcmci_test_choice == "ParCorr":
+            independence_tests_options = ["parcorr"]
+        elif self.pcmci_test_choice == "CMIknn":
+            independence_tests_options = ["cmiknn"]
+        else:
+            raise ValueError(f"Invalid PCMCI test choice: {self.pcmci_test_choice}")
+
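+        # Note: CMIknn runs a shuffle-based significance test (sig_samples=200),
+        # so it is considerably more expensive than the analytic ParCorr test.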
+        algorithm_options = [
+            "pcmci_plus",
+        ]
+
+        dataset_options = [
+            "normal",
+        ]
+
+        # Generate the configurations
+        configurations = []
+
+        for lag, independencetest, algorithm, dataset_name in itertools.product(
+            lag_options, independence_tests_options, algorithm_options, dataset_options
+        ):
+            configuration = {
+                "params": {
+                    "lag": lag,
+                    "independencetest": independencetest,
+                    "algorithm": algorithm,
+                },
+                "dataset_name": dataset_name,
+            }
+            configurations.append(configuration)
+
+        # Run the analysis
+        results = []
+        for config in configurations:
+            results.append(
+                run_simulation_pcmci(
+                    datasets=self.datasets,
+                    config=config,
+                    independence_tests=independence_tests,
+                )
+            )
+
+        return results
+
+    def run(self):
+        if self.response:
+            self.response.update_status("Performing baseline analysis", 16)
+        self.baseline = self.run_baseline_analysis()
+        if self.response:
+            self.response.update_status("Performing TEFS analysis", 33)
+        tefs_results = self.run_tefs_analysis()
+        if self.response:
+            self.response.update_status("Performing PCMCI analysis", 66)
+        pcmci_results = self.run_pcmci_analysis()
+
+        if self.response:
+            self.response.update_status("Postprocessing PCMCI", 80)
+        self.plot_pcmci, self.details_pcmci = run_postprocessing_pcmci(
+            results_pcmci=pcmci_results,
+            target_column_name=self.target_column_name,
+            datasets=self.datasets,
+            destination_path=self.workdir,
+        )
+        if self.response:
+            self.response.update_status("Postprocessing TEFS", 90)
+        self.plot_tefs, self.details_tefs = run_postprocessing_tefs(
+            results_tefs=tefs_results,
+            target_column_name=self.target_column_name,
+            datasets=self.datasets,
+            destination_path=self.workdir,
+        )
+        if self.response:
+            self.response.update_status("Postprocessing TEFS Wrapper", 95)
+        self.plot_tefs_wrapper, self.details_tefs_wrapper = run_postprocessing_tefs_wrapper(
+            results_tefs=tefs_results,
+            target_column_name=self.target_column_name,
+            datasets=self.datasets,
+            destination_path=self.workdir,
+        )
diff --git a/hawk/analysis/metrics.py b/hawk/analysis/metrics.py
new file mode 100644
index 0000000..066f0c6
--- /dev/null
+++ b/hawk/analysis/metrics.py
@@ -0,0 +1,124 @@
+from typing import Any, Dict, List, Optional, Tuple
+
+import pandas as pd
+from sklearn.linear_model import LinearRegression
+from sklearn.metrics import r2_score
+from sklearn.model_selection import BaseCrossValidator, cross_val_score
+
+
+def prepare_data_with_lags(
+    df: pd.DataFrame,
+    inputs_names_lags: Dict[str, List[int]],
+    target_name: str,
+) -> Tuple[pd.DataFrame, pd.Series]:
+    """
+    Prepares data for regression by generating lagged features for the specified variables and target.
+
+    :param df: The pandas DataFrame containing the time series data.
+    :param inputs_names_lags: A dictionary mapping input feature names to their corresponding list of lags.
+        For example, {'feature1': [1, 2], 'feature2': [1]} indicates 'feature1' should be lagged by 1 and 2 periods,
+        and 'feature2' by 1 period.
+    :param target_name: The name of the target variable in the DataFrame.
+    :return: A tuple containing the lagged features DataFrame and the target variable Series.
+    """
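+    # For example, inputs_names_lags={"temp": [0, 2]} produces the columns
+    # "temp_t-0" and "temp_t-2" ("temp" being a hypothetical feature name);
+    # rows made incomplete by the shifting are dropped below.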
+    required_columns = set([*inputs_names_lags.keys(), target_name])
+    if not required_columns.issubset(set(df.columns)):
+        raise ValueError(
+            "DataFrame 'df' must contain all the columns specified in 'inputs_names_lags' and 'target_name'."
+        )
+
+    for lags in inputs_names_lags.values():
+        if lags and min(lags) < 0:
+            raise ValueError("Lags for the input variables must be non-negative integers.")
+
+    # Initialize a list to hold all DataFrame chunks
+    lagged_chunks = []
+
+    # Generate lagged inputs for the independent variables
+    for input_name, lags in inputs_names_lags.items():
+        for lag in lags:
+            lagged_chunk = df[input_name].shift(lag).to_frame(f"{input_name}_t-{lag}")
+            lagged_chunks.append(lagged_chunk)
+
+    # Add the target column
+    lagged_chunks.append(df[target_name].to_frame(target_name))
+
+    # Concatenate the chunks
+    df_lagged = pd.concat(lagged_chunks, axis=1)
+
+    # Drop rows with NaN values caused by the shifting
+    df_lagged = df_lagged.dropna()
+
+    return df_lagged.drop(columns=target_name), df_lagged[target_name]
+
+
+def regression_analysis(
+    inputs_names_lags: Dict[str, List[int]],
+    target_name: str,
+    df: Optional[pd.DataFrame] = None,
+    cv_scheme: Optional[BaseCrossValidator] = None,
+    df_train: Optional[pd.DataFrame] = None,
+    df_test: Optional[pd.DataFrame] = None,
+) -> Any:
+    """
+    Performs linear regression with support for either cross-validation or a train-test split,
+    based on the arguments provided.
+
+    :param inputs_names_lags: A dictionary mapping input feature names to their corresponding list of lags.
+        For example, {'feature1': [1, 2], 'feature2': [1]} indicates 'feature1' should be lagged by 1 and 2 periods,
+        and 'feature2' by 1 period.
+    :param target_name: The name of the target variable in the DataFrame.
+    :param df: DataFrame for cross-validation mode. If specified, cv_scheme must also be provided.
+    :param cv_scheme: Cross-validator object for cross-validation mode. If specified, df must also be provided.
+    :param df_train: Training DataFrame for train-test split mode. Required if df_test is provided.
+    :param df_test: Testing DataFrame for train-test split mode. Requires df_train to be specified.
+    :return: Cross-validated scores, or the R-squared score from the train-test evaluation.
+    """
+
+    # Check that exactly one mode is specified
+    cross_val_mode = bool(df is not None and cv_scheme is not None)
+    train_test_mode = bool(df_train is not None and df_test is not None)
+    if not (cross_val_mode ^ train_test_mode):
+        raise ValueError(
+            "Specify exactly one mode: either 'df' with 'cv_scheme' (cross-validation), "
+            "or 'df_train' with 'df_test' (train-test split)."
+        )
+
+    if cross_val_mode:
+        if df is None or cv_scheme is None:
+            raise ValueError("Both 'df' and 'cv_scheme' must be specified for cross-validation mode.")
+
+        X, y = prepare_data_with_lags(
+            df,
+            inputs_names_lags,
+            target_name,
+        )
+
+        model = LinearRegression()
+        return cross_val_score(model, X, y, cv=cv_scheme)
+
+    elif train_test_mode:
+        if df_train is None or df_test is None:
+            raise ValueError("Both 'df_train' and 'df_test' must be specified for train-test split mode.")
+
+        X_train, y_train = prepare_data_with_lags(
+            df_train,
+            inputs_names_lags,
+            target_name,
+        )
+
+        X_test, y_test = prepare_data_with_lags(
+            df_test,
+            inputs_names_lags,
+            target_name,
+        )
+
+        model = LinearRegression().fit(X_train, y_train)
+        y_pred = model.predict(X_test)
+        return r2_score(y_test, y_pred)
diff --git a/hawk/analysis/pcmci_tools.py b/hawk/analysis/pcmci_tools.py
new file mode 100644
index 0000000..6ce7a7c
--- /dev/null
+++ b/hawk/analysis/pcmci_tools.py
@@ -0,0 +1,55 @@
+import numpy as np
+import pandas as pd
+from tigramite import data_processing as pp
+
+
+def get_connected_variables(graph: np.ndarray, var_names: list[str]) -> list[str]:
+    """
+    Get the variables connected to the target in the graph.
+    The target is assumed to be the last variable.
+    Connections of any type count: from, to, or undirected.
+
+    :param graph: the graph produced by the PCMCI algorithm, i.e. what's returned by PCMCI.run_pcmci()
+    :param var_names: the names of the variables
+    :return: the names of the variables connected to the target
+    """
+
+    assert len(graph.shape) == 3, "The graph must be a 3D array"
+    assert graph.shape[0] == graph.shape[1], "The graph must be square"
+
+    # graph[-1] holds the target's connections (the target is always last).
+    # Replace the empty string with 0, anything else with 1 (there's a connection),
+    # then transpose into a dataframe where each row is a lag
+    # (with max lag 1 there is a row for lag 0 and one for lag 1)
+    target_connections = pd.DataFrame(np.where(graph[-1] == "", 0, 1).T, columns=var_names)
+
+    # ignore autocorrelation (a connection from a variable to itself)
+    target_connections = target_connections.drop(var_names[-1], axis=1)
+
+    # drop all columns with only zeros (no connection) and keep the names
+    connected_variables = list(target_connections.loc[:, (target_connections != 0).any(axis=0)].columns.values)
+
+    return connected_variables
+
+
+def initialize_tigramite_df(df: pd.DataFrame) -> pp.DataFrame:
+    """
+    Initialize a tigramite dataframe from a pandas dataframe.
+
+    :param df: pandas dataframe
+    :return: tigramite dataframe
+    """
+
+    dataframe = pp.DataFrame(
+        df.values,
+        datatime={0: np.arange(len(df))},
+        var_names=df.columns,
+    )
+
+    return dataframe
diff --git a/hawk/analysis/postprocessing.py b/hawk/analysis/postprocessing.py
new file mode 100644
index 0000000..8d27db5
--- /dev/null
+++ b/hawk/analysis/postprocessing.py
@@ -0,0 +1,548 @@
+import os
+import re
+
+import matplotlib.colors as mcolors
+import matplotlib.pyplot as plt
+import numpy as np
+import pandas as pd
+import seaborn as sns
+
+# from tigramite import plotting as tp
+from .file_management import save_to_pkl_file
+from .metrics import regression_analysis
+from .pcmci_tools import get_connected_variables
+
+
+# Custom sort key that handles lag sequences by replacing each with its last lag
+def general_custom_sort_key(s):
+    # Find all sequences like [0,1], [1,2,3,5,6], etc.
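+    # e.g. an illustrative key "lagfeatures[0,1]_lagtarget[1,2]" becomes "lagfeatures[1]_lagtarget[2]",
+    # so keys sort by their largest lag rather than lexicographically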
+ sequences = re.findall(r"\[\d+(?:,\d+)*\]", s) + + # Process each found sequence + for seq in sequences: + # Split the sequence into individual numbers, convert to int, and take the last number + last_num = seq.strip("[]").split(",")[-1] + + # Replace the original sequence with a format that ensures correct sorting + s = s.replace(seq, f"[{last_num}]", 1) # Replace only the first occurrence to maintain structure + + return s + + +def plot_feature_presence_and_r2(df_presence, scores_values, scores_labels): + # Create a figure with two subplots + fig, (ax_bar, ax_heatmap) = plt.subplots( + 2, + gridspec_kw={"height_ratios": [3, 10]}, + sharex=True, + figsize=(12, 5), + ) + + # Plot the heatmap of R2 values + sns.heatmap( + np.stack(scores_values), + annot=True, + fmt=".3f", + ax=ax_bar, + cmap="Greens", + vmin=0, + vmax=0.5, + cbar=False, + linewidths=0.2, + linecolor="black", + clip_on=False, + annot_kws={"size": 8}, + yticklabels=scores_labels, + ) + # ax_bar.set_yticks([]) + ax_bar.set_title(r"$R^2$ Value of the configuration") + ax_bar.tick_params(left=True, bottom=False) + ax_bar.set_yticklabels(ax_bar.get_yticklabels(), rotation=0) + + cmap_presence = mcolors.ListedColormap(["white", "black", "#851010"]) + bounds = [0, 1, 2, 3] # Boundaries for 0 -> white, 1 -> black, 2 -> red (#851010) + norm = mcolors.BoundaryNorm(bounds, cmap_presence.N) + + # Plot the heatmap of presences + sns.heatmap( + df_presence, + cmap=cmap_presence, + norm=norm, + cbar=False, + ax=ax_heatmap, + linewidths=0.2, + linecolor="black", + clip_on=False, + ) + ax_heatmap.set_ylabel("Feature name") + ax_heatmap.set_xlabel("Simulation ID") + ax_heatmap.tick_params(left=True, bottom=False) + + return fig, (ax_bar, ax_heatmap) + + +def run_postprocessing_pcmci( + results_pcmci, + target_column_name, + datasets, + destination_path, +): + all_basin_variables = set() + results_table_pcmci = [] + for simulation in results_pcmci: + dataframe = datasets[simulation["dataset_name"]] + var_names = dataframe["var_names"] + all_basin_variables.update(var_names) + + results = simulation["results"] + + # Plot only the connections to any of the target variables + temp_graph = results["graph"].copy() + + # Show only the connections to the target variables + # Identify the indexes of the target variables + # target_vars = np.where(["target" in var for var in var_names.values])[0] + # for i in range(temp_graph.shape[0]): + # for j in range(temp_graph.shape[1]): + # # if the edge is not connected to the target variables + # if i not in target_vars and j not in target_vars: + # # remove the edge + # temp_graph[i, j, :] = '' + # temp_graph[j, i, :] = '' + + # Base arguments for tp.plot_graph + plot_args = { + "val_matrix": results["val_matrix"], + "graph": temp_graph, + "var_names": var_names, + "link_colorbar_label": "cross-MCI", + "node_colorbar_label": "auto-MCI", + "show_autodependency_lags": False, + } + + # Additional arguments to include if the independence_test is CMIknn + if simulation["params"]["independencetest"] == "cmiknn": + plot_args.update( + { + "vmin_edges": 0.0, + "vmax_edges": 0.1, + "edge_ticks": 0.05, + "cmap_edges": "OrRd", + "vmin_nodes": 0, + "vmax_nodes": 0.1, + "node_ticks": 0.1, + "cmap_nodes": "OrRd", + } + ) + + # Plot causal graph + # target_file = os.path.join(constants.path_figures, "algorithm_results", basin_name, "pcmci", key + ".pdf") + # if not os.path.exists(target_file): + # fig, ax = plt.subplots() + # tp.plot_graph(**plot_args, fig_ax=(fig, ax)) + # os.makedirs(os.path.dirname(target_file), 
exist_ok=True) + # plt.savefig(target_file, bbox_inches="tight") + # plt.close(fig) + + # # Plot time series graph if lag > 0 + # if simulation["params"]["lag"] > 0: + # target_file = os.path.join( + # constants.path_figures, "algorithm_results", basin_name, "pcmci", key + "_timeseries.pdf" + # ) + # if not os.path.exists(target_file): + # fig, ax = plt.subplots() + # tp.plot_time_series_graph( + # figsize=(6, 4), + # fig_ax=(fig, ax), + # val_matrix=results["val_matrix"], + # graph=results["graph"], + # var_names=var_names, + # link_colorbar_label="MCI", + # ) + # os.makedirs(os.path.dirname(target_file), exist_ok=True) + # plt.savefig(target_file, bbox_inches="tight") + # plt.close(fig) + + # Extract the selected features + selected_features = get_connected_variables(results["graph"], var_names) + + # Compute the R2 scores + inputs_names_lags = {feature: [0] for feature in selected_features} + score_r2 = ( + regression_analysis( + inputs_names_lags=inputs_names_lags, + target_name=target_column_name, + df_train=dataframe["train"], + df_test=dataframe["test"], + ) + if len(selected_features) > 0 + else np.nan + ) + + inputs_names_lags = {feature: list(range(0, simulation["params"]["lag"] + 1)) for feature in selected_features} + score_r2_lag = ( + regression_analysis( + inputs_names_lags=inputs_names_lags, + target_name=target_column_name, + df_train=dataframe["train"], + df_test=dataframe["test"], + ) + if len(selected_features) > 0 + else np.nan + ) + + inputs_names_lags = {feature: list(range(0, simulation["params"]["lag"] + 1)) for feature in selected_features} + inputs_names_lags[target_column_name] = list(range(1, simulation["params"]["lag"] + 1)) + score_r2_lag_ar = regression_analysis( + inputs_names_lags=inputs_names_lags, + target_name=target_column_name, + df_train=dataframe["train"], + df_test=dataframe["test"], + ) + + # Table of results + results_table_pcmci.append( + { + "selected_features": " ".join(selected_features), + "score_r2": score_r2, + "score_r2_lag": score_r2_lag, + "score_r2_lag_ar": score_r2_lag_ar, + "dataset_name": simulation["dataset_name"], + "algorithm": simulation["params"]["algorithm"], + "independencetest": simulation["params"]["independencetest"], + "lag": simulation["params"]["lag"], + "execution_time": simulation["execution_time"], + } + ) + + # Export the file to pkl + target_file_results_details = os.path.join(destination_path, "results_details_pcmci.pkl") + save_to_pkl_file(target_file_results_details, results_table_pcmci) + + # Feature presences heatmap + if target_column_name in all_basin_variables: + all_basin_variables.remove(target_column_name) + all_basin_variables = sorted(list(all_basin_variables)) + df_presence = pd.DataFrame(index=all_basin_variables, columns=range(len(results_pcmci))) + scores = [] + scores_lag = [] + scores_lag_ar = [] + + for index, result in enumerate(results_table_pcmci): + scores.append(result["score_r2"]) + scores_lag.append(result["score_r2_lag"]) + scores_lag_ar.append(result["score_r2_lag_ar"]) + + # loop through the rows of the df, if the feature is in the list of selected features, put a 1 + for feature in df_presence.index: + if feature in result["selected_features"]: + df_presence.loc[feature, index] = 1 + else: + df_presence.loc[feature, index] = 0 + if feature not in datasets[result["dataset_name"]]["var_names"]: + df_presence.loc[feature, index] = 2 + + df_presence = df_presence.astype(float) + scores = np.array(scores) + scores_lag = np.array(scores_lag) + scores_lag_ar = np.array(scores_lag_ar) + + 
fig, ax = plot_feature_presence_and_r2( + df_presence=df_presence, + scores_values=[scores, scores_lag, scores_lag_ar], + scores_labels=[r"$R^2$", r"$R^2$ (lag)", r"$R^2$ (lag + AR)"], + ) + target_file_plot = os.path.join(destination_path, "algorithm_results", "pcmci", "feature_presence.pdf") + os.makedirs(os.path.dirname(target_file_plot), exist_ok=True) + plt.savefig(target_file_plot, bbox_inches="tight") + plt.close(fig) + + return target_file_plot, target_file_results_details + + +def run_postprocessing_tefs( + results_tefs, + target_column_name, + datasets, + destination_path, +): + all_basin_variables = set() + results_table_te = [] + for simulation in results_tefs: + dataframe = datasets[simulation["dataset_name"]] + var_names = dataframe["var_names"] + all_basin_variables.update(var_names) + + results = simulation["results"] + lagfeatures = simulation["params"]["lagfeatures"] + lagtarget = simulation["params"]["lagtarget"] + + # Plot the results + # fig, ax = plt.subplots() + # results.plot_te_results(ax=ax) + # target_dir = os.path.join(constants.path_figures, "algorithm_results", basin_name, "te", key + ".pdf") + # os.makedirs(os.path.dirname(target_dir), exist_ok=True) + # plt.savefig(target_dir, bbox_inches="tight") + # plt.close(fig) + + # Extract the selected features + selected_features_names = results.select_features(simulation["params"]["threshold"]) + + # get the r2 score on the test set + inputs_names_lags = {feature: [0] for feature in selected_features_names} + score_r2 = ( + regression_analysis( + inputs_names_lags=inputs_names_lags, + target_name=target_column_name, + df_train=dataframe["train"], + df_test=dataframe["test"], + ) + if len(selected_features_names) > 0 + else np.nan + ) + + inputs_names_lags = {feature: lagfeatures for feature in selected_features_names} + score_r2_lag = ( + regression_analysis( + inputs_names_lags=inputs_names_lags, + target_name=target_column_name, + df_train=dataframe["train"], + df_test=dataframe["test"], + ) + if len(selected_features_names) > 0 + else np.nan + ) + + inputs_names_lags = {feature: lagfeatures for feature in selected_features_names} + inputs_names_lags[target_column_name] = lagtarget + score_r2_lag_ar = regression_analysis( + inputs_names_lags=inputs_names_lags, + target_name=target_column_name, # TODO change to use the target column name given by the user + df_train=dataframe["train"], + df_test=dataframe["test"], + ) + + # Table of results + results_table_te.append( + { + "selected_features": " ".join(selected_features_names), + "score_r2": score_r2, + "score_r2_lag": score_r2_lag, + "score_r2_lag_ar": score_r2_lag_ar, + "dataset_name": simulation["dataset_name"], + "lagfeatures": simulation["params"]["lagfeatures"], + "lagtarget": simulation["params"]["lagtarget"], + "direction": simulation["params"]["direction"], # not putting threshold and k + "execution_time": simulation["execution_time"], + } + ) + + # Export the file to pkl + target_file_results_details = os.path.join(destination_path, "results_details_te.pkl") + save_to_pkl_file(target_file_results_details, results_table_te) + + # Feature presences heatmap + if target_column_name in all_basin_variables: + all_basin_variables.remove(target_column_name) + all_basin_variables = sorted(list(all_basin_variables)) + df_presence = pd.DataFrame(index=all_basin_variables, columns=range(len(results_tefs))) + scores = [] + scores_lag = [] + scores_lag_ar = [] + + for index, result in enumerate(results_table_te): + scores.append(result["score_r2"]) + 
scores_lag.append(result["score_r2_lag"]) + scores_lag_ar.append(result["score_r2_lag_ar"]) + + # loop through the rows of the df, if the feature is in the list of selected features, put a 1 + for feature in df_presence.index: + if feature in result["selected_features"]: + df_presence.loc[feature, index] = 1 + else: + df_presence.loc[feature, index] = 0 + if feature not in datasets[result["dataset_name"]]["var_names"]: + df_presence.loc[feature, index] = 2 + + df_presence = df_presence.astype(float) + scores = np.array(scores) + scores_lag = np.array(scores_lag) + scores_lag_ar = np.array(scores_lag_ar) + + fig, ax = plot_feature_presence_and_r2( + df_presence=df_presence, + scores_values=[scores, scores_lag, scores_lag_ar], + scores_labels=[r"$R^2$", r"$R^2$ (lag)", r"$R^2$ (lag + AR)"], + ) + target_file_plot = os.path.join(destination_path, "algorithm_results", "te", "feature_presence.pdf") + os.makedirs(os.path.dirname(target_file_plot), exist_ok=True) + plt.savefig(target_file_plot, bbox_inches="tight") + plt.close(fig) + + return target_file_plot, target_file_results_details + + +def run_postprocessing_tefs_wrapper( + results_tefs, + target_column_name, + datasets, + destination_path, +): + results_table_tefs_wrapper = [] + target_file_train_test = os.path.join(destination_path, "tefs_as_wrapper", "wrapper.pdf") + # target_file_cv = os.path.join(constants.path_figures, "tefs_as_wrapper_cv", f"{basename}_wrapper_cv.pdf") + + fig, ax = plt.subplots(figsize=(10, 5)) + + for simulation in results_tefs: + # --------------------- Load corresponding dataset --------------------- + dataset_name = simulation["dataset_name"] + dataframe = datasets[dataset_name] + + features_columns = dataframe["full"].drop(columns=[target_column_name]).columns + + # --------------------- Select features using threshold (conservative) --------------------- + # selected_features_names_with_threshold = simulation["results"].select_features(simulation["params"]["threshold"]) # noqa + # n_features_selected_with_threshold = len(selected_features_names_with_threshold) # noqa + + # --------------------- Compute test R2 for each number of features --------------------- + test_r2_train_test = [] + # test_r2_cv = [] + num_total_features = len(features_columns) + for num_features in range(0, num_total_features + 1): + if num_features == 0: + selected_features_names = [] + else: + selected_features_names = simulation["results"].select_n_features(num_features) + + lagfeatures = simulation["params"]["lagfeatures"] + lagtarget = simulation["params"]["lagtarget"] + + inputs_names_lags = {feature: lagfeatures for feature in selected_features_names} + inputs_names_lags[target_column_name] = lagtarget + + # --- Compute the train_test version --- + test_r2_train_test.append( + regression_analysis( + inputs_names_lags=inputs_names_lags, + target_name=target_column_name, + df_train=dataframe["train"], + df_test=dataframe["test"], + ) + ) + + # # --- Compute the cross-validation version --- + # # To perform a cross-validation, we need to concatenate the train and test sets + # unified_df = pd.concat([dataframe["train"], dataframe["test"]], axis=0).reset_index(drop=True) + + # # Fixed window size + # # n_samples = unified_df.shape[0] + # # n_splits = 5 + # # cv_scheme = TimeSeriesSplit( + # # n_splits=n_splits, + # # max_train_size=n_samples // (n_splits + 1), + # # ) + + # # Regular KFold + # cv_scheme = KFold(n_splits=4) # 4 splits is about using the same test set size + + # test_r2_cv.append( + # regression_analysis( + # 
inputs_names_lags=inputs_names_lags, + # target_name=target_columns[0], + # df=unified_df, + # cv_scheme=cv_scheme, + # ) + # ) + + test_r2_train_test = np.array(test_r2_train_test) + # test_r2_cv = np.array(test_r2_cv) + + results_table_tefs_wrapper.append({"test_r2_train_test": test_r2_train_test}) + + # Export the file to pkl + target_file_results_details = os.path.join(destination_path, "results_details_tefs_wrapper.pkl") + save_to_pkl_file(target_file_results_details, results_table_tefs_wrapper) + + param_str = "_".join(f"{k}{v}" for k, v in simulation["params"].items()) + ax.plot(test_r2_train_test, marker="o", label=param_str) + # maxima = np.where(test_r2_train_test == test_r2_train_test.max())[0] + # ax.plot( + # maxima, + # test_r2_train_test[maxima], + # marker="o", + # color="red", + # linestyle="None", + # label="Maximum", + # markersize=6, + # ) + # ax.plot( + # n_features_selected_with_threshold, + # test_r2_train_test[n_features_selected_with_threshold], + # marker="o", + # color="green", + # linestyle="None", + # label="TEFS (conservative)", + # markersize=10, + # ) + + ax.set_xlabel("Number of features") + ax.set_ylabel("Test $R^2$") + ax.set_title("TEFS Wrapper") + ax.legend(title="Configurations", bbox_to_anchor=(1.05, 1), loc="upper left", borderaxespad=0) + if num_total_features < 30: + step = 1 + elif num_total_features < 80: + step = 5 + else: + step = 10 + ax.set_xticks(range(0, num_total_features + 1, step)) + ax.set_xticklabels(range(0, num_total_features + 1, step)) + ax.set_ylim(-0.1, 1.1) + ax.grid() + + os.makedirs(os.path.dirname(target_file_train_test), exist_ok=True) + plt.savefig(target_file_train_test, bbox_inches="tight") + plt.close(fig) + + return target_file_train_test, target_file_results_details + + # # --------------------- Plot cross-validation version --------------------- + # fig, ax = plt.subplots(figsize=(10, 5)) + # ax.plot(test_r2_cv.mean(axis=1), marker="o", label="Cross-validation") + # maxima = np.where(test_r2_cv.mean(axis=1) == test_r2_cv.mean(axis=1).max())[0] + # ax.plot(maxima, test_r2_cv.mean(axis=1)[maxima], marker="o", color="red", linestyle="None", label="Maximum", markersize=10) # noqa + # ax.plot(n_features_selected_with_threshold, test_r2_cv.mean(axis=1)[n_features_selected_with_threshold], marker="o", color="green", linestyle="None", label="TEFS (conservative)", markersize=10) # noqa + + # # plot confidence interval bands from cross-validation based on mean and standard deviation (90% confidence) + # alpha = 0.1 + # quantile = scipy.stats.norm.ppf(1 - alpha / 2) + # ax.fill_between(range(test_r2_cv.shape[0]), test_r2_cv.mean(axis=1) - test_r2_cv.std(axis=1) * quantile / np.sqrt(test_r2_cv.shape[1]), test_r2_cv.mean(axis=1) + test_r2_cv.std(axis=1) * quantile / np.sqrt(test_r2_cv.shape[1]), alpha=0.3) # noqa + + # ax.set_xlabel("Number of features") + # ax.set_ylabel("Test $R^2$") + + # if simulation["params"]["threshold"] == np.inf: + # threshold_text = "\infty" + # elif simulation["params"]["threshold"] == -np.inf: + # threshold_text = "-\infty" + # else: + # threshold_text = simulation["params"]["threshold"] + + # title_text = f"TEFS on basin {basin_name.upper()} with dataset {dataset_name}\n[lagfeatures $={simulation['params']['lagfeatures']}$, lagtarget $={simulation['params']['lagtarget']}$, direction = {simulation['params']['direction']}, threshold $={threshold_text}]$" # noqa + # ax.set_title(title_text) + # ax.legend() + # if num_total_features < 30: + # step = 1 + # elif num_total_features < 80: + # step = 5 + # 
else: + # step = 10 + # ax.set_xticks(range(0, num_total_features + 1, step)) + # ax.set_xticklabels(range(0, num_total_features + 1, step)) + # ax.set_ylim(-0.1, 0.55) + # ax.grid() + + # os.makedirs(os.path.dirname(target_file_cv), exist_ok=True) + # plt.savefig(target_file_cv, bbox_inches="tight") + # plt.close(fig) diff --git a/hawk/analysis/simulation.py b/hawk/analysis/simulation.py new file mode 100644 index 0000000..ec21c8b --- /dev/null +++ b/hawk/analysis/simulation.py @@ -0,0 +1,110 @@ +import time + +from tefs import TEFS +from tigramite.pcmci import PCMCI + + +def run_simulation_pcmci( + datasets, + config, + independence_tests, +): + params = config["params"] + dataset_name = config["dataset_name"] + dataframe = datasets[dataset_name] + + independence_test = independence_tests[params["independencetest"]] + algorithm = params["algorithm"] + lag = params["lag"] + + # Construct a unique identifier for the configuration + # param_str = "_".join(f"{k}{v}" for k, v in params.items()) + # config_id = f"dataset{dataset_name}_{param_str}" + + print(f"Running experiment with config: {config}") + + pcmci = PCMCI(dataframe=dataframe["full_tigramite"], cond_ind_test=independence_test, verbosity=2) + + start_time = time.time() + if algorithm == "pcmci": + results = pcmci.run_pcmci(tau_max=lag, pc_alpha=0.05, alpha_level=0.01) + elif algorithm == "pcmci_plus": + results = pcmci.run_pcmciplus(tau_min=0, tau_max=lag) + else: + raise ValueError(f"Invalid algorithm {algorithm}") + end_time = time.time() + execution_time = end_time - start_time + + q_matrix = pcmci.get_corrected_pvalues( + p_matrix=results["p_matrix"], + tau_max=lag, + fdr_method="fdr_bh", + ) + + graph = pcmci.get_graph_from_pmatrix( + p_matrix=q_matrix, + alpha_level=0.01, + tau_min=0, + tau_max=lag, + link_assumptions=None, + ) + + results["graph"] = graph + + return { + "results": results, + "params": params, + "dataset_name": dataset_name, + "execution_time": execution_time, + } + + +def run_simulation_tefs( + datasets, + config, + target_column_name, + n_jobs=1, +): + params = config["params"] + dataset_name = config["dataset_name"] + dataframe = datasets[dataset_name] + + # extract the parameters + direction = params["direction"] + lagfeatures = params["lagfeatures"] + lagtarget = params["lagtarget"] + k = params["k"] + + # Construct a unique identifier for the configuration + # param_str = "_".join(f"{k}{v}" for k, v in params.items()) + # param_str = param_str.replace(" ", "") + # config_id = f"dataset{dataset_name}_{param_str}" + + features = dataframe["full"].drop(columns=[target_column_name]) + target = dataframe["full"][target_column_name] + var_names = list(features.columns) + + # run the feature selection algorithm + start_time = time.time() + fs = TEFS( + features=features.values, + target=target.values, + k=k, + lag_features=lagfeatures, + lag_target=lagtarget, + direction=direction, + verbose=1, + var_names=var_names, + n_jobs=n_jobs, + ) + fs.fit() + end_time = time.time() + execution_time = end_time - start_time + + # Save results to the dictionary + return { + "results": fs, + "params": params, + "dataset_name": dataset_name, + "execution_time": execution_time, + } diff --git a/hawk/processes/__init__.py b/hawk/processes/__init__.py index 437441e..e8a8308 100644 --- a/hawk/processes/__init__.py +++ b/hawk/processes/__init__.py @@ -1,5 +1,7 @@ +from .wps_causal import Causal from .wps_say_hello import SayHello processes = [ SayHello(), + Causal(), ] diff --git a/hawk/processes/simulation_interactive.py 
b/hawk/processes/simulation_interactive.py
new file mode 100644
index 0000000..2d3bd68
--- /dev/null
+++ b/hawk/processes/simulation_interactive.py
@@ -0,0 +1,26 @@
+from birdy import WPSClient
+
+train_file_path = "Emiliani1_train.csv"
+test_file_path = "Emiliani1_test.csv"
+target_column_name = "cyclostationary_mean_rr_4w_1"
+
+# ----------------- WPS -----------------
+
+wps = WPSClient("http://localhost:5002/wps", verify=False)
+help(wps)
+
+# Send the input data to the causal process
+resp = wps.causal(
+    dataset_train=open(train_file_path),
+    dataset_test=open(test_file_path),
+    target_column_name=target_column_name,
+    pcmci_test_choice="ParCorr",
+    pcmci_max_lag="0",
+    tefs_direction="both",
+    tefs_use_contemporary_features="Yes",
+    tefs_max_lag_features="1",
+    tefs_max_lag_target="1",
+)
+
+print(resp)
+resp.get()
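+# Note: resp.get() returns references (URLs) to the outputs;
+# birdy's resp.get(asobj=True) would instead try to download and deserialize them.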
diff --git a/hawk/processes/wps_causal.py b/hawk/processes/wps_causal.py
new file mode 100644
index 0000000..1f25c35
--- /dev/null
+++ b/hawk/processes/wps_causal.py
@@ -0,0 +1,231 @@
+from pywps import Process, LiteralInput, LiteralOutput, UOM, ComplexInput, ComplexOutput  # noqa
+from pywps.app.Common import Metadata
+from pywps import FORMATS, Format
+from pathlib import Path
+import logging
+import pandas as pd
+from hawk.analysis import CausalAnalysis
+
+LOGGER = logging.getLogger("PYWPS")
+
+FORMAT_PNG = Format("image/png", extension=".png", encoding="base64")
+FORMAT_PICKLE = Format("application/octet-stream", extension=".pkl", encoding="utf-8")
+
+
+class Causal(Process):
+    """A process running a causal analysis (TEFS and PCMCI) on a train/test pair of time series datasets."""
+
+    def __init__(self):
+        inputs = [
+            ComplexInput(
+                "dataset_train",
+                "Train Dataset",
+                abstract="Please add the train csv file here.",
+                min_occurs=1,
+                max_occurs=1,
+                supported_formats=[FORMATS.CSV],
+            ),
+            ComplexInput(
+                "dataset_test",
+                "Test Dataset",
+                abstract="Please add the test csv file here.",
+                min_occurs=1,
+                max_occurs=1,
+                supported_formats=[FORMATS.CSV],
+            ),
+            LiteralInput(
+                "target_column_name",
+                "Target Column Name",
+                data_type="string",
+                abstract="Please enter the case-specific name of the target variable in the dataframe.",
+            ),
+            LiteralInput(
+                "pcmci_test_choice",
+                "PCMCI Test Choice",
+                data_type="string",
+                abstract="Choose the independence test to be used in PCMCI.",
+                allowed_values=[
+                    "ParCorr",
+                    "CMIknn",
+                ],
+            ),
+            LiteralInput(
+                "pcmci_max_lag",
+                "PCMCI Max Lag",
+                data_type="string",
+                abstract="Choose the maximum lag to be tested in PCMCI.",
+                allowed_values=[
+                    "0",
+                    "1",
+                    "2",
+                    "3",
+                    "4",
+                    "5",
+                ],
+            ),
+            LiteralInput(
+                "tefs_direction",
+                "TEFS Direction",
+                data_type="string",
+                abstract="Choose the direction of the TEFS algorithm.",
+                allowed_values=[
+                    "forward",
+                    "backward",
+                    "both",
+                ],
+            ),
+            LiteralInput(
+                "tefs_use_contemporary_features",
+                "TEFS Use Contemporary Features",
+                data_type="boolean",
+                abstract="Choose whether to use contemporary features in the TEFS algorithm.",
+                default="Yes",
+            ),
+            LiteralInput(
+                "tefs_max_lag_features",
+                "TEFS Max Lag Features",
+                data_type="string",
+                abstract="Choose the maximum lag of the features in the TEFS algorithm.",
+                allowed_values=[
+                    "no_lag",
+                    "1",
+                    "2",
+                    "3",
+                    "4",
+                    "5",
+                ],
+            ),
+            LiteralInput(
+                "tefs_max_lag_target",
+                "TEFS Max Lag Target",
+                data_type="string",
+                abstract="Choose the maximum lag of the target in the TEFS algorithm.",
+                allowed_values=[
+                    "1",
+                    "2",
+                    "3",
+                    "4",
+                    "5",
+                ],
+            ),
+        ]
+        outputs = [
+            ComplexOutput(
+                "pkl_baseline",
+                "Baseline Scores",
+                abstract="The baseline scores on the initial data.",
+                as_reference=True,
+                supported_formats=[FORMAT_PICKLE],
+            ),
+            ComplexOutput(
+                "png_pcmci",
+                "Selected features by PCMCI",
+                abstract="The selected features by PCMCI.",
+                as_reference=True,
+                supported_formats=[FORMAT_PNG],
+            ),
+            ComplexOutput(
+                "pkl_pcmci",
+                "PCMCI Results Details",
+                abstract="The PCMCI results details.",
+                as_reference=True,
+                supported_formats=[FORMAT_PICKLE],
+            ),
+            ComplexOutput(
+                "png_tefs",
+                "Selected features by TEFS",
+                abstract="The selected features by TEFS.",
+                as_reference=True,
+                supported_formats=[FORMAT_PNG],
+            ),
+            ComplexOutput(
+                "pkl_tefs",
+                "TEFS Results",
+                abstract="The TEFS results.",
+                as_reference=True,
+                supported_formats=[FORMAT_PICKLE],
+            ),
+            ComplexOutput(
+                "png_tefs_wrapper",
+                "Wrapper scores by TEFS",
+                abstract="The wrapper scores evolution by TEFS.",
+                as_reference=True,
+                supported_formats=[FORMAT_PNG],
+            ),
+            ComplexOutput(
+                "pkl_tefs_wrapper",
+                "TEFS Wrapper Scores Evolution Details",
+                abstract="The TEFS wrapper scores evolution details.",
+                as_reference=True,
+                supported_formats=[FORMAT_PICKLE],
+            ),
+        ]
+
+        super(Causal, self).__init__(
+            self._handler,
+            identifier="causal",
+            title="Causal Analysis",
+            abstract="Run a causal analysis on the provided train and test datasets, "
+            "selecting drivers of the target variable with TEFS and PCMCI.",
+            keywords=["causal", "tefs", "pcmci"],
+            metadata=[
+                Metadata("PyWPS", "https://pywps.org/"),
+                Metadata("Birdhouse", "http://bird-house.github.io/"),
+                Metadata("PyWPS Demo", "https://pywps-demo.readthedocs.io/en/latest/"),
+                Metadata("Emu: PyWPS examples", "https://emu.readthedocs.io/en/latest/"),
+            ],
+            version="1.5",
+            inputs=inputs,
+            outputs=outputs,
+            store_supported=True,
+            status_supported=True,
+        )
+
+    def _handler(self, request, response):
+        response.update_status("Processing started", 0)
+
+        # Read the inputs
+        target_column_name = request.inputs["target_column_name"][0].data
+
+        df_train = pd.read_csv(request.inputs["dataset_train"][0].file)
+        df_test = pd.read_csv(request.inputs["dataset_test"][0].file)
+
+        pcmci_test_choice = request.inputs["pcmci_test_choice"][0].data
+        pcmci_max_lag = int(request.inputs["pcmci_max_lag"][0].data)
+
+        tefs_direction = request.inputs["tefs_direction"][0].data
+        tefs_use_contemporary_features = request.inputs["tefs_use_contemporary_features"][0].data
+        tefs_max_lag_features_raw = request.inputs["tefs_max_lag_features"][0].data
+        tefs_max_lag_target = int(request.inputs["tefs_max_lag_target"][0].data)
+
+        workdir = Path(self.workdir)
+
+        # Validate before converting: "no_lag" is not an integer
+        if not tefs_use_contemporary_features and tefs_max_lag_features_raw == "no_lag":
+            raise ValueError("TEFS requires lagged features, contemporary features, or both.")
+
+        # "no_lag" means no lagged features, i.e. a maximum feature lag of 0
+        tefs_max_lag_features = 0 if tefs_max_lag_features_raw == "no_lag" else int(tefs_max_lag_features_raw)
+
+        causal_analysis = CausalAnalysis(
+            df_train,
+            df_test,
+            target_column_name,
+            pcmci_test_choice,
+            pcmci_max_lag,
+            tefs_direction,
+            tefs_use_contemporary_features,
+            tefs_max_lag_features,
+            tefs_max_lag_target,
+            workdir,
+            response,
+        )
+
+        causal_analysis.run()
+
+        response.outputs["pkl_baseline"].file = causal_analysis.baseline
+        response.outputs["png_pcmci"].file = causal_analysis.plot_pcmci
+        response.outputs["pkl_pcmci"].file = causal_analysis.details_pcmci
+        response.outputs["png_tefs"].file = causal_analysis.plot_tefs
+        response.outputs["pkl_tefs"].file = causal_analysis.details_tefs
+        response.outputs["png_tefs_wrapper"].file = causal_analysis.plot_tefs_wrapper
+        response.outputs["pkl_tefs_wrapper"].file = causal_analysis.details_tefs_wrapper
+
+        response.update_status("Processing completed", 100)
+
+        return response
diff --git a/requirements.txt 
b/requirements.txt index 01cdf08..f89c934 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,9 @@ click jinja2 psutil pywps>=4.5.1,<4.6 +birdhouse-birdy +tigramite>=5.2.5.1 +tefs +pandas +scikit-learn +numpy \ No newline at end of file diff --git a/tests/test_wps_caps.py b/tests/test_wps_caps.py index 8613d73..9c638c8 100644 --- a/tests/test_wps_caps.py +++ b/tests/test_wps_caps.py @@ -1,8 +1,9 @@ from pywps import Service -from .common import client_for from hawk.processes import processes +from .common import client_for + def test_wps_caps(): client = client_for(Service(processes=processes)) @@ -12,5 +13,6 @@ def test_wps_caps(): '/wps:Process' '/ows:Identifier') assert sorted(names.split()) == [ + 'causal', 'hello', ]