In 1101 base workflow #21

Merged · 5 commits · Dec 19, 2024
Changes from 3 commits
88 changes: 43 additions & 45 deletions Pipfile.lock


96 changes: 96 additions & 0 deletions dsc/base.py
@ghukill ghukill Dec 13, 2024

Commenting here, but this touches on file organization more generally.

  • what if base.py were renamed to workflows.py, where it could hold additional classes or helper functions as the project grows?
  • what if item_submission.py were renamed to items.py, making it a slightly more generic home for "Item" classes (maybe starting with ItemSubmission, but perhaps with room eventually for something like ItemSource or ItemMetadata)?
  • create a folder called /sources where actual workflows that extend these could live

Resulting in a structure like:

/ dsc
  / sources
    - .gitkeep  # ensures the directory persists, even before source workflows are defined
    - wiley.py  # which would contain a class like `WileyWorkflow`
    - ...
    - ...
  - items.py
  - workflows.py

This could help make clear that anything under /sources is an extension of these base files and classes. And then each of those "base" files would be descriptive about the kind of classes it holds.

NOTE: we could create the /sources directory now and include a file like .gitkeep which would ensure it gets created and committed, even though we don't have sources yet.
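
For concreteness, a hypothetical source workflow under /sources might look roughly like this. The class name comes from the wiley.py example above; the method bodies are placeholder stubs based only on the abstract methods in this PR, not real Wiley logic:

from typing import Any

from dsc.base import BaseWorkflow  # or dsc.workflows, if the rename above happens


class WileyWorkflow(BaseWorkflow):
    """Hypothetical Wiley source workflow; bodies are illustrative stubs only."""

    def get_batch_metadata(self) -> list:
        # e.g. read a CSV of Wiley metadata from S3 and return its rows
        return []

    def get_item_identifier(self, item_metadata: Any) -> str:
        # e.g. pull an identifier out of the item's metadata dict
        return item_metadata["item_identifier"]

    def get_bitstream_uris(self, item_identifier: str) -> list[str]:
        # e.g. list S3 objects under this workflow's prefix for the identifier
        return []

    def process_deposit_results(self) -> list[str]:
        # e.g. read DSS output queue results and build notification lines
        return []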

@@ -0,0 +1,96 @@
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, final

from dsc.item_submission import ItemSubmission
from dsc.utilities.aws.s3 import S3Client

if TYPE_CHECKING:
    from collections.abc import Iterator

logger = logging.getLogger(__name__)


class BaseWorkflow(ABC):
    """A base workflow class from which other workflow classes are derived."""

    def __init__(
        self,
        email_recipients: list[str],
        metadata_mapping: dict,
        s3_bucket: str,
        s3_prefix: str | None,
    ) -> None:
        """Initialize base instance.

        Args:
            email_recipients: The email addresses to notify after runs of the workflow.
            metadata_mapping: A mapping file for generating DSpace metadata from the
                workflow's source metadata.
            s3_bucket: The S3 bucket containing bitstream and metadata files for the
                workflow.
            s3_prefix: The S3 prefix used for objects in this workflow. This prefix does
                NOT include the bucket name.
        """
        self.email_recipients: list[str] = email_recipients
        self.metadata_mapping: dict = metadata_mapping
        self.s3_bucket: str = s3_bucket
        self.s3_prefix: str | None = s3_prefix

    @final
    def generate_submission_batch(self) -> Iterator[tuple[str, list[str]]]:

I'm realizing after my first pass that I have a couple of fundamental questions about this base workflow method.

When this is complete, is it correct that:

  1. each ItemSubmission instantiated in the loop will have uploaded a JSON file to S3 that is kind of like a SIP for DSS to retrieve and ingest into DSpace
  2. it yields the URI of each new ItemSubmission, which I believe to be the S3 URI of the JSON file uploaded to S3 for that item
  3. it also yields ItemSubmission.bitstream_uris, which are not yet getting set on ItemSubmission

My question: what is utilizing the results of this yielding? Will it be another class method, or the CLI itself? Unless I'm grossly misunderstanding this application, wouldn't something need to send these as an SQS message to the DSS input queue?

Related question (and apologies if this will come later): what happens with the bitstream URIs? Are they included in the SQS message for DSS?

ehanson8 (Contributor Author)

The yielded URIs will be utilized by the deposit command to generate the SQS message; as I mentioned in the PR description:

with the expectation that unwritten CLI commands will be the interface with the DSS SQS input/output queues and the SES emails.

The workflow class is intended for just the workflow-opinionated code, while the CLI command will be a wrapper with universal functionality (e.g. sending SQS messages, SES emails).

And yes, the bitstream URIs are in the SQS message, as well as the URI of the JSON metadata file that is uploaded to S3 (see the DSS submission message spec for reference).

The steps would be:

  1. ItemSubmission instantiated with source metadata and bitstream URIs
  2. DSpace metadata generated from source metadata
  3. DSpace metadata uploaded to S3 as a JSON file and the URI is returned
  4. ItemSubmission yields metadata_uri and bitstream_uris to the deposit CLI command for the SQS message
  5. deposit CLI command generates and sends the SQS message to DSS (a rough sketch of this follows below)
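
To illustrate step 5, a minimal sketch of how a deposit command might consume the yielded tuples and send SQS messages. The queue URL and message field names below are made up for illustration and are not taken from the DSS submission message spec:

import json

import boto3

from dsc.base import BaseWorkflow


def deposit(workflow: BaseWorkflow, queue_url: str) -> None:
    """Hypothetical deposit command: one SQS message per item submission."""
    sqs_client = boto3.client("sqs")
    for metadata_uri, bitstream_uris in workflow.generate_submission_batch():
        # Placeholder message body; the real fields would follow the DSS
        # submission message spec.
        message_body = {"MetadataLocation": metadata_uri, "Files": bitstream_uris}
        sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(message_body))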

@ghukill ghukill Dec 13, 2024

Ah, sorry, I missed that in the PR! Thanks for pointing that out.

And thanks for sharing the DSS submission message spec. That document is SUPER helpful, and I think it'd be handy to include it in docstrings in this application where applicable.

And, thanks for the steps; I had missed that get_bitstream_uris() is a method that workflows will need to define, and that's where they come from.

This is quite helpful for clarifying my next question: any reason why we couldn't yield the ItemSubmission object itself? If I'm understanding correctly, it will have both the metadata_uri and the bitstream_uris.

Happy to wait on that until the CLI-level orchestration is in place, where these would get sent out as SQS messages. Are you envisioning some kind of DSSInputMessage class that could encapsulate preparing the final SQS payload for sending, such that it would meet the spec?

Which I think has a natural follow-up question: why couldn't ItemSubmission have such a method to perform this work? Like to_dss_input_queue_message()?

Happy to meet and discuss if helpful, or keep hashing out in comments. When I see a class like ItemSubmission, which is responsible for uploading the components of a SIP to S3 (metadata and files), I wonder why it wouldn't have the capabilities of actually preparing (and maybe actually sending?) the SQS message for DSS.

ehanson8 (Contributor Author)

Which I think has a natural follow-up question: why couldn't ItemSubmission have such a method to perform this work? Like to_dss_input_queue_message()?

I think it certainly could! Let's get @jonavellecuerdo's thoughts as well, but now I'm inclined to build SQS message formatting into ItemSubmission rather than the CLI command.

Contributor

I do think yielding the actual ItemSubmission instance will allow the object to be accessed outside the context of the BaseWorkflow.generate_submission_batch method. 🤔 I also think that if all ItemSubmission instances must comply with the DSS submission message spec, then, as @ehanson8 writes, it also follows that defining the SQS message formatting in ItemSubmission would make sense!
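
A minimal sketch of what such a method on ItemSubmission could look like. The method name comes from this thread; the message field names are placeholders rather than the DSS submission message spec, and Any is already imported in item_submission.py:

    # A method like this could be added to the ItemSubmission dataclass,
    # which already carries metadata_uri and bitstream_uris.
    def to_dss_input_queue_message(self) -> dict[str, Any]:
        # Placeholder field names; the real shape would follow the DSS
        # submission message spec.
        return {
            "MetadataLocation": self.metadata_uri,
            "Files": self.bitstream_uris,
        }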

"""Generate a batch of item submissions for the DSpace Submission Service.

MUST NOT be overridden by workflow subclasses.
"""
s3_client = S3Client()

I might propose that ItemSubmission could initialize its own S3Client to use. I know we've touched on caching and reusing clients, but I think the performance gain is basically non-existent here, given how quickly this is initialized.

I would think that ItemSubmission.generate_and_upload_dspace_metadata() could just instantiate and use a client right when it needs it, instead of attaching it to the ItemSubmission class instance.

Or, have a property like this if multiple methods may eventually require an S3 client:

@property
def s3_client(self):
    return S3Client()
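
If the intent were to create the client only once per instance, functools.cached_property would be a small variation on that property (just an aside; nothing in this PR requires it):

from functools import cached_property

from dsc.utilities.aws.s3 import S3Client


class CachedClientSketch:
    """Illustration only: lazily create, then reuse, one S3Client per instance."""

    @cached_property
    def s3_client(self) -> S3Client:
        # Created on first access, cached on the instance, reused afterwards
        return S3Client()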

ehanson8 (Contributor Author)

Will do!

        batch_metadata = self.get_batch_metadata()
        for item_metadata in batch_metadata:
            item_identifier = self.get_item_identifier(item_metadata)

This might be a can of worms, but I think it gets at one of my primary questions at this point.

So get_batch_metadata() is expected to return a list of metadata objects for each item that will be submitted. And then we pass that metadata object to get_item_identifier(), where the metadata object could be anything that this method is equipped to handle (noting item_metadata:Any in the signature).

Where I think I'm going with this is that get_batch_metadata() and get_item_identifier() are kind of tightly coupled (and this isn't necessarily a bad thing), in that the objects returned by get_batch_metadata() must satisfy the input to get_item_identifier().

While perhaps many workflows might have a CSV file they parse and then return a dict, and get_item_identifier() could just grab a key... perhaps others might even return their own dataclass object or something, and then get_item_identifier() would use a property on that class.

However, I see that we assign this "metadata object" to ItemSubmission.source_metadata which itself is typed as dict[str, Any].

I was going to propose something like:

# near top of file
ItemMetadata = TypeVar("ItemMetadata")
...
...

def get_batch_metadata(self) -> list[ItemMetadata]:
    ...

def get_item_identifier(self, item_metadata: ItemMetadata) -> str:
    ...

Where we convey through a generic type that these are related. But this kind of generic type + flexibility doesn't really work if ItemSubmission is expecting a dictionary for source_metadata.

To keep things simple, do you think it'd be worth just typing the output of get_batch_metadata() to something like list[dict]? And then get_item_identifier() could accept a dict? Could always revisit if some workflow would really benefit from not yielding a dictionary for each item's metadata.

ehanson8 (Contributor Author)

I would say every method in the workflow classes is tightly coupled and opinionated to that workflow. In retrospect, I thought I had erred in typing item_metadata: Any in get_item_identifier and source_metadata: dict[str, Any] in ItemSubmission, since I didn't expect that object to be transformed between getting the identifier and instantiating ItemSubmission.

However! That is now suggesting to me that a get_item_metadata(batch_metadata) method is needed and that those type hints should remain:

batch_metadata = self.get_batch_metadata()
for item_metadata in self.get_item_metadata(batch_metadata):
    item_identifier = self.get_item_identifier(item_metadata)

Currently, batch_metadata in most workflows is a CSV, so get_item_metadata would be a simple dict iterator (a rough sketch of that CSV case is below). For OpenCourseWare, it's a list of zip files, so get_item_metadata iterates and extracts a dict from each zip file. If a future source uses XML, get_item_metadata would iterate and generate a dict from the XML. Thoughts?
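
For illustration, the CSV case could be as small as this. It assumes batch_metadata arrives as the CSV contents in a string, which is just one possibility since get_batch_metadata's return type is still open:

import csv
from collections.abc import Iterator
from typing import Any


class CsvWorkflowSketch:
    """Illustration only: a CSV-backed workflow's get_item_metadata."""

    def get_item_metadata(self, batch_metadata: str) -> Iterator[dict[str, Any]]:
        # Yield one metadata dict per CSV row
        reader = csv.DictReader(batch_metadata.splitlines())
        yield from reader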

@ghukill ghukill Dec 13, 2024

So would it be accurate to say?

  1. get_batch_metadata() --> retrieves metadata from somewhere, but what is returned still contains multiple items; probably most often downloading from S3, but theoretically could get it other ways too (e.g. an HTTP request to ETD... 😎 ). The result might be a single CSV reader/dataframe, a single parsed XML file object, etc.

  2. get_item_metadata() --> would yield actual dictionaries of item metadata, for each item, doing whatever is needed (e.g. unzipping files, parsing XML, looping through CSV rows, etc.)

I'm having trouble articulating what feels confusing about the relationship of:

  • get_batch_metadata
  • get_item_identifier
  • get_bitstream_uris
  • and now the proposed get_item_metadata

It feels kind of like they are very tightly coupled, and creating new workflows that extend this class would mean jumping through hoops to define those inter-related methods. When (and maybe I'm missing something here) a workflow could just be responsible for defining something like fetch_items(), which would yield nearly everything you need to instantiate an ItemSubmission.

Which, and I'm feeling dizzy from circular questioning, why wouldn't they directly yield an ItemSubmission then?

Each of those workflows could, and should, define helper methods as needed to support this work. But in some cases, like reading a CSV from S3, it feels like get_batch_metadata (reading the CSV) and get_item_metadata (yielding items) could be achieved with a couple of lines like:

import pandas as pd

df = pd.read_csv("s3://....")
for _, row in df.iterrows():
    yield row  # <----- basically an Item

or to extend that for some questions above:

import pandas as pd

from dsc import ItemSubmission


class FooWorkflow:

    def __init__(self, csv_location):
        self.csv_location = csv_location

    def fetch_items(self):
        df = pd.read_csv(self.csv_location)
        for _, row in df.iterrows():
            # extract identifiers, bitstreams, etc.
            yield ItemSubmission(...)
Sorry if I'm confusing the issue; happy to hold off on these questions until more is established.

logger.info(f"Processing submission for '{item_identifier}'")
metadata_keyname = f"{self.s3_prefix}/{item_identifier}_metadata.json"
item_submission = ItemSubmission(
source_metadata=item_metadata,
metadata_mapping=self.metadata_mapping,

My next can-o-worms question is here.

So it's safe to assume that metadata_mapping is a pretty critically important part of each workflow? This would be a dictionary that maps the item's "original" metadata dictionary to metadata fields that align with the DSpace metadata profile?

Super rough example:

{
    "title":"dc.title",
    ...
}

Do you expect there will be any difficulties with nesting, logic, or multivalued fields?

For example: what if the input metadata for workflow Foo has a column in the CSV with semicolon delimited subjects, e.g. apples;washington;mountains. And the goal would be multiple dc.subject field values for each. I'm unsure how a dictionary mapping could explode a single value into multiple values.

But, this assumes quite a bit. First, that DSpace's metadata profile can (or will) allow for more complex or multivalued fields. Second, that we don't just require the input metadata format to have basically a 1:1 mapping to DSpace fields already present.

I think this consideration points to ItemSubmission.create_dspace_metadata() (which I have not yet looked closely at) which I would expect to take the source_metadata and apply the metadata_mapping to get the final item metadata for DSpace.

Going to leave this comment here, as-is, and follow up in that method.

                s3_client=s3_client,
                bitstream_uris=self.get_bitstream_uris(item_identifier),
                metadata_keyname=metadata_keyname,
            )
            item_submission.generate_and_upload_dspace_metadata(self.s3_bucket)
            yield item_submission.metadata_uri, item_submission.bitstream_uris

    @abstractmethod
    def get_batch_metadata(self) -> list:
        """Get source metadata for the batch of item submissions.

        MUST be overridden by workflow subclasses.
        """

    @abstractmethod
    def get_item_identifier(self, item_metadata: Any) -> str:  # noqa: ANN401
        """Get identifier for an item submission according to the workflow subclass.

        MUST be overridden by workflow subclasses.

        Args:
            item_metadata: The item metadata from which the item identifier is extracted.
        """

    @abstractmethod
    def get_bitstream_uris(self, item_identifier: str) -> list[str]:
        """Get bitstreams for an item submission according to the workflow subclass.

        MUST be overridden by workflow subclasses.

        Args:
            item_identifier: The identifier used for locating the item's bitstreams.
        """

    @abstractmethod
    def process_deposit_results(self) -> list[str]:
        """Process results generated by the deposit according to the workflow subclass.

        MUST be overridden by workflow subclasses.
        """
68 changes: 68 additions & 0 deletions dsc/item_submission.py

Taking some of the other file reorganizing and renaming into consideration (and thanks BTW, it's feeling good to navigate around!), I could envision this file being called something more high-level like items.py. If other item-specific classes ever made sense to add, it would be a natural place for them.

I don't have evidence or even a pointable philosophy to support it, but when the classname mirrors the filename, it feels a bit off. While we may not need more "item" type classes, it's tight coupling between file and class names.

Totally optional.

@@ -0,0 +1,68 @@
import json
import logging
from dataclasses import dataclass
from typing import Any

from dsc.utilities.aws.s3 import S3Client

logger = logging.getLogger(__name__)


@dataclass
class ItemSubmission:

Do you think ItemSubmission would benefit from some kind of validate() method? I'm unsure offhand what that might look for, but I'm noticing now that we call generate_and_upload_dspace_metadata() in the for loop of the workflow and it might be helpful to check things look good before we fire it off.

ehanson8 (Contributor Author)

I did have a valid_dspace_metadata method in wiley-deposits, but I may want to rework it since that had specific field names to validate, while these workflows may be less constrained. But yes, there should be some validation!

Contributor

Side note: if we find that the dataclass module is the right fit here, I propose calling the proposed validate() method in a __post_init__ method!
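
A minimal sketch of that pattern, using the fields from this PR (s3_client omitted to keep the sketch self-contained; the validate() checks are placeholders since the real rules are still to be decided):

from dataclasses import dataclass
from typing import Any


@dataclass
class ItemSubmissionSketch:
    """Illustration only: run validation right after the generated __init__."""

    source_metadata: dict[str, Any]
    metadata_mapping: dict[str, Any]
    bitstream_uris: list[str]
    metadata_keyname: str
    metadata_uri: str = ""

    def __post_init__(self) -> None:
        # dataclasses call this automatically after the generated __init__
        self.validate()

    def validate(self) -> None:
        # Placeholder checks only; real validation rules TBD
        if not self.metadata_mapping:
            raise ValueError("metadata_mapping must not be empty")
        if not self.metadata_keyname.endswith(".json"):
            raise ValueError("metadata_keyname must be an S3 key ending in .json")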

"""A class to store the required values for a DSpace submission."""

source_metadata: dict[str, Any]
metadata_mapping: dict[str, Any]
s3_client: S3Client
bitstream_uris: list[str]
metadata_keyname: str
metadata_uri: str = ""

def generate_and_upload_dspace_metadata(self, bucket: str) -> None:
"""Generate DSpace metadata from the item's source metadata and upload it to S3.

Args:
bucket: The S3 bucket for uploading the item metadata file.
"""
dspace_metadata = self.create_dspace_metadata()
self.s3_client.put_file(
json.dumps(dspace_metadata),
bucket,
self.metadata_keyname,
)
metadata_uri = f"s3://{bucket}/{self.metadata_keyname}"
logger.info(f"Metadata uploaded to S3: {metadata_uri}")
self.metadata_uri = metadata_uri
@ehanson8 ehanson8 (Contributor Author) Dec 13, 2024

I considered exception handling here, but as with wiley-deposits, I think it's better to crash the run on a ClientError than to continue if we can't get a metadata file into S3.


    def create_dspace_metadata(self) -> dict[str, Any]:
        """Create DSpace metadata from the item's source metadata."""
        metadata_entries = []
        for field_name, field_mapping in self.metadata_mapping.items():
            if field_name not in ["item_identifier", "source_system_identifier"]:

                field_value = self.source_metadata.get(field_mapping["source_field_name"])
                if field_value:
                    delimiter = field_mapping["delimiter"]

Ah-ha! Related to my comment above about metadata_mapping, I see now that each entry in it is an object itself, which can provide more information for this mapping.

Request: can you add an example of what this metadata mapping might look like? With an example, I think I would understand that a single dictionary could suffice.
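
For illustration, a mapping shaped the way create_dspace_metadata() reads it (a source_field_name, delimiter, and language per entry) might look something like the following; the specific fields and values here are made up:

metadata_mapping = {
    "dc.title": {
        "source_field_name": "title",
        "delimiter": "",  # single-valued field
        "language": "en_US",
    },
    "dc.subject": {
        "source_field_name": "subjects",
        # "apples;washington;mountains" would become three dc.subject entries
        "delimiter": ";",
        "language": None,
    },
    "item_identifier": {
        "source_field_name": "item_identifier",
        "delimiter": "",
        "language": None,
    },
}

The item_identifier entry is included only because the method above explicitly skips it when building DSpace metadata.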

                    language = field_mapping["language"]
                    if delimiter:
                        metadata_entries.extend(
                            [
                                {
                                    "key": field_name,
                                    "value": value,
                                    "language": language,
                                }
                                for value in field_value.split(delimiter)
                            ]
                        )
                    else:
                        metadata_entries.append(
                            {
                                "key": field_name,
                                "value": field_value,
                                "language": language,
                            }
                        )

        return {"metadata": metadata_entries}