diff --git a/Pipfile b/Pipfile index 5ed5972f..9ea28096 100644 --- a/Pipfile +++ b/Pipfile @@ -15,6 +15,7 @@ boto3 = "*" gene = {editable = true, path = "."} gffutils = "*" "biocommons.seqrepo" = "*" +wags-tails = ">=0.1.1" psycopg = {version = "*", extras=["binary"]} pytest = "*" pre-commit = "*" diff --git a/pyproject.toml b/pyproject.toml index 8f54c725..41c8f7fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,13 +36,9 @@ dynamic = ["version"] [project.optional-dependencies] pg = ["psycopg[binary]"] - -etl = ["gffutils", "biocommons.seqrepo"] - +etl = ["gffutils", "biocommons.seqrepo", "wags-tails>=0.1.1"] test = ["pytest>=6.0", "pytest-cov", "mock", "httpx"] - dev = ["pre-commit", "ruff>=0.1.2"] - docs = [ "sphinx==6.1.3", "sphinx-autodoc-typehints==1.22.0", diff --git a/src/gene/cli.py b/src/gene/cli.py index 30beb762..f7eef4f5 100644 --- a/src/gene/cli.py +++ b/src/gene/cli.py @@ -185,7 +185,7 @@ def _load_source( click.get_current_context().exit() SourceClass = eval(n.value) # noqa: N806 - source = SourceClass(database=db) + source = SourceClass(database=db, silent=False) try: processed_ids += source.perform_etl(use_existing) except GeneNormalizerEtlError as e: diff --git a/src/gene/etl/base.py b/src/gene/etl/base.py index f46f0044..f06c89a0 100644 --- a/src/gene/etl/base.py +++ b/src/gene/etl/base.py @@ -1,19 +1,16 @@ """A base class for extraction, transformation, and loading of data.""" -import datetime -import gzip import logging import re -import shutil from abc import ABC, abstractmethod -from ftplib import FTP -from os import environ, remove +from os import environ from pathlib import Path -from typing import Callable, Dict, List, Optional +from typing import Dict, List, Optional, Union +import click import pydantic from biocommons.seqrepo import SeqRepo -from dateutil import parser from gffutils.feature import Feature +from wags_tails import EnsemblData, HgncData, NcbiGeneData from gene.database import AbstractDatabase from gene.schemas import ITEM_TYPES, Gene, GeneSequenceLocation, MatchType, SourceName @@ -28,50 +25,75 @@ ) +DATA_DISPATCH = { + SourceName.HGNC: HgncData, + SourceName.ENSEMBL: EnsemblData, + SourceName.NCBI: NcbiGeneData, +} + + class Base(ABC): """The ETL base class.""" def __init__( self, database: AbstractDatabase, - host: str, - data_dir: str, - src_data_dir: Path, seqrepo_dir: Path = SEQREPO_ROOT_DIR, + data_path: Optional[Path] = None, + silent: bool = True, ) -> None: """Instantiate Base class. :param database: database instance - :param host: Hostname of FTP site - :param data_dir: Data directory of FTP site to look at - :param src_data_dir: Data directory for source :param seqrepo_dir: Path to seqrepo directory + :param data_path: path to app data directory + :param silent: if True, don't print ETL result to console """ - self.src_data_dir = src_data_dir - self.src_data_dir.mkdir(exist_ok=True, parents=True) + self._silent = silent self._src_name = SourceName(self.__class__.__name__) + self._data_source = self._get_data_handler(data_path) self._database = database - self._host = host - self._data_dir = data_dir - self._processed_ids = list() self.seqrepo = self.get_seqrepo(seqrepo_dir) + self._processed_ids = list() + + def _get_data_handler( + self, data_path: Optional[Path] = None + ) -> Union[HgncData, EnsemblData, NcbiGeneData]: + """Construct data handler instance for source. Overwrite for edge-case sources. + + :param data_path: location of data storage + :return: instance of wags_tails.DataSource to manage source file(s) + """ + return DATA_DISPATCH[self._src_name](data_dir=data_path, silent=self._silent) def perform_etl(self, use_existing: bool = False) -> List[str]: - """Extract, Transform, and Load data into database. + """Public-facing method to begin ETL procedures on given data. + Returned concept IDs can be passed to Merge method for computing + merged concepts. - :param use_existing: if true, use most recent available local files - :return: Concept IDs of concepts successfully loaded + :param use_existing: if True, don't try to retrieve latest source data + :return: list of concept IDs which were successfully processed and + uploaded. """ self._extract_data(use_existing) + if not self._silent: + click.echo("Transforming and loading data to DB...") self._add_meta() self._transform_data() self._database.complete_write_transaction() return self._processed_ids - @abstractmethod - def _extract_data(self, *args, **kwargs) -> None: # noqa: ANN002 - """Extract data from FTP site or local data directory.""" - raise NotImplementedError + def _extract_data(self, use_existing: bool) -> None: + """Acquire source data. + + This method is responsible for initializing an instance of a data handler and, + in most cases, setting ``self._data_file`` and ``self._version``. + + :param bool use_existing: if True, don't try to fetch latest source data + """ + self._data_file, self._version = self._data_source.get_latest( + from_local=use_existing + ) @abstractmethod def _transform_data(self) -> None: @@ -83,40 +105,6 @@ def _add_meta(self) -> None: """Add source meta to database source info.""" raise NotImplementedError - def _acquire_data_file( - self, - file_glob: str, - use_existing: bool, - check_latest_callback: Callable[[Path], bool], - download_callback: Callable[[], Path], - ) -> Path: - """Acquire data file. - - :param file_glob: pattern to match relevant files against - :param use_existing: don't fetch from remote origin if local versions are - available - :param check_latest_callback: function to check whether local data is up-to-date - :param download_callback: function to download from remote - :return: path to acquired data file - :raise FileNotFoundError: if unable to find any files matching the pattern - """ - matching_files = list(self.src_data_dir.glob(file_glob)) - if not matching_files: - if use_existing: - raise FileNotFoundError( - f"No local files matching pattern {self.src_data_dir.absolute().as_uri() + file_glob}" - ) - else: - return download_callback() - else: - latest_file = list(sorted(matching_files))[-1] - if use_existing: - return latest_file - if not check_latest_callback(latest_file): - return download_callback() - else: - return latest_file - def _load_gene(self, gene: Dict) -> None: """Load a gene record into database. This method takes responsibility for: * validating structure correctness @@ -147,49 +135,6 @@ def _load_gene(self, gene: Dict) -> None: self._database.add_record(gene, self._src_name) self._processed_ids.append(concept_id) - def _ftp_download( - self, host: str, data_dir: str, fn: str, source_dir: Path, data_fn: str - ) -> Optional[str]: - """Download data file from FTP site. - - :param host: Source's FTP host name - :param data_dir: Data directory located on FTP site - :param fn: Filename for downloaded file - :param source_dir: Source's data directory - :param data_fn: Filename on FTP site to be downloaded - :return: Date file was last updated - """ - with FTP(host) as ftp: - ftp.login() - timestamp = ftp.voidcmd(f"MDTM {data_dir}{data_fn}")[4:].strip() - date = str(parser.parse(timestamp)).split()[0] - version = datetime.datetime.strptime(date, "%Y-%m-%d").strftime("%Y%m%d") - ftp.cwd(data_dir) - self._ftp_download_file(ftp, data_fn, source_dir, fn) - return version - - def _ftp_download_file( - self, ftp: FTP, data_fn: str, source_dir: Path, fn: str - ) -> None: - """Download data file from FTP - - :param ftp: FTP instance - :param data_fn: Filename on FTP site to be downloaded - :param source_dir: Source's data directory - :param fn: Filename for downloaded file - """ - if data_fn.endswith(".gz"): - filepath = source_dir / f"{fn}.gz" - else: - filepath = source_dir / fn - with open(filepath, "wb") as fp: - ftp.retrbinary(f"RETR {data_fn}", fp.write) - if data_fn.endswith(".gz"): - with gzip.open(filepath, "rb") as f_in: - with open(source_dir / fn, "wb") as f_out: - shutil.copyfileobj(f_in, f_out) - remove(filepath) - def get_seqrepo(self, seqrepo_dir: Path) -> SeqRepo: """Return SeqRepo instance if seqrepo_dir exists. diff --git a/src/gene/etl/ensembl.py b/src/gene/etl/ensembl.py index f75ed034..4a52975a 100644 --- a/src/gene/etl/ensembl.py +++ b/src/gene/etl/ensembl.py @@ -1,21 +1,14 @@ """Defines the Ensembl ETL methods.""" import logging import re -from ftplib import FTP -from json import dumps -from pathlib import Path from typing import Dict import gffutils -import requests from gffutils.feature import Feature -from gene.database import AbstractDatabase -from gene.etl.base import APP_ROOT, Base +from gene.etl.base import Base from gene.etl.exceptions import ( - GeneFileVersionError, GeneNormalizerEtlError, - GeneSourceFetchError, ) from gene.schemas import NamespacePrefix, SourceMeta, SourceName, Strand @@ -26,95 +19,18 @@ class Ensembl(Base): """ETL the Ensembl source into the normalized database.""" - def __init__( - self, - database: AbstractDatabase, - host: str = "ftp.ensembl.org", - data_dir: str = "pub/current_gff3/homo_sapiens/", - src_data_dir: Path = APP_ROOT / "data" / "ensembl", - ) -> None: - """Initialize Ensembl ETL class. - - :param database: gene database for adding new data - :param host: FTP host name - :param data_dir: FTP data directory to use - :param src_data_dir: Data directory for Ensembl - """ - super().__init__(database, host, data_dir, src_data_dir) - self._data_file_pattern = re.compile(r"ensembl_(GRCh\d+)_(\d+)\.gff3") - self._version = None - self._data_url = {} - self._assembly = None - - def _is_up_to_date(self, data_file: Path) -> bool: - """Verify whether local data is up-to-date with latest available remote file. - - :param data_file: path to latest local file - :return: True if data is up-to-date - :raise GeneFileVersionError: if unable to parse version number from local file - :raise GeneSourceFetchError: if unable to get latest version from remote source - """ - local_match = re.match(self._data_file_pattern, data_file.name) - try: - version = int(local_match.groups()[1]) - except (AttributeError, IndexError, ValueError): - raise GeneFileVersionError( - f"Unable to parse version number from local file: {data_file.absolute()}" - ) - - ensembl_api = ( - "https://rest.ensembl.org/info/data/?content-type=application/json" - ) - response = requests.get(ensembl_api) - if response.status_code != 200: - raise GeneSourceFetchError( - f"Unable to get response from Ensembl version API endpoint: {ensembl_api}" - ) - releases = response.json().get("releases") - if not releases: - raise GeneSourceFetchError( - f"Malformed response from Ensembl version API endpoint: {dumps(response.json())}" - ) - releases.sort() - return version == releases[-1] - - def _download_data(self) -> Path: - """Download latest Ensembl GFF3 data file. - - :return: path to acquired file - :raise GeneSourceFetchError: if unable to find file matching expected pattern - """ - logger.info("Downloading latest Ensembl data file...") - pattern = r"Homo_sapiens\.(?PGRCh\d+)\.(?P\d+)\.gff3\.gz" - with FTP(self._host) as ftp: - ftp.login() - ftp.cwd(self._data_dir) - files = ftp.nlst() - for f in files: - match = re.match(pattern, f) - if match: - resp = match.groupdict() - assembly = resp["assembly"] - version = resp["version"] - new_fn = f"ensembl_{assembly}_{version}.gff3" - self._ftp_download_file(ftp, f, self.src_data_dir, new_fn) - logger.info( - f"Successfully downloaded Ensembl {version} data to {self.src_data_dir / new_fn}." - ) - return self.src_data_dir / new_fn - raise GeneSourceFetchError( - "Unable to find file matching expected Ensembl pattern via FTP" - ) - def _extract_data(self, use_existing: bool) -> None: - """Acquire Ensembl data file and get metadata. + """Acquire source data. + + This method is responsible for initializing an instance of a data handler and, + in most cases, setting ``self._data_file`` and ``self._version``. - :param use_existing: if True, use latest available local file + :param use_existing: if True, don't try to fetch latest source data """ - self._data_src = self._acquire_data_file( - "ensembl_*.gff3", use_existing, self._is_up_to_date, self._download_data + self._data_file, raw_version = self._data_source.get_latest( + from_local=use_existing ) - match = re.match(self._data_file_pattern, self._data_src.name) + match = re.match(r"(GRCh\d+)_(\d+)", raw_version) self._assembly = match.groups()[0] self._version = match.groups()[1] @@ -122,7 +38,7 @@ def _transform_data(self) -> None: """Transform the Ensembl source.""" logger.info("Transforming Ensembl...") db = gffutils.create_db( - str(self._data_src), + str(self._data_file), dbfn=":memory:", force=True, merge_strategy="create_unique", @@ -254,7 +170,7 @@ def _add_meta(self) -> None: :raise GeneNormalizerEtlError: if requisite metadata is unset """ - if not all([self._version, self._host, self._data_dir, self._assembly]): + if not self._version or not self._assembly: raise GeneNormalizerEtlError( "Source metadata unavailable -- was data properly acquired before attempting to load DB?" ) @@ -264,7 +180,7 @@ def _add_meta(self) -> None: "/legal/disclaimer.html", version=self._version, data_url={ - "genome_annotations": f"ftp://{self._host}/{self._data_dir}Homo_sapiens.{self._assembly}.{self._version}.gff3.gz" + "genome_annotations": f"ftp://ftp.ensembl.org/pub/release-{self._version}/gff3/homo_sapiens/Homo_sapiens.{self._assembly}.{self._version}.gff3.gz" }, rdp_url=None, data_license_attributes={ diff --git a/src/gene/etl/hgnc.py b/src/gene/etl/hgnc.py index bfd07a7e..9e4c048e 100644 --- a/src/gene/etl/hgnc.py +++ b/src/gene/etl/hgnc.py @@ -2,20 +2,11 @@ import json import logging import re -import shutil -from datetime import datetime -from ftplib import FTP -from pathlib import Path from typing import Dict -from dateutil import parser - -from gene.database import AbstractDatabase -from gene.etl.base import APP_ROOT, Base +from gene.etl.base import Base from gene.etl.exceptions import ( - GeneFileVersionError, GeneNormalizerEtlError, - GeneSourceFetchError, ) from gene.schemas import ( PREFIX_LOOKUP, @@ -34,85 +25,10 @@ class HGNC(Base): """ETL the HGNC source into the normalized database.""" - def __init__( - self, - database: AbstractDatabase, - host: str = "ftp.ebi.ac.uk", - data_dir: str = "pub/databases/genenames/hgnc/json/", - src_data_dir: Path = APP_ROOT / "data" / "hgnc", - fn: str = "hgnc_complete_set.json", - ) -> None: - """Initialize HGNC ETL class. - - :param database: gene database for adding new data - :param host: FTP host name - :param data_dir: FTP data directory to use - :param src_data_dir: Data directory for HGNC - :param fn: Data file to download - """ - super().__init__(database, host, data_dir, src_data_dir) - self._data_file_pattern = re.compile(r"hgnc_(\d+)\.json") - self._data_url = f"ftp://{host}/{data_dir}{fn}" - self._fn = fn - self._version = None - - def _is_up_to_date(self, data_file: Path) -> bool: - """Verify whether local data is up-to-date with latest available remote file. - - :param data_file: path to latest local file - :return: True if data is up-to-date - :raise GeneFileVersionError: if unable to get version from local HGNC file - :raise GeneSourceFetchError: if unable to get latest version available from HGNC - """ - local_match = re.match(self._data_file_pattern, data_file.name) - if not local_match: - raise GeneFileVersionError( - f"Unable to parse version number from local HGNC file: {data_file.absolute()}" - ) - version = local_match.groups()[0] - with FTP(self._host) as ftp: - ftp.login() - timestamp = ftp.voidcmd(f"MDTM {self._data_dir}{self._fn}")[4:].strip() - date = str(parser.parse(timestamp)).split()[0] - try: - remote_version = datetime.strptime(date, "%Y-%m-%d").strftime("%Y%m%d") - except ValueError: - raise GeneSourceFetchError( - f"Unable to parse version number from remote HGNC timestamp: {date}" - ) - return version == remote_version - - def _download_data(self) -> Path: - """Download HGNC JSON data file. - - :return: path to newly-downloaded file - """ - logger.info("Downloading HGNC data file...") - - tmp_fn = "hgnc_version.json" - version = self._ftp_download( - self._host, self._data_dir, tmp_fn, self.src_data_dir, self._fn - ) - final_location = f"{self.src_data_dir}/hgnc_{version}.json" - shutil.move(f"{self.src_data_dir}/{tmp_fn}", final_location) - logger.info(f"Successfully downloaded HGNC data file to {final_location}.") - return Path(final_location) - - def _extract_data(self, use_existing: bool) -> None: - """Acquire HGNC data file and get metadata. - - :param use_existing: if True, use latest available local file - """ - self._data_file = self._acquire_data_file( - "hgnc_*.json", use_existing, self._is_up_to_date, self._download_data - ) - match = self._data_file_pattern.match(self._data_file.name) - self._version = match.groups()[0] - def _transform_data(self) -> None: """Transform the HGNC source.""" logger.info("Transforming HGNC...") - with open(self._data_file, "r") as f: + with open(self._data_file, "r") as f: # type: ignore data = json.load(f) records = data["response"]["docs"] @@ -332,7 +248,7 @@ def _add_meta(self) -> None: :raise GeneNormalizerEtlError: if requisite metadata is unset """ - if not all([self._version, self._data_url]): + if not self._version: raise GeneNormalizerEtlError( "Source metadata unavailable -- was data properly acquired before attempting to load DB?" ) @@ -340,7 +256,9 @@ def _add_meta(self) -> None: data_license="CC0", data_license_url="https://www.genenames.org/about/license/", version=self._version, - data_url={"complete_set_archive": self._data_url}, + data_url={ + "complete_set_archive": "ftp.ebi.ac.uk/pub/databases/genenames/hgnc/json/hgnc_complete_set.json" + }, rdp_url=None, data_license_attributes={ "non_commercial": False, diff --git a/src/gene/etl/ncbi.py b/src/gene/etl/ncbi.py index 45d3c308..1604c7e6 100644 --- a/src/gene/etl/ncbi.py +++ b/src/gene/etl/ncbi.py @@ -2,19 +2,17 @@ import csv import logging import re -import shutil -from ftplib import FTP from pathlib import Path from typing import Dict, List, Optional import gffutils +from wags_tails import NcbiGenomeData +from wags_tails.ncbi import NcbiGenePaths from gene.database import AbstractDatabase -from gene.etl.base import APP_ROOT, Base +from gene.etl.base import SEQREPO_ROOT_DIR, Base from gene.etl.exceptions import ( - GeneFileVersionError, GeneNormalizerEtlError, - GeneSourceFetchError, ) from gene.schemas import ( PREFIX_LOOKUP, @@ -36,207 +34,39 @@ class NCBI(Base): def __init__( self, database: AbstractDatabase, - host: str = "ftp.ncbi.nlm.nih.gov", - data_dir: str = "gene/DATA/", - src_data_dir: Path = APP_ROOT / "data" / "ncbi", + seqrepo_dir: Path = SEQREPO_ROOT_DIR, + data_path: Optional[Path] = None, + silent: bool = True, ) -> None: - """Construct the NCBI ETL instance. + """Instantiate Base class. - :param database: gene database for adding new data - :param host: FTP host name - :param data_dir: FTP data directory to use - :param src_data_dir: Data directory for NCBI + :param database: database instance + :param seqrepo_dir: Path to seqrepo directory + :param data_path: path to app data directory + :param silent: if True, don't print ETL result to console """ - super().__init__(database, host, data_dir, src_data_dir) - self._ftp_hostname = host - self._assembly = None - self._gene_url = None - self._history_url = None - self._assembly_url = None - - @staticmethod - def _navigate_ftp_genome_assembly(ftp: FTP) -> None: - """Navigate NCBI FTP filesystem to directory containing latest assembly annotation data. - - :param ftp: logged-in FTP instance - :return: None, but modifies FTP connection in-place - :raise SourceFetchError: if navigation fails (e.g. because expected directories don't exist) - """ - major_annotation_pattern = r"GCF_\d+\.\d+_GRCh\d+.+" - ftp.cwd( - "genomes/refseq/vertebrate_mammalian/Homo_sapiens/" - "latest_assembly_versions" - ) - try: - grch_dirs = [d for d in ftp.nlst() if re.match(major_annotation_pattern, d)] - grch_dir = grch_dirs[0] - except (IndexError, AttributeError): - raise GeneSourceFetchError( - "No directories matching expected latest assembly version pattern" - ) - ftp.cwd(grch_dir) - - def _gff_is_up_to_date(self, gff: Path) -> bool: - """Verify whether local GRCh38 annotation file is up-to-date. Currently, their - API endpoints require auth keys (adding complexity for new users) and may or may not - give us exactly what we want, so we ascertain version availability by manually - checking what's listed in the FTP filesystem. - - :param gff: path to local GFF file (file should be saved like `ncbi_GRCh38.p14.gff`) - :return: True if file version matches most recent known remote version - :raise GeneFileVersionError: if unable to parse version from local or remote file - :raise GeneSourceFetchError: if unable to get version from NCBI - """ - try: - version = re.match(r"ncbi_(.+)", gff.stem).groups()[0] - except (IndexError, AttributeError): - raise GeneFileVersionError( - f"Unable to parse version from NCBI GRCh38 annotation file: {gff.absolute()}" - ) - - genomic_gff_pattern = r"GCF_\d+\.\d+_(GRCh\d+\.\w\d+)_genomic.gff.gz" - with FTP(self._host) as ftp: - ftp.login() - self._navigate_ftp_genome_assembly(ftp) - for file in ftp.nlst(): - match = re.match(genomic_gff_pattern, file) - if match and match.groups(): - latest_version = match.groups()[0] - return version == latest_version - raise GeneSourceFetchError( - "Unable to identify latest available NCBI GRCh38 annotation version" - ) - - def _download_gff(self) -> Path: - """Download NCBI GRCh38 annotation file. - - :return: Path to downloaded file - :raise SourceFetchError: if unable to identify latest available file - """ - logger.info("Downloading NCBI genome annotation file...") - genomic_gff_pattern = r"GCF_\d+\.\d+_(GRCh\d+\.\w\d+)_genomic.gff.gz" - with FTP(self._host) as ftp: - ftp.login() - self._navigate_ftp_genome_assembly(ftp) - genomic_filename = None - version = None - for f in ftp.nlst(): - gff_match = re.match(genomic_gff_pattern, f) - if gff_match and gff_match.groups(): - genomic_filename = f - version = gff_match.groups()[0] - if not version or not genomic_filename: - raise GeneSourceFetchError( - "Unable to find latest available NCBI GRCh38 annotation" - ) - new_filename = f"ncbi_{version}.gff" - self._ftp_download_file( - ftp, genomic_filename, self.src_data_dir, new_filename - ) - logger.info( - f"Downloaded NCBI genome annotation file to {self.src_data_dir / new_filename}" - ) - return self.src_data_dir / new_filename - - def _history_file_is_up_to_date(self, history_file: Path) -> bool: - """Verify whether local NCBI name history file is up-to-date. - - :param history_file: path to local history file (file should be saved like `ncbi_history_20230315.tsv`) - :return: True if file version matches most recent expected remote version - :raise GeneFileVersionError: if parsing version from local file fails - """ - try: - version = re.match(r"ncbi_history_(\d+).tsv", history_file.name).groups()[0] - except (IndexError, AttributeError): - raise GeneFileVersionError( - f"Unable to parse version from NCBI history file: {history_file.absolute()}" - ) - with FTP(self._host) as ftp: - ftp.login() - ftp.cwd("gene/DATA/") - file_changed_date = ftp.sendcmd("MDTM gene_history.gz")[4:12] - return version == file_changed_date - - def _download_history_file(self) -> Path: - """Download NCBI gene name history file - - :return: Path to downloaded file - """ - logger.info("Downloading NCBI gene_history...") - tmp_fn = "ncbi_history_tmp.tsv" - data_fn = "gene_history.gz" - version = self._ftp_download( - self._host, self._data_dir, tmp_fn, self.src_data_dir, data_fn - ) - final_location = f"{self.src_data_dir}/ncbi_history_{version}.tsv" - shutil.move(f"{self.src_data_dir}/{tmp_fn}", final_location) - logger.info(f"Successfully downloaded NCBI gene_history to {final_location}.") - return Path(final_location) - - def _gene_file_is_up_to_date(self, gene_file: Path) -> bool: - """Verify whether local NCBI gene info file is up-to-date. - - :param gene_file: path to local NCBI info file (file should be saved like `ncbi_info_20230315.tsv`) - :return: True if file version matches most recent known remote version - :raise GeneFileVersionError: if parsing version from local file fails - """ - try: - version = re.match(r"ncbi_info_(\d+).tsv", gene_file.name).groups()[0] - except (IndexError, AttributeError): - raise GeneFileVersionError( - f"Unable to parse version from NCBI gene file: {gene_file.absolute()}" - ) - with FTP(self._host) as ftp: - ftp.login() - ftp.cwd("gene/DATA/GENE_INFO/Mammalia/") - file_changed_date = ftp.sendcmd("MDTM Homo_sapiens.gene_info.gz")[4:12] - return version == file_changed_date - - def _download_gene_file(self) -> Path: - """Download NCBI gene info file - - :return: Path to downloaded file - """ - data_dir = f"{self._data_dir}GENE_INFO/Mammalia/" - tmp_fn = "ncbi_info_tmp.tsv" - data_fn = "Homo_sapiens.gene_info.gz" - logger.info("Downloading NCBI gene_info....") - version = self._ftp_download( - self._host, data_dir, tmp_fn, self.src_data_dir, data_fn - ) - final_location = f"{self.src_data_dir}/ncbi_info_{version}.tsv" - shutil.move(f"{self.src_data_dir}/{tmp_fn}", final_location) - logger.info(f"Successfully downloaded NCBI gene_info to {final_location}.") - return Path(final_location) + super().__init__(database, seqrepo_dir, data_path, silent) + self._genome_data_handler = NcbiGenomeData(data_path, silent) def _extract_data(self, use_existing: bool) -> None: """Acquire NCBI data file and get metadata. :param use_existing: if True, use latest available local file """ - self._gff_src = self._acquire_data_file( - "ncbi_GRCh*.gff", use_existing, self._gff_is_up_to_date, self._download_gff - ) - self._info_src = self._acquire_data_file( - "ncbi_info_*.tsv", - use_existing, - self._gene_file_is_up_to_date, - self._download_gene_file, + self._gff_src, self._assembly = self._genome_data_handler.get_latest( + from_local=use_existing ) - self._version = self._info_src.stem.split("_")[-1] - self._history_src = self._acquire_data_file( - f"ncbi_history_{self._version}.tsv", - use_existing, - self._history_file_is_up_to_date, - self._download_history_file, - ) - - self._assembly = self._gff_src.stem.split("_")[-1] + gene_paths: NcbiGenePaths + gene_paths, self._version = self._data_source.get_latest( + from_local=use_existing + ) # type: ignore + self._info_src = gene_paths.gene_info + self._history_src = gene_paths.gene_history self._gene_url = ( - f"{self._host}gene/DATA/GENE_INFO/Mammalia/Homo_sapiens.gene_info.gz" + "ftp.ncbi.nlm.nih.govgene/DATA/GENE_INFO/Mammalia/Homo_sapiens.gene_info.gz" ) - self._history_url = f"{self._host}gene/DATA/gene_history.gz" - self._assembly_url = f"{self._host}genomes/refseq/vertebrate_mammalian/Homo_sapiens/latest_assembly_versions/" + self._history_url = "ftp.ncbi.nlm.nih.govgene/DATA/gene_history.gz" + self._assembly_url = "ftp.ncbi.nlm.nih.govgenomes/refseq/vertebrate_mammalian/Homo_sapiens/latest_assembly_versions/" def _get_prev_symbols(self) -> Dict[str, str]: """Store a gene's symbol history. diff --git a/tests/unit/data/etl_data/ensembl_GRCh38_110.gff3 b/tests/unit/data/etl_data/ensembl_GRCh38_110.gff similarity index 100% rename from tests/unit/data/etl_data/ensembl_GRCh38_110.gff3 rename to tests/unit/data/etl_data/ensembl_GRCh38_110.gff diff --git a/tests/unit/test_database_and_etl.py b/tests/unit/test_database_and_etl.py index 778ae0b3..092cc6c3 100644 --- a/tests/unit/test_database_and_etl.py +++ b/tests/unit/test_database_and_etl.py @@ -94,7 +94,7 @@ def test_tables_created(db_fixture): def test_ensembl_etl(test_get_seqrepo, processed_ids, db_fixture, etl_data_path): """Test that ensembl etl methods work correctly.""" test_get_seqrepo.return_value = None - e = Ensembl(db_fixture.db, src_data_dir=etl_data_path) + e = Ensembl(db_fixture.db, data_path=etl_data_path) e._get_seq_id_aliases = _get_aliases # type: ignore ensembl_ids = e.perform_etl(use_existing=True) processed_ids += ensembl_ids @@ -105,7 +105,7 @@ def test_ensembl_etl(test_get_seqrepo, processed_ids, db_fixture, etl_data_path) def test_hgnc_etl(test_get_seqrepo, processed_ids, db_fixture, etl_data_path): """Test that hgnc etl methods work correctly.""" test_get_seqrepo.return_value = None - h = HGNC(db_fixture.db, src_data_dir=etl_data_path) + h = HGNC(db_fixture.db, data_path=etl_data_path) hgnc_ids = h.perform_etl(use_existing=True) processed_ids += hgnc_ids @@ -115,7 +115,7 @@ def test_hgnc_etl(test_get_seqrepo, processed_ids, db_fixture, etl_data_path): def test_ncbi_etl(test_get_seqrepo, processed_ids, db_fixture, etl_data_path): """Test that ncbi etl methods work correctly.""" test_get_seqrepo.return_value = None - n = NCBI(db_fixture.db, src_data_dir=etl_data_path) + n = NCBI(db_fixture.db, data_path=etl_data_path) n._get_seq_id_aliases = _get_aliases # type: ignore ncbi_ids = n.perform_etl(use_existing=True) processed_ids += ncbi_ids diff --git a/tests/unit/test_ensembl_source.py b/tests/unit/test_ensembl_source.py index 9b959a06..1cfe0547 100644 --- a/tests/unit/test_ensembl_source.py +++ b/tests/unit/test_ensembl_source.py @@ -309,7 +309,7 @@ def test_meta_info(ensembl): ) assert resp.source_meta_.version == "110" assert resp.source_meta_.data_url == { - "genome_annotations": "ftp://ftp.ensembl.org/pub/current_gff3/homo_sapiens/Homo_sapiens.GRCh38.110.gff3.gz" + "genome_annotations": "ftp://ftp.ensembl.org/pub/release-110/gff3/homo_sapiens/Homo_sapiens.GRCh38.110.gff3.gz" } assert resp.source_meta_.rdp_url is None assert resp.source_meta_.genome_assemblies == ["GRCh38"] diff --git a/tests/unit/test_hgnc_source.py b/tests/unit/test_hgnc_source.py index 10db9462..1f143139 100644 --- a/tests/unit/test_hgnc_source.py +++ b/tests/unit/test_hgnc_source.py @@ -818,7 +818,7 @@ def test_meta_info(hgnc): ) assert datetime.strptime(resp.source_meta_.version, "%Y%m%d") assert resp.source_meta_.data_url == { - "complete_set_archive": "ftp://ftp.ebi.ac.uk/pub/databases/genenames/hgnc/json/hgnc_complete_set.json" + "complete_set_archive": "ftp.ebi.ac.uk/pub/databases/genenames/hgnc/json/hgnc_complete_set.json" } assert resp.source_meta_.rdp_url is None assert resp.source_meta_.genome_assemblies == []