
Commit

Add bulk-update command to index/delete records from TIMDEX parquet dataset

Why these changes are being introduced:
* The timdex-index-manager (TIM) needs to support the v2 parquet dataset,
which now contains records for both indexing and deleting. The new CLI
command performs a "bulk update" given a subset of the dataset
(filtered by 'run_date' and 'run_id') and uses the timdex-dataset-api
library to read records from the TIMDEXDataset.

Introducing a new CLI command avoids the feature-flagging approach,
allowing the existing CLI commands and helper functions to remain
untouched for v1 purposes.

How this addresses that need:
* Implement 'bulk-update' CLI command
* Add unit tests for 'bulk-update'

Side effects of this change:
* TIM remains backwards-compatible with v1 but will now also support v2 runs.

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-428
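
As a reference sketch, this is the read pattern the new command builds on
(method and parameter names are taken from the tim/cli.py diff below; anything
else about the timdex-dataset-api interface is an assumption):

from timdex_dataset_api import TIMDEXDataset

# load only the slice of the parquet dataset written by a single pipeline run
td = TIMDEXDataset(location="s3://test-timdex-bucket/dataset")
td.load(run_date="2024-12-01", run_id="abc123")

# records flagged for indexing and for deletion are read as separate iterators
records_to_index = td.read_transformed_records_iter(action="index")
records_to_delete = td.read_dicts_iter(columns=["timdex_record_id"], action="delete")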
jonavellecuerdo committed Jan 10, 2025
1 parent b6e8893 commit 34cda39
Showing 6 changed files with 1,122 additions and 740 deletions.
1 change: 1 addition & 0 deletions Pipfile
@@ -21,6 +21,7 @@ mypy = "*"
pre-commit = "*"
pytest = "*"
ruff = "*"
timdex-dataset-api = { git = "git+https://github.com/MITLibraries/timdex-dataset-api.git"}
vcrpy = "*"

[requires]
1,707 changes: 968 additions & 739 deletions Pipfile.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions compose.yaml
@@ -10,6 +10,7 @@ services:
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
    volumes:
      - opensearch-local-data:/usr/share/opensearch/data
    networks:
@@ -21,6 +22,7 @@ services:
    environment:
      - "DISABLE_SECURITY_DASHBOARDS_PLUGIN=true"
      - 'OPENSEARCH_HOSTS=["http://opensearch:9200"]'
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
    networks:
      - opensearch-local-net
volumes:
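
Both services interpolate ${OPENSEARCH_INITIAL_ADMIN_PASSWORD} at compose time,
so the variable must be set in the shell or in a .env file alongside
compose.yaml (which Docker Compose reads automatically). A minimal sketch, with
a placeholder value not taken from this repo:

OPENSEARCH_INITIAL_ADMIN_PASSWORD=ChangeMe-Example-123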
1 change: 1 addition & 0 deletions tests/conftest.py
@@ -3,6 +3,7 @@
from click.testing import CliRunner

from tim.opensearch import configure_opensearch_client
from timdex_dataset_api import TIMDEXDataset

EXIT_CODES = {
"success": 0,
87 changes: 87 additions & 0 deletions tests/test_cli.py
@@ -1,8 +1,10 @@
import re
from unittest.mock import MagicMock, patch

from freezegun import freeze_time

from tim.cli import main
from tim.errors import BulkIndexingError

from .conftest import EXIT_CODES, my_vcr

@@ -256,3 +258,88 @@ def test_bulk_delete_with_source_success(caplog, runner):
"from index 'alma-2022-09-01t00-00-00'" in caplog.text
)
assert "Bulk deletion complete!" in caplog.text


@patch("timdex_dataset_api.dataset.TIMDEXDataset.load")
@patch("tim.helpers.validate_bulk_cli_options")
@patch("tim.opensearch.bulk_delete")
@patch("tim.opensearch.bulk_index")
def test_bulk_update_with_source_success(
mock_bulk_index,
mock_bulk_delete,
mock_validate_bulk_cli_options,
mock_timdex_dataset,
caplog,
monkeypatch,
runner,
):
monkeypatch.delenv("TIMDEX_OPENSEARCH_ENDPOINT", raising=False)
mock_bulk_index.return_value = {
"created": 1000,
"updated": 0,
"errors": 0,
"total": 1000,
}
mock_bulk_delete.return_value = {"deleted": 0, "errors": 0, "total": 0}
mock_validate_bulk_cli_options.return_value = "alma"
mock_timdex_dataset.return_value = MagicMock()

result = runner.invoke(
main,
[
"bulk-update",
"--source",
"alma",
"--run-date",
"2024-12-01",
"--run-id",
"abc123",
"s3://test-timdex-bucket/dataset",
],
)
assert result.exit_code == EXIT_CODES["success"]
assert (
'Bulk update complete: {"index": {"created": 1000, "updated": 0, "errors": 0, "total": 1000}, "delete": {"deleted": 0, "errors": 0, "total": 0}'
in caplog.text
)


@patch("timdex_dataset_api.dataset.TIMDEXDataset.load")
@patch("tim.helpers.validate_bulk_cli_options")
@patch("tim.opensearch.bulk_delete")
@patch("tim.opensearch.bulk_index")
def test_bulk_update_with_source_raise_bulk_indexing_error(
mock_bulk_index,
mock_bulk_delete,
mock_validate_bulk_cli_options,
mock_timdex_dataset,
caplog,
monkeypatch,
runner,
):
monkeypatch.delenv("TIMDEX_OPENSEARCH_ENDPOINT", raising=False)
mock_bulk_index.side_effect = BulkIndexingError(
record="alma:0", index="index", error="exception"
)
mock_bulk_delete.return_value = {"deleted": 0, "errors": 0, "total": 0}
mock_validate_bulk_cli_options.return_value = "alma"
mock_timdex_dataset.return_value = MagicMock()

result = runner.invoke(
main,
[
"bulk-update",
"--source",
"alma",
"--run-date",
"2024-12-01",
"--run-id",
"abc123",
"s3://test-timdex-bucket/dataset",
],
)
assert result.exit_code == EXIT_CODES["success"]
assert (
'Bulk update complete: {"index": {"created": 0, "updated": 0, "errors": 0, "total": 0}, "delete": {"deleted": 0, "errors": 0, "total": 0}'
in caplog.text
)
64 changes: 63 additions & 1 deletion tim/cli.py
@@ -1,13 +1,15 @@
# ruff: noqa: TRY003, EM101
import json
import logging
from datetime import timedelta
from time import perf_counter

import rich_click as click

from tim import errors, helpers
from tim import opensearch as tim_os
from tim.config import PRIMARY_ALIAS, VALID_SOURCES, configure_logger, configure_sentry
from tim.errors import BulkIndexingError
from timdex_dataset_api import TIMDEXDataset

logger = logging.getLogger(__name__)

@@ -252,6 +254,7 @@ def promote(ctx: click.Context, index: str, alias: list[str]) -> None:
# Bulk record processing commands


# NOTE: FEATURE FLAG: 'bulk_index' may be removed entirely when v2 work done
@main.command()
@click.option("-i", "--index", help="Name of the index to bulk index records into.")
@click.option(
@@ -295,6 +298,7 @@ def bulk_index(ctx: click.Context, index: str, source: str, filepath: str) -> None:
)


# NOTE: FEATURE FLAG: 'bulk_delete' may be removed entirely when v2 work done
@main.command()
@click.option("-i", "--index", help="Name of the index to bulk delete records from.")
@click.option(
@@ -334,3 +338,61 @@ def bulk_delete(ctx: click.Context, index: str, source: str, filepath: str) -> None:
results["deleted"],
results["total"],
)


@main.command()
@click.option("-i", "--index", help="Name of the index to bulk index records into.")
@click.option(
    "-s",
    "--source",
    type=click.Choice(VALID_SOURCES),
    help="Source whose primary-aliased index to bulk update records in.",
)
@click.option("-d", "--run-date", help="Run date, formatted as YYYY-MM-DD.")
@click.option("-rid", "--run-id", help="Run ID.")
@click.argument("dataset_path", type=click.Path())
@click.pass_context
def bulk_update(
    ctx: click.Context,
    index: str,
    source: str,
    run_date: str,
    run_id: str,
    dataset_path: str,
) -> None:
    """Bulk update records for an index.

    Must provide either the name of an existing index in the cluster or a valid
    source. If a source is provided, records are indexed into and/or deleted from
    the primary-aliased index for that source.

    Transformed records are read from a TIMDEXDataset located at dataset_path
    using the 'timdex-dataset-api' library, filtered by run date and run ID.

    Logs an error and aborts if the provided index doesn't exist in the cluster.
    """
    client = ctx.obj["CLIENT"]
    index = helpers.validate_bulk_cli_options(index, source, client)

    logger.info(f"Bulk updating records from dataset '{dataset_path}' into '{index}'")

    index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}
    delete_results = {"deleted": 0, "errors": 0, "total": 0}

    td = TIMDEXDataset(location=dataset_path)
    td.load(run_date=run_date, run_id=run_id)

    # bulk index records
    records_to_index = td.read_transformed_records_iter(action="index")
    try:
        index_results.update(tim_os.bulk_index(client, index, records_to_index))
    except BulkIndexingError as exception:
        logger.info(f"Bulk indexing failed: {exception}")

    # bulk delete records
    records_to_delete = td.read_dicts_iter(columns=["timdex_record_id"], action="delete")
    delete_results.update(tim_os.bulk_delete(client, index, records_to_delete))

    summary_results = {"index": index_results, "delete": delete_results}
    logger.info(f"Bulk update complete: {json.dumps(summary_results)}")
