Review I/O of tasks #785

Closed
tcompa opened this issue Jul 7, 2023 · 4 comments
Labels
flexibility (Support more workflow-execution use cases), High Priority (Current Priorities & Blocking Issues)

Comments

@tcompa (Collaborator)

tcompa commented Jul 7, 2023

I'm reviewing the methods for task execution (in view of #261), and I (re)discovered the somewhat cumbersome way we define the I/O parameters of tasks. This was probably implemented with a clear target in mind (the typical image->zarr conversion), but it no longer sounds fully right.

--

The execute_tasks function (streamlined in #780, so that it's also a bit more readable now) iterates over a list of wftasks. For each wftask, either call_single_task or call_parallel_task is called (and waited for), and then the loop continues. The I/O interface of these functions is defined by the task_pars argument and by the returned object; see

def call_single_task(
    *,
    wftask: WorkflowTask,
    task_pars: TaskParameters,
    workflow_dir: Path,
    workflow_dir_user: Optional[Path] = None,
) -> TaskParameters:
    # ...

Those are two TaskParameters objects, where

class TaskParameters(BaseModel):
    input_paths: list[Path]
    output_path: Path
    metadata: dict[str, Any]
    # ...
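
For concreteness, here is a minimal, self-contained sketch of the serial loop described above. This is not the actual runner code; the names execute_tasks_sketch and WftaskRunner are made up for illustration, and only the TaskParameters fields shown above are used.

from pathlib import Path
from typing import Any, Callable

from pydantic import BaseModel


class TaskParameters(BaseModel):
    input_paths: list[Path]
    output_path: Path
    metadata: dict[str, Any]


# Stand-in for call_single_task / call_parallel_task: a callable that consumes
# one TaskParameters object and returns the TaskParameters for the next wftask.
WftaskRunner = Callable[[TaskParameters], TaskParameters]


def execute_tasks_sketch(
    wftask_runners: list[WftaskRunner],
    task_pars: TaskParameters,
) -> TaskParameters:
    # The object returned by one wftask becomes the task_pars argument of the
    # next one, so the whole chain is fixed by how each runner builds its
    # return value.
    for run_wftask in wftask_runners:
        task_pars = run_wftask(task_pars)
    return task_pars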

We should then review how the I/O TaskParameters objects are constructed for each wftask, and especially for

  1. The very first task (which should get some info from the workflow input dataset);
  2. Any intermediate task;
  3. The final task (which should get some info from the workflow output dataset).

TL;DR: the current way these objects are constructed does not seem robust, and was probably fine-tuned to work only for the typical Fractal workflows.

First wftask

Input parameters

The definition of what will be passed as an input TaskParameter object to the first task is (e.g.) in app/runner/_local/__init__.py:

            task_pars=TaskParameters(                                                                     
                input_paths=input_paths,                                                                  
                output_path=output_path,                                                                  
                metadata=input_metadata,                                                                  
            )

where the input_paths, output_path and input_metadata values come directly from the DB; they are obtained (in app/runner/__init__.py) as

input_metadata=input_dataset.meta
input_paths = input_dataset.paths
output_path = output_dataset.paths[0]

There is something clearly suspicious here: the first task already needs some output_dataset properties, even though output_dataset is meant to be the output of the whole workflow.

Output parameters

At the end of call_single_task (and the same holds for call_parallel_task), there is this definition of the return value

    # ...
    out_task_parameters = TaskParameters(
        input_paths=[task_pars.output_path],
        output_path=task_pars.output_path,
        metadata=updated_metadata,
    )
    return out_task_parameters

This strictly enforces a task structure where every subsequent task acts on a single given path, used both as input and as output.

Also: this shows why we only support multiple paths as the input of the very first task (which essentially means only the create_ome_zarr_multiplex task).
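
To make the consequence concrete, here is a small simulated trace of how the paths evolve over a three-task workflow, given the return value above. The paths are placeholders, not a real Fractal configuration.

from pathlib import Path

# Initial values, as they would come from the DB for the first wftask.
input_paths = [Path("/somewhere/input_images")]   # input_dataset.paths
output_path = Path("/somewhere/output.zarr")      # output_dataset.paths[0]

for i in range(3):
    print(f"task {i}: input_paths={input_paths}, output_path={output_path}")
    # This mirrors the out_task_parameters construction above: from the
    # second task on, input_paths can only be [output_path].
    input_paths = [output_path]

# Output (on a POSIX system):
# task 0: input_paths=[PosixPath('/somewhere/input_images')], output_path=/somewhere/output.zarr
# task 1: input_paths=[PosixPath('/somewhere/output.zarr')], output_path=/somewhere/output.zarr
# task 2: input_paths=[PosixPath('/somewhere/output.zarr')], output_path=/somewhere/output.zarr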

Intermediate wftask

An intermediate wftask (starting from the second one) always receives an input TaskParameters object that was the output of the previous one, so by construction its input_paths and output_path are identical (apart from the former being wrapped in a list).
The same holds for its output TaskParameters object, which will look like the output of the first wftask described above.

Last wftask

As should be clear from what is described above, the last wftask is no different from any intermediate wftask, because output_dataset was already used in the very first task.

Notes and questions

  1. What kind of workflows are actually supported?
    If I'm getting it right, we only support a workflow such that:

    • The first task is somewhat flexible:
      • It can take N input paths (but it always has a single output path)
      • The input paths can obviously be different from the output path, meaning that the task has an input dataset A and an output dataset B.
    • Any other subsequent task must be a B->B task
  2. Why does it currently work for typical image->zarr workflows?
    This use case perfectly matches what is described: the first task is the one that goes from the input (image) dataset to the output (zarr) dataset, and all the others keep using the output (zarr) dataset.

  3. Why does it currently work for a zarr->zarr workflow?
    This use case always has the same (zarr) dataset both as an input and as an output, so that it's not affected by the current I/O definitions.

  4. Can we make a clear example of a workflow that would fail because of how we define I/O parameters?
    The following workflow is not supported (a sketch of the mismatch is shown right after this list):

    • Consider a single-path dataset (although adding the N-paths input dataset would make things even more complex)
    • Run some trivial pre-processing of images (e.g. with the compress_tif task - if it were available)
    • Run the typical Fractal workflow (create_ome_zarr and then yokogawa_to_ome_zarr).
  5. Where to move from here?
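
As a concrete sketch of the mismatch in point 4, compare the I/O that each task would need with what the current runner actually passes. All paths are hypothetical, and compress_tif is the not-yet-available task mentioned above.

from pathlib import Path

# Hypothetical paths for the workflow of point 4.
raw_images = Path("/data/raw_tifs")                 # input dataset (single path)
compressed_images = Path("/data/compressed_tifs")   # what compress_tif should write
zarr_output = Path("/data/output.zarr")             # output dataset

# What the workflow would need:
#   task 1 (compress_tif):         raw_images        -> compressed_images
#   task 2 (create_ome_zarr):      compressed_images -> zarr_output
#   task 3 (yokogawa_to_ome_zarr): zarr_output       -> zarr_output
#
# What the current runner provides:
#   task 1: input_paths=[raw_images],  output_path=zarr_output
#   task 2: input_paths=[zarr_output], output_path=zarr_output   # wrong input
#   task 3: input_paths=[zarr_output], output_path=zarr_output
#
# Only the very first task can have distinct input and output paths, so there
# is no way to point create_ome_zarr at compressed_images.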

@tcompa added the High Priority (Current Priorities & Blocking Issues) label on Jul 7, 2023
@tcompa (Collaborator, Author)

tcompa commented Jul 7, 2023

Here's a rough scheme of how things are working now:
[Diagram: Fractal Workflow]

@tcompa (Collaborator, Author)

tcompa commented Jul 7, 2023

Some other thoughts coming up:

  1. The current version does allow some kind of flexible workflow execution, as was implemented in Support execution of a workflow subset #783. That is, if we have a "supported" workflow, then it's easy to split it into two sub-workflows which are still supported - and executing one sub-workflow at a time is not a problem.
  2. I'm not saying that the current limited scope of supported workflows is the actual blocking factor for supporting use case 3 (namely restarting execution from a failed task - see Support execution of a workflow subset #783 or Workflow submission scenarios #261 (comment)). I'm rather saying that it doesn't sound right to build additional features on top of this restrictive structure. To elaborate further: if a task fails, we have to "cut" some connections in the diagram above and store (where? that's another question) the information about which task was the last one to complete. And, for instance, it would be very weird to handle the case where the first task fails differently from the case where an intermediate task fails.

@tcompa (Collaborator, Author)

tcompa commented Jul 10, 2023

Slightly more precise version:

[Diagram: Fractal Workflow 2]

@tcompa (Collaborator, Author)

tcompa commented Mar 22, 2024

This issue in the current form is made obsolete by ongoing V2 work.
