Skip to content

Commit

Permalink
feat(cu): loadMessages tests and fix impl as a result #92
Browse files Browse the repository at this point in the history
  • Loading branch information
TillaTheHun0 committed Oct 19, 2023
1 parent e5ea697 commit c386d99
Show file tree
Hide file tree
Showing 2 changed files with 412 additions and 45 deletions.
71 changes: 45 additions & 26 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Rejected, Resolved, fromPromise, of } from 'hyper-async'
import { T, always, aperture, ascend, cond, equals, head, last, mergeRight, prop, reduce } from 'ramda'
import { T, always, aperture, ascend, cond, equals, head, last, mergeRight, pipe, prop, reduce } from 'ramda'
import { z } from 'zod'
import ms from 'ms'

Expand Down Expand Up @@ -38,7 +38,13 @@ export function parseSchedules ({ tags }) {
*
* TODO: harden
*/
[T, always({ interval, unit: 'seconds', value: Math.floor(ms([value, unit].join(' ')) / 1000) })]
[T, pipe(
always({ interval, unit: 'seconds', value: Math.floor(ms([value, unit].join(' ')) / 1000) }),
(schedule) => {
if (schedule.value <= 0) throw new Error('time-based interval cannot be less than 1 second')
return schedule
}
)]
])(unit)
}

Expand Down Expand Up @@ -101,6 +107,31 @@ export function parseSchedules ({ tags }) {
export const SCHEDULED_INTERVAL = 'scheduled-interval'
export const SCHEDULED_MESSAGE = 'scheduled-message'

/**
* Whether the block height, relative to the origin block height,
* matches the provided schedule
*/
export function isBlockOnSchedule ({ height, originHeight, schedule }) {
return (height - originHeight) % schedule.value === 0
}

/**
* Whether the timstamp, relative to the origin timestamp,
* matches the provided schedule
*/
export function isTimestampOnSchedule ({ timestamp, originTimestamp, schedule }) {
/**
* The smallest unit of time a schedule can be placed is in seconds,
* and if we modulo milliseconds, it can return 0 for fractional overlaps
* of the scedule
*
* So convert the times to seconds perform applying modulo
*/
timestamp = Math.floor(timestamp / 1000)
originTimestamp = Math.floor(originTimestamp / 1000)
return (timestamp - originTimestamp) % schedule.value === 0
}

export function scheduleMessagesBetweenWith ({
processId,
owner: processOwner,
Expand Down Expand Up @@ -129,13 +160,11 @@ export function scheduleMessagesBetweenWith ({
* sort time based schedules from most granualar to least granular. This will ensure
* time based messages are ordered consistently w.r.t each other.
*/
const timeBased = ascend(
prop('value'),
schedules.filter(s => s.unit === 'time')
)
const timeBased = schedules.filter(s => s.unit === 'seconds')
.sort(ascend(prop('value')))

/**
* { sortKey, block, message }
* { sortKey, block }
*/
function maybeScheduledMessages (left, right) {
const scheduledMessages = []
Expand All @@ -151,7 +180,7 @@ export function scheduleMessagesBetweenWith ({
const rightBlock = right.block

// No Schedules
if (!blockBased.length || timeBased.length) return scheduledMessages
if (!blockBased.length && !timeBased.length) return scheduledMessages

/**
* This is the first time in a long time that
Expand All @@ -167,16 +196,6 @@ export function scheduleMessagesBetweenWith ({
for (let curHeight = leftBlock.height; curHeight < rightBlock.height; curHeight++) {
const curBlock = blockRange.find((b) => b.height === curHeight)
const nextBlock = blockRange.find((b) => b.height === curHeight + 1)
/**
* The timestamp for blocks meta from the gateway is currently in seconds
* while the SU uses a millisecond timestamp, so we convert the seconds
* into milliseconds for each block
*
* TODO: probably should be done as part of the gateway?
*/
const curBlockMills = curBlock.timestamp * 1000
const nextBlockMillis = nextBlock.timestamp * 1000

/**
* Block-based schedule messages
*
Expand All @@ -187,7 +206,7 @@ export function scheduleMessagesBetweenWith ({
scheduledMessages,
blockBased.reduce(
(acc, schedule, idx) => {
if (curHeight - originBlock.height % schedule.value === 0) {
if (isBlockOnSchedule({ height: curBlock.height, originHeight: originBlock.height, schedule })) {
acc.push({
message: {
owner: processOwner,
Expand All @@ -202,7 +221,7 @@ export function scheduleMessagesBetweenWith ({
* It will also enable performing range queries to fetch all scheduled messages by simply
* appending a ',' to any sortKey
*/
sortKey: `${curBlock.height},${curBlockMills},${leftHash},${schedule.interval}${idx}`,
sortKey: `${curBlock.height},${curBlock.timestamp},${leftHash},${schedule.interval}${idx}`,
AoGlobal: {
process: { id: processId, owner: processOwner },
block: curBlock
Expand All @@ -227,12 +246,12 @@ export function scheduleMessagesBetweenWith ({
* For each second between the current block and the next block, check if any time-based
* schedules need to generate an implicit message
*/
for (let curEpoch = curBlockMills; curEpoch < nextBlockMillis; curEpoch += 1000) {
for (let curTimestamp = curBlock.timestamp; curTimestamp < nextBlock.timestamp; curTimestamp += 1000) {
scheduledMessages.push.apply(
scheduledMessages,
timeBased.reduce(
(acc, schedule, idx) => {
if ((curEpoch - originBlock.timestamp) % schedule.value === 0) {
if (isTimestampOnSchedule({ timestamp: curTimestamp, originTimestamp: originBlock.timestamp, schedule })) {
acc.push({
message: {
target: processId,
Expand All @@ -247,7 +266,7 @@ export function scheduleMessagesBetweenWith ({
* It will also enable performing range queries to fetch all scheduled messages by simply
* appending a ',' to any sortKey
*/
sortKey: `${curBlock.height},${curBlockMills},${leftHash},${schedule.interval}${idx}`,
sortKey: `${curHeight},${curTimestamp},${leftHash},${schedule.interval}${idx}`,
AoGlobal: {
process: { id: processId, owner: processOwner },
block: curBlock
Expand All @@ -267,7 +286,7 @@ export function scheduleMessagesBetweenWith ({
return scheduledMessages
}

return ([left, right]) => {
return (left, right) => {
const messages = []
messages.push.apply(messages, maybeScheduledMessages(left, right))
/**
Expand Down Expand Up @@ -378,8 +397,8 @@ function loadScheduledMessagesWith ({ loadTimestamp, logger }) {
})

return reduce(
(merged, pair) => {
merged.push.apply(merged, scheduleMessagesBetween(pair))
(merged, [left, right]) => {
merged.push.apply(merged, scheduleMessagesBetween(left, right))
return merged
},
[],
Expand Down
Loading

0 comments on commit c386d99

Please sign in to comment.