Skip to content

Commit

Permalink
initial attempt to split an asyncIterable into a set of new asyncIter…
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Jan 25, 2021
1 parent d514b1a commit 0e7a077
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 1 deletion.
2 changes: 1 addition & 1 deletion packages/pubsub/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export * from './in-memory-channel';
export * from './in-memory-pubsub';

export * from './split';
export * from './types';
107 changes: 107 additions & 0 deletions packages/pubsub/src/split.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// adapted from: https://stackoverflow.com/questions/63543455/how-to-multicast-an-async-iterable
// and: https://gist.github.com/jed/cc1e949419d42e2cb26d7f2e1645864d
// and also: https://github.com/repeaterjs/repeater/issues/48#issuecomment-569134039

import { Push, Repeater } from '@repeaterjs/repeater';

import { Splitter } from './types';

export function split<T>(asyncIterable: AsyncIterableIterator<T>, n: number, splitter: Splitter<IteratorResult<T>>) {
const iterator = asyncIterable[Symbol.asyncIterator]();
const returner = iterator.return ?? undefined;

const buffers: Array<Array<IteratorResult<T>>> = Array(n).fill([]);

if (returner) {
const set: Set<number> = new Set();
return buffers.map((buffer, index) => {
set.add(index);
return new Repeater(async (push, stop) => {
let earlyReturn: any;
stop.then(() => {
set.delete(index);
if (!set.size) {
earlyReturn = returner();
}
});

await loop(push, earlyReturn, buffer, index, buffers, iterator, splitter);

await earlyReturn;
});
});
}

return buffers.map(
(buffer, index) =>
new Repeater(async (push, stop) => {
let earlyReturn: any;
stop.then(() => {
earlyReturn = returner ? returner() : true;
});

await loop(push, earlyReturn, buffer, index, buffers, iterator, splitter);

await earlyReturn;
})
);
}

async function loop<T>(
push: Push<T>,
earlyReturn: Promise<any> | any,
buffer: Array<IteratorResult<T>>,
index: number,
buffers: Array<Array<IteratorResult<T>>>,
iterator: AsyncIterator<T>,
splitter: Splitter<IteratorResult<T>>
): Promise<void> {
/* eslint-disable no-unmodified-loop-condition */
while (!earlyReturn) {
const iteration = await next(buffer, index, buffers, iterator, splitter);

if (iteration === undefined) {
continue;
}

if (iteration.done) {
stop();
return iteration.value;
}

await push(iteration.value);
}
/* eslint-enable no-unmodified-loop-condition */
}

async function next<T>(
buffer: Array<IteratorResult<T>>,
index: number,
buffers: Array<Array<IteratorResult<T>>>,
iterator: AsyncIterator<T>,
splitter: Splitter<IteratorResult<T>>
): Promise<IteratorResult<T> | undefined> {
let iteration: IteratorResult<T>;

if (0 in buffer) {
return buffer.shift();
}

const iterationCandidate = await iterator.next();

const value = iterationCandidate.value;
if (value) {
const [iterationIndex, newValue] = splitter(value);
if (index === iterationIndex) {
return newValue;
}

buffers[iterationIndex].push(iteration);
return undefined;
}

for (const buffer of buffers) {
buffer.push(iteration);
}
return iterationCandidate;
}
2 changes: 2 additions & 0 deletions packages/pubsub/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ export interface PubSub<T> {
subscribe(topic: string, buffer?: RepeaterBuffer): AsyncIterableIterator<T>;
close(reason?: any): Promise<unknown> | unknown;
}

export type Splitter<T> = (item: T) => [number, T];

0 comments on commit 0e7a077

Please sign in to comment.