Skip to content

Commit

Permalink
Update bulk indexing configuration
Browse files Browse the repository at this point in the history
Why these changes are being introduced:
We were getting 429 error responses during bulk indexing and needed to
adjust configurations for the opensearchpy bulk helper. Although those
adjustments didn't solve the problem, having them configurable via ENV
variables is a better long-term solution than hard-coding them, so this
commit adds that option.

How this addresses that need:
* Adds a new config function to configure opensearch bulk settings,
  reading from ENV variables if present with defaults if not.
* Adds tests for new function.
* Updates opensearch bulk_index function to configure settings using the
  new function.
* Updates README to document new optional ENV variables.

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-128
  • Loading branch information
hakbailey committed Dec 6, 2022
1 parent f35e70d commit 4246e4a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 20 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ TIMDEX! Index Manager (TIM) is a Python cli application for managing TIMDEX inde
## Optional ENV

- `AWS_REGION` = Only needed if AWS region changes from the default of us-east-1.
- `OPENSEARCH_REQUEST_TIMEOUT` = Only used for OpenSearch requests that tend to take longer than the default timeout of 10 seconds, such as bulk or index refresh requests. Defaults to 30 seconds if not set.
- `OPENSEARCH_BULK_MAX_CHUNK_BYTES` = Chunk size limit for sending requests to the bulk indexing endpoint, in bytes. Defaults to 100 MB (the opensearchpy default) if not set.
- `OPENSEARCH_BULK_MAX_RETRIES` = Maximum number of retries when sending requests to the bulk indexing endpoint. Defaults to 8 if not set.
- `OPENSEARCH_REQUEST_TIMEOUT` = Only used for OpenSearch requests that tend to take longer than the default timeout of 10 seconds, such as bulk or index refresh requests. Defaults to 120 seconds if not set.
- `SENTRY_DSN` = If set to a valid Sentry DSN, enables Sentry exception monitoring. This is not needed for local development.
- `STATUS_UPDATE_INTERVAL` = The ingest process logs the # of records indexed every nth record (1000 by default). Set this env variable to any integer to change the frequency of logging status updates. Can be useful for development/debugging.
- `TIMDEX_OPENSEARCH_ENDPOINT` = If using a local Docker OpenSearch instance, this isn't needed. Otherwise set to OpenSearch instance endpoint _without_ the http scheme, e.g. `search-timdex-env-1234567890.us-east-1.es.amazonaws.com`. Can also be passed directly to the CLI via the `--url` option.
Expand Down
31 changes: 20 additions & 11 deletions tests/test_config.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging

from tim.config import (
OPENSEARCH_BULK_CONFIG_DEFAULTS,
configure_index_settings,
configure_logger,
configure_opensearch_bulk_settings,
configure_sentry,
opensearch_request_timeout,
)


Expand All @@ -28,6 +29,24 @@ def test_configure_logger_verbose():
assert result == "Logger 'tests.test_config' configured with level=DEBUG"


def test_configure_opensearch_bulk_settings_from_env(monkeypatch):
    """ENV variables should override every bulk-config default, cast to int."""
    env_overrides = {
        "OPENSEARCH_BULK_MAX_CHUNK_BYTES": "10",
        "OPENSEARCH_BULK_MAX_RETRIES": "2",
        "OPENSEARCH_REQUEST_TIMEOUT": "20",
    }
    for name, raw_value in env_overrides.items():
        monkeypatch.setenv(name, raw_value)
    expected = {name: int(raw_value) for name, raw_value in env_overrides.items()}
    assert configure_opensearch_bulk_settings() == expected


def test_configure_opensearch_bulk_settings_uses_defaults(monkeypatch):
    """With no ENV overrides present, the defaults dict is returned verbatim."""
    for name in (
        "OPENSEARCH_BULK_MAX_CHUNK_BYTES",
        "OPENSEARCH_BULK_MAX_RETRIES",
        "OPENSEARCH_REQUEST_TIMEOUT",
    ):
        monkeypatch.delenv(name, raising=False)
    assert configure_opensearch_bulk_settings() == OPENSEARCH_BULK_CONFIG_DEFAULTS


def test_configure_sentry_no_env_variable(monkeypatch):
monkeypatch.delenv("SENTRY_DSN", raising=False)
result = configure_sentry()
Expand All @@ -44,13 +63,3 @@ def test_configure_sentry_env_variable_is_dsn(monkeypatch):
monkeypatch.setenv("SENTRY_DSN", "https://1234567890@00000.ingest.sentry.io/123456")
result = configure_sentry()
assert result == "Sentry DSN found, exceptions will be sent to Sentry with env=test"


def test_opensearch_request_timeout_default(monkeypatch):
    """The timeout falls back to 120 seconds when the ENV variable is absent."""
    monkeypatch.delenv("OPENSEARCH_REQUEST_TIMEOUT", raising=False)
    timeout = opensearch_request_timeout()
    assert timeout == 120


