Skip to content

Commit

Permalink
Add stream fields
Browse files Browse the repository at this point in the history
  • Loading branch information
Bre77 committed Dec 3, 2024
1 parent 066c8da commit 30e7432
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 6 deletions.
15 changes: 11 additions & 4 deletions custom_components/teslemetry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Final

from tesla_fleet_api import EnergySpecific, Teslemetry, VehicleSpecific
from tesla_fleet_api.const import Scope
from tesla_fleet_api.const import Scope, TelemetryField
from tesla_fleet_api.exceptions import (
InvalidToken,
SubscriptionRequired,
Expand Down Expand Up @@ -32,7 +32,7 @@
TeslemetryVehicleDataCoordinator,
)
from .const import TeslemetryState
from .helpers import flatten
from .helpers import flatten, AddStreamFields
from .models import TeslemetryData, TeslemetryEnergyData, TeslemetryVehicleData
from .services import async_register_services

Expand Down Expand Up @@ -73,6 +73,8 @@ def receive(self, data: dict) -> None:
self.coordinator.data["state"] = data["state"]
self.coordinator.async_set_updated_data(self.coordinator.data)



async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Telemetry integration."""
async_register_services(hass)
Expand Down Expand Up @@ -132,6 +134,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
vin = product["vin"]
api = VehicleSpecific(teslemetry.vehicle, vin)
coordinator = TeslemetryVehicleDataCoordinator(hass, api, product)
fields = AddStreamFields(stream, vin)

device = DeviceInfo(
identifiers={(DOMAIN, vin)},
Expand All @@ -144,13 +147,17 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:

device_registry.async_get_or_create(config_entry_id=entry.entry_id, **device)

# A function to collect multiple stream field updates before sending them


vehicles.append(
TeslemetryVehicleData(
api=api,
coordinator=coordinator,
stream=stream,
vin=vin,
device=device,
fields=fields,
)
)
elif "energy_site_id" in product and Scope.ENERGY_DEVICE_DATA in scopes:
Expand Down Expand Up @@ -274,11 +281,11 @@ def handle_errors(event: dict) -> None:
LOGGER.debug("Streaming received error from %s", vehicle.vin)
if errors := event.get("errors"):
for error in errors:
if error["startedAt"] <= vehicle.last_error:
if error["createdAt"] <= vehicle.last_error:
break
error["vin"] = vehicle.vin
hass.bus.fire("teslemetry_error", error)
vehicle.last_error = errors[0]["startedAt"]
vehicle.last_error = errors[0]["createdAt"]

def handle_vehicle_data(data: dict) -> None:
"""Handle vehicle data from the stream."""
Expand Down
2 changes: 1 addition & 1 deletion custom_components/teslemetry/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from .const import LOGGER, TeslemetryState, DOMAIN, ENERGY_HISTORY_FIELDS
from .helpers import flatten

VEHICLE_INTERVAL = timedelta(seconds=30)
VEHICLE_INTERVAL = timedelta(minutes=30)
VEHICLE_WAIT = timedelta(minutes=15)
ENERGY_LIVE_INTERVAL = timedelta(seconds=30)
ENERGY_INFO_INTERVAL = timedelta(seconds=30)
Expand Down
36 changes: 36 additions & 0 deletions custom_components/teslemetry/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(
self._attr_translation_key = f"stream_{streaming_key.lower()}"
self.stream = data.stream
self.vin = data.vin
self.fields = data.fields

self._attr_unique_id = f"{data.vin}-stream_{streaming_key.lower()}"
self._attr_device_info = data.device
Expand All @@ -54,13 +55,48 @@ async def async_added_to_hass(self) -> None:
{"vin": self.vin, "data": {self.streaming_key: None}},
)
)
self.hass.async_create_task(self.fields.add(self.streaming_key))

def _handle_stream_update(self, data: dict[str, Any]) -> None:
"""Handle updated data from the stream."""
self._async_value_from_stream(data["data"][self.streaming_key])
self.async_write_ha_state()


class TeslemetryVehicleComplexStreamEntity:
"""Parent class for Teslemetry Vehicle Stream entities with multiple keys."""

_attr_has_entity_name = True

def __init__(
self, data: TeslemetryVehicleData, key: str, streaming_keys: [TelemetryField]
) -> None:
"""Initialize common aspects of a Teslemetry entity."""
self.streaming_keys = streaming_keys

self._attr_translation_key = key
self.stream = data.stream
self.vin = data.vin

self._attr_unique_id = f"{data.vin}-{key}"
self._attr_device_info = data.device

async def async_added_to_hass(self) -> None:
"""When entity is added to hass."""
await super().async_added_to_hass()
if self.stream.server:
self.async_on_remove(
self.stream.async_add_listener(
self._handle_stream_update,
{"vin": self.vin, "data": ({key: None} for key in self.streaming_keys)},
)
)

def _handle_stream_update(self, data: dict[str, Any]) -> None:
"""Handle updated data from the stream."""
self._async_value_from_stream(data["data"])
self.async_write_ha_state()

class TeslemetryEntity(
CoordinatorEntity[
TeslemetryVehicleDataCoordinator
Expand Down
38 changes: 37 additions & 1 deletion custom_components/teslemetry/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

import asyncio
from typing import Any
from homeassistant.exceptions import HomeAssistantError
from teslemetry_stream import TeslemetryStream
from tesla_fleet_api.const import TelemetryField
from tesla_fleet_api.exceptions import TeslaFleetError

from homeassistant.exceptions import HomeAssistantError
from .const import DOMAIN, LOGGER, TeslemetryState


def flatten(data: dict[str, Any], parent: str | None = None) -> dict[str, Any]:
"""Flatten the data structure."""
result = {}
Expand Down Expand Up @@ -132,3 +136,35 @@ def _ignore_drop(value):
return _last_value

return _ignore_drop

class AddStreamFields:
"""Handle streaming field updates."""

def __init__(self, stream: TeslemetryStream, vin: str):
# A dictionary of TelemetryField keys and null values
self.stream: TeslemetryStream = stream
self.vin: str = vin
self.fields: dict[TelemetryField, None] = {}
self.lock = asyncio.Lock()

async def add(self, field: TelemetryField) -> None:
"""Handle vehicle data from the stream."""
if field in self.stream.fields:
LOGGER.debug("Streaming field %s already enabled @ %ss", field, self.stream.fields[field].get('interval_seconds'))
return
self.fields[field] = None
async with self.lock:
# Short circuit if no fields are present
if len(self.fields) == 0:
return
# Collect additional fields before sending
await asyncio.sleep(1)
# Copy the field list and empty it
fields = self.fields.copy()
self.fields.clear()

resp = await self.stream.update_fields(fields, self.vin)
if(resp.get("response",{}).get("updated_vehicles") == 1):
LOGGER.debug("Added streaming fields %s", ", ".join(fields.keys()))
else:
LOGGER.warning("Unable to add streaming fields %s", ", ".join(fields.keys()))
2 changes: 2 additions & 0 deletions custom_components/teslemetry/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
TeslemetryVehicleDataCoordinator,
)

from .helpers import AddStreamFields

@dataclass
class TeslemetryData:
Expand All @@ -41,6 +42,7 @@ class TeslemetryVehicleData:
stream: TeslemetryStream
vin: str
device: DeviceInfo
fields: AddStreamFields
wakelock = asyncio.Lock()
last_alert: str = dt_util.utcnow().isoformat()
last_error: str = dt_util.utcnow().isoformat()
Expand Down

0 comments on commit 30e7432

Please sign in to comment.