Skip to content

Commit

Permalink
refactor: use more pydantic models (#317)
Browse files Browse the repository at this point in the history
Co-authored-by: Kori Kuzma <korikuzma@gmail.com>
  • Loading branch information
jsstevenson and korikuzma authored Jan 2, 2024
1 parent 305359b commit 533af5c
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 136 deletions.
7 changes: 7 additions & 0 deletions src/gene/database/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import click
from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError
from pydantic import BaseModel

from gene.database.database import (
AWS_ENV_VAR_NAME,
Expand Down Expand Up @@ -382,6 +383,12 @@ def add_record(self, record: Dict, src_name: SourceName) -> None:
label_and_type = f"{concept_id.lower()}##identity"
record["label_and_type"] = label_and_type
record["item_type"] = "identity"
for i, location in enumerate(record.get("locations", [])):
if isinstance(location, BaseModel):
record["locations"][i] = location.model_dump(
mode="json", exclude_none=True
)

try:
self.batch.put_item(Item=record)
except ClientError as e:
Expand Down
12 changes: 6 additions & 6 deletions src/gene/database/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,9 +535,9 @@ def add_source_metadata(self, src_name: SourceName, meta: SourceMeta) -> None:
meta.version,
json.dumps(meta.data_url),
meta.rdp_url,
meta.data_license_attributes["non_commercial"],
meta.data_license_attributes["attribution"],
meta.data_license_attributes["share_alike"],
meta.data_license_attributes.non_commercial,
meta.data_license_attributes.attribution,
meta.data_license_attributes.share_alike,
meta.genome_assemblies,
],
)
Expand Down Expand Up @@ -566,10 +566,10 @@ def add_record(self, record: Dict, src_name: SourceName) -> None:
"""Add new record to database.
:param record: record to upload
:param src_name: name of source for record. Not used by PostgreSQL instance.
:param src_name: name of source for record.
"""
concept_id = record["concept_id"]
locations = [json.dumps(loc) for loc in record.get("locations", [])]
locations = [loc.model_dump_json() for loc in record.get("locations", [])]
if not locations:
locations = None
with self.conn.cursor() as cur:
Expand All @@ -578,7 +578,7 @@ def add_record(self, record: Dict, src_name: SourceName) -> None:
self._add_record_query,
[
concept_id,
record["src_name"],
src_name.value,
record.get("symbol_status"),
record.get("label"),
record.get("strand"),
Expand Down
42 changes: 18 additions & 24 deletions src/gene/etl/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from wags_tails import EnsemblData, HgncData, NcbiGeneData

from gene.database import AbstractDatabase
from gene.schemas import ITEM_TYPES, Gene, GeneSequenceLocation, MatchType, SourceName
from gene.schemas import ITEM_TYPES, Gene, MatchType, SourceName, StoredSequenceLocation

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -122,11 +122,6 @@ def _load_gene(self, gene: Dict) -> None:
except pydantic.ValidationError as e:
_logger.warning(f"Unable to load {gene} due to validation error: " f"{e}")
else:
concept_id = gene["concept_id"]
gene["label_and_type"] = f"{concept_id.lower()}##identity"
gene["src_name"] = self._src_name.value
gene["item_type"] = "identity"

for attr_type in ITEM_TYPES:
if attr_type in gene:
value = gene[attr_type]
Expand All @@ -137,7 +132,7 @@ def _load_gene(self, gene: Dict) -> None:
gene[attr_type] = list(set(value))

self._database.add_record(gene, self._src_name)
self._processed_ids.append(concept_id)
self._processed_ids.append(gene["concept_id"])

def get_seqrepo(self, seqrepo_dir: Path) -> SeqRepo:
"""Return SeqRepo instance if seqrepo_dir exists.
Expand Down Expand Up @@ -224,32 +219,31 @@ def _get_seq_id_aliases(self, seq_id: str) -> List[str]:
_logger.warning(f"SeqRepo raised KeyError: {e}")
return aliases

def _get_sequence_location(self, seq_id: str, gene: Feature, params: Dict) -> Dict:
"""Get a gene's GeneSequenceLocation.
def _build_sequence_location(
self, seq_id: str, gene: Feature, concept_id: str
) -> Optional[StoredSequenceLocation]:
"""Construct a sequence location for storing in a DB.
:param seq_id: The sequence ID.
:param gene: A gene from the source file.
:param params: The transformed gene record.
:return: A dictionary of a GA4GH VRS SequenceLocation, if seq_id alias found.
Else, empty dictionary
:param concept_id: record ID from source
:return: A storable SequenceLocation containing relevant params for returning a
VRS SequenceLocation, or None if unable to retrieve valid parameters
"""
location = {}
aliases = self._get_seq_id_aliases(seq_id)
if not aliases:
return location
if not aliases or gene.start is None or gene.end is None:
return None

sequence = aliases[0]

if gene.start != "." and gene.end != "." and sequence:
if 0 <= gene.start <= gene.end: # type: ignore
location = GeneSequenceLocation(
start=gene.start - 1, # type: ignore
end=gene.end, # type: ignore
if 0 <= gene.start <= gene.end:
return StoredSequenceLocation(
start=gene.start - 1,
end=gene.end,
sequence_id=sequence,
).model_dump() # type: ignore
)
else:
_logger.warning(
f"{params['concept_id']} has invalid interval:"
f"start={gene.start - 1} end={gene.end}"
) # type: ignore
return location
f"{concept_id} has invalid interval: start={gene.start - 1} end={gene.end}"
)
49 changes: 22 additions & 27 deletions src/gene/etl/ensembl.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
from gffutils.feature import Feature

