diff --git a/docs/configuration/mqtt.md b/docs/configuration/mqtt.md index 32dbb4741..36149dd4b 100644 --- a/docs/configuration/mqtt.md +++ b/docs/configuration/mqtt.md @@ -13,7 +13,7 @@ They all share common properties: id: # optional, defaults to internally generated stable hash code host: port: # optional, defaults to 1883 - root-topic: + root-topic: username: # optional, but recommended password: # optional, but recommended auto-reconnect: #optional, see below diff --git a/docs/configuration/sensors-switches-fans.md b/docs/configuration/sensors-switches-fans.md index 7a96f2f5c..75a258691 100644 --- a/docs/configuration/sensors-switches-fans.md +++ b/docs/configuration/sensors-switches-fans.md @@ -61,7 +61,7 @@ switches: reversed: heartbeat: pace: - optimistic: + availability-topic: ``` #### reversed @@ -73,8 +73,11 @@ Optional. Send the command to hardware this often even if the logical state hasn #### pace Optional. Send the same command to hardware no more often that this. Some bridges (notably `zigbee2mqtt`) are known to become unresponsive with no error indication when incoming traffic exceeds their bandwidth. -#### optimistic -Optional. Send the command to hardware and don't wait for confirmation. Normally, you wouldn't have to do this, but some firmware (notably, [ESPHome](./esphome.md)) doesn't provide reliable confirmation so this may save the situation (and is a default for known hardware types). Use only if you must, and consider using [heartbeat](#heartbeat) to offset the risk. +#### availability-topic +* Mandatory for [ESPHome](./esphome.md) devices (see [esphome #5030](https://github.com/esphome/issues/issues/5030) for more information); +* Disallowed for [Zigbee](./zigbee2mqtt.md) and [Z-Wave](./zwave2mqtt.md) devices. + +Log messages at `ERROR` level will provide enough details to resolve the problem. ### fans Similar to above: @@ -85,11 +88,9 @@ fans: availability: /esphome/550212/status heartbeat: pace: + availability-topic: ``` -`id`, `address`, `heartbeat`, and `pace` parameters are identical to those above. - -#### availability -Defines the topic where the device announces its availability. +`id`, `address`, `heartbeat`, `pace`, and `availability-topic` parameters are identical to those above. This is how ESPHome configuration section looks like (`pin` and `min_power` depend on your particular hardware setup): @@ -113,7 +114,7 @@ fan: For more information, see [ESPHome Fan Component](https://esphome.io/components/fan/). -> **NOTE:** Leave `speed_count` at default (100), or this integration will not work. +> **NOTE:** Leave ESPHome `speed_count` at default (100), or this integration will not work. ### Property of * [esphome](./esphome.md) diff --git a/docs/configuration/zones.md b/docs/configuration/zones.md index d96913db3..718ce6f05 100644 --- a/docs/configuration/zones.md +++ b/docs/configuration/zones.md @@ -38,6 +38,7 @@ Best explained by example: limit: 0.7 mode: cooling hvac-device: economizer-a6 + timeout: 75S ``` @@ -79,6 +80,7 @@ Cooling mode assumed: * `controller`: just like the zone configuration above. * `mode`: self-explanatory * `hvac-device`: at this point, the economizer is an on/off device (multistage coming). This is the identifier of the [HVAC device](./hvac.md) acting as an economizer. +* `timeout`: treat both indoor and ambient sensors as stale and shut off the economizer after not receiving data from them for this long. Default is 90 seconds. The system will complain at `INFO` level if this is happening. ### Property of * [home-climate-control](./home-climate-control.md) diff --git a/docs/release-notes.md b/docs/release-notes.md index 43ae242f3..70cb693aa 100644 --- a/docs/release-notes.md +++ b/docs/release-notes.md @@ -4,6 +4,7 @@ Home Climate Control: Release Notes ## Coming Up * [#293 Implement variable output single mode HVAC device](https://github.com/home-climate-control/dz/issues/293) +* [#292 Decouple possibly faulty actuators from control logic so that the processing pipeline doesn't get stuck ](https://github.com/home-climate-control/dz/issues/292) * [#291 Economizer: control a HVAC device, not a switch](https://github.com/home-climate-control/dz/issues/291) ## v4.0.0 diff --git a/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/config/quarkus/hardware/SwitchConfig.java b/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/config/quarkus/hardware/SwitchConfig.java index ad4729a6f..eb8a5dcba 100644 --- a/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/config/quarkus/hardware/SwitchConfig.java +++ b/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/config/quarkus/hardware/SwitchConfig.java @@ -21,6 +21,6 @@ public interface SwitchConfig { Optional heartbeat(); @JsonProperty("pace") Optional pace(); - @JsonProperty("optimistic") - Optional optimistic(); + @JsonProperty("availability-topic") + Optional availabilityTopic(); } diff --git a/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/config/quarkus/model/EconomizerConfig.java b/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/config/quarkus/model/EconomizerConfig.java index e8dcf6109..ef683e3b2 100644 --- a/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/config/quarkus/model/EconomizerConfig.java +++ b/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/config/quarkus/model/EconomizerConfig.java @@ -3,6 +3,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import net.sf.dz3r.model.HvacMode; +import java.time.Duration; +import java.util.Optional; + public interface EconomizerConfig { @JsonProperty("ambient-sensor") String ambientSensor(); @@ -18,4 +21,6 @@ public interface EconomizerConfig { HvacMode mode(); @JsonProperty("hvac-device") String hvacDevice(); + @JsonProperty("timeout") + Optional timeout(); } diff --git a/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/config/quarkus/protocol/mqtt/FanConfig.java b/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/config/quarkus/protocol/mqtt/FanConfig.java index 7759cea26..26a56ee21 100644 --- a/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/config/quarkus/protocol/mqtt/FanConfig.java +++ b/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/config/quarkus/protocol/mqtt/FanConfig.java @@ -1,4 +1,19 @@ package net.sf.dz3r.runtime.config.quarkus.protocol.mqtt; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.time.Duration; +import java.util.Optional; + public interface FanConfig { + @JsonProperty("id") + Optional id(); + @JsonProperty("address") + String address(); + @JsonProperty("heartbeat") + Optional heartbeat(); + @JsonProperty("pace") + Optional pace(); + @JsonProperty("availability-topic") + Optional availabilityTopic(); } diff --git a/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/config/quarkus/protocol/mqtt/MqttDeviceConfig.java b/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/config/quarkus/protocol/mqtt/MqttDeviceConfig.java index 9c65a93c6..335e73128 100644 --- a/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/config/quarkus/protocol/mqtt/MqttDeviceConfig.java +++ b/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/config/quarkus/protocol/mqtt/MqttDeviceConfig.java @@ -16,6 +16,7 @@ public interface MqttDeviceConfig extends MqttGateway { @JsonProperty("broker") + @Override MqttBrokerConfig broker(); @JsonProperty("sensors") diff --git a/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/mapper/InterfaceRecordMapper.java b/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/mapper/InterfaceRecordMapper.java index e7ced7199..e3e65b5f8 100644 --- a/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/mapper/InterfaceRecordMapper.java +++ b/dz3r-app-quarkus/src/main/java/net/sf/dz3r/runtime/mapper/InterfaceRecordMapper.java @@ -95,9 +95,16 @@ public interface InterfaceRecordMapper { @Mapping(expression = "java(source.reversed().orElse(false))", target = "reversed") @Mapping(expression = "java(source.heartbeat().orElse(null))", target = "heartbeat") @Mapping(expression = "java(source.pace().orElse(null))", target = "pace") - @Mapping(expression = "java(source.optimistic().orElse(null))", target = "optimistic") + @Mapping(expression = "java(source.availabilityTopic().orElse(null))", target = "availabilityTopic") net.sf.dz3r.runtime.config.hardware.SwitchConfig switchConfig(SwitchConfig source); + @Mapping(expression = "java(source.id().orElse(null))", target = "id") + @Mapping(expression = "java(source.address())", target = "address") + @Mapping(expression = "java(source.heartbeat().orElse(null))", target = "heartbeat") + @Mapping(expression = "java(source.pace().orElse(null))", target = "pace") + @Mapping(expression = "java(source.availabilityTopic().orElse(null))", target = "availabilityTopic") + net.sf.dz3r.runtime.config.protocol.mqtt.FanConfig fanConfig(FanConfig source); + @Mapping(expression = "java(source.serialPort())", target = "serialPort") @Mapping(expression = "java(InterfaceRecordMapper.INSTANCE.sensors(source.sensors()))", target = "sensors") @Mapping(expression = "java(InterfaceRecordMapper.INSTANCE.switches(source.switches()))", target = "switches") @@ -147,6 +154,7 @@ public interface InterfaceRecordMapper { @Mapping(expression = "java(InterfaceRecordMapper.INSTANCE.controller(source.controller()))", target = "controller") @Mapping(expression = "java(source.mode())", target = "mode") @Mapping(expression = "java(source.hvacDevice())", target = "hvacDevice") + @Mapping(expression = "java(source.timeout().orElse(null))", target = "timeout") net.sf.dz3r.runtime.config.model.EconomizerConfig economizer(EconomizerConfig source); @Mapping(expression = "java(source.min())", target = "min") @@ -217,6 +225,14 @@ public interface InterfaceRecordMapper { @Mapping(expression = "java(InterfaceRecordMapper.INSTANCE.filter(source.filter().orElse(null)))", target = "filter") net.sf.dz3r.runtime.config.hardware.HeatpumpConfig heatpump(HeatpumpConfig source); + @Mapping(expression = "java(source.id())", target = "id") + @Mapping(expression = "java(source.mode())", target = "mode") + @Mapping(expression = "java(source.actuator())", target = "actuator") + @Mapping(expression = "java(source.maxPower().orElse(null))", target = "maxPower") + @Mapping(expression = "java(source.bandCount().orElse(null))", target = "bandCount") + @Mapping(expression = "java(InterfaceRecordMapper.INSTANCE.filter(source.filter().orElse(null)))", target = "filter") + net.sf.dz3r.runtime.config.hardware.VariableHvacConfig variable(VariableHvacConfig source); + @Mapping(expression = "java(InterfaceRecordMapper.INSTANCE.singleStage(source.singleStage()))", target = "singleStage") @Mapping(expression = "java(InterfaceRecordMapper.INSTANCE.multiStage(source.multiStage()))", target = "multiStage") net.sf.dz3r.runtime.config.hardware.UnitControllerConfig unit(UnitControllerConfig source); diff --git a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/ApplicationBase.java b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/ApplicationBase.java index 9596e6215..4e986f119 100644 --- a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/ApplicationBase.java +++ b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/ApplicationBase.java @@ -70,6 +70,7 @@ private void reportGitProperties() throws IOException { logger.debug("git.branch={}", p.get("git.branch")); logger.debug("git.commit.id={}", p.get("git.commit.id")); logger.debug("git.commit.id.abbrev={}", p.get("git.commit.id.abbrev")); + logger.debug("git.commit.id.describe={}", p.get("git.commit.id.describe")); logger.debug("git.build.version={}", p.get("git.build.version")); } diff --git a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/ConfigurationContext.java b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/ConfigurationContext.java index 09726b20f..b95f5fb5d 100644 --- a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/ConfigurationContext.java +++ b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/ConfigurationContext.java @@ -1,7 +1,7 @@ package net.sf.dz3r.runtime.config; +import net.sf.dz3r.device.actuator.CqrsSwitch; import net.sf.dz3r.device.actuator.HvacDevice; -import net.sf.dz3r.device.actuator.Switch; import net.sf.dz3r.device.actuator.VariableOutputDevice; import net.sf.dz3r.device.mqtt.v1.MqttAdapter; import net.sf.dz3r.model.UnitController; @@ -30,7 +30,7 @@ public class ConfigurationContext { public final EntityProvider mqtt = new EntityProvider<>("mqtt"); public final EntityProvider>> sensors = new EntityProvider<>("sensor"); - public final EntityProvider> switches = new EntityProvider<>("switch"); + public final EntityProvider> switches = new EntityProvider<>("switch"); public final EntityProvider fans = new EntityProvider<>("fan"); public final EntityProvider zones = new EntityProvider<>("zone"); public final EntityProvider schedule = new EntityProvider<>("schedule"); diff --git a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/ConfigurationContextAware.java b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/ConfigurationContextAware.java index e861d4056..f5bec1d10 100644 --- a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/ConfigurationContextAware.java +++ b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/ConfigurationContextAware.java @@ -1,7 +1,7 @@ package net.sf.dz3r.runtime.config; +import net.sf.dz3r.device.actuator.CqrsSwitch; import net.sf.dz3r.device.actuator.HvacDevice; -import net.sf.dz3r.device.actuator.Switch; import net.sf.dz3r.device.actuator.VariableOutputDevice; import net.sf.dz3r.model.UnitController; import net.sf.dz3r.model.Zone; @@ -38,7 +38,7 @@ protected final Mono>> getSensor(String address) { .doOnNext(s -> logger.debug("getSensor({}) = {}", address, s)); } - protected final Switch getSwitch(String address) { + protected final CqrsSwitch getSwitch(String address) { return context .switches .getMonoById("switches", address) diff --git a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/MockConfigurationParser.java b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/MockConfigurationParser.java index 7dfaa3748..2b8de671e 100644 --- a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/MockConfigurationParser.java +++ b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/MockConfigurationParser.java @@ -1,9 +1,9 @@ package net.sf.dz3r.runtime.config; +import net.sf.dz3r.device.actuator.CqrsSwitch; +import net.sf.dz3r.device.actuator.NullCqrsSwitch; import net.sf.dz3r.runtime.config.hardware.MockConfig; import net.sf.dz3r.runtime.config.hardware.SwitchConfig; -import net.sf.dz3r.device.actuator.NullSwitch; -import net.sf.dz3r.device.actuator.Switch; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -17,16 +17,16 @@ protected MockConfigurationParser(ConfigurationContext context) { super(context); } - public Mono> parse(Set source) { + public Mono> parse(Set source) { // Trivial operation, no need to bother with parallelizing return Flux .fromIterable(Optional.ofNullable(source).orElse(Set.of())) .flatMap(c -> Flux.fromIterable(c.switches())) .map(SwitchConfig::address) - .map(NullSwitch::new) + .map(NullCqrsSwitch::new) .doOnNext(s -> context.switches.register(s.getAddress(), s)) - .map(Switch.class::cast) + .map(CqrsSwitch.class::cast) .collectList(); } } diff --git a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/model/ZoneConfigurationParser.java b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/model/ZoneConfigurationParser.java index b13b93ffa..07376c1db 100644 --- a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/model/ZoneConfigurationParser.java +++ b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/model/ZoneConfigurationParser.java @@ -12,6 +12,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import reactor.core.publisher.Flux; +import java.time.Duration; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -41,13 +42,13 @@ public void parse(Set source) { private Map.Entry createZone(ZoneConfig cf) { var ts = createThermostat(cf.name(), cf.settings().setpoint(), cf.settings().setpointRange(), cf.controller()); - var eco = createEconomizer(cf.economizer()); + var eco = createEconomizer(cf.name(), cf.economizer()); var zone = new Zone(ts, map(cf.settings()), eco); return new ImmutablePair<>(cf.id(), zone); } - private EconomizerContext createEconomizer(EconomizerConfig cf) { + private EconomizerContext createEconomizer(String zoneName, EconomizerConfig cf) { if (cf == null) { return null; @@ -55,6 +56,11 @@ private EconomizerContext createEconomizer(EconomizerConfig cf) { var ambientSensor = HCCObjects.requireNonNull(getSensorBlocking(cf.ambientSensor()), "can't resolve ambient-sensor=" + cf.ambientSensor()); var hvacDevice = HCCObjects.requireNonNull(getHvacDevice(cf.hvacDevice()), "can't resolve hvac-device=" + cf.hvacDevice()); + var timeout = Optional.ofNullable(cf.timeout()).orElseGet(() -> { + var t = Duration.ofSeconds(90); + logger.info("{}: using default stale timeout of {} for the economizer", zoneName, t); + return t; + }); return new EconomizerContext( new EconomizerSettings( @@ -66,7 +72,8 @@ private EconomizerContext createEconomizer(EconomizerConfig cf) { cf.controller().i(), cf.controller().limit()), ambientSensor, - hvacDevice); + hvacDevice, + timeout); } private Thermostat createThermostat(String name, Double setpoint, RangeConfig rangeConfig, PidControllerConfig cf) { diff --git a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/ESPHomeDeviceResolver.java b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/ESPHomeDeviceResolver.java index b65432196..45e09e66c 100644 --- a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/ESPHomeDeviceResolver.java +++ b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/ESPHomeDeviceResolver.java @@ -1,18 +1,18 @@ package net.sf.dz3r.runtime.config.mqtt; -import net.sf.dz3r.device.esphome.v1.ESPHomeFan; import net.sf.dz3r.device.esphome.v1.ESPHomeListener; -import net.sf.dz3r.device.esphome.v1.ESPHomeSwitch; +import net.sf.dz3r.device.esphome.v2.ESPHomeCqrsSwitch; +import net.sf.dz3r.device.esphome.v2.ESPHomeFan; import net.sf.dz3r.device.mqtt.v1.MqttAdapter; import net.sf.dz3r.runtime.config.protocol.mqtt.MqttDeviceConfig; import net.sf.dz3r.runtime.config.protocol.mqtt.MqttEndpointSpec; +import java.time.Clock; import java.time.Duration; import java.util.Map; -import java.util.Optional; import java.util.Set; -public class ESPHomeDeviceResolver extends MqttDeviceResolver { +public class ESPHomeDeviceResolver extends MqttDeviceResolver { public ESPHomeDeviceResolver(Set source, Map endpoint2adapter) { super(source, endpoint2adapter); @@ -30,22 +30,25 @@ protected ESPHomeListener createSensorListener(MqttAdapter adapter, String rootT } @Override - protected ESPHomeSwitch createSwitch(MqttAdapter adapter, String rootTopic, Boolean optimistic) { + protected ESPHomeCqrsSwitch createSwitch(String id, Duration heartbeat, Duration pace, MqttAdapter adapter, String rootTopic, String availabilityTopic) { - // Optimistic defaults to true for this switch only - // https://github.com/home-climate-control/dz/issues/280 - - return new ESPHomeSwitch( + return new ESPHomeCqrsSwitch( + id, + Clock.systemUTC(), + heartbeat, + pace, adapter, rootTopic, - Optional.ofNullable(optimistic).orElse(true), - null); + availabilityTopic); } @Override - protected ESPHomeFan createFan(String id, MqttAdapter adapter, String rootTopic, String availabilityTopic) { + protected ESPHomeFan createFan(String id, Duration heartbeat, Duration pace, MqttAdapter adapter, String rootTopic, String availabilityTopic) { return new ESPHomeFan( id, + Clock.systemUTC(), + heartbeat, + pace, adapter, rootTopic, availabilityTopic diff --git a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/MqttConfigurationParser.java b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/MqttConfigurationParser.java index ce597bfb3..5c97ca1ce 100644 --- a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/MqttConfigurationParser.java +++ b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/MqttConfigurationParser.java @@ -1,8 +1,8 @@ package net.sf.dz3r.runtime.config.mqtt; -import net.sf.dz3r.device.mqtt.v1.AbstractMqttSwitch; import net.sf.dz3r.device.mqtt.v1.MqttAdapter; import net.sf.dz3r.device.mqtt.v1.MqttEndpoint; +import net.sf.dz3r.device.mqtt.v2.AbstractMqttCqrsSwitch; import net.sf.dz3r.instrumentation.Marker; import net.sf.dz3r.runtime.config.ConfigurationContext; import net.sf.dz3r.runtime.config.ConfigurationContextAware; @@ -50,7 +50,7 @@ public MqttConfigurationParser(ConfigurationContext context) { */ public Mono, - List>>> parse( + List>>> parse( Set esphome, Set zigbee2mqtt, Set zwave2mqtt, @@ -128,14 +128,14 @@ List>>> parse( .publishOn(Schedulers.boundedElastic()) .flatMap(MqttDeviceResolver::getSwitches) .doOnNext(kv -> context.switches.register(kv.getKey(), kv.getValue())) - .map(kv -> ((Map.Entry) new ImmutablePair<>(kv.getKey(), kv.getValue()))) + .map(kv -> ((Map.Entry) new ImmutablePair<>(kv.getKey(), kv.getValue()))) .collectList(); var fans = mqttConfigs .publishOn(Schedulers.boundedElastic()) .flatMap(MqttDeviceResolver::getFans) .doOnNext(kv -> context.fans.register(kv.getKey(), kv.getValue())) - .map(kv -> (new ImmutablePair<>(kv.getKey(), kv.getValue()))) + .map(kv -> new ImmutablePair<>(kv.getKey(), kv.getValue())) .collectList(); logger.debug("waiting at the gate"); diff --git a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/MqttDeviceResolver.java b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/MqttDeviceResolver.java index 0d7a46e51..8d9022e43 100644 --- a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/MqttDeviceResolver.java +++ b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/MqttDeviceResolver.java @@ -1,8 +1,8 @@ package net.sf.dz3r.runtime.config.mqtt; import net.sf.dz3r.device.actuator.VariableOutputDevice; -import net.sf.dz3r.device.mqtt.v1.AbstractMqttSwitch; import net.sf.dz3r.device.mqtt.v1.MqttAdapter; +import net.sf.dz3r.device.mqtt.v2.AbstractMqttCqrsSwitch; import net.sf.dz3r.runtime.config.ConfigurationMapper; import net.sf.dz3r.runtime.config.DeviceResolver; import net.sf.dz3r.runtime.config.Id2Flux; @@ -36,7 +36,7 @@ * * @author Copyright © Vadim Tkachenko 2001-2023 */ -public abstract class MqttDeviceResolver, S extends AbstractMqttSwitch, F extends VariableOutputDevice> extends DeviceResolver { +public abstract class MqttDeviceResolver, S extends AbstractMqttCqrsSwitch, F extends VariableOutputDevice> extends DeviceResolver { private final Map endpoint2adapter; private final Set sensorConfigs = new LinkedHashSet<>(); @@ -217,7 +217,13 @@ private Flux> getSwitches(Map> getFans() { return getFans(endpoint2adapter, fanConfigs); } @@ -242,7 +258,13 @@ private Flux> getFans(Map en var id = c.fanConfig().id(); var address = c.fanConfig.address(); - var s = createFan(id, adapter, address, c.fanConfig.availability()); + var s = createFan( + id, + c.fanConfig().heartbeat(), + getOrDefaultPace(c.fanConfig.pace(), Duration.ofMinutes(1), "fan=" + id), + adapter, + address, + c.fanConfig.availabilityTopic()); // ID takes precedence over address var key = id == null ? address : id; @@ -251,9 +273,9 @@ private Flux> getFans(Map en }); } - protected abstract S createSwitch(MqttAdapter adapter, String rootTopic, Boolean optimistic); + protected abstract S createSwitch(String id, Duration heartbeat, Duration pace, MqttAdapter adapter, String rootTopic, String availabilityTopic); - protected abstract F createFan(String id, MqttAdapter adapter, String rootTopic, String availabilityTopic); + protected abstract F createFan(String id, Duration heartbeat, Duration pace, MqttAdapter adapter, String rootTopic, String availabilityTopic); public record MqttSensorConfig( MqttBrokerSpec mqttBrokerSpec, diff --git a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/ZWaveDeviceResolver.java b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/ZWaveDeviceResolver.java index 57d27b031..f2bec72c1 100644 --- a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/ZWaveDeviceResolver.java +++ b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/ZWaveDeviceResolver.java @@ -2,18 +2,18 @@ import net.sf.dz3r.device.actuator.VariableOutputDevice; import net.sf.dz3r.device.mqtt.v1.MqttAdapter; -import net.sf.dz3r.device.zwave.v1.ZWaveBinarySwitch; import net.sf.dz3r.device.zwave.v1.ZWaveSensorListener; +import net.sf.dz3r.device.zwave.v2.ZWaveCqrsBinarySwitch; import net.sf.dz3r.runtime.config.protocol.mqtt.MqttDeviceConfig; import net.sf.dz3r.runtime.config.protocol.mqtt.MqttEndpointSpec; import net.sf.dz3r.signal.SignalSource; +import java.time.Clock; import java.time.Duration; import java.util.Map; -import java.util.Optional; import java.util.Set; -public class ZWaveDeviceResolver extends MqttDeviceResolver, ZWaveBinarySwitch, VariableOutputDevice> { +public class ZWaveDeviceResolver extends MqttDeviceResolver, ZWaveCqrsBinarySwitch, VariableOutputDevice> { public ZWaveDeviceResolver(Set source, Map endpoint2adapter) { super(source, endpoint2adapter); @@ -39,16 +39,23 @@ protected SignalSource createSensorListener(MqttAdapter ad } @Override - protected ZWaveBinarySwitch createSwitch(MqttAdapter adapter, String rootTopic, Boolean optimistic) { - return new ZWaveBinarySwitch( + protected ZWaveCqrsBinarySwitch createSwitch(String id, Duration heartbeat, Duration pace, MqttAdapter adapter, String rootTopic, String availabilityTopic) { + + if (availabilityTopic != null) { + throw new IllegalArgumentException("zigbee2mqtt.switches.availability-topic is determined automatically, please remove it from the configuration"); + } + + return new ZWaveCqrsBinarySwitch( + id, + Clock.systemUTC(), + heartbeat, + pace, adapter, - rootTopic, - Optional.ofNullable(optimistic).orElse(false), - null); + rootTopic); } @Override - protected VariableOutputDevice createFan(String id, MqttAdapter adapter, String rootTopic, String availabilityTopic) { + protected VariableOutputDevice createFan(String id, Duration heartbeat, Duration pace, MqttAdapter adapter, String rootTopic, String availabilityTopic) { throw new UnsupportedOperationException("Not Implemented"); } } diff --git a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/ZigbeeDeviceResolver.java b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/ZigbeeDeviceResolver.java index 1a4a2400a..40bed612d 100644 --- a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/ZigbeeDeviceResolver.java +++ b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/ZigbeeDeviceResolver.java @@ -3,16 +3,16 @@ import net.sf.dz3r.device.actuator.VariableOutputDevice; import net.sf.dz3r.device.mqtt.v1.MqttAdapter; import net.sf.dz3r.device.z2m.v1.Z2MJsonListener; -import net.sf.dz3r.device.z2m.v1.Z2MSwitch; +import net.sf.dz3r.device.z2m.v2.Z2MCqrsSwitch; import net.sf.dz3r.runtime.config.protocol.mqtt.MqttDeviceConfig; import net.sf.dz3r.runtime.config.protocol.mqtt.MqttEndpointSpec; +import java.time.Clock; import java.time.Duration; import java.util.Map; -import java.util.Optional; import java.util.Set; -public class ZigbeeDeviceResolver extends MqttDeviceResolver { +public class ZigbeeDeviceResolver extends MqttDeviceResolver { public ZigbeeDeviceResolver(Set source, Map endpoint2adapter) { super(source, endpoint2adapter); @@ -38,16 +38,23 @@ protected Z2MJsonListener createSensorListener(MqttAdapter adapter, String rootT } @Override - protected Z2MSwitch createSwitch(MqttAdapter adapter, String rootTopic, Boolean optimistic) { - return new Z2MSwitch( + protected Z2MCqrsSwitch createSwitch(String id, Duration heartbeat, Duration pace, MqttAdapter adapter, String rootTopic, String availabilityTopic) { + + if (availabilityTopic != null) { + throw new IllegalArgumentException("zwave2mqtt.switches.availability-topic is determined automatically, please remove it from the configuration"); + } + + return new Z2MCqrsSwitch( + id, + Clock.systemUTC(), + heartbeat, + pace, adapter, - rootTopic, - Optional.ofNullable(optimistic).orElse(false), - null); + rootTopic); } @Override - protected VariableOutputDevice createFan(String id, MqttAdapter adapter, String rootTopic, String availabilityTopic) { + protected VariableOutputDevice createFan(String id, Duration heartbeat, Duration pace, MqttAdapter adapter, String rootTopic, String availabilityTopic) { throw new UnsupportedOperationException("Not Implemented"); } } diff --git a/dz3r-bootstrap/src/test/java/net/sf/dz3r/device/actuator/MqttHeatPumpTest.java b/dz3r-bootstrap/src/test/java/net/sf/dz3r/device/actuator/MqttHeatPumpTest.java index c4eca6ac7..94dc6dcb5 100644 --- a/dz3r-bootstrap/src/test/java/net/sf/dz3r/device/actuator/MqttHeatPumpTest.java +++ b/dz3r-bootstrap/src/test/java/net/sf/dz3r/device/actuator/MqttHeatPumpTest.java @@ -1,6 +1,8 @@ package net.sf.dz3r.device.actuator; -import net.sf.dz3r.device.esphome.v1.ESPHomeSwitch; +import net.sf.dz3r.device.esphome.v2.ESPHomeCqrsSwitch; +import net.sf.dz3r.device.mqtt.v1.MqttAdapter; +import net.sf.dz3r.device.mqtt.v1.MqttEndpoint; import net.sf.dz3r.model.HvacMode; import net.sf.dz3r.signal.Signal; import net.sf.dz3r.signal.hvac.HvacCommand; @@ -10,7 +12,7 @@ import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; import reactor.core.publisher.Flux; -import java.io.IOException; +import java.time.Clock; import java.time.Duration; import java.time.Instant; @@ -22,69 +24,75 @@ * VT: NOTE: Careful, this works on real hardware, disconnect before testing. */ @EnabledIfEnvironmentVariable( - named = "TEST_DZ_ESPHOME_HEATPUMP", + named = "TEST_HCC_ESPHOME_HEATPUMP", matches = "safe", disabledReason = "Only execute this test if a suitable MQTT broker and ESPHome switch device are available" -)class ESPHomeHeatPumpTest { +) +class ESPHomeHeatPumpTest { private final Logger logger = LogManager.getLogger(); private final String MQTT_BROKER = "mqtt-esphome"; - private final String ESPHOME_SWITCH_TOPIC_ROOT = "/esphome/0156AC/switch/"; + private final String ESPHOME_SWITCH_TOPIC_ROOT = "/esphome/81B190/switch/"; + private final String ESPHOME_AVAILABILITY_TOPIC = "/esphome/81B190/status"; @Test - void cycle() throws IOException { - - var switchFan = new ESPHomeSwitch(MQTT_BROKER, ESPHOME_SWITCH_TOPIC_ROOT + "t-relay-2-r0-fan"); - var switchRunning = new ESPHomeSwitch(MQTT_BROKER, ESPHOME_SWITCH_TOPIC_ROOT + "t-relay-2-r1-condenser"); - var switchMode = new ESPHomeSwitch(MQTT_BROKER, ESPHOME_SWITCH_TOPIC_ROOT + "t-relay-2-r2-mode"); - var delay = Duration.ofSeconds(1); - - // This should throw a wrench into everything... - Flux - .just(true, true, true) - .delayElements(delay) - .map(switchMode::setState) - .blockLast(); - - logger.info("set mode to 1"); - - try (var hp = new HeatPump( - "heatpump", - switchMode, true, - switchRunning, false, - switchFan, false, - Duration.ofSeconds(3), - null)) { - - var commands = Flux - .just( - new HvacCommand(HvacMode.COOLING, 0d, 0d), - new HvacCommand(null, 1d, 1d), - new HvacCommand(HvacMode.HEATING, 0d, 0d), - new HvacCommand(null, 1d, 1d), - new HvacCommand(null, 0d, 0d)) + void cycle() throws Exception { + + + try (var adapter = new MqttAdapter(new MqttEndpoint(MQTT_BROKER))) { + + var switchFan = new ESPHomeCqrsSwitch("fan", Clock.systemUTC(), null, null, adapter, ESPHOME_SWITCH_TOPIC_ROOT + "t-relay-3-r0-fan", ESPHOME_AVAILABILITY_TOPIC); + var switchRunning = new ESPHomeCqrsSwitch("running", Clock.systemUTC(), null, null, adapter, ESPHOME_SWITCH_TOPIC_ROOT + "t-relay-3-r1-condenser", ESPHOME_AVAILABILITY_TOPIC); + var switchMode = new ESPHomeCqrsSwitch("mode", Clock.systemUTC(), null, null, adapter, ESPHOME_SWITCH_TOPIC_ROOT + "t-relay-3-r2-mode", ESPHOME_AVAILABILITY_TOPIC); + var delay = Duration.ofSeconds(1); + + // This should throw a wrench into everything... + Flux + .just(true, true, true) .delayElements(delay) - .doOnNext(command -> logger.info("command: {}", command)) - .map(c -> new Signal(Instant.now(), c)); - - assertThatCode(() -> { - hp - .compute(commands) - .doOnNext(s -> logger.info("output: {}", s)) - .doOnNext(s -> { - if (s.isError()) { - throw new IllegalStateException("Failure output received: " + s); - } - }) - .blockLast(); - - logger.info("done"); - }) - .doesNotThrowAnyException(); - - // Should close() here automatically + .map(switchMode::setState) + .blockLast(); + + logger.info("set mode to 1"); + + try (var hp = new HeatPump( + "heatpump", + switchMode, true, + switchRunning, false, + switchFan, false, + Duration.ofSeconds(3), + null)) { + + var commands = Flux + .just( + new HvacCommand(HvacMode.COOLING, 0d, 0d), + new HvacCommand(null, 1d, 1d), + new HvacCommand(HvacMode.HEATING, 0d, 0d), + new HvacCommand(null, 1d, 1d), + new HvacCommand(null, 0d, 0d)) + .delayElements(delay) + .doOnNext(command -> logger.info("command: {}", command)) + .map(c -> new Signal(Instant.now(), c)); + + assertThatCode(() -> { + hp + .compute(commands) + .doOnNext(s -> logger.info("output: {}", s)) + .doOnNext(s -> { + if (s.isError()) { + throw new IllegalStateException("Failure output received: " + s); + } + }) + .blockLast(); + + logger.info("done"); + }) + .doesNotThrowAnyException(); + + // Should close() here automatically + } } } } diff --git a/dz3r-common/src/main/java/net/sf/dz3r/common/TestClock.java b/dz3r-common/src/main/java/net/sf/dz3r/common/TestClock.java new file mode 100644 index 000000000..143ea4aed --- /dev/null +++ b/dz3r-common/src/main/java/net/sf/dz3r/common/TestClock.java @@ -0,0 +1,39 @@ +package net.sf.dz3r.common; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; + +public class TestClock extends Clock { + + private final Clock baseClock; + private Duration offset = Duration.ofSeconds(0); + + public TestClock() { + baseClock = Clock.systemUTC(); + } + + public TestClock(Clock baseClock) { + this.baseClock = baseClock; + } + + public void setOffset(Duration offset) { + this.offset = offset; + } + + @Override + public ZoneId getZone() { + return baseClock.getZone(); + } + + @Override + public Clock withZone(ZoneId zone) { + return baseClock.withZone(zone); + } + + @Override + public Instant instant() { + return baseClock.instant().plus(offset); + } +} diff --git a/dz3r-common/src/main/java/net/sf/dz3r/counter/LoggerTimeUsageReporter.java b/dz3r-common/src/main/java/net/sf/dz3r/counter/LoggerTimeUsageReporter.java index d63215a29..2ea2bdf14 100644 --- a/dz3r-common/src/main/java/net/sf/dz3r/counter/LoggerTimeUsageReporter.java +++ b/dz3r-common/src/main/java/net/sf/dz3r/counter/LoggerTimeUsageReporter.java @@ -24,8 +24,8 @@ public class LoggerTimeUsageReporter implements ResourceUsageReporter private final Map frequencyMap = Map.of( Level.DEBUG, Duration.of(1, ChronoUnit.HOURS), Level.INFO, Duration.of(1, ChronoUnit.HOURS), - Level.WARN, Duration.of(10, ChronoUnit.MINUTES), - Level.ERROR, Duration.of(1, ChronoUnit.MINUTES) + Level.WARN, Duration.of(30, ChronoUnit.MINUTES), + Level.ERROR, Duration.of(10, ChronoUnit.MINUTES) ); private Instant lastAlertIssued; @@ -78,7 +78,7 @@ public void report(ResourceUsageCounter.State state) { return; } - logger.log(level, "{}: current usage {}%{}", marker, percent, (percent > 100 ? " (OVERDUE)" : "")); + logger.log(level, "{}: current usage {}%{}", marker, (int) percent, (percent > 100 ? " (OVERDUE)" : "")); lastAlertIssued = now; } finally { diff --git a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/GitProperties.java b/dz3r-common/src/main/java/net/sf/dz3r/runtime/GitProperties.java similarity index 100% rename from dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/GitProperties.java rename to dz3r-common/src/main/java/net/sf/dz3r/runtime/GitProperties.java diff --git a/dz3r-config/src/main/java/net/sf/dz3r/runtime/config/hardware/SwitchConfig.java b/dz3r-config/src/main/java/net/sf/dz3r/runtime/config/hardware/SwitchConfig.java index 12ccfbe64..8dd3dc986 100644 --- a/dz3r-config/src/main/java/net/sf/dz3r/runtime/config/hardware/SwitchConfig.java +++ b/dz3r-config/src/main/java/net/sf/dz3r/runtime/config/hardware/SwitchConfig.java @@ -1,5 +1,8 @@ package net.sf.dz3r.runtime.config.hardware; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; + import java.time.Duration; /** @@ -10,14 +13,15 @@ * @param reversed {@code true} if the switch must be reversed. * @param heartbeat Issue identical control commands to this switch at least this often, repeat if necessary. * @param pace Issue identical control commands to this switch at most this often. - * @param optimistic See issue 280. + * @param availabilityTopic Topic where MQTT switch availability information is published. Will cause {@link IllegalArgumentException} for other types. */ +@JsonNaming(PropertyNamingStrategies.KebabCaseStrategy.class) public record SwitchConfig( String id, String address, boolean reversed, Duration heartbeat, Duration pace, - Boolean optimistic + String availabilityTopic ) { } diff --git a/dz3r-config/src/main/java/net/sf/dz3r/runtime/config/model/EconomizerConfig.java b/dz3r-config/src/main/java/net/sf/dz3r/runtime/config/model/EconomizerConfig.java index 27ad3985f..fc7b8fc57 100644 --- a/dz3r-config/src/main/java/net/sf/dz3r/runtime/config/model/EconomizerConfig.java +++ b/dz3r-config/src/main/java/net/sf/dz3r/runtime/config/model/EconomizerConfig.java @@ -4,6 +4,8 @@ import com.fasterxml.jackson.databind.annotation.JsonNaming; import net.sf.dz3r.model.HvacMode; +import java.time.Duration; + @JsonNaming(PropertyNamingStrategies.KebabCaseStrategy.class) public record EconomizerConfig( String ambientSensor, @@ -12,6 +14,7 @@ public record EconomizerConfig( Boolean keepHvacOn, PidControllerConfig controller, HvacMode mode, - String hvacDevice + String hvacDevice, + Duration timeout ) { } diff --git a/dz3r-config/src/main/java/net/sf/dz3r/runtime/config/protocol/mqtt/FanConfig.java b/dz3r-config/src/main/java/net/sf/dz3r/runtime/config/protocol/mqtt/FanConfig.java index 642b0ebb2..35f09124d 100644 --- a/dz3r-config/src/main/java/net/sf/dz3r/runtime/config/protocol/mqtt/FanConfig.java +++ b/dz3r-config/src/main/java/net/sf/dz3r/runtime/config/protocol/mqtt/FanConfig.java @@ -1,5 +1,8 @@ package net.sf.dz3r.runtime.config.protocol.mqtt; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; + import java.time.Duration; /** @@ -7,15 +10,16 @@ * * @param id Identifier, optional (defaults to {@link #address} if absent). * @param address Device address. Mandatory. - * @param availability Availability topic. Mandatory. * @param heartbeat Issue identical control commands to this switch at least this often, repeat if necessary. * @param pace Issue identical control commands to this switch at most this often. + * @param availabilityTopic Availability topic. Mandatory. */ +@JsonNaming(PropertyNamingStrategies.KebabCaseStrategy.class) public record FanConfig( String id, String address, - String availability, Duration heartbeat, - Duration pace + Duration pace, + String availabilityTopic ) { } diff --git a/dz3r-director/src/main/java/net/sf/dz3r/instrumentation/InstrumentCluster.java b/dz3r-director/src/main/java/net/sf/dz3r/instrumentation/InstrumentCluster.java index c086520ac..f8455ecfc 100644 --- a/dz3r-director/src/main/java/net/sf/dz3r/instrumentation/InstrumentCluster.java +++ b/dz3r-director/src/main/java/net/sf/dz3r/instrumentation/InstrumentCluster.java @@ -1,7 +1,7 @@ package net.sf.dz3r.instrumentation; +import net.sf.dz3r.device.actuator.CqrsSwitch; import net.sf.dz3r.device.actuator.HvacDevice; -import net.sf.dz3r.device.actuator.Switch; import net.sf.dz3r.scheduler.ScheduleUpdater; import net.sf.dz3r.signal.Signal; import net.sf.dz3r.signal.health.SystemStatus; @@ -31,7 +31,7 @@ public class InstrumentCluster { private final Logger logger = LogManager.getLogger(); private final Flux>>> sensors; - private final Flux>> switches; + private final Flux>> switches; private final Flux> schedule; private final Flux> connectors; private final Flux> collectors; @@ -51,7 +51,7 @@ public class InstrumentCluster { public InstrumentCluster( Flux>>> sensors, - Flux>> switches, + Flux>> switches, Flux> schedule, Flux> connectors, Flux> collectors, diff --git a/dz3r-director/src/main/java/net/sf/dz3r/instrumentation/SwitchStatusProcessor.java b/dz3r-director/src/main/java/net/sf/dz3r/instrumentation/SwitchStatusProcessor.java index b7c9d7016..b0d130bc7 100644 --- a/dz3r-director/src/main/java/net/sf/dz3r/instrumentation/SwitchStatusProcessor.java +++ b/dz3r-director/src/main/java/net/sf/dz3r/instrumentation/SwitchStatusProcessor.java @@ -1,6 +1,6 @@ package net.sf.dz3r.instrumentation; -import net.sf.dz3r.device.actuator.Switch; +import net.sf.dz3r.device.DeviceState; import net.sf.dz3r.signal.Signal; import net.sf.dz3r.signal.SignalProcessor; import net.sf.dz3r.signal.health.SwitchStatus; @@ -15,7 +15,7 @@ * * @author Copyright © Vadim Tkachenko 2001-2023 */ -public class SwitchStatusProcessor implements SignalProcessor { +public class SwitchStatusProcessor implements SignalProcessor, SwitchStatus, String> { private final Logger logger = LogManager.getLogger(); @@ -28,18 +28,18 @@ public SwitchStatusProcessor(String id) { } @Override - public Flux> compute(Flux> in) { + public Flux> compute(Flux, String>> in) { return in.map(this::compute); } - private Signal compute(Signal source) { + private Signal compute(Signal, String> source) { if (source.isError()) { - // Nothing else matters + // Nothing else matters, for now return new Signal<>(source.timestamp, null, null, source.status, source.error); } - // VT: FIXME: Pass/fail is the only thing of interest right now + // VT: FIXME: Pass/fail is the only thing of interest right now, but DeviceState contains some juicy bits return new Signal<>(source.timestamp, new SwitchStatus(Optional.empty())); } } diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/AbstractCqrsDevice.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/AbstractCqrsDevice.java new file mode 100644 index 000000000..c9c525710 --- /dev/null +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/AbstractCqrsDevice.java @@ -0,0 +1,161 @@ +package net.sf.dz3r.device.actuator; + +import net.sf.dz3r.common.HCCObjects; +import net.sf.dz3r.device.DeviceState; +import net.sf.dz3r.signal.Signal; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.ThreadContext; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Schedulers; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicInteger; + +public abstract class AbstractCqrsDevice implements CqrsDevice { + + protected final Logger logger = LogManager.getLogger(); + + protected final String id; + protected final Clock clock; + + + /** + * Issue identical control commands to this device at least this often, repeat if necessary. + */ + protected final Duration heartbeat; + + /** + * Issue identical control commands to this device at most this often. + */ + protected final Duration pace; + + protected final AtomicInteger queueDepth = new AtomicInteger(); + + protected final Sinks.Many commandSink = Sinks.many().multicast().onBackpressureBuffer(); + private final Disposable commandSubscription; + protected final Sinks.Many, String>> stateSink = Sinks.many().multicast().onBackpressureBuffer(); + + private I lastCommand; + private Instant lastSet; + + protected O requested; + protected O actual; + + protected AbstractCqrsDevice(String id, Clock clock, Duration heartbeat, Duration pace) { + + this.id = HCCObjects.requireNonNull(id, "id can't be null"); + this.clock = HCCObjects.requireNonNull(clock, "clock can't be null"); + + // These are nullable + this.heartbeat = heartbeat; + this.pace = pace; + + commandSubscription = commandSink + .asFlux() + .publishOn(Schedulers.newSingle("cqrs-" + id)) + .flatMap(this::limitRate) + .subscribe(this::setStateSync); + } + + /** + * Make sure {@link #setStateSync(Object)} is not called for identical commands more often than {@link #pace} interval in between. + * + * @param command Command to inspect + * + * @return Flux of just the input command if the pace is not exceeded, or empty Flux if it is. + */ + synchronized Flux limitRate(I command) { + + ThreadContext.push("limitRate"); + + try { + + if (pace == null) { + logger.trace("{}: null pace - passthrough command={}", id, command); + return Flux.just(command); + } + + var now = clock.instant(); + + if (!command.equals(lastCommand)) { + + logger.trace("{}: command={} pass - different from {}", id, command, lastCommand); + + lastCommand = command; + lastSet = now; + + return Flux.just(command); + } + + var interval = Duration.between(lastSet, now); + if (interval.compareTo(pace) < 0) { + logger.trace("{}: command={} drop - too soon ({} vs {})", id, command, interval, pace); + return Flux.empty(); + } + + logger.trace("{}: command={} pass - {} is beyond {}", id, command, interval, pace); + lastSet = now; + return Flux.just(command); + + } finally { + ThreadContext.pop(); + } + } + + @Override + public DeviceState getState() { + + return new DeviceState<>( + id, + isAvailable(), + requested, + actual, + queueDepth.get() + ); + } + + protected Signal, String> getStateSignal() { + return new Signal<>(clock.instant(), getState(), id); + } + + /** + * Set the requested state, synchronously + * + * @param command Command to execute. + */ + protected abstract void setStateSync(I command); + + @Override + public final Flux, String>> getFlux() { + return stateSink.asFlux(); + } + + protected abstract I getCloseCommand(); + protected abstract void closeSubclass() throws Exception; + + @Override + public final void close() throws Exception { + + // Prevent new commands from coming in + commandSubscription.dispose(); + + // Shut down the device + setStateSync(getCloseCommand()); + + // Adjust the queue depth - previous command skewed it + queueDepth.incrementAndGet(); + + // Emit the final notification + stateSink.tryEmitNext(getStateSignal()); + + // Indicate that we're done + stateSink.tryEmitComplete(); + + closeSubclass(); + } +} diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/AbstractHvacDevice.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/AbstractHvacDevice.java index c5c6b97f1..7959e3b7d 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/AbstractHvacDevice.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/AbstractHvacDevice.java @@ -15,6 +15,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Objects; +import java.util.Optional; /** * Common functionality for all HVAC device drivers. @@ -81,7 +82,7 @@ public final String getAddress() { return name; } - protected void check(Switch s, String purpose) { + protected void check(CqrsSwitch s, String purpose) { if (s == null) { throw new IllegalArgumentException("'" + purpose + "' switch can't be null"); } @@ -149,7 +150,9 @@ public final void close() throws IOException { } isClosed = true; - uptimeCounterSubscription.dispose(); + Optional + .ofNullable(uptimeCounterSubscription) + .ifPresent(Disposable::dispose); doClose(); } diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/CqrsDevice.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/CqrsDevice.java new file mode 100644 index 000000000..20ec1c966 --- /dev/null +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/CqrsDevice.java @@ -0,0 +1,57 @@ +package net.sf.dz3r.device.actuator; + +import net.sf.dz3r.device.DeviceState; +import net.sf.dz3r.signal.Signal; +import reactor.core.publisher.Flux; + +/** + * Common interface for devices supporting Command Query Responsibility Segregation pattern. + * + * @param Command type. + * @param Output type. * + * + * @author Copyright © Vadim Tkachenko 2001-2023 + */ +public interface CqrsDevice extends AutoCloseable { + + /** + * Check the reported device availability. + * + * @return previously reported device availability, immediately. + */ + boolean isAvailable(); + + /** + * Get the instant state of the device. + * + * @return Actual, current state of the device, immediately. + */ + DeviceState getState(); + + /** + * Request the provided state, return immediately. + * + * If the device is not {@link #isAvailable() available}, the command is still accepted. + * + * @param state State to set. + * + * @return The result of {@link #getState()} after the command was accepted (not executed). + */ + DeviceState setState(I state); + + /** + * Get the state change notification flux. + * + * Note that this sink will NOT emit error signals, however, {@link DeviceState#available} will reflect device availability + * the moment the signal was emitted. + * + * @return Flux emitting signals every time the {@link #setState requested} or {@link #getState() actual} state has changed. + * Signal payload is the device identifier assigned upon assembly. + */ + Flux, String>> getFlux(); + /** + * Close the device, synchronously. + */ + @Override + void close() throws Exception; +} diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/CqrsSwitch.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/CqrsSwitch.java new file mode 100644 index 000000000..d18af9a49 --- /dev/null +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/CqrsSwitch.java @@ -0,0 +1,19 @@ +package net.sf.dz3r.device.actuator; + +import net.sf.dz3r.device.Addressable; + +/** + * Single channel switch abstraction. + * + * Unlike {@link Switch}, this specification honors Command Query Responsibility Segregation + * (like {@link VariableOutputDevice} does). + * + * The idea is to replicate the functionality of the {@link Switch} hierarchy in a CQRS compliant way, then retire {@link Switch} + * and rename the classes of this hierarchy. + * + * @param Address type. + * + * @author Copyright © Vadim Tkachenko 2001-2023 + */ +public interface CqrsSwitch> extends CqrsDevice, Addressable { +} diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/HeatPump.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/HeatPump.java index 850f99b33..28da5e52f 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/HeatPump.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/HeatPump.java @@ -35,22 +35,22 @@ public class HeatPump extends AbstractHvacDevice { private static final Duration DEFAULT_MODE_CHANGE_DELAY = Duration.ofSeconds(10); private static final Reconciler reconciler = new Reconciler(); - private final Switch switchMode; - private final Switch switchRunning; + private final CqrsSwitch switchMode; + private final CqrsSwitch switchRunning; /** * Fan hardware control switch. * * @see #switchFanStack */ - private final Switch switchFan; + private final CqrsSwitch switchFan; /** * Fan logical control switch. * * @see #switchFan */ - private final StackingSwitch switchFanStack; + private final StackingCqrsSwitch switchFanStack; private final boolean reverseMode; private final boolean reverseRunning; @@ -84,9 +84,9 @@ public class HeatPump extends AbstractHvacDevice { */ public HeatPump( String name, - Switch switchMode, boolean reverseMode, - Switch switchRunning, boolean reverseRunning, - Switch switchFan, boolean reverseFan, + CqrsSwitch switchMode, boolean reverseMode, + CqrsSwitch switchRunning, boolean reverseRunning, + CqrsSwitch switchFan, boolean reverseFan, Duration changeModeDelay, ResourceUsageCounter uptimeCounter) { this(name, @@ -99,9 +99,9 @@ public HeatPump( } public HeatPump( String name, - Switch switchMode, boolean reverseMode, - Switch switchRunning, boolean reverseRunning, - Switch switchFan, boolean reverseFan, + CqrsSwitch switchMode, boolean reverseMode, + CqrsSwitch switchRunning, boolean reverseRunning, + CqrsSwitch switchFan, boolean reverseFan, Duration changeModeDelay, ResourceUsageCounter uptimeCounter, Scheduler scheduler) { @@ -121,7 +121,7 @@ public HeatPump( // "demand" - controlled by heating and cooling operations // "ventilation" - controlled by explicit requests to turn the fan on or off - this.switchFanStack = new StackingSwitch("fan", switchFan); + this.switchFanStack = new StackingCqrsSwitch("fan", switchFan); this.reverseMode = reverseMode; this.reverseRunning = reverseRunning; @@ -232,7 +232,7 @@ private Flux, Void>> stopCondenser() { return Flux .just(new StateCommand(switchRunning, reverseRunning)) .doOnNext(ignore -> logger.info("{}: stopping the condenser", getAddress())) - .flatMap(this::setState) + .doOnNext(this::setState) .doOnNext(ignore -> logger.warn("{}: letting the hardware settle for modeChangeDelay={}", getAddress(), modeChangeDelay)) // VT: FIXME: This doesn't work where as it should (see test cases) and allows the next main sequence element to jump ahead, why? @@ -284,7 +284,7 @@ private Flux, Void>> forceMode(HvacMode mode) { .map(ignore -> // If we're here, this means that the operation was carried out successfully new Signal<>(clock.instant(), - new HvacDeviceStatus( + new HvacDeviceStatus<>( reconciler.reconcile( getAddress(), requestedState, @@ -296,8 +296,14 @@ private Flux, Void>> forceMode(HvacMode mode) { } private Mono setState(StateCommand command) { + + // VT: FIXME: Late night shortcut (#292), simplify + logger.debug("{}: setState({})={}", getAddress(), command.target, command.state); - return command.target.setState(command.state); + var result = command.target.setState(command.state); + logger.debug("{}: setState result={}", getAddress(), result); + + return Mono.just(result.requested); } /** @@ -359,33 +365,26 @@ protected void doClose() throws IOException { logger.warn("Shutting down: {}", getAddress()); - Flux.just( - switchRunning, - switchFan, - switchMode) - .flatMap(s -> s.setState(false)) - .blockLast(); - - switchRunning.setState(false).block(); - switchFan.setState(false).block(); - switchMode.setState(false).block(); + switchRunning.setState(reverseRunning); + switchFan.setState(reverseFan); + switchMode.setState(reverseRunning); logger.info("Shut down: {}", getAddress()); } @Deprecated protected Mono setMode(boolean state) { - return switchMode.setState(state); + return Mono.just(switchMode.setState(state).requested); } @Deprecated protected Mono setRunning(boolean state) { - return switchRunning.setState(state); + return Mono.just(switchRunning.setState(state).requested); } @Deprecated protected Mono setFan(boolean state) { - return switchFanStack.getSwitch("demand").setState(state); + return Mono.just(switchFanStack.getSwitch("demand").setState(state).requested); } static class Reconciler { @@ -433,7 +432,7 @@ public Result reconcile(String name, HvacCommand previous, HvacCommand next) { } private record StateCommand( - Switch target, + CqrsSwitch target, boolean state ) {} } diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/NullCqrsSwitch.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/NullCqrsSwitch.java new file mode 100644 index 000000000..049fe2507 --- /dev/null +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/NullCqrsSwitch.java @@ -0,0 +1,123 @@ +package net.sf.dz3r.device.actuator; + +import net.sf.dz3r.device.DeviceState; +import net.sf.dz3r.signal.Signal; + +import java.security.SecureRandom; +import java.time.Clock; +import java.time.Duration; +import java.util.Optional; +import java.util.Random; + +/** + * Does absolutely nothing other than fulfilling the API contract and reflecting itself in the log. + * + * A useful tool for troubleshooting a configuration without actual hardware available. + * + * @author Copyright © Vadim Tkachenko 2001-2023 + */ +public class NullCqrsSwitch extends AbstractCqrsDevice implements CqrsSwitch { + + private static final Random rg = new SecureRandom(); + private boolean available = true; + + private final Duration minDelay; + private final Duration maxDelay; + + public NullCqrsSwitch(String address) { + this( + address, + Clock.systemUTC(), + null, + null, + null, + null); + } + + public NullCqrsSwitch( + String address, + Clock clock, + Duration heartbeat, Duration pace, + Duration minDelay, Duration maxDelay) { + + super(address, clock, heartbeat, pace); + + this.minDelay = Optional.ofNullable(minDelay).orElse(Duration.ZERO); + this.maxDelay = Optional.ofNullable(maxDelay).orElse(Duration.ZERO); + + checkDelays(); + } + + private void checkDelays() { + + if (minDelay.isNegative() || maxDelay.isNegative() || minDelay.minus(maxDelay).toMillis() > 0) { + throw new IllegalArgumentException("invalid delays min=" + minDelay + ", max=" + maxDelay); + } + } + + public void setAvailable(boolean available) { + this.available = available; + } + + @Override + public String getAddress() { + return id; + } + + @Override + public boolean isAvailable() { + return false; + } + + @Override + public DeviceState getState() { + return new DeviceState<>(id, available, requested, actual, 0); + } + + @Override + protected void setStateSync(Boolean command) { + var delay = getDelay(); + logger.trace("{}: setStateSync={} wait({})...", getAddress(), command, delay); + try { + Thread.sleep(delay.toMillis()); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("interrupted", ex); + } + logger.trace("{}: setStateSync={} done", getAddress(), command); + this.actual = command; + } + + private Duration getDelay() { + if (minDelay.isZero() && maxDelay.isZero()) { + return Duration.ZERO; + } + + return minDelay.plus(Duration.ofMillis(rg.nextLong(maxDelay.toMillis()))); + } + + @Override + public synchronized DeviceState setState(Boolean command) { + + logger.debug("{}: setState={}", getAddress(), command); + this.requested = command; + + queueDepth.incrementAndGet(); + commandSink.tryEmitNext(command); + + var state = getState(); + stateSink.tryEmitNext(new Signal<>(clock.instant(), state, id)); + + return state; + } + + @Override + protected Boolean getCloseCommand() { + return false; + } + + @Override + protected void closeSubclass() throws Exception { + logger.info("{}: closeSubclass()", getAddress()); + } +} diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/NullSwitch.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/NullSwitch.java index 8efed2de9..6b83ecd9e 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/NullSwitch.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/NullSwitch.java @@ -15,8 +15,11 @@ * * A useful tool for troubleshooting a configuration without actual hardware available. * + * @deprecated Use {@link NullCqrsSwitch} instead. + * * @author Copyright © Vadim Tkachenko 2001-2021 */ +@Deprecated(since = "5.0.0") public class NullSwitch extends AbstractSwitch { private static final Random rg = new SecureRandom(); diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/StackingCqrsSwitch.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/StackingCqrsSwitch.java new file mode 100644 index 000000000..f56d2df7a --- /dev/null +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/StackingCqrsSwitch.java @@ -0,0 +1,158 @@ +package net.sf.dz3r.device.actuator; + +import net.sf.dz3r.device.DeviceState; +import net.sf.dz3r.signal.Signal; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import reactor.core.publisher.Flux; + +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +/** + * A switch that turns off when the first participating virtual switch is turned on, and turns off when the last + * participating virtual switch is turned off. + * + * @author Copyright © Vadim Tkachenko 2000-2023 + */ +public class StackingCqrsSwitch implements CqrsSwitch { + + private final Logger logger = LogManager.getLogger(); + + public final String address; + + private final CqrsSwitch target; + + /** + * The key is whatever is provided to {@link #getSwitch(String)}, the value is the corresponding proxy. + */ + private final Map address2switch = new TreeMap<>(); + + /** + * Switches in "on" state. + */ + private final Set> demand = new TreeSet<>(); + + /** + * Create an instance. + * @param address An address for this switch, likely something human readable. + * @param target Switch that + */ + public StackingCqrsSwitch(String address, CqrsSwitch target) { + this.address = address; + this.target = target; + } + + @Override + public String getAddress() { + return address; + } + + @Override + public boolean isAvailable() { + return target.isAvailable(); + } + + @Override + public DeviceState getState() { + return target.getState(); + } + + @Override + public DeviceState setState(Boolean state) { + throw new UnsupportedOperationException("This switch is only controlled via its virtual switches, use getSwitch(address) to get one"); + } + + @Override + public Flux, String>> getFlux() { + return target.getFlux(); + } + + @Override + public void close() throws Exception { + target.close(); + } + /** + * Obtain a virtual switch instance. + * + * @param address Virtual switch address. + * + * @return If the address is unknown, a new switch is created, otherwise existing instance is returned. + */ + public synchronized CqrsSwitch getSwitch(String address) { + + logger.debug("getSwitch:{}", address); + return address2switch.computeIfAbsent(address, k -> createSwitch(address)); + } + + private SwitchProxy createSwitch(String address) { + + logger.debug("createSwitch:{}", address); + return new SwitchProxy(address); + } + + public class SwitchProxy implements CqrsSwitch { + + public final String address; + private boolean state = false; + + + public SwitchProxy(String address) { + this.address = address; + } + + @Override + public String getAddress() { + return address; + } + + @Override + public void close() throws Exception { + + } + + @Override + public boolean isAvailable() { + return target.isAvailable(); + } + + @Override + public DeviceState getState() { + + var targetState = target.getState(); + + return new DeviceState<>( + address, + isAvailable(), + state, + targetState.actual, + targetState.queueDepth + ); + } + + /** + * Set this switch's {@link #state}, and the {@link #target} state if necessary. + */ + @Override + public DeviceState setState(Boolean state) { + this.state = state; + + if (Boolean.TRUE.equals(state)) { + demand.add(this); + } else { + demand.remove(this); + } + + logger.debug("{}: state={} demand={}", address, state, demand.size()); + + return target.setState(!demand.isEmpty()); + } + + @Override + public Flux, String>> getFlux() { + throw new UnsupportedOperationException("likely programming error, this class should not be accessible"); + } + } +} diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/StackingSwitch.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/StackingSwitch.java index 37903c0b6..1d1e3949c 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/StackingSwitch.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/StackingSwitch.java @@ -16,8 +16,11 @@ * participating virtual switch is turned off. * * @author Copyright © Vadim Tkachenko 2000-2022 + * + * @deprecated Use {@link StackingCqrsSwitch} instead. */ -public class StackingSwitch implements Switch { +@Deprecated(since = "5.0.0") +public class StackingSwitch implements Switch { private final Logger logger = LogManager.getLogger(); diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/SwitchableHvacDevice.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/SwitchableHvacDevice.java index b6711a9f8..9e1989359 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/SwitchableHvacDevice.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/SwitchableHvacDevice.java @@ -26,7 +26,7 @@ */ public class SwitchableHvacDevice extends SingleModeHvacDevice { - private final Switch theSwitch; + private final CqrsSwitch theSwitch; private final boolean inverted; @@ -42,7 +42,7 @@ public SwitchableHvacDevice( Clock clock, String name, HvacMode mode, - Switch theSwitch, + CqrsSwitch theSwitch, boolean inverted, ResourceUsageCounter uptimeCounter ) { @@ -84,12 +84,9 @@ private Flux, Void>> computeNonBlocking(Flux(command, uptime(), null); - return theSwitch - .setState(state != inverted) - .map(ignore -> { - updateUptime(clock.instant(), state); - return new Signal<>(clock.instant(), result); - }); + theSwitch.setState(state != inverted); + updateUptime(clock.instant(), state); + return Flux.just(new Signal<>(clock.instant(), result)); }); } @@ -113,7 +110,7 @@ private boolean getState(HvacCommand command) { protected void doClose() { logger.warn("Shutting down: {}", getAddress()); logger.warn("close(): setting {} to off", theSwitch.getAddress()); - theSwitch.setState(inverted).block(); + theSwitch.setState(inverted); logger.info("Shut down: {}", getAddress()); } } diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/VariableHvacDevice.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/VariableHvacDevice.java index 0536d4769..7a3ad4b59 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/VariableHvacDevice.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/VariableHvacDevice.java @@ -13,6 +13,7 @@ import java.time.Clock; import java.time.Duration; +import static net.sf.dz3r.device.actuator.VariableOutputDevice.Command; import static net.sf.dz3r.device.actuator.VariableOutputDevice.OutputState; /** @@ -113,7 +114,7 @@ protected Flux, Void>> apply(HvacCommand co return Flux.just( new Signal<>( clock.instant(), - translate(command, actuator.setState(on, scaled)) + translate(command, actuator.setState(new Command(on, scaled))) ) ); } diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/VariableOutputDevice.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/VariableOutputDevice.java index be51516a6..c27a5c0fb 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/VariableOutputDevice.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/VariableOutputDevice.java @@ -1,8 +1,7 @@ package net.sf.dz3r.device.actuator; -import net.sf.dz3r.device.DeviceState; -import net.sf.dz3r.signal.Signal; -import reactor.core.publisher.Flux; +import static net.sf.dz3r.device.actuator.VariableOutputDevice.Command; +import static net.sf.dz3r.device.actuator.VariableOutputDevice.OutputState; /** * Variable output device. @@ -11,7 +10,7 @@ * * @author Copyright © Vadim Tkachenko 2001-2023 */ -public interface VariableOutputDevice extends AutoCloseable { +public interface VariableOutputDevice extends CqrsDevice { /** * Command to pass to the device. @@ -32,46 +31,4 @@ record OutputState( ) { } - - /** - * Get the instant state of the device. - * - * @return Actual, current state of the device, immediately. - */ - DeviceState getState(); - - /** - * Request the provided state, return immediately. - * - * If the device is not {@link #isAvailable() available}, the command is still accepted. - * - * @param on Request the device to be on, or off. - * @param output Request the output, {@code 0.0} to {@code 1.0}. - * - * @return The result of {@link #getState()} after the command was accepted (not executed). - */ - DeviceState setState(boolean on, double output); - - /** - * Check the reported device availability. - * - * @return previously reported device availability, immediately. - */ - boolean isAvailable(); - - /** - * Get the state change notification flux. - * - * Note that this sink will NOT emit error signals, however, {@link DeviceState#available} will reflect device availability - * the moment the signal was emitted. - * - * @return Flux emitting signals every time the {@link #setState(boolean, double) requested} or {@link #getState() actual} state has changed. - */ - Flux, String>> getFlux(); - - /** - * Close the device, synchronously. - */ - @Override - void close() throws Exception; } diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/AbstractEconomizer.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/AbstractEconomizer.java index e4c0a2318..24c42cf29 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/AbstractEconomizer.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/AbstractEconomizer.java @@ -1,5 +1,6 @@ package net.sf.dz3r.device.actuator.economizer; +import net.sf.dz3r.common.HCCObjects; import net.sf.dz3r.controller.HysteresisController; import net.sf.dz3r.controller.ProcessController; import net.sf.dz3r.device.Addressable; @@ -40,6 +41,8 @@ public abstract class AbstractEconomizer implements SignalProcessor> deviceCommandSink = Sinks.many().unicast().onBackpressureBuffer(); /** * Last known indoor temperature. @@ -70,12 +73,14 @@ public abstract class AbstractEconomizer implements SignalProcessor computeCombined(IndoorAmbientPair pair) { return new Signal<>(clock.instant(), -1d); } - // Let's be generous; Zigbee sensors can fall back to 60 seconds interval even if configured faster - var stale = clock.instant().minus(Duration.ofSeconds(90)); + // Let's be generous; Zigbee sensors can fall back to 60 seconds interval even if configured faster, + // and even 90 seconds can cause blips once in a while + var stale = clock.instant().minus(timeout); if (pair.indoor.timestamp.isBefore(stale) || pair.ambient.timestamp.isBefore(stale)) { diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/EconomizerContext.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/EconomizerContext.java index 9b8cf31e8..cebd0cd60 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/EconomizerContext.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/EconomizerContext.java @@ -4,6 +4,8 @@ import net.sf.dz3r.signal.Signal; import reactor.core.publisher.Flux; +import java.time.Duration; + /** * A bridge between {@link EconomizerSettings}, {@link AbstractEconomizer economizer}, and {@link net.sf.dz3r.model.Zone}. */ @@ -12,10 +14,12 @@ public class EconomizerContext { public final EconomizerSettings settings; public final Flux> ambientFlux; public final HvacDevice device; + public final Duration timeout; - public EconomizerContext(EconomizerSettings settings, Flux> ambientFlux, HvacDevice device) { + public EconomizerContext(EconomizerSettings settings, Flux> ambientFlux, HvacDevice device, Duration timeout) { this.settings = settings; this.ambientFlux = ambientFlux; this.device = device; + this.timeout = timeout; } } diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/v1/SimpleEconomizer.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/v1/SimpleEconomizer.java index c4dc6926a..c61cf25b4 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/v1/SimpleEconomizer.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/v1/SimpleEconomizer.java @@ -8,6 +8,8 @@ import org.apache.logging.log4j.ThreadContext; import reactor.core.publisher.Flux; +import java.time.Duration; + /** * Simple economizer implementation with no jitter control (rather a prototype). * @@ -24,14 +26,16 @@ public class SimpleEconomizer> extends AbstractEconomize * * @param ambientFlux Flux from the ambient temperature sensor. * @param device HVAC device acting as the economizer. + * @param timeout Stale timeout. 90 seconds is a reasonable default. */ public SimpleEconomizer( String name, EconomizerSettings settings, Flux> ambientFlux, - HvacDevice device) { + HvacDevice device, + Duration timeout) { - super(null, name, settings, device); + super(null, name, settings, device, timeout); initFluxes(ambientFlux); } diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/v2/PidEconomizer.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/v2/PidEconomizer.java index 61e3e0386..be2775ea9 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/v2/PidEconomizer.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/v2/PidEconomizer.java @@ -13,6 +13,7 @@ import reactor.core.publisher.Flux; import java.time.Clock; +import java.time.Duration; /** * Economizer implementation with PID jitter control. @@ -46,15 +47,17 @@ public class PidEconomizer> extends AbstractEconomizer { * * @param ambientFlux Flux from the ambient temperature sensor. * @param device HVAC device acting as the economizer. + * @param timeout Stale timeout. 90 seconds is a reasonable default. */ public PidEconomizer( Clock clock, String name, EconomizerSettings settings, Flux> ambientFlux, - HvacDevice device) { + HvacDevice device, + Duration timeout) { - super(clock, name, settings, device); + super(clock, name, settings, device, timeout); controller = new SimplePidController<>("(controller) " + getAddress(), 0, settings.P, settings.I, 0, settings.saturationLimit); signalRenderer = new HysteresisController<>("(signalRenderer) " + getAddress(), 0, HYSTERESIS); diff --git a/dz3r-model/src/main/java/net/sf/dz3r/model/Zone.java b/dz3r-model/src/main/java/net/sf/dz3r/model/Zone.java index ba40cf5db..85d7dbefc 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/model/Zone.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/model/Zone.java @@ -85,7 +85,13 @@ public Zone(Thermostat ts, ZoneSettings settings, EconomizerContext economizerCo setSettingsSync(new ZoneSettings(settings, ts.getSetpoint())); economizer = Optional.ofNullable(economizerContext) - .map(ctx -> new PidEconomizer<>(Clock.systemUTC(), ts.getAddress(), ctx.settings, ctx.ambientFlux, ctx.device)) + .map(ctx -> new PidEconomizer<>( + Clock.systemUTC(), + ts.getAddress(), + ctx.settings, + ctx.ambientFlux, + ctx.device, + ctx.timeout)) .orElse(null); } diff --git a/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/AbstractCqrsDeviceTest.java b/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/AbstractCqrsDeviceTest.java new file mode 100644 index 000000000..b76297498 --- /dev/null +++ b/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/AbstractCqrsDeviceTest.java @@ -0,0 +1,97 @@ +package net.sf.dz3r.device.actuator; + +import net.sf.dz3r.common.TestClock; +import net.sf.dz3r.device.DeviceState; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import reactor.core.publisher.Flux; + +import java.time.Clock; +import java.time.Duration; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +class AbstractCqrsDeviceTest { + + @Test + void noPace() { + var clock = new TestClock(); + var device = new PaceTest("pt", clock, null, null); + + device.limitRate("command"); + + assertThatCode(() -> { + device.limitRate("command"); + }).doesNotThrowAnyException(); + } + + @ParameterizedTest + @MethodSource("getPaceStream") + void pace(Flux source) { + + var clock = new TestClock(); + var device = new PaceTest("pt", clock, null, Duration.ofSeconds(30)); + + source + .doOnNext(t -> { + clock.setOffset(t.offset); + assertThat(device.limitRate(t.command).blockFirst()).isEqualTo(t.expected); + }) + .blockLast(); + } + + private record PaceTuple( + String command, + Duration offset, + String expected + ) { + + } + + static Stream> getPaceStream() { + + return Stream.of( + Flux.just( + new PaceTuple("A", Duration.ZERO, "A"), // Pace is 30 seconds + new PaceTuple("A", Duration.ofSeconds(40), "A"), // Beyond the window, should be let through + new PaceTuple("A", Duration.ofSeconds(50), null), // Falls within, swallowed + new PaceTuple("B", Duration.ofSeconds(51), "B") // Different command, let through + ) + ); + } + + private class PaceTest extends AbstractCqrsDevice { + + protected PaceTest(String id, Clock clock, Duration heartbeat, Duration pace) { + super(id, clock, heartbeat, pace); + } + + @Override + protected void setStateSync(String command) { + throw new IllegalStateException("shouldn't be used"); + } + + @Override + protected String getCloseCommand() { + throw new IllegalStateException("shouldn't be used"); + } + + @Override + protected void closeSubclass() throws Exception { + throw new IllegalStateException("shouldn't be used"); + } + + @Override + public boolean isAvailable() { + throw new IllegalStateException("shouldn't be used"); + } + + @Override + public DeviceState setState(String state) { + throw new IllegalStateException("shouldn't be used"); + } + } +} diff --git a/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/AbstractSwitchTest.java b/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/AbstractSwitchTest.java index a2cb02563..48957dc17 100644 --- a/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/AbstractSwitchTest.java +++ b/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/AbstractSwitchTest.java @@ -1,5 +1,6 @@ package net.sf.dz3r.device.actuator; +import net.sf.dz3r.common.TestClock; import org.junit.jupiter.api.Test; import reactor.core.scheduler.Scheduler; @@ -123,37 +124,4 @@ protected boolean getStateSync() throws IOException { return state; } } - - private static class TestClock extends Clock { - - private final Clock baseClock; - private Duration offset = Duration.ofSeconds(0); - - private TestClock() { - baseClock = Clock.systemUTC(); - } - - private TestClock(Clock baseClock) { - this.baseClock = baseClock; - } - - public void setOffset(Duration offset) { - this.offset = offset; - } - - @Override - public ZoneId getZone() { - return baseClock.getZone(); - } - - @Override - public Clock withZone(ZoneId zone) { - return baseClock.withZone(zone); - } - - @Override - public Instant instant() { - return baseClock.instant().plus(offset); - } - } } diff --git a/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/HeatPumpTest.java b/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/HeatPumpTest.java index 7aabe4fb0..b6560933c 100644 --- a/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/HeatPumpTest.java +++ b/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/HeatPumpTest.java @@ -35,9 +35,9 @@ private SwitchPack getSwitchPack() { // VT: NOTE: Might need to use this for running parameterized tests with different schedulers return new SwitchPack( - new NullSwitch("mode", true, scheduler), - new NullSwitch("running", true, scheduler), - new NullSwitch("fan", true, scheduler) + new NullCqrsSwitch("mode"), + new NullCqrsSwitch("running"), + new NullCqrsSwitch("fan") ); } @@ -434,8 +434,8 @@ void delayElementsFromMono2() { } private record SwitchPack( - Switch mode, - Switch running, - Switch fan + CqrsSwitch mode, + CqrsSwitch running, + CqrsSwitch fan ) {} } diff --git a/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/SwitchableHvacDeviceTest.java b/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/SwitchableHvacDeviceTest.java index 73e216943..e8dbabb0e 100644 --- a/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/SwitchableHvacDeviceTest.java +++ b/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/SwitchableHvacDeviceTest.java @@ -10,6 +10,7 @@ import reactor.test.StepVerifier; import java.time.Clock; +import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; @@ -23,9 +24,9 @@ class SwitchableHvacDeviceTest { void lifecycle() { var now = Instant.now(); - var minDelayMillis = 50; - var maxDelayMillis = 200; - var s = new NullSwitch("a", false, minDelayMillis, maxDelayMillis, null); + var minDelay = Duration.ofMillis(50); + var maxDelay = Duration.ofMillis(200); + var s = new NullCqrsSwitch("a", Clock.systemUTC(), null, null, minDelay, maxDelay); var d = new SwitchableHvacDevice(Clock.systemUTC(), "d", COOLING, s, false, null); var sequence = Flux.just( @@ -64,7 +65,7 @@ void lifecycle() { @Disabled("for now; need to fix blocking operation first") void wrongMode() { - var d = new SwitchableHvacDevice(Clock.systemUTC(), "d", COOLING, mock(Switch.class), false, null); + var d = new SwitchableHvacDevice(Clock.systemUTC(), "d", COOLING, mock(CqrsSwitch.class), false, null); var sequence = Flux.just( new Signal(Instant.now(), new HvacCommand(HvacMode.HEATING, 0.8, null)) ); @@ -89,7 +90,7 @@ void wrongMode() { @Disabled("until #222 is fixed") void noFansForHeating() { - var d = new SwitchableHvacDevice(Clock.systemUTC(), "d", HvacMode.HEATING, mock(Switch.class), false, null); + var d = new SwitchableHvacDevice(Clock.systemUTC(), "d", HvacMode.HEATING, mock(CqrsSwitch.class), false, null); var sequence = Flux.just( new Signal(Instant.now(), new HvacCommand(null, 0.8, 1.0)) ); @@ -110,7 +111,7 @@ void noFansForHeating() { @Test void allowFansForCooling() { - var s = new NullSwitch("a"); + var s = new NullCqrsSwitch("a"); var d = new SwitchableHvacDevice(Clock.systemUTC(), "d", COOLING, s, false, null); var sequence = Flux.just( new Signal(Instant.now(), new HvacCommand(null, 0.8, 1.0)) @@ -132,7 +133,7 @@ void allowFansForCooling() { @Test void modeOnly() { - var d = new SwitchableHvacDevice(Clock.systemUTC(), "d", COOLING, mock(Switch.class), false, null); + var d = new SwitchableHvacDevice(Clock.systemUTC(), "d", COOLING, mock(CqrsSwitch.class), false, null); var sequence = Flux.just( new Signal(Instant.now(), new HvacCommand(COOLING, null, null)) ); @@ -151,7 +152,7 @@ void modeOnly() { void interleave() { var now = Instant.now(); - var s = new NullSwitch("a"); + var s = new NullCqrsSwitch("a"); var d = new SwitchableHvacDevice(Clock.systemUTC(), "d", COOLING, s, false, null); var sequence = Flux.just( @@ -197,7 +198,7 @@ void interleave() { void inverted() { var now = Instant.now(); - var s = new NullSwitch("a"); + var s = new NullCqrsSwitch("a"); var d = new SwitchableHvacDevice(Clock.systemUTC(), "d", COOLING, s, true, null); var sequence = Flux.just( @@ -214,14 +215,14 @@ void inverted() { d.compute(sequence).log().blockLast(); // A bit simpler than full, but it'll do - assertThat(s.getState().block()).isTrue(); + assertThat(s.getState().requested).isTrue(); } @Test void nonBlockingPass() { var now = Instant.now(); - var s = new NullSwitch("a"); + var s = new NullCqrsSwitch("a"); var d = new SwitchableHvacDevice(Clock.systemUTC(), "d", COOLING, s, false, null); var sequence = Flux .just(new Signal(now, new HvacCommand(null, 0.8, null))) diff --git a/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/economizer/AbstractEconomizerTest.java b/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/economizer/AbstractEconomizerTest.java index a56f49eaa..0dd6d302b 100644 --- a/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/economizer/AbstractEconomizerTest.java +++ b/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/economizer/AbstractEconomizerTest.java @@ -2,7 +2,7 @@ import net.sf.dz3r.controller.ProcessController; import net.sf.dz3r.device.actuator.HvacDevice; -import net.sf.dz3r.device.actuator.NullSwitch; +import net.sf.dz3r.device.actuator.NullCqrsSwitch; import net.sf.dz3r.device.actuator.SwitchableHvacDevice; import net.sf.dz3r.model.HvacMode; import net.sf.dz3r.signal.Signal; @@ -11,6 +11,7 @@ import reactor.core.publisher.Flux; import java.time.Clock; +import java.time.Duration; import java.util.stream.Stream; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -38,7 +39,7 @@ void targetAdjustmentTest(TargetAdjustmentTestData source) { Clock.systemUTC(), "d", HvacMode.COOLING, - new NullSwitch("s"), + new NullCqrsSwitch("s"), false, null) ); @@ -59,7 +60,7 @@ private class TestEconomizer extends AbstractEconomizer { * @param device HVAC device acting as the economizer. */ protected TestEconomizer(String name, EconomizerSettings settings, HvacDevice device) { - super(Clock.systemUTC(), name, settings, device); + super(Clock.systemUTC(), name, settings, device, Duration.ofSeconds(90)); } @Override diff --git a/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/economizer/v1/SimpleEconomizerTest.java b/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/economizer/v1/SimpleEconomizerTest.java index 7df245ddc..3b96b87d9 100644 --- a/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/economizer/v1/SimpleEconomizerTest.java +++ b/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/economizer/v1/SimpleEconomizerTest.java @@ -1,6 +1,6 @@ package net.sf.dz3r.device.actuator.economizer.v1; -import net.sf.dz3r.device.actuator.NullSwitch; +import net.sf.dz3r.device.actuator.NullCqrsSwitch; import net.sf.dz3r.device.actuator.SwitchableHvacDevice; import net.sf.dz3r.device.actuator.economizer.EconomizerSettings; import net.sf.dz3r.model.HvacMode; @@ -15,6 +15,7 @@ import reactor.tools.agent.ReactorDebugAgent; import java.time.Clock; +import java.time.Duration; import java.time.Instant; import java.util.concurrent.CountDownLatch; @@ -51,7 +52,7 @@ void dropToOnAndBack() { Clock.systemUTC(), "d", HvacMode.COOLING, - new NullSwitch("s"), + new NullCqrsSwitch("s"), false, null); @@ -64,7 +65,8 @@ void dropToOnAndBack() { "economizer", settings, deferredAmbientFlux, - device); + device, + Duration.ofSeconds(90)); economizer .compute(Flux.just(new Signal<>(Instant.now(), indoor))) diff --git a/dz3r-model/src/test/resources/log4j2.xml b/dz3r-model/src/test/resources/log4j2.xml index 9f4ce2f2b..d90353193 100644 --- a/dz3r-model/src/test/resources/log4j2.xml +++ b/dz3r-model/src/test/resources/log4j2.xml @@ -2,11 +2,11 @@ - + - + diff --git a/dz3r-mqtt/src/main/java/net/sf/dz3r/device/esphome/v1/ESPHomeFan.java b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/esphome/v1/ESPHomeFan.java deleted file mode 100644 index d7eed2f21..000000000 --- a/dz3r-mqtt/src/main/java/net/sf/dz3r/device/esphome/v1/ESPHomeFan.java +++ /dev/null @@ -1,264 +0,0 @@ -package net.sf.dz3r.device.esphome.v1; - -import com.hivemq.client.mqtt.datatypes.MqttQos; -import net.sf.dz3r.common.HCCObjects; -import net.sf.dz3r.device.DeviceState; -import net.sf.dz3r.device.actuator.VariableOutputDevice; -import net.sf.dz3r.device.mqtt.v1.MqttAdapter; -import net.sf.dz3r.device.mqtt.v1.MqttSignal; -import net.sf.dz3r.instrumentation.Marker; -import net.sf.dz3r.signal.Signal; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.ThreadContext; -import reactor.core.Disposable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Sinks; -import reactor.core.scheduler.Schedulers; - -import java.time.Clock; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Driver for ESPHome Fan Component. - * - * Important: {@code speed_count} needs to be left at default (100) for this driver to operate correctly. - * - * @author Copyright © Vadim Tkachenko 2001-2023 - */ -public class ESPHomeFan implements VariableOutputDevice { - - private final Logger logger = LogManager.getLogger(); - - private final String id; - private final Clock clock; - private final MqttAdapter adapter; - private final String stateTopic; - private final String commandTopic; - private final String speedStateTopic; - private final String speedCommandTopic; - private String availabilityMessage; - - private final AtomicInteger queueDepth = new AtomicInteger(); - - private final Disposable availabilityFlux; - private final Disposable rootFlux; - private final Sinks.Many commandSink = Sinks.many().multicast().onBackpressureBuffer(); - private final Disposable commandSubscription; - private final Sinks.Many, String>> stateSink = Sinks.many().multicast().onBackpressureBuffer(); - - private OutputState requested; - private OutputState actual; - - public ESPHomeFan( - String id, - MqttAdapter adapter, - String rootTopic, - String availabilityTopic) { - - this( - id, - Clock.systemUTC(), - adapter, - rootTopic, - availabilityTopic - ); - } - - public ESPHomeFan( - String id, - Clock clock, - MqttAdapter adapter, - String rootTopic, - String availabilityTopic) { - - this.id = HCCObjects.requireNonNull(id, "id can't be null"); - this.clock = HCCObjects.requireNonNull(clock, "adapter can't be null"); - this.adapter = HCCObjects.requireNonNull(adapter, "adapter can't be null"); - HCCObjects.requireNonNull(rootTopic, "rootTopic can't be null"); - - HCCObjects.requireNonNull(availabilityTopic, "availabilityTopic can't be null"); - - // Defaults - stateTopic = rootTopic + "/state"; - commandTopic = rootTopic + "/command"; - speedStateTopic = rootTopic + "/speed_level/state"; - speedCommandTopic = rootTopic + "/speed_level/command"; - - commandSubscription = commandSink - .asFlux() - .publishOn(Schedulers.newSingle("espfan-" + id)) - .subscribe(this::setStateSync); - - availabilityFlux = adapter - .getFlux(availabilityTopic, true) - .subscribe(this::parseAvailability); - rootFlux = adapter - .getFlux(rootTopic, true) - .subscribe(this::parseState); - } - - private void parseAvailability(MqttSignal message) { - - this.availabilityMessage = message.message(); - stateSink.tryEmitNext(getStateSignal()); - } - - /** - * Parse device state coming from {@link #adapter}. - * - * @param message Incoming MQTT message. - */ - private void parseState(MqttSignal message) { - - // VT: NOTE: MqttAdapter has already logged the message at TRACE level - - tryParseState(message); - tryParseSpeed(message); - - stateSink.tryEmitNext(getStateSignal()); - } - - private void tryParseState(MqttSignal message) { - - if (!stateTopic.equals(message.topic())) { - return; - } - - switch (message.message()) { - case "OFF" -> actual = mergeState(actual, false); - case "ON" -> actual = mergeState(actual, true); - default -> logger.error("{}: can't parse state from {}", id, message); - } - } - - private OutputState mergeState(OutputState state, boolean on) { - - if (state == null) { - return new OutputState(on, null); - } - - return new OutputState(on, actual.output()); - } - - private void tryParseSpeed(MqttSignal message) { - - if (!speedStateTopic.equals(message.topic())) { - return; - } - - try { - - var speed = Integer.parseInt(message.message()); - - actual = mergeSpeed(actual, speed / 100d); - - } catch (NumberFormatException ex) { - logger.error("{}: can't parse speed from {}", id, message, ex); - } - } - - private OutputState mergeSpeed(OutputState state, double speed) { - - if (state == null) { - return new OutputState(null, speed); - } - - return new OutputState(state.on(), speed); - } - - @Override - public DeviceState getState() { - - return new DeviceState<>( - id, - isAvailable(), - requested, - actual, - queueDepth.get() - ); - } - - @Override - public synchronized DeviceState setState(boolean on, double output) { - - if (output < 0 || output > 1) { - throw new IllegalArgumentException("speed given (" + output + ") is outside of 0..1 range"); - } - - this.requested = new OutputState(on, output); - queueDepth.incrementAndGet(); - commandSink.tryEmitNext(new Command(on, output)); - - var state = getState(); - stateSink.tryEmitNext(new Signal<>(clock.instant(), state, id)); - - return state; - } - - /** - * Set the requested state, synchronously - * - * @param command Command to execute. - */ - private void setStateSync(Command command) { - - ThreadContext.push("setStateSync"); - var m = new Marker("setStateSync", Level.TRACE); - - try { - - // This will translate into two commands, but so be it - - adapter.publish(commandTopic, command.on() ? "ON" : "OFF", MqttQos.AT_LEAST_ONCE, false); - adapter.publish(speedCommandTopic, Integer.toString((int) (command.output() * 100)), MqttQos.AT_LEAST_ONCE, false); - - queueDepth.decrementAndGet(); - - stateSink.tryEmitNext(getStateSignal()); - - } finally { - m.close(); - ThreadContext.pop(); - } - } - - @Override - public boolean isAvailable() { - return "online".equals(availabilityMessage); - } - - @Override - public Flux, String>> getFlux() { - return stateSink.asFlux(); - } - - private Signal, String> getStateSignal() { - return new Signal<>(clock.instant(), getState(), id); - } - - @Override - public void close() throws Exception { - - // Prevent new commands from coming in - commandSubscription.dispose(); - - // Shut down the device - setStateSync(new Command(false, 0d)); - - // Adjust the queue depth - previous command skewed it - queueDepth.incrementAndGet(); - - // Emit the final notification - stateSink.tryEmitNext(getStateSignal()); - - // Indicate that we're done - stateSink.tryEmitComplete(); - - // Close the comms channel - rootFlux.dispose(); - availabilityFlux.dispose(); - adapter.close(); - } -} diff --git a/dz3r-mqtt/src/main/java/net/sf/dz3r/device/esphome/v1/ESPHomeSwitch.java b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/esphome/v1/ESPHomeSwitch.java index 4573792eb..6dabb57da 100644 --- a/dz3r-mqtt/src/main/java/net/sf/dz3r/device/esphome/v1/ESPHomeSwitch.java +++ b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/esphome/v1/ESPHomeSwitch.java @@ -22,7 +22,10 @@ * @see net.sf.dz3r.device.z2m.v1.Z2MSwitch * * @author Copyright © Vadim Tkachenko 2001-2023 + * + * @deprecated Use {@link net.sf.dz3r.device.esphome.v2.ESPHomeCqrsSwitch} instead. */ +@Deprecated(since = "5.0.0") public class ESPHomeSwitch extends AbstractMqttSwitch { private final String deviceRootTopic; diff --git a/dz3r-mqtt/src/main/java/net/sf/dz3r/device/esphome/v2/ESPHomeCqrsSwitch.java b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/esphome/v2/ESPHomeCqrsSwitch.java new file mode 100644 index 000000000..ca8e221d5 --- /dev/null +++ b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/esphome/v2/ESPHomeCqrsSwitch.java @@ -0,0 +1,83 @@ +package net.sf.dz3r.device.esphome.v2; + +import net.sf.dz3r.common.HCCObjects; +import net.sf.dz3r.device.mqtt.v1.MqttAdapter; +import net.sf.dz3r.device.mqtt.v1.MqttSignal; +import net.sf.dz3r.device.mqtt.v2.AbstractMqttCqrsSwitch; + +import java.time.Clock; +import java.time.Duration; + +/** + * Implementation to control ESPHome Switch Component via + * ESPHome MQTT Client Component. + * + * See HOWTO: DZ to ESPHome integration + * for configuration details. + * + * @see net.sf.dz3r.device.zwave.v2.ZWaveCqrsBinarySwitch + * @see net.sf.dz3r.device.z2m.v2.Z2MCqrsSwitch + * + * @author Copyright © Vadim Tkachenko 2001-2023 + */ +public class ESPHomeCqrsSwitch extends AbstractMqttCqrsSwitch { + + private final String availabilityTopic; + public ESPHomeCqrsSwitch( + String id, + Clock clock, + Duration heartbeat, + Duration pace, + MqttAdapter adapter, + String rootTopic, + String availabilityTopic) { + super(id, clock, heartbeat, pace, adapter, rootTopic); + + this.availabilityTopic = HCCObjects.requireNonNull(availabilityTopic, "esphome.switches.availability-topic can't be null (id=" + id + ")"); + } + + @Override + protected boolean includeSubtopics() { + return true; + } + + @Override + protected String getAvailabilityTopic() { + return availabilityTopic; + } + + @Override + protected String getStateTopic() { + return rootTopic + "/state"; + } + + @Override + protected String getCommandTopic() { + return rootTopic + "/command"; + } + + @Override + protected String renderPayload(Boolean state) { + return Boolean.TRUE.equals(state) ? "ON" : "OFF"; + } + + @Override + protected void parseState(MqttSignal message) { + if (!getStateTopic().equals(message.topic())) { + return; + } + + switch (message.message()) { + case "OFF" -> actual = false; + case "ON" -> actual = true; + default -> logger.error("{}: can't parse state from {}", id, message); + } + + stateSink.tryEmitNext(getStateSignal()); + } + + @Override + public boolean isAvailable() { + return "online".equals(availabilityMessage); + } +} diff --git a/dz3r-mqtt/src/main/java/net/sf/dz3r/device/esphome/v2/ESPHomeFan.java b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/esphome/v2/ESPHomeFan.java new file mode 100644 index 000000000..b858f846f --- /dev/null +++ b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/esphome/v2/ESPHomeFan.java @@ -0,0 +1,188 @@ +package net.sf.dz3r.device.esphome.v2; + +import com.hivemq.client.mqtt.datatypes.MqttQos; +import net.sf.dz3r.common.HCCObjects; +import net.sf.dz3r.device.actuator.VariableOutputDevice; +import net.sf.dz3r.device.mqtt.v1.MqttAdapter; +import net.sf.dz3r.device.mqtt.v1.MqttSignal; +import net.sf.dz3r.device.mqtt.v2.AbstractMqttCqrsDevice; +import net.sf.dz3r.instrumentation.Marker; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.ThreadContext; + +import java.time.Clock; +import java.time.Duration; + +import static net.sf.dz3r.device.actuator.VariableOutputDevice.Command; +import static net.sf.dz3r.device.actuator.VariableOutputDevice.OutputState; + +/** + * Driver for ESPHome Fan Component. + * + * Important: {@code speed_count} needs to be left at default (100) for this driver to operate correctly. + * + * @author Copyright © Vadim Tkachenko 2001-2023 + */ +public class ESPHomeFan extends AbstractMqttCqrsDevice implements VariableOutputDevice { + + private final String speedStateTopic; + private final String speedCommandTopic; + private final String availabilityTopic; + + public ESPHomeFan( + String id, + Clock clock, + Duration heartbeat, + Duration pace, + MqttAdapter mqttAdapter, + String rootTopic, + String availabilityTopic) { + super( + id, clock, + heartbeat, pace, + mqttAdapter, rootTopic); + + this.availabilityTopic = HCCObjects.requireNonNull(availabilityTopic, "esphome.fans.availability-topic can't be null (id=" + id + ")"); + + // Defaults + speedStateTopic = rootTopic + "/speed_level/state"; + speedCommandTopic = rootTopic + "/speed_level/command"; + } + + @Override + protected boolean includeSubtopics() { + return true; + } + + @Override + protected String getAvailabilityTopic() { + return availabilityTopic; + } + + /** + * Parse device state coming from the {@link #mqttAdapter}. + * + * @param message Incoming MQTT message. + */ + @Override + protected void parseState(MqttSignal message) { + + // VT: NOTE: MqttAdapter has already logged the message at TRACE level + + tryParseState(message); + tryParseSpeed(message); + + stateSink.tryEmitNext(getStateSignal()); + } + + @Override + protected String getStateTopic() { + return rootTopic + "/state"; + } + + @Override + protected String getCommandTopic() { + return rootTopic + "/command"; + } + + private void tryParseState(MqttSignal message) { + + if (!getStateTopic().equals(message.topic())) { + return; + } + + switch (message.message()) { + case "OFF" -> actual = mergeState(actual, false); + case "ON" -> actual = mergeState(actual, true); + default -> logger.error("{}: can't parse state from {}", id, message); + } + } + + private OutputState mergeState(OutputState state, boolean on) { + + if (state == null) { + return new OutputState(on, null); + } + + return new OutputState(on, actual.output()); + } + + private void tryParseSpeed(MqttSignal message) { + + if (!speedStateTopic.equals(message.topic())) { + return; + } + + try { + + var speed = Integer.parseInt(message.message()); + + actual = mergeSpeed(actual, speed / 100d); + + } catch (NumberFormatException ex) { + logger.error("{}: can't parse speed from {}", id, message, ex); + } + } + + private OutputState mergeSpeed(OutputState state, double speed) { + + if (state == null) { + return new OutputState(null, speed); + } + + return new OutputState(state.on(), speed); + } + + @Override + protected void checkCommand(Command command) { + + super.checkCommand(command); + + if (command.output() < 0 || command.output() > 1) { + throw new IllegalArgumentException("speed given (" + command.output() + ") is outside of 0..1 range"); + } + } + + @Override + protected OutputState translateCommand(Command command) { + return new OutputState(command.on(), command.output()); + } + + /** + * Set the requested state, synchronously + * + * @param command Command to execute. + */ + @Override + protected void setStateSync(Command command) { + + ThreadContext.push("setStateSync"); + var m = new Marker("setStateSync", Level.TRACE); + + try { + + // This will translate into two commands, but so be it + + mqttAdapter.publish(getCommandTopic(), command.on() ? "ON" : "OFF", MqttQos.AT_LEAST_ONCE, false); + mqttAdapter.publish(speedCommandTopic, Integer.toString((int) (command.output() * 100)), MqttQos.AT_LEAST_ONCE, false); + + queueDepth.decrementAndGet(); + + stateSink.tryEmitNext(getStateSignal()); + + } finally { + m.close(); + ThreadContext.pop(); + } + } + + @Override + protected Command getCloseCommand() { + return new Command(false, 0d); + } + + @Override + public boolean isAvailable() { + return "online".equals(availabilityMessage); + } +} diff --git a/dz3r-mqtt/src/main/java/net/sf/dz3r/device/mqtt/v2/AbstractMqttCqrsDevice.java b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/mqtt/v2/AbstractMqttCqrsDevice.java new file mode 100644 index 000000000..83f7d096d --- /dev/null +++ b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/mqtt/v2/AbstractMqttCqrsDevice.java @@ -0,0 +1,111 @@ +package net.sf.dz3r.device.mqtt.v2; + +import net.sf.dz3r.common.HCCObjects; +import net.sf.dz3r.device.DeviceState; +import net.sf.dz3r.device.actuator.AbstractCqrsDevice; +import net.sf.dz3r.device.mqtt.v1.MqttAdapter; +import net.sf.dz3r.device.mqtt.v1.MqttSignal; +import net.sf.dz3r.signal.Signal; +import reactor.core.Disposable; + +import java.time.Clock; +import java.time.Duration; + +public abstract class AbstractMqttCqrsDevice extends AbstractCqrsDevice { + + protected final MqttAdapter mqttAdapter; + protected final String rootTopic; + + protected String availabilityMessage; + + private Disposable availabilityFlux; + private final Disposable rootFlux; + + protected AbstractMqttCqrsDevice( + String id, Clock clock, + Duration heartbeat, Duration pace, + MqttAdapter mqttAdapter, + String rootTopic + ) { + super(id, clock, heartbeat, pace); + + this.mqttAdapter = HCCObjects.requireNonNull(mqttAdapter, "mqttAdapter can't be null"); + this.rootTopic = HCCObjects.requireNonNull(rootTopic, "rootTopic can't be null"); + + rootFlux = initRootFlux(); + } + + protected Disposable initAvailabilityFlux() { + return mqttAdapter + .getFlux(getAvailabilityTopic(), false) + .subscribe(this::parseAvailability); + } + + protected abstract boolean includeSubtopics(); + protected abstract String getAvailabilityTopic(); + protected abstract String getStateTopic(); + protected abstract String getCommandTopic(); + + protected void parseAvailability(MqttSignal message) { + this.availabilityMessage = message.message(); + stateSink.tryEmitNext(getStateSignal()); + } + + protected Disposable initRootFlux() { + return mqttAdapter + .getFlux(rootTopic, includeSubtopics()) + .subscribe(this::parseState); + } + + protected abstract void parseState(MqttSignal mqttSignal); + + @Override + public final DeviceState setState(I newState) { + + // VT: NOTE: Ugly hack to allow ESPHome classes to use availability topic not available to the constructor of this class + synchronized (this) { + + if (availabilityFlux == null) { + availabilityFlux = initAvailabilityFlux(); + } + } + + checkCommand(newState); + + this.requested = translateCommand(newState); + queueDepth.incrementAndGet(); + commandSink.tryEmitNext(newState); + + var state = getState(); + stateSink.tryEmitNext(new Signal<>(clock.instant(), state, id)); + + return state; + } + + /** + * Translate the command into state. + * + * @param command Command to interpret. + * @return State it translates into. + */ + protected abstract O translateCommand(I command); + + /** + * Make sure the input argument is sane. + * + * @param command Command to check. + */ + protected void checkCommand(I command) { + HCCObjects.requireNonNull(command, "command can't be null"); + } + + @Override + protected final void closeSubclass() throws Exception { + + // Close the comms channel + rootFlux.dispose(); + availabilityFlux.dispose(); + + mqttAdapter.close(); + } +} diff --git a/dz3r-mqtt/src/main/java/net/sf/dz3r/device/mqtt/v2/AbstractMqttCqrsSwitch.java b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/mqtt/v2/AbstractMqttCqrsSwitch.java new file mode 100644 index 000000000..fb20344ea --- /dev/null +++ b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/mqtt/v2/AbstractMqttCqrsSwitch.java @@ -0,0 +1,65 @@ +package net.sf.dz3r.device.mqtt.v2; + +import com.hivemq.client.mqtt.datatypes.MqttQos; +import net.sf.dz3r.device.actuator.CqrsSwitch; +import net.sf.dz3r.device.mqtt.v1.MqttAdapter; +import net.sf.dz3r.device.mqtt.v1.MqttMessageAddress; +import net.sf.dz3r.instrumentation.Marker; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.ThreadContext; + +import java.time.Clock; +import java.time.Duration; + +public abstract class AbstractMqttCqrsSwitch extends AbstractMqttCqrsDevice implements CqrsSwitch { + + protected AbstractMqttCqrsSwitch( + String id, Clock clock, + Duration heartbeat, Duration pace, + MqttAdapter mqttAdapter, + String rootTopic + ) { + super( + id, clock, + heartbeat, pace, + mqttAdapter, rootTopic + ); + } + + @Override + protected Boolean getCloseCommand() { + return false; + } + + @Override + protected final void setStateSync(Boolean command) { + + ThreadContext.push("setStateSync"); + var m = new Marker("setStateSync", Level.TRACE); + + try { + mqttAdapter.publish( + getCommandTopic(), + renderPayload(command), + MqttQos.AT_LEAST_ONCE, + false); + queueDepth.decrementAndGet(); + + } finally { + m.close(); + ThreadContext.pop(); + } + } + + + @Override + protected final Boolean translateCommand(Boolean command) { + return command; + } + protected abstract String renderPayload(Boolean state); + + @Override + public MqttMessageAddress getAddress() { + return null; + } +} diff --git a/dz3r-mqtt/src/main/java/net/sf/dz3r/device/z2m/v1/Z2MSwitch.java b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/z2m/v1/Z2MSwitch.java index d6a093248..732d084a4 100644 --- a/dz3r-mqtt/src/main/java/net/sf/dz3r/device/z2m/v1/Z2MSwitch.java +++ b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/z2m/v1/Z2MSwitch.java @@ -23,7 +23,10 @@ * @see net.sf.dz3r.device.zwave.v1.ZWaveBinarySwitch * * @author Copyright © Vadim Tkachenko 2001-2023 + * + * @deprecated Use {@link net.sf.dz3r.device.z2m.v2.Z2MCqrsSwitch} instead. */ +@Deprecated(since = "5.0.0") public class Z2MSwitch extends AbstractMqttSwitch { private final ObjectMapper objectMapper = new ObjectMapper(); diff --git a/dz3r-mqtt/src/main/java/net/sf/dz3r/device/z2m/v2/Z2MCqrsSwitch.java b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/z2m/v2/Z2MCqrsSwitch.java new file mode 100644 index 000000000..7918f0e45 --- /dev/null +++ b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/z2m/v2/Z2MCqrsSwitch.java @@ -0,0 +1,101 @@ +package net.sf.dz3r.device.z2m.v2; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import net.sf.dz3r.device.mqtt.v1.MqttAdapter; +import net.sf.dz3r.device.mqtt.v1.MqttSignal; +import net.sf.dz3r.device.mqtt.v2.AbstractMqttCqrsSwitch; +import org.apache.logging.log4j.ThreadContext; + +import java.time.Clock; +import java.time.Duration; +import java.util.Map; + +/** + * Implementation for a Zigbee switch over Zigbee2MQTT. + * + * @see net.sf.dz3r.device.esphome.v2.ESPHomeCqrsSwitch + * @see net.sf.dz3r.device.zwave.v2.ZWaveCqrsBinarySwitch + * + * @author Copyright © Vadim Tkachenko 2001-2023 + */ +public class Z2MCqrsSwitch extends AbstractMqttCqrsSwitch { + + private final ObjectMapper objectMapper = new ObjectMapper(); + + public Z2MCqrsSwitch( + String id, + Clock clock, + Duration heartbeat, + Duration pace, + MqttAdapter mqttAdapter, + String rootTopic) { + super(id, clock, heartbeat, pace, mqttAdapter, rootTopic); + } + + @Override + protected void parseState(MqttSignal message) { + ThreadContext.push("parseState"); + try { + + if (!getStateTopic().equals(message.topic())) { + return; + } + + var payload = objectMapper.readValue(message.message(), Map.class); + + logger.debug("payload: {}", payload); + + var stateString = String.valueOf(payload.get("state")); + switch (stateString) { + case "OFF" -> actual = false; + case "ON" -> actual = true; + default -> logger.error("{}: can't parse state from {}", id, message.message()); + } + + stateSink.tryEmitNext(getStateSignal()); + + } catch (JsonProcessingException e) { + throw new IllegalStateException("Can't parse JSON: " + message, e); + } finally { + ThreadContext.pop(); + } + } + + @Override + public boolean isAvailable() { + + // VT: NOTE: per https://www.zigbee2mqtt.io/guide/configuration/device-availability.html, + // the default timeout is 10 minutes - that'll send the queue depth through the roof, and if THIS is reported + // as offline, then it is seriously offline. Might need to take measures not to saturate the device queue with + // obsolete commands - this calls for a "stale command timeout" configuration value. + + return availabilityMessage != null && availabilityMessage.contains("online"); + } + + @Override + protected boolean includeSubtopics() { + return false; + } + + @Override + protected String getAvailabilityTopic() { + return rootTopic + "/availability"; + } + + @Override + protected String getStateTopic() { + // Z2M pushes device state as JSON in the device root topic + return rootTopic; + } + + @Override + protected String getCommandTopic() { + return rootTopic + "/set"; + } + + @Override + protected String renderPayload(Boolean state) { + return "{\"state\": \"" + (Boolean.TRUE.equals(state) ? "ON" : "OFF") + "\"}"; + } +} diff --git a/dz3r-mqtt/src/main/java/net/sf/dz3r/device/zwave/v1/ZWaveBinarySwitch.java b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/zwave/v1/ZWaveBinarySwitch.java index 888ac86e6..6991d473f 100644 --- a/dz3r-mqtt/src/main/java/net/sf/dz3r/device/zwave/v1/ZWaveBinarySwitch.java +++ b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/zwave/v1/ZWaveBinarySwitch.java @@ -25,7 +25,10 @@ * @see net.sf.dz3r.device.z2m.v1.Z2MSwitch * * @author Copyright © Vadim Tkachenko 2001-2023 + * + * @deprecated Use {@link net.sf.dz3r.device.zwave.v2.ZWaveCqrsBinarySwitch} instead. */ +@Deprecated(since = "5.0.0") public class ZWaveBinarySwitch extends AbstractMqttSwitch { private final ObjectMapper objectMapper = new ObjectMapper(); diff --git a/dz3r-mqtt/src/main/java/net/sf/dz3r/device/zwave/v2/ZWaveCqrsBinarySwitch.java b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/zwave/v2/ZWaveCqrsBinarySwitch.java new file mode 100644 index 000000000..dd76225d0 --- /dev/null +++ b/dz3r-mqtt/src/main/java/net/sf/dz3r/device/zwave/v2/ZWaveCqrsBinarySwitch.java @@ -0,0 +1,102 @@ +package net.sf.dz3r.device.zwave.v2; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import net.sf.dz3r.device.mqtt.v1.MqttAdapter; +import net.sf.dz3r.device.mqtt.v1.MqttSignal; +import net.sf.dz3r.device.mqtt.v2.AbstractMqttCqrsSwitch; +import org.apache.logging.log4j.ThreadContext; + +import java.time.Clock; +import java.time.Duration; +import java.util.Map; + +public class ZWaveCqrsBinarySwitch extends AbstractMqttCqrsSwitch { + + private final ObjectMapper objectMapper = new ObjectMapper(); + + public ZWaveCqrsBinarySwitch( + String id, + Clock clock, + Duration heartbeat, + Duration pace, + MqttAdapter mqttAdapter, String rootTopic) { + super(id, clock, heartbeat, pace, mqttAdapter, rootTopic); + } + + @Override + protected void parseState(MqttSignal message) { + ThreadContext.push("parseState"); + try { + + if (!getStateTopic().equals(message.topic())) { + return; + } + + var payload = objectMapper.readValue(message.message(), Map.class); + + logger.debug("payload: {}", payload); + + actual = Boolean.valueOf(payload.get("value").toString()); + + } catch (JsonProcessingException e) { + + actual = null; + throw new IllegalStateException("Can't parse JSON: " + message, e); + + } finally { + ThreadContext.pop(); + } + } + + @Override + protected boolean includeSubtopics() { + return true; + } + + @Override + protected String getAvailabilityTopic() { + return rootTopic + "/status"; + } + + @Override + public boolean isAvailable() { + // Check for "Alive" or "Dead" topic payload, otherwise log error and return false + + if (availabilityMessage == null) { + return false; + } + + if (availabilityMessage.toLowerCase().contains("alive")) { + return true; + } + + if (availabilityMessage.toLowerCase().contains("dead")) { + return false; + } + + logger.warn("{}: unknown availability message: {}", getAddress(), availabilityMessage); + return false; + } + + /** + * Z-Wave gateway specific "get state" topic name. + */ + @Override + protected String getStateTopic() { + return rootTopic + "/37/0/currentValue"; + } + + /** + * Z-Wave gateway specific "set state" topic name. + */ + @Override + protected String getCommandTopic() { + return rootTopic + "/37/0/targetValue/set"; + } + + @Override + protected String renderPayload(Boolean state) { + return "{\"value\": " + state + "}"; + } +} diff --git a/dz3r-mqtt/src/test/java/net/sf/dz3r/device/esphome/v1/ESPHomeSwitchTest.java b/dz3r-mqtt/src/test/java/net/sf/dz3r/device/esphome/v1/ESPHomeSwitchTest.java index 7501451ae..1dd697449 100644 --- a/dz3r-mqtt/src/test/java/net/sf/dz3r/device/esphome/v1/ESPHomeSwitchTest.java +++ b/dz3r-mqtt/src/test/java/net/sf/dz3r/device/esphome/v1/ESPHomeSwitchTest.java @@ -26,7 +26,7 @@ class ESPHomeSwitchTest { private final String MQTT_BROKER = "mqtt-esphome"; - private final String ESPHOME_SWITCH_TOPIC = "/esphome/0156AC/switch/t-relay-2-r3"; + private final String ESPHOME_SWITCH_TOPIC = "/esphome/81B190/switch/t-relay-3-r3"; @BeforeAll static void init() { diff --git a/dz3r-mqtt/src/test/java/net/sf/dz3r/device/esphome/v2/ESPHomeCqrsSwitchTest.java b/dz3r-mqtt/src/test/java/net/sf/dz3r/device/esphome/v2/ESPHomeCqrsSwitchTest.java new file mode 100644 index 000000000..ae447a38e --- /dev/null +++ b/dz3r-mqtt/src/test/java/net/sf/dz3r/device/esphome/v2/ESPHomeCqrsSwitchTest.java @@ -0,0 +1,77 @@ +package net.sf.dz3r.device.esphome.v2; + +import net.sf.dz3r.device.mqtt.v1.MqttAdapter; +import net.sf.dz3r.device.mqtt.v1.MqttEndpoint; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; +import reactor.tools.agent.ReactorDebugAgent; + +import java.time.Clock; +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThatCode; + +@EnabledIfEnvironmentVariable( + named = "TEST_HCC_ESPHOME_SWITCH", + matches = "safe", + disabledReason = "Only execute this test if a suitable MQTT broker and ESPHome switch device are available" +) +class ESPHomeCqrsSwitchTest { + + private final Logger logger = LogManager.getLogger(); + + private final String MQTT_BROKER = "mqtt-esphome"; + + private final String ESPHOME_SWITCH_TOPIC = "/esphome/81B190/switch/t-relay-3-r3"; + private final String ESPHOME_AVAILABILITY_TOPIC = "/esphome/81B190/status"; + + @BeforeAll + static void init() { + ReactorDebugAgent.init(); + } + + @Test + void setState1010() { + + assertThatCode(() -> { + + try (var adapter = new MqttAdapter(new MqttEndpoint(MQTT_BROKER))) { + + var esphomeSwitch = new ESPHomeCqrsSwitch( + "s", + Clock.systemUTC(), + null, null, + adapter, + ESPHOME_SWITCH_TOPIC, + ESPHOME_AVAILABILITY_TOPIC + ); + + // VT: NOTE: This switch doesn't control anything critical now, does it? + + var send = Flux + .just(true, false, true, false) + .delayElements(Duration.ofSeconds(1)) + .publishOn(Schedulers.boundedElastic()) + .doOnNext(state -> { + logger.info("Switch state requested={}", state); + esphomeSwitch.setState(state); + }); + + // This is likely to miss the last status update; good enough for now + esphomeSwitch + .getFlux() + .subscribe(s -> logger.info("status: {}", s)); + + send.blockLast(); + + logger.info("DONE"); + } + + }).doesNotThrowAnyException(); + } +} diff --git a/dz3r-mqtt/src/test/java/net/sf/dz3r/device/esphome/v1/ESPHomeFanTest.java b/dz3r-mqtt/src/test/java/net/sf/dz3r/device/esphome/v2/ESPHomeFanTest.java similarity index 71% rename from dz3r-mqtt/src/test/java/net/sf/dz3r/device/esphome/v1/ESPHomeFanTest.java rename to dz3r-mqtt/src/test/java/net/sf/dz3r/device/esphome/v2/ESPHomeFanTest.java index 203ff0756..854cda9ff 100644 --- a/dz3r-mqtt/src/test/java/net/sf/dz3r/device/esphome/v1/ESPHomeFanTest.java +++ b/dz3r-mqtt/src/test/java/net/sf/dz3r/device/esphome/v2/ESPHomeFanTest.java @@ -1,4 +1,4 @@ -package net.sf.dz3r.device.esphome.v1; +package net.sf.dz3r.device.esphome.v2; import net.sf.dz3r.device.mqtt.v1.MqttAdapter; import net.sf.dz3r.device.mqtt.v1.MqttEndpoint; @@ -9,8 +9,10 @@ import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; +import java.time.Clock; import java.time.Duration; +import static net.sf.dz3r.device.actuator.VariableOutputDevice.Command; import static org.assertj.core.api.Assertions.assertThatCode; @EnabledIfEnvironmentVariable( @@ -22,21 +24,24 @@ class ESPHomeFanTest { private final Logger logger = LogManager.getLogger(); - private final String host = "mqtt-esphome"; - private final String fanTopic = "/esphome/550212/fan/a6-0"; - private final String availabilityTopic = "/esphome/550212/status"; + private final String MQTT_BROKER = "mqtt-esphome"; + private final String FAN_TOPIC = "/esphome/550212/fan/a6-0"; + private final String AVAILABILITY_TOPIC = "/esphome/550212/status"; @Test void sendCycle() throws Exception { assertThatCode(() -> { - var adapter = new MqttAdapter(new MqttEndpoint(host)); + var adapter = new MqttAdapter(new MqttEndpoint(MQTT_BROKER)); var fan = new ESPHomeFan( "a6", + Clock.systemUTC(), + null, + null, adapter, - fanTopic, - availabilityTopic + FAN_TOPIC, + AVAILABILITY_TOPIC ); var status = fan @@ -48,7 +53,7 @@ void sendCycle() throws Exception { Flux .just(0d, 0.25d, 0.5d, 0.75d, 1d) .delayElements(Duration.ofSeconds(1)) - .map(level -> fan.setState(true, level)) + .map(level -> fan.setState(new Command(true, level))) .doOnNext(state -> logger.info("state/sent: {}", state)) .blockLast(); diff --git a/dz3r-mqtt/src/test/java/net/sf/dz3r/device/z2m/v1/Z2MSwitchTest.java b/dz3r-mqtt/src/test/java/net/sf/dz3r/device/z2m/v1/Z2MSwitchTest.java index 52618f26f..4b97e8506 100644 --- a/dz3r-mqtt/src/test/java/net/sf/dz3r/device/z2m/v1/Z2MSwitchTest.java +++ b/dz3r-mqtt/src/test/java/net/sf/dz3r/device/z2m/v1/Z2MSwitchTest.java @@ -27,7 +27,7 @@ class Z2MSwitchTest { private final String MQTT_BROKER = "mqtt-zigbee"; - private final String ZIGBEE_SWITCH_TOPIC = "zigbee2mqtt/sengled-00"; + private final String ZIGBEE_SWITCH_TOPIC = "zigbee2mqtt-dev/sengled-01"; @BeforeAll static void init() { diff --git a/dz3r-mqtt/src/test/java/net/sf/dz3r/device/z2m/v2/Z2MCqrsSwitchTest.java b/dz3r-mqtt/src/test/java/net/sf/dz3r/device/z2m/v2/Z2MCqrsSwitchTest.java new file mode 100644 index 000000000..8b6425ba5 --- /dev/null +++ b/dz3r-mqtt/src/test/java/net/sf/dz3r/device/z2m/v2/Z2MCqrsSwitchTest.java @@ -0,0 +1,75 @@ +package net.sf.dz3r.device.z2m.v2; + +import net.sf.dz3r.device.mqtt.v1.MqttAdapter; +import net.sf.dz3r.device.mqtt.v1.MqttEndpoint; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; +import reactor.tools.agent.ReactorDebugAgent; + +import java.time.Clock; +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThatCode; + +@EnabledIfEnvironmentVariable( + named = "TEST_HCC_Z2M_SWITCH", + matches = "safe", + disabledReason = "Only execute this test if a suitable MQTT broker and Zigbee switch device are available" +) +class Z2MCqrsSwitchTest { + + private final Logger logger = LogManager.getLogger(); + + private final String MQTT_BROKER = "mqtt-zigbee"; + + private final String ZIGBEE_SWITCH_TOPIC = "zigbee2mqtt-dev/sengled-01"; + + @BeforeAll + static void init() { + ReactorDebugAgent.init(); + } + + @Test + void setState1010() { + + assertThatCode(() -> { + + try (var adapter = new MqttAdapter(new MqttEndpoint(MQTT_BROKER))) { + + var z2mSwitch = new Z2MCqrsSwitch( + "zigbee", + Clock.systemUTC(), + null, null, + adapter, + ZIGBEE_SWITCH_TOPIC + ); + + // VT: NOTE: This switch doesn't control anything critical now, does it? + + var send = Flux + .just(true, false, true, false) + .delayElements(Duration.ofSeconds(1)) + .publishOn(Schedulers.boundedElastic()) + .doOnNext(state -> { + logger.info("Switch state requested={}", state); + z2mSwitch.setState(state); + }); + + // This is likely to miss the last status update; good enough for now + z2mSwitch + .getFlux() + .subscribe(s -> logger.info("status: {}", s)); + + send.blockLast(); + + logger.info("DONE"); + } + + }).doesNotThrowAnyException(); + } +} diff --git a/dz3r-mqtt/src/test/java/net/sf/dz3r/device/zwave/v1/ZWaveBinarySwitchTest.java b/dz3r-mqtt/src/test/java/net/sf/dz3r/device/zwave/v1/ZWaveBinarySwitchTest.java index 5e9eb35fa..12a03539b 100644 --- a/dz3r-mqtt/src/test/java/net/sf/dz3r/device/zwave/v1/ZWaveBinarySwitchTest.java +++ b/dz3r-mqtt/src/test/java/net/sf/dz3r/device/zwave/v1/ZWaveBinarySwitchTest.java @@ -16,7 +16,7 @@ import static org.assertj.core.api.Assertions.assertThatCode; @EnabledIfEnvironmentVariable( - named = "TEST_DZ_ZWAVE_SWITCH", + named = "TEST_HCC_ZWAVE_SWITCH", matches = "safe", disabledReason = "Only execute this test if a suitable MQTT broker and Z-Wave switch device are available" ) @@ -26,7 +26,7 @@ class ZWaveBinarySwitchTest { private final String MQTT_BROKER = "mqtt-zwave"; - private final String ZWAVE_SWITCH_TOPIC = "zwave/TestLab/MP31ZP-0"; + private final String ZWAVE_SWITCH_TOPIC = "zwave/SE_Bedroom/MP31ZP-0"; @BeforeAll static void init() { diff --git a/dz3r-mqtt/src/test/java/net/sf/dz3r/device/zwave/v2/ZWaveCqrsBinarySwitchTest.java b/dz3r-mqtt/src/test/java/net/sf/dz3r/device/zwave/v2/ZWaveCqrsBinarySwitchTest.java new file mode 100644 index 000000000..db6d96ac2 --- /dev/null +++ b/dz3r-mqtt/src/test/java/net/sf/dz3r/device/zwave/v2/ZWaveCqrsBinarySwitchTest.java @@ -0,0 +1,75 @@ +package net.sf.dz3r.device.zwave.v2; + +import net.sf.dz3r.device.mqtt.v1.MqttAdapter; +import net.sf.dz3r.device.mqtt.v1.MqttEndpoint; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; +import reactor.tools.agent.ReactorDebugAgent; + +import java.time.Clock; +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThatCode; + +@EnabledIfEnvironmentVariable( + named = "TEST_HCC_ZWAVE_SWITCH", + matches = "safe", + disabledReason = "Only execute this test if a suitable MQTT broker and Z-Wave switch device are available" +) +class ZWaveCqrsBinarySwitchTest { + + private final Logger logger = LogManager.getLogger(); + + private final String MQTT_BROKER = "mqtt-zwave"; + + private final String ZWAVE_SWITCH_TOPIC = "zwave/SE_Bedroom/MP31ZP-0"; + + @BeforeAll + static void init() { + ReactorDebugAgent.init(); + } + + @Test + void setState1010() { + + assertThatCode(() -> { + + try (var adapter = new MqttAdapter(new MqttEndpoint(MQTT_BROKER))) { + + var zwaveSwitch = new ZWaveCqrsBinarySwitch( + "zwave", + Clock.systemUTC(), + null, null, + adapter, + ZWAVE_SWITCH_TOPIC + ); + + // VT: NOTE: This switch doesn't control anything critical now, does it? + + var send = Flux + .just(true, false, true, false) + .delayElements(Duration.ofSeconds(1)) + .publishOn(Schedulers.boundedElastic()) + .doOnNext(state -> { + logger.info("Switch state requested={}", state); + zwaveSwitch.setState(state); + }); + + // This is likely to miss the last status update; good enough for now + zwaveSwitch + .getFlux() + .subscribe(s -> logger.info("status: {}", s)); + + send.blockLast(); + + logger.info("DONE"); + } + + }).doesNotThrowAnyException(); + } +} diff --git a/dz3r-raspberry-pi/src/main/java/net/sf/dz3r/device/actuator/pi/autohat/AutomationHatWrapper.java b/dz3r-raspberry-pi/src/main/java/net/sf/dz3r/device/actuator/pi/autohat/AutomationHatWrapper.java index d45d378fd..75e9dd87f 100644 --- a/dz3r-raspberry-pi/src/main/java/net/sf/dz3r/device/actuator/pi/autohat/AutomationHatWrapper.java +++ b/dz3r-raspberry-pi/src/main/java/net/sf/dz3r/device/actuator/pi/autohat/AutomationHatWrapper.java @@ -3,8 +3,12 @@ import com.homeclimatecontrol.autohat.AutomationHAT; import com.homeclimatecontrol.autohat.Relay; import com.homeclimatecontrol.autohat.pi.PimoroniAutomationHAT; -import net.sf.dz3r.device.actuator.AbstractSwitch; -import net.sf.dz3r.device.actuator.Switch; +import net.sf.dz3r.device.DeviceState; +import net.sf.dz3r.device.actuator.CqrsSwitch; +import net.sf.dz3r.signal.Signal; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import reactor.core.publisher.Flux; import java.io.IOException; import java.util.List; @@ -13,10 +17,11 @@ * Wrapper to convert independent abstractions of {@link AutomationHAT} * into ones acceptable for DZ, and make them available at a proper point in DZ entities' lifecycle. * - * @author Copyright © Vadim Tkachenko 2001-2021 + * @author Copyright © Vadim Tkachenko 2001-2023 */ public class AutomationHatWrapper { + private static final Logger logger = LogManager.getLogger(AutomationHatWrapper.class); private static AutomationHatWrapper instance; private final AutomationHAT hat; @@ -44,35 +49,94 @@ private AutomationHatWrapper() throws IOException { * * @return All the relays as a list. */ - public List> relay() { + public List> relay() { return List.of( switchWrapper(hat.relay().get(0), "R0"), switchWrapper(hat.relay().get(1), "R1"), switchWrapper(hat.relay().get(2), "R2")); } - private Switch switchWrapper(Relay relay, String name) { + private CqrsSwitch switchWrapper(Relay relay, String name) { return new RelayWrapper(relay, name); } - private static class RelayWrapper extends AbstractSwitch { + private static class RelayWrapper implements CqrsSwitch { + private final String name; private final Relay target; + private boolean available = true; + + private Boolean requested; + private Boolean actual; + public RelayWrapper(Relay target, String name) { - super(name); + this.name = name; this.target = target; } + private boolean readState() throws IOException { + // This value will be missing just on startup, unlikely to be a problem + return target.read().orElse(false); + } + @Override - protected void setStateSync(boolean state) throws IOException { - target.write(state); + public String getAddress() { + return name; } @Override - protected boolean getStateSync() throws IOException { - // This value will be missing just on startup, unlikely to be a problem - return target.read().orElse(false); + public boolean isAvailable() { + return available; + } + + @Override + public DeviceState getState() { + + if (available) { + + try { + actual = readState(); + } catch (IOException ex) { + available = false; + logger.error("{}: readState() failed, marking device unavailable", getAddress(), ex); + } + } + + return new DeviceState<>( + name, + isAvailable(), + requested, + actual, + 0 + ); + } + + @Override + public DeviceState setState(Boolean state) { + + this.requested = state; + + try { + target.write(state); + this.actual = state; + available = true; + } catch (IOException ex) { + available = false; + logger.error("{}: setState({}) failed, marking device unavailable", getAddress(), state, ex); + } + + return getState(); + } + + @Override + public Flux, String>> getFlux() { + throw new UnsupportedOperationException("likely programming error, this class should not be accessible"); + } + + @Override + public void close() throws Exception { + } } } diff --git a/dz3r-webui/src/main/java/net/sf/dz3r/view/webui/v2/RoutingConfiguration.java b/dz3r-webui/src/main/java/net/sf/dz3r/view/webui/v2/RoutingConfiguration.java index 829974ead..4d50347de 100644 --- a/dz3r-webui/src/main/java/net/sf/dz3r/view/webui/v2/RoutingConfiguration.java +++ b/dz3r-webui/src/main/java/net/sf/dz3r/view/webui/v2/RoutingConfiguration.java @@ -29,6 +29,9 @@ public RouterFunction monoRouterFunction(WebUI webUI) { GET("/zones").and(ACCEPT_JSON), webUI::getZones).andRoute( GET("/zone/{zone}").and(ACCEPT_JSON), webUI::getZone).andRoute( + GET("/uptime").and(ACCEPT_JSON), webUI::getUptime).andRoute( + GET("/version").and(ACCEPT_JSON), webUI::getVersion).andRoute( + // Mutators POST("/zone{zone}").and(ACCEPT_JSON), webUI::setZone).andRoute( diff --git a/dz3r-webui/src/main/java/net/sf/dz3r/view/webui/v2/WebUI.java b/dz3r-webui/src/main/java/net/sf/dz3r/view/webui/v2/WebUI.java index 8e70529be..cf51d1c3d 100644 --- a/dz3r-webui/src/main/java/net/sf/dz3r/view/webui/v2/WebUI.java +++ b/dz3r-webui/src/main/java/net/sf/dz3r/view/webui/v2/WebUI.java @@ -2,6 +2,7 @@ import net.sf.dz3r.instrumentation.InstrumentCluster; import net.sf.dz3r.model.UnitDirector; +import net.sf.dz3r.runtime.GitProperties; import net.sf.dz3r.runtime.config.model.TemperatureUnit; import net.sf.dz3r.signal.Signal; import net.sf.dz3r.signal.hvac.ZoneStatus; @@ -20,10 +21,16 @@ import reactor.netty.DisposableServer; import reactor.netty.http.server.HttpServer; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.text.SimpleDateFormat; import java.time.Duration; import java.time.Instant; +import java.time.ZoneId; import java.util.AbstractMap; +import java.util.Date; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -263,6 +270,36 @@ public Mono setUnit(ServerRequest rq) { return ServerResponse.unprocessableEntity().bodyValue("Stay tuned, coming soon"); } + public Mono getUptime(ServerRequest rq) { + logger.info("GET /uptime"); + + var mx = ManagementFactory.getRuntimeMXBean(); + var startMillis = mx.getStartTime(); + var uptimeMillis = mx.getUptime(); + var start = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS").format(new Date(startMillis)) + " " + ZoneId.systemDefault(); + var uptime = Duration.ofMillis(uptimeMillis).toString().substring(2).replaceAll("(\\d[HMS])(?!$)", "$1 ").toLowerCase(); + + // Let's make the JSON order predictable + var result = new LinkedHashMap<>(); + + result.put("start", start); + result.put("uptime", uptime); + result.put("start.millis", startMillis); + result.put("uptime.millis", uptimeMillis); + + return ok().contentType(MediaType.APPLICATION_JSON).body(Flux.fromIterable(result.entrySet()), Object.class); + } + + public Mono getVersion(ServerRequest rq) { + logger.info("GET /version"); + + try { + return ok().contentType(MediaType.APPLICATION_JSON).body(Flux.fromIterable(GitProperties.get().entrySet()), Object.class); + } catch (IOException ex) { + throw new IllegalStateException("This shouldn't have happened", ex); + } + } + private interface Initializer { void init(T source); } diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 033e24c4c..7f93135c4 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index ac72c34e8..3fa8f862f 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/gradlew b/gradlew index fcb6fca14..0adc8e1a5 100755 --- a/gradlew +++ b/gradlew @@ -83,7 +83,8 @@ done # This is normally unused # shellcheck disable=SC2034 APP_BASE_NAME=${0##*/} -APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD=maximum