From 1c90c6837c65a5c0aac3df843a514014ee61ddb0 Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Tue, 4 Jul 2023 17:52:24 +0200 Subject: [PATCH 1/2] fixes synchronization of jobManagerStatus --- .../models/worker/ShardNodeInformation.java | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java b/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java index e369c9819b..279ddfce02 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java @@ -57,8 +57,10 @@ private String getLatenessMetricName() { * Calculate the time in Milliseconds since we last received a {@link JobManagerStatus} from the corresponding shard. */ private long getMillisSinceLastStatus() { - if(getJobManagerStatus().isEmpty()){ - return -1; + synchronized (jobManagerStatus) { + if (getJobManagerStatus().isEmpty()) { + return -1; + } } return lastStatusTime.until(LocalDateTime.now(), ChronoUnit.MILLIS); @@ -71,10 +73,6 @@ public void awaitClose() { SharedMetricRegistries.getDefault().remove(getLatenessMetricName()); } - public long calculatePressure() { - return jobManagerStatus.stream().mapToLong(status -> status.getJobs().size()).sum(); - } - public void addJobManagerStatus(JobManagerStatus incoming) { lastStatusTime = LocalDateTime.now(); @@ -82,23 +80,24 @@ public void addJobManagerStatus(JobManagerStatus incoming) { // replace with new status jobManagerStatus.remove(incoming); jobManagerStatus.add(incoming); - } - if (calculatePressure() < backpressure) { - synchronized (jobManagerSync) { + if (calculatePressure() < backpressure) { jobManagerSync.notifyAll(); } } + } - public void waitForFreeJobQueue() throws InterruptedException { - if (jobManagerStatus.isEmpty()) { - return; + public long calculatePressure() { + synchronized (jobManagerStatus) { + return jobManagerStatus.stream().mapToLong(status -> status.getJobs().size()).sum(); } + } + public void waitForFreeJobQueue() throws InterruptedException { if (calculatePressure() >= backpressure) { - log.trace("Have to wait for free JobQueue (size = {})", jobManagerStatus.size()); synchronized (jobManagerSync) { + log.trace("Have to wait for free JobQueue (size = {})", jobManagerStatus.size()); jobManagerSync.wait(); } } From 5dc7a335136aa04e1e34a0d9f09986a5622639d5 Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Thu, 6 Jul 2023 11:14:22 +0200 Subject: [PATCH 2/2] restructure backpressure calculation to avoid calculation in consumers --- .../models/worker/ShardNodeInformation.java | 40 ++++++++++--------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java b/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java index 279ddfce02..fe15ed6524 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java @@ -4,6 +4,7 @@ import java.time.temporal.ChronoUnit; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import com.bakdata.conquery.io.mina.MessageSender; import com.bakdata.conquery.io.mina.NetworkSession; @@ -35,7 +36,7 @@ public class ShardNodeInformation extends MessageSender.Simple jobManagerStatus = new HashSet<>(); - + private final AtomicBoolean full = new AtomicBoolean(false); private LocalDateTime lastStatusTime = LocalDateTime.now(); public ShardNodeInformation(NetworkSession session, int backpressure) { @@ -57,12 +58,6 @@ private String getLatenessMetricName() { * Calculate the time in Milliseconds since we last received a {@link JobManagerStatus} from the corresponding shard. */ private long getMillisSinceLastStatus() { - synchronized (jobManagerStatus) { - if (getJobManagerStatus().isEmpty()) { - return -1; - } - } - return lastStatusTime.until(LocalDateTime.now(), ChronoUnit.MILLIS); } @@ -81,25 +76,34 @@ public void addJobManagerStatus(JobManagerStatus incoming) { jobManagerStatus.remove(incoming); jobManagerStatus.add(incoming); - if (calculatePressure() < backpressure) { - jobManagerSync.notifyAll(); + + final long pressure = calculatePressure(); + final boolean isFull = pressure > backpressure; + + full.set(isFull); + + if (!isFull) { + synchronized (jobManagerSync) { + jobManagerSync.notifyAll(); + } } } + } - public long calculatePressure() { - synchronized (jobManagerStatus) { - return jobManagerStatus.stream().mapToLong(status -> status.getJobs().size()).sum(); - } + private long calculatePressure() { + return jobManagerStatus.stream().mapToLong(status -> status.getJobs().size()).sum(); } public void waitForFreeJobQueue() throws InterruptedException { - if (calculatePressure() >= backpressure) { - synchronized (jobManagerSync) { - log.trace("Have to wait for free JobQueue (size = {})", jobManagerStatus.size()); - jobManagerSync.wait(); - } + if (!full.get()) { + return; + } + + synchronized (jobManagerSync) { + log.trace("Have to wait for free JobQueue"); + jobManagerSync.wait(); } } }