Skip to content

Commit

Permalink
add merge
Browse files: browse the repository at this point in the history
  • Loading branch information
jsstevenson committed Jan 3, 2024
1 parent f022dd6 commit cf27bb9
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 9 deletions.
44 changes: 36 additions & 8 deletions src/gene/etl/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from timeit import default_timer as timer
from typing import Dict, Optional, Set, Tuple

import click
from tqdm import tqdm

from gene.database import AbstractDatabase
from gene.database.database import DatabaseWriteError
from gene.schemas import GeneTypeFieldName, NamespacePrefix, RecordType, SourcePriority
Expand All @@ -13,36 +16,55 @@
class Merge:
"""Handles record merging."""

def __init__(self, database: AbstractDatabase) -> None:
def __init__(self, database: AbstractDatabase, silent: bool = True) -> None:
"""Initialize Merge instance.
:param database: db instance to use for record retrieval and creation.
:param silent: if True, suppress console output (status messages and progress bars); messages are still sent to the logger
"""
self._database = database
self._groups = {} # dict keying concept IDs to group Sets
self._silent = silent

def create_merged_concepts(self, record_ids: Set[str]) -> None:
"""Create concept groups, generate merged concept records, and update database.
:param record_ids: concept identifiers from which groups should be generated.
Should *not* include any records from excluded sources.
"""
_logger.info("Generating record ID sets...")
start = timer()
for record_id in record_ids:
msg = "Generating record ID sets..."
if not self._silent:
click.echo(msg)
_logger.info(msg)

for record_id in tqdm(
record_ids, total=len(record_ids), ncols=80, disable=self._silent
):
new_group = self._create_record_id_set(record_id)
if new_group:
for concept_id in new_group:
self._groups[concept_id] = new_group
end = timer()
_logger.debug(f"Built record ID sets in {end - start} seconds")
msg = f"Built record ID sets in {end - start} seconds"
if not self._silent:
click.echo(msg)
_logger.info(msg)

self._groups = {k: v for k, v in self._groups.items() if len(v) > 1}

_logger.info("Creating merged records and updating database...")
msg = "Creating merged records and updating database..."
if not self._silent:
click.echo(msg)
_logger.info(msg)
uploaded_ids = set()
start = timer()
for record_id, group in self._groups.items():
for record_id, group in tqdm(
self._groups.items(),
total=len(self._groups),
ncols=80,
disable=self._silent,
):
if record_id in uploaded_ids:
continue
merged_record = self._generate_merged_record(group)
Expand All @@ -65,9 +87,15 @@ def create_merged_concepts(self, record_ids: Set[str]) -> None:
_logger.error(str(dw))
uploaded_ids |= group
self._database.complete_write_transaction()
_logger.info("Merged concept generation successful.")
msg = "Merged concept generation successful."
if not self._silent:
click.echo(msg)
_logger.info(msg)
end = timer()
_logger.debug(f"Generated and added concepts in {end - start} seconds")
msg = f"Generated and added concepts in {end - start} seconds"
if not self._silent:
click.echo(msg)
_logger.debug(msg)

def _create_record_id_set(
self, record_id: str, observed_id_set: Optional[Set] = None
Expand Down
2 changes: 1 addition & 1 deletion src/gene/etl/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def update_normalized(
_logger.error(msg)
click.get_current_context().exit()

merge = Merge(database=db)
merge = Merge(database=db, silent=silent)
if not silent:
click.echo("Constructing normalized records...")
merge.create_merged_concepts(processed_ids)
Expand Down

0 comments on commit cf27bb9

Please sign in to comment.