From b228ca0b60c29ecf8f886589aa4de62b2fd95b68 Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Sat, 10 Aug 2024 10:50:23 +0400
Subject: [PATCH 01/12] Add the ability to specify an SQLA table directly,
 instead of a schema, in TableStoreDB. Fixes #332
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
datapipe/datatable.py | 11 +++-
datapipe/store/database.py | 80 ++++++++++++++++++-----
examples/datatable_batch_transform/app.py | 36 ++++++----
examples/model_inference/app.py | 16 ++---
4 files changed, 104 insertions(+), 39 deletions(-)
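For orientation, a minimal sketch of the API this patch introduces (names here are illustrative, assuming a SQLAlchemy 2.x declarative mapping):

```python
from sqlalchemy import Column, Integer
from sqlalchemy.orm import DeclarativeBase

from datapipe.store.database import DBConn, TableStoreDB


class Base(DeclarativeBase):
    pass


class Item(Base):
    __tablename__ = "item"

    item_id = Column(Integer, primary_key=True)
    value = Column(Integer)


# Share the declarative metadata with datapipe so that both sides see
# the same SQLA Table objects.
dbconn = DBConn("sqlite+pysqlite3:///db.sqlite", sqla_metadata=Base.metadata)

# Instead of name= and data_sql_schema=, the ORM class is passed directly.
store = TableStoreDB(dbconn=dbconn, orm_table=Item)
```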
diff --git a/datapipe/datatable.py b/datapipe/datatable.py
index 6f34215a..4e6ea659 100644
--- a/datapipe/datatable.py
+++ b/datapipe/datatable.py
@@ -518,7 +518,16 @@ def store_chunk(
with tracer.start_as_current_span("store metadata"):
self.meta_table.update_rows(
- cast(MetadataDF, pd.concat([new_meta_df, changed_meta_df]))
+ cast(
+ MetadataDF,
+ pd.concat(
+ [
+ df
+ for df in [new_meta_df, changed_meta_df]
+ if not df.empty
+ ]
+ ),
+ )
)
if not new_df.empty:
diff --git a/datapipe/store/database.py b/datapipe/store/database.py
index cbfbb4f1..3047dac7 100644
--- a/datapipe/store/database.py
+++ b/datapipe/store/database.py
@@ -1,12 +1,24 @@
import copy
import logging
import math
-from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union, cast
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ Iterator,
+ List,
+ Optional,
+ Tuple,
+ Type,
+ Union,
+ cast,
+)
import numpy as np
import pandas as pd
from opentelemetry import trace
-from sqlalchemy import Column, MetaData, Table, create_engine, func, text
+from sqlalchemy import Column, FromClause, MetaData, Table, create_engine, func, text
+from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.pool import QueuePool, SingletonThreadPool
from sqlalchemy.schema import SchemaItem
from sqlalchemy.sql.base import SchemaEventTarget
@@ -27,15 +39,17 @@ def __init__(
connstr: str,
schema: Optional[str] = None,
create_engine_kwargs: Optional[Dict[str, Any]] = None,
+ sqla_metadata: Optional[MetaData] = None,
):
create_engine_kwargs = create_engine_kwargs or {}
- self._init(connstr, schema, create_engine_kwargs)
+ self._init(connstr, schema, create_engine_kwargs, sqla_metadata)
def _init(
self,
connstr: str,
schema: Optional[str],
create_engine_kwargs: Dict[str, Any],
+ sqla_metadata: Optional[MetaData] = None,
) -> None:
self.connstr = connstr
self.schema = schema
@@ -75,11 +89,14 @@ def _init(
poolclass=QueuePool,
pool_pre_ping=True,
pool_recycle=3600,
- **create_engine_kwargs
+ **create_engine_kwargs,
# pool_size=25,
)
- self.sqla_metadata = MetaData(schema=schema)
+ if sqla_metadata is None:
+ self.sqla_metadata = MetaData(schema=schema)
+ else:
+ self.sqla_metadata = sqla_metadata
def __reduce__(self) -> Tuple[Any, ...]:
return self.__class__, (
@@ -119,28 +136,55 @@ class TableStoreDB(TableStore):
def __init__(
self,
dbconn: Union["DBConn", str],
- name: str,
- data_sql_schema: List[Column],
+ name: Optional[str] = None,
+ data_sql_schema: Optional[List[Column]] = None,
+ orm_table: Optional[Type[DeclarativeBase]] = None,
create_table: bool = False,
) -> None:
if isinstance(dbconn, str):
self.dbconn = DBConn(dbconn)
else:
self.dbconn = dbconn
- self.name = name
- self.data_sql_schema = data_sql_schema
+ if orm_table is not None:
+ assert name is None, "name should be None if orm_table is provided"
+ assert (
+ data_sql_schema is None
+ ), "data_sql_schema should be None if orm_table is provided"
- self.data_keys = [
- column.name for column in self.data_sql_schema if not column.primary_key
- ]
+ self.data_table = cast(Table, orm_table.__table__)
- self.data_table = Table(
- self.name,
- self.dbconn.sqla_metadata,
- *[copy.copy(i) for i in self.data_sql_schema],
- extend_existing=True,
- )
+ self.name = self.data_table.name
+
+ self.data_sql_schema = [
+ copy.copy(column) for column in self.data_table.columns
+ ]
+ self.data_keys = [
+ column.name for column in self.data_sql_schema if not column.primary_key
+ ]
+
+ else:
+ assert (
+ name is not None
+ ), "name should be provided if data_table is not provided"
+ assert (
+ data_sql_schema is not None
+ ), "data_sql_schema should be provided if data_table is not provided"
+
+ self.name = name
+
+ self.data_sql_schema = data_sql_schema
+
+ self.data_keys = [
+ column.name for column in self.data_sql_schema if not column.primary_key
+ ]
+
+ self.data_table = Table(
+ self.name,
+ self.dbconn.sqla_metadata,
+ *[copy.copy(i) for i in self.data_sql_schema],
+ extend_existing=True,
+ )
if create_table:
self.data_table.create(self.dbconn.con, checkfirst=True)
diff --git a/examples/datatable_batch_transform/app.py b/examples/datatable_batch_transform/app.py
index b44f94ff..b2f8279a 100644
--- a/examples/datatable_batch_transform/app.py
+++ b/examples/datatable_batch_transform/app.py
@@ -3,6 +3,7 @@
import numpy as np
import pandas as pd
from sqlalchemy import Integer
+from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.sql import functions, select
from sqlalchemy.sql.schema import Column
@@ -14,7 +15,26 @@
from datapipe.store.database import DBConn, TableStoreDB
from datapipe.types import IndexDF
-dbconn = DBConn("sqlite+pysqlite3:///db.sqlite")
+
+class Base(DeclarativeBase):
+ pass
+
+
+dbconn = DBConn("sqlite+pysqlite3:///db.sqlite", sqla_metadata=Base.metadata)
+
+
+class Input(Base):
+ __tablename__ = "input"
+
+ group_id = Column(Integer, primary_key=True)
+ item_id = Column(Integer, primary_key=True)
+
+
+class Output(Base):
+ __tablename__ = "output"
+
+ group_id = Column(Integer, primary_key=True)
+ count = Column(Integer)
catalog = Catalog(
@@ -22,22 +42,14 @@
"input": Table(
store=TableStoreDB(
dbconn=dbconn,
- name="input",
- data_sql_schema=[
- Column("group_id", Integer, primary_key=True),
- Column("item_id", Integer, primary_key=True),
- ],
+ orm_table=Input,
)
),
"result": Table(
store=TableStoreDB(
dbconn=dbconn,
- name="output",
- data_sql_schema=[
- Column("group_id", Integer, primary_key=True),
- Column("count", Integer),
- ],
- )
+ orm_table=Output,
+ ),
),
}
)
diff --git a/examples/model_inference/app.py b/examples/model_inference/app.py
index 50fb290b..6ac87f70 100644
--- a/examples/model_inference/app.py
+++ b/examples/model_inference/app.py
@@ -1,5 +1,5 @@
import pandas as pd
-from sqlalchemy import Column, Integer, String
+import sqlalchemy as sa
from datapipe.compute import Catalog, DatapipeApp, Pipeline, Table
from datapipe.datatable import DataStore
@@ -36,8 +36,8 @@ def apply_model(input_df: pd.DataFrame, model_df: pd.DataFrame) -> pd.DataFrame:
store=TableStoreJsonLine(
filename="input.jsonline",
primary_schema=[
- Column("pipeline_id", String, primary_key=True),
- Column("input_id", Integer, primary_key=True),
+ sa.Column("pipeline_id", sa.String, primary_key=True),
+ sa.Column("input_id", sa.Integer, primary_key=True),
],
)
),
@@ -45,8 +45,8 @@ def apply_model(input_df: pd.DataFrame, model_df: pd.DataFrame) -> pd.DataFrame:
store=TableStoreJsonLine(
filename="models.jsonline",
primary_schema=[
- Column("pipeline_id", String, primary_key=True),
- Column("model_id", String, primary_key=True),
+ sa.Column("pipeline_id", sa.String, primary_key=True),
+ sa.Column("model_id", sa.String, primary_key=True),
],
)
),
@@ -54,9 +54,9 @@ def apply_model(input_df: pd.DataFrame, model_df: pd.DataFrame) -> pd.DataFrame:
store=TableStoreJsonLine(
filename="output.jsonline",
primary_schema=[
- Column("pipeline_id", String, primary_key=True),
- Column("input_id", Integer, primary_key=True),
- Column("model_id", String, primary_key=True),
+ sa.Column("pipeline_id", sa.String, primary_key=True),
+ sa.Column("input_id", sa.Integer, primary_key=True),
+ sa.Column("model_id", sa.String, primary_key=True),
],
)
),
From cdd021de4caa97a13bec30f8b2527d7cefbd60fd Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Sat, 10 Aug 2024 11:17:02 +0400
Subject: [PATCH 02/12] fix data_sql_schema serialization
---
datapipe/store/database.py | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git a/datapipe/store/database.py b/datapipe/store/database.py
index 3047dac7..9b73fd2a 100644
--- a/datapipe/store/database.py
+++ b/datapipe/store/database.py
@@ -157,7 +157,15 @@ def __init__(
self.name = self.data_table.name
self.data_sql_schema = [
- copy.copy(column) for column in self.data_table.columns
+ Column(
+ column.name,
+ column.type,
+ primary_key=column.primary_key,
+ nullable=column.nullable,
+ unique=column.unique,
+ *column.constraints,
+ )
+ for column in self.data_table.columns
]
self.data_keys = [
column.name for column in self.data_sql_schema if not column.primary_key
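A plausible reading of this fix (an assumption; the commit does not spell it out): `copy.copy` on a `Column` taken from an ORM-mapped `Table` shallow-copies its table-bound state, which breaks serialization of `data_sql_schema`, while rebuilding each `Column` from its attributes yields clean, unbound columns:

```python
import copy

from sqlalchemy import Column, Integer, MetaData, Table

md = MetaData()
tbl = Table("t", md, Column("id", Integer, primary_key=True))
bound = tbl.columns["id"]

copied = copy.copy(bound)  # shallow copy: may still reference the owning Table
rebuilt = Column(
    bound.name,
    bound.type,
    primary_key=bound.primary_key,
    nullable=bound.nullable,
    unique=bound.unique,
)  # a fresh Column with no table association
```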
From 61d1ac40acc482b324828899361b9b033c212e2c Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Sat, 10 Aug 2024 11:30:48 +0400
Subject: [PATCH 03/12] remove some incompatibility with sqla 1.4
---
datapipe/store/database.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/datapipe/store/database.py b/datapipe/store/database.py
index 9b73fd2a..ab08c1e7 100644
--- a/datapipe/store/database.py
+++ b/datapipe/store/database.py
@@ -17,7 +17,7 @@
import numpy as np
import pandas as pd
from opentelemetry import trace
-from sqlalchemy import Column, FromClause, MetaData, Table, create_engine, func, text
+from sqlalchemy import Column, MetaData, Table, create_engine, func, text
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.pool import QueuePool, SingletonThreadPool
from sqlalchemy.schema import SchemaItem
From 48a966687c5b70c6a57424839f110605251d8503 Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Sat, 10 Aug 2024 11:38:03 +0400
Subject: [PATCH 04/12] fix for sqla1.4
---
datapipe/store/database.py | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/datapipe/store/database.py b/datapipe/store/database.py
index ab08c1e7..6d5cf26a 100644
--- a/datapipe/store/database.py
+++ b/datapipe/store/database.py
@@ -18,7 +18,7 @@
import pandas as pd
from opentelemetry import trace
from sqlalchemy import Column, MetaData, Table, create_engine, func, text
-from sqlalchemy.orm import DeclarativeBase
+from sqlalchemy.orm import DeclarativeMeta
from sqlalchemy.pool import QueuePool, SingletonThreadPool
from sqlalchemy.schema import SchemaItem
from sqlalchemy.sql.base import SchemaEventTarget
@@ -138,7 +138,7 @@ def __init__(
dbconn: Union["DBConn", str],
name: Optional[str] = None,
data_sql_schema: Optional[List[Column]] = None,
- orm_table: Optional[Type[DeclarativeBase]] = None,
+ orm_table: Optional[Type[DeclarativeMeta]] = None,
create_table: bool = False,
) -> None:
if isinstance(dbconn, str):
@@ -152,7 +152,8 @@ def __init__(
data_sql_schema is None
), "data_sql_schema should be None if orm_table is provided"
- self.data_table = cast(Table, orm_table.__table__)
+ orm_table__table = orm_table.__table__ # type: ignore
+ self.data_table = cast(Table, orm_table__table)
self.name = self.data_table.name
From 27f3175d5834b9b5cd8067348a5458f6c7705e15 Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Sat, 10 Aug 2024 11:47:11 +0400
Subject: [PATCH 05/12] fix compatibility with existing code
---
datapipe/store/database.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/datapipe/store/database.py b/datapipe/store/database.py
index 6d5cf26a..3449ec96 100644
--- a/datapipe/store/database.py
+++ b/datapipe/store/database.py
@@ -138,8 +138,8 @@ def __init__(
dbconn: Union["DBConn", str],
name: Optional[str] = None,
data_sql_schema: Optional[List[Column]] = None,
- orm_table: Optional[Type[DeclarativeMeta]] = None,
create_table: bool = False,
+ orm_table: Optional[Type[DeclarativeMeta]] = None,
) -> None:
if isinstance(dbconn, str):
self.dbconn = DBConn(dbconn)
From ee3d043739bfdc8a16ae5b8c0c02d9d567b8ed05 Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Sat, 10 Aug 2024 11:47:32 +0400
Subject: [PATCH 06/12] Run mypy via poetry in the VS Code task
---
.vscode/tasks.json | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/.vscode/tasks.json b/.vscode/tasks.json
index 0423de22..29b244c4 100644
--- a/.vscode/tasks.json
+++ b/.vscode/tasks.json
@@ -6,7 +6,7 @@
{
"label": "mypy-whole-project",
"type": "shell",
- "command": "source .venv/bin/activate; mypy -p datapipe --show-column-numbers --show-error-codes --ignore-missing-imports --namespace-packages",
+ "command": "poetry run mypy -p datapipe --show-column-numbers --show-error-codes --ignore-missing-imports --namespace-packages",
"presentation": {
"echo": true,
"reveal": "never",
From ba5de484223815c62174fea960eba57af5fecb80 Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Sat, 10 Aug 2024 12:27:01 +0400
Subject: [PATCH 07/12] add ability to pass SQLAlchemy ORM tables instead of
 names from the catalog
---
datapipe/compute.py | 30 ++++++++++++++++++++---
datapipe/step/batch_generate.py | 4 +--
datapipe/step/batch_transform.py | 9 ++++---
datapipe/step/datatable_transform.py | 9 +++----
datapipe/store/database.py | 18 +++-----------
datapipe/types.py | 13 +++++++++-
examples/datatable_batch_transform/app.py | 30 ++++-------------------
7 files changed, 58 insertions(+), 55 deletions(-)
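The user-facing effect, as a sketch (reusing `Input`, `Output`, `generate_data`, `count_tbl` and `ds` from the example app changed below): pipeline steps can reference ORM classes directly, and the catalog entry is created on demand:

```python
from datapipe.compute import Catalog, DatapipeApp, Pipeline
from datapipe.step.batch_generate import BatchGenerate
from datapipe.step.batch_transform import DatatableBatchTransform

pipeline = Pipeline(
    [
        BatchGenerate(generate_data, outputs=[Input]),
        DatatableBatchTransform(count_tbl, inputs=[Input], outputs=[Output]),
    ]
)

app = DatapipeApp(ds=ds, catalog=Catalog({}), pipeline=pipeline)
```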
diff --git a/datapipe/compute.py b/datapipe/compute.py
index 8c0ff0c1..eb0013cf 100644
--- a/datapipe/compute.py
+++ b/datapipe/compute.py
@@ -9,8 +9,9 @@
from datapipe.datatable import DataStore, DataTable
from datapipe.executor import Executor, ExecutorConfig
from datapipe.run_config import RunConfig
+from datapipe.store.database import TableStoreDB
from datapipe.store.table_store import TableStore
-from datapipe.types import ChangeList, IndexDF, Labels
+from datapipe.types import ChangeList, IndexDF, Labels, OrmTableOrName
logger = logging.getLogger("datapipe.compute")
tracer = trace.get_tracer("datapipe.compute")
@@ -36,8 +37,31 @@ def init_all_tables(self, ds: DataStore):
for name in self.catalog.keys():
self.get_datatable(ds, name)
- def get_datatable(self, ds: DataStore, name: str) -> DataTable:
- return ds.get_or_create_table(name=name, table_store=self.catalog[name].store)
+ def get_datatable(self, ds: DataStore, table: OrmTableOrName) -> DataTable:
+ if isinstance(table, str):
+ assert table in self.catalog, f"Table {table} not found in catalog"
+ return ds.get_or_create_table(
+ name=table, table_store=self.catalog[table].store
+ )
+ else:
+ table_store = TableStoreDB(ds.meta_dbconn, orm_table=table)
+ if table_store.name not in self.catalog:
+ self.add_datatable(table_store.name, Table(store=table_store))
+ else:
+ existing_table_store = self.catalog[table_store.name].store
+ assert isinstance(existing_table_store, TableStoreDB), (
+ f"Table {table_store.name} already exists in catalog "
+ f"with different store {existing_table_store}"
+ )
+
+ assert existing_table_store.data_table == table.__table__, ( # type: ignore
+ f"Table {table_store.name} already exists in catalog "
+ f"with different orm_table {existing_table_store.data_table}"
+ )
+
+ return ds.get_or_create_table(
+ name=table_store.name, table_store=table_store
+ )
@dataclass
diff --git a/datapipe/step/batch_generate.py b/datapipe/step/batch_generate.py
index b782330a..28e801f0 100644
--- a/datapipe/step/batch_generate.py
+++ b/datapipe/step/batch_generate.py
@@ -14,7 +14,7 @@
DatatableTransformFunc,
DatatableTransformStep,
)
-from datapipe.types import Labels, TransformResult, cast
+from datapipe.types import Labels, OrmTableOrName, TransformResult, cast
logger = logging.getLogger("datapipe.step.batch_generate")
tracer = trace.get_tracer("datapipe.step.batch_generate")
@@ -88,7 +88,7 @@ def do_batch_generate(
@dataclass
class BatchGenerate(PipelineStep):
func: BatchGenerateFunc
- outputs: List[str]
+ outputs: List[OrmTableOrName]
kwargs: Optional[Dict] = None
labels: Optional[Labels] = None
delete_stale: bool = True
diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py
index 6ed989ba..d1aee88e 100644
--- a/datapipe/step/batch_transform.py
+++ b/datapipe/step/batch_transform.py
@@ -58,6 +58,7 @@
IndexDF,
Labels,
MetaSchema,
+ OrmTableOrName,
TransformResult,
data_to_index,
)
@@ -865,8 +866,8 @@ def run_idx(
@dataclass
class DatatableBatchTransform(PipelineStep):
func: DatatableBatchTransformFunc
- inputs: List[str]
- outputs: List[str]
+ inputs: List[OrmTableOrName]
+ outputs: List[OrmTableOrName]
chunk_size: int = 1000
transform_keys: Optional[List[str]] = None
kwargs: Optional[Dict] = None
@@ -935,8 +936,8 @@ def process_batch_dts(
@dataclass
class BatchTransform(PipelineStep):
func: BatchTransformFunc
- inputs: List[str]
- outputs: List[str]
+ inputs: List[OrmTableOrName]
+ outputs: List[OrmTableOrName]
chunk_size: int = 1000
kwargs: Optional[Dict[str, Any]] = None
transform_keys: Optional[List[str]] = None
diff --git a/datapipe/step/datatable_transform.py b/datapipe/step/datatable_transform.py
index 93967e66..feacd013 100644
--- a/datapipe/step/datatable_transform.py
+++ b/datapipe/step/datatable_transform.py
@@ -8,7 +8,7 @@
from datapipe.datatable import DataStore, DataTable
from datapipe.executor import Executor
from datapipe.run_config import RunConfig
-from datapipe.types import Labels
+from datapipe.types import Labels, OrmTableOrName
logger = logging.getLogger("datapipe.step.datatable_transform")
tracer = trace.get_tracer("datapipe.step.datatable_transform")
@@ -25,8 +25,7 @@ def __call__(
run_config: Optional[RunConfig],
# Perhaps it would be better to pass this as a variable rather than via **kwargs
**kwargs,
- ) -> None:
- ...
+ ) -> None: ...
class DatatableTransformStep(ComputeStep):
@@ -92,8 +91,8 @@ def run_full(
@dataclass
class DatatableTransform(PipelineStep):
func: DatatableTransformFunc
- inputs: List[str]
- outputs: List[str]
+ inputs: List[OrmTableOrName]
+ outputs: List[OrmTableOrName]
check_for_changes: bool = True
kwargs: Optional[Dict[str, Any]] = None
labels: Optional[Labels] = None
diff --git a/datapipe/store/database.py b/datapipe/store/database.py
index 3449ec96..cac2c7a1 100644
--- a/datapipe/store/database.py
+++ b/datapipe/store/database.py
@@ -1,24 +1,12 @@
import copy
import logging
import math
-from typing import (
- Any,
- Callable,
- Dict,
- Iterator,
- List,
- Optional,
- Tuple,
- Type,
- Union,
- cast,
-)
+from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union, cast
import numpy as np
import pandas as pd
from opentelemetry import trace
from sqlalchemy import Column, MetaData, Table, create_engine, func, text
-from sqlalchemy.orm import DeclarativeMeta
from sqlalchemy.pool import QueuePool, SingletonThreadPool
from sqlalchemy.schema import SchemaItem
from sqlalchemy.sql.base import SchemaEventTarget
@@ -27,7 +15,7 @@
from datapipe.run_config import RunConfig
from datapipe.sql_util import sql_apply_idx_filter_to_table, sql_apply_runconfig_filter
from datapipe.store.table_store import TableStore
-from datapipe.types import DataDF, DataSchema, IndexDF, MetaSchema, TAnyDF
+from datapipe.types import DataDF, DataSchema, IndexDF, MetaSchema, OrmTable, TAnyDF
logger = logging.getLogger("datapipe.store.database")
tracer = trace.get_tracer("datapipe.store.database")
@@ -139,7 +127,7 @@ def __init__(
name: Optional[str] = None,
data_sql_schema: Optional[List[Column]] = None,
create_table: bool = False,
- orm_table: Optional[Type[DeclarativeMeta]] = None,
+ orm_table: Optional[OrmTable] = None,
) -> None:
if isinstance(dbconn, str):
self.dbconn = DBConn(dbconn)
diff --git a/datapipe/types.py b/datapipe/types.py
index 6fcaad9c..b6fd6aeb 100644
--- a/datapipe/types.py
+++ b/datapipe/types.py
@@ -2,7 +2,7 @@
import itertools
from dataclasses import dataclass, field
-from typing import Callable, Dict, List, NewType, Set, Tuple, TypeVar, Union, cast
+from typing import Callable, Dict, List, NewType, Set, Tuple, Type, TypeVar, Union, cast
import pandas as pd
from sqlalchemy import Column
@@ -26,6 +26,17 @@
TransformResult = Union[DataDF, List[DataDF], Tuple[DataDF, ...]]
+try:
+ from sqlalchemy.orm import DeclarativeBase
+
+ OrmTable = Type[DeclarativeBase]
+except ImportError:
+ from sqlalchemy.orm import DeclarativeMeta
+
+ OrmTable = Type[DeclarativeMeta] # type: ignore
+
+OrmTableOrName = Union[str, OrmTable]
+
@dataclass
class ChangeList:
diff --git a/examples/datatable_batch_transform/app.py b/examples/datatable_batch_transform/app.py
index b2f8279a..cc3b0779 100644
--- a/examples/datatable_batch_transform/app.py
+++ b/examples/datatable_batch_transform/app.py
@@ -20,9 +20,6 @@ class Base(DeclarativeBase):
pass
-dbconn = DBConn("sqlite+pysqlite3:///db.sqlite", sqla_metadata=Base.metadata)
-
-
class Input(Base):
__tablename__ = "input"
@@ -37,24 +34,6 @@ class Output(Base):
count = Column(Integer)
-catalog = Catalog(
- {
- "input": Table(
- store=TableStoreDB(
- dbconn=dbconn,
- orm_table=Input,
- )
- ),
- "result": Table(
- store=TableStoreDB(
- dbconn=dbconn,
- orm_table=Output,
- ),
- ),
- }
-)
-
-
def generate_data():
# N = 10_000
N = 100
@@ -108,17 +87,18 @@ def count_tbl(
[
BatchGenerate(
generate_data,
- outputs=["input"],
+ outputs=[Input],
),
DatatableBatchTransform(
count_tbl,
- inputs=["input"],
- outputs=["result"],
+ inputs=[Input],
+ outputs=[Output],
),
]
)
+dbconn = DBConn("sqlite+pysqlite3:///db.sqlite", sqla_metadata=Base.metadata)
ds = DataStore(dbconn)
-app = DatapipeApp(ds, catalog, pipeline)
+app = DatapipeApp(ds=ds, catalog=Catalog({}), pipeline=pipeline)
From acfee8ef8c6ec7ec29c7b7d94b2e853495087287 Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Sat, 10 Aug 2024 13:23:48 +0400
Subject: [PATCH 08/12] translate example to idiomatic sqla 2
---
examples/datatable_batch_transform/app.py | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/examples/datatable_batch_transform/app.py b/examples/datatable_batch_transform/app.py
index cc3b0779..c8134ce0 100644
--- a/examples/datatable_batch_transform/app.py
+++ b/examples/datatable_batch_transform/app.py
@@ -3,7 +3,7 @@
import numpy as np
import pandas as pd
from sqlalchemy import Integer
-from sqlalchemy.orm import DeclarativeBase
+from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.sql import functions, select
from sqlalchemy.sql.schema import Column
@@ -23,15 +23,15 @@ class Base(DeclarativeBase):
class Input(Base):
__tablename__ = "input"
- group_id = Column(Integer, primary_key=True)
- item_id = Column(Integer, primary_key=True)
+ group_id: Mapped[int] = mapped_column(primary_key=True)
+ item_id: Mapped[int] = mapped_column(primary_key=True)
class Output(Base):
__tablename__ = "output"
- group_id = Column(Integer, primary_key=True)
- count = Column(Integer)
+ group_id: Mapped[int] = mapped_column(primary_key=True)
+ count: Mapped[int]
def generate_data():
From 2d77d80b86f56a7f28462e5d3330bf26cfbb72d5 Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Sat, 10 Aug 2024 13:24:01 +0400
Subject: [PATCH 09/12] better logging on error
---
datapipe/step/batch_transform.py | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py
index d1aee88e..854f9ac0 100644
--- a/datapipe/step/batch_transform.py
+++ b/datapipe/step/batch_transform.py
@@ -690,12 +690,16 @@ def store_batch_err(
) -> None:
run_config = self._apply_filters_to_run_config(run_config)
- logger.error(f"Process batch failed: {str(e)}")
+ idx_records = idx.to_dict(orient="records")
+
+ logger.error(
+ f"Process batch in transform {self.name} on idx {idx_records} failed: {str(e)}"
+ )
ds.event_logger.log_exception(
e,
run_config=RunConfig.add_labels(
run_config,
- {"idx": idx.to_dict(orient="records"), "process_ts": process_ts},
+ {"idx": idx_records, "process_ts": process_ts},
),
)
From 60609f6a7dbc7725de7867b1765bb2337a4b05e3 Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Sun, 11 Aug 2024 11:02:45 +0400
Subject: [PATCH 10/12] add catalog-less syntax to non-ORM Tables
---
datapipe/compute.py | 20 ++-
datapipe/step/batch_generate.py | 4 +-
datapipe/step/batch_transform.py | 10 +-
datapipe/step/datatable_transform.py | 6 +-
datapipe/step/update_external_table.py | 9 +-
datapipe/store/filedir.py | 2 +-
datapipe/types.py | 20 ++-
examples/image_resize/app.py | 28 ++--
examples/many_to_zero/app.py | 46 +++---
examples/model_inference/app.py | 73 ++++-----
examples/one_to_many_pipeline/app.py | 202 ++++++++++++-------------
11 files changed, 218 insertions(+), 202 deletions(-)
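In short, `Table` gains an optional `name`, so a store-backed table can be declared once and handed to steps without a catalog entry; a sketch mirroring the `many_to_zero` example below:

```python
from sqlalchemy import Column, Integer

from datapipe.compute import Table
from datapipe.store.pandas import TableStoreJsonLine

input_tbl = Table(
    name="input",
    store=TableStoreJsonLine(
        filename="input.json",
        primary_schema=[Column("id", Integer, primary_key=True)],
    ),
)
```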
diff --git a/datapipe/compute.py b/datapipe/compute.py
index eb0013cf..b3196c0b 100644
--- a/datapipe/compute.py
+++ b/datapipe/compute.py
@@ -11,7 +11,7 @@
from datapipe.run_config import RunConfig
from datapipe.store.database import TableStoreDB
from datapipe.store.table_store import TableStore
-from datapipe.types import ChangeList, IndexDF, Labels, OrmTableOrName
+from datapipe.types import ChangeList, IndexDF, Labels, TableOrName
logger = logging.getLogger("datapipe.compute")
tracer = trace.get_tracer("datapipe.compute")
@@ -20,6 +20,7 @@
@dataclass
class Table:
store: TableStore
+ name: Optional[str] = None
class Catalog:
@@ -37,12 +38,27 @@ def init_all_tables(self, ds: DataStore):
for name in self.catalog.keys():
self.get_datatable(ds, name)
- def get_datatable(self, ds: DataStore, table: OrmTableOrName) -> DataTable:
+ def get_datatable(self, ds: DataStore, table: TableOrName) -> DataTable:
if isinstance(table, str):
assert table in self.catalog, f"Table {table} not found in catalog"
return ds.get_or_create_table(
name=table, table_store=self.catalog[table].store
)
+
+ elif isinstance(table, Table):
+ assert table.name is not None, f"Table name must be specified for {table}"
+
+ if table.name not in self.catalog:
+ self.add_datatable(table.name, table)
+ else:
+ existing_table = self.catalog[table.name]
+ assert existing_table.store == table.store, (
+ f"Table {table.name} already exists in catalog "
+ f"with different store {existing_table.store}"
+ )
+
+ return ds.get_or_create_table(name=table.name, table_store=table.store)
+
else:
table_store = TableStoreDB(ds.meta_dbconn, orm_table=table)
if table_store.name not in self.catalog:
diff --git a/datapipe/step/batch_generate.py b/datapipe/step/batch_generate.py
index 28e801f0..4b58224c 100644
--- a/datapipe/step/batch_generate.py
+++ b/datapipe/step/batch_generate.py
@@ -14,7 +14,7 @@
DatatableTransformFunc,
DatatableTransformStep,
)
-from datapipe.types import Labels, OrmTableOrName, TransformResult, cast
+from datapipe.types import Labels, TableOrName, TransformResult, cast
logger = logging.getLogger("datapipe.step.batch_generate")
tracer = trace.get_tracer("datapipe.step.batch_generate")
@@ -88,7 +88,7 @@ def do_batch_generate(
@dataclass
class BatchGenerate(PipelineStep):
func: BatchGenerateFunc
- outputs: List[OrmTableOrName]
+ outputs: List[TableOrName]
kwargs: Optional[Dict] = None
labels: Optional[Labels] = None
delete_stale: bool = True
diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py
index 854f9ac0..3c78f6cd 100644
--- a/datapipe/step/batch_transform.py
+++ b/datapipe/step/batch_transform.py
@@ -58,7 +58,7 @@
IndexDF,
Labels,
MetaSchema,
- OrmTableOrName,
+ TableOrName,
TransformResult,
data_to_index,
)
@@ -870,8 +870,8 @@ def run_idx(
@dataclass
class DatatableBatchTransform(PipelineStep):
func: DatatableBatchTransformFunc
- inputs: List[OrmTableOrName]
- outputs: List[OrmTableOrName]
+ inputs: List[TableOrName]
+ outputs: List[TableOrName]
chunk_size: int = 1000
transform_keys: Optional[List[str]] = None
kwargs: Optional[Dict] = None
@@ -940,8 +940,8 @@ def process_batch_dts(
@dataclass
class BatchTransform(PipelineStep):
func: BatchTransformFunc
- inputs: List[OrmTableOrName]
- outputs: List[OrmTableOrName]
+ inputs: List[TableOrName]
+ outputs: List[TableOrName]
chunk_size: int = 1000
kwargs: Optional[Dict[str, Any]] = None
transform_keys: Optional[List[str]] = None
diff --git a/datapipe/step/datatable_transform.py b/datapipe/step/datatable_transform.py
index feacd013..d40400a8 100644
--- a/datapipe/step/datatable_transform.py
+++ b/datapipe/step/datatable_transform.py
@@ -8,7 +8,7 @@
from datapipe.datatable import DataStore, DataTable
from datapipe.executor import Executor
from datapipe.run_config import RunConfig
-from datapipe.types import Labels, OrmTableOrName
+from datapipe.types import Labels, TableOrName
logger = logging.getLogger("datapipe.step.datatable_transform")
tracer = trace.get_tracer("datapipe.step.datatable_transform")
@@ -91,8 +91,8 @@ def run_full(
@dataclass
class DatatableTransform(PipelineStep):
func: DatatableTransformFunc
- inputs: List[OrmTableOrName]
- outputs: List[OrmTableOrName]
+ inputs: List[TableOrName]
+ outputs: List[TableOrName]
check_for_changes: bool = True
kwargs: Optional[Dict[str, Any]] = None
labels: Optional[Labels] = None
diff --git a/datapipe/step/update_external_table.py b/datapipe/step/update_external_table.py
index 169237bb..fdaa6489 100644
--- a/datapipe/step/update_external_table.py
+++ b/datapipe/step/update_external_table.py
@@ -12,7 +12,7 @@
DatatableTransformFunc,
DatatableTransformStep,
)
-from datapipe.types import Labels, MetadataDF, cast
+from datapipe.types import Labels, MetadataDF, TableOrName, cast
logger = logging.getLogger("datapipe.step.update_external_table")
@@ -35,7 +35,10 @@ def update_external_table(
# TODO switch to iterative store_chunk and table.sync_meta_by_process_ts
table.meta_table.update_rows(
- cast(MetadataDF, pd.concat([new_meta_df, changed_meta_df]))
+ cast(
+ MetadataDF,
+ pd.concat(df for df in [new_meta_df, changed_meta_df] if not df.empty),
+ ),
)
for stale_idx in table.meta_table.get_stale_idx(now, run_config=run_config):
@@ -55,7 +58,7 @@ def update_external_table(
class UpdateExternalTable(PipelineStep):
def __init__(
self,
- output: str,
+ output: TableOrName,
labels: Optional[Labels] = None,
) -> None:
self.output_table_name = output
diff --git a/datapipe/store/filedir.py b/datapipe/store/filedir.py
index 6d36ac20..2370478e 100644
--- a/datapipe/store/filedir.py
+++ b/datapipe/store/filedir.py
@@ -153,7 +153,7 @@ def _pattern_to_match(pat: str) -> str:
class Replacer:
def __init__(self, values: List[str]):
self.counter = -1
- self.values = values
+ self.values = list(values)
def __call__(self, matchobj):
self.counter += 1
diff --git a/datapipe/types.py b/datapipe/types.py
index b6fd6aeb..c0585c68 100644
--- a/datapipe/types.py
+++ b/datapipe/types.py
@@ -2,11 +2,27 @@
import itertools
from dataclasses import dataclass, field
-from typing import Callable, Dict, List, NewType, Set, Tuple, Type, TypeVar, Union, cast
+from typing import (
+ TYPE_CHECKING,
+ Callable,
+ Dict,
+ List,
+ NewType,
+ Optional,
+ Set,
+ Tuple,
+ Type,
+ TypeVar,
+ Union,
+ cast,
+)
import pandas as pd
from sqlalchemy import Column
+if TYPE_CHECKING:
+ from datapipe.compute import Table
+
DataSchema = List[Column]
MetaSchema = List[Column]
@@ -35,7 +51,7 @@
OrmTable = Type[DeclarativeMeta] # type: ignore
-OrmTableOrName = Union[str, OrmTable]
+TableOrName = Union[str, OrmTable, "Table"]
@dataclass
diff --git a/examples/image_resize/app.py b/examples/image_resize/app.py
index df676001..cf7da5a5 100644
--- a/examples/image_resize/app.py
+++ b/examples/image_resize/app.py
@@ -7,18 +7,14 @@
from datapipe.store.database import DBConn
from datapipe.store.filedir import PILFile, TableStoreFiledir
-catalog = Catalog(
- {
- "input_images": Table(
- store=TableStoreFiledir("input/{id}.jpeg", PILFile("jpg")),
- ),
- # 'input_img_metadata': ExternalTable(
- # store=Filedir(CATALOG_DIR / 'input/{id}.csv', CSVFile()),
- # ),
- "preprocessed_images": Table(
- store=TableStoreFiledir("output/{id}.png", PILFile("png")),
- ),
- }
+input_images_tbl = Table(
+ name="input_images",
+ store=TableStoreFiledir("input/{id}.jpeg", PILFile("jpg")),
+)
+
+preprocessed_images_tbl = Table(
+ name="preprocessed_images",
+ store=TableStoreFiledir("output/{id}.png", PILFile("png")),
)
@@ -29,11 +25,11 @@ def batch_preprocess_images(df: pd.DataFrame) -> pd.DataFrame:
pipeline = Pipeline(
[
- UpdateExternalTable(output="input_images"),
+ UpdateExternalTable(output=input_images_tbl),
BatchTransform(
batch_preprocess_images,
- inputs=["input_images"],
- outputs=["preprocessed_images"],
+ inputs=[input_images_tbl],
+ outputs=[preprocessed_images_tbl],
chunk_size=100,
),
]
@@ -42,4 +38,4 @@ def batch_preprocess_images(df: pd.DataFrame) -> pd.DataFrame:
ds = DataStore(DBConn("sqlite+pysqlite3:///db.sqlite"))
-app = DatapipeApp(ds, catalog, pipeline)
+app = DatapipeApp(ds, Catalog({}), pipeline)
diff --git a/examples/many_to_zero/app.py b/examples/many_to_zero/app.py
index 91ec36d3..c5a331ef 100644
--- a/examples/many_to_zero/app.py
+++ b/examples/many_to_zero/app.py
@@ -12,25 +12,6 @@
from datapipe.store.database import DBConn
from datapipe.store.pandas import TableStoreJsonLine
-catalog = Catalog(
- {
- "input": Table(
- store=TableStoreJsonLine(
- filename="input.json",
- primary_schema=[
- Column("id", Integer, primary_key=True),
- ],
- )
- ),
- "result": Table(
- store=TableStoreJsonLine(
- filename="result.json",
- primary_schema=[Column("result_id", Integer, primary_key=True)],
- )
- ),
- }
-)
-
def generate_data():
yield pd.DataFrame(
@@ -61,16 +42,35 @@ def count(
)
+input_tbl = Table(
+ name="input",
+ store=TableStoreJsonLine(
+ filename="input.json",
+ primary_schema=[
+ Column("id", Integer, primary_key=True),
+ ],
+ ),
+)
+
+result_tbl = Table(
+ name="result",
+ store=TableStoreJsonLine(
+ filename="result.json",
+ primary_schema=[Column("result_id", Integer, primary_key=True)],
+ ),
+)
+
+
pipeline = Pipeline(
[
BatchGenerate(
generate_data,
- outputs=["input"],
+ outputs=[input_tbl],
),
DatatableTransform(
count, # type: ignore
- inputs=["input"],
- outputs=["result"],
+ inputs=[input_tbl],
+ outputs=[result_tbl],
check_for_changes=False,
),
]
@@ -79,4 +79,4 @@ def count(
ds = DataStore(DBConn("sqlite+pysqlite3:///db.sqlite"))
-app = DatapipeApp(ds, catalog, pipeline)
+app = DatapipeApp(ds, Catalog({}), pipeline)
diff --git a/examples/model_inference/app.py b/examples/model_inference/app.py
index 6ac87f70..26b03c3b 100644
--- a/examples/model_inference/app.py
+++ b/examples/model_inference/app.py
@@ -30,55 +30,56 @@ def apply_model(input_df: pd.DataFrame, model_df: pd.DataFrame) -> pd.DataFrame:
]
-catalog = Catalog(
- {
- "input": Table(
- store=TableStoreJsonLine(
- filename="input.jsonline",
- primary_schema=[
- sa.Column("pipeline_id", sa.String, primary_key=True),
- sa.Column("input_id", sa.Integer, primary_key=True),
- ],
- )
- ),
- "models": Table(
- store=TableStoreJsonLine(
- filename="models.jsonline",
- primary_schema=[
- sa.Column("pipeline_id", sa.String, primary_key=True),
- sa.Column("model_id", sa.String, primary_key=True),
- ],
- )
- ),
- "output": Table(
- store=TableStoreJsonLine(
- filename="output.jsonline",
- primary_schema=[
- sa.Column("pipeline_id", sa.String, primary_key=True),
- sa.Column("input_id", sa.Integer, primary_key=True),
- sa.Column("model_id", sa.String, primary_key=True),
- ],
- )
- ),
- }
+input_tbl = Table(
+ name="input",
+ store=TableStoreJsonLine(
+ filename="input.jsonline",
+ primary_schema=[
+ sa.Column("pipeline_id", sa.String, primary_key=True),
+ sa.Column("input_id", sa.Integer, primary_key=True),
+ ],
+ ),
+)
+
+models_tbl = Table(
+ name="models",
+ store=TableStoreJsonLine(
+ filename="models.jsonline",
+ primary_schema=[
+ sa.Column("pipeline_id", sa.String, primary_key=True),
+ sa.Column("model_id", sa.String, primary_key=True),
+ ],
+ ),
+)
+
+output_tbl = Table(
+ name="output",
+ store=TableStoreJsonLine(
+ filename="output.jsonline",
+ primary_schema=[
+ sa.Column("pipeline_id", sa.String, primary_key=True),
+ sa.Column("input_id", sa.Integer, primary_key=True),
+ sa.Column("model_id", sa.String, primary_key=True),
+ ],
+ ),
)
pipeline = Pipeline(
[
UpdateExternalTable(
- output="input",
+ output=input_tbl,
),
UpdateExternalTable(
- output="models",
+ output=models_tbl,
),
BatchTransform(
apply_model,
- inputs=["input", "models"],
- outputs=["output"],
+ inputs=[input_tbl, models_tbl],
+ outputs=[output_tbl],
transform_keys=["pipeline_id", "input_id", "model_id"],
),
]
)
-app = DatapipeApp(ds, catalog, pipeline)
+app = DatapipeApp(ds, Catalog({}), pipeline)
diff --git a/examples/one_to_many_pipeline/app.py b/examples/one_to_many_pipeline/app.py
index 9e580b0f..46d899ae 100644
--- a/examples/one_to_many_pipeline/app.py
+++ b/examples/one_to_many_pipeline/app.py
@@ -1,16 +1,75 @@
import pandas as pd
-from sqlalchemy.sql.schema import Column
-from sqlalchemy.sql.sqltypes import JSON, Boolean, Integer, String
+from sqlalchemy import JSON
+from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
-from datapipe.compute import Catalog, DatapipeApp, Pipeline, Table
+from datapipe.compute import Catalog, DatapipeApp, Pipeline
from datapipe.datatable import DataStore
from datapipe.step.batch_generate import BatchGenerate
from datapipe.step.batch_transform import BatchTransform
-from datapipe.store.database import DBConn, TableStoreDB
+from datapipe.store.database import DBConn
-dbconn = DBConn("sqlite+pysqlite3:///db.sqlite")
-# dbconn = DBConn('postgresql://postgres:password@localhost/postgres', schema='test')
-ds = DataStore(dbconn)
+
+class Base(DeclarativeBase):
+ pass
+
+
+class TestProducts(Base):
+ __tablename__ = "test_products"
+
+ pipeline_id: Mapped[int] = mapped_column(primary_key=True)
+ offer_id: Mapped[int] = mapped_column(primary_key=True)
+ attributes: Mapped[dict] = mapped_column(type_=JSON)
+
+
+class TestAttrProducts(Base):
+ __tablename__ = "test_attr_products"
+
+ pipeline_id: Mapped[int] = mapped_column(primary_key=True)
+ offer_id: Mapped[int] = mapped_column(primary_key=True)
+ name: Mapped[str] = mapped_column(primary_key=True)
+ value: Mapped[int]
+
+
+class TestOzonProducts(Base):
+ __tablename__ = "test_ozon_products"
+
+ pipeline_id: Mapped[int] = mapped_column(primary_key=True)
+ offer_id: Mapped[int] = mapped_column(primary_key=True)
+ attributes: Mapped[dict] = mapped_column(type_=JSON)
+
+
+class TestOffersProducts(Base):
+ __tablename__ = "test_offers_products"
+
+ pipeline_id: Mapped[int] = mapped_column(primary_key=True)
+ offer_id: Mapped[int] = mapped_column(primary_key=True)
+ offers: Mapped[dict] = mapped_column(type_=JSON)
+
+
+class TestAllProducts(Base):
+ __tablename__ = "test_all_products"
+
+ pipeline_id: Mapped[int] = mapped_column(primary_key=True)
+ offer_id: Mapped[int] = mapped_column(primary_key=True)
+ attributes_base: Mapped[dict] = mapped_column(type_=JSON)
+ attributes_new: Mapped[dict] = mapped_column(type_=JSON)
+
+
+class TestStoreProducts(Base):
+ __tablename__ = "test_store_products"
+
+ pipeline_id: Mapped[int] = mapped_column(primary_key=True)
+ offer_id: Mapped[int] = mapped_column(primary_key=True)
+ attributes: Mapped[dict] = mapped_column(type_=JSON)
+ is_deleted: Mapped[bool]
+
+
+class TestFilterProducts(Base):
+ __tablename__ = "test_filter_products"
+
+ pipeline_id: Mapped[int] = mapped_column(primary_key=True)
+ offer_id: Mapped[int] = mapped_column(primary_key=True)
+ attributes: Mapped[dict] = mapped_column(type_=JSON)
def generate_products():
@@ -109,9 +168,9 @@ def gen_product_store(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
merged_df = pd.merge(df1, df2, how="outer", left_on=keys, right_on=keys)
merged_df["attributes"] = merged_df.apply(
- lambda x: x["attributes_x"]
- if pd.notna(x["attributes_x"])
- else x["attributes_y"],
+ lambda x: (
+ x["attributes_x"] if pd.notna(x["attributes_x"]) else x["attributes_y"]
+ ),
axis=1,
)
merged_df["is_deleted"] = merged_df.apply(
@@ -125,132 +184,57 @@ def filter(df: pd.DataFrame) -> pd.DataFrame:
return df[df["pipeline_id"] == 1]
-catalog = Catalog(
- {
- "test_products": Table(
- store=TableStoreDB(
- dbconn,
- "test_products_data",
- [
- Column("pipeline_id", Integer(), primary_key=True),
- Column("offer_id", Integer(), primary_key=True),
- Column("attributes", JSON),
- ],
- )
- ),
- "test_attr_products": Table(
- store=TableStoreDB(
- dbconn,
- "test_attr_products_data",
- [
- Column("pipeline_id", Integer(), primary_key=True),
- Column("offer_id", Integer(), primary_key=True),
- Column("name", String(), primary_key=True),
- Column("value", Integer()),
- ],
- )
- ),
- "test_ozon_products": Table(
- store=TableStoreDB(
- dbconn,
- "test_ozon_products_data",
- [
- Column("pipeline_id", Integer(), primary_key=True),
- Column("offer_id", Integer(), primary_key=True),
- Column("attributes", JSON),
- ],
- )
- ),
- "test_offers_products": Table(
- store=TableStoreDB(
- dbconn,
- "test_offers_products_data",
- [
- Column("pipeline_id", Integer(), primary_key=True),
- Column("name", String(), primary_key=True),
- Column("offers", JSON),
- ],
- )
- ),
- "test_all_products": Table(
- store=TableStoreDB(
- dbconn,
- "test_all_products_data",
- [
- Column("pipeline_id", Integer(), primary_key=True),
- Column("offer_id", Integer(), primary_key=True),
- Column("attributes_base", JSON),
- Column("attributes_new", JSON),
- ],
- )
- ),
- "test_store_products": Table(
- store=TableStoreDB(
- dbconn,
- "test_store_products_data",
- [
- Column("pipeline_id", Integer(), primary_key=True),
- Column("offer_id", Integer(), primary_key=True),
- Column("attributes", JSON),
- Column("is_deleted", Boolean()),
- ],
- )
- ),
- "test_filter_products": Table(
- store=TableStoreDB(
- dbconn,
- "test_filter_products_data",
- [
- Column("pipeline_id", Integer(), primary_key=True),
- Column("offer_id", Integer(), primary_key=True),
- Column("attributes", JSON),
- ],
- )
- ),
- }
-)
-
pipeline = Pipeline(
[
- BatchGenerate(generate_products, outputs=["test_products"]),
+ BatchGenerate(generate_products, outputs=[TestProducts]),
BatchTransform(
unpack_attr,
- inputs=["test_products"],
- outputs=["test_attr_products"],
+ inputs=[TestProducts],
+ outputs=[TestAttrProducts],
chunk_size=2,
),
BatchTransform(
pack_attr,
- inputs=["test_attr_products"],
- outputs=["test_ozon_products"],
+ inputs=[TestAttrProducts],
+ outputs=[TestOzonProducts],
chunk_size=2,
),
BatchTransform(
pack_offers,
- inputs=["test_attr_products"],
- outputs=["test_offers_products"],
+ inputs=[TestAttrProducts],
+ outputs=[TestOffersProducts],
chunk_size=2,
),
BatchTransform(
gen_product_all,
- inputs=["test_products", "test_ozon_products"],
- outputs=["test_all_products"],
+ inputs=[TestProducts, TestOzonProducts],
+ outputs=[TestAllProducts],
chunk_size=2,
),
BatchTransform(
gen_product_store,
- inputs=["test_products", "test_store_products"],
- outputs=["test_store_products"],
+ inputs=[TestProducts, TestStoreProducts],
+ outputs=[TestStoreProducts],
chunk_size=2,
),
BatchTransform(
filter,
- inputs=["test_products"],
- outputs=["test_filter_products"],
+ inputs=[TestProducts],
+ outputs=[TestFilterProducts],
chunk_size=2,
),
]
)
-app = DatapipeApp(ds, catalog, pipeline)
+dbconn = DBConn("sqlite+pysqlite3:///db.sqlite", sqla_metadata=Base.metadata)
+# dbconn = DBConn('postgresql://postgres:password@localhost/postgres', schema='test')
+ds = DataStore(dbconn)
+app = DatapipeApp(ds, Catalog({}), pipeline)
+
+
+if __name__ == "__main__":
+ from datapipe.compute import run_steps
+
+ ds.meta_dbconn.sqla_metadata.create_all(ds.meta_dbconn.con)
+ run_steps(app.ds, app.steps)
From 74d31b0e5136f3c11335e25076dab34dbff80361 Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Sun, 11 Aug 2024 11:04:26 +0400
Subject: [PATCH 11/12] insignificant tweaks to docs
---
docs/source/conf.py | 3 ++-
docs/source/index.rst | 10 +++++++++-
2 files changed, 11 insertions(+), 2 deletions(-)
diff --git a/docs/source/conf.py b/docs/source/conf.py
index effd559e..e4b24157 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -1,10 +1,11 @@
+import datetime
import os
import sys
sys.path.insert(0, os.path.abspath("../.."))
project = "Datapipe"
-copyright = "2023, Epoch8 Team and Contributors"
+copyright = f"{datetime.date.today().year}, Epoch8 Team and Contributors"
author = "Epoch8 Team and Contributors"
extensions = [
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 51d9671f..482f6741 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -1,15 +1,23 @@
..
.. toctree::
- :caption: Developer Guide
+ :caption: Introduction
:maxdepth: 2
:hidden:
introduction
+ first-steps
concepts
+
+.. toctree::
+ :caption: Advanced Topics
+ :maxdepth: 2
+ :hidden:
+
transformation-lifecycle
sqlite
extending-cli
+ how-merging-works
.. toctree::
:caption: Reference
From 0a44cdeff0cb62477ea790a3ecda375abb5775e7 Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Sun, 11 Aug 2024 12:43:19 +0400
Subject: [PATCH 12/12] improve migration guide, clean up docs not ready to be
 published
---
docs/source/index.rst | 9 ---
docs/source/migration-v013-to-v014.md | 88 +++++++++++++++++++++++++++
2 files changed, 88 insertions(+), 9 deletions(-)
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 25438810..3537305a 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -6,8 +6,6 @@
:hidden:
introduction
- first-steps
- concepts
.. toctree::
:caption: Advanced Topics
@@ -17,7 +15,6 @@
transformation-lifecycle
sqlite
extending-cli
- how-merging-works
.. toctree::
:caption: Migration
@@ -26,12 +23,6 @@
migration-v013-to-v014
-.. toctree::
- :caption: Reference
- :maxdepth: 2
- :hidden:
-
- api
Index and Search
----------------
diff --git a/docs/source/migration-v013-to-v014.md b/docs/source/migration-v013-to-v014.md
index 02faa598..693e6ba5 100644
--- a/docs/source/migration-v013-to-v014.md
+++ b/docs/source/migration-v013-to-v014.md
@@ -61,3 +61,91 @@ BatchTransform(
transform_keys=[],
)
```
+
+## SQLAlchemy tables can be used directly without duplication in Catalog
+
+Starting with `v0.14`, an SQLA table can be passed directly to the `inputs=` or
+`outputs=` parameters without duplicating the entry in `Catalog`.
+
+Note that in order for `datapipe db create-all` to work, the same SQLAlchemy
+metadata must be used for the declarative base and for datapipe's `DBConn`.
+
+Example:
+
+```python
+class Base(DeclarativeBase):
+ pass
+
+
+class Input(Base):
+ __tablename__ = "input"
+
+ group_id: Mapped[int] = mapped_column(primary_key=True)
+ item_id: Mapped[int] = mapped_column(primary_key=True)
+
+
+class Output(Base):
+ __tablename__ = "output"
+
+ group_id: Mapped[int] = mapped_column(primary_key=True)
+ count: Mapped[int]
+
+# ...
+
+pipeline = Pipeline(
+ [
+ BatchGenerate(
+ generate_data,
+ outputs=[Input],
+ ),
+ DatatableBatchTransform(
+ count_tbl,
+ inputs=[Input],
+ outputs=[Output],
+ ),
+ ]
+)
+
+# Note: `sqla_metadata` comes from the SQLAlchemy DeclarativeBase
+dbconn = DBConn("sqlite+pysqlite3:///db.sqlite", sqla_metadata=Base.metadata)
+ds = DataStore(dbconn)
+
+app = DatapipeApp(ds=ds, catalog=Catalog({}), pipeline=pipeline)
+```
+
+## Table can be provided directly without Catalog
+
+Similar to the usage pattern for SQLA tables, it is also possible to pass a
+`datapipe.compute.Table` instance directly without registering it in the catalog.
+
+```python
+
+from datapipe.compute import Table
+from datapipe.store.filedir import PILFile, TableStoreFiledir
+from datapipe.step.batch_transform import BatchTransform
+from datapipe.step.update_external_table import UpdateExternalTable
+
+input_images_tbl = Table(
+ name="input_images",
+ store=TableStoreFiledir("input/{id}.jpeg", PILFile("jpg")),
+)
+
+preprocessed_images_tbl = Table(
+ name="preprocessed_images",
+ store=TableStoreFiledir("output/{id}.png", PILFile("png")),
+)
+
+# ...
+
+pipeline = Pipeline(
+ [
+ UpdateExternalTable(output=input_images_tbl),
+ BatchTransform(
+ batch_preprocess_images,
+ inputs=[input_images_tbl],
+ outputs=[preprocessed_images_tbl],
+ chunk_size=100,
+ ),
+ ]
+)
+```
\ No newline at end of file