Skip to content

Commit

Permalink
Updated meteoscreening
Browse files Browse the repository at this point in the history
  • Loading branch information
holukas committed Jan 30, 2024
1 parent 87fd334 commit 073707a
Show file tree
Hide file tree
Showing 23 changed files with 1,651 additions and 1,731 deletions.
23 changes: 23 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,29 @@

![DIIVE](images/logo_diive1_256px.png)

## v0.68.0 | 30 Jan 2024

### Updates to stepwise outlier detection

Harmonized the way outlier flags are calculated. Outlier flags are all based on the same base
class `diive.core.base.flagbase.FlagBase` like before, but the base class now includes more code that
is shared by the different outlier detection methods. For example, `FlagBase` includes a method that
enables repeated execution of a single outlier detection method multiple times until all outliers
are removed. Results from all iterations are then combined into one single flag.

The class `StepwiseMeteoScreeningDb` that makes direct use of the stepwise outlier detection was
adjusted accordingly.

### Notebooks

- Updated notebook `StepwiseMeteoScreeningFromDatabase.ipynb`

### Removed features

- Removed outlier test based on seasonal-trend decomposition and z-score calculations (`OutlierSTLRZ`).
The test worked in principle, but at the moment it is unclear how to set reliable parameters. In addition
the test is slow when used with multiple years of high-resolution data. De-activated for the moment.

## v0.67.1 | 10 Jan 2024

- Updated: many docstrings.
Expand Down
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ Fill gaps in time series with various methods
- Local outlier factor: Identify outliers based on local outlier factor, daytime nighttime separately
- Manual removal: Remove time periods (from-to) or single records from time series
- Missing values: Simply creates a flag that indicated available and missing data in a time series
- Seasonal trend decomposition using LOESS, identify outliers based on seasonal-trend decomposition and
z-score calculations
- z-score: Identify outliers based on the z-score across all time series data
- z-score: Identify outliers based on the z-score, separately for daytime and nighttime
- z-score: Identify outliers based on max z-scores in the interquartile range data
Expand Down
102 changes: 90 additions & 12 deletions diive/core/base/flagbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pandas import Series, DatetimeIndex
from pandas import DataFrame, Series, DatetimeIndex

import diive.core.plotting.styles.LightTheme as theme
from diive.core.funcs.funcs import validate_id_string
Expand All @@ -22,12 +22,19 @@ def __init__(self, series: Series, flagid: str, idstr: str = None, verbose: bool
self._idstr = validate_id_string(idstr=idstr)
self.verbose = verbose

self.flagname = self._generate_flagname()

self._overall_flag = None
self._filteredseries = None
self._flag = None

print(f"Generating flag {self.flagname} for variable {self.series.name} ...")
@property
def overall_flag(self) -> Series:
"""Overall flag, calculated from individual flags from multiple iterations."""
if not isinstance(self._overall_flag, Series):
raise Exception(f'No overall flag available.')
return self._overall_flag

def get_flag(self):
return self.overall_flag

@property
def flag(self) -> Series:
Expand All @@ -45,6 +52,31 @@ def filteredseries(self) -> Series:
f'Solution: run .calc() to create flag for {self.series.name}.')
return self._filteredseries

def collect_results(self) -> DataFrame:
"""Store flag of this iteration in dataframe."""
frame = {
# self.filteredseries.name: self.filteredseries.copy(),
self.flag.name: self.flag.copy()
}
iteration_df = pd.DataFrame.from_dict(frame)
return iteration_df

def get_filteredseries(self, iteration) -> Series:
"""For the first iteration the original input series is used,
for all other iterations the filtered series from the previous
iteration is used."""
filteredseries = self.series.copy() if iteration == 1 else self.filteredseries
# Rename filtered series to include iteration number
filteredname = self.generate_iteration_filtered_variable_name(iteration=iteration)
filteredseries.name = filteredname
return filteredseries

def init_flag(self, iteration):
"""Initiate (empty) flag for this iteration."""
flagname = self.generate_flagname(iteration=iteration)
flag = pd.Series(index=self.filteredseries.index, data=np.nan, name=flagname)
return flag

