From 4246e4ab01626cb9d6651c1b5acc0640c494e306 Mon Sep 17 00:00:00 2001 From: Helen Bailey Date: Mon, 5 Dec 2022 11:32:38 -0500 Subject: [PATCH] Update bulk indexing configuration Why these changes are being introduced: We were getting 429 error responses during bulk indexing and needed to adjust configurations for the opensearchpy bulk helper. Although those adjustments didn't solve the problem, having them configurable via ENV variables is a better long-term solution than hard-coding them, so this commit adds that option. How this addresses that need: * Adds a new config function to configure opensearch bulk settings, reading from ENV variables if present with defaults if not. * Adds tests for new function. * Updates opensearch bulk_index function to configure settings using the new function. * Updates README to document new optional ENV variables. Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/TIMX-128 --- README.md | 4 +++- tests/test_config.py | 31 ++++++++++++++++++++----------- tim/config.py | 16 ++++++++++++---- tim/opensearch.py | 11 +++++++---- 4 files changed, 42 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 9ff4c31..973ef1b 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,9 @@ TIMDEX! Index Manager (TIM) is a Python cli application for managing TIMDEX inde ## Optional ENV - `AWS_REGION` = Only needed if AWS region changes from the default of us-east-1. -- `OPENSEARCH_REQUEST_TIMEOUT` = Only used for OpenSearch requests that tend to take longer than the default timeout of 10 seconds, such as bulk or index refresh requests. Defaults to 30 seconds if not set. +- `OPENSEARCH_BULK_MAX_CHUNK_BYTES` = Chunk size limit for sending requests to the bulk indexing endpoint, in bytes. Defaults to 100 MB (the opensearchpy default) if not set. +- `OPENSEARCH_BULK_MAX_RETRIES` = Maximum number of retries when sending requests to the bulk indexing endpoint. Defaults to 8 if not set. +- `OPENSEARCH_REQUEST_TIMEOUT` = Only used for OpenSearch requests that tend to take longer than the default timeout of 10 seconds, such as bulk or index refresh requests. Defaults to 120 seconds if not set. - `SENTRY_DSN` = If set to a valid Sentry DSN, enables Sentry exception monitoring. This is not needed for local development. - `STATUS_UPDATE_INTERVAL` = The ingest process logs the # of records indexed every nth record (1000 by default). Set this env variable to any integer to change the frequency of logging status updates. Can be useful for development/debugging. - `TIMDEX_OPENSEARCH_ENDPOINT` = If using a local Docker OpenSearch instance, this isn't needed. Otherwise set to OpenSearch instance endpoint _without_ the http scheme, e.g. `search-timdex-env-1234567890.us-east-1.es.amazonaws.com`. Can also be passed directly to the CLI via the `--url` option. diff --git a/tests/test_config.py b/tests/test_config.py index a2e884a..efb7e68 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,10 +1,11 @@ import logging from tim.config import ( + OPENSEARCH_BULK_CONFIG_DEFAULTS, configure_index_settings, configure_logger, + configure_opensearch_bulk_settings, configure_sentry, - opensearch_request_timeout, ) @@ -28,6 +29,24 @@ def test_configure_logger_verbose(): assert result == "Logger 'tests.test_config' configured with level=DEBUG" +def test_configure_opensearch_bulk_settings_from_env(monkeypatch): + monkeypatch.setenv("OPENSEARCH_BULK_MAX_CHUNK_BYTES", "10") + monkeypatch.setenv("OPENSEARCH_BULK_MAX_RETRIES", "2") + monkeypatch.setenv("OPENSEARCH_REQUEST_TIMEOUT", "20") + assert configure_opensearch_bulk_settings() == { + "OPENSEARCH_BULK_MAX_CHUNK_BYTES": 10, + "OPENSEARCH_BULK_MAX_RETRIES": 2, + "OPENSEARCH_REQUEST_TIMEOUT": 20, + } + + +def test_configure_opensearch_bulk_settings_uses_defaults(monkeypatch): + monkeypatch.delenv("OPENSEARCH_BULK_MAX_CHUNK_BYTES", raising=False) + monkeypatch.delenv("OPENSEARCH_BULK_MAX_RETRIES", raising=False) + monkeypatch.delenv("OPENSEARCH_REQUEST_TIMEOUT", raising=False) + assert configure_opensearch_bulk_settings() == OPENSEARCH_BULK_CONFIG_DEFAULTS + + def test_configure_sentry_no_env_variable(monkeypatch): monkeypatch.delenv("SENTRY_DSN", raising=False) result = configure_sentry() @@ -44,13 +63,3 @@ def test_configure_sentry_env_variable_is_dsn(monkeypatch): monkeypatch.setenv("SENTRY_DSN", "https://1234567890@00000.ingest.sentry.io/123456") result = configure_sentry() assert result == "Sentry DSN found, exceptions will be sent to Sentry with env=test" - - -def test_opensearch_request_timeout_default(monkeypatch): - monkeypatch.delenv("OPENSEARCH_REQUEST_TIMEOUT", raising=False) - assert opensearch_request_timeout() == 120 - - -def test_opensearch_request_timeout_from_env(monkeypatch): - monkeypatch.setenv("OPENSEARCH_REQUEST_TIMEOUT", "5") - assert opensearch_request_timeout() == 5 diff --git a/tim/config.py b/tim/config.py index c143e9f..ddf0ac7 100644 --- a/tim/config.py +++ b/tim/config.py @@ -4,6 +4,11 @@ import sentry_sdk +OPENSEARCH_BULK_CONFIG_DEFAULTS = { + "OPENSEARCH_BULK_MAX_CHUNK_BYTES": 100 * 1024 * 1024, + "OPENSEARCH_BULK_MAX_RETRIES": 8, + "OPENSEARCH_REQUEST_TIMEOUT": 120, +} PRIMARY_ALIAS = "all-current" VALID_BULK_OPERATIONS = ["create", "delete", "index", "update"] VALID_SOURCES = ["alma", "aspace", "dspace", "jpal", "whoas", "zenodo"] @@ -35,6 +40,13 @@ def configure_logger(logger: logging.Logger, verbose: bool) -> str: ) +def configure_opensearch_bulk_settings() -> dict[str, int]: + result = {} + for key, value in OPENSEARCH_BULK_CONFIG_DEFAULTS.items(): + result[key] = int(os.getenv(key) or value) + return result + + def configure_sentry() -> str: env = os.getenv("WORKSPACE") sentry_dsn = os.getenv("SENTRY_DSN") @@ -42,7 +54,3 @@ def configure_sentry() -> str: sentry_sdk.init(sentry_dsn, environment=env) return f"Sentry DSN found, exceptions will be sent to Sentry with env={env}" return "No Sentry DSN found, exceptions will not be sent to Sentry" - - -def opensearch_request_timeout() -> int: - return int(os.getenv("OPENSEARCH_REQUEST_TIMEOUT", "120")) diff --git a/tim/opensearch.py b/tim/opensearch.py index 08a7c42..1b4a1c4 100644 --- a/tim/opensearch.py +++ b/tim/opensearch.py @@ -12,7 +12,7 @@ from tim.config import ( PRIMARY_ALIAS, configure_index_settings, - opensearch_request_timeout, + configure_opensearch_bulk_settings, ) from tim.errors import AliasNotFoundError, IndexExistsError, IndexNotFoundError @@ -315,14 +315,16 @@ def bulk_index( Returns total sums of: records created, records updated, errors, and total records processed. """ + bulk_config = configure_opensearch_bulk_settings() result = {"created": 0, "updated": 0, "errors": 0, "total": 0} actions = helpers.generate_bulk_actions(index, records, "index") responses = streaming_bulk( client, actions, - max_retries=3, + max_chunk_bytes=bulk_config["OPENSEARCH_BULK_MAX_CHUNK_BYTES"], + max_retries=bulk_config["OPENSEARCH_BULK_MAX_RETRIES"], raise_on_error=False, - request_timeout=opensearch_request_timeout(), + request_timeout=bulk_config["OPENSEARCH_REQUEST_TIMEOUT"], ) for response in responses: if response[0] is False: @@ -347,7 +349,8 @@ def bulk_index( logger.info("Status update: %s records indexed so far!", result["total"]) logger.info("All records ingested, refreshing index.") response = client.indices.refresh( - index=index, request_timeout=opensearch_request_timeout() + index=index, + request_timeout=bulk_config["OPENSEARCH_REQUEST_TIMEOUT"], ) logger.debug(response) return result