Skip to content

Commit

Permalink
wip: stash progress
Browse files Browse the repository at this point in the history
  • Loading branch information
jsstevenson committed Oct 20, 2023
1 parent 1c18396 commit 66f1bda
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 31 deletions.
17 changes: 8 additions & 9 deletions src/gene/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
from pathlib import Path
from timeit import default_timer as timer
from typing import Collection, List, Optional, Set
from typing import Collection, Optional, Set

import click

Expand Down Expand Up @@ -124,8 +124,8 @@ def _update_normalizer(
"""
processed_ids = list()
for n in sources:
delete_time = _delete_source(n, db)
_load_source(n, db, delete_time, processed_ids, use_existing)
# delete_time = _delete_source(n, db)
_load_source(n, db, processed_ids, use_existing)

if update_merged:
_load_merge(db, processed_ids)
Expand Down Expand Up @@ -157,15 +157,13 @@ def _delete_source(n: SourceName, db: AbstractDatabase) -> float:
def _load_source(
n: SourceName,
db: AbstractDatabase,
delete_time: float,
processed_ids: List[str],
processed_ids: Set[str],
use_existing: bool,
) -> None:
"""Load individual source data.
:param n: name of source
:param db: database instance
:param delete_time: time taken (in seconds) to run deletion
:param processed_ids: in-progress list of processed gene IDs
:param use_existing: if True, use most recent local data files instead of
fetching from remote
Expand All @@ -187,20 +185,21 @@ def _load_source(
SourceClass = eval(n.value) # noqa: N806

source = SourceClass(database=db)
previous_concept_ids = db.get_all_concept_ids(n)
try:
processed_ids += source.perform_etl(use_existing)
except GeneNormalizerEtlError as e:
logger.error(e)
click.echo(f"Encountered error while loading {n}: {e}.")
click.get_current_context().exit()
for concept_id in previous_concept_ids - processed_ids:
db.delete_record(concept_id)

end_load = timer()
load_time = end_load - start_load
msg = f"Loaded {n.value} in {load_time:.5f} seconds."
click.echo(msg)
logger.info(msg)
msg = f"Total time for {n.value}: {(delete_time + load_time):.5f} seconds."
click.echo(msg)
logger.info(msg)


def _delete_normalized_data(database: AbstractDatabase) -> None:
Expand Down
42 changes: 40 additions & 2 deletions src/gene/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,20 @@ def get_refs_by_type(self, search_term: str, ref_type: RefType) -> List[str]:
"""

@abc.abstractmethod
def get_all_concept_ids(self) -> Set[str]:
def get_all_concept_ids(self, src_name: Optional[SourceName] = None) -> Set[str]:
"""Retrieve all available concept IDs for use in generating normalized records.
:return: List of concept IDs as strings.
:param src_name: if given, only return concept IDs provided by this source
:return: Set of concept IDs as strings.
"""

@abc.abstractmethod
def get_all_normalized_ids(self, merged_only: bool = False) -> Set[str]:
"""Retrieve all normalized concept IDs.
:param merged_only: if True, only return concept IDs for normalized concepts
which consist of >1 source records
:return: Set of normalized IDs
"""

@abc.abstractmethod
Expand Down Expand Up @@ -186,6 +196,25 @@ def add_merged_record(self, record: Dict) -> None:
:param record: merged record to add
"""

@abc.abstractmethod
def update_source_record(self, concept_id: str, record: Dict) -> None:
"""Replace source record located under ``concept_id`` with ``record``.
:param concept_id: record concept ID
:param record: new record version
"""

@abc.abstractmethod
def update_merged_record(self, concept_id: str, record: Dict) -> None:
"""Replace normalized record located under ``concept_id`` with ``record``.
Should be a merged record distinct from an individual source record -- i.e.,
must contain more than one source records. This check should occur before
calling this method.
:param concept_id: record concept ID
:param record: new record version
"""

@abc.abstractmethod
def update_merge_ref(self, concept_id: str, merge_ref: Any) -> None: # noqa: ANN401
"""Update the merged record reference of an individual record to a new value.
Expand All @@ -195,6 +224,15 @@ def update_merge_ref(self, concept_id: str, merge_ref: Any) -> None: # noqa: AN
:raise DatabaseWriteException: if attempting to update non-existent record
"""

@abc.abstractmethod
def delete_record(self, concept_id: str, normalized: bool = False) -> None:
"""Remove an individual record and associated references from the DB.
:param concept_id: ID of record to delete
:param normalized: if True, remove the merged, normalized version of this record
:raise DatabaseWriteException: if deletion call fails
"""

@abc.abstractmethod
def delete_normalized_concepts(self) -> None:
"""Remove merged records from the database. Use when performing a new update
Expand Down
127 changes: 107 additions & 20 deletions src/gene/database/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,20 +440,51 @@ def get_refs_by_type(self, search_term: str, ref_type: RefType) -> List[str]:
return []

_ids_query = b"SELECT concept_id FROM gene_concepts;"
_ids_query_filtered = b"SELECT concept_id FROM gene_concepts gc LEFT JOIN gene_sources s ON s.name = gc.source WHERE name = %s;"

def get_all_concept_ids(self) -> Set[str]:
"""Retrieve concept IDs for use in generating normalized records.
def get_all_concept_ids(self, src_name: Optional[SourceName] = None) -> Set[str]:
"""Retrieve all available concept IDs for use in generating normalized records.
:param src_name: if given, only return concept IDs provided by this source
:return: Set of concept IDs as strings.
"""
if src_name:
with self.conn.cursor() as cur:
cur.execute(self._ids_query_filtered, [src_name.value])
ids_tuple = cur.fetchall()
else:
with self.conn.cursor() as cur:
cur.execute(self._ids_query)
ids_tuple = cur.fetchall()
return {i[0] for i in ids_tuple}

_normalized_ids_merged_query = b"SELECT concept_id FROM gene_merged;"
_normalized_ids_unmerged_query = (
b"SELECT concept_id FROM gene_concepts WHERE merge_ref IS NOT NULL;"
)

def get_all_normalized_ids(self, merged_only: bool = False) -> Set[str]:
"""Retrieve all normalized concept IDs.
:param merged_only: if True, only return concept IDs for normalized concepts
which consist of >1 source records
:return: Set of normalized IDs
"""
with self.conn.cursor() as cur:
cur.execute(self._ids_query)
cur.execute(self._normalized_ids_merged_query)
ids_tuple = cur.fetchall()
return {i[0] for i in ids_tuple}
normalized_ids = {i[0] for i in ids_tuple}
if not merged_only:
with self.conn.cursor() as cur:
cur.execute(self._normalized_ids_unmerged_query)
ids_tuple = cur.fetchall()
unmerged_ids = {i[0] for i in ids_tuple}
normalized_ids.update(unmerged_ids)
return normalized_ids

_get_all_normalized_records_query = b"SELECT * FROM gene_merged;"
_get_all_unmerged_source_records_query = (
b"SELECT * FROM record_lookup_view WHERE merge_ref IS NULL;" # noqa: E501
b"SELECT * FROM record_lookup_view WHERE merge_ref IS NULL;"
)
_get_all_source_records_query = b"SELECT * FROM record_lookup_view;"

Expand All @@ -472,7 +503,7 @@ def get_all_records(self, record_type: RecordType) -> Generator[Dict, None, None
Unlike DynamoDB, merged records are stored in a separate table from source
records. As a result, when fetching all normalized records, merged records are
return first, and iteration continues with all source records that don't
returned first, and iteration continues with all source records that don't
belong to a normalized concept group.
:param record_type: type of result to return
Expand Down Expand Up @@ -641,6 +672,32 @@ def add_merged_record(self, record: Dict) -> None:
)
self.conn.commit()

def update_source_record(
self, concept_id: str, record: Dict, src_name: SourceName
) -> None:
"""Replace source record located under ``concept_id`` with ``record``.
In the future, consider a more elegant way of doing this than deleting/recreating
the whole thing.
:param concept_id: record concept ID
:param record: new record version
:param src_name: name of source
"""
self.delete_record(concept_id)
self.add_record(record, src_name)

def update_merged_record(self, concept_id: str, record: Dict) -> None:
"""Replace normalized record located under ``concept_id`` with ``record``.
Should be a merged record distinct from an individual source record -- i.e.,
must contain more than one source records. This check should occur before
calling this method.
:param concept_id: record concept ID
:param record: new record version
"""
# TODO

_update_merge_ref_query = b"""
UPDATE gene_concepts
SET merge_ref = %(merge_ref)s
Expand Down Expand Up @@ -668,6 +725,36 @@ def update_merge_ref(self, concept_id: str, merge_ref: Any) -> None: # noqa: AN
f"No such record exists for primary key {concept_id}"
)

_record_drop_aliases_query = b"DELETE FROM gene_aliases WHERE concept_id = %s;"
_record_drop_associations_query = (
b"DELETE FROM gene_associations WHERE concept_id = %s;"
)
_record_drop_prev_symbols_query = (
b"DELETE FROM gene_previous_symbols WHERE concept_id = %s;"
)
_record_drop_symbol_query = b"DELETE FROM gene_symbols WHERE concept_id = %s;"
_record_drop_xrefs_query = b"DELETE FROM gene_xrefs WHERE concept_id = %s;"
_record_drop_concept_query = b"DELETE FROM gene_concepts WHERE concept_id = %s;"

def delete_record(self, concept_id: str, normalized: bool = False) -> None:
"""Remove an individual record and associated references from the DB.
TODO: how to invalidate normalized concept
TODO: use normalized param... API questions? throw error if not normalized?
:param concept_id: ID of record to delete
:param normalized: if True, remove the merged, normalized version of this record
:raise DatabaseWriteException: if deletion call fails
"""
with self.conn.cursor() as cur:
cur.execute(self._record_drop_aliases_query, [concept_id])
cur.execute(self._record_drop_associations_query, [concept_id])
cur.execute(self._record_drop_prev_symbols_query, [concept_id])
cur.execute(self._record_drop_symbol_query, [concept_id])
cur.execute(self._record_drop_xrefs_query, [concept_id])
cur.execute(self._record_drop_concept_query, [concept_id])
self.conn.commit()

def delete_normalized_concepts(self) -> None:
"""Remove merged records from the database. Use when performing a new update
of normalized data.
Expand All @@ -686,43 +773,43 @@ def delete_normalized_concepts(self) -> None:
cur.execute((SCRIPTS_DIR / "delete_normalized_concepts.sql").read_bytes())
self.conn.commit()

_drop_aliases_query = b"""
_src_drop_aliases_query = b"""
DELETE FROM gene_aliases WHERE id IN (
SELECT ga.id FROM gene_aliases ga LEFT JOIN gene_concepts gc
ON gc.concept_id = ga.concept_id
WHERE gc.source = %s
);
"""
_drop_associations_query = b"""
_src_drop_associations_query = b"""
DELETE FROM gene_associations WHERE id IN (
SELECT ga.id FROM gene_associations ga LEFT JOIN gene_concepts gc
ON gc.concept_id = ga.concept_id
WHERE gc.source = %s
);
"""
_drop_prev_symbols_query = b"""
_src_drop_prev_symbols_query = b"""
DELETE FROM gene_previous_symbols WHERE id IN (
SELECT gps.id FROM gene_previous_symbols gps LEFT JOIN gene_concepts gc
ON gc.concept_id = gps.concept_id
WHERE gc.source = %s
);
"""
_drop_symbols_query = b"""
_src_drop_symbols_query = b"""
DELETE FROM gene_symbols WHERE id IN (
SELECT gs.id FROM gene_symbols gs LEFT JOIN gene_concepts gc
ON gc.concept_id = gs.concept_id
WHERE gc.source = %s
);
"""
_drop_xrefs_query = b"""
_src_drop_xrefs_query = b"""
DELETE FROM gene_xrefs WHERE id IN (
SELECT gx.id FROM gene_xrefs gx LEFT JOIN gene_concepts gc
ON gc.concept_id = gx.concept_id
WHERE gc.source = %s
);
"""
_drop_concepts_query = b"DELETE FROM gene_concepts WHERE source = %s;"
_drop_source_query = b"DELETE FROM gene_sources gs WHERE gs.name = %s;"
_src_drop_concepts_query = b"DELETE FROM gene_concepts WHERE source = %s;"
_src_drop_source_query = b"DELETE FROM gene_sources gs WHERE gs.name = %s;"

def delete_source(self, src_name: SourceName) -> None:
"""Delete all data for a source. Use when updating source data.
Expand All @@ -740,17 +827,17 @@ def delete_source(self, src_name: SourceName) -> None:
:raise DatabaseWriteException: if deletion call fails
"""
with self.conn.cursor() as cur:
cur.execute(self._drop_aliases_query, [src_name.value])
cur.execute(self._drop_associations_query, [src_name.value])
cur.execute(self._drop_prev_symbols_query, [src_name.value])
cur.execute(self._drop_symbols_query, [src_name.value])
cur.execute(self._drop_xrefs_query, [src_name.value])
cur.execute(self._src_drop_aliases_query, [src_name.value])
cur.execute(self._src_drop_associations_query, [src_name.value])
cur.execute(self._src_drop_prev_symbols_query, [src_name.value])
cur.execute(self._src_drop_symbols_query, [src_name.value])
cur.execute(self._src_drop_xrefs_query, [src_name.value])
self._drop_fkeys()
self._drop_indexes()

with self.conn.cursor() as cur:
cur.execute(self._drop_concepts_query, [src_name.value])
cur.execute(self._drop_source_query, [src_name.value])
cur.execute(self._src_drop_concepts_query, [src_name.value])
cur.execute(self._src_drop_source_query, [src_name.value])
self.conn.commit()

self._add_fkeys()
Expand Down

0 comments on commit 66f1bda

Please sign in to comment.