From 9c2951fc4e1b95021cb11df62f15666c927ae199 Mon Sep 17 00:00:00 2001 From: James Stevenson Date: Mon, 20 Nov 2023 20:19:29 -0500 Subject: [PATCH] enhancement!: break CLI update methods into reusable module --- src/gene/cli.py | 200 +++++++--------------------------------- src/gene/etl/update.py | 202 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 236 insertions(+), 166 deletions(-) create mode 100644 src/gene/etl/update.py diff --git a/src/gene/cli.py b/src/gene/cli.py index ceb3e911..a2aa37aa 100644 --- a/src/gene/cli.py +++ b/src/gene/cli.py @@ -2,19 +2,16 @@ import logging import os from pathlib import Path -from timeit import default_timer as timer -from typing import Collection, List, Optional, Set +from typing import Optional import click from gene import SOURCES from gene.database import ( - AbstractDatabase, - DatabaseReadException, - DatabaseWriteException, create_db, ) from gene.database.database import DatabaseException +from gene.etl.update import update_all_sources, update_normalized, update_source from gene.schemas import SourceName logger = logging.getLogger("gene") @@ -107,146 +104,6 @@ def dump_database(output_directory: Path, db_url: str) -> None: click.get_current_context().exit(1) -def _update_normalizer( - sources: Collection[SourceName], - db: AbstractDatabase, - update_merged: bool, - use_existing: bool, -) -> None: - """Update selected normalizer sources. 
- - :param sources: names of sources to update - :param db: database instance - :param update_merged: if true, retain processed records to use in updating merged - records - :param use_existing: if True, use most recent local version of source data instead of - fetching from remote - """ - processed_ids = list() - for n in sources: - delete_time = _delete_source(n, db) - _load_source(n, db, delete_time, processed_ids, use_existing) - - if update_merged: - _load_merge(db, processed_ids) - - -def _delete_source(n: SourceName, db: AbstractDatabase) -> float: - """Delete individual source data. - - :param n: name of source to delete - :param db: database instance - :return: time taken (in seconds) to delete - """ - msg = f"Deleting {n.value}..." - click.echo(f"\n{msg}") - logger.info(msg) - start_delete = timer() - db.delete_source(n) - end_delete = timer() - delete_time = end_delete - start_delete - msg = f"Deleted {n.value} in {delete_time:.5f} seconds." - click.echo(f"{msg}\n") - logger.info(msg) - return delete_time - - -_etl_dependency_help = "Are ETL dependencies installed? See the Installation page in the documentation for more info." - - -def _load_source( - n: SourceName, - db: AbstractDatabase, - delete_time: float, - processed_ids: List[str], - use_existing: bool, -) -> None: - """Load individual source data. - - :param n: name of source - :param db: database instance - :param delete_time: time taken (in seconds) to run deletion - :param processed_ids: in-progress list of processed gene IDs - :param use_existing: if True, use most recent local data files instead of - fetching from remote - """ - msg = f"Loading {n.value}..." 
- click.echo(msg) - logger.info(msg) - start_load = timer() - - # used to get source class name from string - try: - from gene.etl import HGNC, NCBI, Ensembl # noqa: F401 - from gene.etl.exceptions import GeneNormalizerEtlError - except ModuleNotFoundError as e: - click.echo( - f"Encountered ModuleNotFoundError attempting to import {e.name}. {_etl_dependency_help}" - ) - click.get_current_context().exit() - SourceClass = eval(n.value) # noqa: N806 - - source = SourceClass(database=db) - try: - processed_ids += source.perform_etl(use_existing) - except GeneNormalizerEtlError as e: - logger.error(e) - click.echo(f"Encountered error while loading {n}: {e}.") - click.get_current_context().exit() - end_load = timer() - load_time = end_load - start_load - msg = f"Loaded {n.value} in {load_time:.5f} seconds." - click.echo(msg) - logger.info(msg) - msg = f"Total time for {n.value}: {(delete_time + load_time):.5f} seconds." - click.echo(msg) - logger.info(msg) - - -def _delete_normalized_data(database: AbstractDatabase) -> None: - """Delete normalized concepts - - :param database: DB instance - """ - click.echo("\nDeleting normalized records...") - start_delete = timer() - try: - database.delete_normalized_concepts() - except (DatabaseReadException, DatabaseWriteException) as e: - click.echo(f"Encountered exception during normalized data deletion: {e}") - end_delete = timer() - delete_time = end_delete - start_delete - click.echo(f"Deleted normalized records in {delete_time:.5f} seconds.") - - -def _load_merge(db: AbstractDatabase, processed_ids: Set[str]) -> None: - """Load merged concepts - - :param db: database instance - :param processed_ids: in-progress list of processed gene IDs - """ - start = timer() - _delete_normalized_data(db) - if not processed_ids: - processed_ids = db.get_all_concept_ids() - - try: - from gene.etl.merge import Merge - except ModuleNotFoundError as e: - click.echo( - f"Encountered ModuleNotFoundError attempting to import {e.name}. 
{_etl_dependency_help}" - ) - click.get_current_context().exit() - - merge = Merge(database=db) - click.echo("Constructing normalized records...") - merge.create_merged_concepts(processed_ids) - end = timer() - click.echo( - f"Merged concept generation completed in " f"{(end - start):.5f} seconds" - ) - - @click.command() @click.option("--sources", help="The source(s) you wish to update separated by spaces.") @click.option("--aws_instance", is_flag=True, help="Using AWS DynamodDB instance.") @@ -279,40 +136,51 @@ def update_normalizer_db( See the documentation for more exhaustive information. \f - :param sources: names of sources to update, comma-separated + :param sources: names of sources to update, space-separated (see example above) :param aws_instance: if true, use cloud instance :param db_url: URI pointing to database - :param update_all: if true, update all sources (ignore `normalizer` parameter) + :param update_all: if true, update all sources (ignore ``sources`` argument) :param update_merged: if true, update normalized records :param use_existing: if True, use most recent local data instead of fetching latest version """ # noqa: D301 + if (not sources) and (not update_all) and (not update_merged): + click.echo( + "Must select at least one of {``--sources``, ``--update_all``, ``--update_merged``}" + ) + ctx = click.get_current_context() + click.echo(ctx.get_help()) + ctx.exit(1) + db = create_db(db_url, aws_instance) + processed_ids = None if update_all: - _update_normalizer(list(SourceName), db, update_merged, use_existing) - elif not sources: - if update_merged: - _load_merge(db, set()) - else: - ctx = click.get_current_context() + processed_ids = update_all_sources(db, use_existing, silent=False) + elif sources: + raw_source_names = sources.lower().strip().split() + if len(raw_source_names) == 0: click.echo( - "Must either enter 1 or more sources, or use `--update_all` parameter" - ) # noqa: E501 + "Error: must provide source names argument to 
``--sources``. See example for more information." + ) + ctx = click.get_current_context() click.echo(ctx.get_help()) - ctx.exit() - else: - sources_split = sources.lower().split() - - if len(sources_split) == 0: - raise Exception("Must enter 1 or more source names to update") - - non_sources = set(sources_split) - set(SOURCES) + ctx.exit(1) + non_sources = set(raw_source_names) - set(SOURCES) if len(non_sources) != 0: - raise Exception(f"Not valid source(s): {non_sources}") + click.echo(f"Error: unrecognized sources: {non_sources}") + click.echo(f"Valid source options are {list(SourceName)}") + click.get_current_context().exit(1) + + parsed_source_names = {SourceName(SOURCES[s]) for s in raw_source_names} + processed_ids = set() + for source_name in parsed_source_names: + processed_ids |= update_source( + source_name, db, use_existing=use_existing, silent=False + ) - parsed_source_names = {SourceName(SOURCES[s]) for s in sources_split} - _update_normalizer(parsed_source_names, db, update_merged, use_existing) + if update_merged: + update_normalized(db, processed_ids, silent=False) if __name__ == "__main__": diff --git a/src/gene/etl/update.py b/src/gene/etl/update.py new file mode 100644 index 00000000..692eaca8 --- /dev/null +++ b/src/gene/etl/update.py @@ -0,0 +1,202 @@ +"""Provide functions to perform Gene Normalizer updates.""" +import logging +from timeit import default_timer as timer +from typing import Optional, Set, Tuple + +import click + +from gene.database.database import ( + AbstractDatabase, + DatabaseReadException, + DatabaseWriteException, +) +from gene.schemas import SourceName + +_logger = logging.getLogger(__name__) + + +def delete_source( + source: SourceName, db: AbstractDatabase, silent: bool = True +) -> float: + """Delete all data for an individual source + + :param source: name of source to delete data for + :param db: database instance + :param silent: if True, suppress console output + :return: time spent deleting source + """ + msg = 
f"Deleting {source.value}..." + if not silent: + click.echo(f"\n{msg}") + _logger.info(msg) + start_delete = timer() + db.delete_source(source) + end_delete = timer() + delete_time = end_delete - start_delete + msg = f"Deleted {source.value} in {delete_time:.5f} seconds." + if not silent: + click.echo(f"{msg}\n") + _logger.info(msg) + return delete_time + + +_etl_dependency_help = "Are ETL dependencies installed? See the Installation page in the documentation for more info." + + +def load_source( + source: SourceName, db: AbstractDatabase, use_existing: bool, silent: bool = True +) -> Tuple[float, Set[str]]: + """Load data for an individual source. + + :param source: name of source to load data for + :param db: database instance + :param use_existing: if True, use latest available version of local data + :param silent: if True, suppress console output + :return: time spent loading data, and set of processed IDs from that source + """ + msg = f"Loading {source.value}..." + if not silent: + click.echo(msg) + _logger.info(msg) + start_load = timer() + + # used to get source class name from string + try: + from gene.etl import HGNC, NCBI, Ensembl # noqa: F401 + from gene.etl.exceptions import GeneNormalizerEtlError + except ModuleNotFoundError as e: + click.echo( + f"Encountered ModuleNotFoundError attempting to import {e.name}. {_etl_dependency_help}" + ) + click.get_current_context().exit() + sources_table = { + SourceName.HGNC: HGNC, + SourceName.ENSEMBL: Ensembl, + SourceName.NCBI: NCBI, + } + + source_class = sources_table[source](database=db) + try: + processed_ids = source_class.perform_etl(use_existing) + except GeneNormalizerEtlError as e: + _logger.error(e) + click.echo(f"Encountered error while loading {source}: {e}.") + click.get_current_context().exit() + end_load = timer() + load_time = end_load - start_load + msg = f"Loaded {source.value} in {load_time:.5f} seconds." 
+    if not silent:
+        click.echo(msg)
+    _logger.info(msg)
+
+    return (load_time, set(processed_ids))
+
+
+def update_source(
+    source: SourceName, db: AbstractDatabase, use_existing: bool, silent: bool = True
+) -> Set[str]:
+    """Refresh data for an individual gene data source
+
+    :param source: name of source to update
+    :param db: database instance
+    :param use_existing: if True, use latest available local data
+    :param silent: if True, suppress console output
+    :return: IDs for records created from source
+    """
+    delete_time = delete_source(source, db, silent)
+    load_time, processed_ids = load_source(source, db, use_existing, silent)
+    msg = f"Total time for {source.value}: {(delete_time + load_time):.5f} seconds."
+    if not silent:
+        click.echo(msg)
+    _logger.info(msg)
+    return processed_ids
+
+
+def update_all_sources(
+    db: AbstractDatabase, use_existing: bool, silent: bool = True
+) -> Set[str]:
+    """Refresh data for all gene record sources
+
+    :param db: database instance
+    :param use_existing: if True, use latest available local data for all sources
+    :param silent: if True, suppress console output
+    :return: IDs processed from all sources
+    """
+    processed_ids: Set[str] = set()
+    for source in SourceName:
+        source_ids = update_source(source, db, use_existing, silent)
+        processed_ids |= source_ids
+    return processed_ids
+
+
+def delete_normalized(database: AbstractDatabase, silent: bool = True) -> None:
+    """Delete normalized concepts
+
+    :param database: DB instance
+    :param silent: if True, suppress console output
+    """
+    msg = "\nDeleting normalized records..."
+ _logger.info(msg) + if not silent: + click.echo(msg) + start_delete = timer() + try: + database.delete_normalized_concepts() + except (DatabaseReadException, DatabaseWriteException) as e: + click.echo(f"Encountered exception during normalized data deletion: {e}") + raise e + end_delete = timer() + delete_time = end_delete - start_delete + msg = f"Deleted normalized records in {delete_time:.5f} seconds." + if not silent: + click.echo(msg) + _logger.info(msg) + + +def update_normalized( + db: AbstractDatabase, processed_ids: Optional[Set[str]], silent: bool = True +) -> None: + """Delete existing and update merged normalized records + + :param db: database instance + :param processed_ids: IDs to form normalized records from. Provide if available to + cut down on some potentially slow database calls. If unavailable, this method + will fetch all known IDs directly. + :param silent: if True, suppress console output + """ + start = timer() + delete_normalized(db, silent) + if not processed_ids: + processed_ids = db.get_all_concept_ids() + + try: + from gene.etl.merge import Merge + except ModuleNotFoundError as e: + msg = f"Encountered ModuleNotFoundError attempting to import {e.name}. {_etl_dependency_help}" + if not silent: + click.echo(msg) + _logger.error(msg) + click.get_current_context().exit() + + merge = Merge(database=db) + if not silent: + click.echo("Constructing normalized records...") + merge.create_merged_concepts(processed_ids) + end = timer() + msg = f"Merged concept generation completed in " f"{(end - start):.5f} seconds" + if not silent: + click.echo(msg) + _logger.info(msg) + + +def update_all_and_normalize( + db: AbstractDatabase, use_existing: bool, silent: bool = True +) -> None: + """Update all sources as well as normalized records. 
+ + :param db: database instance + :param use_existing: if True, use latest local copy of data + :param silent: if True, suppress console output + """ + processed_ids = update_all_sources(db, use_existing, silent) + update_normalized(db, processed_ids, silent)