From ba02e8ad83e45cb19d82bfb80a9b0f74a9cd147e Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Wed, 3 May 2023 12:50:32 +0200 Subject: [PATCH] uses ExecutionId in ExecutionManager cache, because the underlying implementation relies on hashing which breaks results on patch --- .../conquery/apiv1/QueryProcessor.java | 20 +-------- .../io/result/csv/ResultCsvProcessor.java | 7 +-- .../conquery/io/storage/MetaStorage.java | 12 ++--- .../models/query/ExecutionManager.java | 44 ++++++++++--------- .../integration/common/LoadingUtil.java | 4 +- 5 files changed, 33 insertions(+), 54 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java b/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java index 8021c91643..4a9e91d0f9 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java @@ -184,7 +184,7 @@ private ManagedExecution tryReuse(QueryDescription query, ManagedExecutionId exe if (!user.isOwner(execution)) { final ManagedExecution newExecution = - executionManager.createExecution(namespace, execution.getSubmitted(), user, execution.getDataset(), false); + executionManager.createExecution(execution.getSubmitted(), user, execution.getDataset(), false); newExecution.setLabel(execution.getLabel()); newExecution.setTags(execution.getTags().clone()); storage.updateExecution(newExecution); @@ -318,22 +318,6 @@ public void patchQuery(Subject subject, ManagedExecution execution, MetaDataPatc patch.applyTo(execution, storage, subject); storage.updateExecution(execution); - - // TODO remove this, since we don't translate anymore - // Patch this query in other datasets - final List remainingDatasets = datasetRegistry.getAllDatasets(); - remainingDatasets.remove(execution.getDataset()); - - for (Dataset dataset : remainingDatasets) { - final ManagedExecutionId id = new ManagedExecutionId(dataset.getId(), execution.getQueryId()); - final ManagedExecution otherExecution = storage.getExecution(id); - if (otherExecution == null) { - continue; - } - log.trace("Patching {} ({}) with patch: {}", execution.getClass().getSimpleName(), id, patch); - patch.applyTo(otherExecution, storage, subject); - storage.updateExecution(execution); - } } public void reexecute(Subject subject, ManagedExecution query) { @@ -399,7 +383,7 @@ public ExternalUploadResult uploadEntities(Subject subject, Dataset dataset, Ext execution = ((ManagedQuery) namespace .getExecutionManager() - .createExecution(namespace, query, subject.getUser(), dataset, false)); + .createExecution(query, subject.getUser(), dataset, false)); execution.setLastResultCount((long) statistic.getResolved().size()); diff --git a/backend/src/main/java/com/bakdata/conquery/io/result/csv/ResultCsvProcessor.java b/backend/src/main/java/com/bakdata/conquery/io/result/csv/ResultCsvProcessor.java index 490337eddd..543cf3a195 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/result/csv/ResultCsvProcessor.java +++ b/backend/src/main/java/com/bakdata/conquery/io/result/csv/ResultCsvProcessor.java @@ -75,12 +75,7 @@ public Response createResult(Su } }; - return makeResponseWithFileName( - Response.ok(out), - String.join(".", exec.getLabelWithoutAutoLabelSuffix(), ResourceConstants.FILE_EXTENTION_CSV), - new MediaType("text", "csv", charset.toString()), - ResultUtil.ContentDispositionOption.ATTACHMENT - ); + return makeResponseWithFileName(Response.ok(out), String.join(".", exec.getLabelWithoutAutoLabelSuffix(), ResourceConstants.FILE_EXTENTION_CSV), new MediaType("text", "csv", charset.toString()), ResultUtil.ContentDispositionOption.ATTACHMENT); } } diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/MetaStorage.java b/backend/src/main/java/com/bakdata/conquery/io/storage/MetaStorage.java index b07a579087..12e283dfaa 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/MetaStorage.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/MetaStorage.java @@ -30,20 +30,17 @@ @RequiredArgsConstructor public class MetaStorage extends ConqueryStorage implements Injectable { + @Getter + protected final CentralRegistry centralRegistry = new CentralRegistry(); + @Getter + protected final DatasetRegistry datasetRegistry; private final StoreFactory storageFactory; - private IdentifiableStore executions; - private IdentifiableStore formConfigs; private IdentifiableStore authUser; private IdentifiableStore authRole; private IdentifiableStore authGroup; - @Getter - protected final CentralRegistry centralRegistry = new CentralRegistry(); - @Getter - protected final DatasetRegistry datasetRegistry; - public void openStores(ObjectMapper mapper) { authUser = storageFactory.createUserStore(centralRegistry, "meta", this, mapper); authRole = storageFactory.createRoleStore(centralRegistry, "meta", this, mapper); @@ -66,7 +63,6 @@ public void openStores(ObjectMapper mapper) { authUser, authRole, authGroup, - executions, formConfigs ); diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java b/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java index d075834b09..ea2e844612 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java @@ -17,6 +17,7 @@ import com.bakdata.conquery.models.execution.ExecutionState; import com.bakdata.conquery.models.execution.InternalExecution; import com.bakdata.conquery.models.execution.ManagedExecution; +import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.models.query.results.EntityResult; import com.bakdata.conquery.models.query.results.ShardResult; import com.bakdata.conquery.models.worker.Namespace; @@ -33,36 +34,41 @@ public class ExecutionManager { private final MetaStorage storage; - private final Cache>> executionResults = CacheBuilder.newBuilder() - .softValues() - .removalListener(this::executionRemoved) - .build(); + private final Cache>> executionResults = + CacheBuilder.newBuilder() + .softValues() + .removalListener(this::executionRemoved) + .build(); + /** * Manage state of evicted Queries, setting them to NEW. */ - private void executionRemoved(RemovalNotification> removalNotification) { + private void executionRemoved(RemovalNotification> removalNotification) { // If removal was done manually we assume it was also handled properly if (!removalNotification.wasEvicted()) { return; } - final ManagedExecution execution = removalNotification.getKey(); + final ManagedExecutionId executionId = removalNotification.getKey(); - log.warn("Evicted Results for Query[{}] (Reason: {})", execution.getId(), removalNotification.getCause()); + log.warn("Evicted Results for Query[{}] (Reason: {})", executionId, removalNotification.getCause()); - execution.reset(); + storage.getExecution(executionId).reset(); } - public ManagedExecution runQuery(Namespace namespace, QueryDescription query, User user, Dataset submittedDataset, ConqueryConfig config, boolean system) { - final ManagedExecution execution = createExecution(namespace, query, user, submittedDataset, system); + final ManagedExecution execution = createExecution(query, user, submittedDataset, system); execute(namespace, execution, config); return execution; } + public ManagedExecution createExecution(QueryDescription query, User user, Dataset submittedDataset, boolean system) { + return createQuery(query, UUID.randomUUID(), user, submittedDataset, system); + } + public void execute(Namespace namespace, ManagedExecution execution, ConqueryConfig config) { // Initialize the query / create subqueries try { @@ -90,12 +96,8 @@ public void execute(Namespace namespace, ManagedExecution execution, ConqueryCon } } - public ManagedExecution createExecution(Namespace namespace, QueryDescription query, User user, Dataset submittedDataset, boolean system) { - return createQuery(namespace, query, UUID.randomUUID(), user, submittedDataset, system); - } - - public ManagedExecution createQuery(Namespace namespace, QueryDescription query, UUID queryId, User user, Dataset submittedDataset, boolean system) { + public ManagedExecution createQuery(QueryDescription query, UUID queryId, User user, Dataset submittedDataset, boolean system) { // Transform the submitted query into an initialized execution ManagedExecution managed = query.toManagedExecution(user, submittedDataset, storage); managed.setSystem(system); @@ -107,7 +109,6 @@ public ManagedExecution createQuery(Namespace namespace, QueryDescription query, return managed; } - /** * Receive part of query result and store into query. * @@ -134,14 +135,13 @@ public } } - /** * Register another result for the execution. */ @SneakyThrows(ExecutionException.class) // can only occur if ArrayList::new fails which is unlikely and would have other problems also public void addQueryResult(ManagedExecution execution, List queryResults) { // We don't collect all results together into a fat list as that would cause lots of huge re-allocations for little gain. - executionResults.get(execution, ArrayList::new) + executionResults.get(execution.getId(), ArrayList::new) .add(queryResults); } @@ -149,17 +149,21 @@ public void addQueryResult(ManagedExecution execution, List queryR * Discard the query's results. */ public void clearQueryResults(ManagedExecution execution) { - executionResults.invalidate(execution); + executionResults.invalidate(execution.getId()); } /** * Stream the results of the query, if available. */ public Stream streamQueryResults(ManagedExecution execution) { - final List> resultParts = executionResults.getIfPresent(execution); + final List> resultParts = executionResults.getIfPresent(execution.getId()); return resultParts == null ? Stream.empty() : resultParts.stream().flatMap(List::stream); } + + + + } diff --git a/backend/src/test/java/com/bakdata/conquery/integration/common/LoadingUtil.java b/backend/src/test/java/com/bakdata/conquery/integration/common/LoadingUtil.java index 20833bd437..c8ada30207 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/common/LoadingUtil.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/common/LoadingUtil.java @@ -74,7 +74,7 @@ public static void importPreviousQueries(StandaloneSupport support, RequiredData ConceptQuery query = new ConceptQuery(new CQExternal(Arrays.asList("ID", "DATE_SET"), data, false)); ManagedExecution managed = support.getNamespace().getExecutionManager() - .createQuery(support.getNamespace(), query, queryId, user, support.getNamespace().getDataset(), false); + .createQuery(query, queryId, user, support.getNamespace().getDataset(), false); user.addPermission(managed.createPermission(AbilitySets.QUERY_CREATOR)); @@ -91,7 +91,7 @@ public static void importPreviousQueries(StandaloneSupport support, RequiredData ManagedExecution managed = support.getNamespace() .getExecutionManager() - .createQuery(support.getNamespace(), query, queryId, user, support.getNamespace().getDataset(), false); + .createQuery(query, queryId, user, support.getNamespace().getDataset(), false); user.addPermission(ExecutionPermission.onInstance(AbilitySets.QUERY_CREATOR, managed.getId()));