Skip to content

Commit

Permalink
fixes not respecting TimeoutExceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
awildturtok committed Sep 5, 2023
1 parent ff1d8dd commit 94b1ca4
Showing 1 changed file with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -47,7 +50,6 @@
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -353,7 +355,6 @@ private static byte[] debugUnGzip(byte[] bytes) throws IOException {
*
* @implNote This method is concurrent!
*/
@SneakyThrows
@Override
public IterationStatistic forEach(StoreEntryConsumer<KEY, VALUE> consumer) {
final IterationStatistic result = new IterationStatistic();
Expand All @@ -368,11 +369,24 @@ public IterationStatistic forEach(StoreEntryConsumer<KEY, VALUE> consumer) {
final ListenableFuture<List<ByteIterable>> allJobs = Futures.allAsList(jobs);


List<ByteIterable> maybeFailed;
List<ByteIterable> maybeFailed = Collections.emptyList();

do {
try {
maybeFailed = allJobs.get(30, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.interrupted();
log.debug("Thread was interrupted.");
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
catch (TimeoutException e) {
log.debug("Still waiting for {} jobs.", jobs.stream().filter(Predicate.not(Future::isDone)).count());
}
} while (!allJobs.isDone());

while ((maybeFailed = allJobs.get(30, TimeUnit.SECONDS)) == null) {
log.debug("Still waiting for {} jobs.", jobs.stream().filter(Predicate.not(Future::isDone)).count());
}

final List<ByteIterable> unreadables = maybeFailed.stream().filter(Objects::nonNull).toList();

Expand Down

0 comments on commit 94b1ca4

Please sign in to comment.