Skip to content

Commit

Permalink
Implemented rate limiting for CQRS compliant devices (#292)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed Oct 28, 2023
1 parent d350d07 commit a83e002
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
import net.sf.dz3r.signal.Signal;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;

public abstract class AbstractCqrsDevice<I, O> implements CqrsDevice<I, O> {
Expand All @@ -38,6 +40,9 @@ public abstract class AbstractCqrsDevice<I, O> implements CqrsDevice<I, O> {
private final Disposable commandSubscription;
protected final Sinks.Many<Signal<DeviceState<O>, String>> stateSink = Sinks.many().multicast().onBackpressureBuffer();

private I lastCommand;
private Instant lastSet;

protected O requested;
protected O actual;

Expand All @@ -53,9 +58,49 @@ protected AbstractCqrsDevice(String id, Clock clock, Duration heartbeat, Duratio
commandSubscription = commandSink
.asFlux()
.publishOn(Schedulers.newSingle("cqrs-" + id))
.flatMap(this::limitRate)
.subscribe(this::setStateSync);
}

/**
* Make sure {@link #setStateSync(Object)} is not called for identical commands more often than {@link #pace} interval in between.
*
* @param command Command to inspect
*
* @return Flux of just the input command if the pace is not exceeded, or empty Flux if it is.
*/
synchronized Flux<I> limitRate(I command) {

ThreadContext.push("limitRate");

try {

var now = clock.instant();

if (!command.equals(lastCommand)) {

logger.trace("{}: command={} pass - different from {}", id, command, lastCommand);

lastCommand = command;
lastSet = now;

return Flux.just(command);
}

var interval = Duration.between(lastSet, now);
if (interval.compareTo(pace) < 0) {
logger.trace("{}: command={} drop - too soon ({} vs {})", id, command, interval, pace);
return Flux.empty();
}

logger.trace("{}: command={} pass - {} is beyond {}", id, command, interval, pace);
lastSet = now;
return Flux.just(command);

} finally {
ThreadContext.pop();
}
}

@Override
public DeviceState<O> getState() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package net.sf.dz3r.device.actuator;

import net.sf.dz3r.common.TestClock;
import net.sf.dz3r.device.DeviceState;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.core.publisher.Flux;

import java.time.Clock;
import java.time.Duration;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

class AbstractCqrsDeviceTest {

@ParameterizedTest
@MethodSource("getPaceStream")
void pace(Flux<PaceTuple> source) {

var clock = new TestClock();
var device = new PaceTest("pt", clock, null, Duration.ofSeconds(30));

source
.doOnNext(t -> {
clock.setOffset(t.offset);
assertThat(device.limitRate(t.command).blockFirst()).isEqualTo(t.expected);
})
.blockLast();
}

private record PaceTuple(
String command,
Duration offset,
String expected
) {

}

static Stream<Flux<PaceTuple>> getPaceStream() {

return Stream.of(
Flux.just(
new PaceTuple("A", Duration.ZERO, "A"), // Pace is 30 seconds
new PaceTuple("A", Duration.ofSeconds(40), "A"), // Beyond the window, should be let through
new PaceTuple("A", Duration.ofSeconds(50), null), // Falls within, swallowed
new PaceTuple("B", Duration.ofSeconds(51), "B") // Different command, let through
)
);
}

private class PaceTest extends AbstractCqrsDevice<String, String> {

protected PaceTest(String id, Clock clock, Duration heartbeat, Duration pace) {
super(id, clock, heartbeat, pace);
}

@Override
protected void setStateSync(String command) {
throw new IllegalStateException("shouldn't be used");
}

@Override
protected String getCloseCommand() {
throw new IllegalStateException("shouldn't be used");
}

@Override
protected void closeSubclass() throws Exception {
throw new IllegalStateException("shouldn't be used");
}

@Override
public boolean isAvailable() {
throw new IllegalStateException("shouldn't be used");
}

@Override
public DeviceState<String> setState(String state) {
throw new IllegalStateException("shouldn't be used");
}
}
}
4 changes: 2 additions & 2 deletions dz3r-model/src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
<Configuration status="WARN">
<Appenders>
<Console name="CONSOLE" target="SYSTEM_OUT">
<PatternLayout pattern="%highlight{%d{HH:mm:ss,SSS} %level %class{1} %t %NDC %message%n}"/>
<PatternLayout pattern="%highlight{%d{HH:mm:ss,SSS} %level %class{1} %t %NDC %message%n}{TRACE=white}"/>
</Console>
</Appenders>
<Loggers>
<Root level="DEBUG">
<Root level="TRACE">
<AppenderRef ref="CONSOLE"/>
</Root>
</Loggers>
Expand Down

0 comments on commit a83e002

Please sign in to comment.