Custom Airflow Operators for Loading Data Into PostgreSQL

While the ETL I am responsible for takes advantage of PostgreSQL’s foreign data wrappers to simplify (or outright avoid) extracting from one database into another, I was recently tasked with an interesting project to track changes in the schemas of the remote databases providing the source data. This meant reaching out to the information_schema of those remote databases and comparing it to the information_schema of the data warehouse.

And in order to do that, I had to write a couple of custom operators: one for extracting data from MySQL, and one for MS SQL Server. Thankfully, the end results are nearly identical, aside from the MySqlHook() vs MsSqlHook() references.

It’s worth mentioning that, to handle much heavier loads than this project needed (read: futureproofing my code), both operators use a cursor & pull blocks of up to 5000 rows (by default) from the source database to insert into the destination. This attribute, rows_chunk, is configurable in the DAG task, so that the operator’s memory usage can be kept to manageable levels.
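
For illustration, here is a hypothetical task definition that overrides rows_chunk; the query, table, & connection names are made up, & it assumes a dag object defined elsewhere in the file:

load_orders = MySqlToPostgresOperator(
    task_id="load_orders",
    sql="SELECT * FROM orders",
    postgres_table="orders_staging",
    mysql_conn_id="mysql_orders",
    postgres_conn_id="postgres_default",
    rows_chunk=1000,  # pull & insert 1,000 rows at a time instead of the default 5,000
    dag=dag,
)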

MySqlToPostgresOperator Code

# Imports were not shown in the original post; these paths assume Airflow 1.10.x
# (in Airflow 2.x, the hooks live in the provider packages instead).
from contextlib import closing
from typing import Dict, Optional, Union

from airflow.hooks.mssql_hook import MsSqlHook  # used by MsSqlToPostgresOperator below
from airflow.hooks.mysql_hook import MySqlHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class MySqlToPostgresOperator(BaseOperator):
    """Selects data from a MySQL database and inserts that data into a
    PostgreSQL database. Cursors are used to minimize memory usage for large
    queries.
    """

    template_fields = ("sql", "postgres_table", "params")
    template_ext = (".sql",)
    ui_color = "#944dff"  # cool purple

    @apply_defaults
    def __init__(
        self,
        sql: str,
        mysql_conn_id: str = "mysql_default",
        postgres_table: str = "",
        postgres_conn_id: str = "postgres_default",
        params: Optional[Dict[str, Union[str, int]]] = None,
        rows_chunk: int = 5000,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        if params is None:
            params = {}
        self.sql = sql
        self.mysql_conn_id = mysql_conn_id
        self.postgres_table = postgres_table
        self.postgres_conn_id = postgres_conn_id
        self.params = params
        self.rows_chunk = rows_chunk

    def execute(self, context):
        """Establish connections to both MySQL & PostgreSQL databases, open
        cursor and begin processing query, loading chunks of rows into
        PostgreSQL. Repeat loading chunks until all rows processed for query.
        """
        source = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        target = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        with closing(source.get_conn()) as conn:
            with closing(conn.cursor()) as cursor:
                cursor.execute(self.sql, self.params)
                target_fields = [x[0] for x in cursor.description]
                row_count = 0
                rows = cursor.fetchmany(self.rows_chunk)
                while len(rows) > 0:
                    row_count += len(rows)
                    target.insert_rows(
                        self.postgres_table,
                        rows,
                        target_fields=target_fields,
                        commit_every=self.rows_chunk,
                    )
                    rows = cursor.fetchmany(self.rows_chunk)
                self.log.info(
                    f"{row_count} row(s) inserted into {self.postgres_table}."
                )

MsSqlToPostgresOperator Code

Only the lines that differ from MySqlToPostgresOperator are shown below:

class MsSqlToPostgresOperator(BaseOperator):
    """Selects data from a MS SQL database and inserts that data into a
    PostgreSQL database. Cursors are used to minimize memory usage for large
    queries.
    """

    ...
    ui_color = "#0099e6"  # aqua

    @apply_defaults
    def __init__(
        ...
        mssql_conn_id: str = "mssql_default",
        ...
    ):
        ...
        self.mssql_conn_id = mssql_conn_id
        ...

    def execute(self, context):
        ...
        source = MsSqlHook(mssql_conn_id=self.mssql_conn_id)

DAG Usage

Within the DAG file:

for name, schema, db_name in mysql_databases:
    mysql_tables = MySqlToPostgresOperator(
        task_id=f"{name}_tables",
        sql="compare_database_schemas/mysql_tables.sql",
        postgres_table="remote_tables",
        postgres_conn_id=connection,
        mysql_conn_id=f"mysql_{schema}",
        params={"brand": name, "schema": schema, "db_name": db_name},
        on_failure_callback=send_slack_message_failed,
    )

for name, schema, db_name in mssql_brands:
    mssql_tables = MsSqlToPostgresOperator(
        task_id=f"{name}_tables",
        sql="compare_database_schemas/mssql_tables.sql",
        postgres_table="remote_tables",
        postgres_conn_id=connection,
        mssql_conn_id=f"mssql_{schema}",
        params={"brand": name, "schema": schema, "db_name": db_name},
        on_failure_callback=send_slack_message_failed,
    )
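
As clarified in the comments below, mysql_databases & mssql_brands are iterables of (name, schema, db_name) tuples defined elsewhere in the DAG file, used to create one task per source database. A hypothetical example of their shape (the values here are made up):

mysql_databases = [
    ("brand_a", "brand_a", "brand_a_prod"),
    ("brand_b", "brand_b", "brand_b_prod"),
]
mssql_brands = [
    ("brand_c", "dbo", "BrandC_Prod"),
]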

With all of the data loaded into the local database, a later task then queries the local information_schema & compares the results to what was loaded into the remote_tables table.
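
That comparison task is beyond the scope of this post, but as a rough, hypothetical sketch of the idea (the remote_tables column names & the comparison logic here are assumptions, not the actual query):

from airflow.operators.postgres_operator import PostgresOperator

compare_schemas = PostgresOperator(
    task_id="compare_schemas",
    postgres_conn_id=connection,
    sql="""
        -- Report remote tables that do not (yet) exist in the local warehouse.
        SELECT r.db_name, r.table_name
        FROM remote_tables AS r
        LEFT JOIN information_schema.tables AS l
            ON l.table_name = r.table_name
        WHERE l.table_name IS NULL;
    """,
)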

2 thoughts on “Custom Airflow Operators for Loading Data Into PostgreSQL”

  1. Anudeep

Hi lalligood,

How do you use “for name, schema, db_name in mssql_brands:” in the DAG?

    I don’t understand that syntax.

  2. lalligood Post author

Ah, yes. I happened to strip this out of some working code & neglected to mention that I was creating several tasks with each custom operator because I needed to extract data from multiple tables. So “mssql_brands” (& “mysql_databases”) are iterables of tuples containing the (table) name, (database) schema, & database name that the custom operators need to extract data from.

    tl;dr: I should have either explained or, better yet, dropped the “for name, schema, db_name …” references in the code examples.

    Sorry for the confusion.

