Skip to content

Commit

Permalink
Merge pull request #84 from cancervariants/staging
Browse files Browse the repository at this point in the history
Staging
  • Loading branch information
jsstevenson authored Nov 2, 2022
2 parents 94f0e23 + e7e6fc9 commit fe767bc
Show file tree
Hide file tree
Showing 29 changed files with 633 additions and 758 deletions.
78 changes: 50 additions & 28 deletions .github/workflows/github-actions.yml
Original file line number Diff line number Diff line change
@@ -1,37 +1,59 @@
name: github-actions
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
env:
AWS_ACCESS_KEY_ID: ${{ secrets.DUMMY_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.DUMMY_AWS_SECRET_ACCESS_KEY }}
AWS_DEFAULT_REGION: us-east-2
AWS_DEFAULT_OUTPUT: text
DISEASE_NORM_DB_URL: http://localhost:8000
TEST: DISEASE_TEST
steps:
- uses: actions/checkout@v2
test:
runs-on: ubuntu-latest
env:
AWS_ACCESS_KEY_ID: ${{ secrets.DUMMY_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.DUMMY_AWS_SECRET_ACCESS_KEY }}
AWS_DEFAULT_REGION: us-east-2
AWS_DEFAULT_OUTPUT: text
DISEASE_NORM_DB_URL: http://localhost:8000
TEST: DISEASE_TEST
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10']
steps:
- uses: actions/checkout@v3

- name: Setup Python
uses: actions/setup-python@v1
with:
python-version: 3.8
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Install dependencies
run: |
python3 -m pip install pipenv
pipenv install --dev
- name: Install dependencies
run: |
python3 -m pip install pipenv
pipenv install --dev
- name: Build local DynamoDB
run: |
chmod +x ./tests/unit/dynamodb_build.bash
./tests/unit/dynamodb_build.bash
- name: Build local DynamoDB
run: |
chmod +x ./tests/unit/dynamodb_build.bash
./tests/unit/dynamodb_build.bash
- name: Load and Test DynamoDB
run: |
pipenv run pytest tests/unit/test_database.py
- name: Load and Test DynamoDB
run: |
pipenv run pytest tests/unit/test_database.py
- run: pipenv run flake8
- run: pipenv run pytest tests/
- run: pipenv run pytest tests/
lint:
runs-on: ubuntu-latest
env:
AWS_ACCESS_KEY_ID: ${{ secrets.DUMMY_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.DUMMY_AWS_SECRET_ACCESS_KEY }}
AWS_DEFAULT_REGION: us-east-2
AWS_DEFAULT_OUTPUT: text
steps:
- uses: actions/checkout@v3

- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.10'

- name: Install dependencies
run: |
python3 -m pip install pipenv
pipenv install --dev
- name: check style
run: pipenv run flake8
4 changes: 3 additions & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ name = "pypi"

[dev-packages]
disease-normalizer = {editable = true, path = "."}
click = "*"
bioversions = "*"
requests = "*"
owlready2 = "*"
rdflib = "*"
pytest = "*"
Expand All @@ -24,3 +25,4 @@ fastapi = ">=0.72.0"
uvicorn = "*"
boto3 = "*"
"ga4gh.vrsatile.pydantic" = "==0.0.11"
click = "*"
2 changes: 1 addition & 1 deletion disease/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from .version import __version__ # noqa: F401


PROJECT_ROOT = Path(__file__).resolve().parents[0]
APP_ROOT = Path(__file__).resolve().parents[0]

