Skip to content
This repository has been archived by the owner on Jan 26, 2022. It is now read-only.

No means for secure stopping async generators #126

Open
awto opened this issue Mar 5, 2018 · 91 comments
Open

No means for secure stopping async generators #126

awto opened this issue Mar 5, 2018 · 91 comments

Comments

@awto
Copy link

awto commented Mar 5, 2018

Let's assume a very typical reactive programming use case, say, I need to combine a few streams into a single one, yielding value each time any of original streams yields new value.

For example, I use it for UI and convert DOM events into an async iterable with a function subject (I don't include its sources here, it is not significant for the problem - it may be checked in this gist). The function returns an object with AsyncIterable interface and additional method send to pass the event into the stream.

And there is a function combine simply merging all async iterables in arguments into a single one.

async function* combine(...args) {
  const threads = args.map(i => i[Symbol.asyncIterator]())
  const sparks = new Set(threads.map(i => ({thread:i,step:i.next()})))
  try {
    while(sparks.size) {
      const v = await Promise.race([...sparks]
                                   .map(i => i.step.then(({done,value}) => ({done,value,spark:i}))))
      sparks.delete(v.spark)
      if (!v.done) {
        sparks.add({...v.spark,step:v.spark.thread.next()})
        yield v.value
      }
    }
  } finally {
    await Promise.all([...threads].map((i) => i.return()))
  }
}

And now I iterate two such sources combined and exit the loop on some condition.

const subj1 = subject()
const subj2 = subject()

async function test() {
  let cnt = 0
  for await(const i of combine(subj1,subj2)) {
    console.log("value:",i)
    if (cnt++)
      break
  }
}
//.....
subj1.send(1)
subj2.send(2)

Everything is pretty fine, I added console.log in exit function in subject all loops to handle subjects exit. But now if I wrap one of the sources with any simple async generator, like with (or just yield*)

async function* copy(input) {
  for await(const i of input)
    yield i
}

async function test() {
  let cnt = 0
  for await(const i of combine(subj1,copy(subj2))) {
    console.log("value:",i)
    if (cnt++)
      break
  }
}

Now test never exits (never calls next then), it waits in await in the combine's finally block. The reason is pretty simple and obvious, the copy generator is blocked on next and generators use the same queue for return.

I searched issues in this repository and found these tickets: #5 and #55. There are no clean reasons for the single queue decision, except probably:

writer.next(bufferA);
writer.next(bufferB);
await writer.return(); // Only wait for the file to close

This is maybe better to be done this way:

await Promise.all([writer.next(bufferA), writer.next(bufferB)]);
await writer.return(); // Only wait for the file to close

And how would we convert async generator into observable? For example, the solution from #20 won't work.

function toObservable(asyncIterable) {
    return new Observable(observer => {
        let stop = false;
        async function drain() {
            for await (let x of asyncIterable) {
                if (stop) break;
                observer.next(x);
            }
        }
        drain().then(x => observer.complete(x), err => observer.error(err));
        return _=> { stop = true; }
    });
}

The cleanup function won't stop the source generator (even if we add return call for object returned by drain) if there are no more values sent by original iterable, it is forever locked in next.

Current status

There is nothing to do in this proposal to fix this. This will be fixed automatically after some form of async cancelation operation is added as part of some another proposal. This issue is left open as a warning about the problem.

Workarounds

Using Transducers (lifted generators)

Transducers are functions taking a stream as input and producing another stream.
Generators can read its input passed as an argument and it is there we can send some specific stop signal if needed.

See more detailed description here:

There is fork function which merges streams and stops well, but it merges lifted streams - transducers

Transpiler

@effetful/es transpiler adds lightweight cancelation support to async function.

Resulting Promises of async functions transpiled by this are amended with M.cancelSymbol which is a function which tries to cancel execution of this function. It executes finally blocks, and propagates cancelation to current await expression. It doesn't do anything more (e.g. no propagation to children).

@awto awto changed the title any means for secure stopping async generators? No means for secure stopping async generators Mar 7, 2018
@benjamingr
Copy link

I'm not even sure what combineLatest would mean in a pull-based model, would you expect it to attempt to pull from all observables when it is being pulled from?

@benjamingr
Copy link

Let's assume a very typical reactive programming use case, say, I need to combine a few streams into a single one, yielding value each time any of original streams yields new value.

That is, that is not the directionality of async iterators - if next is called on the "combined" iterator - whom does it ask fro the value? Does it consume values eagerly from the parents?

@awto
Copy link
Author

awto commented Mar 25, 2018

@benjamingr

I'm not even sure what combineLatest would mean in a pull-based model, would you expect it to attempt to pull from all observables when it is being pulled from?

regardless it is combineLatest or some other combination of more than one source streams, sooner or later we'll need some of it, yes, I will need to pull from all of them if the caller is pulled

That is, that is not the directionality of async iterators - if next is called on the "combined" iterator - whom does it ask fro the value? Does it consume values eagerly from the parents?

Not eagerly, when first next called it awaits on Promise.race of all of the sources next. After the race is resolved its value is the result of the caller's next. The other promises (not winning the race) are stored somewhere and on next caller's next they will participate in the next race again, along with the next next of the previously signaled item.

And so, we cannot exit the loop from the caller, and the only problem here is the queue for next/throw/return execution. Say, we have something like this:

const src = {...}

async function* h1() {
   for await(const i of src) {
      yield some(i)
   }
}

async function* caller() {
    for await (const i of merge(h1(), somethingElse))  {
          if (someCondition)
             break
     }
}

and the simplified transpilation of for-await:

const src = {...}

async function* h1() {
   const loop = src[Symbol.asyncIterator]()
   try {
      for(let item;!(item = await loop.next()).done;) {
         yield some(item.value)
      }
  } finally {
     await src.return()
  } 
}

async function* caller() {
   const loop = merge(h1(), somethingElse)[Symbol.asyncIterator]()
   try {
       for(let item; !(item = await loop.next()).done;)  {
            if (someCondition)
               break
        } 
   } finally {
      await loop.return() 
   } 
}

Say when the control encounters break there it is because somethingElse.next was resolved in Promise.race, and the h1().next still sits in next (maybe it will never be resolved).

If we wouldn't have the queue the break in caller would immediately call return in merge. In merge it would call all the return of its arguments. That would invoke return continuation in h1 (no needs to cancel the currently pending next Promise, only mark its continuation as canceled). And the h1 finally block will call return of src. It is not a generator and it may know how to handle it properly. Say, it will resolve the promise to some dummy value, and since the continuation is marked as canceled everyone is happy.

With the queue the return call of h1 may stay in the queue forever, leaking a few objects.

@mika-fischer
Copy link

mika-fischer commented Mar 29, 2018

I have the same issue! I want to write an iterator adapter that makes an async iterator cancelable.

async function* iterateStream(stream) {
    let ended = false, error, wake = () => {};
    stream.on("readable", wake)
          .on("end",   ()    => { ended = true; wake(); })
          .on("error", (err) => { error = err;  wake(); });
    for (;;) {
        for (let chunk = stream.read(); chunk; chunk = stream.read()) {
            yield chunk;
        }
        if (error) { throw error; }
        if (ended) { return; }
        await new Promise(resolve => wake = resolve);
    }
}

async function* withTimeout(timeoutMs, iter) {
    let timer;
    try {
        const timeout = new Promise((_, reject) => {
            timer = setTimeout(reject, timeoutMs, new Error("Timeout!"));
        });
        for (;;) {
            const result = await Promise.race([timeout, iter.next()]);
            if (result.done) { break; }
            yield result.value;
        }
    }
    finally {
        clearTimeout(timer);
        if (iter.return) { await iter.return(); }
    }
}

async function main() {
    const socket = net.connect(80, "httpbin.org");
    try {
        for await (const val of withTimeout(1000, iterateStream(socket))) {
            console.log("[main] got value %s", val);
        }
    }
    finally {
        socket.end();
    }
}

This also gets stuck in await iter.return() and can only exit the loop when the socket closes. And because the socket is closed outside the loop, this results in a deadlock, which is only resolved when the socket times out...

It seems to me that

  1. An important use-case for async iteration involves iterators that can block for a significant amount of time, maybe even infinitely. Think reading from a socket, waiting for user input, etc.
  2. Async generators can not be used for implementing async iterators in such cases because if the generator is stuck in a long await there's no way anymore to break from the loop, raise an exception, etc. Control flow will have to wait until this await is finished...

If I understand all this correctly, this seems very unfortunate, since async generators otherwise would make implementing async iterators very easy!

@zenparsing
Copy link
Member

There are a couple of things going on here.

First, I notice that in all the provided examples we are awaiting a never-resolving promise inside of a finally block. Consider what happens in the case of "regular" async functions:

async function f() {
  try {
    throw new Error('oops');
  } finally {
    console.log('finally f');
    // Never-resolving
    await new Promise(() => {});
  }
}

async function g() {
  try {
    await f();
  } finally {
    // This is never executed
    console.log('finally g');
  }
}

g();

The lesson here is: be careful about putting an await in a finally block, regardless of whether we're dealing with an async function or async generator function.

The other thing I see is that we are assuming that return (with finally) is sufficient for implementing async cancellation. Although it would be nice if that were the case, we need to think of return as cleanup, not cancellation.

As we see in these examples, the cancellation signal needs to propagate through async generator functions (and async functions), into their async subroutines. Because async/await linearizes async computations, we can't use return as a secondary, parallel message channel. We need another channel to propagate that cancellation signal.

The DOM has invented AbortController for this purpose. Although AbortController is a little too DOM-centric to standardize at the language level, I'm hopeful that we can move forward with a standard "cancel token" protocol that AbortController could implement.

@benjamingr
Copy link

Reminds me of tc39/proposal-async-await#51 when I pondered if finally in
await should even be allowed.

@awto
Copy link
Author

awto commented Mar 30, 2018

@zenparsing

First, I notice that in all the provided examples we are awaiting a never-resolving promise inside of a finally block.

No, at least in my example, finally block is never executed and this is why it is a problem.

Consider what happens in the case of "regular" async functions:
The lesson here is: be careful about putting an await in a finally block, regardless of whether we're dealing with an async function or async generator function.

I see absolutely no problem in this example, someone could put never resolving promise in await in a usual block without finally, and control would never reach anything after.

The other thing I see is that we are assuming that return (with finally) is sufficient for implementing async cancellation.

No, I highlighted it in the message, Promise cancelation is not required if you mean that by async calculation.

Although it would be nice if that were the case, we need to think of return as cleanup, not a cancellation.

Let's keep it practical, without resorting to philosophical terms interpretation. I might as well call it - clean generator object.

As we see in these examples, the cancellation signal needs to propagate through async generator functions (and async functions), into their async subroutines. Because async/await linearizes async computations, we can't use return as a secondary, parallel message channel. We need another channel to propagate that cancellation signal.

I don't get this. What is not linear? Everything is kept linear without the queue in the generator scope as well, outside the scope, everything is async as usual. I see no problem again. What is another channel? Another channel means no async generators and implementing async iterators protocol?

The DOM has invented AbortController for this purpose. Although AbortController is a little too DOM-centric to standardize at the language level, I'm hopeful that we can move forward with a standard "cancel token" protocol that AbortController could implement.

This won't work, there is no another channel, unless all async iterators may interpret the token themselves in their code. But that just means - no async generators. There is no way to handle any token if control is resting in await.

Also @zenparsing in the first message, I actually copied your implementation of converting async generator to observable - toObservable. It doesn't work because of the same reasons. Do you confirm now, you were wrong and it should not be possible to convert async generators into observable?

@benjamingr

Reminds me of tc39/proposal-async-await#51 when I pondered if finally in
await should even be allowed.

In my message, it is some possible transpilation result, not the finally actual usage, await in finally. Though I don't see a problem even if it is the usage. The finally block there is a solution, not a problem. But it doesn't work only because there is the queue. And there are no obvious reasons for it.

@Jamesernator
Copy link

Jamesernator commented Mar 30, 2018

I think the queue as it currently exists should remain, but I still think it would be worth reviving some form of syntactic cancellation mechanism to make this sort've thing easier (what is one to do with for-await-of loops for example if there's no syntactic mechanism).

