In 1101 base workflow #21
@@ -0,0 +1,96 @@

```python
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, final

from dsc.item_submission import ItemSubmission
from dsc.utilities.aws.s3 import S3Client

if TYPE_CHECKING:
    from collections.abc import Iterator

logger = logging.getLogger(__name__)


class BaseWorkflow(ABC):
    """A base workflow class from which other workflow classes are derived."""

    def __init__(
        self,
        email_recipients: list[str],
        metadata_mapping: dict,
        s3_bucket: str,
        s3_prefix: str | None,
    ) -> None:
        """Initialize base instance.

        Args:
            email_recipients: The email addresses to notify after runs of the workflow.
            metadata_mapping: A mapping file for generating DSpace metadata from the
                workflow's source metadata.
            s3_bucket: The S3 bucket containing bitstream and metadata files for the
                workflow.
            s3_prefix: The S3 prefix used for objects in this workflow. This prefix does
                NOT include the bucket name.
        """
        self.email_recipients: list[str] = email_recipients
        self.metadata_mapping: dict = metadata_mapping
        self.s3_bucket: str = s3_bucket
        self.s3_prefix: str | None = s3_prefix

    @final
    def generate_submission_batch(self) -> Iterator[tuple[str, list[str]]]:
```
I'm realizing after my first pass that I have a couple of fundamental questions about this base workflow method. When this is complete, is it correct that: ...

My question: what is utilizing the results of this yielding? Will it be another class method, or the CLI itself? Unless I'm grossly misunderstanding this application, wouldn't something need to send these as an SQS message to the DSS input queue? Related question -- and apologies if this will come later -- what happens with the bitstream URIs? Are they included in the SQS message for DSS?

The yielded URIs will be utilized by the CLI. The workflow class is intended for just the workflow-opinionated code, while the CLI command will be a wrapper with universal functionality (e.g. sending SQS messages, SES emails). And yes, the bitstream URIs are in the SQS message, as is the URI of the JSON metadata file that is uploaded to S3; see the DSS submission message spec for reference. The steps would be: ...

Ah, sorry, missed that in the PR! Thanks for pointing that out. And thanks for sharing the DSS submission message spec. That document is SUPER helpful, and I think it'd be handy to include in docstrings in this application where applicable. And thanks for the steps; I had missed that. This is quite helpful for clarifying my next question: any reason why we couldn't yield the `ItemSubmission` instances themselves? Happy to wait on that until the CLI-level orchestration is in place, where these would get sent out as SQS messages. Are you envisioning some kind of ...? Which I think has a natural follow-up question: why couldn't ...? Happy to meet and discuss if helpful, or keep hashing out in comments. When I see a class like `ItemSubmission`, ...

I think it certainly could! Let's get @jonavellecuerdo's thoughts as well, but now I'm inclined to build SQS message formatting into ...

I do think yielding the actual `ItemSubmission` ...
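For illustration only, the CLI-side orchestration described above might look roughly like the following sketch. The function name, signature, and queue URL parameter are hypothetical, and the message fields mirror the DSS submission message structure echoed in this PR's `submission_message_body` fixture:

```python
import json

import boto3


def send_batch_to_dss(workflow, queue_url: str, collection_handle: str) -> None:
    """Send one SQS message per item yielded by the workflow's submission batch."""
    sqs = boto3.client("sqs")
    for metadata_uri, bitstream_uris in workflow.generate_submission_batch():
        message = {
            "SubmissionSystem": "DSpace@MIT",
            "CollectionHandle": collection_handle,
            "MetadataLocation": metadata_uri,
            "Files": [
                {
                    "BitstreamName": uri.rsplit("/", 1)[-1],
                    "FileLocation": uri,
                    "BitstreamDescription": None,
                }
                for uri in bitstream_uris
            ],
        }
        sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(message))
```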
```python
        """Generate a batch of item submissions for the DSpace Submission Service.

        MUST NOT be overridden by workflow subclasses.
        """
        s3_client = S3Client()
```
I might propose that ... I would think that ... Or, have a property like this if multiple methods may eventually require an S3 client:
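A minimal sketch of that kind of property (assuming `functools.cached_property` is acceptable here; the property name `s3_client` is illustrative):

```python
from abc import ABC
from functools import cached_property

from dsc.utilities.aws.s3 import S3Client


class BaseWorkflow(ABC):
    @cached_property
    def s3_client(self) -> S3Client:
        # Created once per workflow instance and reused by any method that needs it.
        return S3Client()
```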
Will do!
```python
        batch_metadata = self.get_batch_metadata()
        for item_metadata in batch_metadata:
            item_identifier = self.get_item_identifier(item_metadata)
```
This might be a can of worms, but I think it gets at one of my primary questions at this point. So ... Where I think I'm going with this is that ... While perhaps many workflows might have a CSV file they parse and then return a ... However, I see that we assign this "metadata object" to ... I was going to propose something like:

```python
# near top of file
ItemMetadata = TypeVar("ItemMetadata")

...

def get_batch_metadata(self) -> list[ItemMetadata]:
    ...

def get_item_identifier(self, item_metadata: ItemMetadata) -> str:
    ...
```

Where we thread through a generic type that these are related. But this kind of generic type + flexibility doesn't really work if ... To keep things simple, do you think it'd be worth just typing the output of ...?

I would say every method in the workflow classes is tightly coupled and opinionated to that workflow. In retrospect, I thought I erred in typing ... However! That is now suggesting to me that a ... Currently, ...

So would it be accurate to say? ...

I'm having trouble articulating what feels confusing about the relationship of ...

It feels kind of like they are very tightly coupled, and creating new workflows that extend this class would be jumping through hoops to define those inter-related methods. When -- and maybe missing something here -- a workflow could just be responsible for defining something like ... Which, and I'm feeling dizzy from circular questioning, why wouldn't they directly yield an `ItemSubmission`? Each of those workflows could, and should, define helper methods as needed to support this work. But in some cases, like reading a CSV from S3, it feels like:

```python
import pandas as pd

df = pd.read_csv("s3://....")
for _, row in df.iterrows():
    yield row  # <----- basically an Item
```

or to extend that for some questions above:

```python
import pandas as pd

from dsc import ItemSubmission


class FooWorkflow:
    def __init__(self, csv_location):
        self.csv_location = csv_location

    def fetch_items(self):
        df = pd.read_csv(self.csv_location)
        for _, row in df.iterrows():
            # extract identifiers, bitstreams, etc.
            yield ItemSubmission(...)
```

Sorry if confusing the issue, and happy to hold off on these questions until more is established.
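As an aside, the generic-type idea above could also be expressed by parameterizing the base class itself. A rough sketch, not part of this PR (the `ItemMetadata` type variable and `CsvWorkflow` subclass are illustrative):

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

ItemMetadata = TypeVar("ItemMetadata")


class BaseWorkflow(ABC, Generic[ItemMetadata]):
    """Base workflow parameterized by the shape of its source metadata."""

    @abstractmethod
    def get_batch_metadata(self) -> list[ItemMetadata]:
        """Return the batch of source metadata objects for this workflow."""

    @abstractmethod
    def get_item_identifier(self, item_metadata: ItemMetadata) -> str:
        """Extract the item identifier from a single source metadata object."""


class CsvWorkflow(BaseWorkflow[dict]):
    """Example subclass where each item's source metadata is a plain dict."""

    def get_batch_metadata(self) -> list[dict]:
        return [{"item_identifier": "123", "title": "Title"}]

    def get_item_identifier(self, item_metadata: dict) -> str:
        return item_metadata["item_identifier"]
```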
```python
            logger.info(f"Processing submission for '{item_identifier}'")
            metadata_keyname = f"{self.s3_prefix}/{item_identifier}_metadata.json"
            item_submission = ItemSubmission(
                source_metadata=item_metadata,
                metadata_mapping=self.metadata_mapping,
```
My next can-o-worms question is here. So it's safe to assume that ... Super rough example:

```
{
    "title": "dc.title",
    ...
}
```

Do you expect there will be any difficulties with nesting, logic, or multivalued fields? For example: what if the input metadata for workflow ... But, this assumes quite a bit. First, that DSpace's metadata profile can (or will) allow for more complex or multivalued fields. Second, that we don't just require the input metadata format to have basically a 1:1 mapping to DSpace fields already present. I think this consideration points to ... Going to leave this comment here, as-is, and followup in that method.
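For reference, the mapping shape this PR's `metadata_mapping` test fixture uses (see conftest.py below) is keyed by DSpace field name, with each entry naming the source field plus a language and delimiter; roughly:

```python
metadata_mapping = {
    "dc.title": {
        "source_field_name": "title",  # field in the workflow's source metadata
        "language": "en_US",
        "delimiter": "",  # empty string: treat as a single value
    },
    "dc.contributor": {
        "source_field_name": "contributor",
        "language": None,
        "delimiter": "|",  # split multivalued source fields on this character
    },
}
```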
```python
                s3_client=s3_client,
                bitstream_uris=self.get_bitstream_uris(item_identifier),
                metadata_keyname=metadata_keyname,
            )
            item_submission.generate_and_upload_dspace_metadata(self.s3_bucket)
            yield item_submission.metadata_uri, item_submission.bitstream_uris
```
```python
    @abstractmethod
    def get_batch_metadata(self) -> list:
        """Get source metadata for the batch of items submissions.

        MUST be overridden by workflow subclasses.
        """

    @abstractmethod
    def get_item_identifier(self, item_metadata: Any) -> str:  # noqa: ANN401
        """Get identifier for an item submission according to the workflow subclass.

        MUST be overridden by workflow subclasses.

        Args:
            item_metadata: The item metadata from which the item identifier is extracted.
        """

    @abstractmethod
    def get_bitstream_uris(self, item_identifier: str) -> list[str]:
        """Get bitstreams for an item submission according to the workflow subclass.

        MUST be overridden by workflow subclasses.

        Args:
            item_identifier: The identifier used for locating the item's bitstreams.
        """

    @abstractmethod
    def process_deposit_results(self) -> list[str]:
        """Process results generated by the deposit according to the workflow subclass.

        MUST be overridden by workflow subclasses.
        """
```
Taking some of the other file reorganizing and renaming into consideration -- and thanks BTW, it's feeling good to navigate around! -- I could envision this file called something more high level like ... I don't have evidence or even a pointable philosophy to support it, but when the classname mirrors the filename, it feels a bit off. While we may not need more "item" type classes, it's tight coupling between file and class names. Totally optional.
@@ -0,0 +1,68 @@

```python
import json
import logging
from dataclasses import dataclass
from typing import Any

from dsc.utilities.aws.s3 import S3Client

logger = logging.getLogger(__name__)


@dataclass
class ItemSubmission:
```
Do you think ...

I did have a ...

Side note: If we find that ...
```python
    """A class to store the required values for a DSpace submission."""

    source_metadata: dict[str, Any]
    metadata_mapping: dict[str, Any]
    s3_client: S3Client
    bitstream_uris: list[str]
    metadata_keyname: str
    metadata_uri: str = ""
```
```python
    def generate_and_upload_dspace_metadata(self, bucket: str) -> None:
        """Generate DSpace metadata from the item's source metadata and upload it to S3.

        Args:
            bucket: The S3 bucket for uploading the item metadata file.
        """
        dspace_metadata = self.create_dspace_metadata()
        self.s3_client.put_file(
            json.dumps(dspace_metadata),
            bucket,
            self.metadata_keyname,
        )
        metadata_uri = f"s3://{bucket}/{self.metadata_keyname}"
        logger.info(f"Metadata uploaded to S3: {metadata_uri}")
        self.metadata_uri = metadata_uri
```
I considered exception handling here but as with ...
```python
    def create_dspace_metadata(self) -> dict[str, Any]:
        """Create DSpace metadata from the item's source metadata."""
        metadata_entries = []
        for field_name, field_mapping in self.metadata_mapping.items():
            if field_name not in ["item_identifier", "source_system_identifier"]:

                field_value = self.source_metadata.get(field_mapping["source_field_name"])
                if field_value:
                    delimiter = field_mapping["delimiter"]
```
Ah-ha! Related to my comment above about ... Request: can you add an example of what this metadata mapping might look like? With an example, I think I would understand that a single dictionary could suffice.
```python
                    language = field_mapping["language"]
                    if delimiter:
                        metadata_entries.extend(
                            [
                                {
                                    "key": field_name,
                                    "value": value,
                                    "language": language,
                                }
                                for value in field_value.split(delimiter)
                            ]
                        )
                    else:
                        metadata_entries.append(
                            {
                                "key": field_name,
                                "value": field_value,
                                "language": language,
                            }
                        )

        return {"metadata": metadata_entries}
```
@@ -7,6 +7,7 @@
```python
from moto import mock_aws

from dsc.config import Config
from dsc.item_submission import ItemSubmission
from dsc.utilities.aws.s3 import S3Client
from dsc.utilities.aws.ses import SESClient
from dsc.utilities.aws.sqs import SQSClient
```
@@ -22,15 +23,51 @@ def _test_env(monkeypatch):
```python
@pytest.fixture
def config_instance() -> Config:
def config_instance():
    return Config()


@pytest.fixture
def item_submission_instance(metadata_mapping, s3_client):
    source_metadata = {"title": "Title", "contributor": "Author 1|Author 2"}
    return ItemSubmission(
        source_metadata=source_metadata,
        metadata_mapping=metadata_mapping,
        s3_client=s3_client,
        bitstream_uris=[
            "s3://dsc/workflow/folder/123_01.pdf",
            "s3://dsc/workflow/folder/123_02.pdf",
        ],
        metadata_keyname="workflow/folder/123_metadata.json",
    )


@pytest.fixture
def metadata_mapping():
    return {
        "item_identifier": {
            "source_field_name": "item_identifier",
            "language": None,
            "delimiter": "",
        },
        "dc.title": {
            "source_field_name": "title",
            "language": "en_US",
            "delimiter": "",
        },
        "dc.contributor": {
            "source_field_name": "contributor",
            "language": None,
            "delimiter": "|",
```
Thanks for adding an example here of a delimiter. My comment above requesting an example in the docstring stands, but double-helpful to have this example here (and test it, of course).

Yes, I'll add that to the docstring!
```python
        },
    }


@pytest.fixture
def mocked_s3(config_instance):
    with mock_aws():
        s3 = boto3.client("s3", region_name=config_instance.AWS_REGION_NAME)
        s3.create_bucket(Bucket="awd")
        s3.create_bucket(Bucket="dsc")
        yield s3
```
@@ -130,11 +167,11 @@ def submission_message_body():
```python
    {
        "SubmissionSystem": "DSpace@MIT",
        "CollectionHandle": "123.4/5678",
        "MetadataLocation": "s3://awd/10.1002-term.3131.json",
        "MetadataLocation": "s3://dsc/10.1002-term.3131.json",
        "Files": [
            {
                "BitstreamName": "10.1002-term.3131.pdf",
                "FileLocation": "s3://awd/10.1002-term.3131.pdf",
                "FileLocation": "s3://dsc/10.1002-term.3131.pdf",
                "BitstreamDescription": None,
            }
        ],
```
@@ -0,0 +1,64 @@
```python
from http import HTTPStatus


def test_itemsubmission_init_success(item_submission_instance):
    assert item_submission_instance.source_metadata == {
        "title": "Title",
        "contributor": "Author 1|Author 2",
    }
    assert item_submission_instance.metadata_mapping == {
        "dc.contributor": {
            "delimiter": "|",
            "language": None,
            "source_field_name": "contributor",
        },
        "dc.title": {"delimiter": "", "language": "en_US", "source_field_name": "title"},
        "item_identifier": {
            "delimiter": "",
            "language": None,
            "source_field_name": "item_identifier",
        },
    }
    assert item_submission_instance.bitstream_uris == [
        "s3://dsc/workflow/folder/123_01.pdf",
        "s3://dsc/workflow/folder/123_02.pdf",
    ]
    assert (
        item_submission_instance.metadata_keyname == "workflow/folder/123_metadata.json"
    )


def test_generate_and_upload_dspace_metadata(
    mocked_s3, item_submission_instance, s3_client
):
    item_submission_instance.generate_and_upload_dspace_metadata("dsc")
    assert (
        item_submission_instance.metadata_uri
        == "s3://dsc/workflow/folder/123_metadata.json"
    )
    response = s3_client.client.get_object(
        Bucket="dsc", Key="workflow/folder/123_metadata.json"
    )
    assert response["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.OK


def test_create_dspace_metadata(item_submission_instance):
    assert item_submission_instance.create_dspace_metadata() == {
        "metadata": [
            {
                "key": "dc.title",
                "language": "en_US",
                "value": "Title",
            },
            {
                "key": "dc.contributor",
                "language": None,
                "value": "Author 1",
            },
            {
                "key": "dc.contributor",
                "language": None,
                "value": "Author 2",
            },
        ]
    }
```
Commenting here, but this touches on file organization more generally.

- `base.py` was renamed to `workflows.py`, where it may contain additional classes or helper functions as the project grows
- `item_submission.py` was renamed to `items.py`, where it's a bit more generic to just providing classes about "Items" (maybe starting with `ItemSubmission`, but perhaps there is room eventually for something like `ItemSource` or `ItemMetadata`)
- `/sources` is where actual workflows that extend these could live

Resulting in a structure like the sketch below.

This could help make clear that anything under `/sources` is an extension of these base files and classes. And then each of those "base" files would be descriptive about what kind of classes it held.

NOTE: we could create the `/sources` directory now and include a file like `.gitkeep`, which would ensure it gets created and committed, even though we don't have sources yet.
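A rough sketch of the layout being described (file and directory names beyond those mentioned above are illustrative):

```
dsc/
├── workflows.py      # base workflow classes (currently base.py)
├── items.py          # item classes such as ItemSubmission (currently item_submission.py)
└── sources/
    ├── .gitkeep      # placeholder so the directory is committed before any sources exist
    └── ...           # workflow subclasses extending the base classes
```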