def setflag(self, ok: DatetimeIndex, rejected: DatetimeIndex):
"""Set flag with values 0=ok, 2=rejected"""
self._flag.loc[ok] = 0
Expand All @@ -69,35 +101,81 @@ def reset(self):
# Generate flag series with NaNs
self._flag = pd.Series(index=self.series.index, data=np.nan, name=self.flagname)

def _generate_flagname(self) -> str:
def generate_flagname(self, iteration: int = None) -> str:
"""Generate standardized name for flag variable"""
flagname = "FLAG"
if self._idstr:
flagname += f"{self._idstr}"
flagname += f"_{self.series.name}"
if self._flagid:
flagname += f"_{self._flagid}"
flagname += f"_TEST"
if iteration:
flagname += f'_ITER{iteration}_TEST'
else:
flagname += f'_TEST'
return flagname

def plot(self, ok: DatetimeIndex, rejected: DatetimeIndex, plottitle: str = ""):
def generate_iteration_filtered_variable_name(self, iteration: int):
filteredname = f"{self.series.name}_FILTERED-AFTER-ITER{iteration}"
return filteredname

def repeat(self, func, repeat):
"""Repeat function until no more outliers found."""
n_outliers = 9999
iteration = 0
iteration_flags_df = pd.DataFrame()
while n_outliers > 0:
iteration += 1
cur_iteration_flag_df, n_outliers = func(iteration=iteration)
iteration_flags_df = pd.concat([iteration_flags_df, cur_iteration_flag_df], axis=1)
if not repeat:
break

# Calcualte the sum of all flags that show 2, for each data row
overall_flag = iteration_flags_df[iteration_flags_df == 2].sum(axis=1)
overall_flag.name = self.generate_flagname()

n_iterations = len(iteration_flags_df.columns)

return overall_flag, n_iterations

def run_flagtests(self, iteration):
"""Calculate flag for given iteration."""
self._filteredseries = self.get_filteredseries(iteration=iteration)
self._flag = self.init_flag(iteration=iteration)
ok, rejected, n_outliers = self._flagtests(iteration=iteration)
self.setflag(ok=ok, rejected=rejected)
self.setfiltered(rejected=rejected)
iteration_df = self.collect_results()
return iteration_df, n_outliers

def defaultplot(self, n_iterations: int = 1):
"""Basic plot that shows time series with and without outliers"""
ok = self.overall_flag == 0
rejected = self.overall_flag == 2
n_outliers = rejected.sum()

fig = plt.figure(facecolor='white', figsize=(16, 7))
gs = gridspec.GridSpec(2, 1) # rows, cols
gs.update(wspace=0.3, hspace=0.1, left=0.03, right=0.97, top=0.95, bottom=0.05)
ax_series = fig.add_subplot(gs[0, 0])
ax_ok = fig.add_subplot(gs[1, 0], sharex=ax_series)
ax_series.plot_date(self.series.index, self.series, label=f"{self.series.name}", color="#42A5F5",
alpha=.5, markersize=2, markeredgecolor='none')
ax_series.plot_date(self.series.index, self.series,
label=f"{self.series.name}", color="#607D8B",
alpha=.5, markersize=8, markeredgecolor='none')
ax_series.plot_date(self.series[rejected].index, self.series[rejected],
label="outlier (rejected)", color="#F44336", alpha=1,
markersize=8, markeredgecolor='none', fmt='X')
ax_ok.plot_date(self.series[ok].index, self.series[ok], label=f"OK", color="#9CCC65", alpha=.5,
markersize=2, markeredgecolor='none')
markersize=12, markeredgecolor='none', fmt='X')
ax_ok.plot_date(self.series[ok].index, self.series[ok],
label=f"filtered series", alpha=.5,
markersize=8, markeredgecolor='none')
default_format(ax=ax_series)
default_format(ax=ax_ok)
default_legend(ax=ax_series)
default_legend(ax=ax_ok)
plt.setp(ax_series.get_xticklabels(), visible=False)
plottitle = (f"{self.series.name} filtered by {self.overall_flag.name}, "
f"n_iterations = {n_iterations}, "
f"n_outliers = {n_outliers}")
fig.suptitle(plottitle, fontsize=theme.FIGHEADER_FONTSIZE)
fig.show()
46 changes: 46 additions & 0 deletions diive/core/base/identify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from pandas import DataFrame


