diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java index c4d2591d71..b4f271060b 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java @@ -10,13 +10,16 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.Predicate; @@ -47,7 +50,6 @@ import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import lombok.NonNull; -import lombok.SneakyThrows; import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; @@ -353,7 +355,6 @@ private static byte[] debugUnGzip(byte[] bytes) throws IOException { * * @implNote This method is concurrent! */ - @SneakyThrows @Override public IterationStatistic forEach(StoreEntryConsumer consumer) { final IterationStatistic result = new IterationStatistic(); @@ -368,11 +369,24 @@ public IterationStatistic forEach(StoreEntryConsumer consumer) { final ListenableFuture> allJobs = Futures.allAsList(jobs); - List maybeFailed; + List maybeFailed = Collections.emptyList(); + + do { + try { + maybeFailed = allJobs.get(30, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + Thread.interrupted(); + log.debug("Thread was interrupted."); + } + catch (ExecutionException e) { + throw new RuntimeException(e); + } + catch (TimeoutException e) { + log.debug("Still waiting for {} jobs.", jobs.stream().filter(Predicate.not(Future::isDone)).count()); + } + } while (!allJobs.isDone()); - while ((maybeFailed = allJobs.get(30, TimeUnit.SECONDS)) == null) { - log.debug("Still waiting for {} jobs.", jobs.stream().filter(Predicate.not(Future::isDone)).count()); - } final List unreadables = maybeFailed.stream().filter(Objects::nonNull).toList();