Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Have separate queue capacity per priority level #160

Merged
merged 4 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@walmartlabs/cookie-cutter-core",
"version": "1.3.0-beta.5",
"version": "1.3.0-beta.6",
"license": "Apache-2.0",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
32 changes: 26 additions & 6 deletions packages/core/src/__test__/utils/BoundedPriorityQueue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,19 @@ import { BoundedPriorityQueue } from "../../";
import { timeout } from "../../utils";

describe("BoundedPriorityQueue", () => {
it("enforces capacity per queue priority", async () => {
const queue = new BoundedPriorityQueue<number>(2);
await expect(queue.enqueue(1)).resolves.toBe(true);
await expect(queue.enqueue(2)).resolves.toBe(true);
await expect(queue.enqueue(3, 1)).resolves.toBe(true);
await expect(queue.enqueue(4, 1)).resolves.toBe(true);
await expect(queue.enqueue(5, 2)).resolves.toBe(true);
await expect(queue.enqueue(6, 2)).resolves.toBe(true);
await expect(timeout(queue.enqueue(7), 50)).rejects.toBeDefined();
await expect(timeout(queue.enqueue(8, 1), 50)).rejects.toBeDefined();
await expect(timeout(queue.enqueue(9, 2), 50)).rejects.toBeDefined();
});

it("blocks adding when capacity is reached", async () => {
const queue = new BoundedPriorityQueue<number>(3);
await expect(queue.enqueue(1)).resolves.toBe(true);
Expand Down Expand Up @@ -53,14 +66,21 @@ describe("BoundedPriorityQueue", () => {
});

it("dequeues items with higher priority first", async () => {
const queue = new BoundedPriorityQueue<number>(3);
const queue = new BoundedPriorityQueue<number>(2);
await queue.enqueue(1, 0);
await queue.enqueue(2, 1);
await queue.enqueue(3, 0);
await queue.enqueue(3, 1);
await queue.enqueue(5, 2);
await queue.enqueue(2, 0);
await queue.enqueue(4, 1);
await queue.enqueue(6, 2);
queue.close();

const buffer = [];
for await (const item of queue.iterate()) {
buffer.push(item);
}

await expect(queue.dequeue()).resolves.toBe(2);
await expect(queue.dequeue()).resolves.toBe(1);
await expect(queue.dequeue()).resolves.toBe(3);
expect(buffer).toMatchObject([5, 6, 3, 4, 1, 2]);
});

it("iterates contained items", async () => {
Expand Down
43 changes: 25 additions & 18 deletions packages/core/src/utils/BoundedPriorityQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ import { Future } from ".";
export class BoundedPriorityQueue<T> {
private readonly queues: Map<number, T[]>;
private whenNotEmpty: Future<void>;
private whenNotFull: Future<void>;
private whenNotFullList: Map<number, Future<void>>;
private sortedPriorities: number[];
private count: number;
private total_count: number;
private closed: boolean;

constructor(private readonly capacity: number) {
this.count = 0;
this.total_count = 0;
this.queues = new Map();
this.sortedPriorities = [];
this.whenNotEmpty = new Future();
this.whenNotFull = new Future();
this.whenNotFullList = new Map();
this.closed = false;
}

Expand All @@ -40,48 +40,53 @@ export class BoundedPriorityQueue<T> {
if (queue === undefined) {
queue = [];
this.queues.set(priority, queue);
this.whenNotFullList.set(priority, new Future());
this.sortedPriorities.push(priority);
this.sortedPriorities = this.sortedPriorities.sort((n1, n2) => Math.sign(n2 - n1));
}

if (this.count < this.capacity) {
if (queue.length < this.capacity) {
queue.push(item);
if (++this.count === 1) {
if (++this.total_count === 1) {
this.whenNotEmpty.resolve();
}
return true;
}

if (this.whenNotFull) {
await this.whenNotFull.promise;
const whenNotFull = this.whenNotFullList.get(priority);
if (whenNotFull) {
await whenNotFull.promise;
}
if (this.closed) {
return false;
}

if (isNullOrUndefined(this.whenNotFull)) {
this.whenNotFull = new Future();
if (isNullOrUndefined(whenNotFull)) {
this.whenNotFullList.set(priority, new Future());
}
return this.enqueue(item);
}

public async dequeue(): Promise<T> {
if (this.closed && this.count === 0) {
if (this.closed && this.total_count === 0) {
throw new Error("queue is closed");
}

for (const priority of this.sortedPriorities) {
const queue = this.queues.get(priority)!;
const whenNotFull = this.whenNotFullList.get(priority);
if (queue.length > 0) {
const item = queue.shift();
if (this.count-- === this.capacity) {
if (this.whenNotFull) {
this.whenNotFull.resolve();
this.whenNotFull = undefined;
this.total_count--;
if (queue.length + 1 === this.capacity) {
if (whenNotFull) {
whenNotFull.resolve();
this.whenNotFullList.delete(priority);
}
}
if (priority > 0 && queue.length === 0) {
this.queues.delete(priority);
this.whenNotFullList.delete(priority);
this.sortedPriorities = this.sortedPriorities.filter((p) => p !== priority);
}
return item;
Expand All @@ -97,16 +102,18 @@ export class BoundedPriorityQueue<T> {
}

public get length(): number {
return this.count;
return this.total_count;
}

public close(): void {
this.closed = true;
if (this.whenNotEmpty) {
this.whenNotEmpty.resolve();
}
if (this.whenNotFull) {
this.whenNotFull.resolve();
for (const whenNotFull of this.whenNotFullList.values()) {
if (whenNotFull) {
whenNotFull.resolve();
}
}
}

Expand Down