Another Way to Dynamically Create New Tasks: Factory Methods

New project, new challenge. This time I wanted to create several very similar tasks (in this case, tasks to drop indexes); however, I didn’t want to be locked into running those similar tasks at the same time (read: in parallel, which is what you get when you generate them from an iterable).
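To make the contrast concrete, here is a toy sketch that does not use Airflow at all. `Task` is a hypothetical stand-in that only models Airflow's `>>` dependency syntax. Tasks generated from an iterable all land at the same point in the graph, while tasks built by separate factory calls can each be wired wherever they are needed.

```python
from dataclasses import dataclass, field
from typing import List, Union


@dataclass
class Task:
    """Hypothetical stand-in for an Airflow operator; only models `>>` wiring."""
    task_id: str
    downstream: List[str] = field(default_factory=list)

    def __rshift__(self, other: Union["Task", List["Task"]]):
        # `self >> other`: record every target as downstream of self
        targets = other if isinstance(other, list) else [other]
        self.downstream.extend(t.task_id for t in targets)
        return other  # returned so chains like a >> b >> c work

    def __rrshift__(self, other: List["Task"]) -> "Task":
        # `[a, b] >> self`: Python falls back to this because list has no `>>`
        for task in other:
            task >> self
        return self


def drop_index_task(column: str) -> Task:
    # Factory: one call builds one task, which can then be placed anywhere
    return Task(task_id=f"drop_{column}_index")


# Iterable approach: every generated task sits at the same spot, in parallel
start, end = Task("start"), Task("end")
parallel = [drop_index_task(c) for c in ("fizz", "buzz")]
start >> parallel >> end

# Factory approach (a second, separate toy graph): each call is placed independently
load, report = Task("load"), Task("report")
drop_fizz = drop_index_task("fizz")
drop_buzz = drop_index_task("buzz")
drop_fizz >> load >> drop_buzz >> report
```

In the first graph both drop tasks run between `start` and `end`; in the second, `load` sits between them, which is exactly the freedom a loop does not give you.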

So while I will not even pretend to be the originator of this idea (I know I’ve seen this style of creating tasks in a couple of Medium articles that I can’t seem to find links for now), I finally had a need where it made sense to give it a try…

Really, there is very little to do. The one caveat I ran into is that the factory function had to live inside the DAG definition; otherwise Airflow threw an “I was expecting an Operator class but got a function instead”-type error (presumably because the task would have been missing all of the hidden kwargs, such as the DAG itself, that the with DAG(...) block assigns to each Operator).
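One way to picture those hidden kwargs: an operator created inside a `with DAG(...)` block picks up that DAG from the context manager at the moment it is instantiated. The model below is a hypothetical, heavily simplified sketch of that idea, not Airflow's actual implementation; `ToyDAG` and `ToyOperator` are made-up names.

```python
from typing import List, Optional


class ToyDAG:
    """Hypothetical, simplified model of `with DAG(...) as dag:` (not Airflow code)."""
    _active: List["ToyDAG"] = []  # stack of DAGs whose `with` block is currently open

    def __init__(self, dag_id: str):
        self.dag_id = dag_id
        self.task_ids: List[str] = []

    def __enter__(self) -> "ToyDAG":
        ToyDAG._active.append(self)
        return self

    def __exit__(self, *exc) -> None:
        ToyDAG._active.pop()

    @classmethod
    def current(cls) -> Optional["ToyDAG"]:
        return cls._active[-1] if cls._active else None


class ToyOperator:
    def __init__(self, task_id: str, dag: Optional[ToyDAG] = None):
        # The "hidden" kwarg: with no explicit dag, use whichever DAG block is open
        self.dag = dag or ToyDAG.current()
        if self.dag is None:
            raise RuntimeError(f"{task_id} was created outside any DAG")
        self.task_id = task_id
        self.dag.task_ids.append(task_id)


with ToyDAG("my_new_dag") as dag:

    def drop_index_factory(column: str) -> ToyOperator:
        return ToyOperator(task_id=f"drop_{column}_index")

    drop_fizz_index = drop_index_factory("fizz")
```

In this model, `drop_fizz_index` is registered on `dag` because it was created while the block was open; calling `drop_index_factory` again after the block has closed raises an error instead of silently producing an orphaned task.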

In this particular case of dropping some indexes, I simply pass the name of the indexed column whenever I want to instantiate a PostgresOperator task, and assign each instance its own unique variable name.

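from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

# default_args, staging_connection and send_slack_message_failed are
# assumed to be defined elsewhere in the DAG file.

with DAG(
    dag_id="my_new_dag",
    default_args=default_args,
    schedule_interval="@daily",
) as dag:

    def drop_index_factory(column: str) -> PostgresOperator:
        """Instantiate a task that drops the index on `column`, when needed."""
        return PostgresOperator(
            # task_id must be unique within the DAG, so derive it from the column
            task_id=f"drop_{column}_index",
            postgres_conn_id=staging_connection,
            sql=f"DROP INDEX IF EXISTS staging_table__{column};",
            on_failure_callback=send_slack_message_failed,
        )

    drop_fizz_index = drop_index_factory("fizz")

    # [more tasks...]

    drop_buzz_index = drop_index_factory("buzz")
    # OTHER_TASKS stands in for the tasks elided above
    drop_fizz_index >> OTHER_TASKS >> drop_buzz_index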
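Because the column name is interpolated straight into the SQL with an f-string (and identifiers cannot be passed as bind parameters), it may be worth pulling the SQL-building step into a small helper that rejects anything that is not a plain identifier. This is a sketch under assumptions: `drop_index_sql` and its `table` default are hypothetical names that mirror the `staging_table__{column}` naming in the DAG.

```python
import re

# Conservative whitelist for unquoted PostgreSQL identifiers (ASCII letters, digits, underscore)
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
# PostgreSQL silently truncates identifiers longer than 63 bytes by default
_MAX_IDENTIFIER_LEN = 63


def drop_index_sql(column: str, table: str = "staging_table") -> str:
    """Build the DROP INDEX statement for `column`, refusing unsafe or too-long names."""
    index_name = f"{table}__{column}"
    if not _IDENTIFIER.fullmatch(index_name) or len(index_name) > _MAX_IDENTIFIER_LEN:
        raise ValueError(f"refusing to build SQL for index name {index_name!r}")
    return f"DROP INDEX IF EXISTS {index_name};"
```

Inside the factory, `sql=drop_index_sql(column)` could then replace the inline f-string, so a typo or a stray semicolon fails at DAG parse time rather than against the database.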

So while this is extremely effective in the right use cases, it does make for a “muddy” DAG definition, particularly if you want to do this for multiple actions, like adding another factory method to create an index! The big gain is that it keeps your DAG code DRY (Don’t Repeat Yourself).
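If you do need both create and drop tasks, one way to limit the muddiness is to route both actions through a single helper that only builds the per-task arguments. This is a hypothetical sketch: `index_task_kwargs` and the SQL templates are illustrative, not part of Airflow or any provider package, and the index naming mirrors `staging_table__{column}`.

```python
from typing import Dict

# Hypothetical SQL templates, one per supported action
_INDEX_SQL = {
    "create": "CREATE INDEX IF NOT EXISTS {table}__{column} ON {table} ({column});",
    "drop": "DROP INDEX IF EXISTS {table}__{column};",
}


def index_task_kwargs(action: str, column: str, table: str = "staging_table") -> Dict[str, str]:
    """Return the task_id and sql that one index task needs, for either action."""
    if action not in _INDEX_SQL:
        raise ValueError(f"unknown action {action!r}; expected one of {sorted(_INDEX_SQL)}")
    return {
        "task_id": f"{action}_{column}_index",
        "sql": _INDEX_SQL[action].format(table=table, column=column),
    }
```

A single factory inside the DAG could then return `PostgresOperator(**index_task_kwargs(action, column), postgres_conn_id=staging_connection, on_failure_callback=send_slack_message_failed)`, so create and drop tasks share one definition instead of two near-identical factories.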
