Skip to content

Commit

Permalink
implement mapAsyncIterator with repeaters
Browse files Browse the repository at this point in the history
 Implementation adapted from: repeaterjs/repeater#48 (comment) so that all payloads will be delivered in the original order
  • Loading branch information
yaacovCR committed May 20, 2021
1 parent f11661e commit 93687dc
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 45 deletions.
1 change: 1 addition & 0 deletions packages/utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
},
"dependencies": {
"@ardatan/aggregate-error": "0.0.6",
"@repeaterjs/repeater": "^3.0.4",
"camel-case": "4.1.2",
"tslib": "~2.2.0"
},
Expand Down
102 changes: 57 additions & 45 deletions packages/utils/src/mapAsyncIterator.ts
Original file line number Diff line number Diff line change
@@ -1,59 +1,71 @@
/**
* Given an AsyncIterable and a callback function, return an AsyncIterator
* Given an AsyncIterator and a callback function, return an AsyncIterator
* which produces values mapped via calling the callback function.
*
* Implementation adapted from:
* https://github.com/repeaterjs/repeater/issues/48#issuecomment-569134039
* so that all payloads will be delivered in the original order
*/

import { Push, Stop, Repeater } from '@repeaterjs/repeater';

export function mapAsyncIterator<T, U>(
iterator: AsyncIterator<T>,
callback: (value: T) => Promise<U> | U,
rejectCallback?: any
mapValue: (value: T) => Promise<U> | U,
): AsyncIterableIterator<U> {
let $return: any;
let abruptClose: any;

if (typeof iterator.return === 'function') {
$return = iterator.return;
abruptClose = (error: any) => {
const rethrow = () => Promise.reject(error);
return $return.call(iterator).then(rethrow, rethrow);
};
}
const returner = iterator.return?.bind(iterator) ?? (() => {});

function mapResult(result: any) {
return result.done ? result : asyncMapValue(result.value, callback).then(iteratorResult, abruptClose);
}
return new Repeater(async (push, stop) => {
let earlyReturn: any;
stop.then(() => {
earlyReturn = returner();
});

let mapReject: any;
if (rejectCallback) {
// Capture rejectCallback to ensure it cannot be null.
const reject = rejectCallback;
mapReject = (error: any) => asyncMapValue(error, reject).then(iteratorResult, abruptClose);
}
await loop(push, stop, earlyReturn, iterator, mapValue);

return {
next() {
return iterator.next().then(mapResult, mapReject);
},
return() {
return $return
? $return.call(iterator).then(mapResult, mapReject)
: Promise.resolve({ value: undefined, done: true });
},
throw(error: any) {
if (typeof iterator.throw === 'function') {
return iterator.throw(error).then(mapResult, mapReject);
}
return Promise.reject(error).catch(abruptClose);
},
[Symbol.asyncIterator]() {
return this;
},
};
await earlyReturn;
});
}

function asyncMapValue<T, U>(value: T, callback: (value: T) => Promise<U> | U): Promise<U> {
return new Promise(resolve => resolve(callback(value)));
async function loop<T, U>(
push: Push<U>,
stop: Stop,
earlyReturn: Promise<any> | any,
iterator: AsyncIterator<T>,
mapValue: (value: T) => Promise<U> | U,
): Promise<void> {
/* eslint-disable no-unmodified-loop-condition */
while (!earlyReturn) {
const iteration = await next(iterator, mapValue);

if (iteration.done) {
if (iteration.value !== undefined) {
await push(iteration.value);
}
stop();
return;
}

await push(iteration.value);
}
/* eslint-enable no-unmodified-loop-condition */
}

function iteratorResult<T>(value: T): IteratorResult<T> {
return { value, done: false };
async function next<T, U>(
iterator: AsyncIterator<T>,
mapValue: (value: T) => Promise<U> | U,
): Promise<IteratorResult<U>> {
const iterationCandidate = await iterator.next();

const value = iterationCandidate.value;
if (value === undefined) {
return iterationCandidate as IteratorResult<U>;
}

const newValue = await mapValue(iterationCandidate.value);

return {
...iterationCandidate,
value: newValue,
};
}

0 comments on commit 93687dc

Please sign in to comment.