Skip to content

Commit

Permalink
Add SES and SQS client (#9)
Browse files Browse the repository at this point in the history
* Add SES and SQS client

Why these changes are being introduced:
* SES and SQS clients are needed for sending logs to stakeholders and interacting with the DSS input and output queues

How this addresses that need:
* Add SESClient and SQSClient classes with corresponding unit tests and fixtures for the methods
* Add exceptions module
* Update dependencies

Side effects of this change:
* None

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/IN-1099

* Updates based on discussion in PR # 9

* Shift AWS modules to utilities/aws
* Refactor SQSClient.__init__ method to use queue_url property
* Rename methods from valid_result_message, valid_sqs_message > validate_message for consistency and brevity
* Refactor validate_message method to avoid duplicate logging and add corresponding unit tests
* Update log messages to use f-strings

* Update dependencies

* More updates based on discussion in PR # 9

* Refactor SQSClient
validate methods for greater efficiency and update corresponding unit tests

* Update sqs.py

* Ignore EM102 & TRY003 to pyproject.toml
  • Loading branch information
ehanson8 authored Dec 10, 2024
1 parent 8eb29e2 commit 656bcbb
Show file tree
Hide file tree
Showing 11 changed files with 721 additions and 123 deletions.
242 changes: 121 additions & 121 deletions Pipfile.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions dsc/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class InvalidSQSMessageError(Exception):
pass
Empty file added dsc/utilities/__init__.py
Empty file.
Empty file added dsc/utilities/aws/__init__.py
Empty file.
File renamed without changes.
84 changes: 84 additions & 0 deletions dsc/utilities/aws/ses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from __future__ import annotations

import logging
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from typing import TYPE_CHECKING

from boto3 import client

if TYPE_CHECKING:
from mypy_boto3_ses.type_defs import SendRawEmailResponseTypeDef

logger = logging.getLogger(__name__)


class SESClient:
"""A class to perform common SES operations for this application."""

def __init__(self, region: str) -> None:
self.client = client("ses", region_name=region)

def create_and_send_email(
self,
subject: str,
attachment_content: str,
attachment_name: str,
source_email_address: str,
recipient_email_address: str,
) -> None:
"""Create an email message and send it via SES.
Args:
subject: The subject of the email.
attachment_content: The content of the email attachment.
attachment_name: The name of the email attachment.
source_email_address: The email address of the sender.
recipient_email_address: The email address of the receipient.
"""
message = self._create_email(subject, attachment_content, attachment_name)
self._send_email(source_email_address, recipient_email_address, message)
logger.debug(f"Logs sent to {recipient_email_address}")

def _create_email(
self,
subject: str,
attachment_content: str,
attachment_name: str,
) -> MIMEMultipart:
"""Create an email.
Args:
subject: The subject of the email.
attachment_content: The content of the email attachment.
attachment_name: The name of the email attachment.
"""
message = MIMEMultipart()
message["Subject"] = subject
attachment_object = MIMEApplication(attachment_content)
attachment_object.add_header(
"Content-Disposition", "attachment", filename=attachment_name
)
message.attach(attachment_object)
return message

def _send_email(
self,
source_email_address: str,
recipient_email_address: str,
message: MIMEMultipart,
) -> SendRawEmailResponseTypeDef:
"""Send email via SES.
Args:
source_email_address: The email address of the sender.
recipient_email_address: The email address of the receipient.
message: The message to be sent.
"""
return self.client.send_raw_email(
Source=source_email_address,
Destinations=[recipient_email_address],
RawMessage={
"Data": message.as_string(),
},
)
202 changes: 202 additions & 0 deletions dsc/utilities/aws/sqs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
from __future__ import annotations

import json
import logging
from typing import TYPE_CHECKING, Any

from boto3 import client

from dsc.exceptions import InvalidSQSMessageError

if TYPE_CHECKING:
from collections.abc import Iterator, Mapping

from mypy_boto3_sqs.type_defs import (
EmptyResponseMetadataTypeDef,
MessageAttributeValueTypeDef,
MessageTypeDef,
SendMessageResultTypeDef,
)

logger = logging.getLogger(__name__)


class SQSClient:
"""A class to perform common SQS operations for this application."""

def __init__(
self, region: str, queue_name: str, queue_url: str | None = None
) -> None:
self.client = client("sqs", region_name=region)
self.queue_name = queue_name
self._queue_url: str | None = queue_url

@property
def queue_url(self) -> str:
"""Property to provide QueueUrl, caching it for reuse."""
if not self._queue_url:
self._queue_url = self.get_queue_url()
return self._queue_url

def get_queue_url(self) -> str:
"""Get SQS queue URL from name."""
return self.client.get_queue_url(QueueName=self.queue_name)["QueueUrl"]

@staticmethod
def create_dss_message_attributes(
package_id: str, submission_source: str, output_queue: str
) -> dict[str, Any]:
"""Create attributes for a DSpace Submission Service message.
Args:
package_id: The PackageID field which is populated by the submission's
identifier.
submission_source: The source for the submission.
output_queue: The SQS output queue used for retrieving result messages.
"""
return {
"PackageID": {"DataType": "String", "StringValue": package_id},
"SubmissionSource": {"DataType": "String", "StringValue": submission_source},
"OutputQueue": {"DataType": "String", "StringValue": output_queue},
}

@staticmethod
def create_dss_message_body(
submission_system: str,
collection_handle: str,
metadata_s3_uri: str,
bitstream_file_name: str,
bitstream_s3_uri: str,
) -> str:
"""Create body for a DSpace Submission Service message.
Args:
submission_system: The system where the article is uploaded.
collection_handle: The handle of collection where the article is uploaded.
metadata_s3_uri: The S3 URI for the metadata JSON file.
bitstream_file_name: The file name for the article content which is uploaded as a
bitstream.
bitstream_s3_uri: The S3 URI for the article content file.
"""
return json.dumps(
{
"SubmissionSystem": submission_system,
"CollectionHandle": collection_handle,
"MetadataLocation": metadata_s3_uri,
"Files": [
{
"BitstreamName": bitstream_file_name,
"FileLocation": bitstream_s3_uri,
"BitstreamDescription": None,
}
],
}
)

def delete(self, receipt_handle: str) -> EmptyResponseMetadataTypeDef:
"""Delete message from SQS queue.
Args:
receipt_handle: The receipt handle of the message to be deleted.
"""
logger.debug("Deleting '{receipt_handle}' from SQS queue: {self.queue_name}")
response = self.client.delete_message(
QueueUrl=self.queue_url,
ReceiptHandle=receipt_handle,
)
logger.debug(f"Message deleted from SQS queue: {response}")

return response

def process_result_message(self, sqs_message: MessageTypeDef) -> tuple[str, str]:
"""Validate, extract data, and delete an SQS result message.
Args:
sqs_message: An SQS result message to be processed.
"""
self.validate_message(sqs_message)
identifier = sqs_message["MessageAttributes"]["PackageID"]["StringValue"]
message_body = json.loads(str(sqs_message["Body"]))
self.delete(sqs_message["ReceiptHandle"])
return identifier, message_body

def receive(self) -> Iterator[MessageTypeDef]:
"""Receive messages from SQS queue."""
logger.debug(f"Receiving messages from SQS queue: {self.queue_name}")
while True:
response = self.client.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=10,
MessageAttributeNames=["All"],
)
if "Messages" in response:
for message in response["Messages"]:
logger.debug(
f"Message retrieved from SQS queue {self.queue_name}: {message}"
)
yield message
else:
logger.debug(f"No more messages from SQS queue: {self.queue_name}")
break

