From 21414dab6bd8bb96b08e7c5a796621ea68ccb4db Mon Sep 17 00:00:00 2001
From: Eric Hanson
Date: Wed, 11 Dec 2024 16:55:46 -0500
Subject: [PATCH 1/5] Update fixture references to 'dsc'

---
 tests/conftest.py |  6 +++---
 tests/test_s3.py  | 24 ++++++++++++------------
 tests/test_sqs.py |  4 ++--
 3 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index 189390e..f12f03a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -30,7 +30,7 @@ def config_instance() -> Config:
 def mocked_s3(config_instance):
     with mock_aws():
         s3 = boto3.client("s3", region_name=config_instance.AWS_REGION_NAME)
-        s3.create_bucket(Bucket="awd")
+        s3.create_bucket(Bucket="dsc")
         yield s3


@@ -130,11 +130,11 @@ def submission_message_body():
         {
             "SubmissionSystem": "DSpace@MIT",
             "CollectionHandle": "123.4/5678",
-            "MetadataLocation": "s3://awd/10.1002-term.3131.json",
+            "MetadataLocation": "s3://dsc/10.1002-term.3131.json",
             "Files": [
                 {
                     "BitstreamName": "10.1002-term.3131.pdf",
-                    "FileLocation": "s3://awd/10.1002-term.3131.pdf",
+                    "FileLocation": "s3://dsc/10.1002-term.3131.pdf",
                     "BitstreamDescription": None,
                 }
             ],
diff --git a/tests/test_s3.py b/tests/test_s3.py
index 6c174b1..e7425df 100644
--- a/tests/test_s3.py
+++ b/tests/test_s3.py
@@ -7,46 +7,46 @@ def test_s3_archive_file_in_bucket(mocked_s3, s3_client):
     s3_client.put_file(
         file_content="test1,test2,test3,test4",
-        bucket="awd",
+        bucket="dsc",
         key="test.csv",
     )
     s3_client.archive_file_with_new_key(
-        bucket="awd",
+        bucket="dsc",
         key="test.csv",
         archived_key_prefix="archived",
     )
     with pytest.raises(ClientError) as e:
-        response = s3_client.client.get_object(Bucket="awd", Key="test.csv")
+        response = s3_client.client.get_object(Bucket="dsc", Key="test.csv")
     assert (
         "An error occurred (NoSuchKey) when calling the GetObject operation: The"
         " specified key does not exist."
        in str(e.value)
    )
-    response = s3_client.client.get_object(Bucket="awd", Key="archived/test.csv")
+    response = s3_client.client.get_object(Bucket="dsc", Key="archived/test.csv")
     assert response["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.OK


 def test_s3_put_file(mocked_s3, s3_client):
-    assert "Contents" not in s3_client.client.list_objects(Bucket="awd")
+    assert "Contents" not in s3_client.client.list_objects(Bucket="dsc")
     s3_client.put_file(
         file_content=str({"metadata": {"key": "dc.title", "value": "A Title"}}),
-        bucket="awd",
+        bucket="dsc",
         key="test.json",
     )
-    assert len(s3_client.client.list_objects(Bucket="awd")["Contents"]) == 1
+    assert len(s3_client.client.list_objects(Bucket="dsc")["Contents"]) == 1
     assert (
-        s3_client.client.list_objects(Bucket="awd")["Contents"][0]["Key"] == "test.json"
+        s3_client.client.list_objects(Bucket="dsc")["Contents"][0]["Key"] == "test.json"
     )


 def test_s3_get_files_iter_with_matching_csv(mocked_s3, s3_client):
     s3_client.put_file(
         file_content="test1,test2,test3,test4",
-        bucket="awd",
+        bucket="dsc",
         key="test.csv",
     )
     assert list(
         s3_client.get_files_iter(
-            bucket="awd", file_type="csv", excluded_key_prefix="archived"
+            bucket="dsc", file_type="csv", excluded_key_prefix="archived"
         )
     ) == ["test.csv"]
@@ -54,13 +54,13 @@ def test_s3_get_files_iter_with_matching_csv(mocked_s3, s3_client):
 def test_s3_get_files_iter_without_matching_csv(mocked_s3, s3_client):
     s3_client.put_file(
         file_content="test1,test2,test3,test4",
-        bucket="awd",
+        bucket="dsc",
         key="archived/test.csv",
     )
     assert (
         list(
             s3_client.get_files_iter(
-                bucket="awd", file_type="csv", excluded_key_prefix="archived"
+                bucket="dsc", file_type="csv", excluded_key_prefix="archived"
             )
         )
         == []
diff --git a/tests/test_sqs.py b/tests/test_sqs.py
index f826b7b..488b009 100644
--- a/tests/test_sqs.py
+++ b/tests/test_sqs.py
@@ -17,9 +17,9 @@ def test_sqs_create_dss_message_body(sqs_client, submission_message_body):
     dss_message_body = sqs_client.create_dss_message_body(
         submission_system="DSpace@MIT",
         collection_handle="123.4/5678",
-        metadata_s3_uri="s3://awd/10.1002-term.3131.json",
+        metadata_s3_uri="s3://dsc/10.1002-term.3131.json",
         bitstream_file_name="10.1002-term.3131.pdf",
-        bitstream_s3_uri="s3://awd/10.1002-term.3131.pdf",
+        bitstream_s3_uri="s3://dsc/10.1002-term.3131.pdf",
     )
     assert dss_message_body == submission_message_body

From 769d6da43c09cb89a07123b8c89a3fa4689fd6ed Mon Sep 17 00:00:00 2001
From: Eric Hanson
Date: Fri, 13 Dec 2024 11:05:29 -0500
Subject: [PATCH 2/5] Create BaseWorkflow and ItemSubmission classes

Why these changes are being introduced:
* The BaseWorkflow and ItemSubmission classes are the foundation from which a significant portion of the application's functionality will be derived.

How this addresses that need:
* Add BaseWorkflow class with abstract methods outlining the expected functionality.
* Add ItemSubmission class with largely complete functionality and corresponding unit tests and fixtures Side effects of this change: * None Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/IN-1101 --- dsc/base.py | 96 ++++++++++++++++++++++++++++++++++++ dsc/item_submission.py | 68 +++++++++++++++++++++++++ tests/conftest.py | 39 ++++++++++++++- tests/test_itemsubmission.py | 64 ++++++++++++++++++++++++ 4 files changed, 266 insertions(+), 1 deletion(-) create mode 100644 dsc/base.py create mode 100644 dsc/item_submission.py create mode 100644 tests/test_itemsubmission.py diff --git a/dsc/base.py b/dsc/base.py new file mode 100644 index 0000000..68dcc8d --- /dev/null +++ b/dsc/base.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import logging +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any, final + +from dsc.item_submission import ItemSubmission +from dsc.utilities.aws.s3 import S3Client + +if TYPE_CHECKING: + from collections.abc import Iterator + +logger = logging.getLogger(__name__) + + +class BaseWorkflow(ABC): + """A base workflow class from which other workflow classes are derived.""" + + def __init__( + self, + email_recipients: list[str], + metadata_mapping: dict, + s3_bucket: str, + s3_prefix: str | None, + ) -> None: + """Initialize base instance. + + Args: + email_recipients: The email addresses to notify after runs of the workflow. + metadata_mapping: A mapping file for generating DSpace metadata from the + workflow's source metadata. + s3_bucket: The S3 bucket containing bitstream and metadata files for the + workflow. + s3_prefix: The S3 prefix used for objects in this workflow. This prefix does + NOT include the bucket name. + """ + self.email_recipients: list[str] = email_recipients + self.metadata_mapping: dict = metadata_mapping + self.s3_bucket: str = s3_bucket + self.s3_prefix: str | None = s3_prefix + + @final + def generate_submission_batch(self) -> Iterator[tuple[str, list[str]]]: + """Generate a batch of item submissions for the DSpace Submission Service. + + MUST NOT be overridden by workflow subclasses. + """ + s3_client = S3Client() + batch_metadata = self.get_batch_metadata() + for item_metadata in batch_metadata: + item_identifier = self.get_item_identifier(item_metadata) + logger.info(f"Processing submission for '{item_identifier}'") + metadata_keyname = f"{self.s3_prefix}/{item_identifier}_metadata.json" + item_submission = ItemSubmission( + source_metadata=item_metadata, + metadata_mapping=self.metadata_mapping, + s3_client=s3_client, + bitstream_uris=self.get_bitstream_uris(item_identifier), + metadata_keyname=metadata_keyname, + ) + item_submission.generate_and_upload_dspace_metadata(self.s3_bucket) + yield item_submission.metadata_uri, item_submission.bitstream_uris + + @abstractmethod + def get_batch_metadata(self) -> list: + """Get source metadata for the batch of items submissions. + + MUST be overridden by workflow subclasses. + """ + + @abstractmethod + def get_item_identifier(self, item_metadata: Any) -> str: # noqa: ANN401 + """Get identifier for an item submission according to the workflow subclass. + + MUST be overridden by workflow subclasses. + + Args: + item_metadata: The item metadata from which the item identifier is extracted. + """ + + @abstractmethod + def get_bitstream_uris(self, item_identifier: str) -> list[str]: + """Get bitstreams for an item submission according to the workflow subclass. + + MUST be overridden by workflow subclasses. 
+ + Args: + item_identifier: The identifier used for locating the item's bitstreams. + """ + + @abstractmethod + def process_deposit_results(self) -> list[str]: + """Process results generated by the deposit according to the workflow subclass. + + MUST be overridden by workflow subclasses. + """ diff --git a/dsc/item_submission.py b/dsc/item_submission.py new file mode 100644 index 0000000..88ec3bb --- /dev/null +++ b/dsc/item_submission.py @@ -0,0 +1,68 @@ +import json +import logging +from dataclasses import dataclass +from typing import Any + +from dsc.utilities.aws.s3 import S3Client + +logger = logging.getLogger(__name__) + + +@dataclass +class ItemSubmission: + """A class to store the required values for a DSpace submission.""" + + source_metadata: dict[str, Any] + metadata_mapping: dict[str, Any] + s3_client: S3Client + bitstream_uris: list[str] + metadata_keyname: str + metadata_uri: str = "" + + def generate_and_upload_dspace_metadata(self, bucket: str) -> None: + """Generate DSpace metadata from the item's source metadata and upload it to S3. + + Args: + bucket: The S3 bucket for uploading the item metadata file. + """ + dspace_metadata = self.create_dspace_metadata() + self.s3_client.put_file( + json.dumps(dspace_metadata), + bucket, + self.metadata_keyname, + ) + metadata_uri = f"s3://{bucket}/{self.metadata_keyname}" + logger.info(f"Metadata uploaded to S3: {metadata_uri}") + self.metadata_uri = metadata_uri + + def create_dspace_metadata(self) -> dict[str, Any]: + """Create DSpace metadata from the item's source metadata.""" + metadata_entries = [] + for field_name, field_mapping in self.metadata_mapping.items(): + if field_name not in ["item_identifier", "source_system_identifier"]: + + field_value = self.source_metadata.get(field_mapping["source_field_name"]) + if field_value: + delimiter = field_mapping["delimiter"] + language = field_mapping["language"] + if delimiter: + metadata_entries.extend( + [ + { + "key": field_name, + "value": value, + "language": language, + } + for value in field_value.split(delimiter) + ] + ) + else: + metadata_entries.append( + { + "key": field_name, + "value": field_value, + "language": language, + } + ) + + return {"metadata": metadata_entries} diff --git a/tests/conftest.py b/tests/conftest.py index f12f03a..5d282f7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,6 +7,7 @@ from moto import mock_aws from dsc.config import Config +from dsc.item_submission import ItemSubmission from dsc.utilities.aws.s3 import S3Client from dsc.utilities.aws.ses import SESClient from dsc.utilities.aws.sqs import SQSClient @@ -22,10 +23,46 @@ def _test_env(monkeypatch): @pytest.fixture -def config_instance() -> Config: +def config_instance(): return Config() +@pytest.fixture +def item_submission_instance(metadata_mapping, s3_client): + source_metadata = {"title": "Title", "contributor": "Author 1|Author 2"} + return ItemSubmission( + source_metadata=source_metadata, + metadata_mapping=metadata_mapping, + s3_client=s3_client, + bitstream_uris=[ + "s3://dsc/workflow/folder/123_01.pdf", + "s3://dsc/workflow/folder/123_02.pdf", + ], + metadata_keyname="workflow/folder/123_metadata.json", + ) + + +@pytest.fixture +def metadata_mapping(): + return { + "item_identifier": { + "source_field_name": "item_identifier", + "language": None, + "delimiter": "", + }, + "dc.title": { + "source_field_name": "title", + "language": "en_US", + "delimiter": "", + }, + "dc.contributor": { + "source_field_name": "contributor", + "language": None, + "delimiter": "|", + }, + 
} + + @pytest.fixture def mocked_s3(config_instance): with mock_aws(): diff --git a/tests/test_itemsubmission.py b/tests/test_itemsubmission.py new file mode 100644 index 0000000..3cd3d96 --- /dev/null +++ b/tests/test_itemsubmission.py @@ -0,0 +1,64 @@ +from http import HTTPStatus + + +def test_itemsubmission_init_success(item_submission_instance): + assert item_submission_instance.source_metadata == { + "title": "Title", + "contributor": "Author 1|Author 2", + } + assert item_submission_instance.metadata_mapping == { + "dc.contributor": { + "delimiter": "|", + "language": None, + "source_field_name": "contributor", + }, + "dc.title": {"delimiter": "", "language": "en_US", "source_field_name": "title"}, + "item_identifier": { + "delimiter": "", + "language": None, + "source_field_name": "item_identifier", + }, + } + assert item_submission_instance.bitstream_uris == [ + "s3://dsc/workflow/folder/123_01.pdf", + "s3://dsc/workflow/folder/123_02.pdf", + ] + assert ( + item_submission_instance.metadata_keyname == "workflow/folder/123_metadata.json" + ) + + +def test_generate_and_upload_dspace_metadata( + mocked_s3, item_submission_instance, s3_client +): + item_submission_instance.generate_and_upload_dspace_metadata("dsc") + assert ( + item_submission_instance.metadata_uri + == "s3://dsc/workflow/folder/123_metadata.json" + ) + response = s3_client.client.get_object( + Bucket="dsc", Key="workflow/folder/123_metadata.json" + ) + assert response["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.OK + + +def test_create_dspace_metadata(item_submission_instance): + assert item_submission_instance.create_dspace_metadata() == { + "metadata": [ + { + "key": "dc.title", + "language": "en_US", + "value": "Title", + }, + { + "key": "dc.contributor", + "language": None, + "value": "Author 1", + }, + { + "key": "dc.contributor", + "language": None, + "value": "Author 2", + }, + ] + } From ead9f8885413f97eda440b9adb6dd79c89be9b2e Mon Sep 17 00:00:00 2001 From: Eric Hanson Date: Fri, 13 Dec 2024 11:09:26 -0500 Subject: [PATCH 3/5] Update dependencies --- Pipfile.lock | 88 +++++++++++++++++++++++++--------------------------- 1 file changed, 43 insertions(+), 45 deletions(-) diff --git a/Pipfile.lock b/Pipfile.lock index e9c94c9..4474219 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -18,12 +18,12 @@ "default": { "boto3": { "hashes": [ - "sha256:a09871805f8e462349a1c33c23eb413668df0bf68424e61d53518e1a7d883b2f", - "sha256:cc819cdbccbc2d0dc185f1dcfe74cf3809489c4cae63c2e5d6a557aa0c5ab928" + "sha256:21a3b18c3a7fd20e463708fe3fa035983105dc7f3a1c274e1903e1583ab91159", + "sha256:50dae461ab5fbedfb81b690895d48a918fed0d5fdff37be1c4232770c0dc9712" ], "index": "pypi", "markers": "python_version >= '3.8'", - "version": "==1.35.77" + "version": "==1.35.80" }, "boto3-stubs": { "extras": [ @@ -31,27 +31,27 @@ "ses" ], "hashes": [ - "sha256:2800b7b83c9c414ac33018eb19bf3ede9d026d9220e7916ff9e6e6343e848b87", - "sha256:e749e96a3c9908ef56a52a4c309c9c2c04ee44bd4f46491c0381b6bcb36320e9" + "sha256:a71b7dbae835ac045703282001a7f35b284c3c5c82e083b8804494516e23c028", + "sha256:b0d1034cce5c666176991b3be93f084c51c5745d054f756c533cde46bf25acf3" ], "markers": "python_version >= '3.8'", - "version": "==1.35.77" + "version": "==1.35.80" }, "botocore": { "hashes": [ - "sha256:17b778016644e9342ca3ff2f430c1d1db0c6126e9b41a57cff52ac58e7a455e0", - "sha256:3faa27d65841499762228902d7e215fa99a4c2fdc76c9113e1c3f339bdf685b8" + "sha256:36e589dccb62380abd628b08fecfa2f7c89b99f41ec9fc42c467c94008c0be4a", + 
"sha256:b8dfceca58891cb2711bd6455ec4f7159051f3796e0f64adef9bb334f19d8a92" ], "markers": "python_version >= '3.8'", - "version": "==1.35.77" + "version": "==1.35.80" }, "botocore-stubs": { "hashes": [ - "sha256:617508d023e0bc98901e0189b794c4b3f289c1747c7cc410173ad698c819a716", - "sha256:c977a049481d50a14bf2db0ef15020b76734ff628d4b8e0e77b8d1c65318369e" + "sha256:4c80901dfc3bc8cc717bd6cd641101bb7e7f05392187165a32abdd66a4a25ae6", + "sha256:f46df4dc584ef8e870c2d667cbf6ca0e0aed06273e089b00d2665ced400b8997" ], "markers": "python_version >= '3.8'", - "version": "==1.35.76" + "version": "==1.35.80" }, "certifi": { "hashes": [ @@ -115,10 +115,10 @@ }, "mypy-boto3-s3": { "hashes": [ - "sha256:35f9ae109c3cb64ac6b44596dffc429058085ddb82f4daaf5be0a39e5cc1b576", - "sha256:6cf1f034985fe610754c3e6ef287490629870d508ada13b7d61e7b9aaeb46108" + "sha256:34ac4cacf8acdafa6e71a2810116b2546376f241761f9eec6ac5a9887309372b", + "sha256:fd4a8734c3bb5a2da52e22258b1836a14aa3460816df25c831790e464334021f" ], - "version": "==1.35.76" + "version": "==1.35.76.post1" }, "mypy-boto3-ses": { "hashes": [ @@ -169,11 +169,11 @@ }, "types-awscrt": { "hashes": [ - "sha256:043c0ae0fe5d272618294cbeaf1c349a654a9f7c00121be64d27486933ac4a26", - "sha256:cc0057885cb7ce1e66856123a4c2861b051e9f0716b1767ad72bfe4ca26bbcd4" + "sha256:b1b9bb10f337e3fe8f5f508860eb354d9fe093f02e1485955a9e0bdd4e250074", + "sha256:eeb4bd596100927704c8b9f964ec8a246be4943d546f3fd2a8efdddebea422ea" ], "markers": "python_version >= '3.8'", - "version": "==0.23.3" + "version": "==0.23.4" }, "types-s3transfer": { "hashes": [ @@ -224,20 +224,20 @@ }, "boto3": { "hashes": [ - "sha256:a09871805f8e462349a1c33c23eb413668df0bf68424e61d53518e1a7d883b2f", - "sha256:cc819cdbccbc2d0dc185f1dcfe74cf3809489c4cae63c2e5d6a557aa0c5ab928" + "sha256:21a3b18c3a7fd20e463708fe3fa035983105dc7f3a1c274e1903e1583ab91159", + "sha256:50dae461ab5fbedfb81b690895d48a918fed0d5fdff37be1c4232770c0dc9712" ], "index": "pypi", "markers": "python_version >= '3.8'", - "version": "==1.35.77" + "version": "==1.35.80" }, "botocore": { "hashes": [ - "sha256:17b778016644e9342ca3ff2f430c1d1db0c6126e9b41a57cff52ac58e7a455e0", - "sha256:3faa27d65841499762228902d7e215fa99a4c2fdc76c9113e1c3f339bdf685b8" + "sha256:36e589dccb62380abd628b08fecfa2f7c89b99f41ec9fc42c467c94008c0be4a", + "sha256:b8dfceca58891cb2711bd6455ec4f7159051f3796e0f64adef9bb334f19d8a92" ], "markers": "python_version >= '3.8'", - "version": "==1.35.77" + "version": "==1.35.80" }, "certifi": { "hashes": [ @@ -535,7 +535,6 @@ "sha256:3c672a53c0fb4725a29c303be906d3c1fa99c32f58abe008a82705f9ee96f40b", "sha256:404fdc66ee5f83a1388be54300ae978b2efd538018de18556dde92575e05defc", "sha256:4ac4c9f37eba52cb6fbeaf5b59c152ea976726b865bd4cf87883a7e7006cc543", - "sha256:60eb32934076fa07e4316b7b2742fa52cbb190b42c2df2863dbc4230a0a9b385", "sha256:62901fb618f74d7d81bf408c8719e9ec14d863086efe4185afd07c352aee1d2c", "sha256:660cb7312a08bc38be15b696462fa7cc7cd85c3ed9c576e81f4dc4d8b2b31591", "sha256:708ee5f1bafe76d041b53a4f95eb28cdeb8d18da17e597d46d7833ee59b97ede", @@ -543,7 +542,6 @@ "sha256:831c3c4d0774e488fdc83a1923b49b9957d33287de923d58ebd3cec47a0ae43f", "sha256:84111ad4ff3f6253820e6d3e58be2cc2a00adb29335d4cacb5ab4d4d34f2a123", "sha256:8b3e6eae66cf54701ee7d9c83c30ac0a1e3fa17be486033000f2a73a12ab507c", - "sha256:9abcc2e083cbe8dde89124a47e5e53ec38751f0d7dfd36801008f316a127d7ba", "sha256:9e6fc8a08e116fb7c7dd1f040074c9d7b51d74a8ea40d4df2fc7aa08b76b9e6c", "sha256:a01956ddfa0a6790d594f5b34fc1bfa6098aca434696a03cfdbe469b8ed79285", 
"sha256:abc998e0c0eee3c8a1904221d3f67dcfa76422b23620173e28c11d3e626c21bd", @@ -898,28 +896,28 @@ }, "ruff": { "hashes": [ - "sha256:1ca4e3a87496dc07d2427b7dd7ffa88a1e597c28dad65ae6433ecb9f2e4f022f", - "sha256:2aae99ec70abf43372612a838d97bfe77d45146254568d94926e8ed5bbb409ea", - "sha256:32096b41aaf7a5cc095fa45b4167b890e4c8d3fd217603f3634c92a541de7248", - "sha256:5fe716592ae8a376c2673fdfc1f5c0c193a6d0411f90a496863c99cd9e2ae25d", - "sha256:60f578c11feb1d3d257b2fb043ddb47501ab4816e7e221fbb0077f0d5d4e7b6f", - "sha256:705832cd7d85605cb7858d8a13d75993c8f3ef1397b0831289109e953d833d29", - "sha256:729850feed82ef2440aa27946ab39c18cb4a8889c1128a6d589ffa028ddcfc22", - "sha256:81c148825277e737493242b44c5388a300584d73d5774defa9245aaef55448b0", - "sha256:ac42caaa0411d6a7d9594363294416e0e48fc1279e1b0e948391695db2b3d5b1", - "sha256:b402ddee3d777683de60ff76da801fa7e5e8a71038f57ee53e903afbcefdaa58", - "sha256:b84f4f414dda8ac7f75075c1fa0b905ac0ff25361f42e6d5da681a465e0f78e5", - "sha256:c49ab4da37e7c457105aadfd2725e24305ff9bc908487a9bf8d548c6dad8bb3d", - "sha256:cbd5cf9b0ae8f30eebc7b360171bd50f59ab29d39f06a670b3e4501a36ba5897", - "sha256:d261d7850c8367704874847d95febc698a950bf061c9475d4a8b7689adc4f7fa", - "sha256:e769083da9439508833cfc7c23e351e1809e67f47c50248250ce1ac52c21fb93", - "sha256:ec016beb69ac16be416c435828be702ee694c0d722505f9c1f35e1b9c0cc1bf5", - "sha256:f05cdf8d050b30e2ba55c9b09330b51f9f97d36d4673213679b965d25a785f3c", - "sha256:fb88e2a506b70cfbc2de6fae6681c4f944f7dd5f2fe87233a7233d888bad73e8" + "sha256:01b14b2f72a37390c1b13477c1c02d53184f728be2f3ffc3ace5b44e9e87b90d", + "sha256:19048f2f878f3ee4583fc6cb23fb636e48c2635e30fb2022b3a1cd293402f964", + "sha256:1ae441ce4cf925b7f363d33cd6570c51435972d697e3e58928973994e56e1452", + "sha256:53babd6e63e31f4e96ec95ea0d962298f9f0d9cc5990a1bbb023a6baf2503a82", + "sha256:5be450bb18f23f0edc5a4e5585c17a56ba88920d598f04a06bd9fd76d324cb20", + "sha256:5e7558304353b84279042fc584a4f4cb8a07ae79b2bf3da1a7551d960b5626d3", + "sha256:6567be9fb62fbd7a099209257fef4ad2c3153b60579818b31a23c886ed4147ea", + "sha256:75fb782f4db39501210ac093c79c3de581d306624575eddd7e4e13747e61ba18", + "sha256:7f26bc76a133ecb09a38b7868737eded6941b70a6d34ef53a4027e83913b6502", + "sha256:8d5d273ffffff0acd3db5bf626d4b131aa5a5ada1276126231c4174543ce20d6", + "sha256:8faeae3827eaa77f5721f09b9472a18c749139c891dbc17f45e72d8f2ca1f8fc", + "sha256:9c0a60a825e3e177116c84009d5ebaa90cf40dfab56e1358d1df4e29a9a14b13", + "sha256:c356e770811858bd20832af696ff6c7e884701115094f427b64b25093d6d932d", + "sha256:d7c65bc0cadce32255e93c57d57ecc2cca23149edd52714c0c5d6fa11ec328cd", + "sha256:db503486e1cf074b9808403991663e4277f5c664d3fe237ee0d994d1305bb060", + "sha256:e4d66a21de39f15c9757d00c50c8cdd20ac84f55684ca56def7891a025d7e939", + "sha256:f7df94f57d7418fa7c3ffb650757e0c2b96cf2501a0b192c18e4fb5571dfada9", + "sha256:fe2756edf68ea79707c8d68b78ca9a58ed9af22e430430491ee03e718b5e4936" ], "index": "pypi", "markers": "python_version >= '3.7'", - "version": "==0.8.2" + "version": "==0.8.3" }, "s3transfer": { "hashes": [ From 430f3d230de739f502bb233749ff111fc924b449 Mon Sep 17 00:00:00 2001 From: Eric Hanson Date: Tue, 17 Dec 2024 12:17:24 -0500 Subject: [PATCH 4/5] Updates based on discussion in PR # 21 * Restructure app to use workflows directory * Add DSS_INPUT_QUEUE env var * Add InvalidDSpaceMetadataError exception * Refactor generate_and_upload_dspace_metadata method to upload_dspace_metadata * Shift create_dspace_metadata and validate_dspace_metadata from ItemSubmission to BaseWorkflow * Add init params to BaseWorkflow * Add stub 
run method to BaseWorkflow class * Add dspace_metadata fixture * Add tests and fixtures for BaseWorkflow @final methods --- README.md | 1 + dsc/base.py | 96 -------------- dsc/config.py | 1 + dsc/exceptions.py | 4 + dsc/item_submission.py | 48 +------ dsc/workflows/__init__.py | 0 dsc/workflows/base/__init__.py | 0 dsc/workflows/base/base_workflow.py | 199 ++++++++++++++++++++++++++++ tests/conftest.py | 76 ++++++++++- tests/test_base_workflow.py | 49 +++++++ tests/test_itemsubmission.py | 48 +------ 11 files changed, 334 insertions(+), 188 deletions(-) delete mode 100644 dsc/base.py create mode 100644 dsc/workflows/__init__.py create mode 100644 dsc/workflows/base/__init__.py create mode 100644 dsc/workflows/base/base_workflow.py create mode 100644 tests/test_base_workflow.py diff --git a/README.md b/README.md index 2a10c84..4532e2d 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ Description of the app SENTRY_DSN=### If set to a valid Sentry DSN, enables Sentry exception monitoring. This is not needed for local development. WORKSPACE=### Set to `dev` for local development, this will be set to `stage` and `prod` in those environments by Terraform. AWS_REGION_NAME=### Default AWS region. +DSS_INPUT_QUEUE=### The DSS SQS input queue to which submission messages are sent. ``` ### Optional diff --git a/dsc/base.py b/dsc/base.py deleted file mode 100644 index 68dcc8d..0000000 --- a/dsc/base.py +++ /dev/null @@ -1,96 +0,0 @@ -from __future__ import annotations - -import logging -from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Any, final - -from dsc.item_submission import ItemSubmission -from dsc.utilities.aws.s3 import S3Client - -if TYPE_CHECKING: - from collections.abc import Iterator - -logger = logging.getLogger(__name__) - - -class BaseWorkflow(ABC): - """A base workflow class from which other workflow classes are derived.""" - - def __init__( - self, - email_recipients: list[str], - metadata_mapping: dict, - s3_bucket: str, - s3_prefix: str | None, - ) -> None: - """Initialize base instance. - - Args: - email_recipients: The email addresses to notify after runs of the workflow. - metadata_mapping: A mapping file for generating DSpace metadata from the - workflow's source metadata. - s3_bucket: The S3 bucket containing bitstream and metadata files for the - workflow. - s3_prefix: The S3 prefix used for objects in this workflow. This prefix does - NOT include the bucket name. - """ - self.email_recipients: list[str] = email_recipients - self.metadata_mapping: dict = metadata_mapping - self.s3_bucket: str = s3_bucket - self.s3_prefix: str | None = s3_prefix - - @final - def generate_submission_batch(self) -> Iterator[tuple[str, list[str]]]: - """Generate a batch of item submissions for the DSpace Submission Service. - - MUST NOT be overridden by workflow subclasses. 
- """ - s3_client = S3Client() - batch_metadata = self.get_batch_metadata() - for item_metadata in batch_metadata: - item_identifier = self.get_item_identifier(item_metadata) - logger.info(f"Processing submission for '{item_identifier}'") - metadata_keyname = f"{self.s3_prefix}/{item_identifier}_metadata.json" - item_submission = ItemSubmission( - source_metadata=item_metadata, - metadata_mapping=self.metadata_mapping, - s3_client=s3_client, - bitstream_uris=self.get_bitstream_uris(item_identifier), - metadata_keyname=metadata_keyname, - ) - item_submission.generate_and_upload_dspace_metadata(self.s3_bucket) - yield item_submission.metadata_uri, item_submission.bitstream_uris - - @abstractmethod - def get_batch_metadata(self) -> list: - """Get source metadata for the batch of items submissions. - - MUST be overridden by workflow subclasses. - """ - - @abstractmethod - def get_item_identifier(self, item_metadata: Any) -> str: # noqa: ANN401 - """Get identifier for an item submission according to the workflow subclass. - - MUST be overridden by workflow subclasses. - - Args: - item_metadata: The item metadata from which the item identifier is extracted. - """ - - @abstractmethod - def get_bitstream_uris(self, item_identifier: str) -> list[str]: - """Get bitstreams for an item submission according to the workflow subclass. - - MUST be overridden by workflow subclasses. - - Args: - item_identifier: The identifier used for locating the item's bitstreams. - """ - - @abstractmethod - def process_deposit_results(self) -> list[str]: - """Process results generated by the deposit according to the workflow subclass. - - MUST be overridden by workflow subclasses. - """ diff --git a/dsc/config.py b/dsc/config.py index 7e39ee6..eb11bdd 100644 --- a/dsc/config.py +++ b/dsc/config.py @@ -11,6 +11,7 @@ class Config: "WORKSPACE", "SENTRY_DSN", "AWS_REGION_NAME", + "DSS_INPUT_QUEUE", ] OPTIONAL_ENV_VARS: Iterable[str] = ["LOG_LEVEL"] diff --git a/dsc/exceptions.py b/dsc/exceptions.py index 873432a..e6b8855 100644 --- a/dsc/exceptions.py +++ b/dsc/exceptions.py @@ -1,2 +1,6 @@ +class InvalidDSpaceMetadataError(Exception): + pass + + class InvalidSQSMessageError(Exception): pass diff --git a/dsc/item_submission.py b/dsc/item_submission.py index 88ec3bb..86cc449 100644 --- a/dsc/item_submission.py +++ b/dsc/item_submission.py @@ -12,57 +12,21 @@ class ItemSubmission: """A class to store the required values for a DSpace submission.""" - source_metadata: dict[str, Any] - metadata_mapping: dict[str, Any] - s3_client: S3Client + dspace_metadata: dict[str, Any] bitstream_uris: list[str] metadata_keyname: str metadata_uri: str = "" - def generate_and_upload_dspace_metadata(self, bucket: str) -> None: - """Generate DSpace metadata from the item's source metadata and upload it to S3. + def upload_dspace_metadata(self, bucket: str) -> None: + """Upload DSpace metadata to S3 using the specified bucket and keyname. Args: bucket: The S3 bucket for uploading the item metadata file. 
""" - dspace_metadata = self.create_dspace_metadata() - self.s3_client.put_file( - json.dumps(dspace_metadata), - bucket, - self.metadata_keyname, + s3_client = S3Client() + s3_client.put_file( + json.dumps(self.dspace_metadata), bucket, self.metadata_keyname ) metadata_uri = f"s3://{bucket}/{self.metadata_keyname}" logger.info(f"Metadata uploaded to S3: {metadata_uri}") self.metadata_uri = metadata_uri - - def create_dspace_metadata(self) -> dict[str, Any]: - """Create DSpace metadata from the item's source metadata.""" - metadata_entries = [] - for field_name, field_mapping in self.metadata_mapping.items(): - if field_name not in ["item_identifier", "source_system_identifier"]: - - field_value = self.source_metadata.get(field_mapping["source_field_name"]) - if field_value: - delimiter = field_mapping["delimiter"] - language = field_mapping["language"] - if delimiter: - metadata_entries.extend( - [ - { - "key": field_name, - "value": value, - "language": language, - } - for value in field_value.split(delimiter) - ] - ) - else: - metadata_entries.append( - { - "key": field_name, - "value": field_value, - "language": language, - } - ) - - return {"metadata": metadata_entries} diff --git a/dsc/workflows/__init__.py b/dsc/workflows/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dsc/workflows/base/__init__.py b/dsc/workflows/base/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dsc/workflows/base/base_workflow.py b/dsc/workflows/base/base_workflow.py new file mode 100644 index 0000000..ddc04e1 --- /dev/null +++ b/dsc/workflows/base/base_workflow.py @@ -0,0 +1,199 @@ +from __future__ import annotations + +import logging +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any, final + +from dsc.exceptions import InvalidDSpaceMetadataError +from dsc.item_submission import ItemSubmission + +if TYPE_CHECKING: + from collections.abc import Iterator + +logger = logging.getLogger(__name__) + + +class BaseWorkflow(ABC): + """A base workflow class from which other workflow classes are derived.""" + + def __init__( + self, + workflow_name: str, + submission_system: str, + email_recipients: list[str], + metadata_mapping: dict, + s3_bucket: str, + s3_prefix: str | None, + collection_handle: str, + output_queue: str, + ) -> None: + """Initialize base instance. + + Args: + workflow_name: The name of the workflow. + submission_system: The system to which item submissions will be sent (e.g. + DSpace@MIT) + email_recipients: The email addresses to notify after runs of the workflow. + metadata_mapping: A mapping file for generating DSpace metadata from the + workflow's source metadata. + s3_bucket: The S3 bucket containing bitstream and metadata files for the + workflow. + s3_prefix: The S3 prefix used for objects in this workflow. This prefix does + NOT include the bucket name. + collection_handle: The handle of the DSpace collection to which submissions + will be uploaded + output_queue: The SQS output queue used for retrieving result messages from + the workflow's submissions. 
+ """ + self.workflow_name: str = workflow_name + self.submission_system: str = submission_system + self.email_recipients: list[str] = email_recipients + self.metadata_mapping: dict = metadata_mapping + self.s3_bucket: str = s3_bucket + self.s3_prefix: str | None = s3_prefix + self.collection_handle: str = collection_handle + self.output_queue: str = output_queue + + @final # noqa: B027 + def run(self) -> None: + """Run workflow to submit items to the DSpace Submission Service. + + PLANNED CODE: + + sqs_client = SQSClient() + for item_submission in self.item_submissions_iter(): + item_submission.upload_dspace_metadata( + self.s3_bucket, item_submission.metadata_keyname + ) + sqs_client.send_submission_message( + item_submission.item_identifier, + self.workflow_name, + self.output_queue, + self.submission_system, + self.collection_handle, + item_submission.metadata_uri, + item_submission.bitstream_uris, + ) + """ + + @final + def item_submissions_iter(self) -> Iterator[ItemSubmission]: + """Generate a batch of item submissions for the DSpace Submission Service. + + MUST NOT be overridden by workflow subclasses. + """ + for item_metadata in self.batch_metadata_iter(): + item_identifier = self.get_item_identifier(item_metadata) + logger.info(f"Processing submission for '{item_identifier}'") + metadata_keyname = f"{self.s3_prefix}/{item_identifier}_metadata.json" + dspace_metadata = self.create_dspace_metadata(item_metadata) + if self.validate_dspace_metadata(dspace_metadata): + item_submission = ItemSubmission( + dspace_metadata=dspace_metadata, + bitstream_uris=self.get_bitstream_uris(item_identifier), + metadata_keyname=metadata_keyname, + ) + yield item_submission + + @abstractmethod + def batch_metadata_iter(self) -> Iterator[dict[str, Any]]: + """Iterate through batch metadata to yield item metadata. + + MUST be overridden by workflow subclasses. + """ + + @abstractmethod + def get_item_identifier(self, item_metadata: Any) -> str: # noqa: ANN401 + """Get identifier for an item submission according to the workflow subclass. + + MUST be overridden by workflow subclasses. + + Args: + item_metadata: The item metadata from which the item identifier is extracted. + """ + + @final + def create_dspace_metadata(self, item_metadata: dict[str, Any]) -> dict[str, Any]: + """Create DSpace metadata from the item's source metadata. + + A metadata mapping is a dict with the format seen below: + + { + "dc.contributor": { + "source_field_name": "contributor", + "language": None, + "delimiter": "|", + } + + MUST NOT be overridden by workflow subclasses. + + Args: + item_metadata: Item metadata from which the DSpace metadata will be derived. + """ + metadata_entries = [] + for field_name, field_mapping in self.metadata_mapping.items(): + if field_name not in ["item_identifier", "source_system_identifier"]: + + field_value = item_metadata.get(field_mapping["source_field_name"]) + if field_value: + delimiter = field_mapping["delimiter"] + language = field_mapping["language"] + if delimiter: + metadata_entries.extend( + [ + { + "key": field_name, + "value": value, + "language": language, + } + for value in field_value.split(delimiter) + ] + ) + else: + metadata_entries.append( + { + "key": field_name, + "value": field_value, + "language": language, + } + ) + + return {"metadata": metadata_entries} + + @final + def validate_dspace_metadata(self, dspace_metadata: dict[str, Any]) -> bool: + """Validate that DSpace metadata follows the expected format for DSpace 6.x. 
+ + MUST NOT be overridden by workflow subclasses. + + Args: + dspace_metadata: DSpace metadata to be validated. + """ + valid = False + if dspace_metadata.get("metadata") is not None: + for element in dspace_metadata["metadata"]: + if element.get("key") is not None and element.get("value") is not None: + valid = True + logger.debug("Valid DSpace metadata created") + else: + raise InvalidDSpaceMetadataError( + f"Invalid DSpace metadata created: {dspace_metadata} ", + ) + return valid + + @abstractmethod + def get_bitstream_uris(self, item_identifier: str) -> list[str]: + """Get bitstreams for an item submission according to the workflow subclass. + + MUST be overridden by workflow subclasses. + + Args: + item_identifier: The identifier used for locating the item's bitstreams. + """ + + @abstractmethod + def process_deposit_results(self) -> list[str]: + """Process results generated by the deposit according to the workflow subclass. + + MUST be overridden by workflow subclasses. + """ diff --git a/tests/conftest.py b/tests/conftest.py index 5d282f7..e27c176 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,6 +11,7 @@ from dsc.utilities.aws.s3 import S3Client from dsc.utilities.aws.ses import SESClient from dsc.utilities.aws.sqs import SQSClient +from dsc.workflows.base.base_workflow import BaseWorkflow @pytest.fixture(autouse=True) @@ -20,6 +21,40 @@ def _test_env(monkeypatch): monkeypatch.setenv("AWS_REGION_NAME", "us-east-1") monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing") monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing") + monkeypatch.setenv("DSS_INPUT_QUEUE", "mock-input-queue") + + +@pytest.fixture +def base_workflow_instance(item_metadata, metadata_mapping, mocked_s3): + class TestBaseWorkflow(BaseWorkflow): + + def batch_metadata_iter(self): + yield from [item_metadata] + + def get_item_identifier(self, item_metadata): + return item_metadata["item_identifier"] + + def get_bitstream_uris(self, item_identifier): + bitstreams = [ + "s3://dsc/workflow/folder/123_01.pdf", + "s3://dsc/workflow/folder/123_02.pdf", + "s3://dsc/workflow/folder/456_01.pdf", + ] + return [bitstream for bitstream in bitstreams if item_identifier in bitstream] + + def process_deposit_results(self): + pass + + return TestBaseWorkflow( + workflow_name="test", + submission_system="Test@MIT", + email_recipients=["test@test.test"], + metadata_mapping=metadata_mapping, + s3_bucket="dsc", + s3_prefix="workflow/folder", + collection_handle="123.4/5678", + output_queue="mock-output_queue", + ) @pytest.fixture @@ -28,12 +63,41 @@ def config_instance(): @pytest.fixture -def item_submission_instance(metadata_mapping, s3_client): - source_metadata = {"title": "Title", "contributor": "Author 1|Author 2"} +def dspace_metadata(): + return { + "metadata": [ + { + "key": "dc.title", + "language": "en_US", + "value": "Title", + }, + { + "key": "dc.contributor", + "language": None, + "value": "Author 1", + }, + { + "key": "dc.contributor", + "language": None, + "value": "Author 2", + }, + ] + } + + +@pytest.fixture +def item_metadata(): + return { + "title": "Title", + "contributor": "Author 1|Author 2", + "item_identifier": "123", + } + + +@pytest.fixture +def item_submission_instance(dspace_metadata): return ItemSubmission( - source_metadata=source_metadata, - metadata_mapping=metadata_mapping, - s3_client=s3_client, + dspace_metadata=dspace_metadata, bitstream_uris=[ "s3://dsc/workflow/folder/123_01.pdf", "s3://dsc/workflow/folder/123_02.pdf", @@ -80,7 +144,7 @@ def mocked_ses(config_instance): @pytest.fixture 
-def mocked_sqs_input(sqs_client, config_instance): +def mocked_sqs_input(config_instance): with mock_aws(): sqs = boto3.resource("sqs", region_name=config_instance.AWS_REGION_NAME) sqs.create_queue(QueueName="mock-input-queue") diff --git a/tests/test_base_workflow.py b/tests/test_base_workflow.py new file mode 100644 index 0000000..e9d9415 --- /dev/null +++ b/tests/test_base_workflow.py @@ -0,0 +1,49 @@ +import pytest + +from dsc.exceptions import InvalidDSpaceMetadataError +from dsc.item_submission import ItemSubmission + + +def test_base_workflow_item_submission_iter(base_workflow_instance): + assert next(base_workflow_instance.item_submissions_iter()) == ItemSubmission( + dspace_metadata={ + "metadata": [ + {"key": "dc.title", "value": "Title", "language": "en_US"}, + {"key": "dc.contributor", "value": "Author 1", "language": None}, + {"key": "dc.contributor", "value": "Author 2", "language": None}, + ] + }, + bitstream_uris=[ + "s3://dsc/workflow/folder/123_01.pdf", + "s3://dsc/workflow/folder/123_02.pdf", + ], + metadata_keyname="workflow/folder/123_metadata.json", + metadata_uri="", + ) + + +def test_base_workflow_create_dspace_metadata( + base_workflow_instance, + item_metadata, +): + assert base_workflow_instance.create_dspace_metadata(item_metadata) == { + "metadata": [ + {"key": "dc.title", "language": "en_US", "value": "Title"}, + {"key": "dc.contributor", "language": None, "value": "Author 1"}, + {"key": "dc.contributor", "language": None, "value": "Author 2"}, + ] + } + + +def test_base_workflow_validate_dspace_metadata_success( + base_workflow_instance, + dspace_metadata, +): + assert base_workflow_instance.validate_dspace_metadata(dspace_metadata) + + +def test_base_workflow_validate_dspace_metadata_invalid_raises_exception( + base_workflow_instance, +): + with pytest.raises(InvalidDSpaceMetadataError): + base_workflow_instance.validate_dspace_metadata({}) diff --git a/tests/test_itemsubmission.py b/tests/test_itemsubmission.py index 3cd3d96..7da0cda 100644 --- a/tests/test_itemsubmission.py +++ b/tests/test_itemsubmission.py @@ -1,24 +1,8 @@ from http import HTTPStatus -def test_itemsubmission_init_success(item_submission_instance): - assert item_submission_instance.source_metadata == { - "title": "Title", - "contributor": "Author 1|Author 2", - } - assert item_submission_instance.metadata_mapping == { - "dc.contributor": { - "delimiter": "|", - "language": None, - "source_field_name": "contributor", - }, - "dc.title": {"delimiter": "", "language": "en_US", "source_field_name": "title"}, - "item_identifier": { - "delimiter": "", - "language": None, - "source_field_name": "item_identifier", - }, - } +def test_itemsubmission_init_success(item_submission_instance, dspace_metadata): + assert item_submission_instance.dspace_metadata == dspace_metadata assert item_submission_instance.bitstream_uris == [ "s3://dsc/workflow/folder/123_01.pdf", "s3://dsc/workflow/folder/123_02.pdf", @@ -28,10 +12,8 @@ def test_itemsubmission_init_success(item_submission_instance): ) -def test_generate_and_upload_dspace_metadata( - mocked_s3, item_submission_instance, s3_client -): - item_submission_instance.generate_and_upload_dspace_metadata("dsc") +def test_upload_dspace_metadata(mocked_s3, item_submission_instance, s3_client): + item_submission_instance.upload_dspace_metadata("dsc") assert ( item_submission_instance.metadata_uri == "s3://dsc/workflow/folder/123_metadata.json" @@ -40,25 +22,3 @@ def test_generate_and_upload_dspace_metadata( Bucket="dsc", Key="workflow/folder/123_metadata.json" ) 
assert response["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.OK - - -def test_create_dspace_metadata(item_submission_instance): - assert item_submission_instance.create_dspace_metadata() == { - "metadata": [ - { - "key": "dc.title", - "language": "en_US", - "value": "Title", - }, - { - "key": "dc.contributor", - "language": None, - "value": "Author 1", - }, - { - "key": "dc.contributor", - "language": None, - "value": "Author 2", - }, - ] - } From dc07a1599e459074163d3d9af25669171c45d168 Mon Sep 17 00:00:00 2001 From: Eric Hanson Date: Wed, 18 Dec 2024 16:28:59 -0500 Subject: [PATCH 5/5] Further updates based on discussion in PR # 21 * Add ItemMetadatMissingRequiredFieldError exception * Rename metadata_keyname > metadata_s3_key * Shift BaseWorkflow to base/__init__.py and remove base_workflow.py * Add required key to metadata mapping and corresponding unit test * Update unit tests names to include _success --- dsc/exceptions.py | 4 + dsc/item_submission.py | 8 +- dsc/workflows/__init__.py | 10 ++ dsc/workflows/base/__init__.py | 207 ++++++++++++++++++++++++++++ dsc/workflows/base/base_workflow.py | 199 -------------------------- tests/conftest.py | 6 +- tests/test_base_workflow.py | 20 ++- tests/test_itemsubmission.py | 6 +- 8 files changed, 246 insertions(+), 214 deletions(-) delete mode 100644 dsc/workflows/base/base_workflow.py diff --git a/dsc/exceptions.py b/dsc/exceptions.py index e6b8855..caed71d 100644 --- a/dsc/exceptions.py +++ b/dsc/exceptions.py @@ -4,3 +4,7 @@ class InvalidDSpaceMetadataError(Exception): class InvalidSQSMessageError(Exception): pass + + +class ItemMetadatMissingRequiredFieldError(Exception): + pass diff --git a/dsc/item_submission.py b/dsc/item_submission.py index 86cc449..4b26d6f 100644 --- a/dsc/item_submission.py +++ b/dsc/item_submission.py @@ -14,7 +14,7 @@ class ItemSubmission: dspace_metadata: dict[str, Any] bitstream_uris: list[str] - metadata_keyname: str + metadata_s3_key: str metadata_uri: str = "" def upload_dspace_metadata(self, bucket: str) -> None: @@ -24,9 +24,7 @@ def upload_dspace_metadata(self, bucket: str) -> None: bucket: The S3 bucket for uploading the item metadata file. """ s3_client = S3Client() - s3_client.put_file( - json.dumps(self.dspace_metadata), bucket, self.metadata_keyname - ) - metadata_uri = f"s3://{bucket}/{self.metadata_keyname}" + s3_client.put_file(json.dumps(self.dspace_metadata), bucket, self.metadata_s3_key) + metadata_uri = f"s3://{bucket}/{self.metadata_s3_key}" logger.info(f"Metadata uploaded to S3: {metadata_uri}") self.metadata_uri = metadata_uri diff --git a/dsc/workflows/__init__.py b/dsc/workflows/__init__.py index e69de29..9b5c9ff 100644 --- a/dsc/workflows/__init__.py +++ b/dsc/workflows/__init__.py @@ -0,0 +1,10 @@ +"""dsc.workflows. + +All primary functions used by CLI are importable from here. 
+""" + +from dsc.workflows.base import BaseWorkflow + +__all__ = [ + "BaseWorkflow", +] diff --git a/dsc/workflows/base/__init__.py b/dsc/workflows/base/__init__.py index e69de29..95168cd 100644 --- a/dsc/workflows/base/__init__.py +++ b/dsc/workflows/base/__init__.py @@ -0,0 +1,207 @@ +from __future__ import annotations + +import logging +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any, final + +from dsc.exceptions import ( + InvalidDSpaceMetadataError, + ItemMetadatMissingRequiredFieldError, +) +from dsc.item_submission import ItemSubmission + +if TYPE_CHECKING: + from collections.abc import Iterator + +logger = logging.getLogger(__name__) + + +class BaseWorkflow(ABC): + """A base workflow class from which other workflow classes are derived.""" + + def __init__( + self, + workflow_name: str, + submission_system: str, + email_recipients: list[str], + metadata_mapping: dict, + s3_bucket: str, + s3_prefix: str | None, + collection_handle: str, + output_queue: str, + ) -> None: + """Initialize base instance. + + Args: + workflow_name: The name of the workflow. + submission_system: The system to which item submissions will be sent (e.g. + DSpace@MIT) + email_recipients: The email addresses to notify after runs of the workflow. + metadata_mapping: A mapping file for generating DSpace metadata from the + workflow's source metadata. + s3_bucket: The S3 bucket containing bitstream and metadata files for the + workflow. + s3_prefix: The S3 prefix used for objects in this workflow. This prefix does + NOT include the bucket name. + collection_handle: The handle of the DSpace collection to which submissions + will be uploaded + output_queue: The SQS output queue used for retrieving result messages from + the workflow's submissions. + """ + self.workflow_name: str = workflow_name + self.submission_system: str = submission_system + self.email_recipients: list[str] = email_recipients + self.metadata_mapping: dict = metadata_mapping + self.s3_bucket: str = s3_bucket + self.s3_prefix: str | None = s3_prefix + self.collection_handle: str = collection_handle + self.output_queue: str = output_queue + + @final # noqa: B027 + def run(self) -> None: + """Run workflow to submit items to the DSpace Submission Service. + + PLANNED CODE: + + sqs_client = SQSClient() + for item_submission in self.item_submissions_iter(): + item_submission.upload_dspace_metadata( + self.s3_bucket, item_submission.metadata_s3_key + ) + sqs_client.send_submission_message( + item_submission.item_identifier, + self.workflow_name, + self.output_queue, + self.submission_system, + self.collection_handle, + item_submission.metadata_uri, + item_submission.bitstream_uris, + ) + """ + + @final + def item_submissions_iter(self) -> Iterator[ItemSubmission]: + """Generate a batch of item submissions for the DSpace Submission Service. + + MUST NOT be overridden by workflow subclasses. 
+ """ + for item_metadata in self.batch_metadata_iter(): + item_identifier = self.get_item_identifier(item_metadata) + logger.info(f"Processing submission for '{item_identifier}'") + metadata_s3_key = f"{self.s3_prefix}/{item_identifier}_metadata.json" + dspace_metadata = self.create_dspace_metadata(item_metadata) + self.validate_dspace_metadata(dspace_metadata) + item_submission = ItemSubmission( + dspace_metadata=dspace_metadata, + bitstream_uris=self.get_bitstream_uris(item_identifier), + metadata_s3_key=metadata_s3_key, + ) + yield item_submission + + @abstractmethod + def batch_metadata_iter(self) -> Iterator[dict[str, Any]]: + """Iterate through batch metadata to yield item metadata. + + MUST be overridden by workflow subclasses. + """ + + @abstractmethod + def get_item_identifier(self, item_metadata: dict[str, Any]) -> str: + """Get identifier for an item submission according to the workflow subclass. + + MUST be overridden by workflow subclasses. + + Args: + item_metadata: The item metadata from which the item identifier is extracted. + """ + + @final + def create_dspace_metadata(self, item_metadata: dict[str, Any]) -> dict[str, Any]: + """Create DSpace metadata from the item's source metadata. + + A metadata mapping is a dict with the format seen below: + + { + "dc.contributor": { + "source_field_name": "contributor", + "language": None, + "delimiter": "|", + } + + MUST NOT be overridden by workflow subclasses. + + Args: + item_metadata: Item metadata from which the DSpace metadata will be derived. + """ + metadata_entries = [] + for field_name, field_mapping in self.metadata_mapping.items(): + if field_name not in ["item_identifier"]: + + field_value = item_metadata.get(field_mapping["source_field_name"]) + if not field_value and field_mapping.get("required", False): + raise ItemMetadatMissingRequiredFieldError( + "Item metadata missing required field: '" + f"{field_mapping["source_field_name"]}'" + ) + if field_value: + delimiter = field_mapping["delimiter"] + language = field_mapping["language"] + if delimiter: + metadata_entries.extend( + [ + { + "key": field_name, + "value": value, + "language": language, + } + for value in field_value.split(delimiter) + ] + ) + else: + metadata_entries.append( + { + "key": field_name, + "value": field_value, + "language": language, + } + ) + + return {"metadata": metadata_entries} + + @final + def validate_dspace_metadata(self, dspace_metadata: dict[str, Any]) -> bool: + """Validate that DSpace metadata follows the expected format for DSpace 6.x. + + MUST NOT be overridden by workflow subclasses. + + Args: + dspace_metadata: DSpace metadata to be validated. + """ + valid = False + if dspace_metadata.get("metadata") is not None: + for element in dspace_metadata["metadata"]: + if element.get("key") is not None and element.get("value") is not None: + valid = True + logger.debug("Valid DSpace metadata created") + else: + raise InvalidDSpaceMetadataError( + f"Invalid DSpace metadata created: {dspace_metadata} ", + ) + return valid + + @abstractmethod + def get_bitstream_uris(self, item_identifier: str) -> list[str]: + """Get bitstreams for an item submission according to the workflow subclass. + + MUST be overridden by workflow subclasses. + + Args: + item_identifier: The identifier used for locating the item's bitstreams. + """ + + @abstractmethod + def process_deposit_results(self) -> list[str]: + """Process results generated by the deposit according to the workflow subclass. + + MUST be overridden by workflow subclasses. 
+ """ diff --git a/dsc/workflows/base/base_workflow.py b/dsc/workflows/base/base_workflow.py deleted file mode 100644 index ddc04e1..0000000 --- a/dsc/workflows/base/base_workflow.py +++ /dev/null @@ -1,199 +0,0 @@ -from __future__ import annotations - -import logging -from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Any, final - -from dsc.exceptions import InvalidDSpaceMetadataError -from dsc.item_submission import ItemSubmission - -if TYPE_CHECKING: - from collections.abc import Iterator - -logger = logging.getLogger(__name__) - - -class BaseWorkflow(ABC): - """A base workflow class from which other workflow classes are derived.""" - - def __init__( - self, - workflow_name: str, - submission_system: str, - email_recipients: list[str], - metadata_mapping: dict, - s3_bucket: str, - s3_prefix: str | None, - collection_handle: str, - output_queue: str, - ) -> None: - """Initialize base instance. - - Args: - workflow_name: The name of the workflow. - submission_system: The system to which item submissions will be sent (e.g. - DSpace@MIT) - email_recipients: The email addresses to notify after runs of the workflow. - metadata_mapping: A mapping file for generating DSpace metadata from the - workflow's source metadata. - s3_bucket: The S3 bucket containing bitstream and metadata files for the - workflow. - s3_prefix: The S3 prefix used for objects in this workflow. This prefix does - NOT include the bucket name. - collection_handle: The handle of the DSpace collection to which submissions - will be uploaded - output_queue: The SQS output queue used for retrieving result messages from - the workflow's submissions. - """ - self.workflow_name: str = workflow_name - self.submission_system: str = submission_system - self.email_recipients: list[str] = email_recipients - self.metadata_mapping: dict = metadata_mapping - self.s3_bucket: str = s3_bucket - self.s3_prefix: str | None = s3_prefix - self.collection_handle: str = collection_handle - self.output_queue: str = output_queue - - @final # noqa: B027 - def run(self) -> None: - """Run workflow to submit items to the DSpace Submission Service. - - PLANNED CODE: - - sqs_client = SQSClient() - for item_submission in self.item_submissions_iter(): - item_submission.upload_dspace_metadata( - self.s3_bucket, item_submission.metadata_keyname - ) - sqs_client.send_submission_message( - item_submission.item_identifier, - self.workflow_name, - self.output_queue, - self.submission_system, - self.collection_handle, - item_submission.metadata_uri, - item_submission.bitstream_uris, - ) - """ - - @final - def item_submissions_iter(self) -> Iterator[ItemSubmission]: - """Generate a batch of item submissions for the DSpace Submission Service. - - MUST NOT be overridden by workflow subclasses. - """ - for item_metadata in self.batch_metadata_iter(): - item_identifier = self.get_item_identifier(item_metadata) - logger.info(f"Processing submission for '{item_identifier}'") - metadata_keyname = f"{self.s3_prefix}/{item_identifier}_metadata.json" - dspace_metadata = self.create_dspace_metadata(item_metadata) - if self.validate_dspace_metadata(dspace_metadata): - item_submission = ItemSubmission( - dspace_metadata=dspace_metadata, - bitstream_uris=self.get_bitstream_uris(item_identifier), - metadata_keyname=metadata_keyname, - ) - yield item_submission - - @abstractmethod - def batch_metadata_iter(self) -> Iterator[dict[str, Any]]: - """Iterate through batch metadata to yield item metadata. - - MUST be overridden by workflow subclasses. 
- """ - - @abstractmethod - def get_item_identifier(self, item_metadata: Any) -> str: # noqa: ANN401 - """Get identifier for an item submission according to the workflow subclass. - - MUST be overridden by workflow subclasses. - - Args: - item_metadata: The item metadata from which the item identifier is extracted. - """ - - @final - def create_dspace_metadata(self, item_metadata: dict[str, Any]) -> dict[str, Any]: - """Create DSpace metadata from the item's source metadata. - - A metadata mapping is a dict with the format seen below: - - { - "dc.contributor": { - "source_field_name": "contributor", - "language": None, - "delimiter": "|", - } - - MUST NOT be overridden by workflow subclasses. - - Args: - item_metadata: Item metadata from which the DSpace metadata will be derived. - """ - metadata_entries = [] - for field_name, field_mapping in self.metadata_mapping.items(): - if field_name not in ["item_identifier", "source_system_identifier"]: - - field_value = item_metadata.get(field_mapping["source_field_name"]) - if field_value: - delimiter = field_mapping["delimiter"] - language = field_mapping["language"] - if delimiter: - metadata_entries.extend( - [ - { - "key": field_name, - "value": value, - "language": language, - } - for value in field_value.split(delimiter) - ] - ) - else: - metadata_entries.append( - { - "key": field_name, - "value": field_value, - "language": language, - } - ) - - return {"metadata": metadata_entries} - - @final - def validate_dspace_metadata(self, dspace_metadata: dict[str, Any]) -> bool: - """Validate that DSpace metadata follows the expected format for DSpace 6.x. - - MUST NOT be overridden by workflow subclasses. - - Args: - dspace_metadata: DSpace metadata to be validated. - """ - valid = False - if dspace_metadata.get("metadata") is not None: - for element in dspace_metadata["metadata"]: - if element.get("key") is not None and element.get("value") is not None: - valid = True - logger.debug("Valid DSpace metadata created") - else: - raise InvalidDSpaceMetadataError( - f"Invalid DSpace metadata created: {dspace_metadata} ", - ) - return valid - - @abstractmethod - def get_bitstream_uris(self, item_identifier: str) -> list[str]: - """Get bitstreams for an item submission according to the workflow subclass. - - MUST be overridden by workflow subclasses. - - Args: - item_identifier: The identifier used for locating the item's bitstreams. - """ - - @abstractmethod - def process_deposit_results(self) -> list[str]: - """Process results generated by the deposit according to the workflow subclass. - - MUST be overridden by workflow subclasses. 
- """ diff --git a/tests/conftest.py b/tests/conftest.py index e27c176..8badfa6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,7 +11,7 @@ from dsc.utilities.aws.s3 import S3Client from dsc.utilities.aws.ses import SESClient from dsc.utilities.aws.sqs import SQSClient -from dsc.workflows.base.base_workflow import BaseWorkflow +from dsc.workflows.base import BaseWorkflow @pytest.fixture(autouse=True) @@ -102,7 +102,7 @@ def item_submission_instance(dspace_metadata): "s3://dsc/workflow/folder/123_01.pdf", "s3://dsc/workflow/folder/123_02.pdf", ], - metadata_keyname="workflow/folder/123_metadata.json", + metadata_s3_key="workflow/folder/123_metadata.json", ) @@ -113,11 +113,13 @@ def metadata_mapping(): "source_field_name": "item_identifier", "language": None, "delimiter": "", + "required": True, }, "dc.title": { "source_field_name": "title", "language": "en_US", "delimiter": "", + "required": True, }, "dc.contributor": { "source_field_name": "contributor", diff --git a/tests/test_base_workflow.py b/tests/test_base_workflow.py index e9d9415..c81aa8a 100644 --- a/tests/test_base_workflow.py +++ b/tests/test_base_workflow.py @@ -1,10 +1,13 @@ import pytest -from dsc.exceptions import InvalidDSpaceMetadataError +from dsc.exceptions import ( + InvalidDSpaceMetadataError, + ItemMetadatMissingRequiredFieldError, +) from dsc.item_submission import ItemSubmission -def test_base_workflow_item_submission_iter(base_workflow_instance): +def test_base_workflow_item_submission_iter_success(base_workflow_instance): assert next(base_workflow_instance.item_submissions_iter()) == ItemSubmission( dspace_metadata={ "metadata": [ @@ -17,12 +20,12 @@ def test_base_workflow_item_submission_iter(base_workflow_instance): "s3://dsc/workflow/folder/123_01.pdf", "s3://dsc/workflow/folder/123_02.pdf", ], - metadata_keyname="workflow/folder/123_metadata.json", + metadata_s3_key="workflow/folder/123_metadata.json", metadata_uri="", ) -def test_base_workflow_create_dspace_metadata( +def test_base_workflow_create_dspace_metadata_success( base_workflow_instance, item_metadata, ): @@ -35,6 +38,15 @@ def test_base_workflow_create_dspace_metadata( } +def test_base_workflow_create_dspace_metadata_required_field_missing_raises_exception( + base_workflow_instance, + item_metadata, +): + item_metadata.pop("title") + with pytest.raises(ItemMetadatMissingRequiredFieldError): + base_workflow_instance.create_dspace_metadata(item_metadata) + + def test_base_workflow_validate_dspace_metadata_success( base_workflow_instance, dspace_metadata, diff --git a/tests/test_itemsubmission.py b/tests/test_itemsubmission.py index 7da0cda..6189643 100644 --- a/tests/test_itemsubmission.py +++ b/tests/test_itemsubmission.py @@ -7,12 +7,10 @@ def test_itemsubmission_init_success(item_submission_instance, dspace_metadata): "s3://dsc/workflow/folder/123_01.pdf", "s3://dsc/workflow/folder/123_02.pdf", ] - assert ( - item_submission_instance.metadata_keyname == "workflow/folder/123_metadata.json" - ) + assert item_submission_instance.metadata_s3_key == "workflow/folder/123_metadata.json" -def test_upload_dspace_metadata(mocked_s3, item_submission_instance, s3_client): +def test_upload_dspace_metadata_success(mocked_s3, item_submission_instance, s3_client): item_submission_instance.upload_dspace_metadata("dsc") assert ( item_submission_instance.metadata_uri