Unsupported use case: mapConcurrently-alike with concurrency limit, and ordered job start #83

Open
dcoutts opened this issue May 28, 2018 · 7 comments

Comments

@dcoutts

dcoutts commented May 28, 2018

Async is a nice abstraction; however, as far as I can see, the following use case is not supported in any straightforward way:

  • A bunch of tasks to run
  • No more than N running at once (they consume resources other than just CPU so we want limits to work within bounded resources, and reduce thrashing overheads).

If it was just this, we could simply use async + a semaphore. There's one more requirement:

  • The list of tasks is ordered according to which ones' results are likely to be needed first, so although we cannot control when a task completes, we want to ensure we start tasks in order.

This last requirement is what stops the use case from fitting into the async abstraction, yet it is not an uncommon one. Consider a tool like cabal downloading package tarballs. We want to download e.g. 2-3 concurrently, but we want to start downloading the ones we'll need first, before the ones we'll need later, since we will also start building packages concurrently with the downloading.

The abstraction that cabal-install currently uses is this thing called JobControl. The above use case is handled by putting all the jobs into the job control queue in order, and the N worker threads simply grab the next one available.
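The queue-plus-workers arrangement described above can be sketched on top of the plain async API. This is a minimal illustration, not cabal-install's JobControl code: `pooledInOrder` is a hypothetical name, and the sketch assumes the `async` package (for `replicateConcurrently_`) alongside `base`. Jobs are claimed from the queue strictly in list order, and at most `n` run at once.

```haskell
import Control.Concurrent.Async (replicateConcurrently_)
import Control.Concurrent.MVar
import Control.Monad (forM)

-- | Hypothetical helper: run the jobs with at most @n@ running at once.
-- Workers claim jobs in list order; results come back in list order.
pooledInOrder :: Int -> [IO a] -> IO [a]
pooledInOrder n jobs = do
  -- Pair every job with an empty slot that will hold its result.
  slots <- forM jobs $ \job -> do
    slot <- newEmptyMVar
    pure (job, slot)
  queue <- newMVar slots
  let worker = do
        -- Atomically pop the next unclaimed job, if any.
        next <- modifyMVar queue $ \q -> pure $ case q of
          []       -> ([], Nothing)
          (x : xs) -> (xs, Just x)
        case next of
          Nothing          -> pure ()
          Just (job, slot) -> job >>= putMVar slot >> worker
  -- An exception in any job propagates and cancels the other workers.
  replicateConcurrently_ (max 1 n) worker
  -- All workers have finished, so every slot is full.
  mapM (readMVar . snd) slots
```

The trade-off is that callers get back a list of results rather than one `Async` handle per job, which is part of why this does not feel like the async abstraction.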

Of course I'm not proposing to merge the rather different JobControl abstraction into async. Rather, the challenge is whether anyone can think of a way the existing async abstraction could support this use case, whether any extensions would be needed, and whether any helper utilities would make this easier.

The basic problem is that an async starts a thread immediately, so having created a bunch of them there is no way to control the relative ordering. All of them can contend on a quantity semaphore, but there is no ordering guarantee. One plausible route might be to sequentialise the startup and add a startup synchronisation between the parent and the child async, so e.g. one could have the child thread enqueue onto a quantity semaphore before returning the async in the parent. This approach would not require any extensions, but could benefit from some helper utilities.

@sopvop

sopvop commented May 28, 2018

I believe async-pool package does something like that.

@dcoutts
Author

dcoutts commented May 28, 2018

And note that async-pool uses a different representation of the Async type. The question is, can we do something like async-pool or JobControl with the basic async abstraction.

@mitchellwrosen

@dcoutts Is QSem not suitable here? It does have FIFO semantics. Small race conditions aside, mapConcurrently jobs would roughly acquire the semaphore in jobs-order.

@dcoutts
Author

dcoutts commented May 30, 2018

@mitchellwrosen if you can add things into the QSem in order then it's fine, but if you start a bunch of asyncs up and each one queues itself onto the QSem then you've lost all ordering. There would have to be synchronisation, to only spawn the next one once the previous one has been enqueued (which is of course a bit tricky to arrange).

@mitchellwrosen

@dcoutts Ah, that's what I meant by "small race conditions aside". Since mapConcurrently and friends at least call forkIO in container-order, they should roughly grab the semaphore in order as well. But to your point, unfortunately there's no hard guarantee here, just a "soft" one :)
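For reference, the QSem approach under discussion looks roughly like the following. This is a sketch assuming the `async` package; `mapConcurrentlyBounded` is a hypothetical name. It enforces the concurrency limit, but, as both comments above note, nothing in it forces the threads to reach `waitQSem` in list order, so the start order is only a soft guarantee.

```haskell
import Control.Concurrent.Async (mapConcurrently)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)

-- | Hypothetical helper: at most @n@ calls of @f@ run at once.
-- Every element gets its own thread immediately; each thread then
-- queues on the semaphore in whatever order the scheduler runs it.
mapConcurrentlyBounded :: Int -> (a -> IO b) -> [a] -> IO [b]
mapConcurrentlyBounded n f xs = do
  sem <- newQSem n
  -- bracket_ releases the semaphore even if @f@ throws.
  mapConcurrently (bracket_ (waitQSem sem) (signalQSem sem) . f) xs
```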

@simonmar
Owner

simonmar commented Jun 2, 2018

You could do this by having a separate controller thread, like this:

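do
   tickets <- replicateM n newEmptyMVar   -- one ticket (an empty MVar) per job
   releaseQ <- newChan                    -- finished jobs signal the controller here
   withAsync (controller tickets) $ \_ ->
     forConcurrently tickets $ \t ->
       -- wait for our ticket to be released; always signal completion on exit
       bracket_ (takeMVar t) (writeChan releaseQ ()) $ do ....
 where
  controller tickets =  ...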

And in the controller thread you can release tickets in the order you want, waiting for threads to complete by listening on releaseQ. (You could also do this by having the tickets be TMVars and waiting for any of them, but that would involve O(n)-sized transactions, so a Chan is better here.)
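One way the elided controller could be filled in is sketched below. This is an illustration under assumptions, not the code the comment had in mind: `runOrdered` and its `limit` parameter are hypothetical names, and the `async` package is assumed. The controller admits the first `limit` jobs immediately, then admits one further job, in list order, each time a completion arrives on the channel.

```haskell
import Control.Concurrent.Async (forConcurrently, withAsync)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (bracket_)
import Control.Monad (forM_, replicateM)

-- | Hypothetical wrapper: admit jobs in list order, at most @limit@ at a time.
runOrdered :: Int -> [IO a] -> IO [a]
runOrdered limit jobs = do
  tickets  <- replicateM (length jobs) newEmptyMVar
  releaseQ <- newChan
  let (firstBatch, rest) = splitAt (max 1 limit) tickets
      controller = do
        -- Admit the first @limit@ jobs straight away, in order.
        forM_ firstBatch (`putMVar` ())
        -- Admit each later job only after some running job has finished.
        forM_ rest $ \t -> readChan releaseQ >> putMVar t ()
  -- withAsync cancels the controller when the jobs are done or have failed.
  withAsync controller $ \_ ->
    forConcurrently (zip tickets jobs) $ \(t, job) ->
      bracket_ (takeMVar t) (writeChan releaseQ ()) job
```

Every job still gets its own thread up front; the ordering comes from when each thread's ticket is released, not from when the thread is forked.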

But a much nicer way would be to build precisely the abstraction you need, which is:

newTicketQueue
  :: Int -- number of tickets to issue in total
  -> Int -- number of withTickets that can run concurrently
  -> IO [Ticket]

withTicket :: Ticket -> IO a -> IO a

Implementation left as an exercise for the reader :)
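One possible answer to that exercise is sketched here, using only `base`. The representation of `Ticket` is an assumption of this sketch (the comment above fixes only the two signatures): each ticket holds its own gate plus a shared queue of gates that have not been opened yet, and a finishing `withTicket` opens the next gate in issue order, so no controller thread is needed.

```haskell
import Control.Concurrent.MVar
import Control.Exception (bracket_)
import Control.Monad (forM_, replicateM)

-- | Assumed representation: the ticket's own gate, and the shared
-- queue of gates that are still closed, in issue order.
data Ticket = Ticket (MVar ()) (MVar [MVar ()])

newTicketQueue
  :: Int -- number of tickets to issue in total
  -> Int -- number of withTickets that can run concurrently
  -> IO [Ticket]
newTicketQueue total limit = do
  gates <- replicateM total newEmptyMVar
  let (open, closed) = splitAt (max 1 limit) gates
  -- The first @limit@ tickets may proceed immediately.
  forM_ open (`putMVar` ())
  queue <- newMVar closed
  pure [Ticket g queue | g <- gates]

withTicket :: Ticket -> IO a -> IO a
withTicket (Ticket g queue) = bracket_ (takeMVar g) release
  where
    -- Open the next closed gate, if any remain. Each gate is filled
    -- at most once, so this putMVar never blocks.
    release = modifyMVar_ queue $ \gs -> case gs of
      []          -> pure []
      (g' : rest) -> putMVar g' () >> pure rest
```

With async, usage would look like `forConcurrently (zip tickets jobs) (\(t, job) -> withTicket t job)`. The sketch assumes every ticket is used exactly once: a ticket that is never passed to `withTicket`, or whose holder is cancelled while still waiting, permanently consumes one concurrency slot.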

@arrowd

arrowd commented Jun 19, 2020

I stumbled upon http://hackage.haskell.org/package/unliftio-0.2.13/docs/UnliftIO-Async.html and it suits me well.


5 participants