def send(
self,
message_attributes: Mapping[str, MessageAttributeValueTypeDef],
message_body: str,
) -> SendMessageResultTypeDef:
"""Send message via SQS.
Args:
message_attributes: The attributes of the message to send.
message_body: The body of the message to send.
"""
logger.debug(f"Sending message to SQS queue: {self.queue_name}")
response = self.client.send_message(
QueueUrl=self.queue_url,
MessageAttributes=message_attributes,
MessageBody=str(message_body),
)
logger.debug(f"Response from SQS queue: {response}")
return response

def validate_message(self, sqs_message: MessageTypeDef) -> None:
"""Validate that an SQS message is formatted as expected.
Args:
sqs_message: An SQS message to be evaluated.
"""
if not sqs_message.get("ReceiptHandle"):
raise InvalidSQSMessageError(
f"Failed to retrieve 'ReceiptHandle' from message: {sqs_message}"
)
self.validate_message_attributes(sqs_message=sqs_message)
self.validate_message_body(sqs_message=sqs_message)

@staticmethod
def validate_message_attributes(sqs_message: MessageTypeDef) -> None:
"""Validate that "MessageAttributes" field is formatted as expected.
Args:
sqs_message: An SQS message to be evaluated.
"""
if (
"MessageAttributes" not in sqs_message
or "PackageID" not in sqs_message["MessageAttributes"]
or not sqs_message["MessageAttributes"]["PackageID"].get("StringValue")
):
raise InvalidSQSMessageError(
f"Failed to parse SQS message attributes: {sqs_message}"
)

@staticmethod
def validate_message_body(sqs_message: MessageTypeDef) -> None:
"""Validate that "Body" field is formatted as expected.
Args:
sqs_message: An SQS message to be evaluated.
"""
if "Body" not in sqs_message or not json.loads(str(sqs_message["Body"])):
raise InvalidSQSMessageError(
f"Failed to parse SQS message body: {sqs_message}"
)
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ ignore = [
"D103",
"D104",
"D107",
"EM102",
"G004",
"N812",
"PLR0912",
"PLR0913",
"PLR0915",
"PTH",
"S320",
"S321",
"S321",
"TRY003",
]

# allow autofix behavior for specified rules
Expand Down
Loading

0 comments on commit 656bcbb

Please sign in to comment.