From ddab9fbfaf8e7c89aa16e0910e37a98e4e34093c Mon Sep 17 00:00:00 2001 From: Clemens Zagler Date: Tue, 14 Jan 2025 11:21:04 +0100 Subject: [PATCH] refactor measurement handling to allow for heterogenous periods and fix issues related to it --- .../bdp/writer/dal/MeasurementAbstract.java | 1 + .../dal/MeasurementAbstractHistory.java | 336 +++++++++--------- .../bdp/writer/dal/MeasurementJSON.java | 4 +- .../bdp/writer/dal/PersistenceIT.java | 2 +- 4 files changed, 181 insertions(+), 162 deletions(-) diff --git a/writer/src/main/java/com/opendatahub/timeseries/bdp/writer/dal/MeasurementAbstract.java b/writer/src/main/java/com/opendatahub/timeseries/bdp/writer/dal/MeasurementAbstract.java index c190c418..b0870d01 100644 --- a/writer/src/main/java/com/opendatahub/timeseries/bdp/writer/dal/MeasurementAbstract.java +++ b/writer/src/main/java/com/opendatahub/timeseries/bdp/writer/dal/MeasurementAbstract.java @@ -55,6 +55,7 @@ public abstract class MeasurementAbstract implements Serializable { public abstract MeasurementAbstract findLatestEntry(EntityManager em, Station station, DataType type, Integer period); public abstract Date getDateOfLastRecord(EntityManager em, Station station, DataType type, Integer period); public abstract void setValue(Object value); + public abstract Object getValue(); protected MeasurementAbstract() { this.created_on = new Date(); diff --git a/writer/src/main/java/com/opendatahub/timeseries/bdp/writer/dal/MeasurementAbstractHistory.java b/writer/src/main/java/com/opendatahub/timeseries/bdp/writer/dal/MeasurementAbstractHistory.java index 558ab3e0..591ddd36 100644 --- a/writer/src/main/java/com/opendatahub/timeseries/bdp/writer/dal/MeasurementAbstractHistory.java +++ b/writer/src/main/java/com/opendatahub/timeseries/bdp/writer/dal/MeasurementAbstractHistory.java @@ -8,9 +8,14 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Map.Entry; +import java.util.function.BiFunction; +import java.util.function.Function; import jakarta.persistence.CascadeType; import jakarta.persistence.Column; @@ -31,9 +36,13 @@ import com.opendatahub.timeseries.bdp.dto.dto.SimpleRecordDto; /** - *

This entity contains all measurements and is the biggest container for the data. - * Each measurement must extend this base class to keep integrity. - * It contains the two most important references to station and type and also contains generic + *

+ * This entity contains all measurements and is the biggest container for the + * data. + * Each measurement must extend this base class to keep + * integrity. + * It contains the two most important references to station and type and also + * contains generic * methods on how data gets stored and retrieved. * * @author Peter Moser @@ -64,16 +73,18 @@ public abstract class MeasurementAbstractHistory implements Serializable { @ManyToOne(optional = true, fetch = FetchType.LAZY) private Provenance provenance; - public abstract List findRecords(EntityManager em, String stationtype, String identifier, String cname, Date start, Date end, Integer period); + public abstract List findRecords(EntityManager em, String stationtype, String identifier, String cname, + Date start, Date end, Integer period); protected MeasurementAbstractHistory() { this.created_on = new Date(); } + /** - * @param station entity the measurement refers to - * @param type entity the measurement refers to + * @param station entity the measurement refers to + * @param type entity the measurement refers to * @param timestamp UTC time of the measurement detection - * @param period standard interval between 2 measurements + * @param period standard interval between 2 measurements */ protected MeasurementAbstractHistory(Station station, DataType type, Date timestamp, Integer period) { this.station = station; @@ -98,6 +109,7 @@ public Date getTimestamp() { public void setTimestamp(Date timestamp) { this.timestamp = timestamp; } + public Station getStation() { return station; } @@ -129,22 +141,34 @@ public Provenance getProvenance() { public void setProvenance(Provenance provenance) { this.provenance = provenance; } + public abstract void setValue(Object value); + public abstract Object getValue(); /** *

- * persists all measurement data send to the writer from data collectors to the database.
+ * persists all measurement data send to the writer from data collectors to the + * database.
* This method goes down the data tree and persists all new records
- * it also updates the newest measurement in {@link MeasurementAbstract}, if it really is newer + * it also updates the newest measurement in {@link MeasurementAbstract}, if it + * really is newer *

- * @param em entity manager - * @param stationType typology of the specific station, e.g., MeteoStation, EnvironmentStation - * @param dataMap container for data send from data collector containing measurements
- * Data is received in a tree structure, containing in the first level the identifier of the correlated station,
- * on the second level the identifier of the correlated data type and on the last level the data itself - * @throws JPAException if data is in any way corrupted or one of the references {@link Station}, {@link DataType}
does not exist in the database yet + * + * @param em entity manager + * @param stationType typology of the specific station, e.g., MeteoStation, + * EnvironmentStation + * @param dataMap container for data send from data collector containing + * measurements
+ * Data is received in a tree structure, containing in the + * first level the identifier of the correlated station,
+ * on the second level the identifier of the correlated data + * type and on the last level the data itself + * @throws JPAException if data is in any way corrupted or one of the references + * {@link Station}, {@link DataType}
+ * does not exist in the database yet */ + @SuppressWarnings("unchecked") public static void pushRecords(EntityManager em, String stationType, DataMapDto dataMap) { Log log = new Log(LOG, "pushRecords"); try { @@ -156,10 +180,12 @@ public static void pushRecords(EntityManager em, String stationType, DataMapDto< for (Entry> stationEntry : dataMap.getBranch().entrySet()) { Station station = Station.findStation(em, stationType, stationEntry.getKey()); if (station == null) { - log.warn(String.format("Station '%s/%s' not found. Skipping...", stationType, stationEntry.getKey())); + log.warn(String.format("Station '%s/%s' not found. Skipping...", stationType, + stationEntry.getKey())); continue; } - for(Entry> typeEntry : stationEntry.getValue().getBranch().entrySet()) { + for (Entry> typeEntry : stationEntry.getValue().getBranch() + .entrySet()) { try { DataType type = DataType.findByCname(em, typeEntry.getKey()); if (type == null) { @@ -172,145 +198,86 @@ public static void pushRecords(EntityManager em, String stationType, DataMapDto< continue; } - //TODO: remove period check once it gets removed from database - Integer period = ((SimpleRecordDto) dataRecords.get(0)).getPeriod(); - if (period == null){ - log.warn("No period specified. Skipping..."); - continue; - } + // Some datacollectors write multiple periods in a single call. + // They need to be handled as if they were separate datatypes, each with their + // own latest measurement + Map periods = new HashMap<>(); em.getTransaction().begin(); - MeasurementAbstract latestNumberMeasurement = MeasurementAbstract.findLatestEntry(em, station, type, period, Measurement.class); - long latestNumberMeasurementTime = (latestNumberMeasurement != null) ? latestNumberMeasurement.getTimestamp().getTime() : 0; - MeasurementAbstract latestStringMeasurement = MeasurementAbstract.findLatestEntry(em, station, type, period, MeasurementString.class); - long latestStringMeasurementTime = (latestStringMeasurement != null) ? latestStringMeasurement.getTimestamp().getTime() : 0; - MeasurementAbstract latestJSONMeasurement = MeasurementAbstract.findLatestEntry(em, station, type, period, MeasurementJSON.class); - long latestJSONMeasurementTime = (latestJSONMeasurement != null) ? latestJSONMeasurement.getTimestamp().getTime() : 0; - - SimpleRecordDto newestStringDto = null; - SimpleRecordDto newestNumberDto = null; - SimpleRecordDto newestJsonDto = null; - - List dupList = new ArrayList<>(); - for (RecordDtoImpl recordDto : dataRecords) { - - /* - * XXX We support only SimpleRecordDtos at the moment. This should be removed, - * when we see that we do not need anything else then SimpleRecords - */ - if (! (recordDto instanceof SimpleRecordDto)) + SimpleRecordDto simpleRecordDto = (SimpleRecordDto) recordDto; + Integer periodSeconds = simpleRecordDto.getPeriod(); + if (periodSeconds == null) { + log.error("No period specified. Skipping..."); continue; + } + Period period = periods.get(periodSeconds); + if (period == null) { + period = new Period(em, station, type, periodSeconds, provenance); + periods.put(periodSeconds, period); + } - SimpleRecordDto simpleRecordDto = (SimpleRecordDto)recordDto; - Long dateOfMeasurement = simpleRecordDto.getTimestamp(); + Date dateOfMeasurement = new Date(recordDto.getTimestamp()); Object valueObj = simpleRecordDto.getValue(); + if (valueObj instanceof Number) { - if (latestNumberMeasurementTime < dateOfMeasurement) { - Double value = ((Number)valueObj).doubleValue(); - if (isDuplicate(log, dupList, station, type, simpleRecordDto.getPeriod(), "NUMBER", dateOfMeasurement, value)) - continue; - MeasurementHistory rec = new MeasurementHistory(station, type, value, new Date(dateOfMeasurement), simpleRecordDto.getPeriod()); - rec.setProvenance(provenance); - em.persist(rec); - } - if (newestNumberDto == null || newestNumberDto.getTimestamp() < simpleRecordDto.getTimestamp()) { - newestNumberDto = simpleRecordDto; - } + MeasurementHistory rec = new MeasurementHistory(station, type, + ((Number) valueObj).doubleValue(), + dateOfMeasurement, periodSeconds); + period.number.addHistory(em, log, simpleRecordDto, rec); } else if (valueObj instanceof String) { - if (latestStringMeasurementTime < dateOfMeasurement) { - String value = (String) valueObj; - if (isDuplicate(log, dupList, station, type, simpleRecordDto.getPeriod(), "STRING", dateOfMeasurement, value)) - continue; - MeasurementStringHistory rec = new MeasurementStringHistory(station, type, value, new Date(dateOfMeasurement), simpleRecordDto.getPeriod()); - rec.setProvenance(provenance); - em.persist(rec); - } - if (newestStringDto == null || newestStringDto.getTimestamp() < simpleRecordDto.getTimestamp()) { - newestStringDto = simpleRecordDto; - } + MeasurementStringHistory rec = new MeasurementStringHistory(station, type, + (String) valueObj, + dateOfMeasurement, periodSeconds); + period.string.addHistory(em, log, simpleRecordDto, rec); } else if (valueObj instanceof Map) { - if (latestJSONMeasurementTime < dateOfMeasurement) { - @SuppressWarnings("unchecked") - Map value = (Map) valueObj; - if (isDuplicate(log, dupList, station, type, simpleRecordDto.getPeriod(), "JSON", dateOfMeasurement, value)) - continue; - MeasurementJSONHistory rec = new MeasurementJSONHistory(station, type, value, new Date(dateOfMeasurement), simpleRecordDto.getPeriod()); - rec.setProvenance(provenance); - em.persist(rec); - } - if (newestJsonDto == null || newestJsonDto.getTimestamp() < simpleRecordDto.getTimestamp()) { - newestJsonDto = simpleRecordDto; - } + MeasurementJSONHistory rec = new MeasurementJSONHistory(station, type, + (Map) valueObj, + dateOfMeasurement, periodSeconds); + period.json.addHistory(em, log, simpleRecordDto, rec); } else { log.warn( - String.format("Unsupported data format for %s/%s/%s with value '%s'. Skipping...", - stationType, - stationEntry.getKey(), - typeEntry.getKey(), - (valueObj == null ? "(null)" : valueObj.getClass().getSimpleName()) - ) - ); + String.format( + "Unsupported data format for %s/%s/%s with value '%s'. Skipping...", + stationType, + stationEntry.getKey(), + typeEntry.getKey(), + (valueObj == null ? "(null)" : valueObj.getClass().getSimpleName()))); } } - if (newestNumberDto != null) { - Double valueNumber = ((Number)newestNumberDto.getValue()).doubleValue(); - if (latestNumberMeasurement == null) { - latestNumberMeasurement = new Measurement(station, type, valueNumber, new Date(newestNumberDto.getTimestamp()), newestNumberDto.getPeriod()); - latestNumberMeasurement.setProvenance(provenance); - em.persist(latestNumberMeasurement); - } else if (newestNumberDto.getTimestamp() > latestNumberMeasurementTime) { - latestNumberMeasurement.setTimestamp(new Date(newestNumberDto.getTimestamp())); - latestNumberMeasurement.setProvenance(provenance); - latestNumberMeasurement.setValue(valueNumber); - em.merge(latestNumberMeasurement); - } + for (Period period : periods.values()) { + period.number.updateLatest(em, (newest) -> { + return new Measurement(station, type, ((Number) newest.getValue()).doubleValue(), + new Date(newest.getTimestamp()), period.period); + }); + period.string.updateLatest(em, (newest) -> { + return new MeasurementString(station, type, (String) newest.getValue(), + new Date(newest.getTimestamp()), + period.period); + }); + period.json.updateLatest(em, (newest) -> { + return new MeasurementJSON(station, type, (Map) newest.getValue(), + new Date(newest.getTimestamp()), + period.period); + }); } - if (newestStringDto != null) { - String valueString = (String) newestStringDto.getValue(); - if (latestStringMeasurement == null) { - latestStringMeasurement = new MeasurementString(station, type, valueString, new Date(newestStringDto.getTimestamp()), newestStringDto.getPeriod()); - latestStringMeasurement.setProvenance(provenance); - em.persist(latestStringMeasurement); - } else if (newestStringDto.getTimestamp() > latestStringMeasurementTime) { - latestStringMeasurement.setTimestamp(new Date(newestStringDto.getTimestamp())); - latestStringMeasurement.setValue(valueString); - latestStringMeasurement.setProvenance(provenance); - em.merge(latestStringMeasurement); - } - } - if (newestJsonDto != null) { - @SuppressWarnings("unchecked") - Map jsonValue = (Map) newestJsonDto.getValue(); - if (latestJSONMeasurement == null) { - latestJSONMeasurement = new MeasurementJSON(station, type, jsonValue, new Date(newestJsonDto.getTimestamp()), newestJsonDto.getPeriod()); - latestJSONMeasurement.setProvenance(provenance); - em.persist(latestJSONMeasurement); - } else if (newestJsonDto.getTimestamp() > latestJSONMeasurementTime) { - latestJSONMeasurement.setTimestamp(new Date(newestJsonDto.getTimestamp())); - latestJSONMeasurement.setValue(jsonValue); - latestJSONMeasurement.setProvenance(provenance); - em.merge(latestJSONMeasurement); - } - } em.getTransaction().commit(); - } catch(Exception ex) { + } catch (Exception ex) { log.error( - String.format("Exception '%s'... Skipping this measurement!", ex.getMessage()), - ex - ); - if (em.getTransaction().isActive()){ + String.format("Exception '%s'... Skipping this measurement!", ex.getMessage()), + ex); + if (em.getTransaction().isActive()) { em.getTransaction().rollback(); } LOG.debug("Printing stack trace", ex); } } } - } catch(Exception e) { + } catch (Exception e) { if (em.getTransaction().isActive()) em.getTransaction().rollback(); throw JPAException.unnest(e); @@ -321,31 +288,77 @@ public static void pushRecords(EntityManager em, String stationType, DataMapDto< } } - private static boolean isDuplicate(Log log, List dupList, Station station, DataType type, long period, String measurementType, long timestamp, Object value) { - String dupCheck = measurementType + timestamp + value.hashCode(); - if (dupList.contains(dupCheck)) { - log.info( - String.format( - "Measurement duplicate of type %s found in list, skipping! (stationtype, stationcode, cname, period, timestamp, double_value) = (%s, %s, %s, %d, %d, %s) ", - measurementType, - station.getStationtype(), - station.getStationcode(), - type.getCname(), - period, - timestamp, - value.toString() - ) - ); - return true; + private static class Period { + public TimeSeries number; + public TimeSeries string; + public TimeSeries json; + + private Station station; + private DataType type; + private Integer period; + private Provenance provenance; + + private class TimeSeries { + private MeasurementAbstract latest; + private long latestTime; + private RecordDtoImpl newest; + + public TimeSeries(EntityManager em, Class clazz) { + latest = MeasurementAbstract.findLatestEntry(em, station, type, period, clazz); + latestTime = (latest != null) ? latest.getTimestamp().getTime() : 0; + newest = null; + } + + private void updateNewest(RecordDtoImpl dto) { + if (newest == null || newest.getTimestamp() < dto.getTimestamp()) { + newest = dto; + } + } + + public void addHistory(EntityManager em, Log log, SimpleRecordDto dto, MeasurementAbstractHistory rec) { + if (latestTime < dto.getTimestamp()) { + rec.setProvenance(provenance); + em.persist(rec); + updateNewest(dto); + } else { + log.warn(String.format("Skipping record due to timestamp: [%s, %s, %s, %d, %d]", + station.stationtype, station.stationcode, type.getCname(), period, dto.getTimestamp())); + } + } + + public void updateLatest(EntityManager em, Function measurementMapper) { + if (newest != null) { + var measurement = measurementMapper.apply(newest); + if (latest == null) { + measurement.setProvenance(provenance); + em.persist(measurement); + } else if (newest.getTimestamp() > latestTime) { + latest.setTimestamp(new Date(newest.getTimestamp())); + latest.setValue(measurement.getValue()); + latest.setProvenance(provenance); + em.merge(latest); + } + } + } + } + + public Period(EntityManager em, Station station, DataType type, Integer period, Provenance provenance) { + this.station = station; + this.type = type; + this.period = period; + this.provenance = provenance; + + number = new TimeSeries(em, Measurement.class); + string = new TimeSeries(em, MeasurementString.class); + json = new TimeSeries(em, MeasurementJSON.class); } - dupList.add(dupCheck); - return false; } private static List castToDtos(List result, boolean setPeriod) { List dtos = new ArrayList<>(); for (MeasurementAbstractHistory m : result) { - SimpleRecordDto dto = new SimpleRecordDto(m.getTimestamp().getTime(), m.getValue(), setPeriod ? m.getPeriod() : null); + SimpleRecordDto dto = new SimpleRecordDto(m.getTimestamp().getTime(), m.getValue(), + setPeriod ? m.getPeriod() : null); dto.setCreated_on(m.getCreated_on().getTime()); dtos.add(dto); } @@ -354,20 +367,25 @@ private static List castToDtos(List resul /** *

- * the only method which requests history data from the biggest existing tables in the underlying DB,
+ * the only method which requests history data from the biggest existing tables + * in the underlying DB,
* it's very important that indexes are set correctly to avoid bad performance *

- * @param em entity manager - * @param typology of the specific station, e.g., MeteoStation, EnvironmentStation - * @param identifier unique station identifier, required - * @param cname unique type identifier, required - * @param start time filter start in milliseconds UTC for query, required - * @param end time filter start in milliseconds UTC for query, required - * @param period interval between measurements - * @param tableObject implementation which calls this method to decide which table to query, required + * + * @param em entity manager + * @param typology of the specific station, e.g., MeteoStation, + * EnvironmentStation + * @param identifier unique station identifier, required + * @param cname unique type identifier, required + * @param start time filter start in milliseconds UTC for query, required + * @param end time filter start in milliseconds UTC for query, required + * @param period interval between measurements + * @param tableObject implementation which calls this method to decide which + * table to query, required * @return a list of measurements from history tables */ - protected static List findRecordsImpl(EntityManager em, String stationtype, String identifier, String cname, Date start, Date end, Integer period, T tableObject) { + protected static List findRecordsImpl(EntityManager em, String stationtype, String identifier, + String cname, Date start, Date end, Integer period, T tableObject) { List result = QueryBuilder .init(em) .addSql("SELECT record") diff --git a/writer/src/main/java/com/opendatahub/timeseries/bdp/writer/dal/MeasurementJSON.java b/writer/src/main/java/com/opendatahub/timeseries/bdp/writer/dal/MeasurementJSON.java index 4774f3a9..5d69df46 100644 --- a/writer/src/main/java/com/opendatahub/timeseries/bdp/writer/dal/MeasurementJSON.java +++ b/writer/src/main/java/com/opendatahub/timeseries/bdp/writer/dal/MeasurementJSON.java @@ -25,7 +25,7 @@ import jakarta.persistence.UniqueConstraint; /** - * Implementation for measurements cache of type double + * Implementation for measurements cache of type json * * @author Patrick Bertolla */ @@ -84,7 +84,7 @@ public void setId(Long id) { @JdbcTypeCode(SqlTypes.JSON) private Map jsonValue; - public Map getJsonValue() { + public Map getValue() { return jsonValue; } diff --git a/writer/src/test/java/com/opendatahub/timeseries/bdp/writer/dal/PersistenceIT.java b/writer/src/test/java/com/opendatahub/timeseries/bdp/writer/dal/PersistenceIT.java index 787c84fe..0eee167d 100644 --- a/writer/src/test/java/com/opendatahub/timeseries/bdp/writer/dal/PersistenceIT.java +++ b/writer/src/test/java/com/opendatahub/timeseries/bdp/writer/dal/PersistenceIT.java @@ -34,7 +34,7 @@ public class PersistenceIT{ @BeforeEach public void setup() { - map.put("He", new Integer(4)); + map.put("He", Integer.valueOf(4)); } @Test