From 99fed221ab10e6a8c9e49fffe7a70df8260858fe Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Mon, 19 Feb 2024 01:06:54 +0400 Subject: [PATCH 1/9] Initial implementation empty transform_keys BatchTransform --- datapipe/datatable.py | 11 ++-- datapipe/step/batch_transform.py | 103 +++++++++++++++++++++---------- examples/many_to_zero/app.py | 37 ++++------- 3 files changed, 88 insertions(+), 63 deletions(-) diff --git a/datapipe/datatable.py b/datapipe/datatable.py index 6c42e9bd..74549196 100644 --- a/datapipe/datatable.py +++ b/datapipe/datatable.py @@ -590,11 +590,12 @@ def get_agg_cte( keys = [k for k in transform_keys if k in self.primary_keys] key_cols: List["ColumnClause"] = [column(k) for k in keys] - sql: Any = ( - select(*key_cols + [func.max(tbl.c["update_ts"]).label("update_ts")]) - .select_from(tbl) - .group_by(*key_cols) - ) + sql: Any = select( + *key_cols + [func.max(tbl.c["update_ts"]).label("update_ts")] + ).select_from(tbl) + + if len(key_cols) > 0: + sql = sql.group_by(*key_cols) sql = sql_apply_filters_idx_to_subquery(sql, keys, filters_idx) sql = sql_apply_runconfig_filter(sql, tbl, self.primary_keys, run_config) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 8a1fd2f6..f3303124 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -77,8 +77,7 @@ def __call__( input_dts: List[DataTable], run_config: Optional[RunConfig] = None, kwargs: Optional[Dict[str, Any]] = None, - ) -> TransformResult: - ... + ) -> TransformResult: ... BatchTransformFunc = Callable[..., TransformResult] @@ -164,31 +163,55 @@ def mark_rows_processed_success( if len(idx) == 0: return - insert_sql = self.dbconn.insert(self.sql_table).values( - [ - { + if idx.empty: + # DataFrame считает, что он пустой, если в нем нет колонок + # При этом мы хотим создать строки в БД + + # Мы можем обработать только случай с одной строкой + assert len(idx) == 1 + + with self.dbconn.con.begin() as con: + insert_sql = self.dbconn.insert(self.sql_table).values( + [ + { + "process_ts": process_ts, + "is_success": True, + "priority": 0, + "error": None, + } + ] + ) + + # удалить все из таблицы + con.execute(self.sql_table.delete()) + con.execute(insert_sql) + + else: + insert_sql = self.dbconn.insert(self.sql_table).values( + [ + { + "process_ts": process_ts, + "is_success": True, + "priority": 0, + "error": None, + **idx_dict, # type: ignore + } + for idx_dict in idx.to_dict(orient="records") + ] + ) + + sql = insert_sql.on_conflict_do_update( + index_elements=self.primary_keys, + set_={ "process_ts": process_ts, "is_success": True, - "priority": 0, "error": None, - **idx_dict, # type: ignore - } - for idx_dict in idx.to_dict(orient="records") - ] - ) - - sql = insert_sql.on_conflict_do_update( - index_elements=self.primary_keys, - set_={ - "process_ts": process_ts, - "is_success": True, - "error": None, - }, - ) + }, + ) - # execute - with self.dbconn.con.begin() as con: - con.execute(sql) + # execute + with self.dbconn.con.begin() as con: + con.execute(sql) def mark_rows_processed_error( self, @@ -295,6 +318,11 @@ def __init__( ) self.chunk_size = chunk_size + + # Force transform_keys to be a list, otherwise Pandas will not be happy + if transform_keys is not None and not isinstance(transform_keys, list): + transform_keys = list(transform_keys) + self.transform_keys, self.transform_schema = self.compute_transform_schema( [i.meta_table for i in input_dts], [i.meta_table for i in output_dts], @@ -358,9 +386,6 @@ def _build_changed_idx_sql( 
order: Literal["asc", "desc"] = "asc", run_config: Optional[RunConfig] = None, # TODO remove ) -> Tuple[Iterable[str], Any]: - if len(self.transform_keys) == 0: - raise NotImplementedError() - all_input_keys_counts: Dict[str, int] = {} for col in itertools.chain(*[dt.primary_schema for dt in self.input_dts]): all_input_keys_counts[col.name] = all_input_keys_counts.get(col.name, 0) + 1 @@ -432,11 +457,6 @@ def _make_agg_of_agg(ctes, agg_col): for tbl in self.input_dts ] - # Filter out ctes that do not have intersection with transform keys. - # These ctes do not affect the result, but may dramatically increase - # size of the temporary table during query execution. - inp_ctes = [(keys, cte) for (keys, cte) in inp_ctes if len(keys) > 0] - inp = _make_agg_of_agg(inp_ctes, "update_ts") tr_tbl = self.meta_table.sql_table @@ -453,8 +473,22 @@ def _make_agg_of_agg(ctes, agg_col): out = out.cte(name="transform") + if len(self.transform_keys) == 0: + join_onclause_sql: Any = literal(True) + elif len(self.transform_keys) == 1: + join_onclause_sql = ( + inp.c[self.transform_keys[0]] == out.c[self.transform_keys[0]] + ) + else: # len(self.transform_keys) > 1: + join_onclause_sql = and_( + *[inp.c[key] == out.c[key] for key in self.transform_keys] + ) + sql = ( select( + # Нам нужно выбирать хотя бы что-то, чтобы не было ошибки при + # пустом self.transform_keys + literal(1).label("_datapipe_dummy"), *[ func.coalesce(inp.c[key], out.c[key]).label(key) for key in self.transform_keys @@ -463,9 +497,7 @@ def _make_agg_of_agg(ctes, agg_col): .select_from(inp) .outerjoin( out, - onclause=and_( - *[inp.c[key] == out.c[key] for key in self.transform_keys] - ), + onclause=join_onclause_sql, full=True, ) .where( @@ -579,6 +611,8 @@ def get_full_process_ids( def alter_res_df(): with ds.meta_dbconn.con.begin() as con: for df in pd.read_sql_query(u1, con=con, chunksize=chunk_size): + df = df[self.transform_keys] + for k, v in extra_filters.items(): df[k] = v @@ -610,6 +644,7 @@ def get_change_list_process_ids( sql, con=con, ) + table_changes_df = table_changes_df[self.transform_keys] changes.append(table_changes_df) else: diff --git a/examples/many_to_zero/app.py b/examples/many_to_zero/app.py index 91ec36d3..0401ab05 100644 --- a/examples/many_to_zero/app.py +++ b/examples/many_to_zero/app.py @@ -1,14 +1,11 @@ -from typing import Dict, List, Optional - import pandas as pd from sqlalchemy import Integer from sqlalchemy.sql.schema import Column from datapipe.compute import Catalog, DatapipeApp, Pipeline, Table -from datapipe.datatable import DataStore, DataTable -from datapipe.run_config import RunConfig +from datapipe.datatable import DataStore from datapipe.step.batch_generate import BatchGenerate -from datapipe.step.datatable_transform import DatatableTransform +from datapipe.step.batch_transform import BatchTransform from datapipe.store.database import DBConn from datapipe.store.pandas import TableStoreJsonLine @@ -42,23 +39,9 @@ def generate_data(): def count( - ds: DataStore, - input_dts: List[DataTable], - output_dts: List[DataTable], - kwargs: Dict, - run_config: Optional[RunConfig] = None, -) -> None: - assert len(input_dts) == 1 - assert len(output_dts) == 1 - - input_dt = input_dts[0] - output_dt = output_dts[0] - - output_dt.store_chunk( - pd.DataFrame( - {"result_id": [0], "count": [len(input_dt.meta_table.get_existing_idx())]} - ) - ) + input_df: pd.DataFrame, +) -> pd.DataFrame: + return pd.DataFrame({"result_id": [0], "count": [len(input_df)]}) pipeline = Pipeline( @@ -67,11 +50,11 @@ def count( 
generate_data, outputs=["input"], ), - DatatableTransform( + BatchTransform( count, # type: ignore inputs=["input"], outputs=["result"], - check_for_changes=False, + transform_keys=[], ), ] ) @@ -80,3 +63,9 @@ def count( ds = DataStore(DBConn("sqlite+pysqlite3:///db.sqlite")) app = DatapipeApp(ds, catalog, pipeline) + + +if __name__ == "__main__": + from datapipe.compute import run_steps + + run_steps(ds, app.steps, None, None) From a1497119088684ff89fc938df4585668368bae77 Mon Sep 17 00:00:00 2001 From: TSheyd Date: Fri, 9 Aug 2024 14:57:55 +0300 Subject: [PATCH 2/9] fix: passed delete_stale to do_batch_generate --- datapipe/step/batch_generate.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datapipe/step/batch_generate.py b/datapipe/step/batch_generate.py index b782330a..9285a88c 100644 --- a/datapipe/step/batch_generate.py +++ b/datapipe/step/batch_generate.py @@ -99,12 +99,13 @@ def build_compute(self, ds: DataStore, catalog: Catalog) -> List[ComputeStep]: name=self.func.__name__, func=cast( DatatableTransformFunc, - lambda ds, input_dts, output_dts, run_config, kwargs: do_batch_generate( + lambda ds, input_dts, output_dts, run_config, kwargs, delete_stale: do_batch_generate( func=self.func, ds=ds, output_dts=output_dts, run_config=run_config, kwargs=kwargs, + delete_stale=delete_stale, ), ), input_dts=[], From 6c79566133dbe87b806d89534ca020a6975b8dd5 Mon Sep 17 00:00:00 2001 From: TSheyd Date: Fri, 9 Aug 2024 18:10:17 +0300 Subject: [PATCH 3/9] fix: passed delete_stale to do_batch_generate --- datapipe/step/batch_generate.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datapipe/step/batch_generate.py b/datapipe/step/batch_generate.py index 9285a88c..374b7772 100644 --- a/datapipe/step/batch_generate.py +++ b/datapipe/step/batch_generate.py @@ -28,8 +28,8 @@ def do_batch_generate( ds: DataStore, output_dts: List[DataTable], run_config: Optional[RunConfig] = None, - kwargs: Optional[Dict] = None, delete_stale: bool = True, + kwargs: Optional[Dict] = None, ) -> None: """ Создание новой таблицы из результатов запуска `proc_func`. 
@@ -99,13 +99,13 @@ def build_compute(self, ds: DataStore, catalog: Catalog) -> List[ComputeStep]: name=self.func.__name__, func=cast( DatatableTransformFunc, - lambda ds, input_dts, output_dts, run_config, kwargs, delete_stale: do_batch_generate( + lambda ds, input_dts, output_dts, run_config, kwargs: do_batch_generate( func=self.func, ds=ds, output_dts=output_dts, run_config=run_config, + delete_stale=self.delete_stale, kwargs=kwargs, - delete_stale=delete_stale, ), ), input_dts=[], From c001d6a949dac8b122be268e6d15dcaa85e6d71c Mon Sep 17 00:00:00 2001 From: TSheyd Date: Sun, 11 Aug 2024 00:15:07 +0300 Subject: [PATCH 4/9] tests: added tests for delete_stale arg in BatchGenerate --- tests/test_chunked_processing_pipeline.py | 71 +++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/tests/test_chunked_processing_pipeline.py b/tests/test_chunked_processing_pipeline.py index 34402cf4..969efa83 100644 --- a/tests/test_chunked_processing_pipeline.py +++ b/tests/test_chunked_processing_pipeline.py @@ -527,3 +527,74 @@ def transform(df, idx, ds, run_config, transform_count): changelist = ChangeList.create("inp", change_idx) run_steps_changelist(ds, steps, changelist, RunConfig()) assert transform_count["value"] == 2 + + +def test_stale_records_with_batch_generate(dbconn): + ds = DataStore(dbconn, create_meta_table=True) + catalog = Catalog( + { + "inp_keep": Table( + store=TableStoreDB( + dbconn=dbconn, + name="inp_data_keep", + data_sql_schema=TEST_SCHEMA, + create_table=True, + ) + ), + "inp_del": Table( + store=TableStoreDB( + dbconn=dbconn, + name="inp_data_del", + data_sql_schema=TEST_SCHEMA, + create_table=True, + ) + ), + } + ) + + bg_count = {"value": 0} + bg_chunk_size = 5 + + def add_inp_table(ds: DataStore, bg_count): + assert isinstance(ds, DataStore) + bg_count["value"] += 1 + yield pd.DataFrame( + { + "id": range(bg_count["value"]*bg_chunk_size, (bg_count["value"]+1) * bg_chunk_size), + "a": range(bg_count["value"]*bg_chunk_size, (bg_count["value"]+1) * bg_chunk_size), + } + ) + + pipeline = Pipeline( + [ + BatchGenerate( + func=add_inp_table, + outputs=["inp_keep"], + delete_stale=False, # keeps records that are not yielded by func + kwargs=dict(bg_count=bg_count), + ), + BatchGenerate( + func=add_inp_table, + outputs=["inp_del"], + delete_stale=True, # deletes records that are not yielded by func + kwargs=dict(bg_count={"value": 0}), # to avoid double counting + ), + ] + ) + + steps = build_compute(ds, catalog, pipeline) + + # First run + run_steps(ds, steps) + + # Second run + run_steps(ds, steps) + + # Check table shapes + df_keep = catalog.get_datatable(ds, "inp_keep").get_data() + df_del = catalog.get_datatable(ds, "inp_del").get_data() + + assert df_keep.shape[0] == bg_count["value"] * bg_chunk_size + assert df_del.shape[0] == bg_chunk_size + # additionally, check that delete_stale=True deletes previous chunks and keeps the last one + assert df_del.iloc[0]['id'] == bg_chunk_size*bg_count["value"] From 02be7381e82eee6ffcfe8e3cdc36ea86fa05fcff Mon Sep 17 00:00:00 2001 From: TSheyd Date: Sun, 11 Aug 2024 00:29:02 +0300 Subject: [PATCH 5/9] updated CHANGELOG.md --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f1f3b750..e881a9bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.13.14 + +* Fix [#334](https://github.com/epoch8/datapipe/issues/334) + # 0.13.13 * Add `ComputeStep.get_status` method From 5b709a9b66b6504281eda95688a63180f9db7be5 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: 
Sun, 11 Aug 2024 11:19:21 +0400 Subject: [PATCH 6/9] v0.13.14 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 270baaf5..cd15f665 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "datapipe-core" -version = "0.13.13" +version = "0.13.14" description = "`datapipe` is a realtime incremental ETL library for Python application" readme = "README.md" repository = "https://github.com/epoch8/datapipe" From 7f7a3a2c865da9b939165dba74405fbfeb6e87fe Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Sun, 11 Aug 2024 11:22:58 +0400 Subject: [PATCH 7/9] split batch generate test in two --- tests/test_chunked_processing_pipeline.py | 83 ++++++++++++++++++----- 1 file changed, 66 insertions(+), 17 deletions(-) diff --git a/tests/test_chunked_processing_pipeline.py b/tests/test_chunked_processing_pipeline.py index 969efa83..6d8f25dd 100644 --- a/tests/test_chunked_processing_pipeline.py +++ b/tests/test_chunked_processing_pipeline.py @@ -529,22 +529,75 @@ def transform(df, idx, ds, run_config, transform_count): assert transform_count["value"] == 2 -def test_stale_records_with_batch_generate(dbconn): +def test_stale_records_deletion_with_batch_generate(dbconn): ds = DataStore(dbconn, create_meta_table=True) catalog = Catalog( { - "inp_keep": Table( + "inp_del": Table( store=TableStoreDB( dbconn=dbconn, - name="inp_data_keep", + name="inp_data_del", data_sql_schema=TEST_SCHEMA, create_table=True, ) ), - "inp_del": Table( + } + ) + + bg_count = {"value": 0} + bg_chunk_size = 5 + + def add_inp_table(ds: DataStore, bg_count): + assert isinstance(ds, DataStore) + bg_count["value"] += 1 + yield pd.DataFrame( + { + "id": range( + bg_count["value"] * bg_chunk_size, + (bg_count["value"] + 1) * bg_chunk_size, + ), + "a": range( + bg_count["value"] * bg_chunk_size, + (bg_count["value"] + 1) * bg_chunk_size, + ), + } + ) + + pipeline = Pipeline( + [ + BatchGenerate( + func=add_inp_table, + outputs=["inp_del"], + delete_stale=True, # Default behavior, deletes records that are not yielded by func + kwargs=dict(bg_count={"value": 0}), # to avoid double counting + ), + ] + ) + + steps = build_compute(ds, catalog, pipeline) + + # First run + run_steps(ds, steps) + + # Second run + run_steps(ds, steps) + + # Check table shapes + df_del = catalog.get_datatable(ds, "inp_del").get_data() + + assert df_del.shape[0] == bg_chunk_size + # additionally, check that delete_stale=True deletes previous chunks and keeps the last one + assert df_del.iloc[0]["id"] == bg_chunk_size * bg_count["value"] + + +def test_stale_records_keep_with_batch_generate(dbconn): + ds = DataStore(dbconn, create_meta_table=True) + catalog = Catalog( + { + "inp_keep": Table( store=TableStoreDB( dbconn=dbconn, - name="inp_data_del", + name="inp_data_keep", data_sql_schema=TEST_SCHEMA, create_table=True, ) @@ -560,8 +613,14 @@ def add_inp_table(ds: DataStore, bg_count): bg_count["value"] += 1 yield pd.DataFrame( { - "id": range(bg_count["value"]*bg_chunk_size, (bg_count["value"]+1) * bg_chunk_size), - "a": range(bg_count["value"]*bg_chunk_size, (bg_count["value"]+1) * bg_chunk_size), + "id": range( + bg_count["value"] * bg_chunk_size, + (bg_count["value"] + 1) * bg_chunk_size, + ), + "a": range( + bg_count["value"] * bg_chunk_size, + (bg_count["value"] + 1) * bg_chunk_size, + ), } ) @@ -573,12 +632,6 @@ def add_inp_table(ds: DataStore, bg_count): delete_stale=False, # keeps records that are not yielded by func kwargs=dict(bg_count=bg_count), ), - 
BatchGenerate( - func=add_inp_table, - outputs=["inp_del"], - delete_stale=True, # deletes records that are not yielded by func - kwargs=dict(bg_count={"value": 0}), # to avoid double counting - ), ] ) @@ -592,9 +645,5 @@ def add_inp_table(ds: DataStore, bg_count): # Check table shapes df_keep = catalog.get_datatable(ds, "inp_keep").get_data() - df_del = catalog.get_datatable(ds, "inp_del").get_data() assert df_keep.shape[0] == bg_count["value"] * bg_chunk_size - assert df_del.shape[0] == bg_chunk_size - # additionally, check that delete_stale=True deletes previous chunks and keeps the last one - assert df_del.iloc[0]['id'] == bg_chunk_size*bg_count["value"] From e2f8aeeafb1aea4f806b2a6ed681bd7722f5d4d2 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Sun, 11 Aug 2024 11:27:13 +0400 Subject: [PATCH 8/9] fix test --- tests/test_chunked_processing_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_chunked_processing_pipeline.py b/tests/test_chunked_processing_pipeline.py index 6d8f25dd..39878a8e 100644 --- a/tests/test_chunked_processing_pipeline.py +++ b/tests/test_chunked_processing_pipeline.py @@ -569,7 +569,7 @@ def add_inp_table(ds: DataStore, bg_count): func=add_inp_table, outputs=["inp_del"], delete_stale=True, # Default behavior, deletes records that are not yielded by func - kwargs=dict(bg_count={"value": 0}), # to avoid double counting + kwargs=dict(bg_count=bg_count), # to avoid double counting ), ] ) From 07f7438071e3ed96e7b483324fcc94ccfaf76138 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Sun, 11 Aug 2024 12:08:55 +0400 Subject: [PATCH 9/9] v0.14.0-alpha.1 --- docs/source/index.rst | 7 +++ docs/source/migration-v013-to-v014.md | 63 +++++++++++++++++++++++++++ pyproject.toml | 2 +- 3 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 docs/source/migration-v013-to-v014.md diff --git a/docs/source/index.rst b/docs/source/index.rst index 51d9671f..c04b2936 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -11,6 +11,13 @@ sqlite extending-cli +.. toctree:: + :caption: Migration + :maxdepth: 2 + :hidden: + + migration-v013-to-v014 + .. toctree:: :caption: Reference :maxdepth: 2 diff --git a/docs/source/migration-v013-to-v014.md b/docs/source/migration-v013-to-v014.md new file mode 100644 index 00000000..02faa598 --- /dev/null +++ b/docs/source/migration-v013-to-v014.md @@ -0,0 +1,63 @@ +# Migration from v0.13 to v0.14 + +## DatatableTansform can become BatchTransform + +Previously, if you had to do whole table transformation, you had to use +`DatatableTransform`. Now you can substitute it with `BatchTransform` which has +zero outputs. + +Before: + +```python +# Updates global count of input lines + +def count( + ds: DataStore, + input_dts: List[DataTable], + output_dts: List[DataTable], + kwargs: Dict, + run_config: Optional[RunConfig] = None, +) -> None: + assert len(input_dts) == 1 + assert len(output_dts) == 1 + + input_dt = input_dts[0] + output_dt = output_dts[0] + + output_dt.store_chunk( + pd.DataFrame( + {"result_id": [0], "count": [len(input_dt.meta_table.get_existing_idx())]} + ) + ) + +# ... + +DatatableTransform( + count, + inputs=["input"], + outputs=["result"], +) +``` + +After: + +```python +# Updates global count of input lines + +def count( + input_df: pd.DataFrame, +) -> pd.DataFrame: + return pd.DataFrame({"result_id": [0], "count": [len(input_df)]}) + +# ... 
+
+BatchTransform(
+    count,
+    inputs=["input"],
+    outputs=["result"],
+
+    # Important: we have to pass an empty list of transform keys so that the
+    # transformation operates on the whole input at once
+    transform_keys=[],
+)
+```
diff --git a/pyproject.toml b/pyproject.toml
index cd15f665..814d9141 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "datapipe-core"
-version = "0.13.14"
+version = "0.14.0-alpha.1"
 description = "`datapipe` is a realtime incremental ETL library for Python application"
 readme = "README.md"
 repository = "https://github.com/epoch8/datapipe"
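
Taken together, the patches above add whole-table aggregation to `BatchTransform` via `transform_keys=[]` (PATCH 1/9) and wire the `delete_stale` flag through `BatchGenerate` (PATCHES 2/9-8/9). The sketch below shows both in one pipeline. It only reuses calls that already appear in this series (`examples/many_to_zero/app.py` and `tests/test_chunked_processing_pipeline.py`); the `TableStoreDB` store mirrors the tests, and the table names, the two-column schema, and the SQLite file are illustrative rather than part of the patches.

```python
# Sketch: "many to zero" aggregation with empty transform_keys plus an explicit
# delete_stale flag on BatchGenerate. Names and schema are illustrative.
import pandas as pd
from sqlalchemy import Integer
from sqlalchemy.sql.schema import Column

from datapipe.compute import Catalog, DatapipeApp, Pipeline, Table, run_steps
from datapipe.datatable import DataStore
from datapipe.step.batch_generate import BatchGenerate
from datapipe.step.batch_transform import BatchTransform
from datapipe.store.database import DBConn, TableStoreDB

dbconn = DBConn("sqlite+pysqlite3:///db.sqlite")
ds = DataStore(dbconn, create_meta_table=True)

catalog = Catalog(
    {
        "events": Table(
            store=TableStoreDB(
                dbconn=dbconn,
                name="events",
                data_sql_schema=[
                    Column("id", Integer, primary_key=True),
                    Column("value", Integer),
                ],
                create_table=True,
            )
        ),
        "events_count": Table(
            store=TableStoreDB(
                dbconn=dbconn,
                name="events_count",
                data_sql_schema=[
                    Column("result_id", Integer, primary_key=True),
                    Column("count", Integer),
                ],
                create_table=True,
            )
        ),
    }
)


def generate_events():
    # The same ids are re-yielded on every run; with delete_stale=True any row
    # that stops being yielded would be removed from "events".
    yield pd.DataFrame({"id": range(10), "value": range(10)})


def count(input_df: pd.DataFrame) -> pd.DataFrame:
    # With transform_keys=[] the whole "events" table arrives as one DataFrame,
    # so a single aggregate row can be produced.
    return pd.DataFrame({"result_id": [0], "count": [len(input_df)]})


pipeline = Pipeline(
    [
        BatchGenerate(generate_events, outputs=["events"], delete_stale=True),
        BatchTransform(
            count,
            inputs=["events"],
            outputs=["events_count"],
            transform_keys=[],  # aggregate over the whole input at once
        ),
    ]
)

app = DatapipeApp(ds, catalog, pipeline)

if __name__ == "__main__":
    run_steps(ds, app.steps, None, None)
```

Running this twice should leave a single row in `events_count` with `count == 10`, since `delete_stale=True` (the default) only removes rows that a later generation run stops yielding.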