I don't really see how the cancellation proposal could solve this issue in it's current form, unless it was decided that .next should be sent the cancellation token (in which case to use async generators to write anything we essentialy require function.sent for the part before the first yield).

Even sending the cancellation token forward with .next it still means for-await-of is un-usable if you want to be able to cancel a request, e.g. you'd have to write it using iterators directly e.g.:

function map(asyncIterable, iteratee) {
    return {
        async* [Symbol.asyncIterator]() {
            const iterator = getIterator(asyncIterable)
            try {
                while (true) {
                    const cancelToken = function.sent;
                    const value = iterator.next(cancelToken);
                    yield iteratee(value);
                }
            } finally {
                await iterator.return()
            }
        }
    }
}

This seems quite bad compared to the naive implementation of map that just loops through the items and yields iteratee(value) in a for-await-of loop. But frankly I don't see any other way it could work with the current cancellation proposal.

@awto
Copy link
Author

awto commented Mar 30, 2018

@Jamesernator

I think the queue as it currently exists should remain,

but why? I mean, I've, of course, read the motivation like to make something running in parallel with Promise.all or dangling promises, but as @zenparsing wrote Async Generators linearize their control flow. There is no way (and shouldn't be) to run different parts of a single async generator simultaneously.

While of course, something implementing Async Iterator protocol but not Generator could do this. But the thing may handle the order itself in whatever way it likes. It may be a queue, it may be a priority queue it may ignore something etc.

