From 94b1ca4dc4c0c9edb22bb07fb02e0e713afa8692 Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Tue, 5 Sep 2023 17:16:44 +0200 Subject: [PATCH 1/2] fixes not respecting TimeoutExceptions --- .../xodus/stores/SerializingStore.java | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java index c4d2591d71..b4f271060b 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java @@ -10,13 +10,16 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.Predicate; @@ -47,7 +50,6 @@ import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import lombok.NonNull; -import lombok.SneakyThrows; import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; @@ -353,7 +355,6 @@ private static byte[] debugUnGzip(byte[] bytes) throws IOException { * * @implNote This method is concurrent! */ - @SneakyThrows @Override public IterationStatistic forEach(StoreEntryConsumer consumer) { final IterationStatistic result = new IterationStatistic(); @@ -368,11 +369,24 @@ public IterationStatistic forEach(StoreEntryConsumer consumer) { final ListenableFuture> allJobs = Futures.allAsList(jobs); - List maybeFailed; + List maybeFailed = Collections.emptyList(); + + do { + try { + maybeFailed = allJobs.get(30, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + Thread.interrupted(); + log.debug("Thread was interrupted."); + } + catch (ExecutionException e) { + throw new RuntimeException(e); + } + catch (TimeoutException e) { + log.debug("Still waiting for {} jobs.", jobs.stream().filter(Predicate.not(Future::isDone)).count()); + } + } while (!allJobs.isDone()); - while ((maybeFailed = allJobs.get(30, TimeUnit.SECONDS)) == null) { - log.debug("Still waiting for {} jobs.", jobs.stream().filter(Predicate.not(Future::isDone)).count()); - } final List unreadables = maybeFailed.stream().filter(Objects::nonNull).toList(); From 2286fe81614b03a8daa6b737082ec9406596f548 Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Tue, 5 Sep 2023 20:10:20 +0200 Subject: [PATCH 2/2] fixes handling of errors inside Futures --- .../xodus/stores/SerializingStore.java | 37 +++++++++++-------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java index b4f271060b..bffb1a5426 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java @@ -295,6 +295,7 @@ private static void dumpToFile(byte[] gzippedObj, @NonNull String keyOfDump, Exc } if (!dumpfile.getParentFile().exists() && !dumpfile.getParentFile().mkdirs()) { + //TODO this seems to occur sometimes, is it maybe just a race condition? throw new IllegalStateException("Could not create `%s`.".formatted(dumpfile.getParentFile())); } @@ -412,24 +413,28 @@ public IterationStatistic forEach(StoreEntryConsumer consumer) { } private ByteIterable handle(StoreEntryConsumer consumer, IterationStatistic result, ByteIterable keyRaw, ByteIterable valueRaw) { - result.incrTotalProcessed(); - - // Try to read the key first - final KEY - key = - getDeserializedAndDumpFailed(keyRaw, SerializingStore.this::readKey, () -> new String(keyRaw.getBytesUnsafe()), valueRaw, "Could not parse key [{}]"); - if (key == null) { - result.incrFailedKeys(); - return keyRaw; - } + final KEY key; + final VALUE value; - // Try to read the value - final VALUE - value = - getDeserializedAndDumpFailed(valueRaw, SerializingStore.this::readValue, key::toString, valueRaw, "Could not parse value for key [{}]"); + try { + result.incrTotalProcessed(); - if (value == null) { - result.incrFailedValues(); + // Try to read the key first + key = getDeserializedAndDumpFailed(keyRaw, SerializingStore.this::readKey, () -> new String(keyRaw.getBytesUnsafe()), valueRaw, "Could not parse key [{}]"); + if (key == null) { + result.incrFailedKeys(); + return keyRaw; + } + + // Try to read the value + value = getDeserializedAndDumpFailed(valueRaw, SerializingStore.this::readValue, key::toString, valueRaw, "Could not parse value for key [{}]"); + + if (value == null) { + result.incrFailedValues(); + return keyRaw; + } + }catch(Exception e){ + log.error("Failed processing key/value", e); return keyRaw; }