from gene.etl.base import Base, GeneNormalizerEtlError
from gene.schemas import NamespacePrefix, SourceMeta, SourceName, Strand
from gene.schemas import (
DataLicenseAttributes,
NamespacePrefix,
SourceMeta,
Strand,
)

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -66,22 +71,23 @@ def _add_gene(self, f: Feature, accession_numbers: Dict) -> Dict:
:param accession_numbers: Accession numbers for each chromosome and scaffold
:return: A gene dictionary containing data if the ID attribute exists.
"""
gene = dict()
gene_params = dict()
if f.strand == "-":
gene["strand"] = Strand.REVERSE.value
gene_params["strand"] = Strand.REVERSE.value
elif f.strand == "+":
gene["strand"] = Strand.FORWARD.value
gene["src_name"] = SourceName.ENSEMBL.value
gene_params["strand"] = Strand.FORWARD.value

self._add_attributes(f, gene)
location = self._add_location(f, gene, accession_numbers)
self._add_attributes(f, gene_params)
location = self._build_sequence_location(
accession_numbers[f.seqid], f, gene_params["concept_id"]
)
if location:
gene["locations"] = [location]
gene_params["locations"] = [location]

gene["label_and_type"] = f"{gene['concept_id'].lower()}##identity"
gene["item_type"] = "identity"
gene_params["label_and_type"] = f"{gene_params['concept_id'].lower()}##identity"
gene_params["item_type"] = "identity"

return gene
return gene_params

def _add_attributes(self, f: Feature, gene: Dict) -> None:
"""Add concept_id, symbol, xrefs, and associated_with to a gene record.
Expand Down Expand Up @@ -132,17 +138,6 @@ def _add_attributes(self, f: Feature, gene: Dict) -> None:

gene[attributes[key]] = val

def _add_location(self, f: Feature, gene: Dict, accession_numbers: Dict) -> Dict:
"""Add GA4GH SequenceLocation to a gene record.
https://vr-spec.readthedocs.io/en/1.1/terms_and_model.html#sequencelocation
:param f: A gene from the data
:param gene: A transformed gene record
:param accession_numbers: Accession numbers for each chromosome and scaffold
:return: gene record dictionary with location added
"""
return self._get_sequence_location(accession_numbers[f.seqid], f, gene)

