From 1c90c6837c65a5c0aac3df843a514014ee61ddb0 Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Tue, 4 Jul 2023 17:52:24 +0200 Subject: [PATCH] fixes synchronization of jobManagerStatus --- .../models/worker/ShardNodeInformation.java | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java b/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java index e369c9819b..279ddfce02 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java @@ -57,8 +57,10 @@ private String getLatenessMetricName() { * Calculate the time in Milliseconds since we last received a {@link JobManagerStatus} from the corresponding shard. */ private long getMillisSinceLastStatus() { - if(getJobManagerStatus().isEmpty()){ - return -1; + synchronized (jobManagerStatus) { + if (getJobManagerStatus().isEmpty()) { + return -1; + } } return lastStatusTime.until(LocalDateTime.now(), ChronoUnit.MILLIS); @@ -71,10 +73,6 @@ public void awaitClose() { SharedMetricRegistries.getDefault().remove(getLatenessMetricName()); } - public long calculatePressure() { - return jobManagerStatus.stream().mapToLong(status -> status.getJobs().size()).sum(); - } - public void addJobManagerStatus(JobManagerStatus incoming) { lastStatusTime = LocalDateTime.now(); @@ -82,23 +80,24 @@ public void addJobManagerStatus(JobManagerStatus incoming) { // replace with new status jobManagerStatus.remove(incoming); jobManagerStatus.add(incoming); - } - if (calculatePressure() < backpressure) { - synchronized (jobManagerSync) { + if (calculatePressure() < backpressure) { jobManagerSync.notifyAll(); } } + } - public void waitForFreeJobQueue() throws InterruptedException { - if (jobManagerStatus.isEmpty()) { - return; + public long calculatePressure() { + synchronized (jobManagerStatus) { + return jobManagerStatus.stream().mapToLong(status -> status.getJobs().size()).sum(); } + } + public void waitForFreeJobQueue() throws InterruptedException { if (calculatePressure() >= backpressure) { - log.trace("Have to wait for free JobQueue (size = {})", jobManagerStatus.size()); synchronized (jobManagerSync) { + log.trace("Have to wait for free JobQueue (size = {})", jobManagerStatus.size()); jobManagerSync.wait(); } }