Creating Dynamic Number of Tasks at DAG Runtime

There is one DAG that I wrote (and it's one of the first DAGs that I ever put into production) that has tasks that are dynamically allocated. What I mean is that there are a number of very similar tasks that I need to perform as quickly as possible, so they need to run in parallel.

The approaches I took with these two groups of tasks are very different, though. One creates a dynamic number of tasks based on the output of a previous task in the same DAG. The other uses an Airflow Variable so that I can control how many tasks run based on the server's hardware.

Despite the different approaches, the implementation is virtually identical.

Dynamically Created Tasks

There is a single upstream task that saves a file on the airflow server containing a list of directories found on a remote server. Based on the number of directories returned, a dynamic number of tasks are created, one task per directory. These dynamically created tasks fetch a bunch of files from each of the directories (so these tasks are almost entirely network intensive).
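For context, here is a minimal sketch of what acquire_directories_from_file() might look like; the file name & the one-directory-per-line format are assumptions on my part, not necessarily what the production DAG uses.

from pathlib import Path

def acquire_directories_from_file(list_file=Path.home() / "remote_directories.txt"):
    """Read the directory list saved by the upstream task, one directory per line.

    The default path here is hypothetical; the real DAG may store it elsewhere.
    """
    if not list_file.is_file():
        return []
    return [line.strip() for line in list_file.read_text().splitlines() if line.strip()]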

from airflow.operators.python_operator import PythonOperator  # Airflow 1.10-style import

directories = acquire_directories_from_file()
for index, directory in enumerate(directories, 1):
    # One task per directory: extract_dir_01, extract_dir_02, ...
    extract_dirs = PythonOperator(
        task_id=f"extract_dir_{index:02}",
        python_callable=extract.fetch_remote_files,
        op_args=[directory],
        provide_context=True,
        on_failure_callback=send_slack_message_failed,
    )
    pre_extract >> extract_dirs >> post_extract

There is a finite number of concurrently running tasks, defined in airflow.cfg (I believe the default is 16 with the LocalExecutor). So while there might be more tasks than that, the rest will run (selected randomly, as best I can tell) whenever fewer than 16 tasks are running.
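If the global limit doesn't suit a particular DAG, the limit can also be set per DAG. This is a minimal sketch, assuming a hypothetical DAG name; concurrency & max_active_runs are real DAG arguments in Airflow 1.10.

from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="extract_remote_dirs",  # hypothetical name
    start_date=datetime(2019, 1, 1),
    schedule_interval="@daily",
    concurrency=16,      # max task instances running at once for this DAG
    max_active_runs=1,   # only one DAG run in flight at a time
)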

Variable Defined Tasks

On the other hand, I have tasks that process those downloaded files & load them into a database. This process is more local disk & CPU intensive, so the number of tasks is based on the number of CPUs.

from airflow.models import Variable  # the "chunks" Variable is set via the Airflow UI or CLI

chunks = int(Variable.get("chunks"))
for chunk in range(chunks):
    transform_chunks = PythonOperator(
        task_id=f"transform_chunk_{chunk + 1:02}",
        python_callable=transform.transform,
        op_args=[chunk],
        provide_context=True,
        on_failure_callback=send_slack_message_failed,
    )
    pre_transform >> transform_chunks >> post_transform

Again, virtually identical in structure, but the first one is based around the results of a previous step (& actually varies by a small number day in & day out), whereas the second one almost never changes but can be modified in a trivial manner if the need arises.
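As a quick illustration of how trivial that modification is (the default value below is arbitrary, just an example), the Variable can be read with a fallback & changed with a one-liner, or edited in the Airflow UI under Admin → Variables:

import os

from airflow.models import Variable

# Read the Variable with a fallback so a fresh install still parses the DAG;
# defaulting to os.cpu_count() is only an example, not a requirement.
chunks = int(Variable.get("chunks", default_var=os.cpu_count() or 4))

# Bumping the number of chunks later is a one-liner (or an edit in the UI):
# Variable.set("chunks", 8)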

Update on 2019-Dec-3

There is an interesting artifact that can occur when starting a new DAG (& in some other scenarios) where you are depending on a list to build some dynamic tasks. The easiest example I can come up with uses a list of files (whether created or downloaded in a previous task). If there are no files there (either because the DAG has never run before or the files were deleted by the previous DAG run), then you can use a placeholder to hold the DAG together until those (temporary) files are actually present.

from pathlib import Path

local_path = Path.home()

def get_temp_files():
    """Return a real list of file(s) or if not present, return a bogus file name.
    """
    return [f for f in local_path.glob("*.csv") if f.is_file()] or [
        local_path / "placeholder.csv"
    ]

for csv_file in get_temp_files():
    load_csv_file = PythonOperator(
        # Use the file's stem: a full path contains characters (slashes)
        # that are not valid in an Airflow task_id.
        task_id=f"load_{csv_file.stem}",
        python_callable=load_csv,
        op_args=[csv_file],
        provide_context=True,
    )

The key part of get_temp_files() is the or operator: if the first part is truthy (meaning real files exist), evaluation stops there & that list of files is returned. However, if no files are present, the second part is returned instead, which points to a non-existent placeholder file.
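If you don't want the placeholder task to fail outright, one option (a sketch, not necessarily how my load_csv is written) is to have the callable skip itself when the file isn't really there:

from pathlib import Path

from airflow.exceptions import AirflowSkipException

def load_csv(csv_file, **context):
    """Load one CSV into the database, or skip if only the placeholder exists."""
    csv_file = Path(csv_file)
    if not csv_file.is_file():
        # Placeholder run: nothing to load yet, so mark this task as skipped.
        raise AirflowSkipException(f"{csv_file} not present; skipping load")
    # ... load the CSV into the database here ...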

2 thoughts on "Creating Dynamic Number of Tasks at DAG Runtime"

  1. B

    I'm trying to create something similar to your Dynamically Created Tasks example, but all I end up with is the pre and post tasks when looking at my DAG in the airflow UI. I'm using the GoogleCloudStorageToBigQueryOperator, so I don't know if the difference is that you're using a PythonOperator. Any help is appreciated, thanks!

    1. lalligood (post author)

      I don't think it should matter which Operator is being used. However, it sounds like you are running into the issue of the task not being created until right before it is needed. Check out the Update on 2019-Dec-3 addendum & see if that applies to you. The magic there is that if the first part of get_temp_files() (the list comprehension) evaluates as falsy (meaning real files aren't there), then it will use a dummy file to "hold the DAG together" until the real files exist. Hope that helps & thank you for the comment!
