Skip to content

Commit

Permalink
enhancement!: break CLI update methods into reusable module
Browse files Browse the repository at this point in the history
  • Loading branch information
jsstevenson committed Nov 21, 2023
1 parent 98294ed commit 9c2951f
Show file tree
Hide file tree
Showing 2 changed files with 236 additions and 166 deletions.
200 changes: 34 additions & 166 deletions src/gene/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@
import logging
import os
from pathlib import Path
from timeit import default_timer as timer
from typing import Collection, List, Optional, Set
from typing import Optional

import click

from gene import SOURCES
from gene.database import (
AbstractDatabase,
DatabaseReadException,
DatabaseWriteException,
create_db,
)
from gene.database.database import DatabaseException
from gene.etl.update import update_all_sources, update_normalized, update_source
from gene.schemas import SourceName

logger = logging.getLogger("gene")
Expand Down Expand Up @@ -107,146 +104,6 @@ def dump_database(output_directory: Path, db_url: str) -> None:
click.get_current_context().exit(1)


def _update_normalizer(
    sources: Collection[SourceName],
    db: AbstractDatabase,
    update_merged: bool,
    use_existing: bool,
) -> None:
    """Refresh each requested source, then optionally rebuild merged records.

    Every source is first deleted and then reloaded. The IDs gathered while
    loading are handed to the merge step when ``update_merged`` is set.

    :param sources: names of sources to update
    :param db: database instance
    :param update_merged: if true, retain processed records to use in updating
        merged records
    :param use_existing: if True, use most recent local version of source data
        instead of fetching from remote
    """
    # Shared accumulator: _load_source extends this list in place.
    collected_ids = []
    for source_name in sources:
        seconds_deleting = _delete_source(source_name, db)
        _load_source(source_name, db, seconds_deleting, collected_ids, use_existing)

    if not update_merged:
        return
    _load_merge(db, collected_ids)


def _delete_source(n: SourceName, db: AbstractDatabase) -> float:
    """Remove the data of a single source and report how long it took.

    Progress is both echoed to the console and written to the module logger.

    :param n: name of source to delete
    :param db: database instance
    :return: time taken (in seconds) to delete
    """
    start_notice = f"Deleting {n.value}..."
    click.echo(f"\n{start_notice}")
    logger.info(start_notice)

    # Only the database call itself is timed.
    began = timer()
    db.delete_source(n)
    elapsed = timer() - began

    done_notice = f"Deleted {n.value} in {elapsed:.5f} seconds."
    click.echo(f"{done_notice}\n")
    logger.info(done_notice)
    return elapsed


# Shared hint appended to the messages echoed when an ETL import raises ModuleNotFoundError.
_etl_dependency_help = "Are ETL dependencies installed? See the Installation page in the documentation for more info."


def _load_source(
    n: SourceName,
    db: AbstractDatabase,
    delete_time: float,
    processed_ids: List[str],
    use_existing: bool,
) -> None:
    """Load individual source data.

    Runs the ETL class matching ``n`` against ``db`` and appends the IDs it
    returns to ``processed_ids`` (the list is extended in place). Progress and
    timing are echoed to the console and logged.

    If the ETL modules cannot be imported, or the ETL run raises
    ``GeneNormalizerEtlError``, the problem is reported and the current click
    context is exited with status 1 so that callers and shells see the failure.

    :param n: name of source
    :param db: database instance
    :param delete_time: time taken (in seconds) to run deletion
    :param processed_ids: in-progress list of processed gene IDs
    :param use_existing: if True, use most recent local data files instead of
        fetching from remote
    """
    msg = f"Loading {n.value}..."
    click.echo(msg)
    logger.info(msg)
    start_load = timer()

    # Imported here so that a missing ETL dependency is reported to the user
    # with an installation hint instead of an unhandled traceback.
    try:
        from gene.etl import HGNC, NCBI, Ensembl
        from gene.etl.exceptions import GeneNormalizerEtlError
    except ModuleNotFoundError as e:
        click.echo(
            f"Encountered ModuleNotFoundError attempting to import {e.name}. {_etl_dependency_help}"
        )
        # non-zero status: this is a failure, not a normal exit
        click.get_current_context().exit(1)

    # Explicit lookup by source value instead of eval(); keys mirror the
    # imported class names, which is what the eval-based lookup relied on.
    source_classes = {"HGNC": HGNC, "NCBI": NCBI, "Ensembl": Ensembl}
    SourceClass = source_classes[n.value]  # noqa: N806

    source = SourceClass(database=db)
    try:
        processed_ids += source.perform_etl(use_existing)
    except GeneNormalizerEtlError as e:
        logger.error(e)
        click.echo(f"Encountered error while loading {n.value}: {e}.")
        click.get_current_context().exit(1)
    end_load = timer()
    load_time = end_load - start_load
    msg = f"Loaded {n.value} in {load_time:.5f} seconds."
    click.echo(msg)
    logger.info(msg)
    msg = f"Total time for {n.value}: {(delete_time + load_time):.5f} seconds."
    click.echo(msg)
    logger.info(msg)


