Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CU hashchain validation #1114

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 52 additions & 48 deletions servers/cu/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions servers/cu/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@
"test": "node --experimental-wasm-memory64 --test"
},
"dependencies": {
"@fastify/middie": "^9.0.2",
"@fastify/middie": "^9.0.3",
"@permaweb/ao-loader": "^0.0.44",
"@permaweb/ao-scheduler-utils": "^0.0.25",
"@permaweb/weavedrive": "^0.0.18",
"arweave": "^1.15.5",
"async-lock": "^1.4.1",
"better-sqlite3": "^11.7.0",
"better-sqlite3": "^11.8.0",
"bytes": "^3.1.2",
"cors": "^2.8.5",
"dataloader": "^2.2.3",
"dotenv": "^16.4.7",
"fast-glob": "^3.3.2",
"fastify": "^5.2.0",
"fast-glob": "^3.3.3",
"fastify": "^5.2.1",
"helmet": "^8.0.0",
"hyper-async": "^1.1.2",
"keccak": "^3.0.4",
Expand All @@ -36,7 +36,7 @@
"pg": "^8.13.1",
"prom-client": "^15.1.3",
"ramda": "^0.30.1",
"undici": "^7.2.0",
"undici": "^7.2.3",
"warp-arbundles": "^1.0.4",
"winston": "^3.17.0",
"workerpool": "^9.2.0",
Expand Down
15 changes: 14 additions & 1 deletion servers/cu/src/bootstrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ export const createApis = async (ctx) => {
Math.ceil(ctx.WASM_EVALUATION_MAX_WORKERS * (ctx.WASM_EVALUATION_PRIMARY_WORKERS_PERCENTAGE / 100))
)

/**
* node's crypto module is synchronous, which blocks the main thread.
* Since hash chain valiation is done for _every single scheduled message_, we offload the
* work to a worker thread, so at least the main thread isn't blocked.
*/
const hashChainWorkerPath = join(__dirname, 'effects', 'worker', 'hashChain', 'index.js')
const hashChainWorker = workerpool.pool(hashChainWorkerPath, { maxWorkers: maxPrimaryWorkerThreads })

const worker = join(__dirname, 'effects', 'worker', 'evaluator', 'index.js')
const primaryWorkerPool = workerpool.pool(worker, {
maxWorkers: maxPrimaryWorkerThreads,
Expand Down Expand Up @@ -367,7 +375,12 @@ export const createApis = async (ctx) => {
findMessageBefore: AoEvaluationClient.findMessageBeforeWith({ db, logger }),
loadTimestamp: AoSuClient.loadTimestampWith({ fetch: ctx.fetch, logger }),
loadProcess: AoSuClient.loadProcessWith({ fetch: ctx.fetch, logger }),
loadMessages: AoSuClient.loadMessagesWith({ fetch: ctx.fetch, pageSize: 1000, logger }),
loadMessages: AoSuClient.loadMessagesWith({
hashChain: (...args) => hashChainWorker.exec('hashChain', args),
fetch: ctx.fetch,
pageSize: 1000,
logger
}),
locateProcess: locateDataloader.load.bind(locateDataloader),
isModuleMemoryLimitSupported: WasmClient.isModuleMemoryLimitSupportedWith({ PROCESS_WASM_MEMORY_MAX_LIMIT: ctx.PROCESS_WASM_MEMORY_MAX_LIMIT }),
isModuleComputeLimitSupported: WasmClient.isModuleComputeLimitSupportedWith({ PROCESS_WASM_COMPUTE_MAX_LIMIT: ctx.PROCESS_WASM_COMPUTE_MAX_LIMIT }),
Expand Down
8 changes: 6 additions & 2 deletions servers/cu/src/domain/dal.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ export const saveLatestProcessMemorySchema = z.function()
processId: z.string(),
moduleId: z.string().nullish(),
messageId: z.string().nullish(),
assignmentId: z.string().nullish(),
hashChain: z.string().nullish(),
timestamp: z.coerce.number().nullish(),
epoch: z.coerce.number().nullish(),
nonce: z.coerce.number().nullish(),
ordinate: z.coerce.string().nullish(),
cron: z.string().nullish(),
blockHeight: z.coerce.number().nullish(),
Memory: bufferSchema,
evalCount: z.number().nullish(),
gasUsed: z.bigint().nullish()
}))
.returns(z.promise(z.any()))
Expand Down Expand Up @@ -167,13 +168,16 @@ export const loadMessagesSchema = z.function()
z.object({
suUrl: z.string().url(),
processId: z.string(),
block: blockSchema,
owner: z.string(),
tags: z.array(rawTagSchema),
moduleId: z.string(),
moduleTags: z.array(rawTagSchema),
moduleOwner: z.string(),
from: z.coerce.number().nullish(),
to: z.coerce.number().nullish()
to: z.coerce.number().nullish(),
assignmentId: z.string().nullish(),
hashChain: z.string().nullish()
})
)
/**
Expand Down
15 changes: 13 additions & 2 deletions servers/cu/src/domain/lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ export function evaluateWith (env) {
.chain(fromPromise(async (ctx) => {
// A running tally of gas used in the eval stream
let totalGasUsed = BigInt(0)
let mostRecentAssignmentId = ctx.mostRecentAssignmentId
let mostRecentHashChain = ctx.mostRecentHashChain
let prev = applySpec({
/**
* Ensure all result fields are initialized
Expand Down Expand Up @@ -160,7 +162,7 @@ export function evaluateWith (env) {
* and evaluate each one
*/
let first = true
for await (const { noSave, cron, ordinate, name, message, deepHash, isAssignment, AoGlobal } of messages) {
for await (const { noSave, cron, ordinate, name, message, deepHash, isAssignment, assignmentId, AoGlobal } of messages) {
if (cron) {
const key = toEvaledCron({ timestamp: message.Timestamp, cron })
if (evaledCrons.has(key)) continue
Expand All @@ -170,6 +172,14 @@ export function evaluateWith (env) {
* again in the eval stream
*/
else evaledCrons.set(key, true)
} else if (!noSave) {
/**
* As messages stream into the process to be evaluated,
* we need to keep track of the most assignmentId
* and hashChain of the most recent scheduled message
*/
mostRecentAssignmentId = assignmentId
mostRecentHashChain = message['Hash-Chain']
}

/**
Expand Down Expand Up @@ -306,14 +316,15 @@ export function evaluateWith (env) {
processId: ctx.id,
moduleId: ctx.moduleId,
messageId: message.Id,
assignmentId: mostRecentAssignmentId,
hashChain: mostRecentHashChain,
timestamp: message.Timestamp,
nonce: message.Nonce,
epoch: message.Epoch,
blockHeight: message['Block-Height'],
ordinate,
cron,
Memory: prev.Memory,
evalCount: ctx.stats.messages.scheduled + ctx.stats.messages.cron,
gasUsed: totalGasUsed
})
}
Expand Down
Loading
Loading