Merge remote-tracking branch 'origin/master' into many-to-zero-batch
elephantum committed Aug 11, 2024
2 parents 99fed22 + 6602a40 commit 85d500b
Showing 16 changed files with 334 additions and 163 deletions.
35 changes: 18 additions & 17 deletions .github/workflows/pytest.yaml
@@ -15,14 +15,14 @@ jobs:
matrix:
python-version:
# - "3.8"
# - "3.9"
- "3.9"
- "3.10"

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip
@@ -50,16 +50,17 @@ jobs:
strategy:
matrix:
include:
- python-version: "3.8"
test-db-env: "postgres"
pip-extra: "sqlalchemy <2"
- python-version: "3.8"
test-db-env: "postgres"
pip-extra: "sqlalchemy >2"
# - python-version: "3.8"
# test-db-env: "sqlite"
# - python-version: "3.9"
# test-db-env: "postgres"
# pip-extra: "sqlalchemy <2"
# - python-version: "3.8"
# test-db-env: "postgres"
# pip-extra: "'sqlalchemy>2'"
# - python-version: "3.8"
# test-db-env: "sqlite"
- python-version: "3.9"
test-db-env: "postgres"
pip-extra: '"sqlalchemy>2"'
# - python-version: "3.9"
# test-db-env: "sqlite"
# - python-version: "3.10"
@@ -68,13 +69,13 @@
# test-db-env: "sqlite"
- python-version: "3.11"
test-db-env: "postgres"
pip-extra: "sqlalchemy <2"
pip-extra: '"sqlalchemy<2" "pandas<2.2"'
- python-version: "3.11"
test-db-env: "postgres"
pip-extra: "sqlalchemy >2"
pip-extra: '"sqlalchemy>2"'
- python-version: "3.11"
test-db-env: "sqlite"
pip-extra: "sqlalchemy >2"
pip-extra: '"sqlalchemy>2"'

services:
# Label used to access the service container
@@ -107,17 +108,17 @@ jobs:
- 6333:6333

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip

- name: Install dependencies
run: |
pip install "${{ matrix.pip-extra }}" ".[sqlite,excel,milvus,gcsfs,s3fs,redis,qdrant,gcp]" "pytest<8" "pytest_cases"
pip install ${{ matrix.pip-extra }} ".[sqlite,excel,milvus,gcsfs,s3fs,redis,qdrant,gcp]" "pytest<8" "pytest_cases"
- name: Test with pytest
run: |
4 changes: 2 additions & 2 deletions .github/workflows/test_examples.yaml
@@ -28,10 +28,10 @@ jobs:
- RayExecutor

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip
42 changes: 42 additions & 0 deletions CHANGELOG.md
@@ -1,7 +1,27 @@
# 0.13.14

* Fix [#334](https://github.com/epoch8/datapipe/issues/334)

# 0.13.13

* Add `ComputeStep.get_status` method
* Remove restriction for Pandas < 2.2

# 0.13.12

* Add processing of an empty response in `QdrantStore`
* Add optional `index_schema` to `QdrantStore`
* Add redis cluster mode support in `RedisStore`

# 0.13.11

* Remove logging to database (`datapipe_events` table) from `EventLogger`

# 0.13.10

* Fix compatibility with SQLAlchemy < 2 (ColumnClause in typing)
* Fix compatibility with Ray and SQLAlchemy > 2 (serialization of Table)
* (post.1) Fix dependencies for macOS; deprecate Python 3.8

# 0.13.9

@@ -541,6 +561,28 @@

Now: `DataTable(name, meta_store.create_meta_table(name), data_store)`

# 0.5.0

* Added a prototype debug UI. It is launched with the command `python script.py ui`

# 0.4.0

**Major refactoring**

* Root module renamed: `c12n_pipe` → `datapipe`
* New `datapipe.dsl` module implemented for declarative pipeline descriptions.
  Usage example: `examples/goal_v0.4.0.py`
* Pipeline description is decoupled from the execution mechanism → it is possible to write
  a converter to another runtime, for example Prefect
* `MetaStore` logic reworked: `MetaStore` now explicitly owns all meta-tables,
  and consumers of that information access them by name
* Table data can now be kept in pluggable stores (storing in the database is one
  of the options)
  * `database.TableStoreDB` - data stored in a database table
  * `filedir.TableStoreFiledir` - data stored as a directory of files (plus
    adapters for different file types are provided)
  * `pandas.TableStoreJsonLine` - data stored as a single JSON Lines file

# 0.2.0 (2021-02-01)
- Add major code with Label Studio implementation
- Add Data catalogs and Nodes
4 changes: 3 additions & 1 deletion README.md
@@ -1,12 +1,14 @@
# Datapipe

`datapipe` is a real-time, incremental ETL library for Python with record-level dependency tracking.
[Datapipe](https://datapipe.dev/) is a real-time, incremental ETL library for Python with record-level dependency tracking.

The library is designed for describing data processing pipelines and is capable
of tracking dependencies for each record in the pipeline. This ensures that
tasks within the pipeline receive only the data that has been modified, thereby
improving the overall efficiency of data handling.

https://datapipe.dev/
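For orientation, a minimal sketch of such a pipeline follows. It is illustrative only: the import paths, the `Catalog`/`Pipeline`/`Table`/`BatchTransform` names and the constructor signatures are assumptions about the datapipe API, not something this commit confirms.

```python
# Illustrative sketch; class names, module paths and signatures are assumed.
import pandas as pd
from sqlalchemy import Column, Integer, String

from datapipe.compute import Catalog, Pipeline, Table, build_compute, run_steps
from datapipe.datatable import DataStore
from datapipe.step.batch_transform import BatchTransform
from datapipe.store.database import DBConn, TableStoreDB

dbconn = DBConn("sqlite:///pipeline.sqlite")
ds = DataStore(dbconn, create_meta_table=True)

catalog = Catalog({
    "events": Table(store=TableStoreDB(
        dbconn, "events",
        [Column("id", Integer, primary_key=True), Column("payload", String)],
        create_table=True,
    )),
    "features": Table(store=TableStoreDB(
        dbconn, "features",
        [Column("id", Integer, primary_key=True), Column("length", Integer)],
        create_table=True,
    )),
})


def compute_features(df: pd.DataFrame) -> pd.DataFrame:
    # Only the "events" rows that changed since the previous run arrive here.
    return df.assign(length=df["payload"].str.len())[["id", "length"]]


pipeline = Pipeline([
    BatchTransform(compute_features, inputs=["events"], outputs=["features"]),
])

run_steps(ds, build_compute(ds, catalog, pipeline))
```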

# Development

At the moment these branches are active:
23 changes: 10 additions & 13 deletions datapipe/cli.py
@@ -348,19 +348,16 @@ def list(ctx: click.Context, status: bool) -> None:  # noqa
extra_args = {}

if status:
if len(step.input_dts) > 0:
try:
if isinstance(step, BaseBatchTransformStep):
changed_idx_count = step.get_changed_idx_count(ds=app.ds)

if changed_idx_count > 0:
extra_args[
"changed_idx_count"
] = f"[red]{changed_idx_count}[/red]"

except NotImplementedError:
# Currently we do not support empty join_keys
extra_args["changed_idx_count"] = "[red]N/A[/red]"
try:
step_status = step.get_status(ds=app.ds)
extra_args["total_idx_count"] = str(step_status.total_idx_count)
extra_args["changed_idx_count"] = (
f"[red]{step_status.changed_idx_count}[/red]"
)
except NotImplementedError:
# Currently we do not support empty join_keys
extra_args["total_idx_count"] = "[red]N/A[/red]"
extra_args["changed_idx_count"] = "[red]N/A[/red]"

rprint(to_human_repr(step, extra_args=extra_args))
rprint("")
10 changes: 10 additions & 0 deletions datapipe/compute.py
@@ -40,6 +40,13 @@ def get_datatable(self, ds: DataStore, name: str) -> DataTable:
return ds.get_or_create_table(name=name, table_store=self.catalog[name].store)


@dataclass
class StepStatus:
name: str
total_idx_count: int
changed_idx_count: int


class ComputeStep:
"""
A compute step in the computation graph.
@@ -91,6 +98,9 @@ def name(self) -> str:
def labels(self) -> Labels:
return self._labels if self._labels else []

def get_status(self, ds: DataStore) -> StepStatus:
raise NotImplementedError

# TODO: move to lints
def validate(self) -> None:
inp_p_keys_arr = [set(inp.primary_keys) for inp in self.input_dts if inp]
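The `cli.py` change above consumes this contract. A small sketch shows the intended usage; only `StepStatus`, `ComputeStep.get_status` and the `NotImplementedError` fallback are taken from this diff, while `describe_step` itself is a hypothetical helper, not library code.

```python
from datapipe.compute import ComputeStep, StepStatus
from datapipe.datatable import DataStore


def describe_step(step: ComputeStep, ds: DataStore) -> str:
    """Hypothetical helper: render a one-line status summary for a step."""
    try:
        status: StepStatus = step.get_status(ds=ds)
        return (
            f"{status.name}: {status.changed_idx_count} of "
            f"{status.total_idx_count} idx changed"
        )
    except NotImplementedError:
        # The base ComputeStep, and steps without join keys, report no status.
        return f"{step.name}: status N/A"
```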
4 changes: 1 addition & 3 deletions datapipe/datatable.py
@@ -610,9 +610,7 @@ def __init__(
create_meta_table: bool = False,
) -> None:
self.meta_dbconn = meta_dbconn
self.event_logger = EventLogger(
self.meta_dbconn, create_table=create_meta_table
)
self.event_logger = EventLogger()
self.tables: Dict[str, DataTable] = {}

self.create_meta_table = create_meta_table
104 changes: 2 additions & 102 deletions datapipe/event_logger.py
@@ -1,60 +1,14 @@
import logging
from enum import Enum
from typing import TYPE_CHECKING, Any, Optional, Tuple
from typing import Optional

from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.sql import func
from sqlalchemy.sql.schema import Column, Table
from sqlalchemy.sql.sqltypes import JSON, DateTime, Integer, String
from traceback_with_variables import format_exc

from datapipe.run_config import RunConfig

logger = logging.getLogger("datapipe.event_logger")

if TYPE_CHECKING:
from datapipe.store.database import DBConn


class EventTypes(Enum):
STATE = "state"
ERROR = "error"


class StepEventTypes(Enum):
RUN_FULL_COMPLETE = "run_full_complete"


class EventLogger:
def __init__(self, dbconn: "DBConn", create_table: bool = False):
self.dbconn = dbconn

self.events_table = Table(
"datapipe_events",
dbconn.sqla_metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("event_ts", DateTime, server_default=func.now()),
Column("type", String(100)),
Column("event", JSON if dbconn.con.name == "sqlite" else JSONB),
)

self.step_events_table = Table(
"datapipe_step_events",
dbconn.sqla_metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("step", String(100)),
Column("event_ts", DateTime, server_default=func.now()),
Column("event", String(100)),
Column("event_payload", JSON if dbconn.con.name == "sqlite" else JSONB),
)

if create_table:
self.events_table.create(self.dbconn.con, checkfirst=True)
self.step_events_table.create(self.dbconn.con, checkfirst=True)

def __reduce__(self) -> Tuple[Any, ...]:
return self.__class__, (self.dbconn,)

def log_state(
self,
table_name,
@@ -66,34 +20,9 @@ def log_state(
):
logger.debug(
f'Table "{table_name}": added = {added_count}; updated = {updated_count}; '
f"deleted = {deleted_count}, processed_count = {deleted_count}"
f"deleted = {deleted_count}, processed_count = {processed_count}"
)

if run_config is not None:
meta = {
"labels": run_config.labels,
"filters": run_config.filters,
}
else:
meta = {}

ins = self.events_table.insert().values(
type=EventTypes.STATE.value,
event={
"meta": meta,
"data": {
"table_name": table_name,
"added_count": added_count,
"updated_count": updated_count,
"deleted_count": deleted_count,
"processed_count": processed_count,
},
},
)

with self.dbconn.con.begin() as con:
con.execute(ins)

def log_error(
self,
type,
@@ -106,29 +35,8 @@ def log_error(
logger.error(
f'Error in step {run_config.labels.get("step_name")}: {type} {message}\n{description}'
)
meta = {
"labels": run_config.labels,
"filters": run_config.filters,
}
else:
logger.error(f"Error: {type} {message}\n{description}")
meta = {}

ins = self.events_table.insert().values(
type=EventTypes.ERROR.value,
event={
"meta": meta,
"data": {
"type": type,
"message": message,
"description": description,
"params": params,
},
},
)

with self.dbconn.con.begin() as con:
con.execute(ins)

def log_exception(
self,
@@ -148,11 +56,3 @@ def log_step_full_complete(
step_name: str,
) -> None:
logger.debug(f"Step {step_name} is marked complete")

ins = self.step_events_table.insert().values(
step=step_name,
event=StepEventTypes.RUN_FULL_COMPLETE.value,
)

with self.dbconn.con.begin() as con:
con.execute(ins)
Empty file added datapipe/py.typed
Empty file.
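The empty `py.typed` file is the standard PEP 561 marker that tells type checkers (mypy, pyright) that `datapipe` ships inline type annotations.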
3 changes: 2 additions & 1 deletion datapipe/step/batch_generate.py
@@ -28,8 +28,8 @@ def do_batch_generate(
ds: DataStore,
output_dts: List[DataTable],
run_config: Optional[RunConfig] = None,
kwargs: Optional[Dict] = None,
delete_stale: bool = True,
kwargs: Optional[Dict] = None,
) -> None:
"""
Create a new table from the results of running `proc_func`.
@@ -104,6 +104,7 @@ def build_compute(self, ds: DataStore, catalog: Catalog) -> List[ComputeStep]:
ds=ds,
output_dts=output_dts,
run_config=run_config,
delete_stale=self.delete_stale,
kwargs=kwargs,
),
),
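A short sketch of what the propagated flag controls; the `BatchGenerate` wiring below is an assumption for illustration (this hunk only shows `do_batch_generate` and `build_compute`), not code from the commit.

```python
from typing import Iterator

import pandas as pd

# Assumed class name and import path for the declarative generate step.
from datapipe.step.batch_generate import BatchGenerate


def gen_products() -> Iterator[pd.DataFrame]:
    # Re-yields the full table contents on every run.
    yield pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})


# With delete_stale=True, rows that the generator no longer yields are removed
# from the output table; after this fix the value configured on the step is
# actually forwarded to do_batch_generate instead of being dropped.
step = BatchGenerate(gen_products, outputs=["products"], delete_stale=False)
```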