Unit test and cleanup
dbutenhof committed Oct 13, 2021
1 parent 59ba004 commit c61d20d
Showing 4 changed files with 226 additions and 256 deletions.
79 changes: 31 additions & 48 deletions lib/pbench/server/api/resources/query_apis/__init__.py
@@ -1,13 +1,12 @@
from datetime import datetime
from http import HTTPStatus
from logging import Logger, error
from logging import Logger
from typing import Any, Callable, Dict, Iterator
from urllib.parse import urljoin

from dateutil import rrule
from dateutil.relativedelta import relativedelta
import elasticsearch
from elasticsearch.helpers import streaming_bulk, BulkIndexError
from flask.wrappers import Response
from flask_restful import abort
import requests
@@ -23,7 +22,7 @@
UnsupportedAccessMode,
)
from pbench.server.database.models.template import Template
from pbench.server.database.models.datasets import Dataset
from pbench.server.database.models.datasets import Dataset, DatasetNotFound
from pbench.server.database.models.users import User

# A type defined to allow the preprocess subclass method to provide shared
@@ -363,7 +362,8 @@ class ElasticBulkBase(ApiBase):
This class extends the ApiBase class in order to connect the post
and get methods to Flask's URI routing algorithms. It implements a common
mechanism for calling Elasticsearch and processing errors.
mechanism for calling the Elasticsearch package streaming_bulk helper, and
processing the response documents.
"""

def __init__(
@@ -435,6 +435,9 @@ def _post(self, json_data: JSON, _) -> Response:
"""
Perform the requested POST operation, and handle any exceptions.
NOTE: This is called by the ApiBase post() method through its dispatch
method, which provides parameter validation.
Args:
json_data: Type-normalized client JSON input
_: Original incoming Request object (not used)
@@ -443,56 +446,39 @@ def _post(self, json_data: JSON, _) -> Response:
Response to return to client
"""
klasname = self.__class__.__name__
dataset = Dataset.attach(
controller=json_data["controller"], name=json_data["name"]
)

try:
dataset = Dataset.attach(
controller=json_data["controller"], name=json_data["name"]
)
except DatasetNotFound as e:
abort(HTTPStatus.NOT_FOUND, message=str(e))

owner = User.query(id=dataset.owner_id)
if not owner:
self.logger.error(
"Dataset owner ID {} cannot be found in Users", dataset.owner_id
)
abort(HTTPStatus.INTERNAL_SERVER_ERROR, message="Dataset owner not found")

# For publish, we check authorization against the ownership of the
# dataset that was selected rather than having an explicit "user"
# JSON parameter. This will raise UnauthorizedAccess on failure.
self._check_authorization(owner.username, json_data["access"])
# For bulk Elasticsearch operations, we check authorization against the
# ownership of a designated dataset rather than having an explicit
# "user" JSON parameter. This will raise UnauthorizedAccess on failure.
try:
self._check_authorization(owner.username, json_data["access"])
except UnauthorizedAccess as e:
abort(HTTPStatus.FORBIDDEN, message=str(e))

# Build an Elasticsearch instance to manage the bulk update
elastic = elasticsearch.Elasticsearch([self.es_url])

try:
# Pass the assemble() generator to the bulk helper
results = streaming_bulk(elastic, self.generate_documents(json_data, dataset), raise_on_error=False)
except BulkIndexError as e:
self.logger.exception("{} update failed: {}", klasname, e)
abort(HTTPStatus.INTERNAL_SERVER_ERROR, message="INTERNAL ERROR")
except elasticsearch.TransportError as e:
# Elasticsearch returned a status >= 400
self.logger.exception(
"{} HTTP error {} from Elasticsearch request",
klasname,
e
)
abort(
HTTPStatus.BAD_GATEWAY,
message=f"Elasticsearch query failure {e.error} ({e.status_code})",
)
except elasticsearch.ConnectionError as e:
self.logger.exception(
"{}: connection refused during the Elasticsearch request: {}", klasname, str(e.info)
)
abort(
HTTPStatus.BAD_GATEWAY,
message="Network problem, could not reach Elasticsearch",
)
except elasticsearch.UnsupportedProductError as e:
self.logger.exception(
"{}: Python elasticsearch package is not compatible with Elasticsearch server: {}", klasname, str(e)
)
abort(
HTTPStatus.INTERNAL_SERVER_ERROR,
message="Elasticsearch server version is incompatible",
# Pass the bulk command generator to the helper
results = elasticsearch.helpers.streaming_bulk(
elastic,
self.generate_documents(json_data, dataset),
raise_on_exception=False,
raise_on_error=False,
)
except Exception as e:
self.logger.exception(
@@ -515,7 +501,7 @@ def _post(self, json_data: JSON, _) -> Response:
self.logger.debug(
"{} ({}: {}) for id {} in index {}",
u["status"],
type,
status,
e["reason"],
u["_id"],
u["_index"],
@@ -541,12 +527,9 @@ def _post(self, json_data: JSON, _) -> Response:
# retrying on non-terminal errors, but this requires some cleanup
# work on the pyesbulk side.
if error_count > 0:
raise PostprocessError(
abort(
HTTPStatus.INTERNAL_SERVER_ERROR,
(
f"{error_count:d} of {count:d} "
"Elasticsearch document UPDATE operations failed"
),
message=f"{error_count:d} of {count:d} Elasticsearch document UPDATE operations failed",
data=report,
)

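For context on the new calling pattern: the streaming_bulk helper consumes an action generator in chunks and yields one (ok, response) tuple per document, which is what the error-counting loop above iterates over. Below is a minimal, hypothetical sketch of that pattern; the index name, document ids, server URL, and access value are illustrative, not taken from this commit.

import elasticsearch
from elasticsearch.helpers import streaming_bulk

def example_actions():
    # One "update" action per Elasticsearch document id.
    for doc_id in ("id-1", "id-2"):
        yield {
            "_op_type": "update",
            "_index": "unit-test.v6.run-data.2021-06",
            "_id": doc_id,
            "doc": {"authorization": {"access": "public"}},
        }

elastic = elasticsearch.Elasticsearch(["http://localhost:9200"])
error_count = 0
for ok, response in streaming_bulk(
    elastic, example_actions(), raise_on_exception=False, raise_on_error=False
):
    if not ok:
        error_count += 1  # each response dict carries per-document status and reason

With raise_on_error and raise_on_exception both False, the stream keeps flowing past per-document failures, so the caller can tally errors and report them at the end rather than aborting mid-update.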
18 changes: 11 additions & 7 deletions lib/pbench/server/api/resources/query_apis/datasets_publish.py
@@ -7,16 +7,23 @@
JSON,
Schema,
Parameter,
ParamType
ParamType,
)
from pbench.server.api.resources.query_apis import ElasticBulkBase
from pbench.server.database.models.datasets import Dataset, Metadata


class DatasetsPublish(ElasticBulkBase):
"""
Change the "access" authorization of a Pbench dataset
Change the "access" authorization of a Pbench dataset by modifying the
"authorization": {"access": value} subdocument of each Elasticsearch
document associated with the specified dataset.
Note that this may amount to hundreds of thousands of documents across a
range of Elasticsearch indices, so we use the Elasticsearch streaming_bulk
helper to break down our request into chunks that Elasticsearch can handle.
"""

def __init__(self, config: PbenchServerConfig, logger: Logger):
super().__init__(
config,
@@ -54,10 +61,7 @@ def generate_documents(self, json_data: JSON, dataset: Dataset) -> Iterator[dict
user = dataset.owner

self.logger.info(
"Update access for dataset {} for user {} to {}",
name,
user,
access
"Update access for dataset {} for user {} to {}", name, user, access
)

map = Metadata.getvalue(dataset=dataset, key=Metadata.INDEX_MAP)
@@ -79,7 +83,7 @@ def generate_documents(self, json_data: JSON, dataset: Dataset) -> Iterator[dict
"_op_type": "update",
"_index": index,
"_id": id,
"doc": {"authorization": {"access": access}}
"doc": {"authorization": {"access": access}},
}

def complete(self, dataset: Dataset, json_data: JSON, error_count: int) -> None:
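For illustration (hypothetical, not from this diff): given an index map of the shape stored under Metadata.INDEX_MAP, generate_documents() expands it into one update action per document id, which streaming_bulk then submits in chunks.

# Hypothetical index map: Elasticsearch index name -> list of document ids.
index_map = {
    "unit-test.v6.run-data.2021-06": ["a1b2c3"],
    "unit-test.v6.run-toc.2021-06": ["d4e5f6", "g7h8i9"],
}

def expand(index_map: dict, access: str):
    # Mirrors the generate_documents() loop: one "update" action per document.
    for index, ids in index_map.items():
        for id in ids:
            yield {
                "_op_type": "update",
                "_index": index,
                "_id": id,
                "doc": {"authorization": {"access": access}},
            }

actions = list(expand(index_map, "public"))
assert len(actions) == 3  # one action per mapped document id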
37 changes: 37 additions & 0 deletions lib/pbench/test/unit/server/conftest.py
@@ -4,6 +4,7 @@
import pytest
import shutil
import tempfile
import uuid
from pathlib import Path
from posix import stat_result
from stat import ST_MTIME
@@ -285,6 +286,15 @@ def attach_dataset(pbench_token, create_user):

@pytest.fixture()
def provide_metadata(attach_dataset):
"""
Create "real" metadata in the backing database, which will be accessible
via the un-mocked Metadata.getvalue() API.
TODO: We really want to move away from using a backing DB for unit tests;
see `get_document_map()` below for an alternative example. (But in many
contexts, using "half DB" and "half mock" will result in SQLAlchemy
confusion.)
"""
drb = Dataset.attach(controller="node", name="drb")
test = Dataset.attach(controller="node", name="test")
Metadata.setvalue(dataset=drb, key="user.contact", value="me@example.com")
@@ -293,6 +303,33 @@ def provide_metadata(attach_dataset):
Metadata.setvalue(dataset=test, key=Metadata.DELETION, value="2023-01-25")


@pytest.fixture()
def get_document_map(monkeypatch, attach_dataset):
"""
Mock a Metadata get call to return an Elasticsearch document index
without requiring a DB query.
Args:
monkeypatch: patching fixture
attach_dataset: create a mock Dataset object
"""
map = {
"unit-test.v6.run-data.2021-06": [uuid.uuid4().hex],
"unit-test.v6.run-toc.2021-06": [uuid.uuid4().hex for i in range(10)],
"unit-test.v5.result-data-sample.2021-06": [
uuid.uuid4().hex for i in range(20)
],
}

def get_document_map(dataset: Dataset, key: str) -> Metadata:
assert key == Metadata.INDEX_MAP
return map

with monkeypatch.context() as m:
m.setattr(Metadata, "getvalue", get_document_map)
yield map


@pytest.fixture()
def find_template(monkeypatch, fake_mtime):
"""
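A hypothetical test (not part of this commit) showing how the new fixture might be consumed: because Metadata.getvalue() is monkeypatched, the index map comes back directly with no database query.

from pbench.server.database.models.datasets import Metadata

def test_index_map_is_mocked(get_document_map):
    # The fixture patched Metadata.getvalue, so this returns the fake map
    # rather than reading the metadata table; the dataset argument is ignored
    # by the mock.
    map = Metadata.getvalue(dataset=None, key=Metadata.INDEX_MAP)
    assert map is get_document_map
    assert sum(len(ids) for ids in map.values()) == 31  # 1 + 10 + 20 mocked ids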
