Skip to content

Commit

Permalink
Merge pull request #112 from MITLibraries/timx-128-update-config
Browse files Browse the repository at this point in the history
Timx 128 update config
  • Loading branch information
hakbailey authored Dec 8, 2022
2 parents 20945f7 + 4246e4a commit f594059
Show file tree
Hide file tree
Showing 6 changed files with 307 additions and 265 deletions.
2 changes: 1 addition & 1 deletion .python-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.10.6
3.10.8
508 changes: 264 additions & 244 deletions Pipfile.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ TIMDEX! Index Manager (TIM) is a Python CLI application for managing TIMDEX inde
## Optional ENV

- `AWS_REGION` = Only needed if AWS region changes from the default of us-east-1.
- `OPENSEARCH_REQUEST_TIMEOUT` = Only used for OpenSearch requests that tend to take longer than the default timeout of 10 seconds, such as bulk or index refresh requests. Defaults to 30 seconds if not set.
- `OPENSEARCH_BULK_MAX_CHUNK_BYTES` = Chunk size limit for sending requests to the bulk indexing endpoint, in bytes. Defaults to 100 MB (the opensearchpy default) if not set.
- `OPENSEARCH_BULK_MAX_RETRIES` = Maximum number of retries when sending requests to the bulk indexing endpoint. Defaults to 8 if not set.
- `OPENSEARCH_REQUEST_TIMEOUT` = Only used for OpenSearch requests that tend to take longer than the default timeout of 10 seconds, such as bulk or index refresh requests. Defaults to 120 seconds if not set.
- `SENTRY_DSN` = If set to a valid Sentry DSN, enables Sentry exception monitoring. This is not needed for local development.
- `STATUS_UPDATE_INTERVAL` = The ingest process logs the # of records indexed every nth record (1000 by default). Set this env variable to any integer to change the frequency of logging status updates. Can be useful for development/debugging.
- `TIMDEX_OPENSEARCH_ENDPOINT` = If using a local Docker OpenSearch instance, this isn't needed. Otherwise set to OpenSearch instance endpoint _without_ the http scheme, e.g. `search-timdex-env-1234567890.us-east-1.es.amazonaws.com`. Can also be passed directly to the CLI via the `--url` option.
Expand Down
31 changes: 20 additions & 11 deletions tests/test_config.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging

from tim.config import (
OPENSEARCH_BULK_CONFIG_DEFAULTS,
configure_index_settings,
configure_logger,
configure_opensearch_bulk_settings,
configure_sentry,
opensearch_request_timeout,
)


Expand All @@ -28,6 +29,24 @@ def test_configure_logger_verbose():
assert result == "Logger 'tests.test_config' configured with level=DEBUG"


def test_configure_opensearch_bulk_settings_from_env(monkeypatch):
    """Env var overrides are read and coerced to ints for every bulk setting."""
    overrides = {
        "OPENSEARCH_BULK_MAX_CHUNK_BYTES": "10",
        "OPENSEARCH_BULK_MAX_RETRIES": "2",
        "OPENSEARCH_REQUEST_TIMEOUT": "20",
    }
    for name, value in overrides.items():
        monkeypatch.setenv(name, value)
    expected = {name: int(value) for name, value in overrides.items()}
    assert configure_opensearch_bulk_settings() == expected


def test_configure_opensearch_bulk_settings_uses_defaults(monkeypatch):
    """With no relevant env vars set, the configured defaults come back verbatim."""
    for name in (
        "OPENSEARCH_BULK_MAX_CHUNK_BYTES",
        "OPENSEARCH_BULK_MAX_RETRIES",
        "OPENSEARCH_REQUEST_TIMEOUT",
    ):
        monkeypatch.delenv(name, raising=False)
    assert configure_opensearch_bulk_settings() == OPENSEARCH_BULK_CONFIG_DEFAULTS


def test_configure_sentry_no_env_variable(monkeypatch):
monkeypatch.delenv("SENTRY_DSN", raising=False)
result = configure_sentry()
Expand All @@ -44,13 +63,3 @@ def test_configure_sentry_env_variable_is_dsn(monkeypatch):
monkeypatch.setenv("SENTRY_DSN", "https://1234567890@00000.ingest.sentry.io/123456")
result = configure_sentry()
assert result == "Sentry DSN found, exceptions will be sent to Sentry with env=test"


def test_opensearch_request_timeout_default(monkeypatch):
    """When the env var is unset, the timeout falls back to 120 seconds."""
    monkeypatch.delenv("OPENSEARCH_REQUEST_TIMEOUT", raising=False)
    result = opensearch_request_timeout()
    assert result == 120


