Skip to content

Commit

Permalink
uses ExecutionId in ExecutionManager cache, because the underlying im…
Browse files Browse the repository at this point in the history
…plementation relies on hashing which breaks results on patch
  • Loading branch information
awildturtok committed May 8, 2023
1 parent db30fa4 commit ba02e8a
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private ManagedExecution tryReuse(QueryDescription query, ManagedExecutionId exe
if (!user.isOwner(execution)) {
final ManagedExecution
newExecution =
executionManager.createExecution(namespace, execution.getSubmitted(), user, execution.getDataset(), false);
executionManager.createExecution(execution.getSubmitted(), user, execution.getDataset(), false);
newExecution.setLabel(execution.getLabel());
newExecution.setTags(execution.getTags().clone());
storage.updateExecution(newExecution);
Expand Down Expand Up @@ -318,22 +318,6 @@ public void patchQuery(Subject subject, ManagedExecution execution, MetaDataPatc

patch.applyTo(execution, storage, subject);
storage.updateExecution(execution);

// TODO remove this, since we don't translate anymore
// Patch this query in other datasets
final List<Dataset> remainingDatasets = datasetRegistry.getAllDatasets();
remainingDatasets.remove(execution.getDataset());

for (Dataset dataset : remainingDatasets) {
final ManagedExecutionId id = new ManagedExecutionId(dataset.getId(), execution.getQueryId());
final ManagedExecution otherExecution = storage.getExecution(id);
if (otherExecution == null) {
continue;
}
log.trace("Patching {} ({}) with patch: {}", execution.getClass().getSimpleName(), id, patch);
patch.applyTo(otherExecution, storage, subject);
storage.updateExecution(execution);
}
}

public void reexecute(Subject subject, ManagedExecution query) {
Expand Down Expand Up @@ -399,7 +383,7 @@ public ExternalUploadResult uploadEntities(Subject subject, Dataset dataset, Ext
execution =
((ManagedQuery) namespace
.getExecutionManager()
.createExecution(namespace, query, subject.getUser(), dataset, false));
.createExecution(query, subject.getUser(), dataset, false));

execution.setLastResultCount((long) statistic.getResolved().size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,7 @@ public <E extends ManagedExecution & SingleTableResult> Response createResult(Su
}
};

return makeResponseWithFileName(
Response.ok(out),
String.join(".", exec.getLabelWithoutAutoLabelSuffix(), ResourceConstants.FILE_EXTENTION_CSV),
new MediaType("text", "csv", charset.toString()),
ResultUtil.ContentDispositionOption.ATTACHMENT
);
return makeResponseWithFileName(Response.ok(out), String.join(".", exec.getLabelWithoutAutoLabelSuffix(), ResourceConstants.FILE_EXTENTION_CSV), new MediaType("text", "csv", charset.toString()), ResultUtil.ContentDispositionOption.ATTACHMENT);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,17 @@
@RequiredArgsConstructor
public class MetaStorage extends ConqueryStorage implements Injectable {

@Getter
protected final CentralRegistry centralRegistry = new CentralRegistry();
@Getter
protected final DatasetRegistry datasetRegistry;
private final StoreFactory storageFactory;

private IdentifiableStore<ManagedExecution> executions;

private IdentifiableStore<FormConfig> formConfigs;
private IdentifiableStore<User> authUser;
private IdentifiableStore<Role> authRole;
private IdentifiableStore<Group> authGroup;

@Getter
protected final CentralRegistry centralRegistry = new CentralRegistry();
@Getter
protected final DatasetRegistry datasetRegistry;

public void openStores(ObjectMapper mapper) {
authUser = storageFactory.createUserStore(centralRegistry, "meta", this, mapper);
authRole = storageFactory.createRoleStore(centralRegistry, "meta", this, mapper);
Expand All @@ -66,7 +63,6 @@ public void openStores(ObjectMapper mapper) {
authUser,
authRole,
authGroup,

executions,
formConfigs
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.bakdata.conquery.models.execution.ExecutionState;
import com.bakdata.conquery.models.execution.InternalExecution;
import com.bakdata.conquery.models.execution.ManagedExecution;
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;
import com.bakdata.conquery.models.query.results.EntityResult;
import com.bakdata.conquery.models.query.results.ShardResult;
import com.bakdata.conquery.models.worker.Namespace;
Expand All @@ -33,36 +34,41 @@ public class ExecutionManager {

private final MetaStorage storage;

private final Cache<ManagedExecution, List<List<EntityResult>>> executionResults = CacheBuilder.newBuilder()
.softValues()
.removalListener(this::executionRemoved)
.build();
private final Cache<ManagedExecutionId, List<List<EntityResult>>> executionResults =
CacheBuilder.newBuilder()
.softValues()
.removalListener(this::executionRemoved)
.build();


/**
* Manage state of evicted Queries, setting them to NEW.
*/
private void executionRemoved(RemovalNotification<ManagedExecution, List<?>> removalNotification) {
private void executionRemoved(RemovalNotification<ManagedExecutionId, List<?>> removalNotification) {

// If removal was done manually we assume it was also handled properly
if (!removalNotification.wasEvicted()) {
return;
}

final ManagedExecution execution = removalNotification.getKey();
final ManagedExecutionId executionId = removalNotification.getKey();

log.warn("Evicted Results for Query[{}] (Reason: {})", execution.getId(), removalNotification.getCause());
log.warn("Evicted Results for Query[{}] (Reason: {})", executionId, removalNotification.getCause());

execution.reset();
storage.getExecution(executionId).reset();
}


public ManagedExecution runQuery(Namespace namespace, QueryDescription query, User user, Dataset submittedDataset, ConqueryConfig config, boolean system) {
final ManagedExecution execution = createExecution(namespace, query, user, submittedDataset, system);
final ManagedExecution execution = createExecution(query, user, submittedDataset, system);
execute(namespace, execution, config);

return execution;
}

public ManagedExecution createExecution(QueryDescription query, User user, Dataset submittedDataset, boolean system) {
return createQuery(query, UUID.randomUUID(), user, submittedDataset, system);
}

public void execute(Namespace namespace, ManagedExecution execution, ConqueryConfig config) {
// Initialize the query / create subqueries
try {
Expand Down Expand Up @@ -90,12 +96,8 @@ public void execute(Namespace namespace, ManagedExecution execution, ConqueryCon
}
}

public ManagedExecution createExecution(Namespace namespace, QueryDescription query, User user, Dataset submittedDataset, boolean system) {
return createQuery(namespace, query, UUID.randomUUID(), user, submittedDataset, system);
}


public ManagedExecution createQuery(Namespace namespace, QueryDescription query, UUID queryId, User user, Dataset submittedDataset, boolean system) {
public ManagedExecution createQuery(QueryDescription query, UUID queryId, User user, Dataset submittedDataset, boolean system) {
// Transform the submitted query into an initialized execution
ManagedExecution managed = query.toManagedExecution(user, submittedDataset, storage);
managed.setSystem(system);
Expand All @@ -107,7 +109,6 @@ public ManagedExecution createQuery(Namespace namespace, QueryDescription query,
return managed;
}


/**
* Receive part of query result and store into query.
*
Expand All @@ -134,32 +135,35 @@ public <R extends ShardResult, E extends ManagedExecution & InternalExecution<R>
}
}


/**
* Register another result for the execution.
*/
@SneakyThrows(ExecutionException.class) // can only occur if ArrayList::new fails which is unlikely and would have other problems also
public void addQueryResult(ManagedExecution execution, List<EntityResult> queryResults) {
// We don't collect all results together into a fat list as that would cause lots of huge re-allocations for little gain.
executionResults.get(execution, ArrayList::new)
executionResults.get(execution.getId(), ArrayList::new)
.add(queryResults);
}

/**
* Discard the query's results.
*/
public void clearQueryResults(ManagedExecution execution) {
executionResults.invalidate(execution);
executionResults.invalidate(execution.getId());
}

/**
* Stream the results of the query, if available.
*/
public Stream<EntityResult> streamQueryResults(ManagedExecution execution) {
final List<List<EntityResult>> resultParts = executionResults.getIfPresent(execution);
final List<List<EntityResult>> resultParts = executionResults.getIfPresent(execution.getId());

return resultParts == null
? Stream.empty()
: resultParts.stream().flatMap(List::stream);
}




}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static void importPreviousQueries(StandaloneSupport support, RequiredData
ConceptQuery query = new ConceptQuery(new CQExternal(Arrays.asList("ID", "DATE_SET"), data, false));

ManagedExecution managed = support.getNamespace().getExecutionManager()
.createQuery(support.getNamespace(), query, queryId, user, support.getNamespace().getDataset(), false);
.createQuery(query, queryId, user, support.getNamespace().getDataset(), false);

user.addPermission(managed.createPermission(AbilitySets.QUERY_CREATOR));

Expand All @@ -91,7 +91,7 @@ public static void importPreviousQueries(StandaloneSupport support, RequiredData
ManagedExecution managed =
support.getNamespace()
.getExecutionManager()
.createQuery(support.getNamespace(), query, queryId, user, support.getNamespace().getDataset(), false);
.createQuery(query, queryId, user, support.getNamespace().getDataset(), false);

user.addPermission(ExecutionPermission.onInstance(AbilitySets.QUERY_CREATOR, managed.getId()));

Expand Down

0 comments on commit ba02e8a

Please sign in to comment.