-
Notifications
You must be signed in to change notification settings - Fork 0
/
coordinator.py
158 lines (125 loc) · 5.69 KB
/
coordinator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""Coordinators for willow."""
from __future__ import annotations
from datetime import timedelta
from pymodbus.client import AsyncModbusTcpClient
from pymodbus.exceptions import ModbusException
import logging
from homeassistant.helpers.entity import Entity
from homeassistant.const import PERCENTAGE
from homeassistant.core import callback
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
UpdateFailed,
)
from .const import DOMAIN, CONF_IP_ADDRESS, NAME, MANUFACTURER, MODEL
# Module-level logger, named after this module's import path.
_LOGGER = logging.getLogger(__name__)
class InverterCoordinator(DataUpdateCoordinator):
    """Poll an inverter over Modbus TCP and expose its battery settings.

    The coordinator is refreshed every 15 seconds (only while it has
    subscribers). Each refresh opens a new Modbus TCP connection to the
    configured IP address, reads a fixed set of holding registers and returns
    them as a dict with the keys ``scaleFactor``, ``batteryWorkCapacity``,
    ``maxChargePower``, ``maxDischargePower``, ``minSoc`` and ``maxSoc``.
    """

    # Connection parameters used for every Modbus request.
    MODBUS_PORT = 1502
    MODBUS_SLAVE = 71

    # Holding-register start addresses.
    REG_POWER_SCALE_FACTOR = 1025  # 1 register, INT16
    # 8 registers = 4 consecutive FLOAT32 values:
    # max charge power, max discharge power, min SOC, max SOC.
    REG_BATTERY_LIMITS = 1038
    REG_MIN_SOC = 1042  # third FLOAT32 of the block above (1038 + 4)
    REG_BATTERY_WORK_CAPACITY = 1068  # 2 registers, FLOAT32

    # Values reported for every key that could not be read from the device.
    _DEFAULT_DATA = {
        "scaleFactor": 1,
        "batteryWorkCapacity": 0,
        "maxChargePower": 0,
        "maxDischargePower": 0,
        "minSoc": 5,
        "maxSoc": 100,
    }

    def __init__(self, hass, entry, ip_address):
        """Initialize the coordinator and register the entry in ``hass.data``.

        Args:
            hass: The Home Assistant instance.
            entry: The config entry this coordinator belongs to; its
                ``entry_id`` and ``title`` are used.
            ip_address: Address of the inverter's Modbus TCP endpoint.
        """
        super().__init__(
            hass,
            _LOGGER,
            # Name of the data. For logging purposes.
            name=DOMAIN,
            # Polling interval. Will only be polled if there are subscribers.
            update_interval=timedelta(seconds=15),
        )
        self._hass = hass
        self._entry = entry
        self._ip_address = ip_address

        # Create the per-entry record only if it does not exist yet, so an
        # already populated record is never overwritten.
        hass.data.setdefault(DOMAIN, {})
        hass.data[DOMAIN].setdefault(
            entry.entry_id,
            {
                "name": entry.title,
                "ip": ip_address,
                "model": MODEL,
                "status": "OFFLINE",
            },
        )

    @property
    def device_info(self):
        """Return information to link this entity with the correct device."""
        return {
            "identifiers": {(DOMAIN, self._entry.entry_id)},
            "name": NAME,
            "manufacturer": MANUFACTURER,
            "model": MODEL,
        }

    @staticmethod
    def _decode_float32(client, registers):
        """Decode a pair of registers into a FLOAT32 value.

        The two registers are reversed before decoding (presumably because
        the device transmits the low word first -- TODO confirm against the
        device's register map).
        """
        return client.convert_from_registers(
            registers=list(reversed(registers)),
            data_type=client.DATATYPE.FLOAT32,
        )

    async def _read_registers(self, client, address, count):
        """Read ``count`` holding registers at ``address``.

        Returns the list of register values, or ``None`` (after logging an
        error) when the device answers with an error response.
        """
        result = await client.read_holding_registers(
            address, count=count, slave=self.MODBUS_SLAVE
        )
        if result.isError():
            _LOGGER.error("Error reading registers")
            return None
        return result.registers

    async def _async_update_data(self):
        """Fetch the current battery settings from the inverter.

        Returns:
            A dict with the keys listed in ``_DEFAULT_DATA``. Any value that
            could not be read keeps its default.

        NOTE(review): connection failures and Modbus errors are only logged
        and the defaults are returned, so this method never signals a failed
        update to the coordinator. Consider raising ``UpdateFailed`` instead
        once callers are known to cope with unavailable data.
        """
        client = AsyncModbusTcpClient(self._ip_address, port=self.MODBUS_PORT)
        data = dict(self._DEFAULT_DATA)
        try:
            if not await client.connect():
                _LOGGER.error("Connection failed")
                return data

            # Power scale factor.
            registers = await self._read_registers(
                client, self.REG_POWER_SCALE_FACTOR, 1
            )
            if registers is not None:
                data["scaleFactor"] = client.convert_from_registers(
                    registers=registers,
                    data_type=client.DATATYPE.INT16,
                )

            # Battery work capacity.
            registers = await self._read_registers(
                client, self.REG_BATTERY_WORK_CAPACITY, 2
            )
            if registers is not None:
                data["batteryWorkCapacity"] = self._decode_float32(
                    client, registers
                )

            # Max. charge/discharge power limits and minimum/maximum SOC,
            # stored as four consecutive FLOAT32 values.
            registers = await self._read_registers(
                client, self.REG_BATTERY_LIMITS, 8
            )
            if registers is not None:
                data["maxChargePower"] = self._decode_float32(
                    client, registers[0:2]
                )
                data["maxDischargePower"] = self._decode_float32(
                    client, registers[2:4]
                )
                data["minSoc"] = self._decode_float32(client, registers[4:6])
                data["maxSoc"] = self._decode_float32(client, registers[6:8])
        except ModbusException as err:
            # Values read before the failure are kept; the rest stay default.
            _LOGGER.error("Modbus error: %s", err)
        finally:
            client.close()
        return data

    async def async_set_min_soc(self, value: float) -> None:
        """Write a new minimum state of charge to the inverter.

        Args:
            value: The new minimum SOC, encoded as FLOAT32 and written to
                the two registers starting at ``REG_MIN_SOC``.

        Failures (no connection, error response, Modbus exception) are
        logged and not raised.
        """
        # ``Logger.warn`` was removed in Python 3.13; this is a plain call
        # trace, so it is logged at debug level.
        _LOGGER.debug("InverterCoordinator async_set_min_soc")
        client = AsyncModbusTcpClient(self._ip_address, port=self.MODBUS_PORT)
        try:
            if not await client.connect():
                _LOGGER.error("Connection failed")
                return

            registers = client.convert_to_registers(
                value=value,
                data_type=client.DATATYPE.FLOAT32,
            )
            # Same register order as used when reading: words are reversed.
            result = await client.write_registers(
                self.REG_MIN_SOC,
                values=list(reversed(registers)),
                slave=self.MODBUS_SLAVE,
            )
            # Report an error only when the write actually failed.
            if result.isError():
                _LOGGER.error("Error writing registers")
        except ModbusException as err:
            _LOGGER.error("Modbus error: %s", err)
        finally:
            client.close()