merge master
elephantum committed Aug 11, 2024
2 parents 74d31b0 + 4ce1688 commit b4a69ca
Showing 9 changed files with 285 additions and 64 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
# 0.13.14

* Fix [#334](https://github.com/epoch8/datapipe/issues/334)

# 0.13.13

* Add `ComputeStep.get_status` method
11 changes: 6 additions & 5 deletions datapipe/datatable.py
@@ -599,11 +599,12 @@ def get_agg_cte(
keys = [k for k in transform_keys if k in self.primary_keys]
key_cols: List["ColumnClause"] = [column(k) for k in keys]

sql: Any = (
select(*key_cols + [func.max(tbl.c["update_ts"]).label("update_ts")])
.select_from(tbl)
.group_by(*key_cols)
)
sql: Any = select(
*key_cols + [func.max(tbl.c["update_ts"]).label("update_ts")]
).select_from(tbl)

if len(key_cols) > 0:
sql = sql.group_by(*key_cols)

sql = sql_apply_filters_idx_to_subquery(sql, keys, filters_idx)
sql = sql_apply_runconfig_filter(sql, tbl, self.primary_keys, run_config)
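
The new guard makes the no-keys case explicit: with an empty `transform_keys` there are no key columns, and the query should return a single whole-table aggregate row rather than grouping. A minimal standalone sketch of the pattern (table and column names are illustrative, not from datapipe):

```python
from sqlalchemy import column, func, select, table

tbl = table("events", column("user_id"), column("update_ts"))
key_cols = []  # empty when the transform aggregates the whole table

sql = select(*key_cols, func.max(tbl.c.update_ts).label("update_ts")).select_from(tbl)

# Group only when there is something to group by; with no key columns the
# ungrouped query already yields the single aggregate row we want.
if len(key_cols) > 0:
    sql = sql.group_by(*key_cols)
```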
3 changes: 2 additions & 1 deletion datapipe/step/batch_generate.py
@@ -28,8 +28,8 @@ def do_batch_generate(
ds: DataStore,
output_dts: List[DataTable],
run_config: Optional[RunConfig] = None,
kwargs: Optional[Dict] = None,
delete_stale: bool = True,
kwargs: Optional[Dict] = None,
) -> None:
"""
Create a new table from the results of running `proc_func`.
@@ -104,6 +104,7 @@ def build_compute(self, ds: DataStore, catalog: Catalog) -> List[ComputeStep]:
ds=ds,
output_dts=output_dts,
run_config=run_config,
delete_stale=self.delete_stale,
kwargs=kwargs,
),
),
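
The second change here forwards the step's `delete_stale` flag into `do_batch_generate`, where previously it never arrived. A hedged usage sketch — the constructor arguments below are assumptions based on this diff and the bundled examples, not a verbatim API reference:

```python
import pandas as pd

from datapipe.step.batch_generate import BatchGenerate

def generate_data():
    # Assumed generator contract: yield chunks of the output table.
    yield pd.DataFrame({"id": [0, 1], "value": ["a", "b"]})

# Assumption: delete_stale is a constructor kwarg; with this fix it actually
# reaches do_batch_generate instead of being dropped in build_compute.
step = BatchGenerate(
    generate_data,
    outputs=["input"],
    delete_stale=False,  # keep rows the generator no longer produces
)
```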
100 changes: 68 additions & 32 deletions datapipe/step/batch_transform.py
@@ -164,31 +164,55 @@ def mark_rows_processed_success(
if len(idx) == 0:
return

insert_sql = self.dbconn.insert(self.sql_table).values(
    [
        {
            "process_ts": process_ts,
            "is_success": True,
            "priority": 0,
            "error": None,
            **idx_dict,  # type: ignore
        }
        for idx_dict in idx.to_dict(orient="records")
    ]
)

sql = insert_sql.on_conflict_do_update(
    index_elements=self.primary_keys,
    set_={
        "process_ts": process_ts,
        "is_success": True,
        "error": None,
    },
)

# execute
with self.dbconn.con.begin() as con:
    con.execute(sql)

if idx.empty:
    # A DataFrame considers itself empty when it has no columns,
    # but here we still want to create rows in the DB

    # We can only handle the single-row case
    assert len(idx) == 1

    with self.dbconn.con.begin() as con:
        insert_sql = self.dbconn.insert(self.sql_table).values(
            [
                {
                    "process_ts": process_ts,
                    "is_success": True,
                    "priority": 0,
                    "error": None,
                }
            ]
        )

        # delete everything from the table
        con.execute(self.sql_table.delete())
        con.execute(insert_sql)

else:
    insert_sql = self.dbconn.insert(self.sql_table).values(
        [
            {
                "process_ts": process_ts,
                "is_success": True,
                "priority": 0,
                "error": None,
                **idx_dict,  # type: ignore
            }
            for idx_dict in idx.to_dict(orient="records")
        ]
    )

    sql = insert_sql.on_conflict_do_update(
        index_elements=self.primary_keys,
        set_={
            "process_ts": process_ts,
            "is_success": True,
            "error": None,
        },
    )

    # execute
    with self.dbconn.con.begin() as con:
        con.execute(sql)
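
The new branch rests on a pandas quirk worth seeing in isolation: a DataFrame can have rows and still report itself as empty, because `empty` is true whenever either axis has length zero. That is exactly the shape an index with no key columns takes:

```python
import pandas as pd

# One row, zero columns -- the shape produced by an empty transform_keys.
idx = pd.DataFrame(index=[0])

print(len(idx))    # 1     -- there is a row
print(idx.empty)   # True  -- no columns, so pandas calls it empty
print(idx.to_dict(orient="records"))  # [{}]  -- no key values to upsert on
```

Since such records carry no key values to conflict on, the code wipes and rewrites the single-row table instead of upserting.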

def mark_rows_processed_error(
self,
@@ -295,6 +319,11 @@ def __init__(
)

self.chunk_size = chunk_size

# Force transform_keys to be a list; pandas column selection misbehaves
# with other sequence types (e.g. tuples)
if transform_keys is not None and not isinstance(transform_keys, list):
transform_keys = list(transform_keys)

self.transform_keys, self.transform_schema = self.compute_transform_schema(
[i.meta_table for i in input_dts],
[i.meta_table for i in output_dts],
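
The coercion added above matters because pandas treats a tuple passed to `df[...]` as a single column label, whereas a list selects multiple columns:

```python
import pandas as pd

df = pd.DataFrame({"a": [1], "b": [2]})

print(df[["a", "b"]])  # list -> selects both columns, as intended

try:
    df[("a", "b")]     # tuple -> looked up as one column label
except KeyError as err:
    print("KeyError:", err)
```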
@@ -358,9 +387,6 @@ def _build_changed_idx_sql(
order: Literal["asc", "desc"] = "asc",
run_config: Optional[RunConfig] = None, # TODO remove
) -> Tuple[Iterable[str], Any]:
if len(self.transform_keys) == 0:
raise NotImplementedError()

all_input_keys_counts: Dict[str, int] = {}
for col in itertools.chain(*[dt.primary_schema for dt in self.input_dts]):
all_input_keys_counts[col.name] = all_input_keys_counts.get(col.name, 0) + 1
@@ -432,11 +458,6 @@ def _make_agg_of_agg(ctes, agg_col):
for tbl in self.input_dts
]

# Filter out ctes that do not have intersection with transform keys.
# These ctes do not affect the result, but may dramatically increase
# size of the temporary table during query execution.
inp_ctes = [(keys, cte) for (keys, cte) in inp_ctes if len(keys) > 0]

inp = _make_agg_of_agg(inp_ctes, "update_ts")

tr_tbl = self.meta_table.sql_table
@@ -453,8 +474,22 @@

out = out.cte(name="transform")

if len(self.transform_keys) == 0:
join_onclause_sql: Any = literal(True)
elif len(self.transform_keys) == 1:
join_onclause_sql = (
inp.c[self.transform_keys[0]] == out.c[self.transform_keys[0]]
)
else: # len(self.transform_keys) > 1:
join_onclause_sql = and_(
*[inp.c[key] == out.c[key] for key in self.transform_keys]
)

sql = (
select(
# We need to select at least something, so the query does not
# fail when self.transform_keys is empty
literal(1).label("_datapipe_dummy"),
*[
func.coalesce(inp.c[key], out.c[key]).label(key)
for key in self.transform_keys
Expand All @@ -463,9 +498,7 @@ def _make_agg_of_agg(ctes, agg_col):
.select_from(inp)
.outerjoin(
out,
onclause=and_(
*[inp.c[key] == out.c[key] for key in self.transform_keys]
),
onclause=join_onclause_sql,
full=True,
)
.where(
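
The three-way branch exists because the ON clause of the full outer join degenerates as `transform_keys` shrinks: with no keys, the two one-row aggregates are matched unconditionally via `literal(True)`, and a dummy literal keeps the SELECT list non-empty. A self-contained sketch of the same pattern (the CTE shapes are illustrative):

```python
from sqlalchemy import and_, literal, select

inp = select(literal(1).label("update_ts")).cte("inp")
out = select(literal(1).label("process_ts")).cte("out")

transform_keys: list = []  # empty for a whole-table transform

if len(transform_keys) == 0:
    onclause = literal(True)  # cross-match the single aggregate rows
elif len(transform_keys) == 1:
    onclause = inp.c[transform_keys[0]] == out.c[transform_keys[0]]
else:
    onclause = and_(*[inp.c[k] == out.c[k] for k in transform_keys])

sql = (
    # A dummy column keeps the SELECT list non-empty when there are no keys.
    select(literal(1).label("_dummy"))
    .select_from(inp)
    .outerjoin(out, onclause=onclause, full=True)
)
```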
@@ -586,6 +619,8 @@ def get_full_process_ids(
def alter_res_df():
with ds.meta_dbconn.con.begin() as con:
for df in pd.read_sql_query(u1, con=con, chunksize=chunk_size):
df = df[self.transform_keys]

for k, v in extra_filters.items():
df[k] = v
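
The added reselect drops the `_datapipe_dummy` column introduced by the SQL above, so downstream consumers only ever see the transform key columns:

```python
import pandas as pd

# Assumed shape of a chunk returned by the changed-idx query:
chunk = pd.DataFrame({"_datapipe_dummy": [1, 1], "user_id": [10, 11]})

transform_keys = ["user_id"]
chunk = chunk[transform_keys]  # keep only the key columns
print(list(chunk.columns))     # ['user_id']
```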

@@ -617,6 +652,7 @@ def get_change_list_process_ids(
sql,
con=con,
)
table_changes_df = table_changes_df[self.transform_keys]

changes.append(table_changes_df)
else:
7 changes: 7 additions & 0 deletions docs/source/index.rst
@@ -19,6 +19,13 @@
extending-cli
how-merging-works

.. toctree::
:caption: Migration
:maxdepth: 2
:hidden:

migration-v013-to-v014

.. toctree::
:caption: Reference
:maxdepth: 2
63 changes: 63 additions & 0 deletions docs/source/migration-v013-to-v014.md
@@ -0,0 +1,63 @@
# Migration from v0.13 to v0.14

## DatatableTransform can become BatchTransform

Previously, if you had to transform a whole table at once, you had to use
`DatatableTransform`. Now you can substitute it with a `BatchTransform` whose
`transform_keys` is empty.

Before:

```python
# Updates global count of input lines

def count(
ds: DataStore,
input_dts: List[DataTable],
output_dts: List[DataTable],
kwargs: Dict,
run_config: Optional[RunConfig] = None,
) -> None:
assert len(input_dts) == 1
assert len(output_dts) == 1

input_dt = input_dts[0]
output_dt = output_dts[0]

output_dt.store_chunk(
pd.DataFrame(
{"result_id": [0], "count": [len(input_dt.meta_table.get_existing_idx())]}
)
)

# ...

DatatableTransform(
count,
inputs=["input"],
outputs=["result"],
)
```

After:

```python
# Updates global count of input lines

def count(
input_df: pd.DataFrame,
) -> pd.DataFrame:
return pd.DataFrame({"result_id": [0], "count": [len(input_df)]})

# ...

BatchTransform(
count,
inputs=["input"],
outputs=["result"],

# Important: we have to specify an empty set of keys in order for the
# transformation to operate on the whole input at once
transform_keys=[],
)
```
39 changes: 14 additions & 25 deletions examples/many_to_zero/app.py
@@ -1,14 +1,11 @@
from typing import Dict, List, Optional

import pandas as pd
from sqlalchemy import Integer
from sqlalchemy.sql.schema import Column

from datapipe.compute import Catalog, DatapipeApp, Pipeline, Table
from datapipe.datatable import DataStore, DataTable
from datapipe.run_config import RunConfig
from datapipe.datatable import DataStore
from datapipe.step.batch_generate import BatchGenerate
from datapipe.step.datatable_transform import DatatableTransform
from datapipe.step.batch_transform import BatchTransform
from datapipe.store.database import DBConn
from datapipe.store.pandas import TableStoreJsonLine

@@ -23,23 +20,9 @@ def generate_data():


def count(
ds: DataStore,
input_dts: List[DataTable],
output_dts: List[DataTable],
kwargs: Dict,
run_config: Optional[RunConfig] = None,
) -> None:
assert len(input_dts) == 1
assert len(output_dts) == 1

input_dt = input_dts[0]
output_dt = output_dts[0]

output_dt.store_chunk(
pd.DataFrame(
{"result_id": [0], "count": [len(input_dt.meta_table.get_existing_idx())]}
)
)
input_df: pd.DataFrame,
) -> pd.DataFrame:
return pd.DataFrame({"result_id": [0], "count": [len(input_df)]})


input_tbl = Table(
@@ -67,11 +50,11 @@ def count(
generate_data,
outputs=[input_tbl],
),
DatatableTransform(
count, # type: ignore
BatchTransform(
count,
inputs=[input_tbl],
outputs=[result_tbl],
check_for_changes=False,
transform_keys=[],
),
]
)
@@ -80,3 +63,9 @@ def count(
ds = DataStore(DBConn("sqlite+pysqlite3:///db.sqlite"))

app = DatapipeApp(ds, Catalog({}), pipeline)


if __name__ == "__main__":
from datapipe.compute import run_steps

run_steps(ds, app.steps, None, None)
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "datapipe-core"
version = "0.13.13"
version = "0.14.0-alpha.1"
description = "`datapipe` is a realtime incremental ETL library for Python applications"
readme = "README.md"
repository = "https://github.com/epoch8/datapipe"