FEATURE: TaskFlow / Decorated Task #96
Labels: documentation, enhancement, help wanted
Feature description and background
Currently, the Kubernetes operator supports generating XCom output through job output. For receiving XCom input, however, it only provides the most basic approach: argument templating with `TaskInstance.xcom_pull()` calls inside the template.

Fighting with Jinja templating can be cumbersome, especially when accessing XCom output from multiple upstream tasks. For instance, when running a Kubernetes job with dynamic arguments (the `arguments` parameter) that depend on several upstream tasks, one needs to build a Jinja template that outputs the argument list, and remember to enable the `render_template_as_native_obj` flag for the DAG it runs in.

Given that Airflow introduced the concept of TaskFlow and has been promoting decorated tasks since the 2.0 release, adopting the new paradigm and adding a decorated form of `KubernetesJobOperator` (e.g. `@task.kubernetes_job`) would be helpful; use cases like the one described above could then be handled more smoothly by passing upstream task outputs to the decorated task function directly.

Proposed Solution
For a task operator that only runs Kubernetes jobs, it may be difficult to make the implementation similar to `@task.kubernetes` (a wrapper of Airflow's `KubernetesPodOperator`) and move the task's program logic into the decorated function. However, I think it is feasible to make the decorated function return arguments for the `KubernetesJobOperator` instead. The dynamically generated arguments can then be integrated/merged with the `KubernetesJobOperator`'s defaults and the arguments passed to the decorator header.

For example, suppose we define a decorated task function like this:
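The original code block was not preserved here, so the following is a minimal plain-Python sketch of the intended semantics. `kubernetes_job` is a hypothetical stand-in for the proposed decorator, not a real Airflow API; it models a header that fixes operator defaults and merges them with the argument list returned by the decorated function.

```python
# Hypothetical model of the proposed @task.kubernetes_job semantics.
# "kubernetes_job" is a stand-in decorator, NOT a real Airflow API.
def kubernetes_job(task_id, image, arguments=()):
    def wrap(fn):
        def run(*args, **kwargs):
            dynamic = fn(*args, **kwargs)
            # Header defaults come first, dynamically generated args after.
            return {
                "task_id": task_id,
                "image": image,
                "arguments": list(arguments) + list(dynamic),
            }
        return run
    return wrap

@kubernetes_job(task_id="my_job", image="my-image:latest",
                arguments=["--verbose"])
def my_job(options):
    # The decorated function only computes the dynamic part of the
    # container arguments; it does not run the job itself.
    return options
```

Here the task ID, image, and a default `--verbose` argument (all illustrative names) sit in the decorator header, while the function body only contributes the dynamic tail of the argument list.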
When a list `["-n", "-f", "source_data.csv"]` is passed to `my_job` to create a task instance within a DAG, the created task is equivalent to a `KubernetesJobOperator` instantiated directly with the fully merged argument list.
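The equivalence claim can be sketched as follows. Tasks are represented as plain dicts because this is an illustrative model, not real Airflow code; `make_task` and the `--verbose` default are assumed names.

```python
# Illustrative model of the claimed equivalence between the decorated
# call and an explicitly constructed operator.
def make_task(task_id, image, arguments):
    return {"task_id": task_id, "image": image, "arguments": arguments}

# Decorated form: dynamic options merged after the header defaults.
dynamic = ["-n", "-f", "source_data.csv"]
decorated = make_task("my_job", "my-image:latest", ["--verbose"] + dynamic)

# Explicit operator form the decorated task should be equivalent to.
explicit = make_task("my_job", "my-image:latest",
                     ["--verbose", "-n", "-f", "source_data.csv"])
```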
When a task's output is passed to the `options` argument instead, the value of `arguments` will depend on the value of that output at runtime.

Example DAG implementation before introducing feature
Say we have a container image containing a program that transfers data from several sources to the destination at once, where the image's entry point accepts several optional arguments to customize a run:

- `--sources` option to only pull data from specified entries.
- `--enable-high-loading-mode` and `--disable-high-loading-mode` flags to decide whether the program should run in a special mode to endure high-volume loading.
- `--bucket-name` option to specify the location to place intermediate data dumps.

Now we want to design a DAG pipeline that creates a Kubernetes job and runs the image when triggered, and we decide to collect argument information for the image through three separate upstream tasks:

- one that determines the data sources to pull (the `--sources` option),
- one that decides whether `--enable-high-loading-mode` should be turned on,
- one that resolves the value of `--bucket-name`.

To pass information from these upstream tasks into the final `KubernetesJobOperator` task directly, one has to create a Jinja template that generates the `arguments` parameter, introducing the outputs from upstream tasks with `TaskInstance.xcom_pull()` calls inside the template. As an alternative, one can insert an additional task in between that organizes the upstream outputs into a single argument list, avoiding templating altogether.
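The "intermediate task" workaround can be modeled in plain Python (no Airflow dependency); the function and parameter names below are illustrative assumptions, not part of any real operator:

```python
# Model of the intermediate task: it collects the three upstream outputs
# and assembles one argument list, so the job operator can receive it
# without any Jinja templating.
def build_arguments(sources, high_loading, bucket_name):
    args = ["--sources", ",".join(sources)]
    # Choose between the two mutually exclusive flags.
    args.append("--enable-high-loading-mode" if high_loading
                else "--disable-high-loading-mode")
    args += ["--bucket-name", bucket_name]
    return args
```

In real Airflow code, the templating alternative would instead embed `{{ ti.xcom_pull(task_ids=...) }}` expressions in `arguments` and require `render_template_as_native_obj=True` on the DAG so the rendered value stays a Python list.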
Example DAG implementation after introducing feature
The sample code below tries to replicate the example above with the proposed approach. The task ID and image name are assumed to be "fixed" and passed to the decorator header, while the `args` field is dynamically generated within the decorated function.
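Since the original sample was not preserved, here is a plain-Python model of the proposed "after" shape. `kubernetes_job`, the task/image names, and the parameters are all hypothetical stand-ins, not real Airflow API:

```python
# Model of the proposed "after" DAG: task_id and image are fixed in the
# decorator header, while the argument list is generated inside the
# decorated function from the three upstream outputs.
def kubernetes_job(task_id, image):
    def wrap(fn):
        def run(*args, **kwargs):
            return {"task_id": task_id, "image": image,
                    "arguments": fn(*args, **kwargs)}
        return run
    return wrap

@kubernetes_job(task_id="transfer_job", image="transfer-image:latest")
def transfer_job(sources, high_loading, bucket_name):
    # Upstream outputs are passed in directly: no Jinja templating and
    # no render_template_as_native_obj flag needed.
    args = ["--sources", ",".join(sources)]
    args.append("--enable-high-loading-mode" if high_loading
                else "--disable-high-loading-mode")
    return args + ["--bucket-name", bucket_name]
```

Each upstream task's output flows straight into a function parameter, which is the ergonomic gain this feature request is after.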