From dc07a1599e459074163d3d9af25669171c45d168 Mon Sep 17 00:00:00 2001
From: Eric Hanson
Date: Wed, 18 Dec 2024 16:28:59 -0500
Subject: [PATCH] Further updates based on discussion in PR # 21

* Add ItemMetadatMissingRequiredFieldError exception
* Rename metadata_keyname > metadata_s3_key
* Shift BaseWorkflow to base/__init__.py and remove base_workflow.py
* Add required key to metadata mapping and corresponding unit test
* Update unit tests names to include _success
---
 dsc/exceptions.py                   |   4 +
 dsc/item_submission.py              |   8 +-
 dsc/workflows/__init__.py           |  10 ++
 dsc/workflows/base/__init__.py      | 207 ++++++++++++++++++++++++++++
 dsc/workflows/base/base_workflow.py | 199 --------------------------
 tests/conftest.py                   |   6 +-
 tests/test_base_workflow.py         |  20 ++-
 tests/test_itemsubmission.py        |   6 +-
 8 files changed, 246 insertions(+), 214 deletions(-)
 delete mode 100644 dsc/workflows/base/base_workflow.py

diff --git a/dsc/exceptions.py b/dsc/exceptions.py
index e6b8855..caed71d 100644
--- a/dsc/exceptions.py
+++ b/dsc/exceptions.py
@@ -4,3 +4,7 @@ class InvalidDSpaceMetadataError(Exception):
 
 class InvalidSQSMessageError(Exception):
     pass
+
+
+class ItemMetadatMissingRequiredFieldError(Exception):
+    pass
diff --git a/dsc/item_submission.py b/dsc/item_submission.py
index 86cc449..4b26d6f 100644
--- a/dsc/item_submission.py
+++ b/dsc/item_submission.py
@@ -14,7 +14,7 @@ class ItemSubmission:
 
     dspace_metadata: dict[str, Any]
     bitstream_uris: list[str]
-    metadata_keyname: str
+    metadata_s3_key: str
     metadata_uri: str = ""
 
     def upload_dspace_metadata(self, bucket: str) -> None:
@@ -24,9 +24,7 @@ def upload_dspace_metadata(self, bucket: str) -> None:
             bucket: The S3 bucket for uploading the item metadata file.
         """
         s3_client = S3Client()
-        s3_client.put_file(
-            json.dumps(self.dspace_metadata), bucket, self.metadata_keyname
-        )
-        metadata_uri = f"s3://{bucket}/{self.metadata_keyname}"
+        s3_client.put_file(json.dumps(self.dspace_metadata), bucket, self.metadata_s3_key)
+        metadata_uri = f"s3://{bucket}/{self.metadata_s3_key}"
         logger.info(f"Metadata uploaded to S3: {metadata_uri}")
         self.metadata_uri = metadata_uri
diff --git a/dsc/workflows/__init__.py b/dsc/workflows/__init__.py
index e69de29..9b5c9ff 100644
--- a/dsc/workflows/__init__.py
+++ b/dsc/workflows/__init__.py
@@ -0,0 +1,10 @@
+"""dsc.workflows.
+
+All primary functions used by CLI are importable from here.
+"""
+
+from dsc.workflows.base import BaseWorkflow
+
+__all__ = [
+    "BaseWorkflow",
+]
diff --git a/dsc/workflows/base/__init__.py b/dsc/workflows/base/__init__.py
index e69de29..95168cd 100644
--- a/dsc/workflows/base/__init__.py
+++ b/dsc/workflows/base/__init__.py
@@ -0,0 +1,207 @@
+from __future__ import annotations
+
+import logging
+from abc import ABC, abstractmethod
+from typing import TYPE_CHECKING, Any, final
+
+from dsc.exceptions import (
+    InvalidDSpaceMetadataError,
+    ItemMetadatMissingRequiredFieldError,
+)
+from dsc.item_submission import ItemSubmission
+
+if TYPE_CHECKING:
+    from collections.abc import Iterator
+
+logger = logging.getLogger(__name__)
+
+
+class BaseWorkflow(ABC):
+    """A base workflow class from which other workflow classes are derived."""
+
+    def __init__(
+        self,
+        workflow_name: str,
+        submission_system: str,
+        email_recipients: list[str],
+        metadata_mapping: dict,
+        s3_bucket: str,
+        s3_prefix: str | None,
+        collection_handle: str,
+        output_queue: str,
+    ) -> None:
+        """Initialize base instance.
+
+        Args:
+            workflow_name: The name of the workflow.
+            submission_system: The system to which item submissions will be sent (e.g.
+                DSpace@MIT)
+            email_recipients: The email addresses to notify after runs of the workflow.
+            metadata_mapping: A mapping file for generating DSpace metadata from the
+                workflow's source metadata.
+            s3_bucket: The S3 bucket containing bitstream and metadata files for the
+                workflow.
+            s3_prefix: The S3 prefix used for objects in this workflow. This prefix does
+                NOT include the bucket name.
+            collection_handle: The handle of the DSpace collection to which submissions
+                will be uploaded
+            output_queue: The SQS output queue used for retrieving result messages from
+                the workflow's submissions.
+        """
+        self.workflow_name: str = workflow_name
+        self.submission_system: str = submission_system
+        self.email_recipients: list[str] = email_recipients
+        self.metadata_mapping: dict = metadata_mapping
+        self.s3_bucket: str = s3_bucket
+        self.s3_prefix: str | None = s3_prefix
+        self.collection_handle: str = collection_handle
+        self.output_queue: str = output_queue
+
+    @final  # noqa: B027
+    def run(self) -> None:
+        """Run workflow to submit items to the DSpace Submission Service.
+
+        PLANNED CODE:
+
+            sqs_client = SQSClient()
+            for item_submission in self.item_submissions_iter():
+                item_submission.upload_dspace_metadata(
+                    self.s3_bucket, item_submission.metadata_s3_key
+                )
+                sqs_client.send_submission_message(
+                    item_submission.item_identifier,
+                    self.workflow_name,
+                    self.output_queue,
+                    self.submission_system,
+                    self.collection_handle,
+                    item_submission.metadata_uri,
+                    item_submission.bitstream_uris,
+                )
+        """
+
+    @final
+    def item_submissions_iter(self) -> Iterator[ItemSubmission]:
+        """Generate a batch of item submissions for the DSpace Submission Service.
+
+        MUST NOT be overridden by workflow subclasses.
+        """
+        for item_metadata in self.batch_metadata_iter():
+            item_identifier = self.get_item_identifier(item_metadata)
+            logger.info(f"Processing submission for '{item_identifier}'")
+            metadata_s3_key = f"{self.s3_prefix}/{item_identifier}_metadata.json"
+            dspace_metadata = self.create_dspace_metadata(item_metadata)
+            self.validate_dspace_metadata(dspace_metadata)
+            item_submission = ItemSubmission(
+                dspace_metadata=dspace_metadata,
+                bitstream_uris=self.get_bitstream_uris(item_identifier),
+                metadata_s3_key=metadata_s3_key,
+            )
+            yield item_submission
+
+    @abstractmethod
+    def batch_metadata_iter(self) -> Iterator[dict[str, Any]]:
+        """Iterate through batch metadata to yield item metadata.
+
+        MUST be overridden by workflow subclasses.
+        """
+
+    @abstractmethod
+    def get_item_identifier(self, item_metadata: dict[str, Any]) -> str:
+        """Get identifier for an item submission according to the workflow subclass.
+
+        MUST be overridden by workflow subclasses.
+
+        Args:
+            item_metadata: The item metadata from which the item identifier is extracted.
+        """
+
+    @final
+    def create_dspace_metadata(self, item_metadata: dict[str, Any]) -> dict[str, Any]:
+        """Create DSpace metadata from the item's source metadata.
+
+        A metadata mapping is a dict with the format seen below:
+
+        {
+            "dc.contributor": {
+                "source_field_name": "contributor",
+                "language": None,
+                "delimiter": "|",
+            }
+
+        MUST NOT be overridden by workflow subclasses.
+
+        Args:
+            item_metadata: Item metadata from which the DSpace metadata will be derived.
+        """
+        metadata_entries = []
+        for field_name, field_mapping in self.metadata_mapping.items():
+            if field_name not in ["item_identifier"]:
+
+                field_value = item_metadata.get(field_mapping["source_field_name"])
+                if not field_value and field_mapping.get("required", False):
+                    raise ItemMetadatMissingRequiredFieldError(
+                        "Item metadata missing required field: '"
+                        f"{field_mapping["source_field_name"]}'"
+                    )
+                if field_value:
+                    delimiter = field_mapping["delimiter"]
+                    language = field_mapping["language"]
+                    if delimiter:
+                        metadata_entries.extend(
+                            [
+                                {
+                                    "key": field_name,
+                                    "value": value,
+                                    "language": language,
+                                }
+                                for value in field_value.split(delimiter)
+                            ]
+                        )
+                    else:
+                        metadata_entries.append(
+                            {
+                                "key": field_name,
+                                "value": field_value,
+                                "language": language,
+                            }
+                        )
+
+        return {"metadata": metadata_entries}
+
+    @final
+    def validate_dspace_metadata(self, dspace_metadata: dict[str, Any]) -> bool:
+        """Validate that DSpace metadata follows the expected format for DSpace 6.x.
+
+        MUST NOT be overridden by workflow subclasses.
+
+        Args:
+            dspace_metadata: DSpace metadata to be validated.
+        """
+        valid = False
+        if dspace_metadata.get("metadata") is not None:
+            for element in dspace_metadata["metadata"]:
+                if element.get("key") is not None and element.get("value") is not None:
+                    valid = True
+            logger.debug("Valid DSpace metadata created")
+        else:
+            raise InvalidDSpaceMetadataError(
+                f"Invalid DSpace metadata created: {dspace_metadata} ",
+            )
+        return valid
+
+    @abstractmethod
+    def get_bitstream_uris(self, item_identifier: str) -> list[str]:
+        """Get bitstreams for an item submission according to the workflow subclass.
+
+        MUST be overridden by workflow subclasses.
+
+        Args:
+            item_identifier: The identifier used for locating the item's bitstreams.
+        """
+
+    @abstractmethod
+    def process_deposit_results(self) -> list[str]:
+        """Process results generated by the deposit according to the workflow subclass.
+
+        MUST be overridden by workflow subclasses.
+        """
diff --git a/dsc/workflows/base/base_workflow.py b/dsc/workflows/base/base_workflow.py
deleted file mode 100644
index ddc04e1..0000000
--- a/dsc/workflows/base/base_workflow.py
+++ /dev/null
@@ -1,199 +0,0 @@
-from __future__ import annotations
-
-import logging
-from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, Any, final
-
-from dsc.exceptions import InvalidDSpaceMetadataError
-from dsc.item_submission import ItemSubmission
-
-if TYPE_CHECKING:
-    from collections.abc import Iterator
-
-logger = logging.getLogger(__name__)
-
-
-class BaseWorkflow(ABC):
-    """A base workflow class from which other workflow classes are derived."""
-
-    def __init__(
-        self,
-        workflow_name: str,
-        submission_system: str,
-        email_recipients: list[str],
-        metadata_mapping: dict,
-        s3_bucket: str,
-        s3_prefix: str | None,
-        collection_handle: str,
-        output_queue: str,
-    ) -> None:
-        """Initialize base instance.
-
-        Args:
-            workflow_name: The name of the workflow.
-            submission_system: The system to which item submissions will be sent (e.g.
-                DSpace@MIT)
-            email_recipients: The email addresses to notify after runs of the workflow.
-            metadata_mapping: A mapping file for generating DSpace metadata from the
-                workflow's source metadata.
-            s3_bucket: The S3 bucket containing bitstream and metadata files for the
-                workflow.
-            s3_prefix: The S3 prefix used for objects in this workflow. This prefix does
-                NOT include the bucket name.
-            collection_handle: The handle of the DSpace collection to which submissions
-                will be uploaded
-            output_queue: The SQS output queue used for retrieving result messages from
-                the workflow's submissions.
-        """
-        self.workflow_name: str = workflow_name
-        self.submission_system: str = submission_system
-        self.email_recipients: list[str] = email_recipients
-        self.metadata_mapping: dict = metadata_mapping
-        self.s3_bucket: str = s3_bucket
-        self.s3_prefix: str | None = s3_prefix
-        self.collection_handle: str = collection_handle
-        self.output_queue: str = output_queue
-
-    @final  # noqa: B027
-    def run(self) -> None:
-        """Run workflow to submit items to the DSpace Submission Service.
-
-        PLANNED CODE:
-
-            sqs_client = SQSClient()
-            for item_submission in self.item_submissions_iter():
-                item_submission.upload_dspace_metadata(
-                    self.s3_bucket, item_submission.metadata_keyname
-                )
-                sqs_client.send_submission_message(
-                    item_submission.item_identifier,
-                    self.workflow_name,
-                    self.output_queue,
-                    self.submission_system,
-                    self.collection_handle,
-                    item_submission.metadata_uri,
-                    item_submission.bitstream_uris,
-                )
-        """
-
-    @final
-    def item_submissions_iter(self) -> Iterator[ItemSubmission]:
-        """Generate a batch of item submissions for the DSpace Submission Service.
-
-        MUST NOT be overridden by workflow subclasses.
-        """
-        for item_metadata in self.batch_metadata_iter():
-            item_identifier = self.get_item_identifier(item_metadata)
-            logger.info(f"Processing submission for '{item_identifier}'")
-            metadata_keyname = f"{self.s3_prefix}/{item_identifier}_metadata.json"
-            dspace_metadata = self.create_dspace_metadata(item_metadata)
-            if self.validate_dspace_metadata(dspace_metadata):
-                item_submission = ItemSubmission(
-                    dspace_metadata=dspace_metadata,
-                    bitstream_uris=self.get_bitstream_uris(item_identifier),
-                    metadata_keyname=metadata_keyname,
-                )
-                yield item_submission
-
-    @abstractmethod
-    def batch_metadata_iter(self) -> Iterator[dict[str, Any]]:
-        """Iterate through batch metadata to yield item metadata.
-
-        MUST be overridden by workflow subclasses.
-        """
-
-    @abstractmethod
-    def get_item_identifier(self, item_metadata: Any) -> str:  # noqa: ANN401
-        """Get identifier for an item submission according to the workflow subclass.
-
-        MUST be overridden by workflow subclasses.
-
-        Args:
-            item_metadata: The item metadata from which the item identifier is extracted.
-        """
-
-    @final
-    def create_dspace_metadata(self, item_metadata: dict[str, Any]) -> dict[str, Any]:
-        """Create DSpace metadata from the item's source metadata.
-
-        A metadata mapping is a dict with the format seen below:
-
-        {
-            "dc.contributor": {
-                "source_field_name": "contributor",
-                "language": None,
-                "delimiter": "|",
-            }
-
-        MUST NOT be overridden by workflow subclasses.
-
-        Args:
-            item_metadata: Item metadata from which the DSpace metadata will be derived.
-        """
-        metadata_entries = []
-        for field_name, field_mapping in self.metadata_mapping.items():
-            if field_name not in ["item_identifier", "source_system_identifier"]:
-
-                field_value = item_metadata.get(field_mapping["source_field_name"])
-                if field_value:
-                    delimiter = field_mapping["delimiter"]
-                    language = field_mapping["language"]
-                    if delimiter:
-                        metadata_entries.extend(
-                            [
-                                {
-                                    "key": field_name,
-                                    "value": value,
-                                    "language": language,
-                                }
-                                for value in field_value.split(delimiter)
-                            ]
-                        )
-                    else:
-                        metadata_entries.append(
-                            {
-                                "key": field_name,
-                                "value": field_value,
-                                "language": language,
-                            }
-                        )
-
-        return {"metadata": metadata_entries}
-
-    @final
-    def validate_dspace_metadata(self, dspace_metadata: dict[str, Any]) -> bool:
-        """Validate that DSpace metadata follows the expected format for DSpace 6.x.
-
-        MUST NOT be overridden by workflow subclasses.
-
-        Args:
-            dspace_metadata: DSpace metadata to be validated.
-        """
-        valid = False
-        if dspace_metadata.get("metadata") is not None:
-            for element in dspace_metadata["metadata"]:
-                if element.get("key") is not None and element.get("value") is not None:
-                    valid = True
-            logger.debug("Valid DSpace metadata created")
-        else:
-            raise InvalidDSpaceMetadataError(
-                f"Invalid DSpace metadata created: {dspace_metadata} ",
-            )
-        return valid
-
-    @abstractmethod
-    def get_bitstream_uris(self, item_identifier: str) -> list[str]:
-        """Get bitstreams for an item submission according to the workflow subclass.
-
-        MUST be overridden by workflow subclasses.
-
-        Args:
-            item_identifier: The identifier used for locating the item's bitstreams.
-        """
-
-    @abstractmethod
-    def process_deposit_results(self) -> list[str]:
-        """Process results generated by the deposit according to the workflow subclass.
-
-        MUST be overridden by workflow subclasses.
-        """
diff --git a/tests/conftest.py b/tests/conftest.py
index e27c176..8badfa6 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -11,7 +11,7 @@
 from dsc.utilities.aws.s3 import S3Client
 from dsc.utilities.aws.ses import SESClient
 from dsc.utilities.aws.sqs import SQSClient
-from dsc.workflows.base.base_workflow import BaseWorkflow
+from dsc.workflows.base import BaseWorkflow
 
 
 @pytest.fixture(autouse=True)
@@ -102,7 +102,7 @@ def item_submission_instance(dspace_metadata):
             "s3://dsc/workflow/folder/123_01.pdf",
             "s3://dsc/workflow/folder/123_02.pdf",
         ],
-        metadata_keyname="workflow/folder/123_metadata.json",
+        metadata_s3_key="workflow/folder/123_metadata.json",
     )
 
 
@@ -113,11 +113,13 @@ def metadata_mapping():
             "source_field_name": "item_identifier",
             "language": None,
             "delimiter": "",
+            "required": True,
         },
         "dc.title": {
            "source_field_name": "title",
             "language": "en_US",
             "delimiter": "",
+            "required": True,
         },
         "dc.contributor": {
             "source_field_name": "contributor",
diff --git a/tests/test_base_workflow.py b/tests/test_base_workflow.py
index e9d9415..c81aa8a 100644
--- a/tests/test_base_workflow.py
+++ b/tests/test_base_workflow.py
@@ -1,10 +1,13 @@
 import pytest
 
-from dsc.exceptions import InvalidDSpaceMetadataError
+from dsc.exceptions import (
+    InvalidDSpaceMetadataError,
+    ItemMetadatMissingRequiredFieldError,
+)
 from dsc.item_submission import ItemSubmission
 
 
-def test_base_workflow_item_submission_iter(base_workflow_instance):
+def test_base_workflow_item_submission_iter_success(base_workflow_instance):
     assert next(base_workflow_instance.item_submissions_iter()) == ItemSubmission(
         dspace_metadata={
             "metadata": [
@@ -17,12 +20,12 @@
             "s3://dsc/workflow/folder/123_01.pdf",
             "s3://dsc/workflow/folder/123_02.pdf",
         ],
-        metadata_keyname="workflow/folder/123_metadata.json",
+        metadata_s3_key="workflow/folder/123_metadata.json",
         metadata_uri="",
     )
 
 
-def test_base_workflow_create_dspace_metadata(
+def test_base_workflow_create_dspace_metadata_success(
     base_workflow_instance,
     item_metadata,
 ):
@@ -35,6 +38,15 @@
     }
 
 
+def test_base_workflow_create_dspace_metadata_required_field_missing_raises_exception(
+    base_workflow_instance,
+    item_metadata,
+):
+    item_metadata.pop("title")
+    with pytest.raises(ItemMetadatMissingRequiredFieldError):
+        base_workflow_instance.create_dspace_metadata(item_metadata)
+
+
 def test_base_workflow_validate_dspace_metadata_success(
     base_workflow_instance,
     dspace_metadata,
diff --git a/tests/test_itemsubmission.py b/tests/test_itemsubmission.py
index 7da0cda..6189643 100644
--- a/tests/test_itemsubmission.py
+++ b/tests/test_itemsubmission.py
@@ -7,12 +7,10 @@ def test_itemsubmission_init_success(item_submission_instance, dspace_metadata):
         "s3://dsc/workflow/folder/123_01.pdf",
         "s3://dsc/workflow/folder/123_02.pdf",
     ]
-    assert (
-        item_submission_instance.metadata_keyname == "workflow/folder/123_metadata.json"
-    )
+    assert item_submission_instance.metadata_s3_key == "workflow/folder/123_metadata.json"
 
 
-def test_upload_dspace_metadata(mocked_s3, item_submission_instance, s3_client):
+def test_upload_dspace_metadata_success(mocked_s3, item_submission_instance, s3_client):
     item_submission_instance.upload_dspace_metadata("dsc")
     assert (
         item_submission_instance.metadata_uri
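
Notes for reviewers (not part of the patch): with BaseWorkflow now living in dsc/workflows/base/__init__.py and re-exported from dsc.workflows, a concrete workflow subclasses it and implements the four abstract methods. The sketch below is a minimal, hypothetical example; DemoWorkflow, its hard-coded metadata record, and the bitstream naming scheme are invented for illustration and do not exist in this repository.

from collections.abc import Iterator
from typing import Any

from dsc.workflows.base import BaseWorkflow


class DemoWorkflow(BaseWorkflow):
    """Hypothetical workflow used only to illustrate the abstract interface."""

    def batch_metadata_iter(self) -> Iterator[dict[str, Any]]:
        # A real workflow would read its batch metadata from the workflow's
        # S3 prefix (or another source); one hard-coded record keeps this
        # sketch self-contained.
        yield {
            "item_identifier": "123",
            "title": "Demo item",
            "contributor": "Smith|Jones",
        }

    def get_item_identifier(self, item_metadata: dict[str, Any]) -> str:
        return item_metadata["item_identifier"]

    def get_bitstream_uris(self, item_identifier: str) -> list[str]:
        # Assumed layout: bitstreams sit under the workflow's S3 prefix and
        # are named after the item identifier.
        return [f"s3://{self.s3_bucket}/{self.s3_prefix}/{item_identifier}_01.pdf"]

    def process_deposit_results(self) -> list[str]:
        # Result handling is workflow-specific; nothing to do in this sketch.
        return []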
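
Instantiating the hypothetical workflow above shows how the new required key in the metadata mapping and the renamed metadata_s3_key flow through item_submissions_iter. Every constructor value here is a placeholder, and the mapping mirrors the shape used in the tests/conftest.py fixture.

workflow = DemoWorkflow(
    workflow_name="demo",
    submission_system="DSpace@MIT",
    email_recipients=["dsc-staff@example.edu"],
    metadata_mapping={
        "item_identifier": {
            "source_field_name": "item_identifier",
            "language": None,
            "delimiter": "",
            "required": True,
        },
        "dc.title": {
            "source_field_name": "title",
            "language": "en_US",
            "delimiter": "",
            "required": True,
        },
        "dc.contributor": {
            "source_field_name": "contributor",
            "language": None,
            "delimiter": "|",
        },
    },
    s3_bucket="dsc",
    s3_prefix="demo/batch-01",
    collection_handle="1721.1/12345",
    output_queue="dsc-output",
)

for item_submission in workflow.item_submissions_iter():
    # dc.contributor is split on "|" into two entries; dc.title becomes one
    # entry tagged en_US; the item_identifier mapping is skipped by design.
    print(item_submission.dspace_metadata)
    print(item_submission.metadata_s3_key)  # demo/batch-01/123_metadata.json

With this mapping, dc.contributor values are split on the "|" delimiter into separate metadata entries, while dc.title is emitted as a single entry with the en_US language tag.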
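
The failure path exercised by the new test_base_workflow_create_dspace_metadata_required_field_missing_raises_exception test can be sketched the same way, reusing the workflow object from the previous example; the source metadata below is again hypothetical.

import pytest

from dsc.exceptions import ItemMetadatMissingRequiredFieldError

# "title" maps to dc.title with required=True above, so removing it from the
# source metadata makes create_dspace_metadata raise the new exception.
incomplete_metadata = {"item_identifier": "456", "contributor": "Smith"}

with pytest.raises(ItemMetadatMissingRequiredFieldError):
    workflow.create_dspace_metadata(incomplete_metadata)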