While the ETL I am responsible for takes advantage of PostgreSQL’s foreign data wrappers to simplify (avoid?) extracting from one database into another, I was recently tasked with an interesting project to track changes in the schemas of the remote databases providing the source data. This meant reaching out to the information_schema of those remote databases & comparing it to the information_schema of the data warehouse.
In order to do that, I had to write a couple of custom operators: one for extracting data from MySQL, and one for MS SQL Server. Thankfully, the end results are nearly identical aside from the MySqlHook() vs. MsSqlHook() references.
It’s worth mentioning that, to handle much heavier loads than what was needed for this project (read: futureproofing my code), both operators use a cursor & pull blocks of rows (up to 5,000 by default) from the source database to insert into the destination. This attribute, rows_chunk, is configurable in the DAG task, so that the operator’s memory usage can be kept to manageable levels.
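For instance, a task extracting a much larger table could simply bump the chunk size when the operator is instantiated. A quick sketch (the task_id, SQL file, table, & connection names here are made up, not from the real DAG):

big_extract = MySqlToPostgresOperator(
    task_id="orders_extract",
    sql="extract_orders.sql",
    postgres_table="orders_copy",
    mysql_conn_id="mysql_default",
    postgres_conn_id="postgres_default",
    rows_chunk=20000,  # fetch & insert 20,000 rows per round trip instead of 5,000
)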
MySqlToPostgresOperator Code
# Airflow 1.10-style imports; in Airflow 2.x the hooks live in provider packages.
from contextlib import closing
from typing import Dict, Optional, Union

from airflow.hooks.mysql_hook import MySqlHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class MySqlToPostgresOperator(BaseOperator):
    """Selects data from a MySQL database and inserts that data into a
    PostgreSQL database. Cursors are used to minimize memory usage for large
    queries.
    """

    template_fields = ("sql", "postgres_table", "params")
    template_ext = (".sql",)
    ui_color = "#944dff"  # cool purple

    @apply_defaults
    def __init__(
        self,
        sql: str,
        mysql_conn_id: str = "mysql_default",
        postgres_table: str = "",
        postgres_conn_id: str = "postgres_default",
        params: Optional[Dict[str, Union[str, int]]] = None,
        rows_chunk: int = 5000,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        if params is None:
            params = {}
        self.sql = sql
        self.mysql_conn_id = mysql_conn_id
        self.postgres_table = postgres_table
        self.postgres_conn_id = postgres_conn_id
        self.params = params
        self.rows_chunk = rows_chunk

    def execute(self, context):
        """Establish connections to both MySQL & PostgreSQL databases, open
        cursor and begin processing query, loading chunks of rows into
        PostgreSQL. Repeat loading chunks until all rows processed for query.
        """
        source = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        target = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        with closing(source.get_conn()) as conn:
            with closing(conn.cursor()) as cursor:
                cursor.execute(self.sql, self.params)
                target_fields = [x[0] for x in cursor.description]
                row_count = 0
                # Fetch & insert one chunk of rows at a time until the
                # cursor is exhausted.
                rows = cursor.fetchmany(self.rows_chunk)
                while len(rows) > 0:
                    row_count += len(rows)
                    target.insert_rows(
                        self.postgres_table,
                        rows,
                        target_fields=target_fields,
                        commit_every=self.rows_chunk,
                    )
                    rows = cursor.fetchmany(self.rows_chunk)
                self.log.info(
                    f"{row_count} row(s) inserted into {self.postgres_table}."
                )
MsSqlToPostgresOperator Code
Only the lines that differ are shown below (the MsSqlHook import also replaces the MySqlHook import):
class MsSqlToPostgresOperator(BaseOperator):
    """Selects data from a MS SQL database and inserts that data into a
    PostgreSQL database. Cursors are used to minimize memory usage for large
    queries.
    """

    ...
    ui_color = "#0099e6"  # aqua

    @apply_defaults
    def __init__(
        ...
        mssql_conn_id: str = "mssql_default",
        ...
    ):
        ...
        self.mssql_conn_id = mssql_conn_id
        ...

    def execute(self, context):
        ...
        source = MsSqlHook(mssql_conn_id=self.mssql_conn_id)
DAG Usage
Within the DAG file:
for name, schema, db_name in mysql_databases:
    mysql_tables = MySqlToPostgresOperator(
        task_id=f"{name}_tables",
        sql="compare_database_schemas/mysql_tables.sql",
        postgres_table="remote_tables",
        postgres_conn_id=connection,
        mysql_conn_id=f"mysql_{schema}",
        params={"brand": name, "schema": schema, "db_name": db_name},
        on_failure_callback=send_slack_message_failed,
    )

for name, schema, db_name in mssql_brands:
    mssql_tables = MsSqlToPostgresOperator(
        task_id=f"{name}_tables",
        sql="compare_database_schemas/mssql_tables.sql",
        postgres_table="remote_tables",
        postgres_conn_id=connection,
        mssql_conn_id=f"mssql_{schema}",
        params={"brand": name, "schema": schema, "db_name": db_name},
        on_failure_callback=send_slack_message_failed,
    )
With all of the data loaded into the local database, a later task then queries the local information_schema & compares the results to what gets loaded into the remote_tables table.
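That comparison task isn’t shown here, but a minimal sketch of what it could look like follows. The import assumes an Airflow 1.10-style PostgresOperator, & the query and remote_tables column names are illustrative guesses, not the actual implementation:

from airflow.operators.postgres_operator import PostgresOperator

compare_tables = PostgresOperator(
    task_id="compare_tables",
    postgres_conn_id=connection,
    sql="""
        -- hypothetical check: remote tables with no match in the local catalog
        SELECT r.db_name, r.table_name
        FROM remote_tables AS r
        LEFT JOIN information_schema.tables AS l
            ON l.table_name = r.table_name
        WHERE l.table_name IS NULL;
    """,
    on_failure_callback=send_slack_message_failed,
)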
Hi Ialligood,
How do you use for name, schema, db_name in mssql_brands: in the dag?
I don’t understand that syntax.
Ah, yes. I happened to strip this out of some working code & neglected to mention that I was creating several tasks with each custom operator, since I needed to extract data from multiple tables. So “mssql_brands” (& “mysql_databases”) is an iterable of tuples containing the (table) name, (database) schema, & database name that the custom operators need to extract data from.
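For illustration, such an iterable might look something like this (the values are made up, not from the real DAG):

mysql_databases = [
    # (name, schema, db_name) — unpacked by the for loop in the DAG
    ("acme", "acme_prod", "acme_db"),
    ("globex", "globex_prod", "globex_db"),
]

mssql_brands = [
    ("initech", "dbo", "initech_db"),
]

Each tuple gets unpacked by the for loop to generate one extract task per source database.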
tl;dr: I should have either explained or, better yet, dropped the “for name, schema, db_name …” references in the code examples.
Sorry for the confusion.