Skip to content

Commit

Permalink
Fix timestamp alignment when merging WPILOGs
Browse files Browse the repository at this point in the history
  • Loading branch information
jwbonner committed Jan 2, 2025
1 parent f52863a commit dfa4704
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 79 deletions.
6 changes: 5 additions & 1 deletion src/hub/dataSources/HistoricalDataSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ export class HistoricalDataSource {
);

// Process response
let offset = 0;
this.worker.onmessage = (event) => {
let message = event.data as HistoricalDataSource_WorkerResponse;
switch (message.type) {
Expand All @@ -152,7 +153,9 @@ export class HistoricalDataSource {
return; // Exit immediately

case "initial":
this.log?.mergeWith(Log.fromSerialized(message.log), this.keyPrefix);
if (this.log !== null) {
offset = this.log.mergeWith(Log.fromSerialized(message.log), this.keyPrefix);
}
this.logIsPartial = message.isPartial;
break;

Expand All @@ -164,6 +167,7 @@ export class HistoricalDataSource {
if (this.logIsPartial) {
message.fields.forEach((field) => {
let key = applyKeyPrefix(this.keyPrefix, field.key);
field.data.timestamps = (field.data.timestamps as number[]).map((timestamp) => timestamp + offset);
this.log?.setField(key, LogField.fromSerialized(field.data));
if (field.generatedParent) this.log?.setGeneratedParent(key);
this.requestedFields.delete(key);
Expand Down
163 changes: 87 additions & 76 deletions src/hub/dataSources/wpilog/wpilogWorker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Log from "../../../shared/log/Log";
import { PROTO_PREFIX, STRUCT_PREFIX } from "../../../shared/log/LogUtil";
import { getEnabledKey, PROTO_PREFIX, STRUCT_PREFIX } from "../../../shared/log/LogUtil";
import LoggableType from "../../../shared/log/LoggableType";
import {
HistoricalDataSource_WorkerFieldResponse,
Expand Down Expand Up @@ -126,6 +126,13 @@ async function start(data: Uint8Array) {
return;
}

// Load enabled field (required for merging)
let enabledKey = getEnabledKey(log);
if (enabledKey !== undefined) {
parseField(enabledKey, true);
}

// Send message
log.getChangedFields(); // Reset changed fields
sendResponse({
type: "initial",
Expand All @@ -134,7 +141,7 @@ async function start(data: Uint8Array) {
});
}

function parseField(key: string) {
function parseField(key: string, skipMessage = false) {
// Parse records
if (!(key in entryTypes)) {
// Try removing leading slash, sometimes inserted
Expand All @@ -147,80 +154,82 @@ function parseField(key: string) {
return;
}
}
const type = entryTypes[key];
dataRecordPositions[key].forEach((position) => {
const [record, _] = decoder!.getRecordAtPosition(position);
if (record === null) return;
let timestamp = Math.max(0, record.getTimestamp() / 1000000.0);
try {
switch (type) {
case "boolean":
log.putBoolean(key, timestamp, record.getBoolean());
break;
case "int":
case "int64":
log.putNumber(key, timestamp, record.getInteger());
break;
case "float":
log.putNumber(key, timestamp, record.getFloat());
break;
case "double":
log.putNumber(key, timestamp, record.getDouble());
break;
case "string":
log.putString(key, timestamp, record.getString());
break;
case "boolean[]":
log.putBooleanArray(key, timestamp, record.getBooleanArray());
break;
case "int[]":
case "int64[]":
log.putNumberArray(key, timestamp, record.getIntegerArray());
break;
case "float[]":
log.putNumberArray(key, timestamp, record.getFloatArray());
break;
case "double[]":
log.putNumberArray(key, timestamp, record.getDoubleArray());
break;
case "string[]":
log.putStringArray(key, timestamp, record.getStringArray());
break;
case "json":
log.putJSON(key, timestamp, record.getString());
break;
case "msgpack":
log.putMsgpack(key, timestamp, record.getRaw());
break;
default: // Default to raw
if (type.startsWith(STRUCT_PREFIX)) {
let schemaType = type.split(STRUCT_PREFIX)[1];
if (schemaType.endsWith("[]")) {
log.putStruct(key, timestamp, record.getRaw(), schemaType.slice(0, -2), true);
if (key in dataRecordPositions) {
const type = entryTypes[key];
dataRecordPositions[key].forEach((position) => {
const [record, _] = decoder!.getRecordAtPosition(position);
if (record === null) return;
let timestamp = Math.max(0, record.getTimestamp() / 1000000.0);
try {
switch (type) {
case "boolean":
log.putBoolean(key, timestamp, record.getBoolean());
break;
case "int":
case "int64":
log.putNumber(key, timestamp, record.getInteger());
break;
case "float":
log.putNumber(key, timestamp, record.getFloat());
break;
case "double":
log.putNumber(key, timestamp, record.getDouble());
break;
case "string":
log.putString(key, timestamp, record.getString());
break;
case "boolean[]":
log.putBooleanArray(key, timestamp, record.getBooleanArray());
break;
case "int[]":
case "int64[]":
log.putNumberArray(key, timestamp, record.getIntegerArray());
break;
case "float[]":
log.putNumberArray(key, timestamp, record.getFloatArray());
break;
case "double[]":
log.putNumberArray(key, timestamp, record.getDoubleArray());
break;
case "string[]":
log.putStringArray(key, timestamp, record.getStringArray());
break;
case "json":
log.putJSON(key, timestamp, record.getString());
break;
case "msgpack":
log.putMsgpack(key, timestamp, record.getRaw());
break;
default: // Default to raw
if (type.startsWith(STRUCT_PREFIX)) {
let schemaType = type.split(STRUCT_PREFIX)[1];
if (schemaType.endsWith("[]")) {
log.putStruct(key, timestamp, record.getRaw(), schemaType.slice(0, -2), true);
} else {
log.putStruct(key, timestamp, record.getRaw(), schemaType, false);
}
} else if (type.startsWith(PROTO_PREFIX)) {
let schemaType = type.split(PROTO_PREFIX)[1];
log.putProto(key, timestamp, record.getRaw(), schemaType);
} else {
log.putStruct(key, timestamp, record.getRaw(), schemaType, false);
}
} else if (type.startsWith(PROTO_PREFIX)) {
let schemaType = type.split(PROTO_PREFIX)[1];
log.putProto(key, timestamp, record.getRaw(), schemaType);
} else {
log.putRaw(key, timestamp, record.getRaw());
if (CustomSchemas.has(type)) {
try {
CustomSchemas.get(type)!(log, key, timestamp, record.getRaw());
} catch {
console.error('Failed to decode custom schema "' + type + '"');
log.putRaw(key, timestamp, record.getRaw());
if (CustomSchemas.has(type)) {
try {
CustomSchemas.get(type)!(log, key, timestamp, record.getRaw());
} catch {
console.error('Failed to decode custom schema "' + type + '"');
}
log.setGeneratedParent(key);
}
log.setGeneratedParent(key);
}
}
break;
break;
}
} catch (error) {
console.error("Failed to decode WPILOG record:", error);
}
} catch (error) {
console.error("Failed to decode WPILOG record:", error);
}
});
delete dataRecordPositions[key]; // Clear memory
});
delete dataRecordPositions[key]; // Clear memory
}

// Get set of changed fields
let fieldData: HistoricalDataSource_WorkerFieldResponse[] = [];
Expand All @@ -246,8 +255,10 @@ function parseField(key: string) {
}

// Send result
sendResponse({
type: "fields",
fields: fieldData
});
if (!skipMessage) {
sendResponse({
type: "fields",
fields: fieldData
});
}
}
5 changes: 3 additions & 2 deletions src/shared/log/Log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,8 @@ export default class Log {
}
}

/** Merges a new log into this log. */
mergeWith(source: Log, prefix = ""): void {
/** Merges a new log into this log. Returns the timestamp offset. */
mergeWith(source: Log, prefix = ""): number {
// Serialize source and adjust timestamps
let offset = 0;
let targetEnabledData = getEnabledData(this);
Expand Down Expand Up @@ -828,6 +828,7 @@ export default class Log {
protoDescriptors.push(descriptor);
});
this.protoDecoder = ProtoDecoder.fromSerialized(protoDescriptors);
return offset;
}

/** Returns a serialized version of the data from this log. */
Expand Down

0 comments on commit dfa4704

Please sign in to comment.