diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/AbstractCqrsDevice.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/AbstractCqrsDevice.java index 4b44148e2..b80811725 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/AbstractCqrsDevice.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/AbstractCqrsDevice.java @@ -5,6 +5,7 @@ import net.sf.dz3r.signal.Signal; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.ThreadContext; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; @@ -12,6 +13,7 @@ import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.concurrent.atomic.AtomicInteger; public abstract class AbstractCqrsDevice implements CqrsDevice { @@ -38,6 +40,9 @@ public abstract class AbstractCqrsDevice implements CqrsDevice { private final Disposable commandSubscription; protected final Sinks.Many, String>> stateSink = Sinks.many().multicast().onBackpressureBuffer(); + private I lastCommand; + private Instant lastSet; + protected O requested; protected O actual; @@ -53,9 +58,49 @@ protected AbstractCqrsDevice(String id, Clock clock, Duration heartbeat, Duratio commandSubscription = commandSink .asFlux() .publishOn(Schedulers.newSingle("cqrs-" + id)) + .flatMap(this::limitRate) .subscribe(this::setStateSync); } + /** + * Make sure {@link #setStateSync(Object)} is not called for identical commands more often than {@link #pace} interval in between. + * + * @param command Command to inspect + * + * @return Flux of just the input command if the pace is not exceeded, or empty Flux if it is. + */ + synchronized Flux limitRate(I command) { + + ThreadContext.push("limitRate"); + + try { + + var now = clock.instant(); + + if (!command.equals(lastCommand)) { + + logger.trace("{}: command={} pass - different from {}", id, command, lastCommand); + + lastCommand = command; + lastSet = now; + + return Flux.just(command); + } + + var interval = Duration.between(lastSet, now); + if (interval.compareTo(pace) < 0) { + logger.trace("{}: command={} drop - too soon ({} vs {})", id, command, interval, pace); + return Flux.empty(); + } + + logger.trace("{}: command={} pass - {} is beyond {}", id, command, interval, pace); + lastSet = now; + return Flux.just(command); + + } finally { + ThreadContext.pop(); + } + } @Override public DeviceState getState() { diff --git a/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/AbstractCqrsDeviceTest.java b/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/AbstractCqrsDeviceTest.java new file mode 100644 index 000000000..89945f25e --- /dev/null +++ b/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/AbstractCqrsDeviceTest.java @@ -0,0 +1,83 @@ +package net.sf.dz3r.device.actuator; + +import net.sf.dz3r.common.TestClock; +import net.sf.dz3r.device.DeviceState; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import reactor.core.publisher.Flux; + +import java.time.Clock; +import java.time.Duration; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +class AbstractCqrsDeviceTest { + + @ParameterizedTest + @MethodSource("getPaceStream") + void pace(Flux source) { + + var clock = new TestClock(); + var device = new PaceTest("pt", clock, null, Duration.ofSeconds(30)); + + source + .doOnNext(t -> { + clock.setOffset(t.offset); + assertThat(device.limitRate(t.command).blockFirst()).isEqualTo(t.expected); + }) + .blockLast(); + } + + private record PaceTuple( + String command, + Duration offset, + String expected + ) { + + } + + static Stream> getPaceStream() { + + return Stream.of( + Flux.just( + new PaceTuple("A", Duration.ZERO, "A"), // Pace is 30 seconds + new PaceTuple("A", Duration.ofSeconds(40), "A"), // Beyond the window, should be let through + new PaceTuple("A", Duration.ofSeconds(50), null), // Falls within, swallowed + new PaceTuple("B", Duration.ofSeconds(51), "B") // Different command, let through + ) + ); + } + + private class PaceTest extends AbstractCqrsDevice { + + protected PaceTest(String id, Clock clock, Duration heartbeat, Duration pace) { + super(id, clock, heartbeat, pace); + } + + @Override + protected void setStateSync(String command) { + throw new IllegalStateException("shouldn't be used"); + } + + @Override + protected String getCloseCommand() { + throw new IllegalStateException("shouldn't be used"); + } + + @Override + protected void closeSubclass() throws Exception { + throw new IllegalStateException("shouldn't be used"); + } + + @Override + public boolean isAvailable() { + throw new IllegalStateException("shouldn't be used"); + } + + @Override + public DeviceState setState(String state) { + throw new IllegalStateException("shouldn't be used"); + } + } +} diff --git a/dz3r-model/src/test/resources/log4j2.xml b/dz3r-model/src/test/resources/log4j2.xml index 9f4ce2f2b..d90353193 100644 --- a/dz3r-model/src/test/resources/log4j2.xml +++ b/dz3r-model/src/test/resources/log4j2.xml @@ -2,11 +2,11 @@ - + - +