It is also easy to write a wrapper to add a queue to any other Async Iterator, e.g. queue(something) if someone really needs it. But if it is as it is now - it is an unnecessary complication for nothing. And you propose to complicate it even more with some cancelation protocol. While the cancelation isn't required here. We, of course, need to cancel a continuation after await where the generator is suspended. But this effect is visible only inside the same single generator. It is not related to some async operation cancelation.

@benjamingr
Copy link

There is no way (and shouldn't be) to run different parts of a single async generator simultaneously.

That's not true for arbitrary async iterators though

@benjamingr
Copy link

I would love to see an implementation of combineLatest with the semantics proposed by @awto though - I looked at all my implementations from when I gave a talk and they suffer from this problem.

I haven't had proper time to think about it - but I definitely think it's worth solving and the spec is worth amending if it can't be.

@awto
Copy link
Author

awto commented Mar 30, 2018

@benjamingr

There is no way (and shouldn't be) to run different parts of a single async generator simultaneously.

That's not true for arbitrary async iterators though

indeed, but there is no any queue for arbitrary async iterators anyway, the iterator itself can decide how to handle the requests

@Jamesernator
Copy link

Jamesernator commented Mar 30, 2018

Yes but it's because other types of async iterators exist that generators need to queue. If a consumer consumes two values eagerly and then invokes .return an async iterator only knows that the requests are done not the existing nexts should be cancelled.

e.g. One might implement a preloading version of take that consumes values eagerly instead of as requested e.g.:

function eagerTakeN(asyncIterable, n) {
    async* [Symbol.asyncIterator]() {
        const iterator = getIterator(asyncIterable);
        // Preload values as fast as possible into buffers of a given size
        const buffer = []
        for (let i = 0 ; i < n ; i++) {
            buffer.push(iterator.next())
        }
        // Allow the iterator to stop queuing now and cleanup when *it*
        // wants to
        const done = iterator.return();
        const results = await Promise.all(buffer);
        for (const { done, value } of buffer) {
            if (done) {
                return value;
            }
            yield value;
        }
        await done;
    }
}

Note that if you know the iterables are generators it makes no difference to consume multiple concurrently, but if it's an arbitrary iterable it might be better to consume earlier. .return says that no more values will be requested but to fufill the requests that have already been requested. Because of the this the most sensible option for an async generator is just to queue as a consumer like eagerTakeN shouldn't need to know the type of the iterator.

If you want to cancel the .next Promises then there should be a mechanism to say I longer care about those specifically, in the case of an async generator I would definitely expect Promise cancellation to close an async generator though on cancelling a Promise returned from a .next.

@awto
Copy link
Author

awto commented Mar 30, 2018

@Jamesernator
The same may be solved using explicit queue wrapper. I would rather have merging working rather than the implicit queue. And as another advantage of the queue is explicit is we can use some smarter application-specific scheduling algorithms.

@Jamesernator
Copy link

The thing is cancellation allows you to have both which I why I want to see a better cancellation proposal than just a builtin CancelToken type or the like.

What you're proposing effectively requires that await is a potential interruption point which I think would be surprising given that it gives a producer no assurances that if they perform some work between yields that it will ever be executed.

e.g. Consider this that needs to free a lock immediately after producing a value:

async function* readFromDatabase() {
    const cursor = db.getCursor('someTable');
    while (true) {
        // This await could be interrupted
        const dbLock = await db.acquireLock();
        const value = cursor.nextValue();
        dbLock.unlock();
        yield value;
    }
}

But from what I can tell under your proposed change if it were to be cancelled via .return while stopped on db.acquireLock() then db.acquireLock() would proceed as per normal but the generator would be terminated and now the lock would never be freed as the generator was terminated in the middle of an operation.

@awto
Copy link
Author

awto commented Mar 30, 2018

@Jamesernator

fair point, I don't really mind much about the cancelation, just wanted something simpler. Still, the solution to this problem is possible, it can be some db context where we can signal the lock isn't longer required in finally. Or just implement the read logic in separate not-generator function. Or, again an explicit wrapper - queue(async function* readFromDatabase(){....}). It is indeed complex and not intuitive, but possible. While merging generators isn't possible now at all.

@awto
Copy link
Author

awto commented Mar 30, 2018

actually, the cancelation proposal will indeed solve the problem faster than something can be changed in this spec, so I'm closing the issue. Thanks, everyone, esp. @Jamesernator.

@awto awto closed this as completed Mar 30, 2018
@awto
Copy link
Author

awto commented Mar 31, 2018

in case if someone needs this urgently, you can use my transpiler - @effectful/es (sorry for advertisement). In abstract interface mode, you can extend the interface with any functionality you want including cancelation or no queue or anything else.

@benjamingr
Copy link

@awto what cancellation proposal? :D

@awto
Copy link
Author

awto commented Mar 31, 2018

@benjamingr cancellation proposal - I suppose it is the thing to solve the problem somehow in some future, there are not many details now, and it will happen apparently not soon. For my task, I'm doing a workaround with my transpiler for now.

@awto
Copy link
Author

awto commented Jul 8, 2018

there are libraries emerging with (not-safe) iterator's combinations, so I think it is rather worth keeping this issue open

@awto awto reopened this Jul 8, 2018
@benjamingr
Copy link

Maybe talk to the people from https://github.com/jamiemccrindle/axax @jamiemccrindle

@jamiemccrindle
Copy link

jamiemccrindle commented Jul 8, 2018

Does anyone have a simple test case that demonstrates this issue?

@awto
Copy link
Author

awto commented Jul 8, 2018

@jamiemccrindle I would at least add a warning on your page, axax is still usable if the sources are managed, but as a user, I would like to be aware of possible problems, e.g.

const s1 = new Subject()
const s2 = new Subject()

async function* something() {
  try {
    for await(const i of s2.iterator)
      yield i
  } finally {
    console.log("never called")
  }
}

async function test() {
  for await(const i of merge(s1.iterator,something())) {
    break
  }
}

test()
s1.onNext(1)

here is the demonstration of the leak with merge if s2 never touched, something instance will leak. It may be fixed if the program manages references but it is not required with, say, Observables.

@jamiemccrindle
Copy link

jamiemccrindle commented Jul 8, 2018

Thanks for the example @awto. I'll add a note to axax.

@jamiemccrindle
Copy link

jamiemccrindle commented Jul 8, 2018

Would you say that this is comparable to a Promise that never returns. Something like:

async function example() {
  await new Promise((resolve, reject) => {});
}

example.then(() => console.log('never gets here'));

@awto
Copy link
Author

awto commented Jul 8, 2018

I don't think so, there is still a way to finish the chain, e.g. by calling s2.onCompleted(), but no way to stop it from another end because the generator there sits in next and there is a queue requirement, so return just needs to await next to exit. But the queue cannot be removed because it will introduce another resource management problems (covered in this thread).

@brainkim
Copy link

To be clear, the proposal I’m saying is:
if an async generator is suspended at an await operator, resume the generator with an abrupt completion. That’s all. Further await/yield operators would still work as expected.

@jedwards1211
Copy link

I thought more on my earlier comment:

One thing I'm confused about is why generator transpilers couldn't just be changed to immediately return or throw the iterator being for awaited when those methods get called on the iterator returned by the generator?

This solves for await problems but isn't a general solution for generators because they may instead acquire multiple resources in parallel and then on a promise (not associated with a loop). The only comprehensive solution would have the transpiler wrap every single promise so that it rejects immediately if the iterator gets returned, but that's an undesirable amount of overhead...

@jedwards1211
Copy link

I'm thinking about making a Babel plugin to wrap async generators and all await and for await statements in them to reject immediately if the generator's iterator is returned or thrown...

@brainkim
Copy link

brainkim commented Dec 25, 2019

I disagree with the idea that you have to reject async iterator promises in the case of return. return should simply make the earliest pending next fulfill to {value: "arg passed to return", done: true}, and then all remaining next/return calls can fulfill to {done: true}. There really is no need for a rejection insofar as nothing about abrupt return indicates an error condition. The throw method on the other hand should not ever modify a pending next call insofar as you should give the async iterator/generator a chance to recover from the error thrown in. for await has nothing to do with throw; if you’re calling throw, you’re doing so deliberately. Too Christmas eve drunk to continue this conversation. Happy holidays or whatever.

@jedwards1211
Copy link

jedwards1211 commented Dec 26, 2019

@brainkim My apologies, you're right! I forgot that the iteration protocol allows the value passed to return to be yielded alongside done: true. So just await expressions would have to be rejected after return (since generators may await standalone promises too, not just use for await loops)

Obviously you hadn't passed the balmer peak 😂 Happy holidays

@jedwards1211
Copy link

Dude, I'm still not exactly thinking straight. Control would still return to a generator when a standalone promise it's awaiting finishes. Only yield statements would prevent control from returning to the generator after return. So aside from terminating for await loops, a Babel plugin would have to throw if the generator tries to yield after return to ensure that it finally reaches a cleanup block.

@brainkim
Copy link

brainkim commented Dec 26, 2019

To clarify, we’re pointing out two additional problems here which are similar to, but not exactly the original issue:

  1. An async generator is suspended on an await when return is called:
async function *gen() {
  try {
    await myAsyncFn();
  } finally {
    myCleanupFn();
  }
}
const g = gen();
g.next();
g.return();

In the above code, myCleanupFn isn’t called until the call to myAsyncFn settles. This is only really a problem if myAsyncFn never settles, or you expect myCleanupFn to have been called synchronously after g.return(). In the first case, if you have a promise which never settles, you have larger issues than just your cleanup code not being called, and cancelling myAsyncFn might only hide the fact that you have a hanging promise, causing larger issues later on. In the second case, you really shouldn’t assume g.return() works synchronously, for instance, you might have further await operators in the finally block which makes your return asynchronous. In short, if myAsyncFn settles, you will eventually run myCleanupFn anyways, and there is no actual leak.

My proposed solution, should we consider this an actual problem, is to have the await operator behave similarly to the yield operator, where if a generator is suspended on an await operator and return or throw is called, you resume the generator with the return or throw statement at the point of the await. This would allow myCleanupFn to happen synchronously after g.return is called in the code above. One additional complication is you have to figure out what to do with the pending myAsyncFn call promise. If the promise rejects, should that rejection be considered an unhandled rejection? Also, do you want to do the same thing for both return and throw?

Changing the behavior to make await operators act like yield operators for both return and throw would be a minimal change, which in my opinion, would make async generators slightly more responsive and would be scoped only to async generators, insofar as you would only be able resume an await within an async generator body. Note also that this doesn’t require any notion of “promise cancellation,” it would be a solution which works at the level of operators rather than at the level of the promise class.

  1. An async generator yields a promise which has not settled when return is called:
async function *gen() {
  try {
    yield new Promise(() => {});
  } finally {
    myCleanupFn();
  }
}
const g = gen();
g.next();
g.return(1);

In the above code, because calls to next/return/throw all settle in order, the call to g.return has no effect, because g.next will never settle. Therefore myCleanupFn is never called. This is again only a problem when the yielded promise does not settle, and there is an argument to be made that papering over this non-settling promise will cause more problems down the line. Preventing g.return from settling is the clearest way to communicate to the developer that something in the async generator has not settled and may be hanging.

My proposed solution, should we consider this an actual problem, is to have pending calls to next simply resolve to the iteration that would have been returned from g.return. In other words, if you passed g.return(1), the g.next call above would fulfill to {value: 1, done: true} and the g.return call itself would fulfill to {done: true}, because the value would have been consumed. I’m not a fan of this proposal, because 1. you stop communicating to the developer that some promise is hanging earlier in the iteration, and 2. this consuming behavior might be surprising for the caller of g.return.

Ultimately, these are both only issues if you have hanging promises, and like I said, if you have hanging promises you have bigger issues than that your cleanup code isn’t being called. The solutions I propose are only half-hearted, because I think it’s important for async iterators/generators to communicate hanging promises to the developer early by preventing further calls to next/return from settling. By allowing async iterators/generators to abruptly abort deadlocks/hanging promises, you risk greater injury later on when you use the hanging/deadlocked code outside of an async iterator/generator. On the other hand, if the promises you’re awaiting do eventually settle, your cleanup issues go away (eventually), and this eventual settling is queryable by awaiting the return call. Therefore, my earlier point about lazy initialization of custom async iterators still stands; they are a convention which completely solves the problem described in the original issue.

Let me know if any of this is unclear and I will re-explain until it makes sense.

@jedwards1211
Copy link

if a generator is suspended on an await operator and return or throw is called, you resume the generator with the return or throw statement at the point of the await

If I understand correctly, this would cause unexpected behavior, for example:

async function *gen() {
  try {
    const name = await getName();
    await fs.writeFile(JSON.stringify(name).replace(/^"|"$/g, '') + '.txt');
  } finally {
    myCleanupFn();
  }
}

If the generator is resumed before getParts() resolves, then parts is undefined and you end up accidentally creating undefined.txt.

It seems like you're hoping for a more elegant way to clean up than throwing, but there's really no surefire way to jump to the finally block and avoid unexpected behavior besides throwing.

return() and throw() must return promises, so there's no guarantee they will ever do anything synchronously.

@brainkim
Copy link

brainkim commented Dec 26, 2019

What I’m saying is that if the generator was suspended at getName but was resumed with return or throw, we would immediately skip to the finally block because it would be as though the await operator was replaced with a return or throw statement. In this situation, name would never be assigned and the writeFile call would never be reached.

Again I‘m not really advocating for these solutions, more just proposing them to deny that promise cancellation is somehow “necessary.”

return() and throw() must return promises, so there's no guarantee they will ever do anything synchronously.

Yes exactly, which is why I believe the “problem” of return not cancelling promises is not really a problem. The more I think about it, the more I think it’s important that async iterators/generators transparently hang when their sources hang, because this is the best way to communicate hanging promises to the callee.

@jedwards1211
Copy link

jedwards1211 commented Dec 26, 2019

You're right that hanging promises is a big problem...it's kind of scary considering that Promise.race can essentially leave hanging promises unless magic in the VM can prevent the promises that lost the race from leaking memory. I guess standardized cancelation is really the only robust solution.

@jedwards1211
Copy link

jedwards1211 commented Dec 26, 2019

The more I think about it, the more I think it’s important that async iterators/generators transparently hang when their sources hang, because this is the best way to communicate hanging promises to the callee.

Yeah I'm starting to agree with you. The only reason I really got into this rabbit hole is that GraphQL subs use async iterators over endless event streams (which seems like a really questionable design decision by now); if I were only using async generators for things like mapping the lines of a file I would never even have to think about problems like this.

@brainkim
Copy link

brainkim commented Dec 26, 2019

The only reason I really got into this rabbit hole is that GraphQL subs use async iterators over endless event streams (which seems like a really questionable design decision by now);

Hmm if I’m understanding you right it seems like you got another situation where you’re doing something eagerly where you should be doing it lazily, namely, you’re turning each event into a promise eagerly before passing it along to the async iterator. You shouldn’t do anything like that; rather you should be passing the events to the iterator as they happen, so the async iterator can return immediately when there are no pending events.

Ultimately, the issues with async iterators expressed here in this issue and elsewhere is that we’ve asked developers to hand-roll async queues themselves, and it’s about as unfair as asking them to implement promises from scratch whenever they need them, with lots of subtle pitfalls which can have disastrous effects in production. And then at the same time there’s an unrelenting stream of FUD from developers who would rather see observables or eigenmonads or whatever be adopted as the standard stream abstraction in js.

Maybe if we unified around a common async iterator constructor cough repeaters cough we wouldn’t have as many problems? Like peek this:

const socket = new WebSocket("ws://echo.websocket.org");
const messages = new Repeater(async (push, stop) => {
  socket.onmessage = (ev) => push(ev.data);
  socket.onerror = () => stop(new Error("WebSocket error"));
  socket.onclose = () => stop();
  await stop;
  socket.close();
});

I guarantee you could replace every single ad-hoc async iterator class in your graphql dependencies with repeaters and you’d have cleaner, less buggy code.

@brainkim
Copy link

brainkim commented Dec 28, 2019

The only reason I really got into this rabbit hole is that GraphQL subs use async iterators over endless event streams

Just wanted to update this thread with a possible complication related to for await and creating async generator combinators that @jedwards1211 articulated in this issue and which I hadn’t ever considered:

We can’t use async generators and for await to create async iterator combinator functions like map and filter, because whereas we would expect the next/return methods to be called in parallel on the source iterator, for await and async generators execute the next/return methods in series.

Code example:

async function *map(iterable, fn) {
  for await (const value of iterable) {
    yield fn(value);
  }
}
(async () => {
  const doubleIter = map(iterable, (i) => i * 2);
  const p1 = doubleIter.next();
  // only has an effect after the previous line fulfills
  const p2 = doubleIter.return();
  Promise.race([p2, p1]).then(console.log); // p1 will always win
});

Even though the return method is called in parallel with the next method on the map iterator, internally, the map combinator function will call next and return in series. This is because for await waits for the loop to start before resuming with a return operation at the point of the yield. In other words, if the map generator is suspended at the top of the for await loop, return won’t work until implicit next call on the inner iterator settles. This can be a problem when the source async iterator assume that return is called in parallel to settle/cancel any outstanding next calls, because this results in a possible deadlock where next has not settled and return has not yet been called.

The problem here is for await, not async generators, because for await works in series even when the outer generator is returned, while a programmer might expect it to work in parallel. My solution above, where we resume async generators with abrupt return completions if they’re paused on an await, could be extended to terminating for await loops as well to solve this issue, but again I’m not sure it’s the best idea.

@freiksenet
Copy link

We've experienced some issues with async generators causing GraphQL subscription cleanup to never be called. We have solved it by replacing async generators functions with async iterators that are implemented using raw protocol. While this doesn't fully solve the issue of promises being uncancellable (and so unresolved promise might hang in memory forever), at very least this allowed cleanup code to be called when iterator.return() is called. I hope this will help to mitigate this issue for some people.

See this code sandbox for illustration of difference in behaviour: https://codesandbox.io/s/async-generator-vs-iterator-returns-s4sht?file=/src/App.js

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests