diff --git a/README.md b/README.md index 71fd9564..e7cf2dee 100644 --- a/README.md +++ b/README.md @@ -72,9 +72,7 @@ Next, initialize the [Variation Normalizer](https://github.com/cancervariants/va The MetaKB can acquire all other needed normalizer data, except for that of [OMIM](https://www.omim.org/downloads), which must be manually placed: ```sh -cd disease/ # starting from the site-packages dir of your virtual environment's Python instance -mkdir -p data/omim -cp ~/YOUR/PATH/TO/mimTitles.txt data/omim/omim_.tsv # replace with date of data acquisition formatted as YYYYMMDD +cp ~/YOUR/PATH/TO/mimTitles.txt ~/.local/share/wags_tails/omim/omim_.tsv # replace with date of data acquisition formatted as YYYYMMDD ``` ### Environment Variables @@ -94,24 +92,24 @@ MetaKB relies on environment variables to set in order to work. ``` * Required when using the `--load_normalizers_db` or `--force_load_normalizers_db` arguments in CLI commands - * `RXNORM_API_KEY` + * `UMLS_API_KEY` * Used in Therapy Normalizer to retrieve RxNorm data - * RxNorm requires a UMLS license, which you can register for one [here](https://www.nlm.nih.gov/research/umls/index.html). You must set the `RxNORM_API_KEY` environment variable to your API key. This can be found in the [UTS 'My Profile' area](https://uts.nlm.nih.gov/uts/profile) after singing in. + * RxNorm requires a UMLS license, which you can register for one [here](https://www.nlm.nih.gov/research/umls/index.html). You must set the `UMLS_API_KEY` environment variable to your API key. This can be found in the [UTS 'My Profile' area](https://uts.nlm.nih.gov/uts/profile) after singing in. Example: ```shell script - export RXNORM_API_KEY={rxnorm_api_key} + export UMLS_API_KEY={rxnorm_api_key} ``` - * `DATAVERSE_API_KEY` + * `HARVARD_DATAVERSE_API_KEY` * Used in Therapy Normalizer to retrieve HemOnc data - * HemOnc.org data requires a Harvard Dataverse API key. After creating a user account on the Harvard Dataverse website, you can follow [these instructions](https://guides.dataverse.org/en/latest/user/account.html) to generate a key. You will create or login to your account at [this](https://dataverse.harvard.edu/) site. You must set the `DATAVERSE_API_KEY` environment variable to your API key. + * HemOnc.org data requires a Harvard Dataverse API key. After creating a user account on the Harvard Dataverse website, you can follow [these instructions](https://guides.dataverse.org/en/latest/user/account.html) to generate a key. You will create or login to your account at [this](https://dataverse.harvard.edu/) site. You must set the `HARVARD_DATAVERSE_API_KEY` environment variable to your API key. Example: ```shell script - export DATAVERSE_API_KEY={dataverse_api_key} + export HARVARD_DATAVERSE_API_KEY={dataverse_api_key} ``` ### Loading data diff --git a/src/metakb/cli.py b/src/metakb/cli.py index 30a34084..3314c0a0 100644 --- a/src/metakb/cli.py +++ b/src/metakb/cli.py @@ -7,401 +7,361 @@ from os import environ from pathlib import Path from timeit import default_timer as timer -from typing import Optional, Set, Union +from typing import Optional from zipfile import ZipFile import asyncclick as click import boto3 from boto3.exceptions import ResourceLoadException from botocore.config import Config -from disease.cli import CLI as DiseaseCLI # noqa: N811 -from disease.database import Database as DiseaseDatabase -from disease.schemas import SourceName as DiseaseSources -from gene.cli import CLI as GeneCLI # noqa: N811 -from gene.database import Database as GeneDatabase -from gene.schemas import SourceName as GeneSources -from therapy.cli import CLI as TherapyCLI # noqa: N811 -from therapy.database import Database as TherapyDatabase -from therapy.schemas import SourceName as TherapySources +from disease.cli import update_db as update_normalizer_disease_db +from gene.cli import update_normalizer_db as update_normalizer_gene_db +from therapy.cli import update_normalizer_db as update_normalizer_therapy_db from metakb import APP_ROOT from metakb.database import Graph -from metakb.harvesters import CivicHarvester, Harvester, MoaHarvester +from metakb.harvesters.civic import CivicHarvester +from metakb.harvesters.moa import MoaHarvester from metakb.schemas.app import SourceName -from metakb.transform import CivicTransform, MoaTransform, Transform +from metakb.transform import CivicTransform, MoaTransform -logger = logging.getLogger("metakb.cli") -logger.setLevel(logging.DEBUG) +logger = logging.getLogger(__name__) def echo_info(msg: str) -> None: """Log (as INFO) and echo given message. - :param str msg: message to emit + + :param msg: message to emit """ click.echo(msg) logger.info(msg) -class CLI: - """Update database.""" +@click.command() +@click.option( + "--db_url", + default="bolt://localhost:7687", + help=( + "URL endpoint for the application Neo4j database. Can also be provided via environment variable METAKB_DB_URL, which takes priority." + ), +) +@click.option( + "--db_username", + default="neo4j", + help=( + "Username to provide to application Neo4j database. Can also be provided via environment variable METAKB_DB_USERNAME, which takes priority." + ), +) +@click.option( + "--db_password", + default="password", + help=( + "Password to provide to application Neo4j database. Can also be provided via environment variable METAKB_DB_PASSWORD, which takes priority." + ), +) +@click.option( + "--force_load_normalizers_db", + "-f", + is_flag=True, + default=False, + help=("Load all normalizers data into DynamoDB database."), +) +@click.option( + "--normalizers_db_url", + help=( + "URL endpoint of normalizers DynamoDB database. If not given, defaults to `http://localhost:8000` per the configuration rules of the individual normalizers." + ), +) +@click.option( + "--load_latest_cdms", + "-l", + is_flag=True, + default=False, + help=( + "Clear MetaKB Neo4j database and load most recent available source CDM files. Does not run harvest and transform methods to generate new CDM files. Exclusive with --load_target_cdm and --load_latest_s3_cdms." + ), +) +@click.option( + "--load_target_cdm", + "-t", + type=click.Path(exists=True, dir_okay=False, readable=True, path_type=Path), + required=False, + help=( + "Load transformed CDM file at specified path. Exclusive with --load_latest_cdms and --load_latest_s3_cdms." + ), +) +@click.option( + "--load_latest_s3_cdms", + "-s", + is_flag=True, + default=False, + required=False, + help=( + "Clear MetaKB database, retrieve most recent data available from public VICC S3 bucket, and load the database with retrieved data. Exclusive with --load_latest_cdms and load_target_cdm." + ), +) +@click.option( + "--update_cached", + "-u", + is_flag=True, + default=False, + required=False, + help=( + "`True` if civicpy cache should be updated. Note this will take several minutes. `False` if local cache should be used" + ), +) +async def update_metakb_db( + db_url: str, + db_username: str, + db_password: str, + force_load_normalizers_db: bool, + normalizers_db_url: str, + load_latest_cdms: bool, + load_target_cdm: Optional[Path], + load_latest_s3_cdms: bool, + update_cached: bool, +) -> None: + """Execute data harvest and transformation from resources and upload to graph + datastore. + + :param db_url: URL endpoint for the application Neo4j database. Can also be provided + via environment variable ``METAKB_DB_URL``, which takes priority. + :param db_username: Username to provide to application Neo4j database. Can also be + provided via environment variable ``METAKB_DB_USERNAME``, which takes priority. + :param db_password: Password to provide to application Neo4j database. Can also be + provided via environment variable ``METAKB_DB_PASSWORD``, which takes priority. + :param force_load_normalizers_db: Load all normalizers data into DynamoDB database. + :param normalizers_db_url: URL endpoint of normalizers DynamoDB database. If not + given, defaults to ``http://localhost:8000`` per the configuration rules of the + individual normalizers. + :param load_latest_cdms: Clear MetaKB Neo4j database and load most recent available + source CDM files. Does not run harvest and transform methods to generate new CDM + files. Exclusive with --load_target_cdm and --load_latest_s3_cdms. + :param load_target_cdm: Load transformed CDM file at specified path. Exclusive with + --load_latest_cdms and --load_latest_s3_cdms. + :param load_latest_s3_cdms: Clear MetaKB database, retrieve most recent data + available from public VICC S3 bucket, and load the database with retrieved data. + Exclusive with --load_latest_cdms and load_target_cdm. + :param update_cached: `True` if civicpy cache should be updated. Note this will take + several minutes. `False` if local cache should be used + """ + if sum([load_latest_cdms, bool(load_target_cdm), load_latest_s3_cdms]) > 1: + _help_msg( + "Error: Can only use one of `--load_latest_cdms`, `--load_target_cdm`, `--load_latest_s3_cdms`." + ) - @staticmethod - @click.command() - @click.option( - "--db_url", - help=( - "URL endpoint for the application Neo4j database. Can also be " - "provided via environment variable METAKB_DB_URL." - ), - ) - @click.option( - "--db_username", - help=( - "Username to provide to application database. Can also be " - "provided via environment variable METAKB_DB_USERNAME." - ), - ) - @click.option( - "--db_password", - help=( - "Password to provide to application database. Can also be " - "provided via environment variable METAKB_DB_PASSWORD." - ), - ) - @click.option( - "--load_normalizers_db", - "-i", - is_flag=True, - default=False, - help="Check normalizers database and load data if necessary.", - ) - @click.option( - "--force_load_normalizers_db", - "-f", - is_flag=True, - default=False, - help=( - "Load all normalizers data into database. Overrides " - "--load_normalizers_db if both are selected." - ), - ) - @click.option( - "--normalizers_db_url", - default="http://localhost:8000", - help=( - "URL endpoint of normalizers DynamoDB database. Set to " - "`http://localhost:8000` by default." - ), - ) - @click.option( - "--load_latest_cdms", - "-l", - is_flag=True, - default=False, - help=( - "Clear MetaKB database and load most recent available source " - "CDM files. Does not run harvest and transform methods to " - "generate new CDM files. Exclusive with --load_target_cdm and " - "--load_latest_s3_cdms." - ), - ) - @click.option( - "--load_target_cdm", - "-t", - type=click.Path(exists=True, dir_okay=False, readable=True, path_type=Path), - required=False, - help=( - "Load transformed CDM file at specified path. Exclusive with " - "--load_latest_cdms and --load_latest_s3_cdms." - ), - ) - @click.option( - "--load_latest_s3_cdms", - "-s", - is_flag=True, - default=False, - required=False, - help=( - "Clear MetaKB database, retrieve most recent data available " - "from VICC S3 bucket, and load the database with retrieved " - "data. Exclusive with --load_latest_cdms and load_target_cdm." - ), - ) - @click.option( - "--update_cached", - "-u", - is_flag=True, - default=False, - required=False, - help=( - "`True` if civicpy cache should be updated. Note this will take serveral" - "minutes. `False` if local cache should be used" - ), - ) - async def update_metakb_db( - db_url: str, - db_username: str, - db_password: str, - load_normalizers_db: bool, - force_load_normalizers_db: bool, - normalizers_db_url: str, - load_latest_cdms: bool, - load_target_cdm: Optional[Path], - load_latest_s3_cdms: bool, - update_cached: bool, - ) -> None: - """Execute data harvest and transformation from resources and upload - to graph datastore. - """ - if sum([load_latest_cdms, bool(load_target_cdm), load_latest_s3_cdms]) > 1: - CLI()._help_msg( - "Error: Can only use one of `--load_latest_cdms`, " - "`--load_target_cdm`, `--load_latest_s3_cdms`." + if not any([load_latest_cdms, load_target_cdm, load_latest_s3_cdms]): + if force_load_normalizers_db: + if normalizers_db_url: + for env_var_name in [ + "GENE_NORM_DB_URL", + "THERAPY_NORM_DB_URL", + "DISEASE_NORM_DB_URL", + ]: + environ[env_var_name] = normalizers_db_url + + _load_normalizers_db() + + _harvest_sources(update_cached) + await _transform_sources() + + # Load neo4j database + start = timer() + echo_info("Loading neo4j database...") + + g = Graph(uri=db_url, credentials=(db_username, db_password)) + + if load_target_cdm: + g.load_from_json(load_target_cdm) + else: + version = _retrieve_s3_cdms() if load_latest_s3_cdms else None + g.clear() + + for src in sorted({v.value for v in SourceName.__members__.values()}): + pattern = ( + f"{src}_cdm_{version}.json" + if version is not None + else f"{src}_cdm_*.json" ) + globbed = (APP_ROOT / "data" / src / "transform").glob(pattern) + + try: + path = sorted(globbed)[-1] + except IndexError as e: + msg = f"No valid transform file found matching pattern: {pattern}" + raise FileNotFoundError(msg) from e - db_url = CLI()._check_db_param(db_url, "URL") - db_username = CLI()._check_db_param(db_username, "username") - db_password = CLI()._check_db_param(db_password, "password") + g.load_from_json(path) - if normalizers_db_url: - for env_var_name in [ - "GENE_NORM_DB_URL", - "THERAPY_NORM_DB_URL", - "DISEASE_NORM_DB_URL", - ]: - environ[env_var_name] = normalizers_db_url + g.close() + end = timer() + echo_info(f"Successfully loaded neo4j database in {(end - start):.5f} s\n") - if not any([load_latest_cdms, load_target_cdm, load_latest_s3_cdms]): - if load_normalizers_db or force_load_normalizers_db: - CLI()._load_normalizers_db(force_load_normalizers_db) - CLI()._harvest_sources(update_cached) - await CLI()._transform_sources() +def _help_msg(msg: str = "") -> None: + """Handle invalid user input. - # Load neo4j database + :param msg: Error message to display to user. + """ + ctx = click.get_current_context() + logger.fatal(msg) + + if msg: + click.echo(msg) + else: + click.echo(ctx.get_help()) + + ctx.exit() + + +def _load_normalizers_db() -> None: + """Load normalizer DynamoDB database source data.""" + for name, update_normalizer_db_fn in [ + ("Disease", update_normalizer_disease_db), + ("Therapy", update_normalizer_therapy_db), + ("Gene", update_normalizer_gene_db), + ]: + _update_normalizer_db(name, update_normalizer_db_fn) + + echo_info("Normalizers database loaded.\n") + + +def _update_normalizer_db( + name: str, + update_normalizer_db_fn: callable, +) -> None: + """Update Normalizer DynamoDB database. + + :param name: Name of the normalizer + :param update_normalizer_db_fn: Function to update the normalizer DynamoDB database + """ + try: + echo_info(f"\nLoading {name} Normalizer data...") + update_normalizer_db_fn(["--update_all", "--update_merged"]) + echo_info(f"Successfully Loaded {name} Normalizer data.\n") + except SystemExit as e: + if e.code != 0: + raise e + + +def _harvest_sources(update_cached: bool) -> None: + """Run harvesting procedure for all sources. + + :param update_cached: `True` if civicpy cache should be updated. Note this will take + several minutes. `False` if local cache should be used + """ + echo_info("Harvesting sources...") + harvester_sources = { + SourceName.CIVIC.value: CivicHarvester, + SourceName.MOA.value: MoaHarvester, + } + total_start = timer() + + for source_str, source_class in harvester_sources.items(): + echo_info(f"Harvesting {source_str}...") start = timer() - echo_info("Loading neo4j database...") - g = Graph(uri=db_url, credentials=(db_username, db_password)) - if load_target_cdm: - g.load_from_json(load_target_cdm) + if source_str == SourceName.CIVIC.value and update_cached: + # Use latest civic data + echo_info("(civicpy cache is also being updated)") + source = source_class(update_cache=True, update_from_remote=False) else: - version = None - if load_latest_s3_cdms: - version = CLI()._retrieve_s3_cdms() - g.clear() - for src in sorted({v.value for v in SourceName.__members__.values()}): - if version is not None: - pattern = f"{src}_cdm_{version}.json" - else: - pattern = f"{src}_cdm_*.json" - globbed = (APP_ROOT / "data" / src / "transform").glob(pattern) - try: - path = sorted(globbed)[-1] - except IndexError as e: - msg = f"No valid transform file found matching pattern: {pattern}" - raise FileNotFoundError(msg) from e - g.load_from_json(path) - g.close() + source = source_class() + + source_successful = source.harvest() + end = timer() - echo_info(f"Successfully loaded neo4j database in {(end - start):.5f} s\n") - s3_cdm_pattern = re.compile(r"cdm/20[23]\d[01]\d[0123]\d/(.*)_cdm_(.*).json.zip") + if not source_successful: + echo_info(f"{source_str} harvest failed.") + click.get_current_context().exit() + + echo_info(f"{source_str} harvest finished in {(end - start):.5f} s") + + total_end = timer() + echo_info( + f"Successfully harvested all sources in {(total_end - total_start):.5f} s\n" + ) + + +async def _transform_sources() -> None: + """Run transformation procedure for all sources.""" + echo_info("Transforming harvested data to CDM...") + transform_sources = { + SourceName.CIVIC.value: CivicTransform, + SourceName.MOA.value: MoaTransform, + } + total_start = timer() - def _retrieve_s3_cdms(self) -> str: - """Retrieve most recent CDM files from VICC S3 bucket. Expects to find - files in a path like the following: - s3://vicc-metakb/cdm/20220201/civic_cdm_20220201.json.zip - :return: date string from retrieved files to use when loading to DB. - :raise: ResourceLoadException if S3 initialization fails - :raise: FileNotFoundError if unable to find files matching expected + for src_str, src_name in transform_sources.items(): + echo_info(f"Transforming {src_str}...") + start = timer() + source = src_name() + await source.transform() + end = timer() + echo_info(f"{src_str} transform finished in {(end - start):.5f} s.") + source.create_json() + + total_end = timer() + echo_info( + f"Successfully transformed all sources to CDM in " + f"{(total_end - total_start):.5f} s\n" + ) + + +def _retrieve_s3_cdms() -> str: + """Retrieve most recent CDM files from VICC S3 bucket. + Expects to find files in a path like the following: + s3://vicc-metakb/cdm/20220201/civic_cdm_20220201.json.zip + + :raise ResourceLoadException: if S3 initialization fails + :raise FileNotFoundError: if unable to find files matching expected pattern in VICC MetaKB bucket. - """ - echo_info("Attempting to fetch CDM files from S3 bucket") - s3 = boto3.resource("s3", config=Config(region_name="us-east-2")) - if not s3: - msg = "Unable to initiate AWS S3 Resource" - raise ResourceLoadException(msg) - bucket = sorted( # noqa: C414 - list(s3.Bucket("vicc-metakb").objects.filter(Prefix="cdm").all()), - key=lambda f: f.key, - reverse=True, - ) - newest_version: Optional[str] = None - for file in bucket: - match = re.match(self.s3_cdm_pattern, file.key) - if match: - source = match.group(1) - if newest_version is None: - newest_version = match.group(2) - elif match.group(2) != newest_version: - continue - else: - continue + :return: date string from retrieved files to use when loading to DB. + """ + echo_info("Attempting to fetch CDM files from S3 bucket") + s3 = boto3.resource("s3", config=Config(region_name="us-east-2")) - tmp_path = Path(tempfile.gettempdir()) / "metakb_dl_tmp" - with tmp_path.open("wb") as f: - file.Object().download_fileobj(f) - - cdm_dir = APP_ROOT / "data" / source / "transform" - cdm_zip = ZipFile(tmp_path, "r") - cdm_zip.extract(f"{source}_cdm_{newest_version}.json", cdm_dir) - - if newest_version is None: - msg = "Unable to locate files matching expected resource pattern in VICC s3 bucket" - raise FileNotFoundError(msg) - echo_info(f"Retrieved CDM files dated {newest_version}") - return newest_version - - @staticmethod - def _harvest_sources(update_cached: bool) -> None: - """Run harvesting procedure for all sources.""" - echo_info("Harvesting sources...") - # TODO: Switch to using constant - harvester_sources = {"civic": CivicHarvester, "moa": MoaHarvester} - total_start = timer() - for source_str, source_class in harvester_sources.items(): - echo_info(f"Harvesting {source_str}...") - start = timer() - source: Harvester = source_class() - if source_str == "civic" and update_cached: - # Use latest civic data - echo_info("(civicpy cache is also being updated)") - source_successful = source.harvest(update_cache=True) - else: - source_successful = source.harvest() - end = timer() - if not source_successful: - echo_info(f"{source_str} harvest failed.") - click.get_current_context().exit() - echo_info(f"{source_str} harvest finished in {(end - start):.5f} s") - total_end = timer() - echo_info( - f"Successfully harvested all sources in " - f"{(total_end - total_start):.5f} s\n" - ) + if not s3: + msg = "Unable to initiate AWS S3 Resource" + raise ResourceLoadException(msg) - @staticmethod - async def _transform_sources() -> None: - """Run transformation procedure for all sources.""" - echo_info("Transforming harvested data to CDM...") - # TODO: Switch to using constant - transform_sources = {"civic": CivicTransform, "moa": MoaTransform} - total_start = timer() - for src_str, src_name in transform_sources.items(): - echo_info(f"Transforming {src_str}...") - start = timer() - source: Transform = src_name() - await source.transform() - end = timer() - echo_info(f"{src_str} transform finished in {(end - start):.5f} s.") - source.create_json() - total_end = timer() - echo_info( - f"Successfully transformed all sources to CDM in " - f"{(total_end - total_start):.5f} s\n" - ) + bucket = sorted( # noqa: C414 + list(s3.Bucket("vicc-metakb").objects.filter(Prefix="cdm").all()), + key=lambda f: f.key, + reverse=True, + ) + newest_version: Optional[str] = None - def _load_normalizers_db(self, load_normalizer_db: bool) -> None: - """Load normalizer database source data. + for file in bucket: + match = re.match( + re.compile(r"cdm/20[23]\d[01]\d[0123]\d/(.*)_cdm_(.*).json.zip"), file.key + ) - :param bool load_normalizer_db: Load normalizer database for each - normalizer - """ - if load_normalizer_db: - load_disease = load_therapy = load_gene = True + if match: + source = match.group(1) + if newest_version is None: + newest_version = match.group(2) + elif match.group(2) != newest_version: + continue else: - load_disease = self._check_normalizer( - DiseaseDatabase(), {src.value for src in DiseaseSources} - ) - load_therapy = self._check_normalizer( - TherapyDatabase(), set(TherapySources) - ) - load_gene = self._check_normalizer( - GeneDatabase(), {src.value for src in GeneSources} - ) + continue - for load_source, normalizer_cli in [ - (load_disease, DiseaseCLI), - (load_therapy, TherapyCLI), - (load_gene, GeneCLI), - ]: - name = str(normalizer_cli).split()[1].split(".")[0][1:].capitalize() - self._update_normalizer_db(name, load_source, normalizer_cli) - echo_info("Normalizers database loaded.\n") - - @staticmethod - def _check_normalizer( - db: Union[GeneDatabase, TherapyDatabase, DiseaseDatabase], sources: Set - ) -> bool: - """Check whether or not normalizer data needs to be loaded. - - :param Database db: Normalizer database - :param set sources: Sources that are needed in the normalizer db - :return: `True` If normalizer needs to be loaded. `False` otherwise. - """ - for src in sources: - response = db.metadata.get_item(Key={"src_name": src}) - if not response.get("Item"): - return True - return False - - @staticmethod - def _update_normalizer_db( - name: str, - load_normalizer: bool, - source_cli: Union[DiseaseCLI, TherapyCLI, GeneCLI], - ) -> None: - """Update Normalizer database. - - :param str name: Name of the normalizer - :param bool load_normalizer: Whether or not to load normalizer db - :param CLI source_cli: Normalizer CLI class for loading and - deleting source data - """ - if load_normalizer: - try: - echo_info(f"\nLoading {name} Normalizer data...") - source_cli.update_normalizer_db(["--update_all", "--update_merged"]) - echo_info(f"Successfully Loaded {name} Normalizer data.\n") - except SystemExit as e: - if e.code != 0: - raise e - else: - echo_info(f"{name} Normalizer is already loaded.\n") - - @staticmethod - def _check_db_param(param: str, name: str) -> str: - """Check for MetaKB database parameter. - :param str param: value of parameter as received from command line - :param str name: name of parameter - :return: parameter value, or exit with error message if unavailable - """ - if not param: - env_var_name = f"METAKB_DB_{name.upper()}" - if env_var_name in environ: - return environ[env_var_name] - # Default is local - if name == "URL": - return "bolt://localhost:7687" - if name == "username": - return "neo4j" - return "admin" - return param - - @staticmethod - def _help_msg(msg: str = "") -> None: - """Handle invalid user input. - :param str msg: Error message to display to user. - """ - ctx = click.get_current_context() - logger.fatal(msg) - if msg: - click.echo(msg) - else: - click.echo(ctx.get_help()) - ctx.exit() + tmp_path = Path(tempfile.gettempdir()) / "metakb_dl_tmp" + with tmp_path.open("wb") as f: + file.Object().download_fileobj(f) + + cdm_dir = APP_ROOT / "data" / source / "transform" + cdm_zip = ZipFile(tmp_path, "r") + cdm_zip.extract(f"{source}_cdm_{newest_version}.json", cdm_dir) + + if newest_version is None: + msg = "Unable to locate files matching expected resource pattern in VICC s3 bucket" + raise FileNotFoundError(msg) + + echo_info(f"Retrieved CDM files dated {newest_version}") + return newest_version if __name__ == "__main__": - CLI().update_metakb_db(_anyio_backend="asyncio") + update_metakb_db(_anyio_backend="asyncio")