Skip to content

Commit

Permalink
Merge pull request #3138 from ingef/hotfix/synchronise-job-queue
Browse files Browse the repository at this point in the history
Fixes synchronization on the job queue, as that was causing issues with long import jobs
  • Loading branch information
awildturtok authored Jul 24, 2023
2 parents 5c4ef0b + 5dc7a33 commit ab16837
Showing 1 changed file with 22 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import com.bakdata.conquery.io.mina.MessageSender;
import com.bakdata.conquery.io.mina.NetworkSession;
Expand Down Expand Up @@ -35,7 +36,7 @@ public class ShardNodeInformation extends MessageSender.Simple<MessageToShardNod
@JsonIgnore
@Getter
private final Set<JobManagerStatus> jobManagerStatus = new HashSet<>();

private final AtomicBoolean full = new AtomicBoolean(false);
private LocalDateTime lastStatusTime = LocalDateTime.now();

public ShardNodeInformation(NetworkSession session, int backpressure) {
Expand All @@ -57,10 +58,6 @@ private String getLatenessMetricName() {
* Calculate the time in Milliseconds since we last received a {@link JobManagerStatus} from the corresponding shard.
*/
private long getMillisSinceLastStatus() {
// No JobManagerStatus has been stored for this shard yet: report -1 instead of an elapsed time.
// NOTE(review): the hunk header above reports four fewer lines after the commit; this guard is
// presumably what was removed — confirm against the raw diff.
if(getJobManagerStatus().isEmpty()){
return -1;
}

// Milliseconds between the last recorded status timestamp and the current local time.
return lastStatusTime.until(LocalDateTime.now(), ChronoUnit.MILLIS);
}

Expand All @@ -71,36 +68,42 @@ public void awaitClose() {
SharedMetricRegistries.getDefault().remove(getLatenessMetricName());
}

/**
 * Sums the number of jobs over every {@link JobManagerStatus} currently held in {@code jobManagerStatus}.
 *
 * @return the total of {@code status.getJobs().size()} across all stored statuses; 0 when none are stored
 */
public long calculatePressure() {
// NOTE(review): streams over the plain HashSet without taking its monitor here, while
// addJobManagerStatus mutates the set under synchronized (jobManagerStatus) — confirm callers hold that lock.
return jobManagerStatus.stream().mapToLong(status -> status.getJobs().size()).sum();
}

// NOTE(review): this span is a rendered diff of addJobManagerStatus with removed and added lines
// interleaved and no +/- markers (the braces do not balance); read it as two variants of the
// method, not as one compilable body.
public void addJobManagerStatus(JobManagerStatus incoming) {
// Record the arrival time of this status; getMillisSinceLastStatus measures from this timestamp.
lastStatusTime = LocalDateTime.now();

synchronized (jobManagerStatus) {
// replace with new status
jobManagerStatus.remove(incoming);
jobManagerStatus.add(incoming);
}

// One variant: wake every thread waiting on jobManagerSync when the summed job count is strictly below backpressure.
if (calculatePressure() < backpressure) {
synchronized (jobManagerSync) {
jobManagerSync.notifyAll();

// Other variant: compute the summed job count once and cache the comparison in the `full` flag,
// which waitForFreeJobQueue reads.
final long pressure = calculatePressure();
// NOTE(review): uses `>` whereas the other variant treats pressure == backpressure as "must wait"
// (`< backpressure` to notify, `>= backpressure` to wait) — confirm the boundary change is intended.
final boolean isFull = pressure > backpressure;

full.set(isFull);

// Only wake waiters when the queue is not considered full.
if (!isFull) {
synchronized (jobManagerSync) {
jobManagerSync.notifyAll();
}
}
}


}

/**
 * Sums the number of jobs over every {@link JobManagerStatus} currently held in {@code jobManagerStatus}.
 *
 * @return the total of {@code status.getJobs().size()} across all stored statuses; 0 when none are stored
 */
private long calculatePressure() {
// NOTE(review): does not synchronize on jobManagerStatus itself; presumably meant to be called
// while the caller already holds that monitor — confirm.
return jobManagerStatus.stream().mapToLong(status -> status.getJobs().size()).sum();
}

// NOTE(review): this span is a rendered diff of waitForFreeJobQueue with removed and added lines
// interleaved and no +/- markers (the braces do not balance); read it as two variants of the
// method, not as one compilable body.
public void waitForFreeJobQueue() throws InterruptedException {
// Early exit: one variant returns when no status has been stored yet, the other when the `full` flag is not set.
if (jobManagerStatus.isEmpty()) {
if (!full.get()) {
return;
}

// One variant: re-sum the job count and block when it is at or above backpressure.
if (calculatePressure() >= backpressure) {
log.trace("Have to wait for free JobQueue (size = {})", jobManagerStatus.size());
synchronized (jobManagerSync) {
// Releases the jobManagerSync monitor and blocks; addJobManagerStatus calls notifyAll() on the same object.
jobManagerSync.wait();
}
// Other variant: block unconditionally once the `full` flag was observed as set.
synchronized (jobManagerSync) {
log.trace("Have to wait for free JobQueue");
// NOTE(review): wait() is not inside a loop that re-checks the condition, and `full` is read
// before this monitor is taken — a notifyAll() landing between the check and the wait would be
// missed, and a spurious wakeup would return early. Confirm this is acceptable.
jobManagerSync.wait();
}
}
}
}

0 comments on commit ab16837

Please sign in to comment.