def _delete_normalized_data(database: AbstractDatabase) -> None:
    """Delete normalized concepts.

    Database read/write errors are reported on the console rather than raised.
    The success message with timing is only printed when the deletion actually
    completed, so a failed deletion is never reported as done.

    :param database: DB instance
    """
    click.echo("\nDeleting normalized records...")
    start_delete = timer()
    try:
        database.delete_normalized_concepts()
    except (DatabaseReadException, DatabaseWriteException) as e:
        click.echo(f"Encountered exception during normalized data deletion: {e}")
        # Nothing was (fully) deleted; skip the success/timing message.
        return
    delete_time = timer() - start_delete
    click.echo(f"Deleted normalized records in {delete_time:.5f} seconds.")


def _load_merge(db: AbstractDatabase, processed_ids: Set[str]) -> None:
    """Load merged concepts.

    Deletes the existing normalized records and regenerates them from
    ``processed_ids``. When ``processed_ids`` is empty, every concept ID in the
    database is used instead.

    If the merge module cannot be imported, the problem is reported and the
    current click context is exited with status 1 before any data is deleted.

    :param db: database instance
    :param processed_ids: in-progress list of processed gene IDs
    """
    start = timer()

    # Verify the merge machinery is importable BEFORE deleting anything, so a
    # missing dependency cannot leave the database without normalized records.
    try:
        from gene.etl.merge import Merge
    except ModuleNotFoundError as e:
        click.echo(
            f"Encountered ModuleNotFoundError attempting to import {e.name}. {_etl_dependency_help}"
        )
        # non-zero status: this is a failure, not a normal exit
        click.get_current_context().exit(1)

    _delete_normalized_data(db)
    if not processed_ids:
        processed_ids = db.get_all_concept_ids()

    merge = Merge(database=db)
    click.echo("Constructing normalized records...")
    merge.create_merged_concepts(processed_ids)
    end = timer()
    click.echo(f"Merged concept generation completed in {(end - start):.5f} seconds")


@click.command()
@click.option("--sources", help="The source(s) you wish to update separated by spaces.")
@click.option("--aws_instance", is_flag=True, help="Using AWS DynamodDB instance.")
Expand Down Expand Up @@ -279,40 +136,51 @@ def update_normalizer_db(
See the documentation for more exhaustive information.
\f
:param sources: names of sources to update, comma-separated
:param sources: names of sources to update, space-separated (see example above)
:param aws_instance: if true, use cloud instance
:param db_url: URI pointing to database
:param update_all: if true, update all sources (ignore `normalizer` parameter)
:param update_all: if true, update all sources (ignore ``sources`` argument)
:param update_merged: if true, update normalized records
:param use_existing: if True, use most recent local data instead of fetching latest version
""" # noqa: D301
if (not sources) and (not update_all) and (not update_merged):
click.echo(
"Must select at least one of {``--sources``, ``--update_all``, ``--update_merged``}"
)
ctx = click.get_current_context()
click.echo(ctx.get_help())
ctx.exit(1)

db = create_db(db_url, aws_instance)

processed_ids = None
if update_all:
_update_normalizer(list(SourceName), db, update_merged, use_existing)
elif not sources:
if update_merged:
_load_merge(db, set())
else:
ctx = click.get_current_context()
processed_ids = update_all_sources(db, use_existing, silent=False)
elif sources:
raw_source_names = sources.lower().strip().split()
if len(raw_source_names) == 0:
click.echo(
"Must either enter 1 or more sources, or use `--update_all` parameter"
) # noqa: E501
"Error: must provide source names argument to ``--sources``. See example for more information."
)
ctx = click.get_current_context()
click.echo(ctx.get_help())
ctx.exit()
else:
sources_split = sources.lower().split()

if len(sources_split) == 0:
raise Exception("Must enter 1 or more source names to update")

non_sources = set(sources_split) - set(SOURCES)
ctx.exit(1)

non_sources = set(raw_source_names) - set(SOURCES)
if len(non_sources) != 0:
raise Exception(f"Not valid source(s): {non_sources}")
click.echo(f"Error: unrecognized sources: {non_sources}")
click.echo(f"Valid source options are {list(SourceName)}")
click.get_current_context().exit(1)

parsed_source_names = {SourceName(SOURCES[s]) for s in raw_source_names}
processed_ids = set()
for source_name in parsed_source_names:
processed_ids |= update_source(
source_name, db, use_existing=use_existing, silent=False
)

parsed_source_names = {SourceName(SOURCES[s]) for s in sources_split}
_update_normalizer(parsed_source_names, db, update_merged, use_existing)
if update_merged:
update_normalized(db, processed_ids, silent=False)


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit 9c2951f

Please sign in to comment.