def test_opensearch_request_timeout_from_env(monkeypatch):
    """The timeout honors an OPENSEARCH_REQUEST_TIMEOUT ENV override."""
    monkeypatch.setenv("OPENSEARCH_REQUEST_TIMEOUT", "5")
    timeout = opensearch_request_timeout()
    assert timeout == 5
16 changes: 12 additions & 4 deletions tim/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

import sentry_sdk

# Defaults for the opensearchpy bulk helper. Each key doubles as the name of
# the optional ENV variable that overrides it (see
# configure_opensearch_bulk_settings).
OPENSEARCH_BULK_CONFIG_DEFAULTS = {
    "OPENSEARCH_BULK_MAX_CHUNK_BYTES": 100 * 1024 * 1024,  # 100 MB
    "OPENSEARCH_BULK_MAX_RETRIES": 8,
    "OPENSEARCH_REQUEST_TIMEOUT": 120,  # seconds
}
# NOTE(review): presumably the alias spanning all sources' current indexes —
# confirm against alias-management code.
PRIMARY_ALIAS = "all-current"
# Operations accepted for bulk requests.
VALID_BULK_OPERATIONS = ["create", "delete", "index", "update"]
# Recognized TIMDEX record sources.
VALID_SOURCES = ["alma", "aspace", "dspace", "jpal", "whoas", "zenodo"]
Expand Down Expand Up @@ -35,14 +40,17 @@ def configure_logger(logger: logging.Logger, verbose: bool) -> str:
)


def configure_opensearch_bulk_settings() -> dict[str, int]:
    """Build bulk-indexing settings, preferring ENV values over defaults.

    Each key of OPENSEARCH_BULK_CONFIG_DEFAULTS is looked up as an ENV
    variable; an unset or empty variable falls back to the default value.
    Values are always returned as ints.
    """
    return {
        key: int(os.getenv(key) or default)
        for key, default in OPENSEARCH_BULK_CONFIG_DEFAULTS.items()
    }


def configure_sentry() -> str:
    """Initialize Sentry when a usable DSN is configured.

    Reads SENTRY_DSN and WORKSPACE from the environment; the literal string
    "none" (any case) is treated the same as an unset DSN. Returns a message
    describing whether exception monitoring was enabled.
    """
    workspace = os.getenv("WORKSPACE")
    dsn = os.getenv("SENTRY_DSN")
    # Guard clause: skip initialization entirely for a missing/placeholder DSN.
    if not dsn or dsn.lower() == "none":
        return "No Sentry DSN found, exceptions will not be sent to Sentry"
    sentry_sdk.init(dsn, environment=workspace)
    return f"Sentry DSN found, exceptions will be sent to Sentry with env={workspace}"


def opensearch_request_timeout() -> int:
    """Return the request timeout in seconds for long-running OpenSearch calls.

    Overridable via the OPENSEARCH_REQUEST_TIMEOUT ENV variable; defaults to
    120 seconds.
    """
    timeout = os.getenv("OPENSEARCH_REQUEST_TIMEOUT", "120")
    return int(timeout)
11 changes: 7 additions & 4 deletions tim/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from tim.config import (
PRIMARY_ALIAS,
configure_index_settings,
opensearch_request_timeout,
configure_opensearch_bulk_settings,
)
from tim.errors import AliasNotFoundError, IndexExistsError, IndexNotFoundError

Expand Down Expand Up @@ -315,14 +315,16 @@ def bulk_index(
Returns total sums of: records created, records updated, errors, and total records
processed.
"""
bulk_config = configure_opensearch_bulk_settings()
result = {"created": 0, "updated": 0, "errors": 0, "total": 0}
actions = helpers.generate_bulk_actions(index, records, "index")
responses = streaming_bulk(
client,
actions,
max_retries=3,
max_chunk_bytes=bulk_config["OPENSEARCH_BULK_MAX_CHUNK_BYTES"],
max_retries=bulk_config["OPENSEARCH_BULK_MAX_RETRIES"],
raise_on_error=False,
request_timeout=opensearch_request_timeout(),
request_timeout=bulk_config["OPENSEARCH_REQUEST_TIMEOUT"],
)
for response in responses:
if response[0] is False:
Expand All @@ -347,7 +349,8 @@ def bulk_index(
logger.info("Status update: %s records indexed so far!", result["total"])
logger.info("All records ingested, refreshing index.")
response = client.indices.refresh(
index=index, request_timeout=opensearch_request_timeout()
index=index,
request_timeout=bulk_config["OPENSEARCH_REQUEST_TIMEOUT"],
)
logger.debug(response)
return result

0 comments on commit 4246e4a

Please sign in to comment.