Skip to content

Commit

Permalink
restructure backpressure calculation to avoid calculation in consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
awildturtok committed Jul 24, 2023
1 parent 1c90c68 commit 5dc7a33
Showing 1 changed file with 22 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import com.bakdata.conquery.io.mina.MessageSender;
import com.bakdata.conquery.io.mina.NetworkSession;
Expand Down Expand Up @@ -35,7 +36,7 @@ public class ShardNodeInformation extends MessageSender.Simple<MessageToShardNod
@JsonIgnore
@Getter
private final Set<JobManagerStatus> jobManagerStatus = new HashSet<>();

private final AtomicBoolean full = new AtomicBoolean(false);
private LocalDateTime lastStatusTime = LocalDateTime.now();

public ShardNodeInformation(NetworkSession session, int backpressure) {
Expand All @@ -57,12 +58,6 @@ private String getLatenessMetricName() {
* Calculate the time in Milliseconds since we last received a {@link JobManagerStatus} from the corresponding shard.
*/
private long getMillisSinceLastStatus() {
synchronized (jobManagerStatus) {
if (getJobManagerStatus().isEmpty()) {
return -1;
}
}

return lastStatusTime.until(LocalDateTime.now(), ChronoUnit.MILLIS);
}

Expand All @@ -81,25 +76,34 @@ public void addJobManagerStatus(JobManagerStatus incoming) {
jobManagerStatus.remove(incoming);
jobManagerStatus.add(incoming);

if (calculatePressure() < backpressure) {
jobManagerSync.notifyAll();

final long pressure = calculatePressure();
final boolean isFull = pressure > backpressure;

full.set(isFull);

if (!isFull) {
synchronized (jobManagerSync) {
jobManagerSync.notifyAll();
}
}
}


}

public long calculatePressure() {
synchronized (jobManagerStatus) {
return jobManagerStatus.stream().mapToLong(status -> status.getJobs().size()).sum();
}
private long calculatePressure() {
return jobManagerStatus.stream().mapToLong(status -> status.getJobs().size()).sum();
}

public void waitForFreeJobQueue() throws InterruptedException {
if (calculatePressure() >= backpressure) {
synchronized (jobManagerSync) {
log.trace("Have to wait for free JobQueue (size = {})", jobManagerStatus.size());
jobManagerSync.wait();
}
if (!full.get()) {
return;
}

synchronized (jobManagerSync) {
log.trace("Have to wait for free JobQueue");
jobManagerSync.wait();
}
}
}

0 comments on commit 5dc7a33

Please sign in to comment.