diff --git a/Pipfile b/Pipfile index 5ed5972f..9ea28096 100644 --- a/Pipfile +++ b/Pipfile @@ -15,6 +15,7 @@ boto3 = "*" gene = {editable = true, path = "."} gffutils = "*" "biocommons.seqrepo" = "*" +wags-tails = ">=0.1.1" psycopg = {version = "*", extras=["binary"]} pytest = "*" pre-commit = "*" diff --git a/docs/source/conf.py b/docs/source/conf.py index 471bb665..63ea4447 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -88,23 +88,32 @@ def linkcode_resolve(domain, info): # -- sphinx-click ------------------------------------------------------------ from typing import List import re + CMD_PATTERN = r"--[^ ]+" STR_PATTERN = r"\"[^ ]+\"" SNAKE_PATTERN = r"[A-Z]+_[A-Z_]*[A-Z]" + def _add_formatting_to_string(line: str) -> str: """Add fixed-width code formatting to span sections in lines: * shell options, eg `--update_all` - * strings, eg `"HGNC"` - * env vars, eg `GENE_NORM_REMOTE_DB_URL` + * double-quoted strings, eg `"HGNC"` + * all caps SNAKE_CASE env vars, eg `GENE_NORM_REMOTE_DB_URL` """ for pattern in (CMD_PATTERN, STR_PATTERN, SNAKE_PATTERN): line = re.sub(pattern, lambda x: f"``{x.group()}``", line) return line + def process_description(app, ctx, lines: List[str]): - """Add custom formatting to sphinx-click autodocs""" + """Add custom formatting to sphinx-click autodocs. This lets us write Click + docstrings in ways that look presentable in the CLI, but adds extra formatting when + generating Sphinx docs. 
+ + * add fixed-width (code) font to certain words + * add code block formatting to example shell commands + """ # chop off params param_boundary = None for i, line in enumerate(lines): @@ -118,7 +127,7 @@ def process_description(app, ctx, lines: List[str]): # add code formatting to strings, commands, and env vars lines_to_fmt = [] for i, line in enumerate(lines): - if line.startswith("% "): + if line.startswith(" ") or line.startswith(">>> "): continue # skip example code blocks if any([ re.findall(CMD_PATTERN, line), @@ -129,13 +138,13 @@ def process_description(app, ctx, lines: List[str]): for line_num in lines_to_fmt: lines[line_num] = _add_formatting_to_string(lines[line_num]) - # add code block formatting to example commands + # add code block formatting to example console commands for i in range(len(lines) - 1, -1, -1): - if lines[i].startswith('% '): - lines[i] = " " + lines[i] - lines.insert(i, "") - lines.insert(i, ".. code-block:: sh") + if lines[i].startswith(' '): lines.insert(i + 2, "") + if i == 0 or not lines[i - 1].startswith(" "): + lines.insert(i, "") + lines.insert(i, ".. code-block:: console") def setup(app): diff --git a/docs/source/managing_data/loading_and_updating_data.rst b/docs/source/managing_data/loading_and_updating_data.rst index ccd993ea..cfa0984c 100644 --- a/docs/source/managing_data/loading_and_updating_data.rst +++ b/docs/source/managing_data/loading_and_updating_data.rst @@ -3,63 +3,13 @@ Loading and updating data ========================= +The Gene Normalizer defines a command line tool for data management. It includes functions for refreshing data, checking database status, and for the PostgreSQL data backend, dumping to a local file and updating from a remote backup. + .. note:: See the :ref:`ETL API documentation` for information on programmatic access to the data loader classes. 
-Refreshing all data -------------------- - -Calling the Gene Normalizer update command with the ``--all`` and ``--normalize`` flags will delete all existing data, fetch new source data if available, and then perform a complete reload of the database (including merged records): - -.. code-block:: shell - - gene-normalizer update --all --normalize - - -Reload individual sources -------------------------- - -To update specific sources, provide them as arguments to the ``update`` command. While it is possible to update individual source data without also updating the normalized record data, that may affect the proper function of the normalized query endpoints, so it is recommended to include the ``--normalize`` flag as well. - -.. code-block:: shell - - gene-normalizer update --normalize HGNC NCBI - - -Use local data --------------- - -The Gene Normalizer will fetch the latest available data from all sources if local data is out-of-date. To suppress this and force usage of local files, use the `--use_existing` flag: - -.. code-block:: shell - - gene-normalizer update --all --use_existing - - -Check DB health ---------------- - -The command ``check-db`` performs a basic check on the database status. It first confirms that the database's schema exists, and then identifies whether metadata is available for each source, and whether gene record and normalized concept tables are non-empty. Check the process's exit code for the result (per the UNIX standard, ``0`` means success, and any other return code means failure). - -.. code-block:: console - - $ gene-normalizer check-db - $ echo $? - 1 # indicates failure - -This command is equivalent to the combination of the database classes' ``check_schema_initialized`` and ``check_tables_populated`` methods: - -.. 
code-block:: python - - from gene.database import create_db - db = create_db() - db_is_healthy = db.check_schema_initialized() and db.check_tables_populated() - -Complete CLI documentation --------------------------- - .. click:: gene.cli:cli :prog: gene-normalizer :nested: full diff --git a/pyproject.toml b/pyproject.toml index 61687255..df44570c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,13 +36,9 @@ dynamic = ["version"] [project.optional-dependencies] pg = ["psycopg[binary]"] - -etl = ["gffutils", "biocommons.seqrepo"] - +etl = ["gffutils", "biocommons.seqrepo", "wags-tails>=0.1.1"] test = ["pytest>=6.0", "pytest-cov", "mock", "httpx"] - dev = ["pre-commit", "ruff>=0.1.2"] - docs = [ "sphinx==6.1.3", "sphinx-autodoc-typehints==1.22.0", diff --git a/src/gene/cli.py b/src/gene/cli.py index 5f9688dd..102d0213 100644 --- a/src/gene/cli.py +++ b/src/gene/cli.py @@ -47,12 +47,18 @@ def update( For example, the following command will update NCBI and HGNC source records: - % gene-normalizer update HGNC NCBI + $ gene-normalizer update HGNC NCBI To completely reload all source records and construct normalized concepts, use the --all and --normalize options: - % gene-normalizer update --all --normalize + $ gene-normalizer update --all --normalize + + The Gene Normalizer will fetch the latest available data from all sources if local + data is out-of-date. To suppress this and force usage of local files only, use the + --use_existing flag: + + $ gene-normalizer update --all --use_existing \f :param sources: tuple of raw names of sources to update @@ -137,6 +143,10 @@ def check_db(db_url: str, verbose: bool = False) -> None: """Perform basic checks on DB health and population. Exits with status code 1 if DB schema is uninitialized or if critical tables appear to be empty. + $ gene-normalizer check-db + $ echo $? 
+ 1 # indicates failure + This command is equivalent to the combination of the database classes' ``check_schema_initialized()`` and ``check_tables_populated()`` methods: diff --git a/src/gene/etl/base.py b/src/gene/etl/base.py index f46f0044..f06c89a0 100644 --- a/src/gene/etl/base.py +++ b/src/gene/etl/base.py @@ -1,19 +1,16 @@ """A base class for extraction, transformation, and loading of data.""" -import datetime -import gzip import logging import re -import shutil from abc import ABC, abstractmethod -from ftplib import FTP -from os import environ, remove +from os import environ from pathlib import Path -from typing import Callable, Dict, List, Optional +from typing import Dict, List, Optional, Union +import click import pydantic from biocommons.seqrepo import SeqRepo -from dateutil import parser from gffutils.feature import Feature +from wags_tails import EnsemblData, HgncData, NcbiGeneData from gene.database import AbstractDatabase from gene.schemas import ITEM_TYPES, Gene, GeneSequenceLocation, MatchType, SourceName @@ -28,50 +25,75 @@ ) +DATA_DISPATCH = { + SourceName.HGNC: HgncData, + SourceName.ENSEMBL: EnsemblData, + SourceName.NCBI: NcbiGeneData, +} + + class Base(ABC): """The ETL base class.""" def __init__( self, database: AbstractDatabase, - host: str, - data_dir: str, - src_data_dir: Path, seqrepo_dir: Path = SEQREPO_ROOT_DIR, + data_path: Optional[Path] = None, + silent: bool = True, ) -> None: """Instantiate Base class. 
:param database: database instance - :param host: Hostname of FTP site - :param data_dir: Data directory of FTP site to look at - :param src_data_dir: Data directory for source :param seqrepo_dir: Path to seqrepo directory + :param data_path: path to app data directory + :param silent: if True, don't print ETL result to console """ - self.src_data_dir = src_data_dir - self.src_data_dir.mkdir(exist_ok=True, parents=True) + self._silent = silent self._src_name = SourceName(self.__class__.__name__) + self._data_source = self._get_data_handler(data_path) self._database = database - self._host = host - self._data_dir = data_dir - self._processed_ids = list() self.seqrepo = self.get_seqrepo(seqrepo_dir) + self._processed_ids = list() + + def _get_data_handler( + self, data_path: Optional[Path] = None + ) -> Union[HgncData, EnsemblData, NcbiGeneData]: + """Construct data handler instance for source. Overwrite for edge-case sources. + + :param data_path: location of data storage + :return: instance of wags_tails.DataSource to manage source file(s) + """ + return DATA_DISPATCH[self._src_name](data_dir=data_path, silent=self._silent) def perform_etl(self, use_existing: bool = False) -> List[str]: - """Extract, Transform, and Load data into database. + """Public-facing method to begin ETL procedures on given data. + Returned concept IDs can be passed to Merge method for computing + merged concepts. - :param use_existing: if true, use most recent available local files - :return: Concept IDs of concepts successfully loaded + :param use_existing: if True, don't try to retrieve latest source data + :return: list of concept IDs which were successfully processed and + uploaded. 
""" self._extract_data(use_existing) + if not self._silent: + click.echo("Transforming and loading data to DB...") self._add_meta() self._transform_data() self._database.complete_write_transaction() return self._processed_ids - @abstractmethod - def _extract_data(self, *args, **kwargs) -> None: # noqa: ANN002 - """Extract data from FTP site or local data directory.""" - raise NotImplementedError + def _extract_data(self, use_existing: bool) -> None: + """Acquire source data. + + This method is responsible for initializing an instance of a data handler and, + in most cases, setting ``self._data_file`` and ``self._version``. + + :param bool use_existing: if True, don't try to fetch latest source data + """ + self._data_file, self._version = self._data_source.get_latest( + from_local=use_existing + ) @abstractmethod def _transform_data(self) -> None: @@ -83,40 +105,6 @@ def _add_meta(self) -> None: """Add source meta to database source info.""" raise NotImplementedError - def _acquire_data_file( - self, - file_glob: str, - use_existing: bool, - check_latest_callback: Callable[[Path], bool], - download_callback: Callable[[], Path], - ) -> Path: - """Acquire data file. 
- - :param file_glob: pattern to match relevant files against - :param use_existing: don't fetch from remote origin if local versions are - available - :param check_latest_callback: function to check whether local data is up-to-date - :param download_callback: function to download from remote - :return: path to acquired data file - :raise FileNotFoundError: if unable to find any files matching the pattern - """ - matching_files = list(self.src_data_dir.glob(file_glob)) - if not matching_files: - if use_existing: - raise FileNotFoundError( - f"No local files matching pattern {self.src_data_dir.absolute().as_uri() + file_glob}" - ) - else: - return download_callback() - else: - latest_file = list(sorted(matching_files))[-1] - if use_existing: - return latest_file - if not check_latest_callback(latest_file): - return download_callback() - else: - return latest_file - def _load_gene(self, gene: Dict) -> None: """Load a gene record into database. This method takes responsibility for: * validating structure correctness @@ -147,49 +135,6 @@ def _load_gene(self, gene: Dict) -> None: self._database.add_record(gene, self._src_name) self._processed_ids.append(concept_id) - def _ftp_download( - self, host: str, data_dir: str, fn: str, source_dir: Path, data_fn: str - ) -> Optional[str]: - """Download data file from FTP site. 
- - :param host: Source's FTP host name - :param data_dir: Data directory located on FTP site - :param fn: Filename for downloaded file - :param source_dir: Source's data directory - :param data_fn: Filename on FTP site to be downloaded - :return: Date file was last updated - """ - with FTP(host) as ftp: - ftp.login() - timestamp = ftp.voidcmd(f"MDTM {data_dir}{data_fn}")[4:].strip() - date = str(parser.parse(timestamp)).split()[0] - version = datetime.datetime.strptime(date, "%Y-%m-%d").strftime("%Y%m%d") - ftp.cwd(data_dir) - self._ftp_download_file(ftp, data_fn, source_dir, fn) - return version - - def _ftp_download_file( - self, ftp: FTP, data_fn: str, source_dir: Path, fn: str - ) -> None: - """Download data file from FTP - - :param ftp: FTP instance - :param data_fn: Filename on FTP site to be downloaded - :param source_dir: Source's data directory - :param fn: Filename for downloaded file - """ - if data_fn.endswith(".gz"): - filepath = source_dir / f"{fn}.gz" - else: - filepath = source_dir / fn - with open(filepath, "wb") as fp: - ftp.retrbinary(f"RETR {data_fn}", fp.write) - if data_fn.endswith(".gz"): - with gzip.open(filepath, "rb") as f_in: - with open(source_dir / fn, "wb") as f_out: - shutil.copyfileobj(f_in, f_out) - remove(filepath) - def get_seqrepo(self, seqrepo_dir: Path) -> SeqRepo: """Return SeqRepo instance if seqrepo_dir exists. 
diff --git a/src/gene/etl/ensembl.py b/src/gene/etl/ensembl.py index f75ed034..4a52975a 100644 --- a/src/gene/etl/ensembl.py +++ b/src/gene/etl/ensembl.py @@ -1,21 +1,14 @@ """Defines the Ensembl ETL methods.""" import logging import re -from ftplib import FTP -from json import dumps -from pathlib import Path from typing import Dict import gffutils -import requests from gffutils.feature import Feature -from gene.database import AbstractDatabase -from gene.etl.base import APP_ROOT, Base +from gene.etl.base import Base from gene.etl.exceptions import ( - GeneFileVersionError, GeneNormalizerEtlError, - GeneSourceFetchError, ) from gene.schemas import NamespacePrefix, SourceMeta, SourceName, Strand @@ -26,95 +19,18 @@ class Ensembl(Base): """ETL the Ensembl source into the normalized database.""" - def __init__( - self, - database: AbstractDatabase, - host: str = "ftp.ensembl.org", - data_dir: str = "pub/current_gff3/homo_sapiens/", - src_data_dir: Path = APP_ROOT / "data" / "ensembl", - ) -> None: - """Initialize Ensembl ETL class. - - :param database: gene database for adding new data - :param host: FTP host name - :param data_dir: FTP data directory to use - :param src_data_dir: Data directory for Ensembl - """ - super().__init__(database, host, data_dir, src_data_dir) - self._data_file_pattern = re.compile(r"ensembl_(GRCh\d+)_(\d+)\.gff3") - self._version = None - self._data_url = {} - self._assembly = None - - def _is_up_to_date(self, data_file: Path) -> bool: - """Verify whether local data is up-to-date with latest available remote file. 
- - :param data_file: path to latest local file - :return: True if data is up-to-date - :raise GeneFileVersionError: if unable to parse version number from local file - :raise GeneSourceFetchError: if unable to get latest version from remote source - """ - local_match = re.match(self._data_file_pattern, data_file.name) - try: - version = int(local_match.groups()[1]) - except (AttributeError, IndexError, ValueError): - raise GeneFileVersionError( - f"Unable to parse version number from local file: {data_file.absolute()}" - ) - - ensembl_api = ( - "https://rest.ensembl.org/info/data/?content-type=application/json" - ) - response = requests.get(ensembl_api) - if response.status_code != 200: - raise GeneSourceFetchError( - f"Unable to get response from Ensembl version API endpoint: {ensembl_api}" - ) - releases = response.json().get("releases") - if not releases: - raise GeneSourceFetchError( - f"Malformed response from Ensembl version API endpoint: {dumps(response.json())}" - ) - releases.sort() - return version == releases[-1] - - def _download_data(self) -> Path: - """Download latest Ensembl GFF3 data file. - - :return: path to acquired file - :raise GeneSourceFetchError: if unable to find file matching expected pattern - """ - logger.info("Downloading latest Ensembl data file...") - pattern = r"Homo_sapiens\.(?PGRCh\d+)\.(?P\d+)\.gff3\.gz" - with FTP(self._host) as ftp: - ftp.login() - ftp.cwd(self._data_dir) - files = ftp.nlst() - for f in files: - match = re.match(pattern, f) - if match: - resp = match.groupdict() - assembly = resp["assembly"] - version = resp["version"] - new_fn = f"ensembl_{assembly}_{version}.gff3" - self._ftp_download_file(ftp, f, self.src_data_dir, new_fn) - logger.info( - f"Successfully downloaded Ensembl {version} data to {self.src_data_dir / new_fn}." 
- ) - return self.src_data_dir / new_fn - raise GeneSourceFetchError( - "Unable to find file matching expected Ensembl pattern via FTP" - ) - def _extract_data(self, use_existing: bool) -> None: - """Acquire Ensembl data file and get metadata. + """Acquire source data. + + This method is responsible for initializing an instance of a data handler and, + in most cases, setting ``self._data_file`` and ``self._version``. - :param use_existing: if True, use latest available local file + :param use_existing: if True, don't try to fetch latest source data """ - self._data_src = self._acquire_data_file( - "ensembl_*.gff3", use_existing, self._is_up_to_date, self._download_data + self._data_file, raw_version = self._data_source.get_latest( + from_local=use_existing ) - match = re.match(self._data_file_pattern, self._data_src.name) + match = re.match(r"(GRCh\d+)_(\d+)", raw_version) self._assembly = match.groups()[0] self._version = match.groups()[1] @@ -122,7 +38,7 @@ def _transform_data(self) -> None: """Transform the Ensembl source.""" logger.info("Transforming Ensembl...") db = gffutils.create_db( - str(self._data_src), + str(self._data_file), dbfn=":memory:", force=True, merge_strategy="create_unique", @@ -254,7 +170,7 @@ def _add_meta(self) -> None: :raise GeneNormalizerEtlError: if requisite metadata is unset """ - if not all([self._version, self._host, self._data_dir, self._assembly]): + if not self._version or not self._assembly: raise GeneNormalizerEtlError( "Source metadata unavailable -- was data properly acquired before attempting to load DB?" 
) @@ -264,7 +180,7 @@ def _add_meta(self) -> None: "/legal/disclaimer.html", version=self._version, data_url={ - "genome_annotations": f"ftp://{self._host}/{self._data_dir}Homo_sapiens.{self._assembly}.{self._version}.gff3.gz" + "genome_annotations": f"ftp://ftp.ensembl.org/pub/release-{self._version}/gff3/homo_sapiens/Homo_sapiens.{self._assembly}.{self._version}.gff3.gz" }, rdp_url=None, data_license_attributes={ diff --git a/src/gene/etl/hgnc.py b/src/gene/etl/hgnc.py index bfd07a7e..9e4c048e 100644 --- a/src/gene/etl/hgnc.py +++ b/src/gene/etl/hgnc.py @@ -2,20 +2,11 @@ import json import logging import re -import shutil -from datetime import datetime -from ftplib import FTP -from pathlib import Path from typing import Dict -from dateutil import parser - -from gene.database import AbstractDatabase -from gene.etl.base import APP_ROOT, Base +from gene.etl.base import Base from gene.etl.exceptions import ( - GeneFileVersionError, GeneNormalizerEtlError, - GeneSourceFetchError, ) from gene.schemas import ( PREFIX_LOOKUP, @@ -34,85 +25,10 @@ class HGNC(Base): """ETL the HGNC source into the normalized database.""" - def __init__( - self, - database: AbstractDatabase, - host: str = "ftp.ebi.ac.uk", - data_dir: str = "pub/databases/genenames/hgnc/json/", - src_data_dir: Path = APP_ROOT / "data" / "hgnc", - fn: str = "hgnc_complete_set.json", - ) -> None: - """Initialize HGNC ETL class. - - :param database: gene database for adding new data - :param host: FTP host name - :param data_dir: FTP data directory to use - :param src_data_dir: Data directory for HGNC - :param fn: Data file to download - """ - super().__init__(database, host, data_dir, src_data_dir) - self._data_file_pattern = re.compile(r"hgnc_(\d+)\.json") - self._data_url = f"ftp://{host}/{data_dir}{fn}" - self._fn = fn - self._version = None - - def _is_up_to_date(self, data_file: Path) -> bool: - """Verify whether local data is up-to-date with latest available remote file. 
- - :param data_file: path to latest local file - :return: True if data is up-to-date - :raise GeneFileVersionError: if unable to get version from local HGNC file - :raise GeneSourceFetchError: if unable to get latest version available from HGNC - """ - local_match = re.match(self._data_file_pattern, data_file.name) - if not local_match: - raise GeneFileVersionError( - f"Unable to parse version number from local HGNC file: {data_file.absolute()}" - ) - version = local_match.groups()[0] - with FTP(self._host) as ftp: - ftp.login() - timestamp = ftp.voidcmd(f"MDTM {self._data_dir}{self._fn}")[4:].strip() - date = str(parser.parse(timestamp)).split()[0] - try: - remote_version = datetime.strptime(date, "%Y-%m-%d").strftime("%Y%m%d") - except ValueError: - raise GeneSourceFetchError( - f"Unable to parse version number from remote HGNC timestamp: {date}" - ) - return version == remote_version - - def _download_data(self) -> Path: - """Download HGNC JSON data file. - - :return: path to newly-downloaded file - """ - logger.info("Downloading HGNC data file...") - - tmp_fn = "hgnc_version.json" - version = self._ftp_download( - self._host, self._data_dir, tmp_fn, self.src_data_dir, self._fn - ) - final_location = f"{self.src_data_dir}/hgnc_{version}.json" - shutil.move(f"{self.src_data_dir}/{tmp_fn}", final_location) - logger.info(f"Successfully downloaded HGNC data file to {final_location}.") - return Path(final_location) - - def _extract_data(self, use_existing: bool) -> None: - """Acquire HGNC data file and get metadata. 
- - :param use_existing: if True, use latest available local file - """ - self._data_file = self._acquire_data_file( - "hgnc_*.json", use_existing, self._is_up_to_date, self._download_data - ) - match = self._data_file_pattern.match(self._data_file.name) - self._version = match.groups()[0] - def _transform_data(self) -> None: """Transform the HGNC source.""" logger.info("Transforming HGNC...") - with open(self._data_file, "r") as f: + with open(self._data_file, "r") as f: # type: ignore data = json.load(f) records = data["response"]["docs"] @@ -332,7 +248,7 @@ def _add_meta(self) -> None: :raise GeneNormalizerEtlError: if requisite metadata is unset """ - if not all([self._version, self._data_url]): + if not self._version: raise GeneNormalizerEtlError( "Source metadata unavailable -- was data properly acquired before attempting to load DB?" ) @@ -340,7 +256,9 @@ def _add_meta(self) -> None: data_license="CC0", data_license_url="https://www.genenames.org/about/license/", version=self._version, - data_url={"complete_set_archive": self._data_url}, + data_url={ + "complete_set_archive": "ftp.ebi.ac.uk/pub/databases/genenames/hgnc/json/hgnc_complete_set.json" + }, rdp_url=None, data_license_attributes={ "non_commercial": False, diff --git a/src/gene/etl/ncbi.py b/src/gene/etl/ncbi.py index 45d3c308..a136c4b2 100644 --- a/src/gene/etl/ncbi.py +++ b/src/gene/etl/ncbi.py @@ -2,19 +2,17 @@ import csv import logging import re -import shutil -from ftplib import FTP from pathlib import Path from typing import Dict, List, Optional import gffutils +from wags_tails import NcbiGenomeData +from wags_tails.ncbi import NcbiGenePaths from gene.database import AbstractDatabase -from gene.etl.base import APP_ROOT, Base +from gene.etl.base import SEQREPO_ROOT_DIR, Base from gene.etl.exceptions import ( - GeneFileVersionError, GeneNormalizerEtlError, - GeneSourceFetchError, ) from gene.schemas import ( PREFIX_LOOKUP, @@ -36,207 +34,37 @@ class NCBI(Base): def __init__( self, database: 
AbstractDatabase, - host: str = "ftp.ncbi.nlm.nih.gov", - data_dir: str = "gene/DATA/", - src_data_dir: Path = APP_ROOT / "data" / "ncbi", + seqrepo_dir: Path = SEQREPO_ROOT_DIR, + data_path: Optional[Path] = None, + silent: bool = True, ) -> None: - """Construct the NCBI ETL instance. + """Instantiate Base class. - :param database: gene database for adding new data - :param host: FTP host name - :param data_dir: FTP data directory to use - :param src_data_dir: Data directory for NCBI + :param database: database instance + :param seqrepo_dir: Path to seqrepo directory + :param data_path: path to app data directory + :param silent: if True, don't print ETL result to console """ - super().__init__(database, host, data_dir, src_data_dir) - self._ftp_hostname = host - self._assembly = None - self._gene_url = None - self._history_url = None - self._assembly_url = None - - @staticmethod - def _navigate_ftp_genome_assembly(ftp: FTP) -> None: - """Navigate NCBI FTP filesystem to directory containing latest assembly annotation data. - - :param ftp: logged-in FTP instance - :return: None, but modifies FTP connection in-place - :raise SourceFetchError: if navigation fails (e.g. because expected directories don't exist) - """ - major_annotation_pattern = r"GCF_\d+\.\d+_GRCh\d+.+" - ftp.cwd( - "genomes/refseq/vertebrate_mammalian/Homo_sapiens/" - "latest_assembly_versions" - ) - try: - grch_dirs = [d for d in ftp.nlst() if re.match(major_annotation_pattern, d)] - grch_dir = grch_dirs[0] - except (IndexError, AttributeError): - raise GeneSourceFetchError( - "No directories matching expected latest assembly version pattern" - ) - ftp.cwd(grch_dir) - - def _gff_is_up_to_date(self, gff: Path) -> bool: - """Verify whether local GRCh38 annotation file is up-to-date. 
Currently, their - API endpoints require auth keys (adding complexity for new users) and may or may not - give us exactly what we want, so we ascertain version availability by manually - checking what's listed in the FTP filesystem. - - :param gff: path to local GFF file (file should be saved like `ncbi_GRCh38.p14.gff`) - :return: True if file version matches most recent known remote version - :raise GeneFileVersionError: if unable to parse version from local or remote file - :raise GeneSourceFetchError: if unable to get version from NCBI - """ - try: - version = re.match(r"ncbi_(.+)", gff.stem).groups()[0] - except (IndexError, AttributeError): - raise GeneFileVersionError( - f"Unable to parse version from NCBI GRCh38 annotation file: {gff.absolute()}" - ) - - genomic_gff_pattern = r"GCF_\d+\.\d+_(GRCh\d+\.\w\d+)_genomic.gff.gz" - with FTP(self._host) as ftp: - ftp.login() - self._navigate_ftp_genome_assembly(ftp) - for file in ftp.nlst(): - match = re.match(genomic_gff_pattern, file) - if match and match.groups(): - latest_version = match.groups()[0] - return version == latest_version - raise GeneSourceFetchError( - "Unable to identify latest available NCBI GRCh38 annotation version" - ) - - def _download_gff(self) -> Path: - """Download NCBI GRCh38 annotation file. 
- - :return: Path to downloaded file - :raise SourceFetchError: if unable to identify latest available file - """ - logger.info("Downloading NCBI genome annotation file...") - genomic_gff_pattern = r"GCF_\d+\.\d+_(GRCh\d+\.\w\d+)_genomic.gff.gz" - with FTP(self._host) as ftp: - ftp.login() - self._navigate_ftp_genome_assembly(ftp) - genomic_filename = None - version = None - for f in ftp.nlst(): - gff_match = re.match(genomic_gff_pattern, f) - if gff_match and gff_match.groups(): - genomic_filename = f - version = gff_match.groups()[0] - if not version or not genomic_filename: - raise GeneSourceFetchError( - "Unable to find latest available NCBI GRCh38 annotation" - ) - new_filename = f"ncbi_{version}.gff" - self._ftp_download_file( - ftp, genomic_filename, self.src_data_dir, new_filename - ) - logger.info( - f"Downloaded NCBI genome annotation file to {self.src_data_dir / new_filename}" - ) - return self.src_data_dir / new_filename - - def _history_file_is_up_to_date(self, history_file: Path) -> bool: - """Verify whether local NCBI name history file is up-to-date. 
- - :param history_file: path to local history file (file should be saved like `ncbi_history_20230315.tsv`) - :return: True if file version matches most recent expected remote version - :raise GeneFileVersionError: if parsing version from local file fails - """ - try: - version = re.match(r"ncbi_history_(\d+).tsv", history_file.name).groups()[0] - except (IndexError, AttributeError): - raise GeneFileVersionError( - f"Unable to parse version from NCBI history file: {history_file.absolute()}" - ) - with FTP(self._host) as ftp: - ftp.login() - ftp.cwd("gene/DATA/") - file_changed_date = ftp.sendcmd("MDTM gene_history.gz")[4:12] - return version == file_changed_date - - def _download_history_file(self) -> Path: - """Download NCBI gene name history file - - :return: Path to downloaded file - """ - logger.info("Downloading NCBI gene_history...") - tmp_fn = "ncbi_history_tmp.tsv" - data_fn = "gene_history.gz" - version = self._ftp_download( - self._host, self._data_dir, tmp_fn, self.src_data_dir, data_fn - ) - final_location = f"{self.src_data_dir}/ncbi_history_{version}.tsv" - shutil.move(f"{self.src_data_dir}/{tmp_fn}", final_location) - logger.info(f"Successfully downloaded NCBI gene_history to {final_location}.") - return Path(final_location) - - def _gene_file_is_up_to_date(self, gene_file: Path) -> bool: - """Verify whether local NCBI gene info file is up-to-date. 
- - :param gene_file: path to local NCBI info file (file should be saved like `ncbi_info_20230315.tsv`) - :return: True if file version matches most recent known remote version - :raise GeneFileVersionError: if parsing version from local file fails - """ - try: - version = re.match(r"ncbi_info_(\d+).tsv", gene_file.name).groups()[0] - except (IndexError, AttributeError): - raise GeneFileVersionError( - f"Unable to parse version from NCBI gene file: {gene_file.absolute()}" - ) - with FTP(self._host) as ftp: - ftp.login() - ftp.cwd("gene/DATA/GENE_INFO/Mammalia/") - file_changed_date = ftp.sendcmd("MDTM Homo_sapiens.gene_info.gz")[4:12] - return version == file_changed_date - - def _download_gene_file(self) -> Path: - """Download NCBI gene info file - - :return: Path to downloaded file - """ - data_dir = f"{self._data_dir}GENE_INFO/Mammalia/" - tmp_fn = "ncbi_info_tmp.tsv" - data_fn = "Homo_sapiens.gene_info.gz" - logger.info("Downloading NCBI gene_info....") - version = self._ftp_download( - self._host, data_dir, tmp_fn, self.src_data_dir, data_fn - ) - final_location = f"{self.src_data_dir}/ncbi_info_{version}.tsv" - shutil.move(f"{self.src_data_dir}/{tmp_fn}", final_location) - logger.info(f"Successfully downloaded NCBI gene_info to {final_location}.") - return Path(final_location) + super().__init__(database, seqrepo_dir, data_path, silent) + self._genome_data_handler = NcbiGenomeData(data_path, silent) def _extract_data(self, use_existing: bool) -> None: """Acquire NCBI data file and get metadata. 
:param use_existing: if True, use latest available local file """ - self._gff_src = self._acquire_data_file( - "ncbi_GRCh*.gff", use_existing, self._gff_is_up_to_date, self._download_gff - ) - self._info_src = self._acquire_data_file( - "ncbi_info_*.tsv", - use_existing, - self._gene_file_is_up_to_date, - self._download_gene_file, - ) - self._version = self._info_src.stem.split("_")[-1] - self._history_src = self._acquire_data_file( - f"ncbi_history_{self._version}.tsv", - use_existing, - self._history_file_is_up_to_date, - self._download_history_file, - ) - - self._assembly = self._gff_src.stem.split("_")[-1] - self._gene_url = ( - f"{self._host}gene/DATA/GENE_INFO/Mammalia/Homo_sapiens.gene_info.gz" + self._gff_src, self._assembly = self._genome_data_handler.get_latest( + from_local=use_existing ) - self._history_url = f"{self._host}gene/DATA/gene_history.gz" - self._assembly_url = f"{self._host}genomes/refseq/vertebrate_mammalian/Homo_sapiens/latest_assembly_versions/" + gene_paths: NcbiGenePaths + gene_paths, self._version = self._data_source.get_latest( + from_local=use_existing + ) # type: ignore + self._info_src = gene_paths.gene_info + self._history_src = gene_paths.gene_history + self._gene_url = "ftp.ncbi.nlm.nih.gov/gene/DATA/GENE_INFO/Mammalia/Homo_sapiens.gene_info.gz" + self._history_url = "ftp.ncbi.nlm.nih.gov/gene/DATA/gene_history.gz" + self._assembly_url = "ftp.ncbi.nlm.nih.gov/genomes/refseq/vertebrate_mammalian/Homo_sapiens/latest_assembly_versions/" def _get_prev_symbols(self) -> Dict[str, str]: """Store a gene's symbol history. 
diff --git a/tests/unit/data/etl_data/ensembl_GRCh38_110.gff3 b/tests/unit/data/etl_data/ensembl_GRCh38_110.gff similarity index 100% rename from tests/unit/data/etl_data/ensembl_GRCh38_110.gff3 rename to tests/unit/data/etl_data/ensembl_GRCh38_110.gff diff --git a/tests/unit/test_database_and_etl.py b/tests/unit/test_database_and_etl.py index 778ae0b3..092cc6c3 100644 --- a/tests/unit/test_database_and_etl.py +++ b/tests/unit/test_database_and_etl.py @@ -94,7 +94,7 @@ def test_tables_created(db_fixture): def test_ensembl_etl(test_get_seqrepo, processed_ids, db_fixture, etl_data_path): """Test that ensembl etl methods work correctly.""" test_get_seqrepo.return_value = None - e = Ensembl(db_fixture.db, src_data_dir=etl_data_path) + e = Ensembl(db_fixture.db, data_path=etl_data_path) e._get_seq_id_aliases = _get_aliases # type: ignore ensembl_ids = e.perform_etl(use_existing=True) processed_ids += ensembl_ids @@ -105,7 +105,7 @@ def test_ensembl_etl(test_get_seqrepo, processed_ids, db_fixture, etl_data_path) def test_hgnc_etl(test_get_seqrepo, processed_ids, db_fixture, etl_data_path): """Test that hgnc etl methods work correctly.""" test_get_seqrepo.return_value = None - h = HGNC(db_fixture.db, src_data_dir=etl_data_path) + h = HGNC(db_fixture.db, data_path=etl_data_path) hgnc_ids = h.perform_etl(use_existing=True) processed_ids += hgnc_ids @@ -115,7 +115,7 @@ def test_hgnc_etl(test_get_seqrepo, processed_ids, db_fixture, etl_data_path): def test_ncbi_etl(test_get_seqrepo, processed_ids, db_fixture, etl_data_path): """Test that ncbi etl methods work correctly.""" test_get_seqrepo.return_value = None - n = NCBI(db_fixture.db, src_data_dir=etl_data_path) + n = NCBI(db_fixture.db, data_path=etl_data_path) n._get_seq_id_aliases = _get_aliases # type: ignore ncbi_ids = n.perform_etl(use_existing=True) processed_ids += ncbi_ids diff --git a/tests/unit/test_ensembl_source.py b/tests/unit/test_ensembl_source.py index 9b959a06..1cfe0547 100644 --- 
a/tests/unit/test_ensembl_source.py +++ b/tests/unit/test_ensembl_source.py @@ -309,7 +309,7 @@ def test_meta_info(ensembl): ) assert resp.source_meta_.version == "110" assert resp.source_meta_.data_url == { - "genome_annotations": "ftp://ftp.ensembl.org/pub/current_gff3/homo_sapiens/Homo_sapiens.GRCh38.110.gff3.gz" + "genome_annotations": "ftp://ftp.ensembl.org/pub/release-110/gff3/homo_sapiens/Homo_sapiens.GRCh38.110.gff3.gz" } assert resp.source_meta_.rdp_url is None assert resp.source_meta_.genome_assemblies == ["GRCh38"] diff --git a/tests/unit/test_hgnc_source.py b/tests/unit/test_hgnc_source.py index 10db9462..1f143139 100644 --- a/tests/unit/test_hgnc_source.py +++ b/tests/unit/test_hgnc_source.py @@ -818,7 +818,7 @@ def test_meta_info(hgnc): ) assert datetime.strptime(resp.source_meta_.version, "%Y%m%d") assert resp.source_meta_.data_url == { - "complete_set_archive": "ftp://ftp.ebi.ac.uk/pub/databases/genenames/hgnc/json/hgnc_complete_set.json" + "complete_set_archive": "ftp.ebi.ac.uk/pub/databases/genenames/hgnc/json/hgnc_complete_set.json" } assert resp.source_meta_.rdp_url is None assert resp.source_meta_.genome_assemblies == [] diff --git a/tests/unit/test_ncbi_source.py b/tests/unit/test_ncbi_source.py index c90c353d..4e384b04 100644 --- a/tests/unit/test_ncbi_source.py +++ b/tests/unit/test_ncbi_source.py @@ -575,9 +575,9 @@ def spg37(): def source_urls(): """Provide source data URLs fixture.""" return { - "info_file": "ftp.ncbi.nlm.nih.govgene/DATA/GENE_INFO/Mammalia/Homo_sapiens.gene_info.gz", - "history_file": "ftp.ncbi.nlm.nih.govgene/DATA/gene_history.gz", - "assembly_file": "ftp.ncbi.nlm.nih.govgenomes/refseq/vertebrate_mammalian/Homo_sapiens/latest_assembly_versions/", + "info_file": "ftp.ncbi.nlm.nih.gov/gene/DATA/GENE_INFO/Mammalia/Homo_sapiens.gene_info.gz", + "history_file": "ftp.ncbi.nlm.nih.gov/gene/DATA/gene_history.gz", + "assembly_file": 
"ftp.ncbi.nlm.nih.gov/genomes/refseq/vertebrate_mammalian/Homo_sapiens/latest_assembly_versions/", }