Merge pull request #3165 from ingef/release
Merge Release
awildturtok authored Sep 5, 2023
2 parents 9db4e42 + ab43b3d commit ff1d8dd
Showing 151 changed files with 3,075 additions and 1,388 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/test_backend.yml
@@ -14,7 +14,7 @@ on:
jobs:
test:
runs-on: ubuntu-latest
timeout-minutes: 15
timeout-minutes: 20
steps:
- name: Cache local Maven repository
uses: actions/cache@v2
@@ -41,5 +41,4 @@ jobs:
- name: JSON based Integration Tests
run: mvn test -T 1C -pl backend -Dgroups="INTEGRATION_JSON"
- name: SQL based Integration Tests
if: ${{ startsWith(github.head_ref, 'sql/') }}
run: mvn test -T 1C -pl backend -Dgroups="INTEGRATION_SQL_BACKEND"
5 changes: 5 additions & 0 deletions backend/pom.xml
@@ -379,5 +379,10 @@
<version>1.17.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sap.cloud.db.jdbc</groupId>
<artifactId>ngdbc</artifactId>
<version>2.17.10</version>
</dependency>
</dependencies>
</project>
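
The new ngdbc artifact is SAP's HANA JDBC driver, presumably added in support of the SQL-backend integration tests. A minimal connection sketch for that driver; host, port, credentials and the query are placeholders, not values taken from this commit:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HanaConnectionSketch {

	public static void main(String[] args) throws Exception {
		// Placeholder connection data; the ngdbc driver registers itself via the JDBC service loader.
		final String url = "jdbc:sap://localhost:39015/";

		try (Connection connection = DriverManager.getConnection(url, "MY_USER", "MY_PASSWORD");
			 Statement statement = connection.createStatement();
			 ResultSet resultSet = statement.executeQuery("SELECT CURRENT_DATE FROM DUMMY")) {

			while (resultSet.next()) {
				System.out.println(resultSet.getString(1));
			}
		}
	}
}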
@@ -46,6 +46,8 @@ public class FullExecutionStatus extends ExecutionStatus {
*/
private boolean canExpand;

private boolean containsDates;

/**
* Is set to the query description if the user can expand all included concepts.
*/
@@ -34,6 +34,7 @@
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.ConceptElement;
import com.bakdata.conquery.models.datasets.concepts.Connector;
import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeNode;
import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept;
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;
@@ -148,6 +149,13 @@ public void resolve(QueryResolveContext context) {

final Map<SecondaryIdDescription, Integer> secondaryIdPositions = calculateSecondaryIdPositions(currentPosition);

final Set<ValidityDate> validityDates = tables.stream()
.map(CQConcept::getTables)
.flatMap(Collection::stream)
.map(CQTable::findValidityDate)
.filter(Objects::nonNull)
.collect(Collectors.toSet());

// We need to know if a column is a concept column so we can prioritize it if it is also a SecondaryId
final Set<Column> conceptColumns = tables.stream()
.map(CQConcept::getTables)
@@ -157,7 +165,7 @@ public void resolve(QueryResolveContext context) {
.filter(Objects::nonNull)
.collect(Collectors.toSet());

positions = calculateColumnPositions(currentPosition, tables, secondaryIdPositions, conceptColumns);
positions = calculateColumnPositions(currentPosition, tables, secondaryIdPositions, conceptColumns, validityDates);

resultInfos = createResultInfos(secondaryIdPositions, conceptColumns);
}
@@ -179,22 +187,21 @@ private Map<SecondaryIdDescription, Integer> calculateSecondaryIdPositions(Atomi
return secondaryIdPositions;
}

private static Map<Column, Integer> calculateColumnPositions(AtomicInteger currentPosition, List<CQConcept> tables, Map<SecondaryIdDescription, Integer> secondaryIdPositions, Set<Column> conceptColumns) {
private static Map<Column, Integer> calculateColumnPositions(AtomicInteger currentPosition, List<CQConcept> tables, Map<SecondaryIdDescription, Integer> secondaryIdPositions, Set<Column> conceptColumns, Set<ValidityDate> validityDates) {
final Map<Column, Integer> positions = new HashMap<>();


for (CQConcept concept : tables) {
for (CQTable table : concept.getTables()) {

final Column validityDateColumn = table.findValidityDateColumn();

if (validityDateColumn != null) {
positions.putIfAbsent(validityDateColumn, 0);
}

// Set column positions, set SecondaryId positions to precomputed ones.
for (Column column : table.getConnector().getTable().getColumns()) {

// ValidityDates are handled separately in column=0
if (validityDates.stream().anyMatch(vd -> vd.containsColumn(column))) {
continue;
}

if (positions.containsKey(column)) {
continue;
}
@@ -341,4 +348,4 @@ public void visit(Consumer<Visitable> visitor) {
public RequiredEntities collectRequiredEntities(QueryExecutionContext context) {
return query.collectRequiredEntities(context);
}
}
}
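
The position logic above no longer compares against a single validity-date column; it asks each ValidityDate whether it covers a column, which allows a validity date to be backed by more than one column (for example a start/end pair). A rough stand-in for that check follows; the fields and accessors are assumptions for illustration, not code from this commit:

/** Illustration only: a cut-down stand-in for ValidityDate with an assumed containsColumn check. */
public class ValidityDateSketch {

	// Either a single date(-range) column, or a start/end pair; unused members stay null.
	private final Column column;
	private final Column startColumn;
	private final Column endColumn;

	public ValidityDateSketch(Column column, Column startColumn, Column endColumn) {
		this.column = column;
		this.startColumn = startColumn;
		this.endColumn = endColumn;
	}

	/** True if the given column participates in this validity date. */
	public boolean containsColumn(Column candidate) {
		return candidate.equals(column) || candidate.equals(startColumn) || candidate.equals(endColumn);
	}
}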
@@ -10,8 +10,8 @@
import com.bakdata.conquery.apiv1.query.concept.specific.CQConcept;
import com.bakdata.conquery.io.jackson.serializer.NsIdRef;
import com.bakdata.conquery.io.jackson.serializer.NsIdRefCollection;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.concepts.Connector;
import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.datasets.concepts.select.Select;
import com.bakdata.conquery.models.query.QueryResolveContext;
import com.fasterxml.jackson.annotation.JsonBackReference;
@@ -75,18 +75,17 @@ public void resolve(QueryResolveContext context) {
}

@CheckForNull
public Column findValidityDateColumn() {
public ValidityDate findValidityDate() {

// If no dateColumn is provided, we fall back to the connector's default, which is always its first validity date.
// Returns null if the connector defines none.
if (dateColumn != null) {
return dateColumn.getValue().getColumn();
return dateColumn.getValue();
}

if (!connector.getValidityDates().isEmpty()) {
return connector.getValidityDates().get(0).getColumn();
return connector.getValidityDates().get(0);
}

return null;
}
}

}
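
The fallback documented in findValidityDate is a simple precedence: an explicitly selected date column wins, otherwise the connector's first configured validity date is used, otherwise the table carries no date. Spelled out generically, purely as an illustration:

import java.util.List;
import java.util.Optional;

final class FallbackSketch {

	/** Mirrors the validity-date fallback: explicit selection first, then the first configured default. */
	static <T> Optional<T> pick(T explicitlySelected, List<T> connectorDefaults) {
		if (explicitlySelected != null) {
			return Optional.of(explicitlySelected);
		}
		return connectorDefaults.stream().findFirst();
	}

	public static void main(String[] args) {
		System.out.println(pick(null, List.of("birth_date", "treatment_date"))); // Optional[birth_date]
		System.out.println(pick("treatment_date", List.of("birth_date")));       // Optional[treatment_date]
	}
}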
@@ -191,7 +191,7 @@ public void resolve(QueryResolveContext context) {
* Values of group filters can have an arbitrary format which is set by the filter itself.
* Hence, we treat the value for the filter as Object.class.
* <p>
* The resolved filter instructs the frontend on how to render and serialize the filter value using the {@link Filter#createFrontendConfig()} method. The filter must implement {@link GroupFilter} and provide the type information of the value to correctly deserialize the received object.
* The resolved filter instructs the frontend on how to render and serialize the filter value using the {@link Filter#createFrontendConfig(com.bakdata.conquery.models.config.ConqueryConfig)} method. The filter must implement {@link GroupFilter} and provide the type information of the value to correctly deserialize the received object.
*/
public static class GroupFilterDeserializer extends StdDeserializer<GroupFilterValue> {
private final NsIdReferenceDeserializer<FilterId, Filter<?>> nsIdDeserializer = new NsIdReferenceDeserializer<>(Filter.class, null, FilterId.class);
@@ -25,6 +25,7 @@
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.ConceptElement;
import com.bakdata.conquery.models.datasets.concepts.Connector;
import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.datasets.concepts.select.Select;
import com.bakdata.conquery.models.identifiable.ids.NamespacedIdentifiable;
import com.bakdata.conquery.models.query.DateAggregationMode;
@@ -226,7 +227,7 @@ public QPNode createQueryPlan(QueryPlanContext context, ConceptQueryPlan plan) {

final QPNode
conceptSpecificNode =
getConcept().createConceptQuery(context, filters, aggregators, eventDateUnionAggregators, selectValidityDateColumn(table));
getConcept().createConceptQuery(context, filters, aggregators, eventDateUnionAggregators, selectValidityDate(table));

// Link up the ExistsAggregators to the node
existsAggregators.forEach(agg -> agg.setReference(conceptSpecificNode));
Expand Down Expand Up @@ -272,14 +273,14 @@ private static List<Aggregator<?>> createAggregators(ConceptQueryPlan plan, List
.collect(Collectors.toList());
}

private Column selectValidityDateColumn(CQTable table) {
private ValidityDate selectValidityDate(CQTable table) {
if (table.getDateColumn() != null) {
return table.getDateColumn().getValue().getColumn();
return table.getDateColumn().getValue();
}

// else use the connector's first defined validity date
if (!table.getConnector().getValidityDates().isEmpty()) {
return table.getConnector().getValidityDates().get(0).getColumn();
return table.getConnector().getValidityDates().get(0);
}

return null;
@@ -1,14 +1,21 @@
package com.bakdata.conquery.commands;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.config.XodusStoreFactory;
import com.bakdata.conquery.util.io.ConqueryMDC;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -27,11 +34,13 @@
import jetbrains.exodus.env.StoreConfig;
import jetbrains.exodus.env.Transaction;
import kotlin.jvm.functions.Function4;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.codehaus.groovy.control.CompilerConfiguration;
import org.jetbrains.annotations.NotNull;

/**
* Command allowing script based migration of databases. Especially useful for data that cannot be easily recreated after reimports, such as {@link com.bakdata.conquery.models.auth.entities.User}s and {@link com.bakdata.conquery.models.execution.ManagedExecution}s.
@@ -56,7 +65,6 @@
@Slf4j
public class MigrateCommand extends ConqueryCommand {


public MigrateCommand() {
super("migrate", "Run a migration script on a store.");
}
@@ -75,6 +83,18 @@ public void configure(Subparser subparser) {
.required(true)
.type(Arguments.fileType());

subparser
.addArgument("--in-gzip")
.help("If true, values are ungzipped before deserialization.")
.setDefault(true)
.type(Arguments.booleanType());

subparser
.addArgument("--out-gzip")
.help("If true, values are gzipped before writing.")
.setDefault(true)
.type(Arguments.booleanType());

subparser
.addArgument("--script")
.help("Migration Script returning a closure implementing MigrationScriptFactory. See supplementary example.groovy for details.\nSignature: String env, String store, String key, ObjectNode value -> return new Tuple(key,value)")
@@ -88,6 +108,10 @@ protected void run(io.dropwizard.setup.Environment environment, Namespace namesp
final File inStoreDirectory = namespace.get("in");
final File outStoreDirectory = namespace.get("out");

final boolean inGzip = namespace.get("in_gzip");
final boolean outGzip = namespace.get("out_gzip");


final long logsize = ((XodusStoreFactory) configuration.getStorage()).getXodus().getLogFileSize().toKilobytes();


Expand All @@ -99,11 +123,11 @@ protected void run(io.dropwizard.setup.Environment environment, Namespace namesp
}

// Create Groovy Shell and parse script
CompilerConfiguration config = new CompilerConfiguration();
final CompilerConfiguration config = new CompilerConfiguration();
config.setScriptBaseClass(MigrationScriptFactory.class.getName());
GroovyShell groovy = new GroovyShell(config);
final GroovyShell groovy = new GroovyShell(config);

MigrationScriptFactory factory = (MigrationScriptFactory) groovy.parse(In.file((File) namespace.get("script")).readAll());
final MigrationScriptFactory factory = (MigrationScriptFactory) groovy.parse(In.file((File) namespace.get("script")).readAll());

final Function4<String, String, JsonNode, JsonNode, Tuple> migrator = factory.run();

@@ -116,25 +140,12 @@ protected void run(io.dropwizard.setup.Environment environment, Namespace namesp
final File environmentDirectory = new File(outStoreDirectory, xenv.getName());
environmentDirectory.mkdirs();

processEnvironment(xenv, logsize, environmentDirectory, migrator, mapper);
processEnvironment(xenv, logsize, environmentDirectory, migrator, mapper, inGzip, outGzip);
});

}


/**
* Class defining the interface for the Groovy-Script.
*/
public abstract static class MigrationScriptFactory extends Script {

/**
* Environment -> Store -> Key -> Value -> (Key, Value)
*/
@Override
public abstract Function4<String, String, JsonNode, JsonNode, Tuple> run();
}

private void processEnvironment(File inStoreDirectory, long logSize, File outStoreDirectory, Function4<String, String, JsonNode, JsonNode, Tuple> migrator, ObjectMapper mapper) {
private void processEnvironment(File inStoreDirectory, long logSize, File outStoreDirectory, Function4<String, String, JsonNode, JsonNode, Tuple> migrator, ObjectMapper mapper, boolean inGzip, boolean outGzip) {
final jetbrains.exodus.env.Environment inEnvironment = Environments.newInstance(
inStoreDirectory,
new EnvironmentConfig().setLogFileSize(logSize)
@@ -175,7 +186,7 @@ private void processEnvironment(File inStoreDirectory, long logSize, File outSto
continue;
}

doMigrate(inStore, outStore, migrator, mapper);
migrateStore(inStore, outStore, migrator, mapper, inGzip, outGzip);
log.info("Done writing {}.", store);
}

Expand All @@ -191,7 +202,7 @@ private void processEnvironment(File inStoreDirectory, long logSize, File outSto
inEnvironment.close();
}

private void doMigrate(Store inStore, Store outStore, Function4<String, String, JsonNode, JsonNode, Tuple> migrator, ObjectMapper mapper) {
private void migrateStore(Store inStore, Store outStore, Function4<String, String, JsonNode, JsonNode, Tuple> migrator, ObjectMapper mapper, boolean inGzip, boolean outGzip) {

final Environment inEnvironment = inStore.getEnvironment();
final Environment outEnvironment = outStore.getEnvironment();
@@ -211,13 +222,12 @@ private void doMigrate(Store inStore, Store outStore, Function4<String, String,
while (cursor.getNext()) {

// Everything is mapped with Smile so even the keys.
final JsonNode key = mapper.readTree(cursor.getKey().getBytesUnsafe());
final JsonNode key = read(mapper, cursor.getKey(), inGzip);

final JsonNode node = mapper.readTree(cursor.getValue().getBytesUnsafe());
final JsonNode value = read(mapper, cursor.getValue(), inGzip);

// Apply the migrator, it will return new key and value
final Tuple<?> migrated =
migrator.invoke(inEnvironment.getLocation(), inStore.getName(), key, node);
final Tuple<?> migrated = migrator.invoke(inEnvironment.getLocation(), inStore.getName(), key, value);

// => Effectively delete the object
if (migrated == null) {
@@ -226,18 +236,18 @@ private void doMigrate(Store inStore, Store outStore, Function4<String, String,
}

// Serialize the values and write them into new Store.
final ByteIterable keyIter = new ArrayByteIterable(mapper.writeValueAsBytes(migrated.get(0)));
final byte[] keyWritten = write(mapper, ((JsonNode) migrated.get(0)), outGzip);

final ByteIterable valueIter = new ArrayByteIterable(mapper.writeValueAsBytes(migrated.get(1)));
final byte[] valueWritten = write(mapper, ((JsonNode) migrated.get(1)), outGzip);

if (log.isTraceEnabled()) {
log.trace("Mapped `{}` to \n{}", new String(keyIter.getBytesUnsafe()), new String(valueIter.getBytesUnsafe()));
log.trace("Mapped `{}` to \n{}", new String(keyWritten), new String(valueWritten));
}
else {
log.debug("Mapped `{}`", new String(keyIter.getBytesUnsafe()));
else if (log.isDebugEnabled()) {
log.debug("Mapped `{}`", new String(keyWritten));
}

outStore.put(writeTx, keyIter, valueIter);
outStore.put(writeTx, new ArrayByteIterable(keyWritten), new ArrayByteIterable(valueWritten));

if (++processed % (1 + (count / 10)) == 0) {
log.info("Processed {} / {} ({}%)", processed, count, Math.round(100f * (float) processed / (float) count));
@@ -261,4 +271,37 @@ private void doMigrate(Store inStore, Store outStore, Function4<String, String,
}
}

private JsonNode read(ObjectMapper mapper, ByteIterable cursor, boolean gzip) throws IOException {
InputStream inputStream = new ByteArrayInputStream(cursor.getBytesUnsafe());

if (gzip) {
inputStream = new GZIPInputStream(inputStream);
}

return mapper.readTree(inputStream);
}

@SneakyThrows
@NotNull
private byte[] write(ObjectMapper mapper, JsonNode value, boolean gzip) throws JsonProcessingException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();

try (OutputStream outputStream = gzip ? new GZIPOutputStream(baos) : baos) {
mapper.writeValue(outputStream, value);
}

return baos.toByteArray();
}

/**
* Class defining the interface for the Groovy-Script.
*/
public abstract static class MigrationScriptFactory extends Script {

/**
* Environment -> Store -> Key -> Value -> (Key, Value)
*/
@Override
public abstract Function4<String, String, JsonNode, JsonNode, Tuple> run();
}
}
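
For context, the new read and write helpers simply wrap the serialized key and value bytes in GZIP streams. A standalone round trip of that pattern, sketched with a plain JSON ObjectMapper (the command itself uses a Smile-configured mapper):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class GzipJsonRoundTrip {

	public static void main(String[] args) throws IOException {
		final ObjectMapper mapper = new ObjectMapper();
		final JsonNode original = mapper.readTree("{\"name\":\"user.test\",\"label\":\"Test User\"}");

		// Write: serialize the node and gzip the bytes, as the migrated stores expect.
		final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		try (OutputStream out = new GZIPOutputStream(buffer)) {
			mapper.writeValue(out, original);
		}
		final byte[] compressed = buffer.toByteArray();

		// Read: un-gzip and parse back into a JsonNode.
		final JsonNode restored;
		try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
			restored = mapper.readTree(in);
		}

		System.out.println(original.equals(restored)); // true
	}
}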