def identify_relevants(seriescol: str) -> list:
"""Find relevant series column.
Needed because variables can change their naming over the
course of the QC checks, e.g. for NEE, checks done on the
variable FC are relevant.
"""
if seriescol.startswith('NEE_') or seriescol == 'FC' or seriescol == 'co2_flux':
relevant = ['_FC_', '_NEE_', '_co2_flux_']
elif seriescol.startswith('co2_flux_'):
relevant = ['CHECK', '_NEE_'] # todo
elif seriescol.startswith('H_') or seriescol == 'H':
relevant = ['_H_']
elif seriescol.startswith('LE_') or seriescol == 'LE':
relevant = ['_LE_']
elif seriescol.startswith('ET_') or seriescol == 'ET':
relevant = ['_ET_']
elif seriescol.startswith('FH2O_') or seriescol == 'FH2O':
relevant = ['_FH2O_']
elif seriescol.startswith('h2o_flux_') or seriescol == 'h2o_flux':
relevant = ['_h2o_flux_']
elif seriescol.startswith('TAU_') or seriescol == 'TAU':
relevant = ['_TAU_']
elif seriescol.startswith('FN2O_') or seriescol == 'FN2O':
relevant = ['_FN2O_']
elif seriescol.startswith('FCH4_') or seriescol == 'FCH4':
relevant = ['_FCH4_']
else:
relevant = [seriescol]
return relevant


def identify_flagcols(df: DataFrame, seriescol: str) -> list:
"""Identify flag columns."""
flagcols = [c for c in df.columns
if str(c).startswith('FLAG_')
and (str(c).endswith(('_TEST', '_QCF')))]

# Collect columns relevant for this flux
relevant = identify_relevants(seriescol=seriescol)
flagcols = [f for f in flagcols if any(n in f for n in relevant)]

return flagcols
12 changes: 12 additions & 0 deletions diive/core/dfun/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@
pd.set_option('display.max_rows', 50)


def detect_new_columns(df: DataFrame, other: DataFrame) -> list:
"""Detect columns in *df* that do not exist in other."""
duplicatecols = [c for c in df.columns if c in other.columns]
for col in df[duplicatecols]:
if not df[col].equals(other[col]):
raise Exception(f"Column {col} was identified as duplicate, but is not identical.")

newcols = [c for c in df.columns if c not in other.columns]

return newcols


