From 8cf67010ede019b6c5c9281ae16e6e1aee7ab90f Mon Sep 17 00:00:00 2001 From: Pavel Kochetov Date: Mon, 24 Jul 2023 18:54:04 +0300 Subject: [PATCH 01/47] Keep existing added to TransformMetaTable. --- datapipe/metastore.py | 1 + 1 file changed, 1 insertion(+) diff --git a/datapipe/metastore.py b/datapipe/metastore.py index b7de8f1c..98b5acc5 100644 --- a/datapipe/metastore.py +++ b/datapipe/metastore.py @@ -505,6 +505,7 @@ def __init__( name, dbconn.sqla_metadata, *self.sql_schema, + keep_existing=True, ) if create_table: From a505d7fb629c649dc81a600cccd7ba353fda4df1 Mon Sep 17 00:00:00 2001 From: Pavel Kochetov Date: Wed, 26 Jul 2023 16:26:16 +0300 Subject: [PATCH 02/47] Bugfix, table joins. Co-authored-by: Alexander Kozlov --- datapipe/cli.py | 48 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 11 deletions(-) diff --git a/datapipe/cli.py b/datapipe/cli.py index ce78382e..b460d42e 100644 --- a/datapipe/cli.py +++ b/datapipe/cli.py @@ -516,7 +516,7 @@ def migrate_transform_tables(ctx: click.Context, labels: str, name: str) -> None for batch_transform in batch_transforms_steps: if not isinstance(batch_transform, BaseBatchTransformStep): continue - print(f"Checking '{batch_transform.get_name()}': ") + print(f"Migrate '{batch_transform.get_name()}': ") size = batch_transform.meta_table.get_metadata_size() if size > 0: print(f"Skipping -- size of metadata is greater 0: {size=}") @@ -528,27 +528,53 @@ def migrate_transform_tables(ctx: click.Context, labels: str, name: str) -> None output_tbls = [ output_dt.meta_table.sql_table for output_dt in batch_transform.output_dts ] + + def make_ids_cte(): + ids_cte = ( + select( + *[func.coalesce(*[tbl.c[k] for tbl in output_tbls]).label(k) for k in batch_transform.transform_keys], + ) + .distinct() + .select_from(output_tbls[0]) + .where(and_(*[tbl.c.delete_ts.is_(None) for tbl in output_tbls])) + ) + + prev_tbl = output_tbls[0] + for tbl in output_tbls[1:]: + ids_cte = ids_cte.outerjoin( + tbl, + and_( + *[prev_tbl.c[k] == tbl.c[k] for k in batch_transform.transform_keys] + ), + full=True, + ) + + return ids_cte.cte(name="ids") + + ids_cte = make_ids_cte() + sql = ( select( - *[output_tbls[0].c[k] for k in batch_transform.transform_keys], - greatest_func(*[tbl.c["process_ts"] for tbl in output_tbls]).label( + *[ids_cte.c[k] for k in batch_transform.transform_keys], + func.max(greatest_func(*[tbl.c["process_ts"] for tbl in output_tbls])).label( "process_ts" ), ) - .distinct() - .select_from(output_tbls[0]) + .select_from(ids_cte) .where(and_(*[tbl.c.delete_ts.is_(None) for tbl in output_tbls])) ) - prev_tbl = output_tbls[0] - for tbl in output_tbls[1:]: - sql = sql.outerjoin( + + for tbl in output_tbls: + sql = sql.join( tbl, and_( - *[prev_tbl.c[k] == tbl.c[k] for k in batch_transform.transform_keys] + *[ids_cte.c[k] == tbl.c[k] for k in batch_transform.transform_keys] ), - full=True, + isouter=True, ) - prev_tbl = tbl + + sql = sql.group_by(*[ids_cte.c[k] for k in batch_transform.transform_keys]) + insert_stmt = insert(batch_transform.meta_table.sql_table).from_select( batch_transform.transform_keys + ["process_ts", "is_success", "error", "priority"], From 423b22ff73281132b4ea2128282c9bcd6eb179b4 Mon Sep 17 00:00:00 2001 From: Pavel Kochetov Date: Thu, 3 Aug 2023 17:23:36 +0300 Subject: [PATCH 03/47] Get ids after date added to metatable. 
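
This adds MetaTable.get_ids_changed_after_date_threshold, which returns the
primary-key index of rows whose process_ts is newer than the given threshold
and which have not been deleted (delete_ts is NULL). A minimal usage sketch,
assuming an already populated MetaTable `mt` as constructed in the
accompanying test:

    import time

    threshold = time.time() - 3600  # rows changed within the last hour
    for idx_chunk in mt.get_ids_changed_after_date_threshold(threshold):
        # the result is iterated in chunks of up to 1000 rows
        # (pd.read_sql_query with chunksize=1000 under the hood)
        print(len(idx_chunk))
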
--- datapipe/metastore.py | 16 ++++++++++++++++ tests/test_metatable.py | 28 +++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/datapipe/metastore.py b/datapipe/metastore.py index 98b5acc5..e28afb52 100644 --- a/datapipe/metastore.py +++ b/datapipe/metastore.py @@ -477,6 +477,22 @@ def get_stale_idx( list(pd.read_sql_query(sql, con=con, chunksize=1000)), ) + def get_ids_changed_after_date_threshold( + self, + date_threshold: float, + ) -> Iterator[IndexDF]: + idx_cols = [self.sql_table.c[key] for key in self.primary_keys] + sql = select(idx_cols).where( + and_( + self.sql_table.c.process_ts > date_threshold, + self.sql_table.c.delete_ts.is_(None), + ) + ) + with self.dbconn.con.begin() as con: + return cast( + Iterator[IndexDF], + list(pd.read_sql_query(sql, con=con, chunksize=1000)), + ) TRANSFORM_META_SCHEMA = [ Column("process_ts", Float), # Время последней успешной обработки diff --git a/tests/test_metatable.py b/tests/test_metatable.py index b7923b88..98b945e4 100644 --- a/tests/test_metatable.py +++ b/tests/test_metatable.py @@ -1,6 +1,8 @@ from typing import cast, List -import pandas as pd +from datetime import timedelta +import time +import pandas as pd from sqlalchemy import Integer from sqlalchemy.sql.schema import Column @@ -136,3 +138,27 @@ def test_get_metadata( assert_df_equal(mt.get_metadata(cast(IndexDF, part_idx))[index_cols], part_idx, index_cols=index_cols) assert_df_equal(mt.get_metadata(cast(IndexDF, part_idx))[keys], part_df[keys], index_cols=index_cols) + + +@parametrize_with_cases("index_cols,primary_schema,meta_schema,test_df", cases=CasesTestDF, import_fixtures=True) +def test_get_ids_changed_after_date_threshold( + dbconn: DBConn, index_cols: List[str], primary_schema: DataSchema, meta_schema: MetaSchema, test_df: DataDF +): + mt = MetaTable( + name="test", + dbconn=dbconn, + primary_schema=primary_schema, + meta_schema=meta_schema, + create_table=True, + ) + _, _, new_meta_df, _ = mt.get_changes_for_store_chunk(test_df) + mt.insert_meta_for_store_chunk(new_meta_df=new_meta_df) + + date_before_insertion = time.time() - timedelta(days=1).total_seconds() + ids_from_date_before_insertion = mt.get_ids_changed_after_date_threshold(date_before_insertion) + total_len = sum(len(ids) for ids in ids_from_date_before_insertion) + assert total_len == len(test_df) + + ids_from_date_after_insertion = mt.get_ids_changed_after_date_threshold(time.time()) + total_len = sum(len(ids) for ids in ids_from_date_after_insertion) + assert total_len == 0 From 3de270e1858a40faa9eb72ceca4b3d83c4ca586f Mon Sep 17 00:00:00 2001 From: Pavel Kochetov Date: Fri, 4 Aug 2023 11:44:51 +0300 Subject: [PATCH 04/47] Updated pipe to master version. --- datapipe/step/batch_transform.py | 1 + 1 file changed, 1 insertion(+) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 87bb1198..e016ad00 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -111,6 +111,7 @@ def __init__( name, dbconn.sqla_metadata, *self.sql_schema, + keep_existing=True, ) if create_table: From 99f225ed1fc3c8a2057c79d97559a67c31b59dc9 Mon Sep 17 00:00:00 2001 From: Pavel Kochetov Date: Fri, 4 Aug 2023 13:36:21 +0300 Subject: [PATCH 05/47] changed rows count added to crutch branch. 
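
This adds get_changed_rows_count_after_timestamp to the meta table in
datapipe/datatable.py. It counts rows whose process_ts is newer than the
given timestamp and whose delete_ts is NULL, i.e. how many records have
changed since that moment. A minimal usage sketch, assuming `mt` is an
existing meta table object carrying this method:

    import time

    last_run_ts = time.time() - 86400  # e.g. the previous pipeline run
    changed = mt.get_changed_rows_count_after_timestamp(last_run_ts)
    if changed > 0:
        ...  # there is new or updated data since last_run_ts
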
--- datapipe/datatable.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datapipe/datatable.py b/datapipe/datatable.py index 3ae22cb4..2c724adf 100644 --- a/datapipe/datatable.py +++ b/datapipe/datatable.py @@ -399,6 +399,23 @@ def get_stale_idx( list(pd.read_sql_query(sql, con=con, chunksize=1000)), ) + def get_changed_rows_count_after_timestamp( + self, + ts: float, + ) -> int: + sql = select(func.count()).where( + and_( + self.sql_table.c.process_ts > ts, + self.sql_table.c.delete_ts.is_(None), + ) + ) + + with self.dbconn.con.begin() as con: + res = con.execute(sql).fetchone() + assert res is not None and len(res) == 1 + + return res[0] + class DataTable: def __init__( From c02243330f9bfc6ea0d39583a1f030e35b64c7f0 Mon Sep 17 00:00:00 2001 From: Pavel Kochetov Date: Sun, 6 Aug 2023 21:25:17 +0600 Subject: [PATCH 06/47] Minor change count fix in datatable. --- datapipe/datatable.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/datapipe/datatable.py b/datapipe/datatable.py index 2c724adf..864a82a1 100644 --- a/datapipe/datatable.py +++ b/datapipe/datatable.py @@ -404,10 +404,7 @@ def get_changed_rows_count_after_timestamp( ts: float, ) -> int: sql = select(func.count()).where( - and_( - self.sql_table.c.process_ts > ts, - self.sql_table.c.delete_ts.is_(None), - ) + self.sql_table.c.process_ts > ts, ) with self.dbconn.con.begin() as con: From f1aa75c64acb518f27613994ba14d95e735accaf Mon Sep 17 00:00:00 2001 From: Pavel Kochetov Date: Thu, 28 Sep 2023 18:26:23 +0300 Subject: [PATCH 07/47] Added dropna to transform meta tables. --- datapipe/step/batch_transform.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index ff951137..4cf7dc6a 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -153,8 +153,10 @@ def mark_rows_processed_success( run_config: Optional[RunConfig] = None, ) -> None: idx = cast( - IndexDF, idx[self.primary_keys].drop_duplicates() + IndexDF, idx[self.primary_keys].drop_duplicates().dropna() ) # FIXME: сделать в основном запросе distinct + if len(idx) == 0: + return insert_sql = self.dbconn.insert(self.sql_table).values( [ @@ -190,8 +192,10 @@ def mark_rows_processed_error( run_config: Optional[RunConfig] = None, ) -> None: idx = cast( - IndexDF, idx[self.primary_keys].drop_duplicates() + IndexDF, idx[self.primary_keys].drop_duplicates().dropna() ) # FIXME: сделать в основном запросе distinct + if len(idx) == 0: + return insert_sql = self.dbconn.insert(self.sql_table).values( [ From 9fe3be69ee104b266d50c4d78622c4114068cd06 Mon Sep 17 00:00:00 2001 From: Nikita Lebedev Date: Thu, 26 Oct 2023 00:39:51 +0300 Subject: [PATCH 08/47] RedisStore Cluster Mode --- datapipe/store/redis.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/datapipe/store/redis.py b/datapipe/store/redis.py index adeaf805..ec07b482 100644 --- a/datapipe/store/redis.py +++ b/datapipe/store/redis.py @@ -3,6 +3,7 @@ import pandas as pd from redis.client import Redis +from redis.cluster import RedisCluster from sqlalchemy import Column from datapipe.store.database import MetaKey @@ -24,10 +25,13 @@ def _to_itertuples(df: DataDF, colnames): class RedisStore(TableStore): def __init__( - self, connection: str, name: str, data_sql_schema: List[Column] + self, connection: str, name: str, data_sql_schema: List[Column], cluster_mode: bool = False ) -> None: self.connection = connection - self.redis_connection = 
Redis.from_url(connection, decode_responses=True) + if not cluster_mode: + self.redis_connection = Redis.from_url(connection, decode_responses=True) + else: + self.redis_connection = RedisCluster.from_url(connection, decode_responses=True) self.name = name self.data_sql_schema = data_sql_schema From 1f843eba011e78e30f764c7f4715283723e052ee Mon Sep 17 00:00:00 2001 From: Pavel Kochetov Date: Wed, 27 Dec 2023 21:02:22 +0300 Subject: [PATCH 09/47] Handled missing vectors in read_rows response. --- datapipe/store/qdrant.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datapipe/store/qdrant.py b/datapipe/store/qdrant.py index 18ffc879..94d8686d 100644 --- a/datapipe/store/qdrant.py +++ b/datapipe/store/qdrant.py @@ -146,6 +146,9 @@ def read_rows(self, idx: Optional[IndexDF] = None) -> DataDF: records = [] assert response.result is not None + if len(response.result) == 0: + return pd.DataFrame(columns=[column.name for column in self.schema]) + for point in response.result: record = point.payload From 6294e734182cdb72aa43453e7ba679091bf12a8c Mon Sep 17 00:00:00 2001 From: puritanne Date: Wed, 7 Feb 2024 12:23:55 +0300 Subject: [PATCH 10/47] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 480a4e3d..7d66a051 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Datapipe -`datapipe` is a real-time, incremental ETL library for Python with record-level dependency tracking. +[Datapipe](https://datapipe.dev/) is a real-time, incremental ETL library for Python with record-level dependency tracking. The library is designed for describing data processing pipelines and is capable of tracking dependencies for each record in the pipeline. This ensures that From 23a5562ac9e8d9403a0f61a9fb9f0489ee27f5cb Mon Sep 17 00:00:00 2001 From: puritanne Date: Wed, 7 Feb 2024 12:25:00 +0300 Subject: [PATCH 11/47] Update README.md --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 7d66a051..aadd3770 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # Datapipe +https://datapipe.dev/ + [Datapipe](https://datapipe.dev/) is a real-time, incremental ETL library for Python with record-level dependency tracking. The library is designed for describing data processing pipelines and is capable From dd149f67b073ace58fab2af4cdc2aa0a30bb5277 Mon Sep 17 00:00:00 2001 From: puritanne Date: Wed, 7 Feb 2024 12:25:17 +0300 Subject: [PATCH 12/47] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index aadd3770..3c02d8ec 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,5 @@ # Datapipe -https://datapipe.dev/ - [Datapipe](https://datapipe.dev/) is a real-time, incremental ETL library for Python with record-level dependency tracking. The library is designed for describing data processing pipelines and is capable @@ -9,6 +7,8 @@ of tracking dependencies for each record in the pipeline. This ensures that tasks within the pipeline receive only the data that has been modified, thereby improving the overall efficiency of data handling. 
+https://datapipe.dev/ + # Development At the moment these branches are active: From bc13719b2624608be3800bcb8c7f8ace322bd076 Mon Sep 17 00:00:00 2001 From: TSheyd Date: Tue, 9 Apr 2024 13:40:42 +0300 Subject: [PATCH 13/47] feat: added index_schema to QdrantStore --- datapipe/store/qdrant.py | 33 ++++++++++++++++++++++++++++++--- tests/test_qdrant_store.py | 10 ++++++++-- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/datapipe/store/qdrant.py b/datapipe/store/qdrant.py index 18ffc879..34c0c9d7 100644 --- a/datapipe/store/qdrant.py +++ b/datapipe/store/qdrant.py @@ -28,6 +28,8 @@ class QdrantStore(TableStore): pk_field (str): name of the primary key field in the schema, used to identify records embedding_field (str): name of the field in the schema that contains the vector representation of the record collection_params (CollectionParams): parameters for creating a collection in Qdrant + index_schema (dict): {field_name: field_schema} - field(s) in payload that will be used to create an index on. + For data types and field schema, check https://qdrant.tech/documentation/concepts/indexing/#payload-index api_key (Optional[str]): api_key for Qdrant server """ @@ -39,6 +41,7 @@ def __init__( pk_field: str, embedding_field: str, collection_params: CollectionParams, + index_schema: Optional[dict] = None, api_key: Optional[str] = None, ): super().__init__() @@ -55,13 +58,21 @@ def __init__( pk_columns = [column for column in self.schema if column.primary_key] if len(pk_columns) != 1 and pk_columns[0].name != pk_field: - raise ValueError("Incorrect prymary key columns in schema") + raise ValueError("Incorrect primary key columns in schema") self.paylods_filelds = [ column.name for column in self.schema if column.name != self.embedding_field ] - def __init(self): + self.index_field = {} + if index_schema: + # check if index field is present in schema + for field, field_schema in index_schema.items(): + if field not in self.paylods_filelds: + raise ValueError(f"Index field `{field}` ({field_schema}) not found in payload schema") + self.index_field = index_schema + + def __init_collection(self): self.client = QdrantClient(url=self.url, api_key=self._api_key) try: self.client.get_collection(self.name) @@ -71,9 +82,25 @@ def __init(self): collection_name=self.name, create_collection=self.collection_params ) + def __init_indexes(self): + """ + Checks on collection's payload indexes and adds them from index_field, if necessary. + Schema checks are not performed. 
+ """ + payload_schema = self.client.get_collection(self.name).payload_schema + for field, field_schema in self.index_field.items(): + if field not in payload_schema.keys(): + self.client.create_payload_index( + collection_name=self.name, + field_name=field, + field_schema=field_schema + ) + def __check_init(self): if not self.inited: - self.__init() + self.__init_collection() + if self.index_field: + self.__init_indexes() self.inited = True def __get_ids(self, df): diff --git a/tests/test_qdrant_store.py b/tests/test_qdrant_store.py index 17113469..dee6b175 100644 --- a/tests/test_qdrant_store.py +++ b/tests/test_qdrant_store.py @@ -3,7 +3,7 @@ import pandas as pd from qdrant_client.models import Distance, VectorParams -from sqlalchemy import ARRAY, Float, Integer +from sqlalchemy import ARRAY, Float, Integer, String from sqlalchemy.sql.schema import Column from datapipe.compute import Catalog, Pipeline, Table, build_compute, run_steps @@ -20,7 +20,7 @@ def extract_id(df: pd.DataFrame) -> pd.DataFrame: def generate_data() -> Generator[pd.DataFrame, None, None]: - yield pd.DataFrame({"id": [1], "embedding": [[0.1]]}) + yield pd.DataFrame({"id": [1], "embedding": [[0.1]], "str_payload": ['foo'], 'int_payload': [42]}) def test_qdrant_table_to_json(dbconn: DBConn, tmp_dir: Path) -> None: @@ -34,6 +34,8 @@ def test_qdrant_table_to_json(dbconn: DBConn, tmp_dir: Path) -> None: schema=[ Column("id", Integer, primary_key=True), Column("embedding", ARRAY(Float, dimensions=1)), + Column("str_payload", String), + Column("int_payload", Integer), ], collection_params=CollectionParams( vectors=VectorParams( @@ -43,6 +45,10 @@ def test_qdrant_table_to_json(dbconn: DBConn, tmp_dir: Path) -> None: ), pk_field="id", embedding_field="embedding", + index_schema={ + 'str_payload': "keyword", + "int_payload": {"type": "integer", "lookup": False, "range": True} + }, ) ), "output": Table( From 78880e1b0406306102aa62c767567b7e6b66cc7a Mon Sep 17 00:00:00 2001 From: TSheyd Date: Tue, 9 Apr 2024 14:05:35 +0300 Subject: [PATCH 14/47] feat: added index_schema to QdrantShardedStore --- datapipe/store/qdrant.py | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/datapipe/store/qdrant.py b/datapipe/store/qdrant.py index 34c0c9d7..12b1d4a5 100644 --- a/datapipe/store/qdrant.py +++ b/datapipe/store/qdrant.py @@ -196,6 +196,8 @@ class QdrantShardedStore(TableStore): schema (DataSchema): Describes data that will be stored in the Qdrant collection embedding_field (str): name of the field in the schema that contains the vector representation of the record collection_params (CollectionParams): parameters for creating a collection in Qdrant + index_schema (dict): {field_name: field_schema} - field(s) in payload that will be used to create an index on. 
+ For data types and field schema, check https://qdrant.tech/documentation/concepts/indexing/#payload-index api_key (Optional[str]): api_key for Qdrant server """ @@ -206,6 +208,7 @@ def __init__( schema: DataSchema, embedding_field: str, collection_params: CollectionParams, + index_schema: Optional[dict] = None, api_key: Optional[str] = None, ): super().__init__() @@ -220,9 +223,16 @@ def __init__( self.client: Optional[QdrantClient] = None self.pk_fields = [column.name for column in self.schema if column.primary_key] - self.paylods_filelds = [ - column.name for column in self.schema if column.name != self.embedding_field - ] + self.paylods_filelds = [column.name for column in self.schema if column.name != self.embedding_field] + + self.index_field = {} + if index_schema: + # check if index field is present in schema + for field, field_schema in index_schema.items(): + if field not in self.paylods_filelds: + raise ValueError(f"Index field `{field}` ({field_schema}) not found in payload schema") + self.index_field = index_schema + self.name_params = re.findall(r"\{([^/]+?)\}", self.name_pattern) if not len(self.pk_fields): @@ -240,12 +250,28 @@ def __init_collection(self, name): collection_name=name, create_collection=self.collection_params ) + def __init_indexes(self, name): + """ + Checks on collection's payload indexes and adds them from index_field, if necessary. + Schema checks are not performed. + """ + payload_schema = self.client.get_collection(name).payload_schema + for field, field_schema in self.index_field.items(): + if field not in payload_schema.keys(): + self.client.create_payload_index( + collection_name=name, + field_name=field, + field_schema=field_schema + ) + def __check_init(self, name): if not self.client: self.client = QdrantClient(url=self.url, api_key=self._api_key) if name not in self.inited_collections: self.__init_collection(name) + if self.index_field: + self.__init_indexes(name) self.inited_collections.add(name) def __get_ids(self, df): From e92516c22e17647503e3381f345615ab05ce0e34 Mon Sep 17 00:00:00 2001 From: TSheyd Date: Tue, 9 Apr 2024 14:06:27 +0300 Subject: [PATCH 15/47] fix: typos --- datapipe/store/qdrant.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/datapipe/store/qdrant.py b/datapipe/store/qdrant.py index 12b1d4a5..13da3a4c 100644 --- a/datapipe/store/qdrant.py +++ b/datapipe/store/qdrant.py @@ -60,7 +60,7 @@ def __init__( if len(pk_columns) != 1 and pk_columns[0].name != pk_field: raise ValueError("Incorrect primary key columns in schema") - self.paylods_filelds = [ + self.payloads_filelds = [ column.name for column in self.schema if column.name != self.embedding_field ] @@ -68,7 +68,7 @@ def __init__( if index_schema: # check if index field is present in schema for field, field_schema in index_schema.items(): - if field not in self.paylods_filelds: + if field not in self.payloads_filelds: raise ValueError(f"Index field `{field}` ({field_schema}) not found in payload schema") self.index_field = index_schema @@ -134,7 +134,7 @@ def insert_rows(self, df: DataDF) -> None: vectors=df[self.embedding_field].apply(list).to_list(), payloads=cast( List[Dict[str, Any]], - df[self.paylods_filelds].to_dict(orient="records"), + df[self.payloads_filelds].to_dict(orient="records"), ), ), wait=True, @@ -223,13 +223,13 @@ def __init__( self.client: Optional[QdrantClient] = None self.pk_fields = [column.name for column in self.schema if column.primary_key] - self.paylods_filelds = [column.name for column in self.schema if 
column.name != self.embedding_field] + self.payloads_filelds = [column.name for column in self.schema if column.name != self.embedding_field] self.index_field = {} if index_schema: # check if index field is present in schema for field, field_schema in index_schema.items(): - if field not in self.paylods_filelds: + if field not in self.payloads_filelds: raise ValueError(f"Index field `{field}` ({field_schema}) not found in payload schema") self.index_field = index_schema @@ -317,7 +317,7 @@ def insert_rows(self, df: DataDF) -> None: vectors=gdf[self.embedding_field].apply(list).to_list(), payloads=cast( List[Dict[str, Any]], - df[self.paylods_filelds].to_dict(orient="records"), + df[self.payloads_filelds].to_dict(orient="records"), ), ), wait=True, From ad1a9267461a0f1a95b1a5af985ebac52462c718 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Mon, 27 May 2024 11:13:18 +0400 Subject: [PATCH 16/47] Fix deps for MacOS --- .github/workflows/pytest.yaml | 6 +++--- CHANGELOG.md | 1 + pyproject.toml | 10 +++++----- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/.github/workflows/pytest.yaml b/.github/workflows/pytest.yaml index cccbca09..8e7f8a45 100644 --- a/.github/workflows/pytest.yaml +++ b/.github/workflows/pytest.yaml @@ -50,9 +50,9 @@ jobs: strategy: matrix: include: - - python-version: "3.8" - test-db-env: "postgres" - pip-extra: "sqlalchemy <2" + # - python-version: "3.8" + # test-db-env: "postgres" + # pip-extra: "sqlalchemy <2" - python-version: "3.8" test-db-env: "postgres" pip-extra: "sqlalchemy >2" diff --git a/CHANGELOG.md b/CHANGELOG.md index 33ac0eda..92782717 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ * Fix compatibility with SQLalchemy < 2 (ColumnClause in typing) * Fix compatibility with Ray and SQLalchemy > 2 (serialization of Table) +* (post.1) Fix dependencies for MacOS; deprecate Python 3.8 # 0.13.9 diff --git a/pyproject.toml b/pyproject.toml index 23c5833e..0e41566d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "datapipe-core" -version = "0.13.10" +version = "0.13.10-post.1" description = "`datapipe` is a realtime incremental ETL library for Python application" readme = "README.md" repository = "https://github.com/epoch8/datapipe" @@ -10,7 +10,7 @@ packages = [ ] [tool.poetry.dependencies] -python = ">=3.8,<3.12" +python = ">3.8,<3.12" fsspec = ">=2021.11.1" gcsfs = {version=">=2021.11.1", optional=true} @@ -30,7 +30,7 @@ cityhash = "^0.4.2" # TODO 0.14: make it optional Pillow = "^10.0.0" -epoch8-tqdm-loggable = "^0.1.4" +tqdm-loggable = "^0.2" traceback-with-variables = "^2.0.4" pymilvus = {version="^2.0.2", optional=true} @@ -44,8 +44,8 @@ xlrd = {version=">=2.0.1", optional=true} openpyxl = {version=">=3.0.7", optional=true} redis = {version="^4.3.4", optional=true} -pysqlite3-binary = {version="^0.5.0", optional=true} -sqlalchemy-pysqlite3-binary = {version="^0.0.4", optional=true} +pysqlite3-binary = {version="^0.5.0", optional=true, markers="sys_platform != 'darwin'"} +sqlalchemy-pysqlite3-binary = {version="^0.0.4", optional=true, markers="sys_platform != 'darwin'"} qdrant-client = {version="^1.1.7", optional=true} click = ">=7.1.2" From 27d99c4c09ce8e257649a67467c304029eff9905 Mon Sep 17 00:00:00 2001 From: Nikita Lebedev Date: Tue, 25 Jun 2024 14:02:34 +0200 Subject: [PATCH 17/47] Remove logging in datapipe_events --- datapipe/datatable.py | 4 +- datapipe/event_logger.py | 103 ++------------------------------------- 2 files changed, 4 insertions(+), 103 deletions(-) diff --git 
a/datapipe/datatable.py b/datapipe/datatable.py index 6c42e9bd..6f34215a 100644 --- a/datapipe/datatable.py +++ b/datapipe/datatable.py @@ -609,9 +609,7 @@ def __init__( create_meta_table: bool = False, ) -> None: self.meta_dbconn = meta_dbconn - self.event_logger = EventLogger( - self.meta_dbconn, create_table=create_meta_table - ) + self.event_logger = EventLogger() self.tables: Dict[str, DataTable] = {} self.create_meta_table = create_meta_table diff --git a/datapipe/event_logger.py b/datapipe/event_logger.py index a678a591..3a4ce347 100644 --- a/datapipe/event_logger.py +++ b/datapipe/event_logger.py @@ -1,60 +1,17 @@ import logging -from enum import Enum -from typing import TYPE_CHECKING, Any, Optional, Tuple +from typing import Any, Optional, Tuple -from sqlalchemy.dialects.postgresql import JSONB -from sqlalchemy.sql import func -from sqlalchemy.sql.schema import Column, Table -from sqlalchemy.sql.sqltypes import JSON, DateTime, Integer, String from traceback_with_variables import format_exc from datapipe.run_config import RunConfig logger = logging.getLogger("datapipe.event_logger") -if TYPE_CHECKING: - from datapipe.store.database import DBConn - - -class EventTypes(Enum): - STATE = "state" - ERROR = "error" - - -class StepEventTypes(Enum): - RUN_FULL_COMPLETE = "run_full_complete" - class EventLogger: - def __init__(self, dbconn: "DBConn", create_table: bool = False): - self.dbconn = dbconn - - self.events_table = Table( - "datapipe_events", - dbconn.sqla_metadata, - Column("id", Integer, primary_key=True, autoincrement=True), - Column("event_ts", DateTime, server_default=func.now()), - Column("type", String(100)), - Column("event", JSON if dbconn.con.name == "sqlite" else JSONB), - ) - - self.step_events_table = Table( - "datapipe_step_events", - dbconn.sqla_metadata, - Column("id", Integer, primary_key=True, autoincrement=True), - Column("step", String(100)), - Column("event_ts", DateTime, server_default=func.now()), - Column("event", String(100)), - Column("event_payload", JSON if dbconn.con.name == "sqlite" else JSONB), - ) - - if create_table: - self.events_table.create(self.dbconn.con, checkfirst=True) - self.step_events_table.create(self.dbconn.con, checkfirst=True) - def __reduce__(self) -> Tuple[Any, ...]: - return self.__class__, (self.dbconn,) - + return self.__class__ + def log_state( self, table_name, @@ -69,31 +26,6 @@ def log_state( f"deleted = {deleted_count}, processed_count = {deleted_count}" ) - if run_config is not None: - meta = { - "labels": run_config.labels, - "filters": run_config.filters, - } - else: - meta = {} - - ins = self.events_table.insert().values( - type=EventTypes.STATE.value, - event={ - "meta": meta, - "data": { - "table_name": table_name, - "added_count": added_count, - "updated_count": updated_count, - "deleted_count": deleted_count, - "processed_count": processed_count, - }, - }, - ) - - with self.dbconn.con.begin() as con: - con.execute(ins) - def log_error( self, type, @@ -106,29 +38,8 @@ def log_error( logger.error( f'Error in step {run_config.labels.get("step_name")}: {type} {message}\n{description}' ) - meta = { - "labels": run_config.labels, - "filters": run_config.filters, - } else: logger.error(f"Error: {type} {message}\n{description}") - meta = {} - - ins = self.events_table.insert().values( - type=EventTypes.ERROR.value, - event={ - "meta": meta, - "data": { - "type": type, - "message": message, - "description": description, - "params": params, - }, - }, - ) - - with self.dbconn.con.begin() as con: - con.execute(ins) def 
log_exception( self, @@ -148,11 +59,3 @@ def log_step_full_complete( step_name: str, ) -> None: logger.debug(f"Step {step_name} is marked complete") - - ins = self.step_events_table.insert().values( - step=step_name, - event=StepEventTypes.RUN_FULL_COMPLETE.value, - ) - - with self.dbconn.con.begin() as con: - con.execute(ins) From d5f75b2096ff0eef7b321c796e656a9ebfc73486 Mon Sep 17 00:00:00 2001 From: Nikita Lebedev Date: Tue, 25 Jun 2024 14:15:38 +0200 Subject: [PATCH 18/47] fix tests & linters --- datapipe/event_logger.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/datapipe/event_logger.py b/datapipe/event_logger.py index 3a4ce347..491c87d4 100644 --- a/datapipe/event_logger.py +++ b/datapipe/event_logger.py @@ -1,5 +1,5 @@ import logging -from typing import Any, Optional, Tuple +from typing import Optional from traceback_with_variables import format_exc @@ -8,10 +8,7 @@ logger = logging.getLogger("datapipe.event_logger") -class EventLogger: - def __reduce__(self) -> Tuple[Any, ...]: - return self.__class__ - +class EventLogger: def log_state( self, table_name, From 9ee96728a2554b30a6e9be11f6b456c44b8b8af3 Mon Sep 17 00:00:00 2001 From: Nikita Lebedev Date: Tue, 25 Jun 2024 14:28:54 +0200 Subject: [PATCH 19/47] Update version and changelog --- CHANGELOG.md | 4 ++++ pyproject.toml | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 92782717..c053d1ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.13.11 + +* Remove logging to database (`datapipe_events` table) from `EventLogger` + # 0.13.10 * Fix compatibility with SQLalchemy < 2 (ColumnClause in typing) diff --git a/pyproject.toml b/pyproject.toml index 0e41566d..29acf267 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "datapipe-core" -version = "0.13.10-post.1" +version = "0.13.11" description = "`datapipe` is a realtime incremental ETL library for Python application" readme = "README.md" repository = "https://github.com/epoch8/datapipe" From 6b2ba4c7e02d28175d05db48a218cb88feaed48c Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Tue, 25 Jun 2024 17:40:57 +0400 Subject: [PATCH 20/47] update actions versions --- .github/workflows/pytest.yaml | 8 ++++---- .github/workflows/test_examples.yaml | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/pytest.yaml b/.github/workflows/pytest.yaml index 8e7f8a45..e263e947 100644 --- a/.github/workflows/pytest.yaml +++ b/.github/workflows/pytest.yaml @@ -19,10 +19,10 @@ jobs: - "3.10" steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} cache: pip @@ -107,10 +107,10 @@ jobs: - 6333:6333 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} cache: pip diff --git a/.github/workflows/test_examples.yaml b/.github/workflows/test_examples.yaml index c16799c0..96176804 100644 --- a/.github/workflows/test_examples.yaml +++ b/.github/workflows/test_examples.yaml @@ -28,10 +28,10 @@ jobs: - RayExecutor steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 
with: python-version: ${{ matrix.python-version }} cache: pip From b3257365d28371ed0a4723f2ced35ce996d8a9ac Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Tue, 25 Jun 2024 17:58:58 +0400 Subject: [PATCH 21/47] fix typo in `log_state` --- datapipe/event_logger.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datapipe/event_logger.py b/datapipe/event_logger.py index 491c87d4..5cc763c9 100644 --- a/datapipe/event_logger.py +++ b/datapipe/event_logger.py @@ -8,7 +8,7 @@ logger = logging.getLogger("datapipe.event_logger") -class EventLogger: +class EventLogger: def log_state( self, table_name, @@ -20,7 +20,7 @@ def log_state( ): logger.debug( f'Table "{table_name}": added = {added_count}; updated = {updated_count}; ' - f"deleted = {deleted_count}, processed_count = {deleted_count}" + f"deleted = {deleted_count}, processed_count = {processed_count}" ) def log_error( From bf806e156f7989e009d6f96fbdfa8af4c668f06e Mon Sep 17 00:00:00 2001 From: Nikita Lebedev Date: Tue, 25 Jun 2024 17:43:13 +0200 Subject: [PATCH 22/47] Revert changes in get_changed_rows_count_after_timestamp --- datapipe/datatable.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datapipe/datatable.py b/datapipe/datatable.py index 864a82a1..2c724adf 100644 --- a/datapipe/datatable.py +++ b/datapipe/datatable.py @@ -404,7 +404,10 @@ def get_changed_rows_count_after_timestamp( ts: float, ) -> int: sql = select(func.count()).where( - self.sql_table.c.process_ts > ts, + and_( + self.sql_table.c.process_ts > ts, + self.sql_table.c.delete_ts.is_(None), + ) ) with self.dbconn.con.begin() as con: From 5d7548c9648017f52ff2108d61ee824cb68d8116 Mon Sep 17 00:00:00 2001 From: Nikita Lebedev Date: Tue, 25 Jun 2024 17:53:55 +0200 Subject: [PATCH 23/47] Revert keep_existing in TransformMetaTable --- datapipe/step/batch_transform.py | 1 - 1 file changed, 1 deletion(-) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index aee68e24..8a1fd2f6 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -111,7 +111,6 @@ def __init__( name, dbconn.sqla_metadata, *self.sql_schema, - keep_existing=True, ) if create_table: From 7ab3f7e124772951e84e769268169ffc4eb129f1 Mon Sep 17 00:00:00 2001 From: Nikita Lebedev Date: Tue, 25 Jun 2024 17:57:29 +0200 Subject: [PATCH 24/47] Update version & changelog --- CHANGELOG.md | 5 +++++ pyproject.toml | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c053d1ad..65a17cb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# 0.13.12 + +* Add processing of an empty response in `QdrantStore` +* Add redis cluster mode support in `RedisStore` + # 0.13.11 * Remove logging to database (`datapipe_events` table) from `EventLogger` diff --git a/pyproject.toml b/pyproject.toml index 29acf267..6994ec4e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "datapipe-core" -version = "0.13.11" +version = "0.13.12" description = "`datapipe` is a realtime incremental ETL library for Python application" readme = "README.md" repository = "https://github.com/epoch8/datapipe" From 5daec8f44a89ed103a7b9aef652dec22520fbb49 Mon Sep 17 00:00:00 2001 From: Nikita Lebedev Date: Tue, 25 Jun 2024 18:43:02 +0200 Subject: [PATCH 25/47] Fix mypy --- datapipe/store/redis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datapipe/store/redis.py b/datapipe/store/redis.py index 06180ed9..d3113afc 100644 --- 
a/datapipe/store/redis.py +++ b/datapipe/store/redis.py @@ -29,7 +29,7 @@ def __init__( ) -> None: self.connection = connection if not cluster_mode: - self.redis_connection = Redis.from_url(connection, decode_responses=True) + self.redis_connection: Union[Redis, RedisCluster] = Redis.from_url(connection, decode_responses=True) else: self.redis_connection = RedisCluster.from_url(connection, decode_responses=True) From 5a3af7283a460034a19fdf9e82f4f7d48ed70f82 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Wed, 26 Jun 2024 17:26:46 +0400 Subject: [PATCH 26/47] format --- datapipe/store/qdrant.py | 45 +++++++++++++++++++++++++++----------- tests/test_qdrant_store.py | 12 +++++++--- 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/datapipe/store/qdrant.py b/datapipe/store/qdrant.py index cb3f092d..8d39d154 100644 --- a/datapipe/store/qdrant.py +++ b/datapipe/store/qdrant.py @@ -22,14 +22,27 @@ class QdrantStore(TableStore): Args: name (str): name of the Qdrant collection - url (str): url of the Qdrant server (if using with api_key, - you should explicitly specify port 443, by default qdrant uses 6333) - schema (DataSchema): Describes data that will be stored in the Qdrant collection - pk_field (str): name of the primary key field in the schema, used to identify records - embedding_field (str): name of the field in the schema that contains the vector representation of the record - collection_params (CollectionParams): parameters for creating a collection in Qdrant - index_schema (dict): {field_name: field_schema} - field(s) in payload that will be used to create an index on. - For data types and field schema, check https://qdrant.tech/documentation/concepts/indexing/#payload-index + + url (str): url of the Qdrant server (if using with api_key, you should + explicitly specify port 443, by default qdrant uses 6333) + + schema (DataSchema): Describes data that will be stored in the Qdrant + collection + + pk_field (str): name of the primary key field in the schema, used to + identify records + + embedding_field (str): name of the field in the schema that contains the + vector representation of the record + + collection_params (CollectionParams): parameters for creating a + collection in Qdrant + + index_schema (dict): {field_name: field_schema} - field(s) in payload + that will be used to create an index on. 
For data types and field + schema, check + https://qdrant.tech/documentation/concepts/indexing/#payload-index + api_key (Optional[str]): api_key for Qdrant server """ @@ -69,7 +82,9 @@ def __init__( # check if index field is present in schema for field, field_schema in index_schema.items(): if field not in self.payloads_filelds: - raise ValueError(f"Index field `{field}` ({field_schema}) not found in payload schema") + raise ValueError( + f"Index field `{field}` ({field_schema}) not found in payload schema" + ) self.index_field = index_schema def __init_collection(self): @@ -93,7 +108,7 @@ def __init_indexes(self): self.client.create_payload_index( collection_name=self.name, field_name=field, - field_schema=field_schema + field_schema=field_schema, ) def __check_init(self): @@ -226,14 +241,18 @@ def __init__( self.client: Optional[QdrantClient] = None self.pk_fields = [column.name for column in self.schema if column.primary_key] - self.payloads_filelds = [column.name for column in self.schema if column.name != self.embedding_field] + self.payloads_filelds = [ + column.name for column in self.schema if column.name != self.embedding_field + ] self.index_field = {} if index_schema: # check if index field is present in schema for field, field_schema in index_schema.items(): if field not in self.payloads_filelds: - raise ValueError(f"Index field `{field}` ({field_schema}) not found in payload schema") + raise ValueError( + f"Index field `{field}` ({field_schema}) not found in payload schema" + ) self.index_field = index_schema self.name_params = re.findall(r"\{([^/]+?)\}", self.name_pattern) @@ -264,7 +283,7 @@ def __init_indexes(self, name): self.client.create_payload_index( collection_name=name, field_name=field, - field_schema=field_schema + field_schema=field_schema, ) def __check_init(self, name): diff --git a/tests/test_qdrant_store.py b/tests/test_qdrant_store.py index dee6b175..f922dd7b 100644 --- a/tests/test_qdrant_store.py +++ b/tests/test_qdrant_store.py @@ -20,7 +20,9 @@ def extract_id(df: pd.DataFrame) -> pd.DataFrame: def generate_data() -> Generator[pd.DataFrame, None, None]: - yield pd.DataFrame({"id": [1], "embedding": [[0.1]], "str_payload": ['foo'], 'int_payload': [42]}) + yield pd.DataFrame( + {"id": [1], "embedding": [[0.1]], "str_payload": ["foo"], "int_payload": [42]} + ) def test_qdrant_table_to_json(dbconn: DBConn, tmp_dir: Path) -> None: @@ -46,8 +48,12 @@ def test_qdrant_table_to_json(dbconn: DBConn, tmp_dir: Path) -> None: pk_field="id", embedding_field="embedding", index_schema={ - 'str_payload': "keyword", - "int_payload": {"type": "integer", "lookup": False, "range": True} + "str_payload": "keyword", + "int_payload": { + "type": "integer", + "lookup": False, + "range": True, + }, }, ) ), From 22bd889ecfe3c226cb11bae633ea3ee4f89e1b2f Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Wed, 26 Jun 2024 17:27:52 +0400 Subject: [PATCH 27/47] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 65a17cb7..26db31f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # 0.13.12 * Add processing of an empty response in `QdrantStore` +* Add optional `index_schema` to `QdrantStore` * Add redis cluster mode support in `RedisStore` # 0.13.11 From f6bf728d205b24dd4ba677be0a98308e3c77a51a Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Thu, 27 Jun 2024 16:02:40 +0400 Subject: [PATCH 28/47] add ComputeStep.get_status --- datapipe/cli.py | 23 ++++++++++------------- datapipe/compute.py | 10 ++++++++++ 
datapipe/step/batch_transform.py | 12 +++++++++--- pyproject.toml | 2 +- 4 files changed, 30 insertions(+), 17 deletions(-) diff --git a/datapipe/cli.py b/datapipe/cli.py index a420e450..753b2fbe 100644 --- a/datapipe/cli.py +++ b/datapipe/cli.py @@ -348,19 +348,16 @@ def list(ctx: click.Context, status: bool) -> None: # noqa extra_args = {} if status: - if len(step.input_dts) > 0: - try: - if isinstance(step, BaseBatchTransformStep): - changed_idx_count = step.get_changed_idx_count(ds=app.ds) - - if changed_idx_count > 0: - extra_args[ - "changed_idx_count" - ] = f"[red]{changed_idx_count}[/red]" - - except NotImplementedError: - # Currently we do not support empty join_keys - extra_args["changed_idx_count"] = "[red]N/A[/red]" + try: + step_status = step.get_status(ds=app.ds) + extra_args["total_idx_count"] = str(step_status.total_idx_count) + extra_args["changed_idx_count"] = ( + f"[red]{step_status.changed_idx_count}[/red]" + ) + except NotImplementedError: + # Currently we do not support empty join_keys + extra_args["total_idx_count"] = "[red]N/A[/red]" + extra_args["changed_idx_count"] = "[red]N/A[/red]" rprint(to_human_repr(step, extra_args=extra_args)) rprint("") diff --git a/datapipe/compute.py b/datapipe/compute.py index 7255867c..8c0ff0c1 100644 --- a/datapipe/compute.py +++ b/datapipe/compute.py @@ -40,6 +40,13 @@ def get_datatable(self, ds: DataStore, name: str) -> DataTable: return ds.get_or_create_table(name=name, table_store=self.catalog[name].store) +@dataclass +class StepStatus: + name: str + total_idx_count: int + changed_idx_count: int + + class ComputeStep: """ Шаг вычислений в графе вычислений. @@ -91,6 +98,9 @@ def name(self) -> str: def labels(self) -> Labels: return self._labels if self._labels else [] + def get_status(self, ds: DataStore) -> StepStatus: + raise NotImplementedError + # TODO: move to lints def validate(self) -> None: inp_p_keys_arr = [set(inp.primary_keys) for inp in self.input_dts if inp] diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 8a1fd2f6..6ed989ba 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -42,7 +42,7 @@ from sqlalchemy.sql.expression import select from tqdm_loggable.auto import tqdm -from datapipe.compute import Catalog, ComputeStep, PipelineStep +from datapipe.compute import Catalog, ComputeStep, PipelineStep, StepStatus from datapipe.datatable import DataStore, DataTable, MetaTable from datapipe.executor import Executor, ExecutorConfig, SingleThreadExecutor from datapipe.run_config import LabelDict, RunConfig @@ -77,8 +77,7 @@ def __call__( input_dts: List[DataTable], run_config: Optional[RunConfig] = None, kwargs: Optional[Dict[str, Any]] = None, - ) -> TransformResult: - ... + ) -> TransformResult: ... 
BatchTransformFunc = Callable[..., TransformResult] @@ -517,6 +516,13 @@ def _apply_filters_to_run_config( run_config.filters = filters return run_config + def get_status(self, ds: DataStore) -> StepStatus: + return StepStatus( + name=self.name, + total_idx_count=self.meta_table.get_metadata_size(), + changed_idx_count=self.get_changed_idx_count(ds), + ) + def get_changed_idx_count( self, ds: DataStore, diff --git a/pyproject.toml b/pyproject.toml index 6994ec4e..0b7abbe2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "datapipe-core" -version = "0.13.12" +version = "0.13.12-dev+pipeline-monitoring.1" description = "`datapipe` is a realtime incremental ETL library for Python application" readme = "README.md" repository = "https://github.com/epoch8/datapipe" From e5a3f07bf1c86bfc0f2700a6741504c2109838e4 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Fri, 28 Jun 2024 12:49:46 +0400 Subject: [PATCH 29/47] v0.13.13-dev.1 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 0b7abbe2..c9881680 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "datapipe-core" -version = "0.13.12-dev+pipeline-monitoring.1" +version = "0.13.13-dev.1" description = "`datapipe` is a realtime incremental ETL library for Python application" readme = "README.md" repository = "https://github.com/epoch8/datapipe" From 2798e4dbe336cc508a75a9a4b7efdb96ae636501 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Sat, 29 Jun 2024 23:42:45 +0400 Subject: [PATCH 30/47] changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 26db31f3..37be34a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# WIP 0.13.13 + +* Add `ComputeStep.get_status` method + # 0.13.12 * Add processing of an empty response in `QdrantStore` From 8f742b6b51e450cf2984038f03ec9d6324eb6636 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Sat, 29 Jun 2024 23:53:43 +0400 Subject: [PATCH 31/47] Remove <2.2 restriction for Pandas --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c9881680..d5493f7d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ gcsfs = {version=">=2021.11.1", optional=true} s3fs = {version=">=2021.11.1", optional=true} # TODO Fix incompatibility between sqlalchemy < 2 and pandas 2.2 -pandas = ">=1.2.0, <2.2" +pandas = ">=1.2.0" numpy = ">=1.21.0, <2.0" SQLAlchemy = ">=1.4.25, <3.0.0" From e30b798bd904a3ba618f5a12bee1d6f1c568dc74 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Sat, 29 Jun 2024 23:59:57 +0400 Subject: [PATCH 32/47] restrict pandas for sqlalchemy<2 --- .github/workflows/pytest.yaml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/pytest.yaml b/.github/workflows/pytest.yaml index e263e947..e61e1b67 100644 --- a/.github/workflows/pytest.yaml +++ b/.github/workflows/pytest.yaml @@ -55,7 +55,7 @@ jobs: # pip-extra: "sqlalchemy <2" - python-version: "3.8" test-db-env: "postgres" - pip-extra: "sqlalchemy >2" + pip-extra: "'sqlalchemy>2'" # - python-version: "3.8" # test-db-env: "sqlite" # - python-version: "3.9" @@ -68,13 +68,13 @@ jobs: # test-db-env: "sqlite" - python-version: "3.11" test-db-env: "postgres" - pip-extra: "sqlalchemy <2" + pip-extra: "'sqlalchemy<2' 'pandas<2.2'" - python-version: "3.11" test-db-env: "postgres" - pip-extra: "sqlalchemy >2" + pip-extra: 
"'sqlalchemy>2'" - python-version: "3.11" test-db-env: "sqlite" - pip-extra: "sqlalchemy >2" + pip-extra: "'sqlalchemy>2'" services: # Label used to access the service container @@ -117,7 +117,7 @@ jobs: - name: Install dependencies run: | - pip install "${{ matrix.pip-extra }}" ".[sqlite,excel,milvus,gcsfs,s3fs,redis,qdrant,gcp]" "pytest<8" "pytest_cases" + pip install ${{ matrix.pip-extra }} ".[sqlite,excel,milvus,gcsfs,s3fs,redis,qdrant,gcp]" "pytest<8" "pytest_cases" - name: Test with pytest run: | From 456e0ca055ab5dd26735381d7893b28593957b3b Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Sun, 30 Jun 2024 00:03:33 +0400 Subject: [PATCH 33/47] v0.13.13-dev.2 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d5493f7d..f9e53db7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "datapipe-core" -version = "0.13.13-dev.1" +version = "0.13.13-dev.2" description = "`datapipe` is a realtime incremental ETL library for Python application" readme = "README.md" repository = "https://github.com/epoch8/datapipe" From 33ffd19abcea1a40eac42d3c472832d2c203a586 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Sun, 30 Jun 2024 00:05:01 +0400 Subject: [PATCH 34/47] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 37be34a6..c41b6db0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # WIP 0.13.13 * Add `ComputeStep.get_status` method +* Remove restriction for Pandas < 2.2 # 0.13.12 From b63178c55fb7377513a4a3ba52c13065278300de Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Sun, 30 Jun 2024 18:04:56 +0400 Subject: [PATCH 35/47] * --- .github/workflows/pytest.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/pytest.yaml b/.github/workflows/pytest.yaml index e61e1b67..c61303c0 100644 --- a/.github/workflows/pytest.yaml +++ b/.github/workflows/pytest.yaml @@ -68,13 +68,13 @@ jobs: # test-db-env: "sqlite" - python-version: "3.11" test-db-env: "postgres" - pip-extra: "'sqlalchemy<2' 'pandas<2.2'" + pip-extra: '"sqlalchemy<2" "pandas<2.2"' - python-version: "3.11" test-db-env: "postgres" - pip-extra: "'sqlalchemy>2'" + pip-extra: '"sqlalchemy>2"' - python-version: "3.11" test-db-env: "sqlite" - pip-extra: "'sqlalchemy>2'" + pip-extra: '"sqlalchemy>2"' services: # Label used to access the service container From e992183b48821f30d60a38aaca68602d443ba1b6 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Sun, 30 Jun 2024 18:08:21 +0400 Subject: [PATCH 36/47] v0.13.13 --- CHANGELOG.md | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c41b6db0..f6a6e11f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -# WIP 0.13.13 +# 0.13.13 * Add `ComputeStep.get_status` method * Remove restriction for Pandas < 2.2 diff --git a/pyproject.toml b/pyproject.toml index f9e53db7..341aca12 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "datapipe-core" -version = "0.13.13-dev.2" +version = "0.13.13" description = "`datapipe` is a realtime incremental ETL library for Python application" readme = "README.md" repository = "https://github.com/epoch8/datapipe" From 2e151b56f3a84dcc9b926b6d15ae0500acee1ecb Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Wed, 10 Jul 2024 12:32:10 +0400 Subject: [PATCH 37/47] Fix Python 3.9 --- .github/workflows/pytest.yaml | 2 +- 
pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pytest.yaml b/.github/workflows/pytest.yaml index c61303c0..58f9e712 100644 --- a/.github/workflows/pytest.yaml +++ b/.github/workflows/pytest.yaml @@ -15,7 +15,7 @@ jobs: matrix: python-version: # - "3.8" - # - "3.9" + - "3.9" - "3.10" steps: diff --git a/pyproject.toml b/pyproject.toml index 6762650d..7be085c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ packages = [ ] [tool.poetry.dependencies] -python = ">3.8,<3.12" +python = ">=3.9,<3.12" fsspec = ">=2021.11.1" gcsfs = {version=">=2021.11.1", optional=true} From 2082929f9b6e651c1150066a66c58a58ca15cb50 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Wed, 10 Jul 2024 12:34:34 +0400 Subject: [PATCH 38/47] change test matrix --- .github/workflows/pytest.yaml | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/.github/workflows/pytest.yaml b/.github/workflows/pytest.yaml index 58f9e712..da5d75ff 100644 --- a/.github/workflows/pytest.yaml +++ b/.github/workflows/pytest.yaml @@ -53,13 +53,14 @@ jobs: # - python-version: "3.8" # test-db-env: "postgres" # pip-extra: "sqlalchemy <2" - - python-version: "3.8" - test-db-env: "postgres" - pip-extra: "'sqlalchemy>2'" # - python-version: "3.8" - # test-db-env: "sqlite" - # - python-version: "3.9" # test-db-env: "postgres" + # pip-extra: "'sqlalchemy>2'" + # - python-version: "3.8" + # test-db-env: "sqlite" + - python-version: "3.9" + test-db-env: "postgres" + pip-extra: '"sqlalchemy>2"' # - python-version: "3.9" # test-db-env: "sqlite" # - python-version: "3.10" From cc149f6ffb89456ce2087c4affbe3cd4e8c6e897 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Wed, 10 Jul 2024 14:05:03 +0400 Subject: [PATCH 39/47] add py.typed marker --- datapipe/py.typed | 0 pyproject.toml | 2 ++ 2 files changed, 2 insertions(+) create mode 100644 datapipe/py.typed diff --git a/datapipe/py.typed b/datapipe/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/pyproject.toml b/pyproject.toml index 7be085c8..270baaf5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,8 @@ packages = [ { include = "datapipe" } ] +include = ["datapipe/py.typed"] + [tool.poetry.dependencies] python = ">=3.9,<3.12" fsspec = ">=2021.11.1" From 1ae371290694bc0cc0b0586424026e938930a93d Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Wed, 24 Jul 2024 12:14:12 +0400 Subject: [PATCH 40/47] add historical changelog entries --- CHANGELOG.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f6a6e11f..f1f3b750 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -557,6 +557,28 @@ Стало: `DataTable(name, meta_store.create_meta_table(name), data_store)` +# 0.5.0 + +* Добавился прототип debug ui. Запускается с помощью команды `python script.py ui` + +# 0.4.0 + +**Значимый рефакторинг** + +* Корневой модуль переименован `c12n_pipe` → `datapipe` +* Реализован новый модуль `datapipe.dsl` для декларативного описания пайплайнов. 
+ Пример использования: `examples/goal_v0.4.0.py` +* Описание пайплайна отделено от способа выполнения → есть возможность написать + конвертер в другой рантайм, например Prefect +* Переписана логика работы `MetaStore`, теперь всеми мета-таблицами `MetaStore` + владеет в явном виде, а потребители информации обращаются к ним по имени +* Реализовано хранение данных таблиц в отделяемых хранилищах (хранение в БД - один + из способов) + * `database.TableStoreDB` - хранение данных в таблице БД + * `filedir.TableStoreFiledir` - хранение данных в виде папки с файлами (плюс + описаны адаптеры для разных типов файлов) + * `pandas.TableStoreJsonLine` - хранение данных в виде одного jsonline файла + # 0.2.0 (2021-02-01) - Add major code with Label Studio implementation - Add Data cataloges and Nodes From a1497119088684ff89fc938df4585668368bae77 Mon Sep 17 00:00:00 2001 From: TSheyd Date: Fri, 9 Aug 2024 14:57:55 +0300 Subject: [PATCH 41/47] fix: passed delete_stale to do_batch_generate --- datapipe/step/batch_generate.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datapipe/step/batch_generate.py b/datapipe/step/batch_generate.py index b782330a..9285a88c 100644 --- a/datapipe/step/batch_generate.py +++ b/datapipe/step/batch_generate.py @@ -99,12 +99,13 @@ def build_compute(self, ds: DataStore, catalog: Catalog) -> List[ComputeStep]: name=self.func.__name__, func=cast( DatatableTransformFunc, - lambda ds, input_dts, output_dts, run_config, kwargs: do_batch_generate( + lambda ds, input_dts, output_dts, run_config, kwargs, delete_stale: do_batch_generate( func=self.func, ds=ds, output_dts=output_dts, run_config=run_config, kwargs=kwargs, + delete_stale=delete_stale, ), ), input_dts=[], From 6c79566133dbe87b806d89534ca020a6975b8dd5 Mon Sep 17 00:00:00 2001 From: TSheyd Date: Fri, 9 Aug 2024 18:10:17 +0300 Subject: [PATCH 42/47] fix: passed delete_stale to do_batch_generate --- datapipe/step/batch_generate.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datapipe/step/batch_generate.py b/datapipe/step/batch_generate.py index 9285a88c..374b7772 100644 --- a/datapipe/step/batch_generate.py +++ b/datapipe/step/batch_generate.py @@ -28,8 +28,8 @@ def do_batch_generate( ds: DataStore, output_dts: List[DataTable], run_config: Optional[RunConfig] = None, - kwargs: Optional[Dict] = None, delete_stale: bool = True, + kwargs: Optional[Dict] = None, ) -> None: """ Создание новой таблицы из результатов запуска `proc_func`. 
From c001d6a949dac8b122be268e6d15dcaa85e6d71c Mon Sep 17 00:00:00 2001
From: TSheyd
Date: Sun, 11 Aug 2024 00:15:07 +0300
Subject: [PATCH 43/47] tests: added tests for delete_stale arg in BatchGenerate

---
 tests/test_chunked_processing_pipeline.py | 71 +++++++++++++++++++++++
 1 file changed, 71 insertions(+)

diff --git a/tests/test_chunked_processing_pipeline.py b/tests/test_chunked_processing_pipeline.py
index 34402cf4..969efa83 100644
--- a/tests/test_chunked_processing_pipeline.py
+++ b/tests/test_chunked_processing_pipeline.py
@@ -527,3 +527,74 @@ def transform(df, idx, ds, run_config, transform_count):
         changelist = ChangeList.create("inp", change_idx)
         run_steps_changelist(ds, steps, changelist, RunConfig())
         assert transform_count["value"] == 2
+
+
+def test_stale_records_with_batch_generate(dbconn):
+    ds = DataStore(dbconn, create_meta_table=True)
+    catalog = Catalog(
+        {
+            "inp_keep": Table(
+                store=TableStoreDB(
+                    dbconn=dbconn,
+                    name="inp_data_keep",
+                    data_sql_schema=TEST_SCHEMA,
+                    create_table=True,
+                )
+            ),
+            "inp_del": Table(
+                store=TableStoreDB(
+                    dbconn=dbconn,
+                    name="inp_data_del",
+                    data_sql_schema=TEST_SCHEMA,
+                    create_table=True,
+                )
+            ),
+        }
+    )
+
+    bg_count = {"value": 0}
+    bg_chunk_size = 5
+
+    def add_inp_table(ds: DataStore, bg_count):
+        assert isinstance(ds, DataStore)
+        bg_count["value"] += 1
+        yield pd.DataFrame(
+            {
+                "id": range(bg_count["value"]*bg_chunk_size, (bg_count["value"]+1) * bg_chunk_size),
+                "a": range(bg_count["value"]*bg_chunk_size, (bg_count["value"]+1) * bg_chunk_size),
+            }
+        )
+
+    pipeline = Pipeline(
+        [
+            BatchGenerate(
+                func=add_inp_table,
+                outputs=["inp_keep"],
+                delete_stale=False,  # keeps records that are not yielded by func
+                kwargs=dict(bg_count=bg_count),
+            ),
+            BatchGenerate(
+                func=add_inp_table,
+                outputs=["inp_del"],
+                delete_stale=True,  # deletes records that are not yielded by func
+                kwargs=dict(bg_count={"value": 0}),  # to avoid double counting
+            ),
+        ]
+    )
+
+    steps = build_compute(ds, catalog, pipeline)
+
+    # First run
+    run_steps(ds, steps)
+
+    # Second run
+    run_steps(ds, steps)
+
+    # Check table shapes
+    df_keep = catalog.get_datatable(ds, "inp_keep").get_data()
+    df_del = catalog.get_datatable(ds, "inp_del").get_data()
+
+    assert df_keep.shape[0] == bg_count["value"] * bg_chunk_size
+    assert df_del.shape[0] == bg_chunk_size
+    # additionally, check that delete_stale=True deletes previous chunks and keeps the last one
+    assert df_del.iloc[0]['id'] == bg_chunk_size*bg_count["value"]
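One detail of the test above becomes relevant in the follow-up patches: both `BatchGenerate` steps reuse `add_inp_table`, so the second step receives a private counter (`{"value": 0}`). If both steps shared one mutable dict, every pipeline run would increment it twice and the id arithmetic in the assertions would drift. A tiny standalone illustration of that interaction (plain Python, no datapipe involved):

```python
shared = {"value": 0}

def bump(counter: dict) -> None:
    counter["value"] += 1  # same side effect add_inp_table has on bg_count

for _ in range(2):      # two pipeline runs...
    bump(shared)        # ...each running step one
    bump(shared)        # ...and step two with the same dict
print(shared["value"])  # 4, not 2 -- hence the private dict for step two
```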
From 02be7381e82eee6ffcfe8e3cdc36ea86fa05fcff Mon Sep 17 00:00:00 2001
From: TSheyd
Date: Sun, 11 Aug 2024 00:29:02 +0300
Subject: [PATCH 44/47] updated CHANGELOG.md

---
 CHANGELOG.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f1f3b750..e881a9bc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+# 0.13.14
+
+* Fix [#334](https://github.com/epoch8/datapipe/issues/334)
+
 # 0.13.13
 
 * Add `ComputeStep.get_status` method

From 5b709a9b66b6504281eda95688a63180f9db7be5 Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Sun, 11 Aug 2024 11:19:21 +0400
Subject: [PATCH 45/47] v0.13.14

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 270baaf5..cd15f665 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "datapipe-core"
-version = "0.13.13"
+version = "0.13.14"
 description = "`datapipe` is a realtime incremental ETL library for Python application"
 readme = "README.md"
 repository = "https://github.com/epoch8/datapipe"

From 7f7a3a2c865da9b939165dba74405fbfeb6e87fe Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Sun, 11 Aug 2024 11:22:58 +0400
Subject: [PATCH 46/47] split batch generate test in two

---
 tests/test_chunked_processing_pipeline.py | 83 ++++++++++++++++++-----
 1 file changed, 66 insertions(+), 17 deletions(-)

diff --git a/tests/test_chunked_processing_pipeline.py b/tests/test_chunked_processing_pipeline.py
index 969efa83..6d8f25dd 100644
--- a/tests/test_chunked_processing_pipeline.py
+++ b/tests/test_chunked_processing_pipeline.py
@@ -529,22 +529,75 @@ def transform(df, idx, ds, run_config, transform_count):
     assert transform_count["value"] == 2
 
 
-def test_stale_records_with_batch_generate(dbconn):
+def test_stale_records_deletion_with_batch_generate(dbconn):
     ds = DataStore(dbconn, create_meta_table=True)
     catalog = Catalog(
         {
-            "inp_keep": Table(
+            "inp_del": Table(
                 store=TableStoreDB(
                     dbconn=dbconn,
-                    name="inp_data_keep",
+                    name="inp_data_del",
                     data_sql_schema=TEST_SCHEMA,
                     create_table=True,
                 )
             ),
-            "inp_del": Table(
+        }
+    )
+
+    bg_count = {"value": 0}
+    bg_chunk_size = 5
+
+    def add_inp_table(ds: DataStore, bg_count):
+        assert isinstance(ds, DataStore)
+        bg_count["value"] += 1
+        yield pd.DataFrame(
+            {
+                "id": range(
+                    bg_count["value"] * bg_chunk_size,
+                    (bg_count["value"] + 1) * bg_chunk_size,
+                ),
+                "a": range(
+                    bg_count["value"] * bg_chunk_size,
+                    (bg_count["value"] + 1) * bg_chunk_size,
+                ),
+            }
+        )
+
+    pipeline = Pipeline(
+        [
+            BatchGenerate(
+                func=add_inp_table,
+                outputs=["inp_del"],
+                delete_stale=True,  # Default behavior, deletes records that are not yielded by func
+                kwargs=dict(bg_count={"value": 0}),  # to avoid double counting
+            ),
+        ]
+    )
+
+    steps = build_compute(ds, catalog, pipeline)
+
+    # First run
+    run_steps(ds, steps)
+
+    # Second run
+    run_steps(ds, steps)
+
+    # Check table shapes
+    df_del = catalog.get_datatable(ds, "inp_del").get_data()
+
+    assert df_del.shape[0] == bg_chunk_size
+    # additionally, check that delete_stale=True deletes previous chunks and keeps the last one
+    assert df_del.iloc[0]["id"] == bg_chunk_size * bg_count["value"]
+
+
+def test_stale_records_keep_with_batch_generate(dbconn):
+    ds = DataStore(dbconn, create_meta_table=True)
+    catalog = Catalog(
+        {
+            "inp_keep": Table(
                 store=TableStoreDB(
                     dbconn=dbconn,
-                    name="inp_data_del",
+                    name="inp_data_keep",
                     data_sql_schema=TEST_SCHEMA,
                     create_table=True,
                 )
             ),
         }
     )
 
     bg_count = {"value": 0}
     bg_chunk_size = 5
 
     def add_inp_table(ds: DataStore, bg_count):
         assert isinstance(ds, DataStore)
         bg_count["value"] += 1
         yield pd.DataFrame(
             {
-                "id": range(bg_count["value"]*bg_chunk_size, (bg_count["value"]+1) * bg_chunk_size),
-                "a": range(bg_count["value"]*bg_chunk_size, (bg_count["value"]+1) * bg_chunk_size),
+                "id": range(
+                    bg_count["value"] * bg_chunk_size,
+                    (bg_count["value"] + 1) * bg_chunk_size,
+                ),
+                "a": range(
+                    bg_count["value"] * bg_chunk_size,
+                    (bg_count["value"] + 1) * bg_chunk_size,
+                ),
             }
         )
 
     pipeline = Pipeline(
         [
             BatchGenerate(
                 func=add_inp_table,
                 outputs=["inp_keep"],
                 delete_stale=False,  # keeps records that are not yielded by func
kwargs=dict(bg_count=bg_count), ), - BatchGenerate( - func=add_inp_table, - outputs=["inp_del"], - delete_stale=True, # deletes records that are not yielded by func - kwargs=dict(bg_count={"value": 0}), # to avoid double counting - ), ] ) @@ -592,9 +645,5 @@ def add_inp_table(ds: DataStore, bg_count): # Check table shapes df_keep = catalog.get_datatable(ds, "inp_keep").get_data() - df_del = catalog.get_datatable(ds, "inp_del").get_data() assert df_keep.shape[0] == bg_count["value"] * bg_chunk_size - assert df_del.shape[0] == bg_chunk_size - # additionally, check that delete_stale=True deletes previous chunks and keeps the last one - assert df_del.iloc[0]['id'] == bg_chunk_size*bg_count["value"] From e2f8aeeafb1aea4f806b2a6ed681bd7722f5d4d2 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Sun, 11 Aug 2024 11:27:13 +0400 Subject: [PATCH 47/47] fix test --- tests/test_chunked_processing_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_chunked_processing_pipeline.py b/tests/test_chunked_processing_pipeline.py index 6d8f25dd..39878a8e 100644 --- a/tests/test_chunked_processing_pipeline.py +++ b/tests/test_chunked_processing_pipeline.py @@ -569,7 +569,7 @@ def add_inp_table(ds: DataStore, bg_count): func=add_inp_table, outputs=["inp_del"], delete_stale=True, # Default behavior, deletes records that are not yielded by func - kwargs=dict(bg_count={"value": 0}), # to avoid double counting + kwargs=dict(bg_count=bg_count), # to avoid double counting ), ] )
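Tracing the final state of `test_stale_records_deletion_with_batch_generate` by hand shows why this last one-line fix is needed (a worked check, using only values from the diffs above):

```python
# bg_chunk_size = 5, and both runs now share the test's own bg_count dict:
#   run 1: bg_count["value"] -> 1, add_inp_table yields ids range(5, 10)
#   run 2: bg_count["value"] -> 2, add_inp_table yields ids range(10, 15)
# delete_stale=True keeps only the last chunk, so
#   df_del.iloc[0]["id"] == 10 == bg_chunk_size * bg_count["value"]  # 5 * 2
# With the previous kwargs=dict(bg_count={"value": 0}), the counter the
# assertion reads stayed at 0, making the expected value 0 instead of 10.
```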