diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index 2487ca94c575..55e9d4261ca8 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -1722,13 +1722,17 @@ def fail(): tool=self.tool, stdout=job.stdout, stderr=job.stderr) collected_bytes = 0 + quota_source_info = None # Once datasets are collected, set the total dataset size (includes extra files) for dataset_assoc in job.output_datasets: if not dataset_assoc.dataset.dataset.purged: + # assume all datasets in a job get written to the same objectstore + quota_source_info = dataset_assoc.dataset.dataset.quota_source_info collected_bytes += dataset_assoc.dataset.set_total_size() - if job.user: - job.user.adjust_total_disk_usage(collected_bytes) + user = job.user + if user and collected_bytes > 0 and quota_source_info is not None and quota_source_info.use: + user.adjust_total_disk_usage(collected_bytes, quota_source_info.label) # Empirically, we need to update job.user and # job.workflow_invocation_step.workflow_invocation in separate diff --git a/lib/galaxy/managers/hdas.py b/lib/galaxy/managers/hdas.py index 9d6569d0f7df..8542c06a0aec 100644 --- a/lib/galaxy/managers/hdas.py +++ b/lib/galaxy/managers/hdas.py @@ -143,8 +143,9 @@ def purge(self, hda, flush=True): quota_amount_reduction = hda.quota_amount(user) super().purge(hda, flush=flush) # decrease the user's space used - if quota_amount_reduction: - user.adjust_total_disk_usage(-quota_amount_reduction) + quota_source_info = hda.dataset.quota_source_info + if quota_amount_reduction and quota_source_info.use: + user.adjust_total_disk_usage(-quota_amount_reduction, quota_source_info.label) return hda # .... 
states diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index 2b10f7229841..bdcd12566cbe 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -24,6 +24,7 @@ from sqlalchemy import ( alias, and_, + bindparam, func, inspect, join, @@ -352,6 +353,100 @@ def stderr(self, stderr): raise NotImplementedError("Attempt to set stdout, must set tool_stderr or job_stderr") +UNIQUE_DATASET_USER_USAGE = """ +WITH per_user_histories AS +( + SELECT id + FROM history + WHERE user_id = :id + AND NOT purged +), +per_hist_hdas AS ( + SELECT DISTINCT dataset_id + FROM history_dataset_association + WHERE NOT purged + AND history_id IN (SELECT id FROM per_user_histories) +) +SELECT COALESCE(SUM(COALESCE(dataset.total_size, dataset.file_size, 0)), 0) +FROM dataset +LEFT OUTER JOIN library_dataset_dataset_association ON dataset.id = library_dataset_dataset_association.dataset_id +WHERE dataset.id IN (SELECT dataset_id FROM per_hist_hdas) + AND library_dataset_dataset_association.id IS NULL + AND ( + {dataset_condition} + ) +""" + + +def calculate_user_disk_usage_statements(user_id, quota_source_map, for_sqlite=False): + """Standalone function so can be reused for postgres directly in pgcleanup.py.""" + statements = [] + default_quota_enabled = quota_source_map.default_quota_enabled + default_exclude_ids = quota_source_map.default_usage_excluded_ids() + default_cond = "dataset.object_store_id IS NULL" if default_quota_enabled else "" + exclude_cond = "dataset.object_store_id NOT IN :exclude_object_store_ids" if default_exclude_ids else "" + use_or = " OR " if (default_cond != "" and exclude_cond != "") else "" + default_usage_dataset_condition = "{default_cond} {use_or} {exclude_cond}".format( + default_cond=default_cond, + exclude_cond=exclude_cond, + use_or=use_or, + ) + default_usage = UNIQUE_DATASET_USER_USAGE.format( + dataset_condition=default_usage_dataset_condition + ) + default_usage = """ +UPDATE galaxy_user SET disk_usage = (%s) 
+WHERE id = :id +""" % default_usage + params = {"id": user_id} + if default_exclude_ids: + params["exclude_object_store_ids"] = default_exclude_ids + statements.append((default_usage, params)) + source = quota_source_map.ids_per_quota_source() + for (quota_source_label, object_store_ids) in source.items(): + label_usage = UNIQUE_DATASET_USER_USAGE.format( + dataset_condition="dataset.object_store_id IN :include_object_store_ids" + ) + if for_sqlite: + # hacky alternative for older sqlite + statement = """ +WITH new (user_id, quota_source_label, disk_usage) AS ( + VALUES(:id, :label, ({label_usage})) +) +INSERT OR REPLACE INTO user_quota_source_usage (id, user_id, quota_source_label, disk_usage) +SELECT old.id, new.user_id, new.quota_source_label, new.disk_usage +FROM new + LEFT JOIN user_quota_source_usage AS old + ON new.user_id = old.user_id + AND new.quota_source_label = old.quota_source_label +""".format(label_usage=label_usage) + else: + statement = """ +INSERT INTO user_quota_source_usage(user_id, quota_source_label, disk_usage) +VALUES(:user_id, :label, ({label_usage})) +ON CONFLICT +ON constraint uqsu_unique_label_per_user +DO UPDATE SET disk_usage = excluded.disk_usage +""".format(label_usage=label_usage) + statements.append((statement, {"id": user_id, "label": quota_source_label, "include_object_store_ids": object_store_ids})) + + params = {"id": user_id} + source_labels = list(source.keys()) + if len(source_labels) > 0: + clean_old_statement = """ +DELETE FROM user_quota_source_usage +WHERE user_id = :id AND quota_source_label NOT IN :labels +""" + params["labels"] = source_labels + else: + clean_old_statement = """ +DELETE FROM user_quota_source_usage +WHERE user_id = :id AND quota_source_label IS NOT NULL +""" + statements.append((clean_old_statement, params)) + return statements + + class User(Dictifiable, RepresentById): use_pbkdf2 = True """ @@ -480,14 +575,31 @@ def all_roles_exploiting_cache(self): roles.append(role) return roles - def 
get_disk_usage(self, nice_size=False): + def get_disk_usage(self, nice_size=False, quota_source_label=None): """ Return byte count of disk space used by user or a human-readable string if `nice_size` is `True`. """ - rval = 0 - if self.disk_usage is not None: - rval = self.disk_usage + if quota_source_label is None: + rval = 0 + if self.disk_usage is not None: + rval = self.disk_usage + else: + statement = """ +SELECT DISK_USAGE +FROM user_quota_source_usage +WHERE user_id = :user_id and quota_source_label = :label +""" + sa_session = object_session(self) + params = { + 'user_id': self.id, + 'label': quota_source_label, + } + row = sa_session.execute(statement, params).fetchone() + if row is not None: + rval = row[0] + else: + rval = 0 if nice_size: rval = galaxy.util.nice_size(rval) return rval @@ -500,9 +612,36 @@ def set_disk_usage(self, bytes): total_disk_usage = property(get_disk_usage, set_disk_usage) - def adjust_total_disk_usage(self, amount): + def adjust_total_disk_usage(self, amount, quota_source_label): + assert amount is not None if amount != 0: - self.disk_usage = func.coalesce(self.table.c.disk_usage, 0) + amount + if quota_source_label is None: + self.disk_usage = func.coalesce(self.table.c.disk_usage, 0) + amount + else: + # else would work on newer sqlite - 3.24.0 + sa_session = object_session(self) + if "sqlite" in sa_session.bind.dialect.name: + # hacky alternative for older sqlite + statement = """ +WITH new (user_id, quota_source_label) AS ( VALUES(:user_id, :label) ) +INSERT OR REPLACE INTO user_quota_source_usage (id, user_id, quota_source_label, disk_usage) +SELECT old.id, new.user_id, new.quota_source_label, COALESCE(old.disk_usage + :amount, :amount) +FROM new LEFT JOIN user_quota_source_usage AS old ON new.user_id = old.user_id AND NEW.quota_source_label = old.quota_source_label; +""" + else: + statement = """ +INSERT INTO user_quota_source_usage(user_id, disk_usage, quota_source_label) +VALUES(:user_id, :amount, :label) +ON CONFLICT + 
ON constraint uqsu_unique_label_per_user + DO UPDATE SET disk_usage = disk_usage + :amount +""" + params = { + 'user_id': self.id, + 'amount': int(amount), + 'label': quota_source_label, + } + sa_session.execute(statement, params) @property def nice_total_disk_usage(self): @@ -511,51 +650,57 @@ def nice_total_disk_usage(self): """ return self.get_disk_usage(nice_size=True) - def calculate_disk_usage(self): + def calculate_disk_usage_default_source(self, object_store): """ Return byte count total of disk space used by all non-purged, non-library - HDAs in non-purged histories. + HDAs in non-purged histories assigned to default quota source. """ - # maintain a list so that we don't double count - return self._calculate_or_set_disk_usage(dryrun=True) + # only used in set_user_disk_usage.py + assert object_store is not None + quota_source_map = object_store.get_quota_source_map() + default_quota_enabled = quota_source_map.default_quota_enabled + default_cond = "dataset.object_store_id IS NULL OR" if default_quota_enabled else "" + default_usage_dataset_condition = "{default_cond} dataset.object_store_id NOT IN :exclude_object_store_ids".format( + default_cond=default_cond, + ) + default_usage = UNIQUE_DATASET_USER_USAGE.format( + dataset_condition=default_usage_dataset_condition + ) + sql_calc = text(default_usage) + sql_calc = sql_calc.bindparams( + bindparam("id"), + bindparam("exclude_object_store_ids", expanding=True) + ) + params = {'id': self.id, "exclude_object_store_ids": quota_source_map.default_usage_excluded_ids()} + sa_session = object_session(self) + usage = sa_session.scalar(sql_calc, params) + return usage - def calculate_and_set_disk_usage(self): + def calculate_and_set_disk_usage(self, object_store): """ Calculates and sets user disk usage. 
""" - self._calculate_or_set_disk_usage(dryrun=False) + self._calculate_or_set_disk_usage(object_store=object_store) - def _calculate_or_set_disk_usage(self, dryrun=True): + def _calculate_or_set_disk_usage(self, object_store): """ Utility to calculate and return the disk usage. If dryrun is False, the new value is set immediately. """ - sql_calc = """ - WITH per_user_histories AS - ( - SELECT id - FROM history - WHERE user_id = :id - AND NOT purged - ), - per_hist_hdas AS ( - SELECT DISTINCT dataset_id - FROM history_dataset_association - WHERE NOT purged - AND history_id IN (SELECT id FROM per_user_histories) - ) - SELECT SUM(COALESCE(dataset.total_size, dataset.file_size, 0)) - FROM dataset - LEFT OUTER JOIN library_dataset_dataset_association ON dataset.id = library_dataset_dataset_association.dataset_id - WHERE dataset.id IN (SELECT dataset_id FROM per_hist_hdas) - AND library_dataset_dataset_association.id IS NULL - """ + assert object_store is not None + quota_source_map = object_store.get_quota_source_map() sa_session = object_session(self) - usage = sa_session.scalar(sql_calc, {'id': self.id}) - if not dryrun: - self.set_disk_usage(usage) + for_sqlite = "sqlite" in sa_session.bind.dialect.name + statements = calculate_user_disk_usage_statements(self.id, quota_source_map, for_sqlite) + for (sql, args) in statements: + statement = text(sql) + binds = [] + for key, val in args.items(): + expand_binding = key.endswith("s") + binds.append(bindparam(key, expanding=expand_binding)) + statement = statement.bindparams(*binds) + sa_session.execute(statement, args) sa_session.flush() - return usage @staticmethod def user_template_environment(user): @@ -610,6 +755,18 @@ def is_authenticated(self): # https://github.com/python-social-auth/social-examples/blob/master/example-cherrypy/example/db/user.py return True + def dictify_usage(self): + rval = [{ + 'quota_source_label': None, + 'disk_usage': int(self.disk_usage or 0), + }] + for quota_source_usage in 
self.quota_source_usages: + rval.append({ + 'quota_source_label': quota_source_usage.quota_source_label, + 'disk_usage': int(quota_source_usage.disk_usage), + }) + return rval + class PasswordResetToken: def __init__(self, user, token=None): @@ -1804,7 +1961,9 @@ def add_dataset(self, dataset, parent_id=None, genome_build=None, set_hid=True, if set_hid: dataset.hid = self._next_hid() if quota and is_dataset and self.user: - self.user.adjust_total_disk_usage(dataset.quota_amount(self.user)) + quota_source_info = dataset.dataset.quota_source_info + if quota_source_info.use: + self.user.adjust_total_disk_usage(dataset.quota_amount(self.user), quota_source_info.label) dataset.history = self if is_dataset and genome_build not in [None, '?']: self.genome_build = genome_build @@ -1819,8 +1978,12 @@ def add_datasets(self, sa_session, datasets, parent_id=None, genome_build=None, if optimize: self.__add_datasets_optimized(datasets, genome_build=genome_build) if quota and self.user: - disk_usage = sum([d.get_total_size() for d in datasets if is_hda(d)]) - self.user.adjust_total_disk_usage(disk_usage) + hdas = [d for d in datasets if is_hda(d)] + disk_usage = sum([d.get_total_size() for d in hdas]) + if disk_usage: + quota_source_info = hdas[0].dataset.quota_source_info + if quota_source_info.use: + self.user.adjust_total_disk_usage(disk_usage, quota_source_info.label) sa_session.add_all(datasets) if flush: sa_session.flush() @@ -2152,6 +2315,10 @@ def __init__(self, name="", description="", type="system", deleted=False): self.deleted = deleted +class UserQuotaSourceUsage(Dictifiable, RepresentById): + dict_element_visible_keys = ['disk_usage', 'quota_source_label'] + + class UserQuotaAssociation(Dictifiable, RepresentById): dict_element_visible_keys = ['user'] @@ -2173,7 +2340,7 @@ class Quota(Dictifiable, RepresentById): dict_element_visible_keys = ['id', 'name', 'description', 'bytes', 'operation', 'display_amount', 'default', 'users', 'groups'] valid_operations = ('+', 
'-', '=') - def __init__(self, name="", description="", amount=0, operation="="): + def __init__(self, name="", description="", amount=0, operation="=", quota_source_label=None): self.name = name self.description = description if amount is None: @@ -2181,6 +2348,7 @@ def __init__(self, name="", description="", amount=0, operation="="): else: self.bytes = amount self.operation = operation + self.quota_source_label = quota_source_label def get_amount(self): if self.bytes == -1: @@ -2368,6 +2536,16 @@ def get_file_name(self): # Make filename absolute return os.path.abspath(filename) + @property + def quota_source_label(self): + return self.get_quota_source_info(self.object_store_id).label + + @property + def quota_source_info(self): + object_store_id = self.object_store_id + quota_source_map = self.object_store.get_quota_source_map() + return quota_source_map.get_quota_source_info(object_store_id) + def set_file_name(self, filename): if not filename: self.external_filename = None @@ -3283,11 +3461,11 @@ def get_access_roles(self, trans): """ return self.dataset.get_access_roles(trans) - def purge_usage_from_quota(self, user): + def purge_usage_from_quota(self, user, quota_source_info): """Remove this HDA's quota_amount from user's quota. 
""" - if user: - user.adjust_total_disk_usage(-self.quota_amount(user)) + if user and quota_source_info.use: + user.adjust_total_disk_usage(-self.quota_amount(user), quota_source_info.label) def quota_amount(self, user): """ diff --git a/lib/galaxy/model/mapping.py b/lib/galaxy/model/mapping.py index d6dc167fcd10..defd23d77788 100644 --- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -408,6 +408,7 @@ Column("description", TEXT), Column("bytes", BigInteger), Column("operation", String(8)), + Column("quota_source_label", String(32), index=True), Column("deleted", Boolean, index=True, default=False)) model.DefaultQuotaAssociation.table = Table( @@ -415,9 +416,21 @@ Column("id", Integer, primary_key=True), Column("create_time", DateTime, default=now), Column("update_time", DateTime, default=now, onupdate=now), - Column("type", String(32), index=True, unique=True), + Column("type", String(32), index=True), Column("quota_id", Integer, ForeignKey("quota.id"), index=True)) +# Call it user_quota_source_usage instead of quota_source_usage so we can +# implement group-allocated storage in the future. +model.UserQuotaSourceUsage.table = Table( + "user_quota_source_usage", metadata, + Column("id", Integer, primary_key=True), + Column("user_id", Integer, ForeignKey("galaxy_user.id"), index=True), + Column("quota_source_label", String(32), index=True), + # user had an index on disk_usage - does that make any sense? 
-John + Column("disk_usage", Numeric(15, 0), default=0, nullable=False), + UniqueConstraint('user_id', 'quota_source_label', name="uqsu_unique_label_per_user"), +) + model.DatasetPermissions.table = Table( "dataset_permissions", metadata, Column("id", Integer, primary_key=True), @@ -2003,6 +2016,9 @@ def simple_mapping(model, **kwds): quota=relation(model.Quota) )) +simple_mapping(model.UserQuotaSourceUsage, + user=relation(model.User, backref="quota_source_usages", uselist=True)) + mapper(model.DefaultQuotaAssociation, model.DefaultQuotaAssociation.table, properties=dict( quota=relation(model.Quota, backref="default") )) diff --git a/lib/galaxy/model/migrate/versions/0169_add_user_quota_source_usage.py b/lib/galaxy/model/migrate/versions/0169_add_user_quota_source_usage.py new file mode 100644 index 000000000000..4e8f21ad210e --- /dev/null +++ b/lib/galaxy/model/migrate/versions/0169_add_user_quota_source_usage.py @@ -0,0 +1,57 @@ +""" +Migration script to add a new user_quota_source_usage table. +""" + +import logging + +from migrate.changeset.constraint import UniqueConstraint as MigrateUniqueContraint +from sqlalchemy import Column, ForeignKey, Integer, MetaData, Numeric, String, Table, UniqueConstraint + +from galaxy.model.migrate.versions.util import add_column, drop_column + +log = logging.getLogger(__name__) +metadata = MetaData() + +user_quota_source_usage_table = Table( + "user_quota_source_usage", metadata, + Column("id", Integer, primary_key=True), + Column("user_id", Integer, ForeignKey("galaxy_user.id"), index=True), + Column("quota_source_label", String(32), index=True), + # user had an index on disk_usage - does that make any sense? -John + Column("disk_usage", Numeric(15, 0)), + UniqueConstraint('user_id', 'quota_source_label', name="uqsu_unique_label_per_user"), +) +# Column to add. 
+quota_source_label_col = Column("quota_source_label", String(32), default=None, nullable=True) + + +def upgrade(migrate_engine): + print(__doc__) + metadata.bind = migrate_engine + metadata.reflect() + + add_column(quota_source_label_col, 'quota', metadata) + + try: + user_quota_source_usage_table.create() + except Exception: + log.exception("Creating user_quota_source_usage_table table failed") + + try: + # TODO: need to verify this is actually doing something. + table = Table("default_quota_association", metadata, autoload=True) + MigrateUniqueContraint("type", table=table).drop() + except Exception: + log.exception("Dropping unique constraint on default_quota_association.type failed") + + +def downgrade(migrate_engine): + metadata.bind = migrate_engine + metadata.reflect() + + try: + user_quota_source_usage_table.drop() + except Exception: + log.exception("Dropping user_quota_source_usage_table table failed") + + drop_column(quota_source_label_col, 'quota', metadata) diff --git a/lib/galaxy/objectstore/__init__.py b/lib/galaxy/objectstore/__init__.py index 904cacbdc8c2..6e696697f0ae 100644 --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -12,12 +12,13 @@ import shutil import threading import time -from collections import OrderedDict +from collections import namedtuple, OrderedDict import yaml from galaxy.exceptions import ObjectInvalid, ObjectNotFound from galaxy.util import ( + asbool, directory_hash_id, force_symlink, parse_xml, @@ -31,6 +32,8 @@ from galaxy.util.sleeper import Sleeper NO_SESSION_ERROR_MESSAGE = "Attempted to 'create' object store entity in configuration with no database session present." +DEFAULT_QUOTA_SOURCE = None # Just track quota right on user object in Galaxy. 
+DEFAULT_QUOTA_ENABLED = True # enable quota tracking in object stores by default log = logging.getLogger(__name__) @@ -190,6 +193,10 @@ def get_store_by(self, obj): """ raise NotImplementedError() + @abc.abstractmethod + def get_quota_source_map(self): + """Return QuotaSourceMap describing mapping of object store IDs to quota sources.""" + class BaseObjectStore(ObjectStore): @@ -302,8 +309,9 @@ def get_store_by(self, obj, **kwargs): class ConcreteObjectStore(BaseObjectStore): """Subclass of ObjectStore for stores that don't delegate (non-nested). - Currently only adds store_by functionality. Which doesn't make - sense for the delegating object stores. + Adds store_by and quota_source functionality. These attributes do not make + sense for the delegating object stores, they should describe files at actually + persisted, not how a file is routed to a persistence source. """ def __init__(self, config, config_dict=None, **kwargs): @@ -323,15 +331,31 @@ def __init__(self, config, config_dict=None, **kwargs): config_dict = {} super().__init__(config=config, config_dict=config_dict, **kwargs) self.store_by = config_dict.get("store_by", None) or getattr(config, "object_store_store_by", "id") + # short label describing the quota source or null to use default + # quota source right on user object. 
+ quota_config = config_dict.get("quota", {}) + self.quota_source = quota_config.get('source', DEFAULT_QUOTA_SOURCE) + self.quota_enabled = quota_config.get('enabled', DEFAULT_QUOTA_ENABLED) def to_dict(self): rval = super().to_dict() rval["store_by"] = self.store_by + rval["quota"] = { + "source": self.quota_source, + "enabled": self.quota_enabled, + } return rval def _get_store_by(self, obj): return self.store_by + def get_quota_source_map(self): + quota_source_map = QuotaSourceMap( + self.quota_source, + self.quota_enabled, + ) + return quota_source_map + class DiskObjectStore(ConcreteObjectStore): """ @@ -379,7 +403,12 @@ def parse_xml(clazz, config_xml): if store_by is not None: config_dict['store_by'] = store_by for e in config_xml: - if e.tag == 'files_dir': + if e.tag == 'quota': + config_dict['quota'] = { + 'source': e.get('source', DEFAULT_QUOTA_SOURCE), + 'enabled': asbool(e.get('enabled', DEFAULT_QUOTA_ENABLED)), + } + elif e.tag == 'files_dir': config_dict["files_dir"] = e.get('path') else: extra_dirs.append({"type": e.get('type'), "path": e.get('path')}) @@ -682,6 +711,19 @@ def _call_method(self, method, obj, default, default_is_exception, else: return default + def get_quota_source_map(self): + quota_source_map = QuotaSourceMap() + self._merge_quota_source_map(quota_source_map, self) + return quota_source_map + + @classmethod + def _merge_quota_source_map(clz, quota_source_map, object_store): + for backend_id, backend in object_store.backends.items(): + if isinstance(backend, NestedObjectStore): + clz._merge_quota_source_map(quota_source_map, backend) + else: + quota_source_map.backends[backend_id] = backend.get_quota_source_map() + class DistributedObjectStore(NestedObjectStore): @@ -872,7 +914,6 @@ def __get_store_id_for(self, obj, **kwargs): class HierarchicalObjectStore(NestedObjectStore): - """ ObjectStore that defers to a list of backends. 
@@ -1065,6 +1106,62 @@ def config_to_dict(config): } +QuotaSourceInfo = namedtuple('QuotaSourceInfo', ['label', 'use']) + + +class QuotaSourceMap: + + def __init__(self, source=DEFAULT_QUOTA_SOURCE, enabled=DEFAULT_QUOTA_ENABLED): + self.default_quota_source = source + self.default_quota_enabled = enabled + self.info = QuotaSourceInfo(self.default_quota_source, self.default_quota_enabled) + self.backends = {} + + def get_quota_source_info(self, object_store_id): + if object_store_id in self.backends: + return self.backends[object_store_id].get_quota_source_info(object_store_id) + else: + return self.info + + def get_quota_source_label(self, object_store_id): + if object_store_id in self.backends: + return self.backends[object_store_id].get_quota_source_label(object_store_id) + else: + return self.default_quota_source + + def get_quota_source_labels(self): + sources = set() + if self.default_quota_source: + sources.add(self.default_quota_source) + for backend in self.backends.values(): + sources = sources.union(backend.get_quota_source_labels()) + return sources + + def default_usage_excluded_ids(self): + exclude_object_store_ids = [] + for backend_id, backend_source_map in self.backends.items(): + if backend_source_map.default_quota_source is not None: + exclude_object_store_ids.append(backend_id) + elif not backend_source_map.default_quota_enabled: + exclude_object_store_ids.append(backend_id) + return exclude_object_store_ids + + def get_id_to_source_pairs(self): + pairs = [] + for backend_id, backend_source_map in self.backends.items(): + if backend_source_map.default_quota_source is not None and backend_source_map.default_quota_enabled: + pairs.append((backend_id, backend_source_map.default_quota_source)) + return pairs + + def ids_per_quota_source(self): + quota_sources = {} + for (object_id, quota_source_label) in self.get_id_to_source_pairs(): + if quota_source_label not in quota_sources: + quota_sources[quota_source_label] = [] + 
quota_sources[quota_source_label].append(object_id) + return quota_sources + + class ObjectStorePopulator: """ Small helper for interacting with the object store and making sure all datasets from a job end up with the same object_store_id. diff --git a/lib/galaxy/quota/__init__.py b/lib/galaxy/quota/__init__.py index f1a27458a0f0..e17e86d4cfcf 100644 --- a/lib/galaxy/quota/__init__.py +++ b/lib/galaxy/quota/__init__.py @@ -88,7 +88,7 @@ def __init__(self, model): self.model = model self.sa_session = model.context - def get_quota(self, user): + def get_quota(self, user, quota_source_label=None): """ Calculated like so: @@ -101,7 +101,7 @@ def get_quota(self, user): quotas. """ if not user: - return self._default_unregistered_quota + return self._default_unregistered_quota(quota_source_label) query = text(""" SELECT ( COALESCE(MAX(CASE WHEN union_quota.operation = '=' @@ -111,8 +111,9 @@ def get_quota(self, user): (SELECT default_quota.bytes FROM quota as default_quota LEFT JOIN default_quota_association on default_quota.id = default_quota_association.quota_id - WHERE default_quota_association.type == 'registered' - AND default_quota.deleted != :is_true)) + WHERE default_quota_association.type = 'registered' + AND default_quota.deleted != :is_true + AND default_quota.quota_source_label {label_cond})) + (CASE WHEN SUM(CASE WHEN union_quota.operation = '=' AND union_quota.bytes = -1 THEN 1 ELSE 0 @@ -129,41 +130,53 @@ def get_quota(self, user): ) FROM ( SELECT user_quota.operation as operation, user_quota.bytes as bytes - FROM galaxy_user as user - LEFT JOIN user_quota_association as uqa on user.id = uqa.user_id + FROM galaxy_user as guser + LEFT JOIN user_quota_association as uqa on guser.id = uqa.user_id LEFT JOIN quota as user_quota on user_quota.id = uqa.quota_id WHERE user_quota.deleted != :is_true - AND user.id = :user_id + AND user_quota.quota_source_label {label_cond} + AND guser.id = :user_id UNION ALL SELECT group_quota.operation as operation, 
group_quota.bytes as bytes - FROM galaxy_user as user - LEFT JOIN user_group_association as uga on user.id = uga.user_id + FROM galaxy_user as guser + LEFT JOIN user_group_association as uga on guser.id = uga.user_id LEFT JOIN galaxy_group on galaxy_group.id = uga.group_id LEFT JOIN group_quota_association as gqa on galaxy_group.id = gqa.group_id LEFT JOIN quota as group_quota on group_quota.id = gqa.quota_id WHERE group_quota.deleted != :is_true - AND user.id = :user_id + AND group_quota.quota_source_label {label_cond} + AND guser.id = :user_id ) as union_quota -""") +""".format(label_cond="IS NULL" if quota_source_label is None else " = :label")) conn = self.sa_session.connection() with conn.begin(): - res = conn.execute(query, is_true=True, user_id=user.id).fetchone() + res = conn.execute(query, is_true=True, user_id=user.id, label=quota_source_label).fetchone() if res: return res[0] else: return None - @property - def _default_unregistered_quota(self): - return self._default_quota(self.model.DefaultQuotaAssociation.types.UNREGISTERED) + def _default_unregistered_quota(self, quota_source_label): + return self._default_quota(self.model.DefaultQuotaAssociation.types.UNREGISTERED, quota_source_label) - def _default_quota(self, default_type): - dqa = self.sa_session.query(self.model.DefaultQuotaAssociation).filter(self.model.DefaultQuotaAssociation.table.c.type == default_type).first() - if not dqa: - return None - if dqa.quota.bytes < 0: - return None - return dqa.quota.bytes + def _default_quota(self, default_type, quota_source_label): + label_condition = "IS NULL" if quota_source_label is None else "= :label" + query = text(""" +SELECT bytes +FROM quota as default_quota +LEFT JOIN default_quota_association on default_quota.id = default_quota_association.quota_id +WHERE default_quota_association.type = :default_type + AND default_quota.deleted != :is_true + AND default_quota.quota_source_label {label_condition} +""".format(label_condition=label_condition)) + + 
conn = self.sa_session.connection() + with conn.begin(): + res = conn.execute(query, is_true=True, label=quota_source_label, default_type=default_type).fetchone() + if res: + return res[0] + else: + return None def set_default_quota(self, default_type, quota): # Unset the current default(s) associated with this quota, if there are any @@ -175,13 +188,18 @@ def set_default_quota(self, default_type, quota): for gqa in quota.groups: self.sa_session.delete(gqa) # Find the old default, assign the new quota if it exists - dqa = self.sa_session.query(self.model.DefaultQuotaAssociation).filter(self.model.DefaultQuotaAssociation.table.c.type == default_type).first() - if dqa: - dqa.quota = quota + label = quota.quota_source_label + dqas = self.sa_session.query(self.model.DefaultQuotaAssociation).filter(self.model.DefaultQuotaAssociation.table.c.type == default_type).all() + target_default = None + for dqa in dqas: + if dqa.quota.quota_source_label == label and not dqa.quota.deleted: + target_default = dqa + if target_default: + target_default.quota = quota # Or create if necessary else: - dqa = self.model.DefaultQuotaAssociation(default_type, quota) - self.sa_session.add(dqa) + target_default = self.model.DefaultQuotaAssociation(default_type, quota) + self.sa_session.add(target_default) self.sa_session.flush() def get_percent(self, trans=None, user=False, history=False, usage=False, quota=False): diff --git a/lib/galaxy/tools/actions/upload_common.py b/lib/galaxy/tools/actions/upload_common.py index 864d4fd5b826..88176592045d 100644 --- a/lib/galaxy/tools/actions/upload_common.py +++ b/lib/galaxy/tools/actions/upload_common.py @@ -187,7 +187,7 @@ def __new_history_upload(trans, uploaded_dataset, history=None, state=None): else: hda.state = hda.states.QUEUED trans.sa_session.flush() - history.add_dataset(hda, genome_build=uploaded_dataset.dbkey) + history.add_dataset(hda, genome_build=uploaded_dataset.dbkey, quota=False) permissions = 
trans.app.security_agent.history_get_default_permissions(history) trans.app.security_agent.set_all_dataset_permissions(hda.dataset, permissions) trans.sa_session.flush() diff --git a/lib/galaxy/webapps/galaxy/api/users.py b/lib/galaxy/webapps/galaxy/api/users.py index 031679eaf393..f3d53ffc66fb 100644 --- a/lib/galaxy/webapps/galaxy/api/users.py +++ b/lib/galaxy/webapps/galaxy/api/users.py @@ -143,36 +143,55 @@ def index(self, trans, deleted='False', f_email=None, f_name=None, f_any=None, * return rval @expose_api_anonymous - def show(self, trans, id, deleted='False', **kwd): + def show(self, trans, id, **kwd): """ GET /api/users/{encoded_id} GET /api/users/deleted/{encoded_id} GET /api/users/current Displays information about a user. """ + user = self._get_user_full(trans, id, **kwd) + if user is not None: + return self.user_serializer.serialize_to_view(user, view='detailed') + else: + item = self.anon_user_api_value(trans) + return item + + def _get_user_full(self, trans, user_id, **kwd): + """Return referenced user or None if anonymous user is referenced.""" + deleted = kwd.get("deleted", "False") deleted = util.string_as_bool(deleted) try: # user is requesting data about themselves - if id == "current": + if user_id == "current": # ...and is anonymous - return usage and quota (if any) if not trans.user: - item = self.anon_user_api_value(trans) - return item + return None # ...and is logged in - return full else: user = trans.user else: - user = self.get_user(trans, id, deleted=deleted) + user = self.get_user(trans, user_id, deleted=deleted) # check that the user is requesting themselves (and they aren't del'd) unless admin if not trans.user_is_admin: assert trans.user == user assert not user.deleted + return user except exceptions.ItemDeletionException: raise except Exception: - raise exceptions.RequestParameterInvalidException('Invalid user id specified', id=id) - return self.user_serializer.serialize_to_view(user, view='detailed') + raise 
exceptions.RequestParameterInvalidException('Invalid user id specified', id=user_id) + + @expose_api + def usage(self, trans, user_id, **kwd): + """ + GET /api/users/{user_id}/usage + + Get user's disk usage broken down by quota source. + """ + user = self._get_user_full(trans, user_id, **kwd) + return user.dictify_usage() @expose_api def create(self, trans, payload, **kwd): diff --git a/lib/galaxy/webapps/galaxy/buildapp.py b/lib/galaxy/webapps/galaxy/buildapp.py index e7a881f7343c..3d20d4c1832f 100644 --- a/lib/galaxy/webapps/galaxy/buildapp.py +++ b/lib/galaxy/webapps/galaxy/buildapp.py @@ -422,6 +422,7 @@ def populate_api_routes(webapp, app): webapp.mapper.connect('/api/container_resolvers/{index}/toolbox/install', action="resolve_toolbox_with_install", controller="container_resolution", conditions=dict(method=["POST"])) webapp.mapper.connect('/api/workflows/get_tool_predictions', action='get_tool_predictions', controller="workflows", conditions=dict(method=["POST"])) + webapp.mapper.connect('/api/users/{user_id}/usage', action='usage', controller="users", conditions=dict(method=["GET"])) webapp.mapper.resource_with_deleted('user', 'users', path_prefix='/api') webapp.mapper.resource('genome', 'genomes', path_prefix='/api') webapp.mapper.connect('/api/genomes/{id}/indexes', controller='genomes', action='indexes') diff --git a/lib/galaxy/webapps/galaxy/controllers/admin.py b/lib/galaxy/webapps/galaxy/controllers/admin.py index 470b6a7241d1..0b270ebad752 100644 --- a/lib/galaxy/webapps/galaxy/controllers/admin.py +++ b/lib/galaxy/webapps/galaxy/controllers/admin.py @@ -677,6 +677,9 @@ def create_quota(self, trans, payload=None, **kwd): if trans.request.method == 'GET': all_users = [] all_groups = [] + labels = trans.app.object_store.get_quota_source_map().get_quota_source_labels() + label_options = [("Default Quota", None)] + label_options.extend([(l, l) for l in labels]) for user in trans.sa_session.query(trans.app.model.User) \ 
.filter(trans.app.model.User.table.c.deleted == false()) \ .order_by(trans.app.model.User.table.c.email): @@ -688,30 +691,42 @@ def create_quota(self, trans, payload=None, **kwd): default_options = [('No', 'no')] for typ in trans.app.model.DefaultQuotaAssociation.types.__dict__.values(): default_options.append(('Yes, ' + typ, typ)) - return {'title' : 'Create Quota', - 'inputs' : [ - { - 'name' : 'name', - 'label' : 'Name' - }, { - 'name' : 'description', - 'label' : 'Description' - }, { - 'name' : 'amount', - 'label' : 'Amount', - 'help' : 'Examples: "10000MB", "99 gb", "0.2T", "unlimited"' - }, { - 'name' : 'operation', - 'label' : 'Assign, increase by amount, or decrease by amount?', - 'options' : [('=', '='), ('+', '+'), ('-', '-')] - }, { - 'name' : 'default', - 'label' : 'Is this quota a default for a class of users (if yes, what type)?', - 'options' : default_options, - 'help' : 'Warning: Any users or groups associated with this quota will be disassociated.' - }, - build_select_input('in_groups', 'Groups', all_groups, []), - build_select_input('in_users', 'Users', all_users, [])]} + rval = { + 'title' : 'Create Quota', + 'inputs' : [ + { + 'name' : 'name', + 'label' : 'Name' + }, { + 'name' : 'description', + 'label' : 'Description' + }, { + 'name' : 'amount', + 'label' : 'Amount', + 'help' : 'Examples: "10000MB", "99 gb", "0.2T", "unlimited"' + }, { + 'name' : 'operation', + 'label' : 'Assign, increase by amount, or decrease by amount?', + 'options' : [('=', '='), ('+', '+'), ('-', '-')] + }, { + 'name' : 'default', + 'label' : 'Is this quota a default for a class of users (if yes, what type)?', + 'options' : default_options, + 'help' : 'Warning: Any users or groups associated with this quota will be disassociated.' 
+ } + ], + } + if len(label_options) > 1: + rval["inputs"].append({ + 'name' : 'quota_source_label', + 'label' : 'Apply quota to labeled object stores.', + 'options' : label_options, + }) + rval["inputs"].extend([ + build_select_input('in_groups', 'Groups', all_groups, []), + build_select_input('in_users', 'Users', all_users, []), + ]) + return rval else: try: quota, message = self._create_quota(util.Params(payload), decode_id=trans.security.decode_id) diff --git a/lib/galaxy/webapps/galaxy/controllers/dataset.py b/lib/galaxy/webapps/galaxy/controllers/dataset.py index 7c810c70d3f6..164e850a298b 100644 --- a/lib/galaxy/webapps/galaxy/controllers/dataset.py +++ b/lib/galaxy/webapps/galaxy/controllers/dataset.py @@ -826,7 +826,7 @@ def _purge(self, trans, dataset_id): hda.deleted = True # HDA is purgeable # Decrease disk usage first - hda.purge_usage_from_quota(user) + hda.purge_usage_from_quota(user, hda.dataset.quota_source_info) # Mark purged hda.purged = True trans.sa_session.add(hda) diff --git a/lib/galaxy/webapps/galaxy/controllers/history.py b/lib/galaxy/webapps/galaxy/controllers/history.py index 7c551660f9ee..76bb45483385 100644 --- a/lib/galaxy/webapps/galaxy/controllers/history.py +++ b/lib/galaxy/webapps/galaxy/controllers/history.py @@ -1055,7 +1055,7 @@ def purge_deleted_datasets(self, trans): for hda in trans.history.datasets: if not hda.deleted or hda.purged: continue - hda.purge_usage_from_quota(trans.user) + hda.purge_usage_from_quota(trans.user, hda.dataset.quota_source_info) hda.purged = True trans.sa_session.add(hda) trans.log_event("HDA id %s has been purged" % hda.id) diff --git a/lib/galaxy_test/base/populators.py b/lib/galaxy_test/base/populators.py index e5acaaad3001..68c05594111e 100644 --- a/lib/galaxy_test/base/populators.py +++ b/lib/galaxy_test/base/populators.py @@ -537,6 +537,11 @@ def user_private_role_id(self): assert len(users_roles) == 1 return users_roles[0]["id"] + def get_usage(self): + usage_response = 
self.galaxy_interactor.get("users/current/usage") + usage_response.raise_for_status() + return usage_response.json() + def create_role(self, user_ids, description=None): payload = { "name": self.get_random_name(prefix="testpop"), diff --git a/scripts/cleanup_datasets/pgcleanup.py b/scripts/cleanup_datasets/pgcleanup.py index 6bf7ae21cf27..aaa4980d3464 100755 --- a/scripts/cleanup_datasets/pgcleanup.py +++ b/scripts/cleanup_datasets/pgcleanup.py @@ -11,6 +11,7 @@ import inspect import logging import os +import re import string import sys from collections import namedtuple @@ -25,6 +26,7 @@ import galaxy.config from galaxy.exceptions import ObjectNotFound +from galaxy.model import calculate_user_disk_usage_statements from galaxy.objectstore import build_object_store_from_config from galaxy.util.script import app_properties_from_args, populate_config_args @@ -70,6 +72,7 @@ class Action(object): Generally you should set at least ``_action_sql`` in subclasses (although it's possible to just override ``sql`` directly.) """ + requires_objectstore = True update_time_sql = ", update_time = NOW() AT TIME ZONE 'utc'" force_retry_sql = " AND NOT purged" primary_key = None @@ -108,6 +111,9 @@ def __init__(self, app): self.__row_methods = [] self.__post_methods = [] self.__exit_methods = [] + if self.requires_objectstore: + self.object_store = build_object_store_from_config(self._config) + self._register_exit_method(self.object_store.shutdown) self._init() def __enter__(self): @@ -236,13 +242,14 @@ def _init(self): class RemovesObjects(object): """Base class for mixins that remove objects from object stores. 
""" + requires_objectstore = True + def _init(self): + super()._init() self.objects_to_remove = set() log.info('Initializing object store for action %s', self.name) - self.object_store = build_object_store_from_config(self._config) self._register_row_method(self.collect_removed_object_info) self._register_post_method(self.remove_objects) - self._register_exit_method(self.object_store.shutdown) def collect_removed_object_info(self, row): object_id = getattr(row, self.id_column, None) @@ -341,7 +348,10 @@ class RequiresDiskUsageRecalculation(object): To use, ensure your query returns a ``recalculate_disk_usage_user_id`` column. """ + requires_objectstore = True + def _init(self): + super()._init() self.__recalculate_disk_usage_user_ids = set() self._register_row_method(self.collect_recalculate_disk_usage_user_id) self._register_post_method(self.recalculate_disk_usage) @@ -361,30 +371,21 @@ def recalculate_disk_usage(self): """ log.info('Recalculating disk usage for users whose data were purged') for user_id in sorted(self.__recalculate_disk_usage_user_ids): - # TODO: h.purged = false should be unnecessary once all hdas in purged histories are purged. - sql = """ - UPDATE galaxy_user - SET disk_usage = ( - SELECT COALESCE(SUM(total_size), 0) - FROM ( SELECT d.total_size - FROM history_dataset_association hda - JOIN history h ON h.id = hda.history_id - JOIN dataset d ON hda.dataset_id = d.id - WHERE h.user_id = %(user_id)s - AND h.purged = false - AND hda.purged = false - AND d.purged = false - AND d.id NOT IN (SELECT dataset_id - FROM library_dataset_dataset_association) - GROUP BY d.id) AS sizes) - WHERE id = %(user_id)s - RETURNING disk_usage; - """ - args = {'user_id': user_id} - cur = self._update(sql, args, add_event=False) - for row in cur: - # disk_usage might be None (e.g. 
user has purged all data) - self.log.info('recalculate_disk_usage user_id %i to %s bytes' % (user_id, row.disk_usage)) + quota_source_map = self.object_store.get_quota_source_map() + statements = calculate_user_disk_usage_statements( + user_id, quota_source_map + ) + + for (sql, args) in statements: + sql, _ = re.subn(r"\:([\w]+)", r"%(\1)s", sql) + new_args = {} + for key, val in args.items(): + if isinstance(val, list): + val = tuple(val) + new_args[key] = val + self._update(sql, new_args, add_event=False) + + self.log.info('recalculate_disk_usage user_id %i' % user_id) class RemovesMetadataFiles(RemovesObjects): @@ -626,7 +627,7 @@ class PurgeDeletedUsers(PurgesHDAs, RemovesMetadataFiles, Action): ) def _init(self): - super(PurgeDeletedUsers, self)._init() + super()._init() self.__zero_disk_usage_user_ids = set() self._register_row_method(self.collect_zero_disk_usage_user_id) self._register_post_method(self.zero_disk_usage) diff --git a/test/integration/objectstore/test_selection.py b/test/integration/objectstore/test_selection.py index dd4c0e3dd28e..d8c6a5ce6f2b 100644 --- a/test/integration/objectstore/test_selection.py +++ b/test/integration/objectstore/test_selection.py @@ -23,11 +23,13 @@ + + @@ -69,7 +71,7 @@ def _assert_no_external_filename(self): for external_filename_tuple in self._app.model.session.query(self._app.model.Dataset.external_filename).all(): assert external_filename_tuple[0] is None - def test_tool_simple_constructs(self): + def test_objectstore_selection(self): with self.dataset_populator.test_history() as history_id: def _run_tool(tool_id, inputs): @@ -89,6 +91,10 @@ def _run_tool(tool_id, inputs): # One file uploaded, added to default object store ID. self._assert_file_counts(1, 0, 0, 0) + usage_list = self.dataset_populator.get_usage() + assert len(usage_list) == 1 + assert usage_list[0]["quota_source_label"] is None + assert usage_list[0]["disk_usage"] == 6 # should create two files in static object store. 
_run_tool("multi_data_param", {"f1": hda1_input, "f2": hda1_input}) @@ -102,6 +108,13 @@ def _run_tool(tool_id, inputs): _run_tool("create_10", create_10_inputs) self._assert_file_counts(1, 2, 10, 0) + usage_list = self.dataset_populator.get_usage() + assert len(usage_list) == 2 + assert usage_list[0]["quota_source_label"] is None + assert usage_list[0]["disk_usage"] == 6 + assert usage_list[1]["quota_source_label"] == "ebs" + assert usage_list[1]["disk_usage"] == 21 + # should create 10 files in S3 object store. create_10_inputs = { "__job_resource|__job_resource__select": "yes", diff --git a/test/unit/data/test_galaxy_mapping.py b/test/unit/data/test_galaxy_mapping.py index c8e8af4f1c01..eb05a577cf82 100644 --- a/test/unit/data/test_galaxy_mapping.py +++ b/test/unit/data/test_galaxy_mapping.py @@ -8,6 +8,7 @@ import galaxy.datatypes.registry import galaxy.model import galaxy.model.mapping as mapping +from galaxy.objectstore import QuotaSourceMap datatypes_registry = galaxy.datatypes.registry.Registry() datatypes_registry.load_datatypes() @@ -279,7 +280,7 @@ def test_default_disk_usage(self): u = model.User(email="disk_default@test.com", password="password") self.persist(u) - u.adjust_total_disk_usage(1) + u.adjust_total_disk_usage(1, None) u_id = u.id self.expunge() user_reload = model.session.query(model.User).get(u_id) @@ -584,8 +585,11 @@ def new_hda(self, history, **kwds): class MockObjectStore(object): - def __init__(self): - pass + def __init__(self, quota_source_map=None): + self._quota_source_map = quota_source_map or QuotaSourceMap() + + def get_quota_source_map(self): + return self._quota_source_map def size(self, dataset): return 42 diff --git a/test/unit/data/test_quota.py b/test/unit/data/test_quota.py index 340ac3851181..26f747844643 100644 --- a/test/unit/data/test_quota.py +++ b/test/unit/data/test_quota.py @@ -1,25 +1,94 @@ +import uuid + +from galaxy.objectstore import QuotaSourceInfo, QuotaSourceMap from galaxy.quota import DatabaseQuotaAgent 
class PurgeUsageTestCase(BaseModelTestCase):
    """Tests that purging an HDA adjusts the owning user's disk usage."""

    def setUp(self):
        super().setUp()
        model = self.model
        user = model.User(email="purge_usage@example.com", password="password")
        # Seed a known starting usage so the deltas below are easy to assert.
        user.disk_usage = 25
        self.persist(user)
        history = model.History(name="History for Purging", user=user)
        self.persist(history)
        self.u = user
        self.h = history

    def _setup_dataset(self):
        """Create and persist a 10-byte HDA in the test history."""
        hda = self.model.HistoryDatasetAssociation(
            extension="txt", history=self.h, create_dataset=True, sa_session=self.model.session)
        hda.dataset.total_size = 10
        self.persist(hda)
        return hda

    def test_calculate_usage(self):
        # Tracked default source: purging releases the dataset's size.
        hda = self._setup_dataset()
        hda.purge_usage_from_quota(self.u, QuotaSourceInfo(None, True))
        self.persist(self.u)
        assert int(self.u.disk_usage) == 15

    def test_calculate_usage_untracked(self):
        # Quota tracking disabled on the objectstore: usage is unchanged.
        hda = self._setup_dataset()
        hda.purge_usage_from_quota(self.u, QuotaSourceInfo(None, False))
        self.persist(self.u)
        assert int(self.u.disk_usage) == 25

    def test_calculate_usage_per_source(self):
        # Usage under a non-default label is tracked separately from the
        # default usage and reduced independently on purge.
        self.u.adjust_total_disk_usage(124, "myquotalabel")

        hda = self._setup_dataset()
        hda.purge_usage_from_quota(self.u, QuotaSourceInfo("myquotalabel", True))
        self.persist(self.u)
        assert int(self.u.disk_usage) == 25

        usages = self.u.dictify_usage()
        assert len(usages) == 2
        assert usages[1]["quota_source_label"] == "myquotalabel"
        assert usages[1]["disk_usage"] == 114
h2 = model.History(name="Second usage history", user=u) @@ -31,7 +100,104 @@ def test_calculate_usage(self): d3 = model.HistoryDatasetAssociation(extension="txt", history=h, dataset=d1.dataset) self.persist(d3) - assert u.calculate_disk_usage() == 10 + assert u.calculate_disk_usage_default_source(object_store) == 10 + + def test_calculate_usage_disabled_quota(self): + u = self.u + + self._add_dataset(10, "not_tracked") + self._add_dataset(15, "tracked") + + quota_source_map = QuotaSourceMap() + not_tracked = QuotaSourceMap() + not_tracked.default_quota_enabled = False + quota_source_map.backends["not_tracked"] = not_tracked + + object_store = MockObjectStore(quota_source_map) + + assert u.calculate_disk_usage_default_source(object_store) == 15 + + def test_calculate_usage_alt_quota(self): + model = self.model + u = self.u + + self._add_dataset(10) + self._add_dataset(15, "alt_source_store") + + quota_source_map = QuotaSourceMap() + alt_source = QuotaSourceMap() + alt_source.default_quota_source = "alt_source" + quota_source_map.backends["alt_source_store"] = alt_source + + object_store = MockObjectStore(quota_source_map) + + u.calculate_and_set_disk_usage(object_store) + model.context.refresh(u) + usages = u.dictify_usage() + assert len(usages) == 2 + assert usages[0]["quota_source_label"] is None + assert usages[0]["disk_usage"] == 10 + + assert usages[1]["quota_source_label"] == "alt_source" + assert usages[1]["disk_usage"] == 15 + + def test_calculate_usage_removes_unused_quota_labels(self): + model = self.model + u = self.u + + self._add_dataset(10) + self._add_dataset(15, "alt_source_store") + + quota_source_map = QuotaSourceMap() + alt_source = QuotaSourceMap() + alt_source.default_quota_source = "alt_source" + quota_source_map.backends["alt_source_store"] = alt_source + + object_store = MockObjectStore(quota_source_map) + + u.calculate_and_set_disk_usage(object_store) + model.context.refresh(u) + usages = u.dictify_usage() + assert len(usages) == 2 + assert 
usages[0]["quota_source_label"] is None + assert usages[0]["disk_usage"] == 10 + + assert usages[1]["quota_source_label"] == "alt_source" + assert usages[1]["disk_usage"] == 15 + + alt_source.default_quota_source = "new_alt_source" + u.calculate_and_set_disk_usage(object_store) + model.context.refresh(u) + usages = u.dictify_usage() + assert len(usages) == 2 + assert usages[0]["quota_source_label"] is None + assert usages[0]["disk_usage"] == 10 + + assert usages[1]["quota_source_label"] == "new_alt_source" + assert usages[1]["disk_usage"] == 15 + + def test_calculate_usage_default_storage_disabled(self): + model = self.model + u = self.u + + self._add_dataset(10) + self._add_dataset(15, "alt_source_store") + + quota_source_map = QuotaSourceMap(None, False) + alt_source = QuotaSourceMap("alt_source", True) + quota_source_map.backends["alt_source_store"] = alt_source + + object_store = MockObjectStore(quota_source_map) + + u.calculate_and_set_disk_usage(object_store) + model.context.refresh(u) + usages = u.dictify_usage() + assert len(usages) == 2 + assert usages[0]["quota_source_label"] is None + assert usages[0]["disk_usage"] == 0 + + assert usages[1]["quota_source_label"] == "alt_source" + assert usages[1]["disk_usage"] == 15 class QuotaTestCase(BaseModelTestCase): @@ -87,6 +253,27 @@ def test_quota(self): self._add_group_quota(u, quota) self._assert_user_quota_is(u, None) + def test_labeled_quota(self): + model = self.model + u = model.User(email="labeled_quota@example.com", password="password") + self.persist(u) + + label1 = "coollabel1" + self._assert_user_quota_is(u, None, label1) + + quota = model.Quota(name="default registered labeled", amount=21, quota_source_label=label1) + self.quota_agent.set_default_quota( + model.DefaultQuotaAssociation.types.REGISTERED, + quota, + ) + + self._assert_user_quota_is(u, 21, label1) + + quota = model.Quota(name="user quota add labeled", amount=31, operation="+", quota_source_label=label1) + self._add_user_quota(u, quota) + 
class UsageTestCase(BaseModelTestCase):
    """Unit tests for per-quota-source disk usage bookkeeping on User."""

    def test_usage(self):
        # A None label adjusts the default (unlabeled) usage total.
        user = self.model.User(email="usage@example.com", password="password")
        self.persist(user)

        user.adjust_total_disk_usage(123, None)
        self.persist(user)

        assert user.get_disk_usage() == 123

    def test_labeled_usage(self):
        user = self.model.User(email="labeled.usage@example.com", password="password")
        self.persist(user)
        assert not user.quota_source_usages

        # Labeled usage is tracked apart from the default usage...
        user.adjust_total_disk_usage(123, "foobar")
        assert len(user.dictify_usage()) == 1

        # ...so the default total stays at zero while the label carries 123.
        assert user.get_disk_usage() == 0
        assert user.get_disk_usage(quota_source_label="foobar") == 123
        self.model.context.refresh(user)

        assert len(user.dictify_usage()) == 2

        # Further adjustments under the same label accumulate.
        user.adjust_total_disk_usage(124, "foobar")
        self.model.context.refresh(user)

        usages = user.dictify_usage()
        assert len(usages) == 2
        assert usages[1]["quota_source_label"] == "foobar"
        assert usages[1]["disk_usage"] == 247