def aggregated_as_hires(aggregate_series: Series,
hires_timestamp,
to_freq: str = 'D',
Expand Down
9 changes: 5 additions & 4 deletions diive/core/utils/prints.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@ def section(self):
if self.spacing:
print("")
print("")
self.str(txt=f"{'=' * 40}")
# self.str(txt=f"{'=' * 40}")
self.str(txt=f"{self.id}")
self.str(txt=f"{'=' * 40}")
# self.str(txt=f"{'=' * 40}")

def str(self, txt: str):
# print(f"{txt}")
print(f"[{self.id}] {txt}")

def done(self):
print(f"[{self.id}] Done.")
print(f"[{self.id}] {'_' * 40}")
pass
# print(f"[{self.id}] Done.")
# print(f"[{self.id}] {'_' * 40}")
33 changes: 9 additions & 24 deletions diive/pkgs/fluxprocessingchain/fluxprocessingchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from diive.pkgs.fluxprocessingchain.level31_storagecorrection import FluxStorageCorrectionSinglePointEddyPro
from diive.pkgs.outlierdetection.stepwiseoutlierdetection import StepwiseOutlierDetection
from diive.pkgs.qaqc.qcf import FlagQCF

from diive.core.dfun.frames import detect_new_columns

class FluxProcessingChain:

Expand Down Expand Up @@ -226,17 +226,6 @@ def level2_quality_flag_expansion(
if steadiness_of_horizontal_wind:
self._level2.steadiness_of_horizontal_wind()

def _detect_new_columns(self, level_df: DataFrame) -> list:
# Before export, check for already existing columns
existcols = [c for c in level_df.columns if c in self.fpc_df.columns]
for c in level_df[existcols]:
if not level_df[c].equals(self.fpc_df[c]):
raise Exception(f"Column {c} was identified as duplicate, but is not identical.")

# Add new columns to processing chain dataframe
newcols = [c for c in level_df.columns if c not in self.fpc_df.columns]
[print(f"++Added new column {col}.") for col in newcols]
return newcols

def _finalize_level(self,
run_qcf_on_col: str,
Expand All @@ -247,8 +236,9 @@ def _finalize_level(self,
nighttimetime_accept_qcf_below: int = 2) -> FlagQCF:

# Detect new columns
newcols = self._detect_new_columns(level_df=level_df)
newcols = detect_new_columns(df=level_df, other=self.fpc_df)
self._fpc_df = pd.concat([self.fpc_df, level_df[newcols]], axis=1)
[print(f"++Added new column {col}.") for col in newcols]

# Calculate overall quality flag QCF
qcf = FlagQCF(series=self.fpc_df[run_qcf_on_col],
Expand Down Expand Up @@ -287,7 +277,7 @@ def finalize_level32(self,
self._level32_qcf = self._finalize_level(
run_qcf_on_col=self.level31.flux_corrected_col,
idstr='L3.2',
level_df=self.level32.results,
level_df=self.level32.flags,
nighttime_threshold=nighttime_threshold,
daytime_accept_qcf_below=daytime_accept_qcf_below,
nighttimetime_accept_qcf_below=nighttimetime_accept_qcf_below
Expand All @@ -308,8 +298,9 @@ def level31_storage_correction(self, gapfill_storage_term: bool = False):

def finalize_level31(self):

newcols = self._detect_new_columns(level_df=self.level31.results)
newcols = detect_new_columns(df=self.level31.results, other=self.fpc_df)
self._fpc_df = pd.concat([self.fpc_df, self.level31.results[newcols]], axis=1)
[print(f"++Added new column {col}.") for col in newcols]

# Apply QCF also to Level-3.1 flux output
self._apply_level2_qcf_to_level31_flux()
Expand Down Expand Up @@ -384,12 +375,6 @@ def level32_flag_outliers_zscore_test(self, thres_zscore: int = 4, showplot: boo
self._level32.flag_outliers_zscore_test(thres_zscore=thres_zscore, showplot=showplot,
verbose=verbose, plottitle=plottitle, repeat=repeat)

def level32_flag_outliers_stl_rz_test(self, thres_zscore: float = 4.5, decompose_downsampling_freq: str = '1H',
repeat: bool = False, showplot: bool = False):
self._level32.flag_outliers_stl_rz_test(thres_zscore=thres_zscore,
decompose_downsampling_freq=decompose_downsampling_freq,
repeat=repeat, showplot=showplot)

def level32_flag_outliers_lof_test(self, n_neighbors: int = None, contamination: float = None,
showplot: bool = False, verbose: bool = False, repeat: bool = True,
n_jobs: int = 1):
Expand Down Expand Up @@ -729,7 +714,7 @@ def example():
fpc.finalize_level32(nighttime_threshold=50, daytime_accept_qcf_below=2, nighttimetime_accept_qcf_below=2)

fpc.filteredseries
fpc.level32.results
fpc.level32.flags
fpc.level32_qcf.showplot_qcf_heatmaps()
fpc.level32_qcf.showplot_qcf_timeseries()
fpc.level32_qcf.report_qcf_flags()
Expand Down Expand Up @@ -854,5 +839,5 @@ def example():


if __name__ == '__main__':
example_simple()
# example()
# example_simple()
example()
2 changes: 1 addition & 1 deletion diive/pkgs/fluxprocessingchain/level2_qualityflags.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def spectral_correction_factor_test(self,

def missing_vals_test(self):
results = MissingValues(series=self.dfin[self.fluxcol].copy(), idstr=self.idstr)
results.calc()
results.repeat()
self._results[results.flag.name] = results.flag

def ssitc_test(self):
Expand Down
Loading

0 comments on commit 073707a

Please sign in to comment.