def _get_xref_associated_with(self, src_name: str, src_id: str) -> Dict:
"""Get xref or associated_with concept.
Expand Down Expand Up @@ -181,11 +176,11 @@ def _add_meta(self) -> None:
"genome_annotations": f"ftp://ftp.ensembl.org/pub/release-{self._version}/gff3/homo_sapiens/Homo_sapiens.{self._assembly}.{self._version}.gff3.gz"
},
rdp_url=None,
data_license_attributes={
"non_commercial": False,
"share_alike": False,
"attribution": False,
},
data_license_attributes=DataLicenseAttributes(
non_commercial=False,
share_alike=False,
attribution=False,
),
genome_assemblies=[self._assembly],
)

Expand Down
46 changes: 23 additions & 23 deletions src/gene/etl/hgnc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
import json
import logging
import re
from typing import Dict
from typing import Dict, List

from gene.etl.base import Base, GeneNormalizerEtlError
from gene.schemas import (
PREFIX_LOOKUP,
Annotation,
Chromosome,
DataLicenseAttributes,
NamespacePrefix,
SourceMeta,
SourceName,
Expand All @@ -30,19 +31,16 @@ def _transform_data(self) -> None:
records = data["response"]["docs"]

for r in records:
gene = dict()
gene["concept_id"] = r["hgnc_id"].lower()
gene["label_and_type"] = f"{gene['concept_id']}##identity"
gene["item_type"] = "identity"
gene["symbol"] = r["symbol"]
gene["label"] = r["name"]
gene["src_name"] = SourceName.HGNC.value
gene = {
"concept_id": r["hgnc_id"].lower(),
"symbol": r["symbol"],
"label": r["name"],
}
if r["status"]:
if r["status"] == "Approved":
gene["symbol_status"] = SymbolStatus.APPROVED.value
elif r["status"] == "Entry Withdrawn":
gene["symbol_status"] = SymbolStatus.WITHDRAWN.value
gene["src_name"] = SourceName.HGNC.value

# store alias, xref, associated_with, prev_symbols, location
self._get_aliases(r, gene)
Expand Down Expand Up @@ -83,10 +81,10 @@ def _get_previous_symbols(self, r: Dict, gene: Dict) -> None:
if prev_symbols:
gene["previous_symbols"] = list(set(prev_symbols))

def _get_xrefs_associated_with(self, r: Dict, gene: Dict) -> None:
def _get_xrefs_associated_with(self, record: Dict, gene: Dict) -> None:
"""Store xrefs and/or associated_with refs in a gene record.
:param r: A gene record in the HGNC data file
:param record: A gene record in the HGNC data file
:param gene: A transformed gene record
"""
xrefs = list()
Expand Down Expand Up @@ -119,7 +117,7 @@ def _get_xrefs_associated_with(self, r: Dict, gene: Dict) -> None:
]

for src in sources:
if src in r:
if src in record:
if "-" in src:
key = src.split("-")[0]
elif "." in src:
Expand All @@ -131,9 +129,11 @@ def _get_xrefs_associated_with(self, r: Dict, gene: Dict) -> None:

if key.upper() in NamespacePrefix.__members__:
if NamespacePrefix[key.upper()].value in PREFIX_LOOKUP.keys():
self._get_xref_associated_with(key, src, r, xrefs)
self._get_xref_associated_with(key, src, record, xrefs)
else:
self._get_xref_associated_with(key, src, r, associated_with)
self._get_xref_associated_with(
key, src, record, associated_with
)
else:
_logger.warning(f"{key} not in schemas.py")

Expand All @@ -143,7 +143,7 @@ def _get_xrefs_associated_with(self, r: Dict, gene: Dict) -> None:
gene["associated_with"] = associated_with

def _get_xref_associated_with(
self, key: str, src: str, r: Dict, src_type: Dict
self, key: str, src: str, r: Dict, src_type: List[str]
) -> None:
"""Add an xref or associated_with ref to a gene record.
Expand Down Expand Up @@ -194,6 +194,8 @@ def _get_location(self, r: Dict, gene: Dict) -> None:
if not gene["location_annotations"]:
del gene["location_annotations"]

_annotation_types = {v.value for v in Annotation.__members__.values()}

def _set_annotation(self, loc: str, gene: Dict) -> None:
"""Set the annotations attribute if one is provided.
Return `True` if a location is provided, `False` otherwise.
Expand All @@ -202,9 +204,7 @@ def _set_annotation(self, loc: str, gene: Dict) -> None:
:param gene: in-progress gene record
:return: A bool whether or not a gene map location is provided
"""
annotations = {v.value for v in Annotation.__members__.values()}

for annotation in annotations:
for annotation in self._annotation_types:
if annotation in loc:
gene["location_annotations"].append(annotation)
# Check if location is also included
Expand Down Expand Up @@ -256,11 +256,11 @@ def _add_meta(self) -> None:
"complete_set_archive": "ftp.ebi.ac.uk/pub/databases/genenames/hgnc/json/hgnc_complete_set.json"
},
rdp_url=None,
data_license_attributes={
"non_commercial": False,
"share_alike": False,
"attribution": False,
},
data_license_attributes=DataLicenseAttributes(
non_commercial=False,
share_alike=False,
attribution=False,
),
genome_assemblies=[],
)
self._database.add_source_metadata(SourceName.HGNC, metadata)
Loading

0 comments on commit 533af5c

Please sign in to comment.