Skip to content

Commit

Permalink
fixes synchronization of jobManagerStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
awildturtok committed Jul 24, 2023
1 parent 5c4ef0b commit 1c90c68
Showing 1 changed file with 12 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ private String getLatenessMetricName() {
* Calculate the time in Milliseconds since we last received a {@link JobManagerStatus} from the corresponding shard.
*/
private long getMillisSinceLastStatus() {
if(getJobManagerStatus().isEmpty()){
return -1;
synchronized (jobManagerStatus) {
if (getJobManagerStatus().isEmpty()) {
return -1;
}
}

return lastStatusTime.until(LocalDateTime.now(), ChronoUnit.MILLIS);
Expand All @@ -71,34 +73,31 @@ public void awaitClose() {
SharedMetricRegistries.getDefault().remove(getLatenessMetricName());
}

public long calculatePressure() {
return jobManagerStatus.stream().mapToLong(status -> status.getJobs().size()).sum();
}

public void addJobManagerStatus(JobManagerStatus incoming) {
lastStatusTime = LocalDateTime.now();

synchronized (jobManagerStatus) {
// replace with new status
jobManagerStatus.remove(incoming);
jobManagerStatus.add(incoming);
}

if (calculatePressure() < backpressure) {
synchronized (jobManagerSync) {
if (calculatePressure() < backpressure) {
jobManagerSync.notifyAll();
}
}

}

public void waitForFreeJobQueue() throws InterruptedException {
if (jobManagerStatus.isEmpty()) {
return;
public long calculatePressure() {
synchronized (jobManagerStatus) {
return jobManagerStatus.stream().mapToLong(status -> status.getJobs().size()).sum();
}
}

public void waitForFreeJobQueue() throws InterruptedException {
if (calculatePressure() >= backpressure) {
log.trace("Have to wait for free JobQueue (size = {})", jobManagerStatus.size());
synchronized (jobManagerSync) {
log.trace("Have to wait for free JobQueue (size = {})", jobManagerStatus.size());
jobManagerSync.wait();
}
}
Expand Down

0 comments on commit 1c90c68

Please sign in to comment.