# set up logging
logging.basicConfig(
Expand Down
146 changes: 95 additions & 51 deletions disease/cli.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
"""This module provides a CLI util to make updates to normalizer database."""
from timeit import default_timer as timer
from os import environ
from typing import Optional, List

import click
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key

from disease import SOURCES_LOWER_LOOKUP, logger
from disease import logger
from disease.schemas import SourceName
from disease.etl.merge import Merge
from disease.database import Database, confirm_aws_db_use, SKIP_AWS_DB_ENV_NAME, \
VALID_AWS_ENV_NAMES, AWS_ENV_VAR_NAME
from disease.etl.merge import Merge
from disease.etl.mondo import Mondo
from disease.etl import NCIt # noqa: F401
from disease.etl import DO # noqa: F401
from disease.etl import OncoTree # noqa: F401
from disease.etl import OMIM # noqa: F401
from disease.etl import Mondo, NCIt, DO, OncoTree, OMIM # noqa: F401


# Use to lookup class object from source name. Should be one key-value pair
Expand Down Expand Up @@ -50,11 +47,16 @@ class CLI:
@click.option(
'--update_merged',
is_flag=True,
help='Update concepts for normalize endpoint. Must select either '
'--update_all or include Mondo as a normalizer source argument.'
help='Update concepts for /normalize endpoint.'
)
@click.option(
'--from_local',
is_flag=True,
default=False,
help="Use most recent local source data instead of fetching latest versions."
)
def update_normalizer_db(normalizer, aws_instance, db_url, update_all,
update_merged):
update_merged, from_local):
"""Update selected source(s) in the Disease Normalizer database."""
# If SKIP_AWS_CONFIRMATION is accidentally set, we should verify that the
# aws instance should actually be used
Expand All @@ -69,86 +71,128 @@ def update_normalizer_db(normalizer, aws_instance, db_url, update_all,
else:
if db_url:
endpoint_url = db_url
elif 'DISEASE_NORM_DB_URL' in environ.keys():
endpoint_url = environ['DISEASE_NORM_DB_URL']
elif "DISEASE_NORM_DB_URL" in environ:
endpoint_url = environ["DISEASE_NORM_DB_URL"]
else:
endpoint_url = 'http://localhost:8000'
db: Database = Database(db_url=endpoint_url)
endpoint_url = "http://localhost:8000"
db = Database(db_url=endpoint_url)

if update_all:
normalizers = list(src for src in SOURCES_CLASS_LOOKUP)
CLI()._update_normalizers(normalizers, db, update_merged)
sources_to_update = list(src for src in SOURCES_CLASS_LOOKUP)
CLI()._update_sources(sources_to_update, db, update_merged, from_local)
elif not normalizer:
CLI()._help_msg("Must provide 1 or more source names, or use `--update_all` parameter") # noqa: E501
if update_merged:
CLI()._update_merged(db, [])
else:
CLI()._help_msg()
else:
normalizers = normalizer.lower().split()

if len(normalizers) == 0:
CLI()._help_msg("Must provide 1 or more source names, or use `--update_all` parameter") # noqa: E501

non_sources = set(normalizers) - {src for src
in SOURCES_LOWER_LOOKUP}

if len(non_sources) != 0:
raise Exception(f"Not valid source(s): {non_sources}. \n"
f"Legal sources are "
f"{list(SOURCES_LOWER_LOOKUP.values())}.")
sources_to_update = str(normalizer).lower().split()
if len(sources_to_update) == 0:
CLI()._help_msg()

if update_merged and 'mondo' not in normalizers:
CLI()._help_msg("Must include Mondo in sources to update for `--update_merged`") # noqa: E501
invalid_sources = set(sources_to_update) - {src for src
in SOURCES_CLASS_LOOKUP}
if len(invalid_sources) != 0:
raise Exception(f"Not valid sources: {invalid_sources}")

CLI()._update_normalizers(normalizers, db, update_merged)
CLI()._update_sources(sources_to_update, db, update_merged, from_local)

@staticmethod
def _help_msg(message: Optional[str] = None):
    """Display help message.

    Echoes *message* first (when truthy), then the command's help text,
    and exits the CLI context.
    """
    ctx = click.get_current_context()
    output = [message] if message else []
    output.append(ctx.get_help())
    for chunk in output:
        click.echo(chunk)
    ctx.exit()

@staticmethod
def _update_sources(sources: List[str], db: Database, update_merged: bool,
                    from_local: bool = False):
    """Update selected normalizer sources.

    :param sources: lowercased names of sources to refresh
    :param db: database client
    :param update_merged: if True, regenerate merged concept records after
        all sources have been loaded
    :param from_local: if True, use most recent local source data instead
        of fetching latest versions
    """
    added_ids = []
    for source in sources:
        msg = f"Deleting {source}..."
        click.echo(f"\n{msg}")
        logger.info(msg)
        start_delete = timer()
        CLI()._delete_data(source, db)
        end_delete = timer()
        delete_time = end_delete - start_delete
        msg = f"Deleted {source} in {delete_time:.5f} seconds."
        click.echo(f"{msg}\n")
        logger.info(msg)

        msg = f"Loading {source}..."
        click.echo(msg)
        logger.info(msg)
        start_load = timer()
        # Bind the ETL instance to its own name -- rebinding `source` here
        # (as the original did) made the "Loaded ..." and "Total time ..."
        # messages below print the object repr instead of the source name.
        etl_source = SOURCES_CLASS_LOOKUP[source](database=db)
        if isinstance(etl_source, Mondo):
            # Mondo IDs anchor the normalized groups, so capture the IDs
            # it loads for the merge step.
            added_ids = etl_source.perform_etl(from_local)
        else:
            etl_source.perform_etl()
        end_load = timer()
        load_time = end_load - start_load
        msg = f"Loaded {source} in {load_time:.5f} seconds."
        click.echo(msg)
        logger.info(msg)
        msg = f"Total time for {source}: " \
              f"{(delete_time + load_time):.5f} seconds."
        click.echo(msg)
        logger.info(msg)
    if update_merged:
        CLI()._update_merged(db, added_ids)

def _update_merged(self, db: Database, added_ids: List[str]) -> None:
    """Update merged concepts and references.

    :param db: Database client
    :param added_ids: list of concept IDs to use for normalized groups.
        Should consist solely of MONDO IDs. If empty, existing merged
        records are dropped and the IDs are fetched from the database.
    """
    merge_start = timer()
    if not added_ids:
        # No IDs supplied by an ETL run: clear stale merged records and
        # gather Mondo identity IDs directly from the DB.
        CLI()._delete_merged_data(db)
        added_ids = db.get_ids_for_merge()
    click.echo("Constructing normalized records...")
    Merge(database=db).create_merged_concepts(added_ids)
    merge_end = timer()
    click.echo(f"Merged concept generation completed in"
               f" {(merge_end - merge_start):.5f} seconds.")

@staticmethod
def _delete_merged_data(database: Database):
    """Delete data pertaining to merged records.

    Repeatedly queries the ``type_index`` for records whose ``item_type``
    is ``"merger"`` and deletes them in batches until no matches remain.

    :param database: DynamoDB client
    """
    click.echo("\nDeleting normalized records...")
    start_delete = timer()
    try:
        # Re-query on every pass: a single query returns at most one page
        # of matches, so loop until the index comes back empty.
        while True:
            # overwrite_by_pkeys de-duplicates requests on the table's
            # composite primary key within the batch.
            with database.diseases.batch_writer(
                overwrite_by_pkeys=["label_and_type", "concept_id"]) \
                    as batch:
                response = database.diseases.query(
                    IndexName="type_index",
                    KeyConditionExpression=Key("item_type").eq("merger"),
                )
                records = response["Items"]
                if not records:
                    break
                for record in records:
                    batch.delete_item(Key={
                        "label_and_type": record["label_and_type"],
                        "concept_id": record["concept_id"]
                    })
    except ClientError as e:
        # Best-effort cleanup: surface the AWS error message instead of
        # propagating the exception.
        click.echo(e.response["Error"]["Message"])
    end_delete = timer()
    delete_time = end_delete - start_delete
    click.echo(f"Deleted normalized records in {delete_time:.5f} seconds.")

@staticmethod
def _delete_data(source, database):
def _delete_data(source: str, database: Database):
# Delete source's metadata
try:
metadata = database.metadata.query(
Expand Down
26 changes: 26 additions & 0 deletions disease/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,32 @@ def get_records_by_type(self, query: str,
f"{e.response['Error']['Message']}")
return []

def get_ids_for_merge(self) -> List[str]:
    """Retrieve concept IDs for use in generating normalized records.

    Scans the diseases table page by page and collects the concept IDs of
    Mondo identity records.

    :return: List of concept IDs as strings.
    """
    concept_ids: List[str] = []
    scan_params = {
        "ProjectionExpression": "concept_id,item_type,src_name",
    }
    start_key = None
    while True:
        # Paginate: pass the previous page's LastEvaluatedKey when present.
        if start_key:
            response = self.diseases.scan(ExclusiveStartKey=start_key,
                                          **scan_params)
        else:
            response = self.diseases.scan(**scan_params)
        concept_ids.extend(
            item["concept_id"] for item in response["Items"]
            if item["item_type"] == "identity"
            and item["src_name"] == "Mondo"
        )
        start_key = response.get("LastEvaluatedKey")
        if not start_key:
            break
    return concept_ids

def add_record(self, record: Dict, record_type: str = "identity"):
"""Add new record to database.
Expand Down
Loading

0 comments on commit fe767bc

Please sign in to comment.