def test_opensearch_request_timeout_from_env(monkeypatch):
    """An explicit env var value takes precedence over the default."""
    monkeypatch.setenv("OPENSEARCH_REQUEST_TIMEOUT", "5")
    result = opensearch_request_timeout()
    assert result == 5
16 changes: 12 additions & 4 deletions tim/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

import sentry_sdk

# Defaults for opensearch-py bulk settings; each key doubles as the env var
# name that overrides it (see configure_opensearch_bulk_settings below).
OPENSEARCH_BULK_CONFIG_DEFAULTS = {
    # Max payload per bulk request chunk, in bytes (100 MB, the opensearchpy default).
    "OPENSEARCH_BULK_MAX_CHUNK_BYTES": 100 * 1024 * 1024,
    # Max retries when sending requests to the bulk indexing endpoint.
    "OPENSEARCH_BULK_MAX_RETRIES": 8,
    # Seconds to wait on long-running requests such as bulk or index refresh.
    "OPENSEARCH_REQUEST_TIMEOUT": 120,
}
# Alias shared across indexes; presumably points at the current index of every
# source — confirm against the alias-management commands.
PRIMARY_ALIAS = "all-current"
# Bulk operation names accepted by this application.
VALID_BULK_OPERATIONS = ["create", "delete", "index", "update"]
# Known TIMDEX record sources.
VALID_SOURCES = ["alma", "aspace", "dspace", "jpal", "whoas", "zenodo"]
Expand Down Expand Up @@ -35,14 +40,17 @@ def configure_logger(logger: logging.Logger, verbose: bool) -> str:
)


def configure_opensearch_bulk_settings() -> dict[str, int]:
    """Resolve bulk-indexing settings from the environment.

    For each key in OPENSEARCH_BULK_CONFIG_DEFAULTS, use the env var of the
    same name when it is set to a non-empty value, otherwise the default.
    Values are coerced to int.
    """
    return {
        name: int(os.getenv(name) or default)
        for name, default in OPENSEARCH_BULK_CONFIG_DEFAULTS.items()
    }


def configure_sentry() -> str:
    """Initialize Sentry monitoring when a usable DSN is configured.

    Reads SENTRY_DSN and WORKSPACE from the environment; a missing DSN or the
    literal string "None" (any case) disables Sentry. Returns a message
    describing the resulting state.
    """
    env = os.getenv("WORKSPACE")
    sentry_dsn = os.getenv("SENTRY_DSN")
    if not sentry_dsn or sentry_dsn.lower() == "none":
        return "No Sentry DSN found, exceptions will not be sent to Sentry"
    sentry_sdk.init(sentry_dsn, environment=env)
    return f"Sentry DSN found, exceptions will be sent to Sentry with env={env}"


def opensearch_request_timeout() -> int:
    """Return the OpenSearch request timeout in seconds.

    Honors the OPENSEARCH_REQUEST_TIMEOUT env var when set; defaults to 120.
    """
    configured = os.getenv("OPENSEARCH_REQUEST_TIMEOUT")
    return int(configured) if configured is not None else 120
11 changes: 7 additions & 4 deletions tim/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from tim.config import (
PRIMARY_ALIAS,
configure_index_settings,
opensearch_request_timeout,
configure_opensearch_bulk_settings,
)
from tim.errors import AliasNotFoundError, IndexExistsError, IndexNotFoundError

Expand Down Expand Up @@ -315,14 +315,16 @@ def bulk_index(
Returns total sums of: records created, records updated, errors, and total records
processed.
"""
bulk_config = configure_opensearch_bulk_settings()
result = {"created": 0, "updated": 0, "errors": 0, "total": 0}
actions = helpers.generate_bulk_actions(index, records, "index")
responses = streaming_bulk(
client,
actions,
max_retries=3,
max_chunk_bytes=bulk_config["OPENSEARCH_BULK_MAX_CHUNK_BYTES"],
max_retries=bulk_config["OPENSEARCH_BULK_MAX_RETRIES"],
raise_on_error=False,
request_timeout=opensearch_request_timeout(),
request_timeout=bulk_config["OPENSEARCH_REQUEST_TIMEOUT"],
)
for response in responses:
if response[0] is False:
Expand All @@ -347,7 +349,8 @@ def bulk_index(
logger.info("Status update: %s records indexed so far!", result["total"])
logger.info("All records ingested, refreshing index.")
response = client.indices.refresh(
index=index, request_timeout=opensearch_request_timeout()
index=index,
request_timeout=bulk_config["OPENSEARCH_REQUEST_TIMEOUT"],
)
logger.debug(response)
return result

0 comments on commit f594059

Please sign in to comment.