diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java b/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java index e369c9819b..fe15ed6524 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java @@ -4,6 +4,7 @@ import java.time.temporal.ChronoUnit; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import com.bakdata.conquery.io.mina.MessageSender; import com.bakdata.conquery.io.mina.NetworkSession; @@ -35,7 +36,7 @@ public class ShardNodeInformation extends MessageSender.Simple jobManagerStatus = new HashSet<>(); - + private final AtomicBoolean full = new AtomicBoolean(false); private LocalDateTime lastStatusTime = LocalDateTime.now(); public ShardNodeInformation(NetworkSession session, int backpressure) { @@ -57,10 +58,6 @@ private String getLatenessMetricName() { * Calculate the time in Milliseconds since we last received a {@link JobManagerStatus} from the corresponding shard. */ private long getMillisSinceLastStatus() { - if(getJobManagerStatus().isEmpty()){ - return -1; - } - return lastStatusTime.until(LocalDateTime.now(), ChronoUnit.MILLIS); } @@ -71,10 +68,6 @@ public void awaitClose() { SharedMetricRegistries.getDefault().remove(getLatenessMetricName()); } - public long calculatePressure() { - return jobManagerStatus.stream().mapToLong(status -> status.getJobs().size()).sum(); - } - public void addJobManagerStatus(JobManagerStatus incoming) { lastStatusTime = LocalDateTime.now(); @@ -82,25 +75,35 @@ public void addJobManagerStatus(JobManagerStatus incoming) { // replace with new status jobManagerStatus.remove(incoming); jobManagerStatus.add(incoming); - } - if (calculatePressure() < backpressure) { - synchronized (jobManagerSync) { - jobManagerSync.notifyAll(); + + final long pressure = calculatePressure(); + final boolean isFull = pressure > backpressure; + + full.set(isFull); + + if (!isFull) { + synchronized (jobManagerSync) { + jobManagerSync.notifyAll(); + } } } + + + } + + private long calculatePressure() { + return jobManagerStatus.stream().mapToLong(status -> status.getJobs().size()).sum(); } public void waitForFreeJobQueue() throws InterruptedException { - if (jobManagerStatus.isEmpty()) { + if (!full.get()) { return; } - if (calculatePressure() >= backpressure) { - log.trace("Have to wait for free JobQueue (size = {})", jobManagerStatus.size()); - synchronized (jobManagerSync) { - jobManagerSync.wait(); - } + synchronized (jobManagerSync) { + log.trace("Have to wait for free JobQueue"); + jobManagerSync.wait(); } } }