diff --git a/.github/workflows/test_backend.yml b/.github/workflows/test_backend.yml
index e481274689..f250dfcc49 100644
--- a/.github/workflows/test_backend.yml
+++ b/.github/workflows/test_backend.yml
@@ -14,7 +14,7 @@ on:
jobs:
test:
runs-on: ubuntu-latest
- timeout-minutes: 15
+ timeout-minutes: 20
steps:
- name: Cache local Maven repository
uses: actions/cache@v2
@@ -41,5 +41,4 @@ jobs:
- name: JSON based Integration Tests
run: mvn test -T 1C -pl backend -Dgroups="INTEGRATION_JSON"
- name: SQL based Integration Tests
- if: ${{ startsWith(github.head_ref, 'sql/') }}
run: mvn test -T 1C -pl backend -Dgroups="INTEGRATION_SQL_BACKEND"
diff --git a/backend/pom.xml b/backend/pom.xml
index c2cdef06ad..5f4b400d24 100644
--- a/backend/pom.xml
+++ b/backend/pom.xml
@@ -379,5 +379,10 @@
			<version>1.17.6</version>
			<scope>test</scope>
+		<dependency>
+			<groupId>com.sap.cloud.db.jdbc</groupId>
+			<artifactId>ngdbc</artifactId>
+			<version>2.17.10</version>
+		</dependency>
diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/execution/FullExecutionStatus.java b/backend/src/main/java/com/bakdata/conquery/apiv1/execution/FullExecutionStatus.java
index 17c058884d..32d9d45c38 100644
--- a/backend/src/main/java/com/bakdata/conquery/apiv1/execution/FullExecutionStatus.java
+++ b/backend/src/main/java/com/bakdata/conquery/apiv1/execution/FullExecutionStatus.java
@@ -46,6 +46,8 @@ public class FullExecutionStatus extends ExecutionStatus {
*/
private boolean canExpand;
+ private boolean containsDates;
+
/**
* Is set to the query description if the user can expand all included concepts.
*/
diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/query/TableExportQuery.java b/backend/src/main/java/com/bakdata/conquery/apiv1/query/TableExportQuery.java
index 993cee86fc..bd5116638f 100644
--- a/backend/src/main/java/com/bakdata/conquery/apiv1/query/TableExportQuery.java
+++ b/backend/src/main/java/com/bakdata/conquery/apiv1/query/TableExportQuery.java
@@ -34,6 +34,7 @@
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.ConceptElement;
import com.bakdata.conquery.models.datasets.concepts.Connector;
+import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeNode;
import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept;
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;
@@ -148,6 +149,13 @@ public void resolve(QueryResolveContext context) {
final Map<SecondaryIdDescription, Integer> secondaryIdPositions = calculateSecondaryIdPositions(currentPosition);
+ final Set<ValidityDate> validityDates = tables.stream()
+ .map(CQConcept::getTables)
+ .flatMap(Collection::stream)
+ .map(CQTable::findValidityDate)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+
// We need to know if a column is a concept column so we can prioritize it if it is also a SecondaryId
final Set<Column> conceptColumns = tables.stream()
.map(CQConcept::getTables)
@@ -157,7 +165,7 @@ public void resolve(QueryResolveContext context) {
.filter(Objects::nonNull)
.collect(Collectors.toSet());
- positions = calculateColumnPositions(currentPosition, tables, secondaryIdPositions, conceptColumns);
+ positions = calculateColumnPositions(currentPosition, tables, secondaryIdPositions, conceptColumns, validityDates);
resultInfos = createResultInfos(secondaryIdPositions, conceptColumns);
}
@@ -179,22 +187,21 @@ private Map<SecondaryIdDescription, Integer> calculateSecondaryIdPositions(Atomi
return secondaryIdPositions;
}
- private static Map<Column, Integer> calculateColumnPositions(AtomicInteger currentPosition, List<CQConcept> tables, Map<SecondaryIdDescription, Integer> secondaryIdPositions, Set<Column> conceptColumns) {
+ private static Map<Column, Integer> calculateColumnPositions(AtomicInteger currentPosition, List<CQConcept> tables, Map<SecondaryIdDescription, Integer> secondaryIdPositions, Set<Column> conceptColumns, Set<ValidityDate> validityDates) {
final Map<Column, Integer> positions = new HashMap<>();
for (CQConcept concept : tables) {
for (CQTable table : concept.getTables()) {
- final Column validityDateColumn = table.findValidityDateColumn();
-
- if (validityDateColumn != null) {
- positions.putIfAbsent(validityDateColumn, 0);
- }
-
// Set column positions, set SecondaryId positions to precomputed ones.
for (Column column : table.getConnector().getTable().getColumns()) {
+ // ValidityDates are handled separately; they all share position 0
+ if (validityDates.stream().anyMatch(vd -> vd.containsColumn(column))) {
+ continue;
+ }
+
if (positions.containsKey(column)) {
continue;
}
@@ -341,4 +348,4 @@ public void visit(Consumer<Visitable> visitor) {
public RequiredEntities collectRequiredEntities(QueryExecutionContext context) {
return query.collectRequiredEntities(context);
}
-}
\ No newline at end of file
+}
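The net effect of the TableExportQuery change: columns that back a ValidityDate no longer claim their own output positions; all validity dates are rendered into the shared position 0, and the remaining columns are numbered from there. A toy sketch of that assignment, with plain strings standing in for conquery's Column and ValidityDate types (all names hypothetical):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

public class PositionDemo {
	public static void main(String[] args) {
		// Hypothetical stand-ins: strings instead of conquery's Column/ValidityDate.
		Set<String> validityDateColumns = Set.of("date_start", "date_end");
		List<String> tableColumns = List.of("date_start", "date_end", "icd_code", "hospital_id");

		AtomicInteger currentPosition = new AtomicInteger(1); // position 0 is reserved for dates
		Map<String, Integer> positions = new LinkedHashMap<>();

		for (String column : tableColumns) {
			// Mirrors the new loop: columns backing a validity date are skipped,
			// since all validity dates are emitted into the shared position 0.
			if (validityDateColumns.contains(column)) {
				continue;
			}
			positions.putIfAbsent(column, currentPosition.getAndIncrement());
		}

		System.out.println(positions); // {icd_code=1, hospital_id=2}
	}
}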
diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/filter/CQTable.java b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/filter/CQTable.java
index bea2268426..b858c3741d 100644
--- a/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/filter/CQTable.java
+++ b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/filter/CQTable.java
@@ -10,8 +10,8 @@
import com.bakdata.conquery.apiv1.query.concept.specific.CQConcept;
import com.bakdata.conquery.io.jackson.serializer.NsIdRef;
import com.bakdata.conquery.io.jackson.serializer.NsIdRefCollection;
-import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.concepts.Connector;
+import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.datasets.concepts.select.Select;
import com.bakdata.conquery.models.query.QueryResolveContext;
import com.fasterxml.jackson.annotation.JsonBackReference;
@@ -75,18 +75,17 @@ public void resolve(QueryResolveContext context) {
}
@CheckForNull
- public Column findValidityDateColumn() {
+ public ValidityDate findValidityDate() {
- // if no dateColumn is provided, we use the default instead which is always the first one.
- // Set to null if none-available in the connector.
if (dateColumn != null) {
- return dateColumn.getValue().getColumn();
+ return dateColumn.getValue();
}
if (!connector.getValidityDates().isEmpty()) {
- return connector.getValidityDates().get(0).getColumn();
+ return connector.getValidityDates().get(0);
}
return null;
}
-}
\ No newline at end of file
+
+}
diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/filter/FilterValue.java b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/filter/FilterValue.java
index f3d2a1de35..11bd78c789 100644
--- a/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/filter/FilterValue.java
+++ b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/filter/FilterValue.java
@@ -191,7 +191,7 @@ public void resolve(QueryResolveContext context) {
* Values of group filters can have an arbitrary format which is set by the filter itself.
* Hence, we treat the value for the filter as Object.class.
*
- * The resolved filter instructs the frontend on how to render and serialize the filter value using the {@link Filter#createFrontendConfig()} method. The filter must implement {@link GroupFilter} and provide the type information of the value to correctly deserialize the received object.
+ * The resolved filter instructs the frontend on how to render and serialize the filter value using the {@link Filter#createFrontendConfig(com.bakdata.conquery.models.config.ConqueryConfig)} method. The filter must implement {@link GroupFilter} and provide the type information of the value so the received object is deserialized correctly.
*/
public static class GroupFilterDeserializer extends StdDeserializer {
private final NsIdReferenceDeserializer> nsIdDeserializer = new NsIdReferenceDeserializer<>(Filter.class, null, FilterId.class);
diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/CQConcept.java b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/CQConcept.java
index f9ea072431..e425c8f106 100644
--- a/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/CQConcept.java
+++ b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/CQConcept.java
@@ -25,6 +25,7 @@
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.ConceptElement;
import com.bakdata.conquery.models.datasets.concepts.Connector;
+import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.datasets.concepts.select.Select;
import com.bakdata.conquery.models.identifiable.ids.NamespacedIdentifiable;
import com.bakdata.conquery.models.query.DateAggregationMode;
@@ -226,7 +227,7 @@ public QPNode createQueryPlan(QueryPlanContext context, ConceptQueryPlan plan) {
final QPNode
conceptSpecificNode =
- getConcept().createConceptQuery(context, filters, aggregators, eventDateUnionAggregators, selectValidityDateColumn(table));
+ getConcept().createConceptQuery(context, filters, aggregators, eventDateUnionAggregators, selectValidityDate(table));
// Link up the ExistsAggregators to the node
existsAggregators.forEach(agg -> agg.setReference(conceptSpecificNode));
@@ -272,14 +273,14 @@ private static List<Aggregator<?>> createAggregators(ConceptQueryPlan plan, List
.collect(Collectors.toList());
}
- private Column selectValidityDateColumn(CQTable table) {
+ private ValidityDate selectValidityDate(CQTable table) {
if (table.getDateColumn() != null) {
- return table.getDateColumn().getValue().getColumn();
+ return table.getDateColumn().getValue();
}
//else use this first defined validity date column
if (!table.getConnector().getValidityDates().isEmpty()) {
- return table.getConnector().getValidityDates().get(0).getColumn();
+ return table.getConnector().getValidityDates().get(0);
}
return null;
diff --git a/backend/src/main/java/com/bakdata/conquery/commands/MigrateCommand.java b/backend/src/main/java/com/bakdata/conquery/commands/MigrateCommand.java
index 416ea09024..5a5484184d 100644
--- a/backend/src/main/java/com/bakdata/conquery/commands/MigrateCommand.java
+++ b/backend/src/main/java/com/bakdata/conquery/commands/MigrateCommand.java
@@ -1,14 +1,21 @@
package com.bakdata.conquery.commands;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.config.XodusStoreFactory;
import com.bakdata.conquery.util.io.ConqueryMDC;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -27,11 +34,13 @@
import jetbrains.exodus.env.StoreConfig;
import jetbrains.exodus.env.Transaction;
import kotlin.jvm.functions.Function4;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.codehaus.groovy.control.CompilerConfiguration;
+import org.jetbrains.annotations.NotNull;
/**
* Command allowing script based migration of databases. Especially useful for data that cannot be easily recreated after reimports, such as {@link com.bakdata.conquery.models.auth.entities.User}s and {@link com.bakdata.conquery.models.execution.ManagedExecution}s.
@@ -56,7 +65,6 @@
@Slf4j
public class MigrateCommand extends ConqueryCommand {
-
public MigrateCommand() {
super("migrate", "Run a migration script on a store.");
}
@@ -75,6 +83,18 @@ public void configure(Subparser subparser) {
.required(true)
.type(Arguments.fileType());
+ subparser
+ .addArgument("--in-gzip")
+ .help("If true, values are ungzipped before deserialization.")
+ .setDefault(true)
+ .type(Arguments.booleanType());
+
+ subparser
+ .addArgument("--out-gzip")
+ .help("If true, values are gzipped before writing.")
+ .setDefault(true)
+ .type(Arguments.booleanType());
+
subparser
.addArgument("--script")
.help("Migration Script returning a closure implementing MigrationScriptFactory. See supplementary example.groovy for details.\nSignature: String env, String store, String key, ObjectNode value -> return new Tuple(key,value)")
@@ -88,6 +108,10 @@ protected void run(io.dropwizard.setup.Environment environment, Namespace namesp
final File inStoreDirectory = namespace.get("in");
final File outStoreDirectory = namespace.get("out");
+ final boolean inGzip = namespace.get("in_gzip");
+ final boolean outGzip = namespace.get("out_gzip");
+
+
final long logsize = ((XodusStoreFactory) configuration.getStorage()).getXodus().getLogFileSize().toKilobytes();
@@ -99,11 +123,11 @@ protected void run(io.dropwizard.setup.Environment environment, Namespace namesp
}
// Create Groovy Shell and parse script
- CompilerConfiguration config = new CompilerConfiguration();
+ final CompilerConfiguration config = new CompilerConfiguration();
config.setScriptBaseClass(MigrationScriptFactory.class.getName());
- GroovyShell groovy = new GroovyShell(config);
+ final GroovyShell groovy = new GroovyShell(config);
- MigrationScriptFactory factory = (MigrationScriptFactory) groovy.parse(In.file((File) namespace.get("script")).readAll());
+ final MigrationScriptFactory factory = (MigrationScriptFactory) groovy.parse(In.file((File) namespace.get("script")).readAll());
final Function4<String, String, String, ObjectNode, Tuple> migrator = factory.run();
@@ -116,25 +140,12 @@ protected void run(io.dropwizard.setup.Environment environment, Namespace namesp
final File environmentDirectory = new File(outStoreDirectory, xenv.getName());
environmentDirectory.mkdirs();
- processEnvironment(xenv, logsize, environmentDirectory, migrator, mapper);
+ processEnvironment(xenv, logsize, environmentDirectory, migrator, mapper, inGzip, outGzip);
});
}
-
- /**
- * Class defining the interface for the Groovy-Script.
- */
- public abstract static class MigrationScriptFactory extends Script {
-
- /**
- * Environment -> Store -> Key -> Value -> (Key, Value)
- */
- @Override
- public abstract Function4<String, String, String, ObjectNode, Tuple> run();
- }
-
- private void processEnvironment(File inStoreDirectory, long logSize, File outStoreDirectory, Function4<String, String, String, ObjectNode, Tuple> migrator, ObjectMapper mapper) {
+ private void processEnvironment(File inStoreDirectory, long logSize, File outStoreDirectory, Function4<String, String, String, ObjectNode, Tuple> migrator, ObjectMapper mapper, boolean inGzip, boolean outGzip) {
final jetbrains.exodus.env.Environment inEnvironment = Environments.newInstance(
inStoreDirectory,
new EnvironmentConfig().setLogFileSize(logSize)
@@ -175,7 +186,7 @@ private void processEnvironment(File inStoreDirectory, long logSize, File outSto
continue;
}
- doMigrate(inStore, outStore, migrator, mapper);
+ migrateStore(inStore, outStore, migrator, mapper, inGzip, outGzip);
log.info("Done writing {}.", store);
}
@@ -191,7 +202,7 @@ private void processEnvironment(File inStoreDirectory, long logSize, File outSto
inEnvironment.close();
}
- private void doMigrate(Store inStore, Store outStore, Function4<String, String, String, ObjectNode, Tuple> migrator, ObjectMapper mapper) {
+ private void migrateStore(Store inStore, Store outStore, Function4<String, String, String, ObjectNode, Tuple> migrator, ObjectMapper mapper, boolean inGzip, boolean outGzip) {
final Environment inEnvironment = inStore.getEnvironment();
final Environment outEnvironment = outStore.getEnvironment();
@@ -211,13 +222,12 @@ private void doMigrate(Store inStore, Store outStore, Function4<String, String, String, ObjectNode, Tuple> migrator, ObjectMapper mapper) {
- final Tuple migrated =
- migrator.invoke(inEnvironment.getLocation(), inStore.getName(), key, node);
+ final Tuple migrated = migrator.invoke(inEnvironment.getLocation(), inStore.getName(), key, value);
// => Effectively delete the object
if (migrated == null) {
@@ -226,18 +236,18 @@ private void doMigrate(Store inStore, Store outStore, Function4<String, String, String, ObjectNode, Tuple> migrator, ObjectMapper mapper) {
+ /**
+ * Class defining the interface for the Groovy-Script.
+ */
+ public abstract static class MigrationScriptFactory extends Script {
+
+ /**
+ * Environment -> Store -> Key -> Value -> (Key, Value)
+ */
+ @Override
+ public abstract Function4<String, String, String, ObjectNode, Tuple> run();
+ }
}
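For orientation, a sketch of the migrator contract that MigrateCommand compiles scripts against. Real scripts are Groovy closures returned by a MigrationScriptFactory; this Java-flavored equivalent only illustrates the semantics (pass through, rewrite, or return null to delete an entry), and the store and field names are invented:

import com.fasterxml.jackson.databind.node.ObjectNode;
import groovy.lang.Tuple;
import kotlin.jvm.functions.Function4;

// Hypothetical migrator: renames a JSON field in one store, passes everything else through.
public class RenameFieldMigrator implements Function4<String, String, String, ObjectNode, Tuple> {
	@Override
	public Tuple invoke(String env, String store, String key, ObjectNode value) {
		if (!"EXECUTIONS".equals(store)) {
			// Unrelated stores are passed through unchanged.
			return new Tuple(key, value);
		}
		if (value.has("oldField")) {
			value.set("newField", value.remove("oldField"));
		}
		// Returning null instead would effectively delete the entry.
		return new Tuple(key, value);
	}
}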
diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/BigStore.java b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/BigStore.java
index 556c561b6d..4beb3633c0 100644
--- a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/BigStore.java
+++ b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/BigStore.java
@@ -13,6 +13,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;
@@ -62,7 +64,7 @@ public BigStore(XodusStoreFactory config,
StoreInfo storeInfo,
Consumer storeCloseHook,
Consumer storeRemoveHook,
- ObjectMapper mapper) {
+ ObjectMapper mapper, ExecutorService executorService) {
this.storeInfo = storeInfo;
// Recommendation by the author of Xodus is to have logFileSize at least be 4 times the biggest file size.
@@ -77,7 +79,7 @@ public BigStore(XodusStoreFactory config,
BigStoreMetaKeys.class,
config.isValidateOnWrite(),
config.isRemoveUnreadableFromStore(),
- config.getUnreadableDataDumpDirectory()
+ config.getUnreadableDataDumpDirectory(), executorService
);
@@ -90,7 +92,7 @@ public BigStore(XodusStoreFactory config,
byte[].class,
config.isValidateOnWrite(),
config.isRemoveUnreadableFromStore(),
- config.getUnreadableDataDumpDirectory()
+ config.getUnreadableDataDumpDirectory(), executorService
);
@@ -158,7 +160,7 @@ public Collection<VALUE> getAll() {
@Override
public Collection<KEY> getAllKeys() {
- List<KEY> out = new ArrayList<>();
+ Collection<KEY> out = new ConcurrentLinkedQueue<>(); // has to be concurrent because forEach is concurrent.
metaStore.forEach((key, value, size) -> out.add(key));
return out;
}
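getAllKeys switches to a ConcurrentLinkedQueue because SerializingStore.forEach now invokes its consumer from a worker pool, so the sink collection must tolerate concurrent add() calls. A minimal, generic demonstration (not conquery code) of why a thread-safe collection is needed here:

import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class ConcurrentCollectDemo {
	public static void main(String[] args) throws InterruptedException {
		ExecutorService pool = Executors.newFixedThreadPool(4);
		// A plain ArrayList here could drop elements or throw under concurrent add();
		// ConcurrentLinkedQueue is safe without external locking.
		Collection<Integer> out = new ConcurrentLinkedQueue<>();
		IntStream.range(0, 10_000).forEach(i -> pool.execute(() -> out.add(i)));
		pool.shutdown();
		pool.awaitTermination(1, TimeUnit.MINUTES);
		System.out.println(out.size()); // 10000
	}
}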
diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/KeyIncludingStore.java b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/KeyIncludingStore.java
index 9d1dbdf6ca..585eb6460d 100644
--- a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/KeyIncludingStore.java
+++ b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/KeyIncludingStore.java
@@ -1,11 +1,10 @@
package com.bakdata.conquery.io.storage.xodus.stores;
-import com.bakdata.conquery.io.storage.Store;
-
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
-import java.util.function.Consumer;
+
+import com.bakdata.conquery.io.storage.Store;
public abstract class KeyIncludingStore<KEY, VALUE> implements Closeable {
@@ -26,9 +25,7 @@ public VALUE get(KEY key) {
return store.get(key);
}
- public void forEach(Consumer<VALUE> consumer) {
- store.forEach((key, value, size) -> consumer.accept(value));
- }
+
public void update(VALUE value) {
updated(value);
diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java
index 6cf3fcddbd..c4d2591d71 100644
--- a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java
+++ b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java
@@ -1,14 +1,28 @@
package com.bakdata.conquery.io.storage.xodus.stores;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.PrintStream;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.function.Supplier;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
import javax.validation.Validator;
@@ -18,19 +32,26 @@
import com.bakdata.conquery.models.config.XodusStoreFactory;
import com.bakdata.conquery.models.exceptions.ValidatorHelper;
import com.bakdata.conquery.util.io.FileUtil;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import jetbrains.exodus.ArrayByteIterable;
import jetbrains.exodus.ByteIterable;
import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
import lombok.NonNull;
+import lombok.SneakyThrows;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.TestOnly;
/**
* Key-value-store from {@link KEY} type values to {@link VALUE} values. ACID consistent, stored on disk using {@link jetbrains.exodus.env.Store} via {@link XodusStore}.
@@ -96,15 +117,9 @@ public class SerializingStore<KEY, VALUE> implements Store<KEY, VALUE> {
private final boolean removeUnreadablesFromUnderlyingStore;
private final ObjectMapper objectMapper;
+ private final ExecutorService executor;
- public <CLASS_K extends Class<KEY>, CLASS_V extends Class<VALUE>> SerializingStore(XodusStore store,
- Validator validator,
- ObjectMapper objectMapper,
- CLASS_K keyType,
- CLASS_V valueType,
- boolean validateOnWrite,
- boolean removeUnreadableFromStore,
- File unreadableDataDumpDirectory) {
+ public <CLASS_K extends Class<KEY>, CLASS_V extends Class<VALUE>> SerializingStore(XodusStore store, Validator validator, ObjectMapper objectMapper, CLASS_K keyType, CLASS_V valueType, boolean validateOnWrite, boolean removeUnreadableFromStore, File unreadableDataDumpDirectory, ExecutorService executorService) {
this.store = store;
this.validator = validator;
this.validateOnWrite = validateOnWrite;
@@ -125,9 +140,11 @@ public <CLASS_K extends Class<KEY>, CLASS_V extends Class<VALUE>> SerializingSto
unreadableValuesDumpDir = unreadableDataDumpDirectory;
+ executor = executorService;
+
if (shouldDumpUnreadables()) {
- if(!unreadableValuesDumpDir.exists() && !unreadableValuesDumpDir.mkdirs()) {
- throw new IllegalStateException("Could not create dump directory: " + unreadableValuesDumpDir);
+ if (!unreadableValuesDumpDir.exists() && !unreadableValuesDumpDir.mkdirs()) {
+ throw new IllegalStateException("Could not create dump directory: %s".formatted(unreadableValuesDumpDir));
}
else if (!unreadableValuesDumpDir.isDirectory()) {
throw new IllegalArgumentException(String.format("The provided path points to an existing file which is not a directory. Was: %s", unreadableValuesDumpDir.getAbsolutePath()));
@@ -139,163 +156,53 @@ private boolean shouldDumpUnreadables() {
return unreadableValuesDumpDir != null;
}
- @Override
- public void add(KEY key, VALUE value) {
- if (!valueType.isInstance(value)) {
- throw new IllegalStateException("The element " + value + " is not of the required type " + valueType);
- }
- if (validateOnWrite) {
- ValidatorHelper.failOnError(log, validator.validate(value));
- }
-
- store.add(writeKey(key), writeValue(value));
- }
-
- @Override
- public VALUE get(KEY key) {
- final ByteIterable binValue = store.get(writeKey(key));
-
- try {
- return readValue(binValue);
- } catch (Exception e) {
-
- if(unreadableValuesDumpDir != null) {
- dumpToFile(binValue, key.toString(), e, unreadableValuesDumpDir, store.getName(), objectMapper);
- }
-
- if(removeUnreadablesFromUnderlyingStore) {
- remove(key);
- // Null seems to be an acceptable return value in this case
- return null;
- }
-
- Throwables.throwIfUnchecked(e);
-
- throw new RuntimeException(e);
- }
- }
-
/**
- * Iterates a given consumer over the entries of this store.
- * Depending on the {@link XodusStoreFactory} corrupt entries may be dump to a file and/or removed from the store.
- * These entries are not submitted to the consumer.
+ * Generates a valid file name from the key of the dump object, the store and the current time.
+ * However, it does not ensure that there is no file with such a name.
+ *
+ * Current implementation is `$unreadableDumpDir/$today/$store/$key.json`
*/
- @Override
- public IterationStatistic forEach(StoreEntryConsumer<KEY, VALUE> consumer) {
- final IterationStatistic result = new IterationStatistic();
- final ArrayList<ByteIterable> unreadables = new ArrayList<>();
-
- store.forEach((k, v) -> {
- result.incrTotalProcessed();
-
- // Try to read the key first
- final KEY key = getDeserializedAndDumpFailed(
- k,
- this::readKey,
- () -> new String(k.getBytesUnsafe()),
- v,
- "Could not parse key [{}]"
- );
- if (key == null) {
- unreadables.add(k);
- result.incrFailedKeys();
- return;
- }
-
- // Try to read the value
- final VALUE value = getDeserializedAndDumpFailed(
- v,
- this::readValue,
- key::toString,
- v,
- "Could not parse value for key [{}]"
- );
-
- if (value == null) {
- unreadables.add(k);
- result.incrFailedValues();
- return;
- }
-
- // Apply the consumer to key and value
- try {
- consumer.accept(key, value, v.getLength());
- }
- catch (Exception e) {
- log.warn("Unable to apply for-each consumer on key[{}]", key, e);
- }
-
- });
- // Print some statistics
- final int total = result.getTotalProcessed();
- log.debug(
- String.format(
- "While processing store %s:\n\tEntries processed:\t%d\n\tKey read failure:\t%d (%.2f%%)\n\tValue read failure:\t%d (%.2f%%)",
- store.getName(),
- total,
- result.getFailedKeys(),
- total > 0 ? (float) result.getFailedKeys() / total * 100 : 0,
- result.getFailedValues(),
- total > 0 ? (float) result.getFailedValues() / total * 100 : 0));
+ @NotNull
+ public static File makeDumpFileName(@NotNull String keyOfDump, @NotNull File unreadableDumpDir, @NotNull String storeName) {
+ return unreadableDumpDir.toPath()
+ .resolve(DateTimeFormatter.BASIC_ISO_DATE.format(LocalDateTime.now()))
+ .resolve(storeName)
+ .resolve(sanitiseFileName(keyOfDump) + "." + DUMP_FILE_EXTENSION)
+ .toFile();
- // Remove corrupted entries from the store if configured so
- if (removeUnreadablesFromUnderlyingStore) {
- log.warn("Removing {} unreadable elements from the store {}.", unreadables.size(), store.getName());
- unreadables.forEach(store::remove);
- }
- return result;
}
/**
- * Deserializes the gives serial value (either a key or a value of an store entry) to a concrete object. If that fails the entry-value is dumped if configured so to a file using the entry-key for the filename.
- *
- * @param <TYPE> The deserialized object type.
- * @param serial The to be deserialized object (key or value of an entry)
- * @param deserializer The concrete deserializer to use.
- * @param onFailKeyStringSupplier When deserilization failed and dump is enabled this is used in the dump file name.
- * @param onFailOrigValue Will be the dumpfile content rendered as a json.
- * @param onFailWarnMsgFmt The warn message that will be logged on failure.
- * @return The deserialized value
+ * Generates a valid file name from the key of the dump object, the store and the current time.
+ * However, it does not ensure that there is no file with such a name.
+ *
+ * Current implementation is `$unreadableDumpDir/$today/$store/$key.exception`
*/
- private <TYPE> TYPE getDeserializedAndDumpFailed(ByteIterable serial, Function<ByteIterable, TYPE> deserializer, Supplier<String> onFailKeyStringSupplier, ByteIterable onFailOrigValue, String onFailWarnMsgFmt) {
- try {
- return deserializer.apply(serial);
- }
- catch (Exception e) {
- // With trace also print the stacktrace
- log.warn(onFailWarnMsgFmt, onFailKeyStringSupplier.get(), log.isTraceEnabled() ? e : null);
+ @NotNull
+ public static File makeExceptionFileName(@NotNull String keyOfDump, @NotNull File unreadableDumpDir, @NotNull String storeName) {
+ return unreadableDumpDir.toPath()
+ .resolve(DateTimeFormatter.BASIC_ISO_DATE.format(LocalDateTime.now()))
+ .resolve(storeName)
+ .resolve(sanitiseFileName(keyOfDump) + "." + EXCEPTION_FILE_EXTENSION)
+ .toFile();
- if (shouldDumpUnreadables()) {
- dumpToFile(onFailOrigValue, onFailKeyStringSupplier.get(), e, unreadableValuesDumpDir, store.getName(), objectMapper);
- }
- }
- return null;
}
+ private static String sanitiseFileName(@NotNull String name) {
+ return FileUtil.SAVE_FILENAME_REPLACEMENT_MATCHER.matcher(name).replaceAll("_");
+ }
+
+
@Override
- public void update(KEY key, VALUE value) {
+ public void add(KEY key, VALUE value) {
if (!valueType.isInstance(value)) {
- throw new IllegalStateException("The element " + value + " is not of the required type " + valueType);
+ throw new IllegalStateException("The element %s is not of the required type %s".formatted(value, valueType));
}
-
if (validateOnWrite) {
ValidatorHelper.failOnError(log, validator.validate(value));
}
- store.update(writeKey(key), writeValue(value));
- }
-
- @Override
- public void remove(KEY key) {
- log.trace("Removing value to key {} from Store[{}]", key, store.getName());
- store.remove(writeKey(key));
- }
-
- /**
- * Serialize value with {@code valueWriter}.
- */
- private ByteIterable writeValue(VALUE value) {
- return write(value, valueWriter);
+ store.add(writeKey(key), writeValue(value));
}
/**
@@ -306,17 +213,10 @@ private ByteIterable writeKey(KEY key) {
}
/**
- * Deserialize value with {@code valueReader}.
- */
- private VALUE readValue(ByteIterable value) {
- return read(valueReader, value);
- }
-
- /**
- * Deserialize value with {@code keyReader}.
+ * Serialize value with {@code valueWriter}.
*/
- private KEY readKey(ByteIterable key) {
- return read(keyReader, key);
+ private ByteIterable writeValue(VALUE value) {
+ return write(value, valueWriter);
}
/**
@@ -324,43 +224,65 @@ private KEY readKey(ByteIterable key) {
*/
private ByteIterable write(Object obj, ObjectWriter writer) {
try {
- final byte[] bytes = writer.writeValueAsBytes(obj);
- if (log.isTraceEnabled()) {
- final String json = JacksonUtil.toJsonDebug(bytes);
- log.trace("Written ({}): {}", valueType.getName(), json);
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ try (final OutputStream outputStream = new GZIPOutputStream(baos)) {
+ writer.writeValue(outputStream, obj);
}
+
+ baos.close();
+
+ final byte[] bytes = baos.toByteArray();
+
return new ArrayByteIterable(bytes);
}
- catch (JsonProcessingException e) {
+ catch (IOException e) {
throw new RuntimeException("Failed to write " + obj, e);
}
}
- /**
- * Try read value with reader.
- */
- private <T> T read(ObjectReader reader, ByteIterable obj) {
- if (obj == null) {
- return null;
- }
+ @Override
+ public VALUE get(KEY key) {
+ final ByteIterable binValue = store.get(writeKey(key));
+
try {
- return reader.readValue(obj.getBytesUnsafe(), 0, obj.getLength());
+ return readValue(binValue);
}
- catch (IOException e) {
- throw new RuntimeException("Failed to read " + JacksonUtil.toJsonDebug(obj.getBytesUnsafe()), e);
+ catch (Exception e) {
+
+ if (unreadableValuesDumpDir != null) {
+ dumpToFile(binValue.getBytesUnsafe(), key.toString(), e, unreadableValuesDumpDir, store.getName(), objectMapper);
+ }
+
+ if (removeUnreadablesFromUnderlyingStore) {
+ remove(key);
+ // Null seems to be an acceptable return value in this case
+ return null;
+ }
+
+ Throwables.throwIfUnchecked(e);
+
+ throw new RuntimeException(e);
}
}
+ /**
+ * Deserialize value with {@code valueReader}.
+ */
+ private VALUE readValue(ByteIterable value) {
+ return read(valueReader, value);
+ }
+
/**
* Dumps the content of an unreadable value to a file as JSON (it tries to parse it as an object and then tries to dump it as JSON).
*
- * @param obj The object to dump.
+ * @param gzippedObj The object to dump.
* @param keyOfDump The key under which the unreadable value is accessible. It is used for the file name.
- * @param reason The exception causing us to dump the file
+ * @param reason The exception causing us to dump the file
* @param unreadableDumpDir The directory to dump to. The method assumes that the directory exists and is okay to write to.
* @param storeName The name of the store which is also used in the dump file name.
*/
- private static void dumpToFile(@NonNull ByteIterable obj, @NonNull String keyOfDump, Exception reason, @NonNull File unreadableDumpDir, String storeName, ObjectMapper objectMapper) {
+ private static void dumpToFile(byte[] gzippedObj, @NonNull String keyOfDump, Exception reason, @NonNull File unreadableDumpDir, String storeName, ObjectMapper objectMapper) {
// Create dump filehandle
final File dumpfile = makeDumpFileName(keyOfDump, unreadableDumpDir, storeName);
final File exceptionFileName = makeExceptionFileName(keyOfDump, unreadableDumpDir, storeName);
@@ -370,24 +292,22 @@ private static void dumpToFile(@NonNull ByteIterable obj, @NonNull String keyOfD
return;
}
- if(!dumpfile.getParentFile().exists() && !dumpfile.getParentFile().mkdirs()){
+ if (!dumpfile.getParentFile().exists() && !dumpfile.getParentFile().mkdirs()) {
throw new IllegalStateException("Could not create `%s`.".formatted(dumpfile.getParentFile()));
}
- //TODO FK: dump in a separate thread so we are not blocking the reader thread.
-
// Write json
try {
log.info("Dumping value of key {} to {} (because it cannot be deserialized anymore).", keyOfDump, dumpfile.getCanonicalPath());
- final JsonNode dump = objectMapper.readerFor(JsonNode.class).readValue(obj.getBytesUnsafe(), 0, obj.getLength());
+ final JsonNode dump = objectMapper.readerFor(JsonNode.class).readValue(new GZIPInputStream(new ByteArrayInputStream(gzippedObj)));
Jackson.MAPPER.writer().writeValue(dumpfile, dump);
}
catch (IOException e) {
log.error("Failed to dump unreadable value of key `{}` to file `{}`", keyOfDump, dumpfile, e);
}
- try(PrintStream out = new PrintStream(exceptionFileName)) {
+ try (PrintStream out = new PrintStream(exceptionFileName)) {
reason.printStackTrace(out);
}
catch (IOException e) {
@@ -396,40 +316,164 @@ private static void dumpToFile(@NonNull ByteIterable obj, @NonNull String keyOfD
}
+ @Override
+ public void remove(KEY key) {
+ log.trace("Removing value to key {} from Store[{}]", key, store.getName());
+ store.remove(writeKey(key));
+ }
+
/**
- * Generates a valid file name from the key of the dump object, the store and the current time.
- * However, it does not ensure that there is no file with such a name.
+ * Try read value with reader.
+ */
+ private <T> T read(ObjectReader reader, ByteIterable obj) {
+ if (obj == null) {
+ return null;
+ }
+ try (final InputStream inputStream = new GZIPInputStream(new ByteArrayInputStream(obj.getBytesUnsafe(), 0, obj.getLength()))) {
+ return reader.readValue(inputStream);
+ }
+ catch (IOException e) {
+ try {
+ throw new RuntimeException("Failed to read " + JacksonUtil.toJsonDebug(debugUnGzip(obj.getBytesUnsafe())), e);
+ }
+ catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ private static byte[] debugUnGzip(byte[] bytes) throws IOException {
+ return new GZIPInputStream(new ByteArrayInputStream(bytes)).readAllBytes();
+ }
+
+ /**
+ * Iterates a given consumer over the entries of this store.
+ * Depending on the {@link XodusStoreFactory} corrupt entries may be dumped to a file and/or removed from the store.
+ * These entries are not submitted to the consumer.
*
- * Current implementation is `$unreadableDumpDir/$today/$store/$key.json`
+ * @implNote This method is concurrent!
*/
- @NotNull
- public static File makeDumpFileName(@NotNull String keyOfDump, @NotNull File unreadableDumpDir, @NotNull String storeName) {
- return unreadableDumpDir.toPath()
- .resolve(DateTimeFormatter.BASIC_ISO_DATE.format(LocalDateTime.now()))
- .resolve(storeName)
- .resolve(sanitiseFileName(keyOfDump) + "." + DUMP_FILE_EXTENSION)
- .toFile();
+ @SneakyThrows
+ @Override
+ public IterationStatistic forEach(StoreEntryConsumer consumer) {
+ final IterationStatistic result = new IterationStatistic();
+
+ final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(executor);
+
+ final Queue<ListenableFuture<ByteIterable>> jobs = new ConcurrentLinkedQueue<>();
+
+ // We read in a single thread, and deserialize and dispatch in multiple threads.
+ store.forEach((k, v) -> jobs.add(executorService.submit(() -> handle(consumer, result, k, v))));
+
+ final ListenableFuture<List<ByteIterable>> allJobs = Futures.allAsList(jobs);
+
+ List<ByteIterable> maybeFailed;
+
+ while ((maybeFailed = allJobs.get(30, TimeUnit.SECONDS)) == null) {
+ log.debug("Still waiting for {} jobs.", jobs.stream().filter(Predicate.not(Future::isDone)).count());
+ }
+
+ final List<ByteIterable> unreadables = maybeFailed.stream().filter(Objects::nonNull).toList();
+
+ // Print some statistics
+ final int total = result.getTotalProcessed();
+
+ log.debug(String.format(
+ "While processing store %s:\n\tEntries processed:\t%d\n\tKey read failure:\t%d (%.2f%%)\n\tValue read failure:\t%d (%.2f%%)",
+ store.getName(),
+ total,
+ result.getFailedKeys(),
+ total > 0 ? (float) result.getFailedKeys() / total * 100 : 0,
+ result.getFailedValues(),
+ total > 0 ? (float) result.getFailedValues() / total * 100 : 0
+ ));
+
+ // Remove corrupted entries from the store if configured so
+ if (removeUnreadablesFromUnderlyingStore) {
+ log.warn("Removing {} unreadable elements from the store {}.", unreadables.size(), store.getName());
+ unreadables.forEach(store::remove);
+ }
+ return result;
}
- private static String sanitiseFileName(@NotNull String name) {
- return FileUtil.SAVE_FILENAME_REPLACEMENT_MATCHER.matcher(name).replaceAll("_");
+ private ByteIterable handle(StoreEntryConsumer<KEY, VALUE> consumer, IterationStatistic result, ByteIterable keyRaw, ByteIterable valueRaw) {
+ result.incrTotalProcessed();
+
+ // Try to read the key first
+ final KEY
+ key =
+ getDeserializedAndDumpFailed(keyRaw, SerializingStore.this::readKey, () -> new String(keyRaw.getBytesUnsafe()), valueRaw, "Could not parse key [{}]");
+ if (key == null) {
+ result.incrFailedKeys();
+ return keyRaw;
+ }
+
+ // Try to read the value
+ final VALUE
+ value =
+ getDeserializedAndDumpFailed(valueRaw, SerializingStore.this::readValue, key::toString, valueRaw, "Could not parse value for key [{}]");
+
+ if (value == null) {
+ result.incrFailedValues();
+ return keyRaw;
+ }
+
+ // Apply the consumer to key and value
+ try {
+ consumer.accept(key, value, valueRaw.getLength());
+ }
+ catch (Exception e) {
+ log.warn("Unable to apply for-each consumer on key[{}]", key, e);
+ }
+
+ return null;
}
/**
- * Generates a valid file name from the key of the dump object, the store and the current time.
- * However, it does not ensure that there is no file with such a name.
+ * Deserializes the given serial value (either a key or a value of a store entry) to a concrete object. If that fails, the entry value is dumped (if configured) to a file, using the entry key for the filename.
*
- * Current implementation is `$unreadableDumpDir/$today/$store/$key.exception`
+ * @param <TYPE> The deserialized object type.
+ * @param serial The to be deserialized object (key or value of an entry)
+ * @param deserializer The concrete deserializer to use.
+ * @param onFailKeyStringSupplier When deserialization fails and dumping is enabled, this is used in the dump file name.
+ * @param onFailOrigValue Will be the dumpfile content rendered as a json.
+ * @param onFailWarnMsgFmt The warning message that will be logged on failure.
+ * @return The deserialized value
*/
- @NotNull
- public static File makeExceptionFileName(@NotNull String keyOfDump, @NotNull File unreadableDumpDir, @NotNull String storeName) {
- return unreadableDumpDir.toPath()
- .resolve(DateTimeFormatter.BASIC_ISO_DATE.format(LocalDateTime.now()))
- .resolve(storeName)
- .resolve(sanitiseFileName(keyOfDump) + "." + EXCEPTION_FILE_EXTENSION)
- .toFile();
+ private <TYPE> TYPE getDeserializedAndDumpFailed(ByteIterable serial, Function<ByteIterable, TYPE> deserializer, Supplier<String> onFailKeyStringSupplier, ByteIterable onFailOrigValue, String onFailWarnMsgFmt) {
+ try {
+ return deserializer.apply(serial);
+ }
+ catch (Exception e) {
+ // With trace also print the stacktrace
+ log.warn(onFailWarnMsgFmt, onFailKeyStringSupplier.get(), log.isTraceEnabled() ? e : null);
+ if (shouldDumpUnreadables()) {
+ dumpToFile(onFailOrigValue.getBytesUnsafe(), onFailKeyStringSupplier.get(), e, unreadableValuesDumpDir, store.getName(), objectMapper);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Deserialize value with {@code keyReader}.
+ */
+ private KEY readKey(ByteIterable key) {
+ return read(keyReader, key);
+ }
+
+ @Override
+ public void update(KEY key, VALUE value) {
+ if (!valueType.isInstance(value)) {
+ throw new IllegalStateException("The element %s is not of the required type %s".formatted(value, valueType));
+ }
+
+ if (validateOnWrite) {
+ ValidatorHelper.failOnError(log, validator.validate(value));
+ }
+
+ store.update(writeKey(key), writeValue(value));
}
@Override
@@ -466,22 +510,55 @@ public void close() {
store.close();
}
+ @NoArgsConstructor
+ @EqualsAndHashCode
@Data
+ @ToString(onlyExplicitlyIncluded = false)
public static class IterationStatistic {
- private int totalProcessed;
- private int failedKeys;
- private int failedValues;
-
+ private final AtomicInteger totalProcessed = new AtomicInteger();
+ private final AtomicInteger failedKeys = new AtomicInteger();
+ private final AtomicInteger failedValues = new AtomicInteger();
+
public void incrTotalProcessed() {
- totalProcessed++;
+ totalProcessed.incrementAndGet();
}
-
+
public void incrFailedKeys() {
- failedKeys++;
+ failedKeys.incrementAndGet();
}
-
+
public void incrFailedValues() {
- failedValues++;
+ failedValues.incrementAndGet();
+ }
+
+ @EqualsAndHashCode.Include
+ public int getFailedKeys() {
+ return failedKeys.get();
+ }
+
+ @TestOnly
+ public void setFailedKeys(int failedKeys) {
+ this.failedKeys.set(failedKeys);
+ }
+
+ @EqualsAndHashCode.Include
+ public int getFailedValues() {
+ return failedValues.get();
+ }
+
+ @TestOnly
+ public void setFailedValues(int failedValues) {
+ this.failedValues.set(failedValues);
+ }
+
+ @EqualsAndHashCode.Include
+ public int getTotalProcessed() {
+ return totalProcessed.get();
+ }
+
+ @TestOnly
+ public void setTotalProcessed(int totalProcessed) {
+ this.totalProcessed.set(totalProcessed);
}
}
}
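With write() and read() both rewritten, every value now passes through gzip on its way to and from Xodus. A self-contained sketch of the same round trip using a plain Jackson ObjectMapper (the store's configured mapper, key handling, and Xodus wrapping are omitted):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import com.fasterxml.jackson.databind.ObjectMapper;

public class GzipJsonRoundTrip {
	public static void main(String[] args) throws IOException {
		ObjectMapper mapper = new ObjectMapper();
		Map<String, Object> value = Map.of("id", 42, "label", "example");

		// Write path, as in SerializingStore#write: JSON through a GZIPOutputStream.
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		try (OutputStream out = new GZIPOutputStream(baos)) {
			mapper.writeValue(out, value);
		}
		byte[] stored = baos.toByteArray(); // what ends up in the Xodus store

		// Read path, as in SerializingStore#read: GZIPInputStream before the reader.
		try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(stored))) {
			System.out.println(mapper.readValue(in, Map.class));
		}
	}
}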
diff --git a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java
index 794df98a87..eb76f7ff68 100644
--- a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java
+++ b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java
@@ -10,22 +10,34 @@
import com.bakdata.conquery.mode.ManagerProvider;
import com.bakdata.conquery.mode.NamespaceHandler;
import com.bakdata.conquery.models.config.ConqueryConfig;
+import com.bakdata.conquery.models.config.SqlConnectorConfig;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.LocalNamespace;
import com.bakdata.conquery.models.worker.ShardNodeInformation;
import com.bakdata.conquery.sql.DslContextFactory;
import com.bakdata.conquery.sql.SqlContext;
+import com.bakdata.conquery.sql.conversion.dialect.HanaSqlDialect;
import com.bakdata.conquery.sql.conversion.dialect.PostgreSqlDialect;
+import com.bakdata.conquery.sql.conversion.dialect.SqlDialect;
import io.dropwizard.setup.Environment;
+import org.jooq.DSLContext;
public class LocalManagerProvider implements ManagerProvider {
private static final Supplier<Collection<ShardNodeInformation>> EMPTY_NODE_PROVIDER = Collections::emptyList;
public DelegateManager<LocalNamespace> provideManager(ConqueryConfig config, Environment environment) {
+
InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, environment.getValidator());
- // todo(tm): proper injection
- SqlContext sqlContext = new SqlContext(config.getSqlConnectorConfig(), new PostgreSqlDialect(DslContextFactory.create(config.getSqlConnectorConfig())));
+
+ SqlConnectorConfig sqlConnectorConfig = config.getSqlConnectorConfig();
+ DSLContext dslContext = DslContextFactory.create(sqlConnectorConfig);
+ SqlDialect sqlDialect = switch (sqlConnectorConfig.getDialect()) {
+ case POSTGRESQL -> new PostgreSqlDialect(dslContext);
+ case HANA -> new HanaSqlDialect(dslContext);
+ };
+ SqlContext sqlContext = new SqlContext(sqlConnectorConfig, sqlDialect);
+
NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, creator, sqlContext);
DatasetRegistry<LocalNamespace> datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator);
creator.init(datasetRegistry);
diff --git a/backend/src/main/java/com/bakdata/conquery/models/auth/apitoken/TokenStorage.java b/backend/src/main/java/com/bakdata/conquery/models/auth/apitoken/TokenStorage.java
index c42b24ec0c..86b34d5267 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/auth/apitoken/TokenStorage.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/auth/apitoken/TokenStorage.java
@@ -8,6 +8,7 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.Executors;
import javax.validation.Validator;
@@ -72,7 +73,7 @@ public void start(){
ApiTokenData.class,
true,
false,
- null
+ null, Executors.newSingleThreadExecutor()
));
openStoresInEnv.add(data);
@@ -90,7 +91,7 @@ public void start(){
ApiTokenData.MetaData.class,
true,
false,
- null
+ null, Executors.newSingleThreadExecutor()
));
openStoresInEnv.add(meta);
}
diff --git a/backend/src/main/java/com/bakdata/conquery/models/auth/basic/LocalAuthenticationRealm.java b/backend/src/main/java/com/bakdata/conquery/models/auth/basic/LocalAuthenticationRealm.java
index 2632931fc8..ed2e0eb49b 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/auth/basic/LocalAuthenticationRealm.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/auth/basic/LocalAuthenticationRealm.java
@@ -4,6 +4,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.Executors;
import javax.validation.Validator;
@@ -108,7 +109,7 @@ protected void onInit() {
PasswordHasher.HashedEntry.class,
false,
true,
- null
+ null, Executors.newSingleThreadExecutor()
));
}
diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/Dialect.java b/backend/src/main/java/com/bakdata/conquery/models/config/Dialect.java
index 2ec655aea9..b57931bbac 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/config/Dialect.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/config/Dialect.java
@@ -6,7 +6,8 @@
@Getter
public enum Dialect {
- POSTGRESQL(SQLDialect.POSTGRES);
+ POSTGRESQL(SQLDialect.POSTGRES),
+ HANA(SQLDialect.DEFAULT);
private final SQLDialect jooqDialect;
diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/FrontendConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/FrontendConfig.java
index 1bbfcddd56..e2883326f2 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/config/FrontendConfig.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/config/FrontendConfig.java
@@ -60,6 +60,11 @@ public class FrontendConfig {
@Email
private String contactEmail;
+ /**
+ * If true, users are always allowed to add custom values into SelectFilter input fields.
+ */
+ private boolean alwaysAllowCreateValue = false;
+
@Data
public static class CurrencyConfig {
diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/SqlConnectorConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/SqlConnectorConfig.java
index 857018cada..e6e83b723b 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/config/SqlConnectorConfig.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/config/SqlConnectorConfig.java
@@ -25,5 +25,6 @@ public class SqlConnectorConfig {
private String databasePassword;
private String jdbcConnectionUrl;
+
private String primaryColumn = "pid";
}
diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java b/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java
index 437ba9c2b9..4e28a84f8c 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java
@@ -11,11 +11,16 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.validation.Valid;
import javax.validation.Validator;
+import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import com.bakdata.conquery.io.cps.CPSType;
@@ -125,32 +130,61 @@ public class XodusStoreFactory implements StoreFactory {
private Path directory = Path.of("storage");
- private boolean validateOnWrite = false;
+ private boolean validateOnWrite;
@NotNull
@Valid
private XodusConfig xodus = new XodusConfig();
- private boolean useWeakDictionaryCaching = false;
+ /**
+ * Number of threads reading from the XodusStore.
+ * @implNote it's always only one thread reading from disk, dispatching to multiple reader threads.
+ */
+ @Min(1)
+ private int readerWorkers = Runtime.getRuntime().availableProcessors();
+
+ /**
+ * How many slots of buffering to use before the IO thread is put to sleep.
+ */
+ @Min(1)
+ private int bufferPerWorker = 20;
+
+ @JsonIgnore
+ private ExecutorService readerExecutorService;
+
+ public ExecutorService getReaderExecutorService() {
+ if (readerExecutorService == null){
+ readerExecutorService = new ThreadPoolExecutor(
+ 1, getReaderWorkers(),
+ 5, TimeUnit.MINUTES,
+ new ArrayBlockingQueue<>(getReaderWorkers() * getBufferPerWorker()),
+ new ThreadPoolExecutor.CallerRunsPolicy()
+ );
+ }
+
+ return readerExecutorService;
+ }
+
+ private boolean useWeakDictionaryCaching;
@NotNull
private Duration weakCacheDuration = Duration.hours(48);
/**
* Flag for the {@link SerializingStore} whether to delete values from the underlying store, that cannot be mapped to an object anymore.
*/
- private boolean removeUnreadableFromStore = false;
+ private boolean removeUnreadableFromStore;
/**
* When set, all values that could not be deserialized from the persistent store, are dump into individual files.
*/
@Nullable
- private File unreadableDataDumpDirectory = null;
+ private File unreadableDataDumpDirectory;
/**
* If set, an environment will not be loaded if it misses a required store.
* If not set, the environment is loaded and the application needs to create the store.
* This is useful if a new version introduces a new store, but will also alter the environment upon reading.
*/
- private boolean loadEnvironmentWithMissingStores = false;
+ private boolean loadEnvironmentWithMissingStores;
@JsonIgnore
private transient Validator validator;
@@ -175,13 +209,13 @@ public Collection discoverWorkerStorages() {
private <T extends NamespacedStorage> List<T> loadNamespacedStores(String prefix, Function<String, T> creator, Set<String> storesToTest) {
- File baseDir = getDirectory().toFile();
+ final File baseDir = getDirectory().toFile();
if (baseDir.mkdirs()) {
log.warn("Had to create Storage Dir at `{}`", baseDir);
}
- List<T> storages = new ArrayList<>();
+ final List<T> storages = new ArrayList<>();
for (File directory : Objects.requireNonNull(baseDir.listFiles((file, name) -> file.isDirectory() && name.startsWith(prefix)))) {
@@ -194,7 +228,7 @@ private <T extends NamespacedStorage> List<T> loadNamespacedStores(String prefix
continue;
}
- T namespacedStorage = creator.apply(name);
+ final T namespacedStorage = creator.apply(name);
storages.add(namespacedStorage);
}
@@ -203,8 +237,8 @@ private <T extends NamespacedStorage> List<T> loadNamespacedStores(String prefix
}
private boolean environmentHasStores(File pathName, Set<String> storesToTest) {
- Environment env = findEnvironment(pathName);
- boolean exists = env.computeInTransaction(t -> {
+ final Environment env = findEnvironment(pathName);
+ final boolean exists = env.computeInTransaction(t -> {
final List allStoreNames = env.getAllStoreNames(t);
final boolean complete = new HashSet<>(allStoreNames).containsAll(storesToTest);
if (complete) {
@@ -273,7 +307,7 @@ public IdentifiableStore createDictionaryStore(CentralRegistry centr
DICTIONARIES.storeInfo(),
this::closeStore,
this::removeStore,
- centralRegistry.injectIntoNew(objectMapper)
+ centralRegistry.injectIntoNew(objectMapper), getReaderExecutorService()
);
openStoresInEnv.put(bigStore.getDataXodusStore().getEnvironment(), bigStore.getDataXodusStore());
openStoresInEnv.put(bigStore.getMetaXodusStore().getEnvironment(), bigStore.getMetaXodusStore());
@@ -316,7 +350,7 @@ public SingletonStore createIdMappingStore(String pathName, ObjectM
synchronized (openStoresInEnv) {
final BigStore bigStore =
- new BigStore<>(this, validator, environment, ID_MAPPING.storeInfo(), this::closeStore, this::removeStore, objectMapper);
+ new BigStore<>(this, validator, environment, ID_MAPPING.storeInfo(), this::closeStore, this::removeStore, objectMapper, getReaderExecutorService());
openStoresInEnv.put(bigStore.getDataXodusStore().getEnvironment(), bigStore.getDataXodusStore());
openStoresInEnv.put(bigStore.getMetaXodusStore().getEnvironment(), bigStore.getMetaXodusStore());
@@ -396,14 +430,14 @@ private Environment findEnvironment(@NonNull File path) {
}
private Environment findEnvironment(String pathName) {
- File path = getStorageDir(pathName);
+ final File path = getStorageDir(pathName);
return findEnvironment(path);
}
private void closeStore(XodusStore store) {
- Environment env = store.getEnvironment();
+ final Environment env = store.getEnvironment();
synchronized (openStoresInEnv) {
- Collection<XodusStore> stores = openStoresInEnv.get(env);
+ final Collection<XodusStore> stores = openStoresInEnv.get(env);
stores.remove(store);
log.info("Closed XodusStore: {}", store);
@@ -427,9 +461,9 @@ private void closeEnvironment(Environment env) {
}
private void removeStore(XodusStore store) {
- Environment env = store.getEnvironment();
+ final Environment env = store.getEnvironment();
synchronized (openStoresInEnv){
- Collection<XodusStore> stores = openStoresInEnv.get(env);
+ final Collection<XodusStore> stores = openStoresInEnv.get(env);
stores.remove(store);
@@ -467,9 +501,11 @@ public <KEY, VALUE> Store<KEY, VALUE> createStore(Environment environment, Valid
if(openStoresInEnv.get(environment).stream().map(XodusStore::getName).anyMatch(name -> storeInfo.getName().equals(name))){
throw new IllegalStateException("Attempted to open an already opened store:" + storeInfo.getName());
}
- final XodusStore store =
- new XodusStore(environment, storeInfo.getName(), this::closeStore, this::removeStore);
+
+ final XodusStore store = new XodusStore(environment, storeInfo.getName(), this::closeStore, this::removeStore);
+
openStoresInEnv.put(environment, store);
+
return new CachedStore<>(
new SerializingStore<>(
store,
@@ -477,9 +513,10 @@ public Store createStore(Environment environment, Valid
objectMapper,
storeInfo.getKeyType(),
storeInfo.getValueType(),
- this.isValidateOnWrite(),
- this.isRemoveUnreadableFromStore(),
- this.getUnreadableDataDumpDirectory()
+ isValidateOnWrite(),
+ isRemoveUnreadableFromStore(),
+ getUnreadableDataDumpDirectory(),
+ getReaderExecutorService()
));
}
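The executor from getReaderExecutorService() derives its backpressure from the bounded ArrayBlockingQueue combined with CallerRunsPolicy: once readerWorkers * bufferPerWorker tasks are queued, the submitting IO thread runs the next task itself and thereby stops reading until the pool catches up. A standalone sketch of that behavior (pool and queue sizes here are arbitrary):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BackpressureDemo {
	public static void main(String[] args) throws InterruptedException {
		// Mirrors getReaderExecutorService(): core 1, max = workers, bounded queue.
		ThreadPoolExecutor pool = new ThreadPoolExecutor(
				1, 4,
				5, TimeUnit.MINUTES,
				new ArrayBlockingQueue<>(8),
				// When the queue is full, the submitting (IO) thread runs the task
				// itself, which pauses further reading until capacity frees up.
				new ThreadPoolExecutor.CallerRunsPolicy()
		);

		for (int i = 0; i < 100; i++) {
			pool.execute(() -> {
				try {
					Thread.sleep(10); // simulate deserialization work
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			});
		}
		pool.shutdown();
		pool.awaitTermination(1, TimeUnit.MINUTES);
	}
}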
}
diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/Concept.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/Concept.java
index d4c51d7c3e..368334b54c 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/Concept.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/Concept.java
@@ -14,7 +14,6 @@
import com.bakdata.conquery.models.auth.permissions.ConceptPermission;
import com.bakdata.conquery.models.auth.permissions.ConqueryPermission;
import com.bakdata.conquery.models.common.CDateSet;
-import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Dataset;
import com.bakdata.conquery.models.datasets.concepts.select.Select;
import com.bakdata.conquery.models.exceptions.ConfigurationException;
@@ -89,12 +88,12 @@ public int countElements() {
/**
* Allows concepts to create their own altered FiltersNode if necessary.
*/
- public QPNode createConceptQuery(QueryPlanContext context, List<FilterNode<?>> filters, List<Aggregator<?>> aggregators, List<Aggregator<CDateSet>> eventDateAggregators, Column validityDateColumn) {
+ public QPNode createConceptQuery(QueryPlanContext context, List<FilterNode<?>> filters, List<Aggregator<?>> aggregators, List<Aggregator<CDateSet>> eventDateAggregators, ValidityDate validityDate) {
final QPNode child = filters.isEmpty() && aggregators.isEmpty() ? new Leaf() : FiltersNode.create(filters, aggregators, eventDateAggregators);
// Only if a validityDateColumn exists, capsule children in ValidityDateNode
- return validityDateColumn != null ? new ValidityDateNode(validityDateColumn, child) : child;
+ return validityDate != null ? new ValidityDateNode(validityDate, child) : child;
}
@Override
diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/FrontEndConceptBuilder.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/FrontEndConceptBuilder.java
index fee4d75bda..d89c285838 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/FrontEndConceptBuilder.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/FrontEndConceptBuilder.java
@@ -23,6 +23,7 @@
import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.models.auth.entities.Subject;
import com.bakdata.conquery.models.auth.permissions.Ability;
+import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.concepts.filters.Filter;
import com.bakdata.conquery.models.datasets.concepts.select.Select;
@@ -35,17 +36,19 @@
import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId;
import com.bakdata.conquery.models.identifiable.ids.specific.ConceptTreeChildId;
import com.bakdata.conquery.models.identifiable.ids.specific.StructureNodeId;
-import lombok.AllArgsConstructor;
+import lombok.Data;
import lombok.extern.slf4j.Slf4j;
/**
* This class constructs the concept tree as it is presented to the front end.
*/
-@AllArgsConstructor
+@Data
@Slf4j
public class FrontEndConceptBuilder {
- public static FrontendRoot createRoot(NamespaceStorage storage, Subject subject) {
+ private final ConqueryConfig conqueryConfig;
+
+ public FrontendRoot createRoot(NamespaceStorage storage, Subject subject) {
final FrontendRoot root = new FrontendRoot();
final Map<ConceptId, FrontendNode> roots = root.getConcepts();
@@ -95,7 +98,7 @@ public static FrontendRoot createRoot(NamespaceStorage storage, Subject subject)
return root;
}
- private static FrontendNode createConceptRoot(Concept<?> concept, StructureNode[] structureNodes) {
+ private FrontendNode createConceptRoot(Concept<?> concept, StructureNode[] structureNodes) {
final MatchingStats matchingStats = concept.getMatchingStats();
@@ -121,8 +124,8 @@ private static FrontendNode createConceptRoot(Concept> concept, StructureNode[
.flatMap(Collection::stream)
.findAny()
.isEmpty())
- .selects(concept.getSelects().stream().map(FrontEndConceptBuilder::createSelect).collect(Collectors.toList()))
- .tables(concept.getConnectors().stream().map(FrontEndConceptBuilder::createTable).collect(Collectors.toList()))
+ .selects(concept.getSelects().stream().map(this::createSelect).collect(Collectors.toList()))
+ .tables(concept.getConnectors().stream().map(this::createTable).collect(Collectors.toList()))
.build();
if (concept instanceof ConceptTreeNode<?> tree && tree.getChildren() != null) {
@@ -132,7 +135,7 @@ private static FrontendNode createConceptRoot(Concept> concept, StructureNode[
}
@Nullable
- private static FrontendNode createStructureNode(StructureNode structureNode, Map<ConceptId, FrontendNode> roots) {
+ private FrontendNode createStructureNode(StructureNode structureNode, Map<ConceptId, FrontendNode> roots) {
final List unstructured = new ArrayList<>();
for (ConceptId id : structureNode.getContainedRoots()) {
if (!roots.containsKey(id)) {
@@ -158,7 +161,7 @@ private static FrontendNode createStructureNode(StructureNode structureNode, Map
.build();
}
- public static FrontendSelect createSelect(Select select) {
+ public FrontendSelect createSelect(Select select) {
return FrontendSelect.builder()
.id(select.getId())
.label(select.getLabel())
@@ -168,7 +171,7 @@ public static FrontendSelect createSelect(Select select) {
.build();
}
- public static FrontendTable createTable(Connector con) {
+ public FrontendTable createTable(Connector con) {
final FrontendTable
result =
FrontendTable.builder()
@@ -176,8 +179,8 @@ public static FrontendTable createTable(Connector con) {
.connectorId(con.getId())
.label(con.getLabel())
.isDefault(con.isDefault())
- .filters(con.collectAllFilters().stream().map(FrontEndConceptBuilder::createFilter).collect(Collectors.toList()))
- .selects(con.getSelects().stream().map(FrontEndConceptBuilder::createSelect).collect(Collectors.toList()))
+ .filters(con.collectAllFilters().stream().map(this::createFilter).collect(Collectors.toList()))
+ .selects(con.getSelects().stream().map(this::createSelect).collect(Collectors.toList()))
.supportedSecondaryIds(Arrays.stream(con.getTable().getColumns())
.map(Column::getSecondaryId)
.filter(Objects::nonNull)
@@ -199,16 +202,16 @@ public static FrontendTable createTable(Connector con) {
return result;
}
- public static FrontendFilterConfiguration.Top createFilter(Filter<?> filter) {
+ public FrontendFilterConfiguration.Top createFilter(Filter<?> filter) {
try {
- return filter.createFrontendConfig();
+ return filter.createFrontendConfig(conqueryConfig);
}
catch (ConceptConfigurationException e) {
throw new IllegalStateException(e);
}
}
- private static FrontendNode createCTNode(ConceptElement<?> ce) {
+ private FrontendNode createCTNode(ConceptElement<?> ce) {
final MatchingStats matchingStats = ce.getMatchingStats();
FrontendNode.FrontendNodeBuilder nodeBuilder = FrontendNode.builder()
.active(null)
@@ -248,13 +251,13 @@ private static FrontendNode createCTNode(ConceptElement> ce) {
return n;
}
- public static FrontendList createTreeMap(Concept<?> concept) {
+ public FrontendList createTreeMap(Concept<?> concept) {
final FrontendList map = new FrontendList();
fillTreeMap(concept, map);
return map;
}
- private static void fillTreeMap(ConceptElement<?> ce, FrontendList map) {
+ private void fillTreeMap(ConceptElement<?> ce, FrontendList map) {
map.add(ce.getId(), createCTNode(ce));
if (ce instanceof ConceptTreeNode && ((ConceptTreeNode<?>) ce).getChildren() != null) {
for (ConceptTreeChild c : ((ConceptTreeNode<?>) ce).getChildren()) {
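
Note on the FrontEndConceptBuilder change: the class moves from static helpers to an instance that carries the ConqueryConfig, so createFilter can pass the config through to Filter.createFrontendConfig. A minimal sketch of the new call pattern (call-site variable names are assumed, not part of this diff):

    // Before: FrontEndConceptBuilder.createRoot(storage, subject)
    // After: construct once with the config (@Data generates the required-args constructor)
    final FrontEndConceptBuilder frontEndConceptBuilder = new FrontEndConceptBuilder(conqueryConfig);
    final FrontendRoot root = frontEndConceptBuilder.createRoot(storage, subject);
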
diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/ValidityDate.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/ValidityDate.java
index 5bdbe1c8bf..762d507209 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/ValidityDate.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/ValidityDate.java
@@ -1,10 +1,14 @@
package com.bakdata.conquery.models.datasets.concepts;
-import javax.validation.constraints.NotNull;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
import com.bakdata.conquery.io.jackson.serializer.NsIdRef;
+import com.bakdata.conquery.models.common.daterange.CDateRange;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Dataset;
+import com.bakdata.conquery.models.events.Bucket;
+import com.bakdata.conquery.models.events.MajorTypeId;
import com.bakdata.conquery.models.identifiable.Labeled;
import com.bakdata.conquery.models.identifiable.ids.NamespacedIdentifiable;
import com.bakdata.conquery.models.identifiable.ids.specific.ValidityDateId;
@@ -15,6 +19,7 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
+import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@Getter
@@ -24,39 +29,108 @@
public class ValidityDate extends Labeled<ValidityDateId> implements NamespacedIdentifiable<ValidityDateId> {
@NsIdRef
- @NotNull
+ @Nullable
private Column column;
+ @NsIdRef
+ @Nullable
+ private Column startColumn;
+ @NsIdRef
+ @Nullable
+ private Column endColumn;
@JsonBackReference
+ @ToString.Exclude
@EqualsAndHashCode.Exclude
private Connector connector;
+ public static ValidityDate create(Column column) {
+ final ValidityDate validityDate = new ValidityDate();
+ validityDate.setColumn(column);
+ return validityDate;
+ }
+
+ public static ValidityDate create(Column startColumn, Column endColumn) {
+ final ValidityDate validityDate = new ValidityDate();
+ validityDate.setStartColumn(startColumn);
+ validityDate.setEndColumn(endColumn);
+ return validityDate;
+ }
+
@Override
public ValidityDateId createId() {
return new ValidityDateId(connector.getId(), getName());
}
- @JsonIgnore
- @ValidationMethod(message = "Column is not of Date or DateRange.")
- public boolean isValidValidityDates() {
- if (getColumn().getType().isDateCompatible()) {
- return true;
+ @CheckForNull
+ public CDateRange getValidityDate(int event, Bucket bucket) {
+ // I spent a lot of time trying to create two classes implementing single/multi-column validity dates separately.
+ // JsonCreator was not happy, and I could not figure out why. This is probably the most performant implementation that's not two classes.
+
+ if (getColumn() != null) {
+ if (bucket.has(event, getColumn())) {
+ return bucket.getAsDateRange(event, getColumn());
+ }
+
+ return null;
+ }
+
+ final Column startColumn = getStartColumn();
+ final Column endColumn = getEndColumn();
+
+ final boolean hasStart = bucket.has(event, startColumn);
+ final boolean hasEnd = bucket.has(event, endColumn);
+
+ if (!hasStart && !hasEnd) {
+ return null;
}
- log.error("ValidityDate-Column[{}] is not of type DATE or DATERANGE", getColumn().getId());
- return false;
+ final int start = hasStart ? bucket.getDate(event, startColumn) : Integer.MIN_VALUE;
+ final int end = hasEnd ? bucket.getDate(event, endColumn) : Integer.MAX_VALUE;
+
+ return CDateRange.of(start, end);
+ }
+
+ public boolean containsColumn(Column column) {
+ return column.equals(getColumn()) || column.equals(getStartColumn()) || column.equals(getEndColumn());
}
@JsonIgnore
@ValidationMethod(message = "ValidityDate is not for Connectors' Table.")
public boolean isForConnectorsTable() {
- if (getColumn().getTable().equals(connector.getTable())) {
- return true;
+ final boolean anyColumnNotForConnector =
+ (startColumn != null && !startColumn.getTable().equals(connector.getTable()))
+ || (endColumn != null && !endColumn.getTable().equals(connector.getTable()));
+
+ final boolean columnNotForConnector = column != null && !column.getTable().equals(connector.getTable());
+
+ return !anyColumnNotForConnector && !columnNotForConnector;
+ }
+
+ @JsonIgnore
+ @ValidationMethod(message = "Single column date range (set via column) and two column date range (set via startColumn and endColumn) are exclusive.")
+ public boolean isExclusiveValidityDates() {
+ if (column == null) {
+ return startColumn != null && endColumn != null;
}
+ return startColumn == null && endColumn == null;
+ }
- log.error("ValidityDate[{}](Column = `{}`) does not belong to Connector[{}]#Table[{}]", getId(), getColumn().getId(), getId(), connector.getTable().getId());
+ @JsonIgnore
+ @ValidationMethod(message = "Both columns of a two-column validity date have to be of type DATE.")
+ public boolean isValidTwoColumnValidityDates() {
+ if (startColumn == null || endColumn == null) {
+ return true;
+ }
+ return startColumn.getType() == MajorTypeId.DATE && endColumn.getType() == MajorTypeId.DATE;
+ }
- return false;
+ @JsonIgnore
+ @ValidationMethod(message = "Column is not of type DATE or DATE_RANGE.")
+ public boolean isValidValidityDatesSingleColumn() {
+ if (column == null) {
+ return true;
+ }
+ return column.getType().isDateCompatible();
}
@JsonIgnore
@@ -64,4 +138,5 @@ public boolean isForConnectorsTable() {
public Dataset getDataset() {
return connector.getDataset();
}
-}
\ No newline at end of file
+
+}
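
Note on ValidityDate: a validity date is now either a single DATE/DATE_RANGE column, or a pair of DATE columns where a missing start or end leaves the range open. A standalone sketch of the two-column combination under that reading (plain ints stand in for Bucket/CDateRange; illustrative only):

    // Combine optional start/end day values into a [lo, hi] range,
    // mirroring the Integer.MIN_VALUE/MAX_VALUE sentinels used above.
    static int[] combine(Integer start, Integer end) {
        if (start == null && end == null) {
            return null; // the event carries no validity date at all
        }
        final int lo = (start != null) ? start : Integer.MIN_VALUE; // open start
        final int hi = (end != null) ? end : Integer.MAX_VALUE;     // open end
        return new int[]{lo, hi};
    }
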
diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/Filter.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/Filter.java
index 4447e29f9f..c9949d085e 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/Filter.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/Filter.java
@@ -4,6 +4,7 @@
import com.bakdata.conquery.apiv1.frontend.FrontendFilterConfiguration;
import com.bakdata.conquery.io.cps.CPSBase;
+import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Dataset;
import com.bakdata.conquery.models.datasets.concepts.Connector;
@@ -52,21 +53,21 @@ public Dataset getDataset() {
return getConnector().getDataset();
}
- public FrontendFilterConfiguration.Top createFrontendConfig() throws ConceptConfigurationException {
- FrontendFilterConfiguration.Top f = FrontendFilterConfiguration.Top.builder()
- .id(getId())
- .label(getLabel())
- .tooltip(getTooltip())
- .unit(getUnit())
- .allowDropFile(getAllowDropFile())
- .pattern(getPattern())
- .defaultValue(getDefaultValue())
- .build();
- configureFrontend(f);
+ public FrontendFilterConfiguration.Top createFrontendConfig(ConqueryConfig conqueryConfig) throws ConceptConfigurationException {
+ final FrontendFilterConfiguration.Top f = FrontendFilterConfiguration.Top.builder()
+ .id(getId())
+ .label(getLabel())
+ .tooltip(getTooltip())
+ .unit(getUnit())
+ .allowDropFile(getAllowDropFile())
+ .pattern(getPattern())
+ .defaultValue(getDefaultValue())
+ .build();
+ configureFrontend(f, conqueryConfig);
return f;
}
- protected abstract void configureFrontend(FrontendFilterConfiguration.Top f) throws ConceptConfigurationException;
+ protected abstract void configureFrontend(FrontendFilterConfiguration.Top f, ConqueryConfig conqueryConfig) throws ConceptConfigurationException;
@JsonIgnore
public abstract List<Column> getRequiredColumns();
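
Note on the Filter change: configureFrontend now receives the ConqueryConfig, so every subclass must extend its override even if it ignores the config. A sketch of the adapted hook (the signature is mandated by this diff; the body is copied from the CountFilter change below):

    @Override
    public void configureFrontend(FrontendFilterConfiguration.Top f, ConqueryConfig conqueryConfig) {
        f.setType(FrontendFilterType.Fields.INTEGER_RANGE);
        f.setMin(1); // filter-specific setup as before; the config is available when needed
    }
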
diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/CountFilter.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/CountFilter.java
index 845648ad90..e45f0d7c9b 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/CountFilter.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/CountFilter.java
@@ -12,6 +12,7 @@
import com.bakdata.conquery.io.jackson.serializer.NsIdRef;
import com.bakdata.conquery.io.jackson.serializer.NsIdRefCollection;
import com.bakdata.conquery.models.common.Range;
+import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.concepts.filters.Filter;
import com.bakdata.conquery.models.query.filter.RangeFilterNode;
@@ -36,7 +37,7 @@ public class CountFilter extends Filter {
private boolean distinct;
@Override
- public void configureFrontend(FrontendFilterConfiguration.Top f) {
+ public void configureFrontend(FrontendFilterConfiguration.Top f, ConqueryConfig conqueryConfig) {
f.setType(FrontendFilterType.Fields.INTEGER_RANGE);
f.setMin(1);
}
diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/CountQuartersFilter.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/CountQuartersFilter.java
index b0997ffc3c..41d42f9ae3 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/CountQuartersFilter.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/CountQuartersFilter.java
@@ -6,6 +6,7 @@
import com.bakdata.conquery.apiv1.frontend.FrontendFilterType;
import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.models.common.Range;
+import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.datasets.concepts.filters.Filter;
import com.bakdata.conquery.models.datasets.concepts.filters.SingleColumnFilter;
import com.bakdata.conquery.models.events.MajorTypeId;
@@ -26,7 +27,7 @@ public EnumSet getAcceptedColumnTypes() {
}
@Override
- public void configureFrontend(FrontendFilterConfiguration.Top f) {
+ public void configureFrontend(FrontendFilterConfiguration.Top f, ConqueryConfig conqueryConfig) {
f.setType(FrontendFilterType.Fields.INTEGER_RANGE);
f.setMin(1);
}
diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/DateDistanceFilter.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/DateDistanceFilter.java
index c811b797af..ecad117745 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/DateDistanceFilter.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/DateDistanceFilter.java
@@ -9,6 +9,7 @@
import com.bakdata.conquery.apiv1.frontend.FrontendFilterType;
import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.models.common.Range;
+import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.datasets.concepts.filters.Filter;
import com.bakdata.conquery.models.datasets.concepts.filters.SingleColumnFilter;
import com.bakdata.conquery.models.events.MajorTypeId;
@@ -35,7 +36,7 @@ public EnumSet getAcceptedColumnTypes() {
}
@Override
- public void configureFrontend(FrontendFilterConfiguration.Top f) throws ConceptConfigurationException {
+ public void configureFrontend(FrontendFilterConfiguration.Top f, ConqueryConfig conqueryConfig) throws ConceptConfigurationException {
if (getColumn().getType() != MajorTypeId.DATE) {
throw new ConceptConfigurationException(getConnector(), "DATE_DISTANCE filter is incompatible with columns of type " + getColumn().getType());
}
diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/DurationSumFilter.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/DurationSumFilter.java
index 09087ee70d..cfc180d3ff 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/DurationSumFilter.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/DurationSumFilter.java
@@ -6,6 +6,7 @@
import com.bakdata.conquery.apiv1.frontend.FrontendFilterType;
import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.models.common.Range;
+import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.datasets.concepts.filters.Filter;
import com.bakdata.conquery.models.datasets.concepts.filters.SingleColumnFilter;
import com.bakdata.conquery.models.events.MajorTypeId;
@@ -29,7 +30,7 @@ public EnumSet getAcceptedColumnTypes() {
}
@Override
- public void configureFrontend(FrontendFilterConfiguration.Top f) throws ConceptConfigurationException {
+ public void configureFrontend(FrontendFilterConfiguration.Top f, ConqueryConfig conqueryConfig) throws ConceptConfigurationException {
if (getColumn().getType() != MajorTypeId.DATE_RANGE) {
throw new ConceptConfigurationException(getConnector(), "DURATION_SUM filter is incompatible with columns of type "
+ getColumn().getType());
diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/FlagFilter.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/FlagFilter.java
index 324ecc22da..0a5c2598ba 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/FlagFilter.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/FlagFilter.java
@@ -11,6 +11,7 @@
import com.bakdata.conquery.apiv1.frontend.FrontendValue;
import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.io.jackson.serializer.NsIdRefCollection;
+import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.concepts.filters.Filter;
import com.bakdata.conquery.models.error.ConqueryError;
@@ -39,7 +40,7 @@ public class FlagFilter extends Filter {
private final Map<String, Column> flags;
@Override
- protected void configureFrontend(FrontendFilterConfiguration.Top f) throws ConceptConfigurationException {
+ protected void configureFrontend(FrontendFilterConfiguration.Top f, ConqueryConfig conqueryConfig) throws ConceptConfigurationException {
f.setType(FrontendFilterType.Fields.MULTI_SELECT);
f.setOptions(flags.keySet().stream().map(key -> new FrontendValue(key, key)).toList());
diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/NumberFilter.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/NumberFilter.java
index 0040baab68..1894783631 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/NumberFilter.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/NumberFilter.java
@@ -7,6 +7,7 @@
import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.models.common.IRange;
import com.bakdata.conquery.models.common.Range;
+import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.datasets.concepts.filters.Filter;
import com.bakdata.conquery.models.datasets.concepts.filters.SingleColumnFilter;
import com.bakdata.conquery.models.exceptions.ConceptConfigurationException;
@@ -29,7 +30,7 @@
public class NumberFilter<RANGE extends IRange<?, ?>> extends SingleColumnFilter<RANGE> {
@Override
- public void configureFrontend(FrontendFilterConfiguration.Top f) throws ConceptConfigurationException {
+ public void configureFrontend(FrontendFilterConfiguration.Top f, ConqueryConfig conqueryConfig) throws ConceptConfigurationException {
final String type = switch (getColumn().getType()) {
case MONEY -> FrontendFilterType.Fields.MONEY_RANGE;
case INTEGER -> FrontendFilterType.Fields.INTEGER_RANGE;
diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/PrefixTextFilter.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/PrefixTextFilter.java
index bd5e83db3e..02480ea143 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/PrefixTextFilter.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/PrefixTextFilter.java
@@ -5,6 +5,7 @@
import com.bakdata.conquery.apiv1.frontend.FrontendFilterConfiguration;
import com.bakdata.conquery.apiv1.frontend.FrontendFilterType;
import com.bakdata.conquery.io.cps.CPSType;
+import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.datasets.concepts.filters.Filter;
import com.bakdata.conquery.models.datasets.concepts.filters.SingleColumnFilter;
import com.bakdata.conquery.models.events.MajorTypeId;
@@ -20,7 +21,7 @@ public class PrefixTextFilter extends SingleColumnFilter {
@Override
- public void configureFrontend(FrontendFilterConfiguration.Top f) {
+ public void configureFrontend(FrontendFilterConfiguration.Top f, ConqueryConfig conqueryConfig) {
f.setType(FrontendFilterType.Fields.STRING);
}
diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/QuartersInYearFilter.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/QuartersInYearFilter.java
index 11bc8c36fa..1dac22cb0a 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/QuartersInYearFilter.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/QuartersInYearFilter.java
@@ -6,6 +6,7 @@
import com.bakdata.conquery.apiv1.frontend.FrontendFilterType;
import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.models.common.Range;
+import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.datasets.concepts.filters.Filter;
import com.bakdata.conquery.models.datasets.concepts.filters.SingleColumnFilter;
import com.bakdata.conquery.models.events.MajorTypeId;
@@ -24,7 +25,7 @@ public EnumSet getAcceptedColumnTypes() {
}
@Override
- public void configureFrontend(FrontendFilterConfiguration.Top f) {
+ public void configureFrontend(FrontendFilterConfiguration.Top f, ConqueryConfig conqueryConfig) {
f.setType(FrontendFilterType.Fields.INTEGER_RANGE);
f.setMin(1);
f.setMax(4);
diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/SelectFilter.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/SelectFilter.java
index 8a547e5308..947edb2fcd 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/SelectFilter.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/SelectFilter.java
@@ -12,6 +12,7 @@
import com.bakdata.conquery.io.jackson.View;
import com.bakdata.conquery.io.jackson.serializer.NsIdRef;
import com.bakdata.conquery.io.storage.NamespaceStorage;
+import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.config.IndexConfig;
import com.bakdata.conquery.models.datasets.concepts.Searchable;
import com.bakdata.conquery.models.datasets.concepts.filters.SingleColumnFilter;
@@ -56,12 +57,12 @@ public EnumSet getAcceptedColumnTypes() {
}
@Override
- public void configureFrontend(FrontendFilterConfiguration.Top f) throws ConceptConfigurationException {
+ public void configureFrontend(FrontendFilterConfiguration.Top f, ConqueryConfig conqueryConfig) throws ConceptConfigurationException {
f.setTemplate(getTemplate());
f.setType(getFilterType());
// If either no searches are available or all are disabled, we allow users to supply their own values
- f.setCreatable(getSearchReferences().stream().noneMatch(Predicate.not(Searchable::isSearchDisabled)));
+ f.setCreatable(conqueryConfig.getFrontend().isAlwaysAllowCreateValue() || getSearchReferences().stream().noneMatch(Predicate.not(Searchable::isSearchDisabled)));
f.setOptions(collectLabels());
}
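
Note on the SelectFilter change: when the new frontend.alwaysAllowCreateValue switch is enabled, the filter is marked creatable regardless of search availability, so users can always supply their own values. A hypothetical config snippet (only the getFrontend().isAlwaysAllowCreateValue() accessor is shown in this diff; the exact config layout is assumed):

    {
      "frontend": {
        "alwaysAllowCreateValue": true
      }
    }
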
diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/SumFilter.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/SumFilter.java
index c89db064ba..643e8e8807 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/SumFilter.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/SumFilter.java
@@ -14,6 +14,7 @@
import com.bakdata.conquery.io.jackson.serializer.NsIdRefCollection;
import com.bakdata.conquery.models.common.IRange;
import com.bakdata.conquery.models.common.Range;
+import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.concepts.filters.Filter;
import com.bakdata.conquery.models.events.MajorTypeId;
@@ -57,7 +58,7 @@ public class SumFilter> extends Filter
private List distinctByColumn = Collections.emptyList();
@Override
- public void configureFrontend(FrontendFilterConfiguration.Top f) throws ConceptConfigurationException {
+ public void configureFrontend(FrontendFilterConfiguration.Top f, ConqueryConfig conqueryConfig) throws ConceptConfigurationException {
final String type = switch (getColumn().getType()) {
case MONEY -> FrontendFilterType.Fields.MONEY_RANGE;
case INTEGER -> FrontendFilterType.Fields.INTEGER_RANGE;
diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/TreeConcept.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/TreeConcept.java
index d500616780..92640e3213 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/TreeConcept.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/TreeConcept.java
@@ -152,35 +152,39 @@ private ConceptTreeChild findMostSpecificChild(String stringValue, CalculatedVal
continue;
}
- if (match == null) {
- match = n;
+ if (match != null) {
+ failed = true;
+ log.error("Value '{}' matches the two nodes {} and {} in the tree {} (row={}))"
+ , stringValue, match.getId(), n.getId(), n.getConcept().getId(), rowMap.getValue());
+ continue;
+ }
- if (n.getChildIndex() != null) {
- ConceptTreeChild specificChild = n.getChildIndex().findMostSpecificChild(stringValue);
+ match = n;
- if (specificChild != null) {
- match = specificChild;
- }
- }
+ if (n.getChildIndex() == null) {
+ continue;
}
- else {
- failed = true;
- log.error("Value '{}' matches the two nodes {} and {} in the tree {} (row={}))"
- , stringValue, match.getLabel(), n.getLabel(), n.getConcept().getLabel(), rowMap.getValue());
- // TODO Why don't we return null here and drop the `failed`-flag?
+
+ final ConceptTreeChild specificChild = n.getChildIndex().findMostSpecificChild(stringValue);
+
+ if (specificChild == null) {
+ continue;
}
+
+ match = specificChild;
}
if (failed) {
return null;
}
- else if (match != null) {
- best = match;
- currentList = match.getChildren();
- }
- else {
- break;
+
+ // Nothing better found below, so return the best-so-far match
+ if (match == null) {
+ return best;
}
+
+ best = match;
+ currentList = match.getChildren();
}
return best;
}
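
Note on the TreeConcept refactor: the nested if/else in findMostSpecificChild is flattened into guard clauses, and the "no deeper match" case now returns the best match found so far instead of breaking out of the loop. The resulting loop shape, simplified (generic names; the real code additionally aborts with null when a value matches two sibling nodes):

    Node best = null;
    List<Node> current = roots;
    while (current != null) {
        final Node match = findMatch(current, value); // hypothetical helper, null when nothing matches
        if (match == null) {
            return best; // nothing better below: the best-so-far match wins
        }
        best = match;
        current = match.getChildren();
    }
    return best;
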
diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/Bucket.java b/backend/src/main/java/com/bakdata/conquery/models/events/Bucket.java
index f7df84d48c..618920194f 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/events/Bucket.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/events/Bucket.java
@@ -17,6 +17,7 @@
import com.bakdata.conquery.models.datasets.Dataset;
import com.bakdata.conquery.models.datasets.Import;
import com.bakdata.conquery.models.datasets.Table;
+import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.events.stores.root.BooleanStore;
import com.bakdata.conquery.models.events.stores.root.ColumnStore;
import com.bakdata.conquery.models.events.stores.root.DateRangeStore;
@@ -164,8 +165,14 @@ public CDateRange getDateRange(int event, Column column) {
return ((DateRangeStore) getStore(column)).getDateRange(event);
}
- public boolean eventIsContainedIn(int event, Column column, CDateSet dateRanges) {
- return dateRanges.intersects(getAsDateRange(event, column));
+ public boolean eventIsContainedIn(int event, ValidityDate validityDate, CDateSet dateRanges) {
+ final CDateRange dateRange = validityDate.getValidityDate(event, this);
+
+ if (dateRange == null){
+ return false;
+ }
+
+ return dateRanges.intersects(dateRange);
}
public CDateRange getAsDateRange(int event, Column column) {
@@ -181,10 +188,10 @@ public Object createScriptValue(int event, @NotNull Column column) {
}
public Map<String, Object> calculateMap(int event) {
- Map<String, Object> out = new HashMap<>(stores.length);
+ final Map<String, Object> out = new HashMap<>(stores.length);
for (int i = 0; i < stores.length; i++) {
- ColumnStore store = stores[i];
+ final ColumnStore store = stores[i];
if (!store.has(event)) {
continue;
}
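
Note on the Bucket change: eventIsContainedIn now resolves the event's validity date itself and treats an absent date as "not contained". The resulting call pattern, as used by TableExportQueryPlan later in this diff:

    if (validityDate != null && !bucket.eventIsContainedIn(event, validityDate, dateRange)) {
        continue; // event has no validity date, or it does not intersect the restriction
    }
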
diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/EmptyBucket.java b/backend/src/main/java/com/bakdata/conquery/models/events/EmptyBucket.java
index e31cee658a..d820c82fb2 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/events/EmptyBucket.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/events/EmptyBucket.java
@@ -7,6 +7,7 @@
import com.bakdata.conquery.models.common.CDateSet;
import com.bakdata.conquery.models.common.daterange.CDateRange;
import com.bakdata.conquery.models.datasets.Column;
+import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.events.stores.root.ColumnStore;
import lombok.Getter;
@@ -25,7 +26,7 @@ public EmptyBucket() {
@Override
- public boolean eventIsContainedIn(int event, Column column, CDateSet dateRanges) {
+ public boolean eventIsContainedIn(int event, ValidityDate column, CDateSet dateRanges) {
return false;
}
diff --git a/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java b/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java
index e34e7f5998..fb709ad704 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java
@@ -5,6 +5,7 @@
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -20,6 +21,9 @@
import com.bakdata.conquery.apiv1.execution.FullExecutionStatus;
import com.bakdata.conquery.apiv1.execution.OverviewExecutionStatus;
import com.bakdata.conquery.apiv1.query.QueryDescription;
+import com.bakdata.conquery.apiv1.query.concept.specific.CQConcept;
+import com.bakdata.conquery.apiv1.query.concept.specific.external.CQExternal;
+import com.bakdata.conquery.apiv1.query.concept.specific.external.DateFormat;
import com.bakdata.conquery.io.cps.CPSBase;
import com.bakdata.conquery.io.jackson.serializer.MetaIdRef;
import com.bakdata.conquery.io.jackson.serializer.NsIdRef;
@@ -41,7 +45,6 @@
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;
import com.bakdata.conquery.models.query.PrintSettings;
import com.bakdata.conquery.models.query.Visitable;
-import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.models.worker.Namespace;
import com.bakdata.conquery.util.QueryUtils;
import com.bakdata.conquery.util.QueryUtils.NamespacedIdentifiableCollector;
@@ -120,7 +123,7 @@ public abstract class ManagedExecution extends IdentifiableImpl {
+ if (visitable instanceof CQConcept cqConcept) {
+ return cqConcept.isAggregateEventDates();
+ }
+
+ if (visitable instanceof CQExternal external) {
+ return Arrays.stream(DateFormat.values()).anyMatch(external.getFormat()::contains);
+ }
+
+ return false;
+ });
+ }
+
+ private boolean canSubjectExpand(Subject subject, QueryDescription query) {
NamespacedIdentifiableCollector namespacesIdCollector = new NamespacedIdentifiableCollector();
query.visit(namespacesIdCollector);
@@ -358,9 +385,7 @@ protected void setAdditionalFieldsForStatusWithSource(Subject subject, FullExecu
.collect(Collectors.toSet());
boolean canExpand = subject.isPermittedAll(concepts, Ability.READ);
-
- status.setCanExpand(canExpand);
- status.setQuery(canExpand ? getSubmitted() : null);
+ return canExpand;
}
@JsonIgnore
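
Note on ManagedExecution: the new containsDates flag on FullExecutionStatus is derived by walking the query description. Restated as a standalone predicate over the Visitable nodes shown above (the surrounding visitor plumbing is not part of this diff):

    static boolean containsDates(Visitable visitable) {
        if (visitable instanceof CQConcept cqConcept) {
            return cqConcept.isAggregateEventDates();
        }
        if (visitable instanceof CQExternal external) {
            // any recognized date format among the uploaded columns counts
            return Arrays.stream(DateFormat.values()).anyMatch(external.getFormat()::contains);
        }
        return false;
    }
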
diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java
index 43a0e1a36f..ed8f78ac7a 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java
@@ -28,6 +28,7 @@
import com.bakdata.conquery.models.query.resultinfo.ResultInfo;
import com.bakdata.conquery.models.query.results.EntityResult;
import com.bakdata.conquery.models.query.results.FormShardResult;
+import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.OptBoolean;
@@ -205,4 +206,8 @@ private boolean allSubQueriesDone() {
return flatSubQueries.values().stream().allMatch(q -> q.getState().equals(ExecutionState.DONE));
}
}
+
+ public DistributedNamespace getNamespace() {
+ return (DistributedNamespace) super.getNamespace();
+ }
}
diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/ForwardToWorker.java b/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/ForwardToWorker.java
index 3703763c2c..daba9b5fbc 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/ForwardToWorker.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/ForwardToWorker.java
@@ -1,6 +1,12 @@
package com.bakdata.conquery.models.messages.network.specific;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.Objects;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId;
@@ -14,7 +20,6 @@
import com.bakdata.conquery.util.io.ConqueryMDC;
import com.bakdata.conquery.util.progressreporter.ProgressReporter;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import lombok.AccessLevel;
@@ -25,7 +30,8 @@
import lombok.ToString;
/**
- * Messages are sent serialized and only deserialized when they are being processed. This ensures that messages that were sent just shortly before to setup state later messages depend upon is correct.
+ * @implNote Messages are sent serialized and are only deserialized when they are processed. This ensures that a message that sets up state is fully handled before later messages that depend on that state are deserialized.
+ * @implNote Messages are additionally sent gzipped, to avoid hogging memory with long queues.
*/
@CPSType(id = "FORWARD_TO_WORKER", base = NetworkMessage.class)
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
@@ -33,45 +39,53 @@
@ToString(of = {"workerId", "text"})
public class ForwardToWorker extends MessageToShardNode implements SlowMessage {
- @SneakyThrows(JsonProcessingException.class)
+ private final WorkerId workerId;
+ private final byte[] messageRaw;
+ // We cache these on the sender side.
+ @Getter(onMethod_ = @JsonIgnore(false))
+ private final boolean slowMessage;
+ private final String text;
+ @JsonIgnore
+ @Setter
+ private ProgressReporter progressReporter;
+
public static ForwardToWorker create(WorkerId worker, WorkerMessage message, ObjectWriter writer) {
return new ForwardToWorker(
worker,
- writer.writeValueAsBytes(message),
+ serializeMessage(message, writer),
true,
message.toString()
);
}
- private final WorkerId workerId;
- private final byte[] messageRaw;
+ @SneakyThrows(IOException.class)
+ private static byte[] serializeMessage(WorkerMessage message, ObjectWriter writer) {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (OutputStream outputStream = new GZIPOutputStream(baos)) {
+ writer.writeValue(outputStream, message);
+ }
- // We cache these on the sender side.
- @Getter(onMethod_ = @JsonIgnore(false))
- private final boolean slowMessage;
- private final String text;
+ return baos.toByteArray();
+ }
- @JsonIgnore
- @Setter
- private ProgressReporter progressReporter;
+ private static WorkerMessage deserializeMessage(byte[] messageRaw, ObjectMapper mapper) throws IOException {
+ return mapper.readerFor(WorkerMessage.class).readValue(new GZIPInputStream(new ByteArrayInputStream(messageRaw)));
+ }
@Override
public void react(ShardNodeNetworkContext context) throws Exception {
- Worker worker = Objects.requireNonNull(context.getWorkers().getWorker(workerId));
+ final Worker worker = Objects.requireNonNull(context.getWorkers().getWorker(workerId));
ConqueryMDC.setLocation(worker.toString());
// Jobception: this is to ensure that no subsequent message is deserialized before one message is processed
- worker.getJobManager().addSlowJob(new SimpleJob("Deserialize and process WorkerMessage", () -> {
+ worker.getJobManager().addSlowJob(new SimpleJob("Process %s".formatted(getText()), () -> {
- WorkerMessage message = deserializeMessage(messageRaw, worker.getCommunicationMapper());
+ final WorkerMessage message = deserializeMessage(messageRaw, worker.getCommunicationMapper());
message.setProgressReporter(progressReporter);
message.react(worker);
}));
}
- private static WorkerMessage deserializeMessage(byte[] messageRaw, ObjectMapper binaryMapper) throws java.io.IOException {
- return binaryMapper.readerFor(WorkerMessage.class).readValue(messageRaw);
- }
}
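
Note on ForwardToWorker: messages are now gzip-compressed at serialization time and decompressed lazily on the shard node. A standalone round-trip sketch with Jackson and java.util.zip (class and helper names here are illustrative):

    import java.io.*;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.ObjectWriter;

    final class GzipJson {
        static byte[] serialize(ObjectWriter writer, Object message) throws IOException {
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (OutputStream out = new GZIPOutputStream(baos)) {
                writer.writeValue(out, message); // closing the GZIPOutputStream writes the gzip trailer
            }
            return baos.toByteArray();
        }

        static <T> T deserialize(ObjectMapper mapper, byte[] raw, Class<T> type) throws IOException {
            return mapper.readerFor(type).readValue(new GZIPInputStream(new ByteArrayInputStream(raw)));
        }
    }
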
diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/QueryExecutionContext.java b/backend/src/main/java/com/bakdata/conquery/models/query/QueryExecutionContext.java
index 666962d7e0..f0cb08f30d 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/query/QueryExecutionContext.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/query/QueryExecutionContext.java
@@ -7,10 +7,10 @@
import com.bakdata.conquery.io.storage.ModificationShieldedWorkerStorage;
import com.bakdata.conquery.models.common.CDate;
import com.bakdata.conquery.models.common.CDateSet;
-import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.SecondaryIdDescription;
import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.datasets.concepts.Connector;
+import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.events.BucketManager;
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;
@@ -36,7 +36,7 @@ public class QueryExecutionContext {
private final BucketManager bucketManager;
- private Column validityDateColumn;
+ private ValidityDate validityDateColumn;
@NonNull
private CDateSet dateRestriction = CDateSet.createFull();
private Connector connector;
diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/TableExportQueryPlan.java b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/TableExportQueryPlan.java
index 73551b7f07..9288871085 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/TableExportQueryPlan.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/TableExportQueryPlan.java
@@ -7,7 +7,9 @@
import com.bakdata.conquery.apiv1.query.concept.filter.CQTable;
import com.bakdata.conquery.models.common.CDateSet;
+import com.bakdata.conquery.models.common.daterange.CDateRange;
import com.bakdata.conquery.models.datasets.Column;
+import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.events.CBlock;
import com.bakdata.conquery.models.query.QueryExecutionContext;
@@ -80,7 +82,7 @@ public Optional execute(QueryExecutionContext ctx, Entity
for (Map.Entry<CQTable, QPNode> entry : tables.entrySet()) {
final CQTable cqTable = entry.getKey();
- final Column validityDateColumn = cqTable.findValidityDateColumn();
+ final ValidityDate validityDate = cqTable.findValidityDate();
final QPNode query = entry.getValue();
final Map<Bucket, CBlock> cblocks = ctx.getBucketManager().getEntityCBlocksForConnector(entity, cqTable.getConnector());
@@ -95,8 +97,8 @@ public Optional execute(QueryExecutionContext ctx, Entity
for (int event = start; event < end; event++) {
- if (validityDateColumn != null
- && !bucket.eventIsContainedIn(event, validityDateColumn, dateRange)) {
+ if (validityDate != null
+ && !bucket.eventIsContainedIn(event, validityDate, dateRange)) {
continue;
}
@@ -104,7 +106,7 @@ public Optional execute(QueryExecutionContext ctx, Entity
continue;
}
- final Object[] resultRow = collectRow(totalColumns, cqTable, bucket, event, validityDateColumn, cblocks.get(bucket));
+ final Object[] resultRow = collectRow(totalColumns, cqTable, bucket, event, validityDate, cblocks.get(bucket));
results.add(resultRow);
}
@@ -146,29 +148,40 @@ private boolean isRowIncluded(QPNode query, Bucket bucket, Entity entity, int ev
return query.isContained();
}
- private Object[] collectRow(int totalColumns, CQTable exportDescription, Bucket bucket, int event, Column validityDateColumn, CBlock cblock) {
+ private Object[] collectRow(int totalColumns, CQTable exportDescription, Bucket bucket, int event, ValidityDate validityDate, CBlock cblock) {
final Object[] entry = new Object[totalColumns];
+
+ final CDateRange date;
+
+ if(validityDate != null && (date = validityDate.getValidityDate(event, bucket)) != null) {
+ entry[0] = List.of(date);
+ }
+
entry[1] = exportDescription.getConnector().getTable().getLabel();
for (Column column : exportDescription.getConnector().getTable().getColumns()) {
- if (!bucket.has(event, column)) {
+ // ValidityDates are handled separately.
+ if (validityDate != null && validityDate.containsColumn(column)){
continue;
}
- final int position = positions.get(column);
+ if (!positions.containsKey(column)) {
+ continue;
+ }
- if (column.equals(validityDateColumn)) {
- entry[position] = List.of(bucket.getAsDateRange(event, column));
+ if (!bucket.has(event, column)) {
continue;
}
+ final int position = positions.get(column);
+
if (!rawConceptValues && column.equals(exportDescription.getConnector().getColumn())) {
entry[position] = cblock.getMostSpecificChildLocalId(event);
continue;
}
-
+
entry[position] = bucket.createScriptValue(event, column);
}
return entry;
diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/EventDateUnionAggregator.java b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/EventDateUnionAggregator.java
index 7f10736554..0f3344f648 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/EventDateUnionAggregator.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/EventDateUnionAggregator.java
@@ -3,8 +3,9 @@
import java.util.Set;
import com.bakdata.conquery.models.common.CDateSet;
-import com.bakdata.conquery.models.datasets.Column;
+import com.bakdata.conquery.models.common.daterange.CDateRange;
import com.bakdata.conquery.models.datasets.Table;
+import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.query.QueryExecutionContext;
import com.bakdata.conquery.models.query.entity.Entity;
@@ -24,7 +25,7 @@
public class EventDateUnionAggregator extends Aggregator<CDateSet> {
private final Set<Table> requiredTables;
- private Column validityDateColumn;
+ private ValidityDate validityDateColumn;
private CDateSet set = CDateSet.createEmpty();
private CDateSet dateRestriction;
@@ -41,10 +42,7 @@ public void init(Entity entity, QueryExecutionContext context) {
@Override
public void nextTable(QueryExecutionContext ctx, Table currentTable) {
validityDateColumn = ctx.getValidityDateColumn();
- if (validityDateColumn != null && !validityDateColumn.getType().isDateCompatible()) {
- throw new IllegalStateException("The validityDateColumn " + validityDateColumn + " is not a DATE TYPE");
- }
-
+
dateRestriction = ctx.getDateRestriction();
super.nextTable(ctx, currentTable);
}
@@ -61,10 +59,13 @@ public void acceptEvent(Bucket bucket, int event) {
return;
}
- if (!bucket.has(event, validityDateColumn)) {
+ final CDateRange dateRange = validityDateColumn.getValidityDate(event, bucket);
+
+ if (dateRange == null){
return;
}
- set.maskedAdd(bucket.getAsDateRange(event, validityDateColumn), dateRestriction);
+
+ set.maskedAdd(dateRange, dateRestriction);
}
@Override
diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/EventDurationSumAggregator.java b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/EventDurationSumAggregator.java
index 4a0119f910..5f563588cc 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/EventDurationSumAggregator.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/EventDurationSumAggregator.java
@@ -6,8 +6,8 @@
import com.bakdata.conquery.models.common.CDateSet;
import com.bakdata.conquery.models.common.daterange.CDateRange;
-import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Table;
+import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.query.QueryExecutionContext;
import com.bakdata.conquery.models.query.entity.Entity;
@@ -26,7 +26,7 @@ public class EventDurationSumAggregator extends Aggregator {
@CheckForNull
private CDateSet dateRestriction;
@CheckForNull
- private Column validityDateColumn;
+ private ValidityDate validityDateColumn;
private int realUpperBound;
@Override
@@ -48,12 +48,12 @@ public void acceptEvent(Bucket bucket, int event) {
return;
}
- if (!bucket.has(event, validityDateColumn)) {
+ final CDateRange value = validityDateColumn.getValidityDate(event, bucket);
+
+ if (value == null){
return;
}
- final CDateRange value = bucket.getAsDateRange(event, validityDateColumn);
-
set.maskedAdd(value, dateRestriction, realUpperBound);
}
diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/QuarterAggregator.java b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/QuarterAggregator.java
index f75a71f8d7..4e09852192 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/QuarterAggregator.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/QuarterAggregator.java
@@ -8,8 +8,8 @@
import com.bakdata.conquery.models.common.CDateSet;
import com.bakdata.conquery.models.common.QuarterUtils;
import com.bakdata.conquery.models.common.daterange.CDateRange;
-import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Table;
+import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.query.QueryExecutionContext;
import com.bakdata.conquery.models.query.entity.Entity;
@@ -31,7 +31,7 @@ public class QuarterAggregator extends Aggregator {
private CDateSet set = CDateSet.createEmpty();
private CDateSet dateRestriction;
- private Column column;
+ private ValidityDate validityDate;
private int realUpperBound;
@@ -48,19 +48,19 @@ public void init(Entity entity, QueryExecutionContext context) {
@Override
public void nextTable(QueryExecutionContext ctx, Table currentTable) {
- column = ctx.getValidityDateColumn();
+ validityDate = ctx.getValidityDateColumn();
dateRestriction = ctx.getDateRestriction();
}
@Override
public void acceptEvent(Bucket bucket, int event) {
- if (getColumn() == null || !bucket.has(event, getColumn())) {
+ if (validityDate == null) {
+ return;
+ }
+
+ final CDateRange dateRange = validityDate.getValidityDate(event, bucket);
+
+ if (dateRange == null) {
return;
}
- final CDateRange value = bucket.getAsDateRange(event, getColumn());
-
- set.maskedAdd(value, dateRestriction, realUpperBound);
+ set.maskedAdd(dateRange, dateRestriction, realUpperBound);
}
@Override
diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/SpecialDateUnion.java b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/SpecialDateUnion.java
index 7723b7cd33..e2ad370f73 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/SpecialDateUnion.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/SpecialDateUnion.java
@@ -1,8 +1,9 @@
package com.bakdata.conquery.models.query.queryplan.aggregators.specific;
import com.bakdata.conquery.models.common.CDateSet;
-import com.bakdata.conquery.models.datasets.Column;
+import com.bakdata.conquery.models.common.daterange.CDateRange;
import com.bakdata.conquery.models.datasets.Table;
+import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.query.QueryExecutionContext;
import com.bakdata.conquery.models.query.entity.Entity;
@@ -20,7 +21,7 @@ public class SpecialDateUnion extends Aggregator {
private CDateSet set = CDateSet.createEmpty();
- private Column currentColumn;
+ private ValidityDate validityDate;
private CDateSet dateRestriction;
@@ -31,18 +32,25 @@ public void init(Entity entity, QueryExecutionContext context) {
@Override
public void nextTable(QueryExecutionContext ctx, Table table) {
- currentColumn = ctx.getValidityDateColumn();
+ validityDate = ctx.getValidityDateColumn();
dateRestriction = ctx.getDateRestriction();
}
@Override
public void acceptEvent(Bucket bucket, int event) {
- if (currentColumn == null || !bucket.has(event, currentColumn)) {
+ if (validityDate == null) {
set.addAll(dateRestriction);
return;
}
- set.maskedAdd(bucket.getAsDateRange(event, currentColumn), dateRestriction);
+ final CDateRange dateRange = validityDate.getValidityDate(event, bucket);
+
+ if (dateRange == null){
+ set.addAll(dateRestriction);
+ return;
+ }
+
+ set.maskedAdd(dateRange, dateRestriction);
}
/**
diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/value/FirstValueAggregator.java b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/value/FirstValueAggregator.java
index 1887a4fbd4..7e70495cc3 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/value/FirstValueAggregator.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/value/FirstValueAggregator.java
@@ -5,6 +5,7 @@
import com.bakdata.conquery.models.common.daterange.CDateRange;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Table;
+import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.query.QueryExecutionContext;
import com.bakdata.conquery.models.query.entity.Entity;
@@ -26,7 +27,7 @@ public class FirstValueAggregator extends SingleColumnAggregator {
private int date = CDateRange.POSITIVE_INFINITY;
- private Column validityDateColumn;
+ private ValidityDate validityDate;
public FirstValueAggregator(Column column) {
super(column);
@@ -41,7 +42,7 @@ public void init(Entity entity, QueryExecutionContext context) {
@Override
public void nextTable(QueryExecutionContext ctx, Table currentTable) {
- validityDateColumn = ctx.getValidityDateColumn();
+ validityDate = ctx.getValidityDateColumn();
}
@Override
@@ -50,23 +51,25 @@ public void acceptEvent(Bucket bucket, int event) {
return;
}
- if (validityDateColumn == null) {
+ if (validityDate == null) {
// If there is no validity date, take the first possible value
if(selectedBucket == null) {
selectedBucket = bucket;
selectedEvent = OptionalInt.of(event);
} else {
- log.trace("There is more than one value for the {}. Choosing the very first one encountered", this.getClass().getSimpleName());
+ log.trace("There is more than one value for the {}. Choosing the very first one encountered", getClass().getSimpleName());
}
return;
}
- if(! bucket.has(event, validityDateColumn)) {
- // TODO this might be an IllegalState
+
+ final CDateRange dateRange = validityDate.getValidityDate(event, bucket);
+
+ if (dateRange == null){
return;
}
- int next = bucket.getAsDateRange(event, validityDateColumn).getMinValue();
+ final int next = dateRange.getMinValue();
if (next < date) {
date = next;
@@ -74,7 +77,7 @@ public void acceptEvent(Bucket bucket, int event) {
selectedBucket = bucket;
}
else if (next == date) {
- log.trace("There is more than one value for the {}. Choosing the very first one encountered", this.getClass().getSimpleName());
+ log.trace("There is more than one value for the {}. Choosing the very first one encountered", getClass().getSimpleName());
}
}
diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/value/LastValueAggregator.java b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/value/LastValueAggregator.java
index bee141cbab..0e359c3f98 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/value/LastValueAggregator.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/specific/value/LastValueAggregator.java
@@ -5,6 +5,7 @@
import com.bakdata.conquery.models.common.daterange.CDateRange;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Table;
+import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.query.QueryExecutionContext;
import com.bakdata.conquery.models.query.entity.Entity;
@@ -26,7 +27,7 @@ public class LastValueAggregator extends SingleColumnAggregator {
private Bucket selectedBucket;
private int date;
- private Column validityDateColumn;
+ private ValidityDate validityDateColumn;
public LastValueAggregator(Column column) {
super(column);
@@ -60,14 +61,14 @@ public void acceptEvent(Bucket bucket, int event) {
}
return;
}
-
- if(! bucket.has(event, validityDateColumn)) {
- // TODO this might be an IllegalState
+
+ final CDateRange dateRange = validityDateColumn.getValidityDate(event, bucket);
+
+ if (dateRange == null) {
return;
}
-
- int next = bucket.getAsDateRange(event, validityDateColumn).getMaxValue();
+ final int next = dateRange.getMaxValue();
if (next > date) {
date = next;
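Both FirstValueAggregator and LastValueAggregator now share the same access pattern: instead of probing the raw column via bucket.has(event, column) and reading it with bucket.getAsDateRange(...), they ask the ValidityDate for a CDateRange and treat null as "this event carries no date". A minimal sketch of that shared contract follows; the helper class is hypothetical, only the ValidityDate#getValidityDate(int, Bucket) signature is taken from this diff:

import java.util.OptionalInt;

import com.bakdata.conquery.models.common.daterange.CDateRange;
import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.events.Bucket;

// Hypothetical helper illustrating the null-contract both aggregators rely on.
final class ValidityDateAccess {

	private ValidityDateAccess() {
	}

	// Returns the start of the event's validity date, or empty if the table has
	// no validity date configured or the event carries no date value.
	static OptionalInt minDateOf(ValidityDate validityDate, Bucket bucket, int event) {
		if (validityDate == null) {
			return OptionalInt.empty(); // table has no validity date
		}
		final CDateRange range = validityDate.getValidityDate(event, bucket);
		if (range == null) {
			return OptionalInt.empty(); // event has no date value
		}
		return OptionalInt.of(range.getMinValue());
	}
}

FirstValueAggregator compares getMinValue() against the smallest date seen so far; LastValueAggregator does the mirror-image comparison with getMaxValue().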
diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/specific/ValidityDateNode.java b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/specific/ValidityDateNode.java
index 940912753b..9d29a7f8dd 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/specific/ValidityDateNode.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/specific/ValidityDateNode.java
@@ -5,8 +5,8 @@
import com.bakdata.conquery.models.common.CDateSet;
import com.bakdata.conquery.models.common.daterange.CDateRange;
-import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Table;
+import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.events.CBlock;
import com.bakdata.conquery.models.query.QueryExecutionContext;
@@ -15,32 +15,30 @@
import com.google.common.base.Preconditions;
import lombok.ToString;
-@ToString(of = "validityDateColumn", callSuper = true)
+@ToString(of = "validityDate", callSuper = true)
public class ValidityDateNode extends QPChainNode {
- private final Column validityDateColumn;
+ private final ValidityDate validityDate;
private transient CDateSet restriction;
protected Map<Bucket, CBlock> preCurrentRow;
- public ValidityDateNode(Column validityDateColumn, QPNode child) {
+ public ValidityDateNode(ValidityDate validityDate, QPNode child) {
super(child);
- Preconditions.checkNotNull(validityDateColumn, this.getClass().getSimpleName() + " needs a validityDateColumn");
- this.validityDateColumn = validityDateColumn;
+ Preconditions.checkNotNull(validityDate, this.getClass().getSimpleName() + " needs a validityDate");
+ this.validityDate = validityDate;
}
@Override
public void acceptEvent(Bucket bucket, int event) {
+ // no date restriction set, or the event lies within the date restriction
+ final boolean contained = bucket.eventIsContainedIn(event, validityDate, context.getDateRestriction());
- //if event has null validityDate cancel
- if (!bucket.has(event, validityDateColumn)) {
+ if (!contained) {
return;
}
- //no dateRestriction or event is in date restriction
- if (restriction.isAll() || bucket.eventIsContainedIn(event, validityDateColumn, context.getDateRestriction())) {
- getChild().acceptEvent(bucket, event);
- }
+ getChild().acceptEvent(bucket, event);
}
@Override
@@ -59,7 +57,7 @@ public boolean isContained() {
@Override
public void nextTable(QueryExecutionContext ctx, Table currentTable) {
- super.nextTable(ctx.withValidityDateColumn(validityDateColumn), currentTable);
+ super.nextTable(ctx.withValidityDateColumn(validityDate), currentTable);
restriction = ctx.getDateRestriction();
preCurrentRow = ctx.getBucketManager().getEntityCBlocksForConnector(getEntity(), context.getConnector());
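This refactoring folds the former restriction.isAll() short-circuit into a single call to Bucket#eventIsContainedIn(int, ValidityDate, CDateSet). A sketch of the semantics that call is expected to have; the body below is an assumption rather than the actual Bucket implementation, and it presumes CDateSet offers an intersects(CDateRange) overlap check in addition to the isAll() seen in the removed code:

import com.bakdata.conquery.models.common.CDateSet;
import com.bakdata.conquery.models.common.daterange.CDateRange;
import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.events.Bucket;

final class ContainmentSketch {

	private ContainmentSketch() {
	}

	// Hypothetical stand-in for Bucket#eventIsContainedIn as ValidityDateNode uses it.
	static boolean eventIsContainedIn(Bucket bucket, int event, ValidityDate validityDate, CDateSet restriction) {
		final CDateRange range = validityDate.getValidityDate(event, bucket);
		if (range == null) {
			return false; // events without a validity date are never contained
		}
		// no date restriction set, or the event's date range overlaps it
		return restriction.isAll() || restriction.intersects(range);
	}
}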
diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/resultinfo/UniqueNamer.java b/backend/src/main/java/com/bakdata/conquery/models/query/resultinfo/UniqueNamer.java
index 5c91e52b56..ce9502d9ea 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/query/resultinfo/UniqueNamer.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/query/resultinfo/UniqueNamer.java
@@ -1,15 +1,11 @@
package com.bakdata.conquery.models.query.resultinfo;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Objects;
import com.bakdata.conquery.models.query.PrintSettings;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.ConcurrentHashMultiset;
import com.google.common.collect.Multiset;
-import com.google.common.collect.Multisets;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
@@ -38,7 +34,8 @@ public class UniqueNamer {
@NonNull
@JsonIgnore
public final String getUniqueName(ResultInfo info) {
- @NonNull String label = Objects.requireNonNullElse(info.userColumnName(settings), info.defaultColumnName(settings));
+ @NonNull
+ String label = Objects.requireNonNullElse(info.userColumnName(settings), info.defaultColumnName(settings));
// look up whether a prefix is needed and compute it if necessary
String uniqueName = label;
synchronized (ocurrenceCounter) {
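The UniqueNamer change itself is cosmetic (unused imports dropped, the annotation moved to its own line), but the underlying technique is worth spelling out: duplicate column labels are counted in a Multiset and disambiguated with a numeric suffix. A simplified, hypothetical illustration; the real class derives labels from ResultInfo and PrintSettings:

import com.google.common.collect.ConcurrentHashMultiset;
import com.google.common.collect.Multiset;

class UniqueNamerSketch {

	private final Multiset<String> occurrences = ConcurrentHashMultiset.create();

	String unique(String label) {
		synchronized (occurrences) {
			final int seen = occurrences.count(label);
			occurrences.add(label);
			// first use keeps the plain label, duplicates get a numeric suffix
			return seen == 0 ? label : label + "_" + seen;
		}
	}
}

Calling unique("date") three times yields "date", "date_1", "date_2".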
diff --git a/backend/src/main/java/com/bakdata/conquery/resources/api/ConceptsProcessor.java b/backend/src/main/java/com/bakdata/conquery/resources/api/ConceptsProcessor.java
index ab877f9b41..d2c1ebac6b 100644
--- a/backend/src/main/java/com/bakdata/conquery/resources/api/ConceptsProcessor.java
+++ b/backend/src/main/java/com/bakdata/conquery/resources/api/ConceptsProcessor.java
@@ -65,11 +65,14 @@ public class ConceptsProcessor {
private final ConqueryConfig config;
+ @Getter(lazy = true)
+ private final FrontEndConceptBuilder frontEndConceptBuilder = new FrontEndConceptBuilder(getConfig());
+
private final LoadingCache<Concept<?>, FrontendList> nodeCache =
CacheBuilder.newBuilder().softValues().expireAfterWrite(10, TimeUnit.MINUTES).build(new CacheLoader<>() {
@Override
public FrontendList load(Concept<?> concept) {
- return FrontEndConceptBuilder.createTreeMap(concept);
+ return getFrontEndConceptBuilder().createTreeMap(concept);
}
});
@@ -106,7 +109,7 @@ public CursorAndLength load(Searchable<?> searchable) {
public FrontendRoot getRoot(NamespaceStorage storage, Subject subject) {
- final FrontendRoot root = FrontEndConceptBuilder.createRoot(storage, subject);
+ final FrontendRoot root = getFrontEndConceptBuilder().createRoot(storage, subject);
// Report Violation
ValidatorHelper.createViolationsString(validator.validate(root), log.isTraceEnabled()).ifPresent(log::warn);
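FrontEndConceptBuilder is no longer used statically; it becomes an instance field initialized through Lombok's @Getter(lazy = true), which generates a thread-safe, compute-once getter. That is also why the cache loader calls getFrontEndConceptBuilder() instead of reading the field directly: with lazy getters, all access must go through the generated accessor. A self-contained demonstration of the Lombok behavior:

import lombok.Getter;

class LazyDemo {

	@Getter(lazy = true)
	private final String expensive = compute();

	private static String compute() {
		System.out.println("computed exactly once");
		return "value";
	}

	public static void main(String[] args) {
		final LazyDemo demo = new LazyDemo();
		demo.getExpensive(); // prints "computed exactly once"
		demo.getExpensive(); // served from the cached value, prints nothing
	}
}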
diff --git a/backend/src/main/java/com/bakdata/conquery/sql/DslContextFactory.java b/backend/src/main/java/com/bakdata/conquery/sql/DslContextFactory.java
index 16bfe54ecd..980aa597bb 100644
--- a/backend/src/main/java/com/bakdata/conquery/sql/DslContextFactory.java
+++ b/backend/src/main/java/com/bakdata/conquery/sql/DslContextFactory.java
@@ -6,6 +6,7 @@
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.jooq.DSLContext;
+import org.jooq.conf.RenderQuotedNames;
import org.jooq.conf.Settings;
import org.jooq.impl.DSL;
@@ -19,10 +20,16 @@ public static DSLContext create(SqlConnectorConfig config) {
DataSource dataSource = new HikariDataSource(hikariConfig);
+ Settings settings = new Settings()
+ .withRenderFormatted(config.isWithPrettyPrinting())
+ // quote all identifiers unless they were explicitly unquoted via DSL.unquotedName(),
+ // to avoid lowercase/uppercase identifier naming issues across SQL dialects
+ .withRenderQuotedNames(RenderQuotedNames.EXPLICIT_DEFAULT_QUOTED);
+
return DSL.using(
dataSource,
config.getDialect().getJooqDialect(),
- new Settings().withRenderFormatted(config.isWithPrettyPrinting())
+ settings
);
}
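RenderQuotedNames.EXPLICIT_DEFAULT_QUOTED makes jOOQ quote every identifier unless it was created with DSL.unquotedName(), so mixed-case names survive dialects that would otherwise case-fold them. A small sketch of the effect (jOOQ API; the rendered output shown in the comments is illustrative for PostgreSQL):

import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.conf.RenderQuotedNames;
import org.jooq.conf.Settings;
import org.jooq.impl.DSL;

class QuotingDemo {

	public static void main(String[] args) {
		final DSLContext ctx = DSL.using(
				SQLDialect.POSTGRES,
				new Settings().withRenderQuotedNames(RenderQuotedNames.EXPLICIT_DEFAULT_QUOTED)
		);
		// quoted by default, so the case of the identifier is preserved:
		System.out.println(ctx.render(DSL.field(DSL.name("camelCaseColumn")))); // "camelCaseColumn"
		// explicitly opted out of quoting:
		System.out.println(ctx.render(DSL.field(DSL.unquotedName("camelCaseColumn")))); // camelCaseColumn
	}
}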
diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/context/ConversionContext.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/context/ConversionContext.java
index b07383dacf..cdd73d7f8f 100644
--- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/context/ConversionContext.java
+++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/context/ConversionContext.java
@@ -1,5 +1,7 @@
package com.bakdata.conquery.sql.conversion.context;
+import java.util.List;
+
import com.bakdata.conquery.models.common.daterange.CDateRange;
import com.bakdata.conquery.models.config.SqlConnectorConfig;
import com.bakdata.conquery.sql.conversion.NodeConverterService;
@@ -12,8 +14,6 @@
import org.jooq.Record;
import org.jooq.Select;
-import java.util.List;
-
@Value
@With
@Builder(toBuilder = true)
@@ -25,20 +25,23 @@ public class ConversionContext {
@Singular
List<QueryStep> querySteps;
Select<Record> finalQuery;
- boolean negation;
CDateRange dateRestrictionRange;
int queryStepCounter;
-
+ boolean negation;
+ boolean isGroupBy;
public boolean dateRestrictionActive() {
return this.dateRestrictionRange != null;
}
+ /**
+ * Adds a converted {@link QueryStep} to the list of query steps of this {@link ConversionContext} and increments its queryStepCounter by 1.
+ */
public ConversionContext withQueryStep(QueryStep queryStep) {
return this.toBuilder()
- .queryStep(queryStep)
- .queryStepCounter(queryStepCounter + 1)
- .build();
+ .queryStep(queryStep)
+ .queryStepCounter(queryStepCounter + 1)
+ .build();
}
}
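Since ConversionContext is a Lombok @Value with @With and @Builder(toBuilder = true), withQueryStep never mutates the receiving context; every conversion step works on a fresh copy. Usage in a converter might look like this ('convertedStep' is a hypothetical QueryStep produced earlier):

// 'context' stays untouched; 'next' holds the appended step and queryStepCounter + 1
final ConversionContext next = context.withQueryStep(convertedStep);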
diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/context/selects/ConceptSelects.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/context/selects/ConceptSelects.java
index 8e42dde561..9e398edad3 100644
--- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/context/selects/ConceptSelects.java
+++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/context/selects/ConceptSelects.java
@@ -1,11 +1,13 @@
package com.bakdata.conquery.sql.conversion.context.selects;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Optional;
-import java.util.function.Function;
import java.util.stream.Stream;
import com.bakdata.conquery.apiv1.query.concept.specific.CQConcept;
+import com.bakdata.conquery.sql.models.ColumnDateRange;
import lombok.Builder;
import lombok.Value;
import lombok.With;
@@ -20,19 +22,26 @@
public class ConceptSelects implements Selects {
Field