diff --git a/packages/core/package.json b/packages/core/package.json index abb5a6f0..85f09247 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@walmartlabs/cookie-cutter-core", - "version": "1.3.0-beta.5", + "version": "1.3.0-beta.6", "license": "Apache-2.0", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/packages/core/src/__test__/utils/BoundedPriorityQueue.test.ts b/packages/core/src/__test__/utils/BoundedPriorityQueue.test.ts index 0cc68ec1..7436304b 100644 --- a/packages/core/src/__test__/utils/BoundedPriorityQueue.test.ts +++ b/packages/core/src/__test__/utils/BoundedPriorityQueue.test.ts @@ -9,6 +9,19 @@ import { BoundedPriorityQueue } from "../../"; import { timeout } from "../../utils"; describe("BoundedPriorityQueue", () => { + it("enforces capacity per queue priority", async () => { + const queue = new BoundedPriorityQueue(2); + await expect(queue.enqueue(1)).resolves.toBe(true); + await expect(queue.enqueue(2)).resolves.toBe(true); + await expect(queue.enqueue(3, 1)).resolves.toBe(true); + await expect(queue.enqueue(4, 1)).resolves.toBe(true); + await expect(queue.enqueue(5, 2)).resolves.toBe(true); + await expect(queue.enqueue(6, 2)).resolves.toBe(true); + await expect(timeout(queue.enqueue(7), 50)).rejects.toBeDefined(); + await expect(timeout(queue.enqueue(8, 1), 50)).rejects.toBeDefined(); + await expect(timeout(queue.enqueue(9, 2), 50)).rejects.toBeDefined(); + }); + it("blocks adding when capacity is reached", async () => { const queue = new BoundedPriorityQueue(3); await expect(queue.enqueue(1)).resolves.toBe(true); @@ -53,14 +66,21 @@ describe("BoundedPriorityQueue", () => { }); it("dequeues items with higher priority first", async () => { - const queue = new BoundedPriorityQueue(3); + const queue = new BoundedPriorityQueue(2); await queue.enqueue(1, 0); - await queue.enqueue(2, 1); - await queue.enqueue(3, 0); + await queue.enqueue(3, 1); + await queue.enqueue(5, 2); + await queue.enqueue(2, 0); + await queue.enqueue(4, 1); + await queue.enqueue(6, 2); + queue.close(); + + const buffer = []; + for await (const item of queue.iterate()) { + buffer.push(item); + } - await expect(queue.dequeue()).resolves.toBe(2); - await expect(queue.dequeue()).resolves.toBe(1); - await expect(queue.dequeue()).resolves.toBe(3); + expect(buffer).toMatchObject([5, 6, 3, 4, 1, 2]); }); it("iterates contained items", async () => { diff --git a/packages/core/src/utils/BoundedPriorityQueue.ts b/packages/core/src/utils/BoundedPriorityQueue.ts index 11e86c51..ab379be5 100644 --- a/packages/core/src/utils/BoundedPriorityQueue.ts +++ b/packages/core/src/utils/BoundedPriorityQueue.ts @@ -11,17 +11,17 @@ import { Future } from "."; export class BoundedPriorityQueue { private readonly queues: Map; private whenNotEmpty: Future; - private whenNotFull: Future; + private whenNotFullList: Map>; private sortedPriorities: number[]; - private count: number; + private total_count: number; private closed: boolean; constructor(private readonly capacity: number) { - this.count = 0; + this.total_count = 0; this.queues = new Map(); this.sortedPriorities = []; this.whenNotEmpty = new Future(); - this.whenNotFull = new Future(); + this.whenNotFullList = new Map(); this.closed = false; } @@ -40,48 +40,53 @@ export class BoundedPriorityQueue { if (queue === undefined) { queue = []; this.queues.set(priority, queue); + this.whenNotFullList.set(priority, new Future()); this.sortedPriorities.push(priority); this.sortedPriorities = this.sortedPriorities.sort((n1, n2) => Math.sign(n2 - n1)); } - if (this.count < this.capacity) { + if (queue.length < this.capacity) { queue.push(item); - if (++this.count === 1) { + if (++this.total_count === 1) { this.whenNotEmpty.resolve(); } return true; } - if (this.whenNotFull) { - await this.whenNotFull.promise; + const whenNotFull = this.whenNotFullList.get(priority); + if (whenNotFull) { + await whenNotFull.promise; } if (this.closed) { return false; } - if (isNullOrUndefined(this.whenNotFull)) { - this.whenNotFull = new Future(); + if (isNullOrUndefined(whenNotFull)) { + this.whenNotFullList.set(priority, new Future()); } return this.enqueue(item); } public async dequeue(): Promise { - if (this.closed && this.count === 0) { + if (this.closed && this.total_count === 0) { throw new Error("queue is closed"); } for (const priority of this.sortedPriorities) { const queue = this.queues.get(priority)!; + const whenNotFull = this.whenNotFullList.get(priority); if (queue.length > 0) { const item = queue.shift(); - if (this.count-- === this.capacity) { - if (this.whenNotFull) { - this.whenNotFull.resolve(); - this.whenNotFull = undefined; + this.total_count--; + if (queue.length + 1 === this.capacity) { + if (whenNotFull) { + whenNotFull.resolve(); + this.whenNotFullList.delete(priority); } } if (priority > 0 && queue.length === 0) { this.queues.delete(priority); + this.whenNotFullList.delete(priority); this.sortedPriorities = this.sortedPriorities.filter((p) => p !== priority); } return item; @@ -97,7 +102,7 @@ export class BoundedPriorityQueue { } public get length(): number { - return this.count; + return this.total_count; } public close(): void { @@ -105,8 +110,10 @@ export class BoundedPriorityQueue { if (this.whenNotEmpty) { this.whenNotEmpty.resolve(); } - if (this.whenNotFull) { - this.whenNotFull.resolve(); + for (const whenNotFull of this.whenNotFullList.values()) { + if (whenNotFull) { + whenNotFull.resolve(); + } } }