Testing for logging Output from PythonOperator Functions

While generally my python functions called by a PythonOperator end with return True, sometimes I would like for them to emit some useful information (like a count of how many objects it finished working on, rows loaded into a table, etc.) to the airflow logs via logging.

The problem is “How do you test that a message was emitted to the airflow log?” Well, wonder no more!

pytest Built-In Fixture, caplog

(I’m going to save another pytest built-in fixture, tmp_path, for a future post…and context is my own fixture discussed in a previous post for simulating airflow’s context.)

import requests

def download_csv_file(target_dir: Path, **context) -> str:
    """Download CSV file from remote server.
    """
    task_date = datetime.strptime(context.get("ds"), "%Y-%m-%d")
    file_name = csv_file_name(task_date)
    local_file = Path(target_dir) / file_name
    url = f"https://website.com/extracts/{file_name}"
    csv_file = requests.get(url, stream=True)
    csv_file.raise_for_status()
    with open(local_file, "wb") as lf:
        lf.write(csv_file.content)
    logging.info(f"{local_file} downloaded successfully.")  # LOOK FOR THIS LINE!
    return True

def test_download_csv_file(caplog, tmp_path, context):
    """Make sure that a valid CSV file is downloaded from remote server.
    """
    test_date = date.today() - timedelta(1)
    file_date = test_date.strftime("%d-%m-%Y")
    file_name = f"remote_source_{file_date}.csv"
    test_file = tmp_path / file_name
    results = download_csv_file(tmp_path, **context)
    assert results
    assert f"{test_file}" in caplog.text  # Make sure logging message is emitted
    assert test_file.is_file()
    assert os.path.getsize(test_file) > 7_000_000  # File should be 7+MB

Definitely look over the documentation on the pytest website because there are all sorts of things you can check with caplog.

But here I simply look for a string in caplog.text, which in this case just happens to be a single object. If you want to capture multiple logged events, then you may have to iterate through a list to get to the message you want.

Using PostgresHook.copy_expert() method

Another day, another interesting data source to perform ETL on. This time, it’s a CSV file that’s downloaded from a remote server. Not going to focus on the part about using requests to download the file, but jumping right into using PostgresHook.copy_expert() to dump the CSV file into a staging table.

Even the actual code_export method code doesn’t offer much insight, so I had to resort to looking through the psycopg2 docs (scroll to bottom of the page) to find out how to use this method!

Here’s what makes it work: you provide the method with a SQL statement (starting with COPY …) but instead of passing the file name–which just so happens to also be the second parameter you pass to copy_expert()–you use STDIN with COPY FROM (or STDOUT with COPY TO)!

Pretty trivial in hindsight but it tripped me up for a few minutes as to why I don’t need to pass the file name twice (once as part of SQL statement & again as a parameter).

Getting Credit Card BIN/IIN Information with Python

There are free (& paid) services that allow you to submit API calls with partial credit card numbers to return information about the credit card, like type (Visa, MC, Amex), the issuing bank, the country & currency where the card was issued, etc. The problem is that the free services either limit what data is returned (only gives the card type in most cases) or throttles the number of requests allowed (no more than 10 per minute).

So with a paid service (at least the one I’m using), the API call requires sending you API key that they provide you with to get past the free limitations. To make matters a little more difficult, the service I am using didn’t have any python-specific documentation & it took a while for me to reverse-engineer their javascript code into something more python friendly.

However, before I even got to the part of using the API key, I thought “Surely there is a python library that would allow me to do this!” And the answer is yes, I found & used card_identifier…but it comes with one giant caveat: there’s no way to pass my API key.

Using card_identifier did make verifying that I was passing at least 6 digits & returning simple JSON-turned-dictionary pretty trivial for initial testing. And while that library was helpful, it really doesn’t (need to) do that much.

Replacing the card_identifier call with requests was again, easy to do. It was getting the API key formatted as a proper HTTP header that proved to be the only challenge.

The Code

It’s worth pointing out that my API key is being stored as an Airflow Variable so that I don’t have to store–and risk exposing–the API key in git and/or allow for the API key to be changed independently of the DAG code.

from airflow.models import Variable
from base64 import b64encode
import requests

binlist_url = "https://lookup.binlist.net/"
binlist_api_key = Variable.get("binlist_api_key")
binlist_auth = b64encode(str.encode(binlist_api_key + ":")).decode("ascii")
binlist_headers = {"Accept-Version": "3", "Authorization": f"Basic {binlist_auth}"}

def to_card_details(first_6_digits):
    """Submit API call to BINlist for card issuer information.
    """
    raw_html = requests.get(f"{binlist_url}{first_6_digits}", headers=binlist_headers)
    return raw_html.json()

What is completely not-obvious (at least not before figuring this out) is that the API key is passed as a username with a null password. First, the API key followed by a colon–to separate the username & passwords–are encoded as BASE64 then decoded at ASCII (since HTTP headers can only contain ASCII). The HTTP headers will always be the same, just the 6+ digits submitted in the URL will be different for each API call. Finally, the .json() method at the end means I get back a dictionary from the returned JSON to pull out the fields I want to use.

A Tale of Printing Two Euros

There are 2 different Airflow DAGs that I maintain which generate reports that are delivered via email. And in both reports, the financial information needs to be reported in both USD & EUR currency.

One of the reports involves running a SQL query, which loads the results into a pandas dataframe, then performs a few simple transformations before calling pandas’s to_html() method for generating the message body. I didn’t realize just how magical that pandas method could be…

…until I realized that the other report was only reporting in USD. And its dataset is a list of tuples. I thought I could just update parts of the code to use the Euro sign (€), but I was very wrong. I kept getting the following error:

UnicodeEncodeError: 'ascii' codec can't encode character u'\u20ac'

That sent me down a Stack Overflow rabbit hole of trying all sorts of things with .encode(‘utf-8’) tacked on to the end of a lot of different strings. And not a single one worked. I then realized that Airflow’s EmailOperator converts the html_content to ASCII. The Euro symbol does not exist in ASCII’s 256 characters.

And it was then I stumbled upon a result that showed how to display the Euro sign in various languages. Most of them were some kind of variation of the Unicode value, except for the last one:

HTML: €

Problem solved.

Navigating Past Seemingly Unrecoverable Tables with pg_restore

We happen to run our ETL processes only once per day. So that allows us to simply use pg_dump for our database backups. But there’s the old adage “Your backups are only as good as your last restore.” So I do a restore of the production server backup on the development server at least every 45 days.

However, with some recent development work involving extracting data from MS SQL Server databases, I have grappled with some interesting encoding issues. So I should have expected problems when doing my most recent test restore…

While I have self-authored bash scripts for doing backups & restores, I had to do a simple yet significant change to ensure that I can complete a restore.

First, excluding tables from being backed up with pg_dump can be done with -T / --exclude-table option, however, using it may introduce some interesting–& undesired–side effects above & beyond not backing up data that you may want to restore. My approach therefore is to backup everything & figure out the problems when restoring.

Note: Of course I’m going to make the huge assumption that you have already verified that your dump file was created successfully without any errors.

Determining What You Actually Want to Restore

It all starts with generating a list of objects contained in a dump file.

pg_restore --list ${basepath}/${backupfile} > ${basepath}/recovery.lst

Open recovery.lst in your editor of choice to view & make sure that the list is readable. Assuming that’s the case, let’s find a table to not restore. But before we go removing lines from our recovery list file, it is worth pointing out that there are at least 3 lines per table–one to create the table, one for permissions for the table, & one for loading the data into the table. Those 3 lines look like this (but almost certainly will not be found consecutively in the recovery list):

647; 1259 37177 TABLE public stage_subscription_orders bi
14971; 0 0 ACL public TABLE stage_subscription_orders bi
...
13615; 0 37177 TABLE DATA public stage_subscription_orders bi

It’s well worth pointing out here: to avoid getting into a death spiral of being unable to restore other objects (namely views) that depend on the undesired table, you really will only want to remove the third line–the one for restoring the data–but keep the table creation & permissions lines.

Editing the Recovery List File and Performing the Restore

Back in the editor of your choice, locate & remove the ... TABLE DATA ... line, save changes, and exit. Then use the -L / --use-list option for explicitly telling pg_restore what objects to restore:

pg_restore --dbname=postgres --exit-on-error \
        --use-list=${basepath}/recovery.lst \
        --verbose ${basepath}/${backupfile}

And if you don’t manage to exclude all of the necessary items the first go round, simply re-edit recovery.lst & re-run pg_restore until you get a successful restore.

Custom Airflow Operators for Loading Data Into PostgreSQL

While the ETL I am responsible for takes advantage of PostgreSQL’s foreign data wrappers to simplify (avoid?) extracting from one database into another, I was recently tasked with an interesting project to track (changes in) the schemas of the remote databases proving the source data. This meant reaching out to the information_schema of those remote databases then compare it to the information_schema of the data warehouse.

And in order to do that, I had to write a couple custom operators–one for extracting data from MySQL, and one for MS SQL Server. Thankfully, the end results are nearly identical other than the MySqlHook() vs MsSqlHook() references.

It’s worth mentioning that to handle some much heavier loads than what was needed for this project (read: futureproofing my code), both operators use a cursor & pull blocks of (up to by default) 5000 rows from the source database to insert into the destination. This attribute, rows_chunk, is configurable in the DAG task, so that memory usage can by the operator can be kept to manageable levels.

MySqlToPostgresOperator Code

class MySqlToPostgresOperator(BaseOperator):
    """Selects data from a MySQL database and inserts that data into a
    PostgreSQL database. Cursors are used to minimize memory usage for large
    queries.
    """

    template_fields = ("sql", "postgres_table", "params")
    template_ext = (".sql",)
    ui_color = "#944dff"  # cool purple

    @apply_defaults
    def __init__(
        self,
        sql: str,
        mysql_conn_id: str = "mysql_default",
        postgres_table: str = "",
        postgres_conn_id: str = "postgres_default",
        params: Optional[Dict[str, Union[str, int]]] = None,
        rows_chunk: int = 5000,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        if params is None:
            params = {}
        self.sql = sql
        self.mysql_conn_id = mysql_conn_id
        self.postgres_table = postgres_table
        self.postgres_conn_id = postgres_conn_id
        self.params = params
        self.rows_chunk = rows_chunk

    def execute(self, context):
        """Establish connections to both MySQL & PostgreSQL databases, open
        cursor and begin processing query, loading chunks of rows into
        PostgreSQL. Repeat loading chunks until all rows processed for query.
        """
        source = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        target = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        with closing(source.get_conn()) as conn:
            with closing(conn.cursor()) as cursor:
                cursor.execute(self.sql, self.params)
                target_fields = [x[0] for x in cursor.description]
                row_count = 0
                rows = cursor.fetchmany(self.rows_chunk)
                while len(rows) > 0:
                    row_count += len(rows)
                    target.insert_rows(
                        self.postgres_table,
                        rows,
                        target_fields=target_fields,
                        commit_every=self.rows_chunk,
                    )
                    rows = cursor.fetchmany(self.rows_chunk)
                self.log.info(
                    f"{row_count} row(s) inserted into {self.postgres_table}."
                )

MsSqlToPostgresOperator Code

Here are only the lines that are different:

class MsSqlToPostgresOperator(BaseOperator):
    """Selects data from a MS SQL database and inserts that data into a
    PostgreSQL database. Cursors are used to minimize memory usage for large
    queries.
    """

    ...
    ui_color = "#0099e6"  # aqua

    @apply_defaults
    def __init__(
        ...
        mssql_conn_id: str = "mssql_default",
        ...
    ):
        ...
        self.mssql_conn_id = mssql_conn_id
        ...

    def execute(self, context):
        ...
        source = MsSqlHook(mssql_conn_id=self.mssql_conn_id)

DAG Usage

Within the DAG file:

for name, schema, db_name in mysql_databases:
    mysql_tables = MySqlToPostgresOperator(
        task_id=f"{name}_tables",
        sql="compare_database_schemas/mysql_tables.sql",
        postgres_table="remote_tables",
        postgres_conn_id=connection,
        mysql_conn_id=f"mysql_{schema}",
        params={"brand": name, "schema": schema, "db_name": db_name},
        on_failure_callback=send_slack_message_failed,
    )

for name, schema, db_name in mssql_brands:
    mssql_tables = MsSqlToPostgresOperator(
        task_id=f"{name}_tables",
        sql="compare_database_schemas/mssql_tables.sql",
        postgres_table="remote_tables",
        postgres_conn_id=connection,
        mssql_conn_id=f"mssql_{schema}",
        params={"brand": name, "schema": schema, "db_name": db_name},
        on_failure_callback=send_slack_message_failed,
    )

With all of the data loaded into the local database, a later task then queries the local information_schema & compares the results to what gets loaded into remote_tables table.

Testing SqlSensor & dealing with str.startswith

This was one of those “Of course multiple values can be passed/checked!”-type situations.

I’ve written a couple new DAGs which use SqlSensor, and I want to use my existing test, test_database_operators_have_sql(), to make sure that I am passing a SQL statement to the SqlSensor task.

Here’s how the test originally looked testing for just the PostgresOperator:

from airflow.operators.postgres_operator import PostgresOperator

@mark.parametrize(
    "db_operator",
    [x for x in dag.tasks if isinstance(x, PostgresOperator)],
    ids=[x for x, y in zip(dag.task_ids, dag.tasks) if isinstance(y, PostgresOperator)],
)
def test_database_operators_have_sql(db_operator):
    """For all PostgresOperator task(s), verify that sql attribute returns
    non-empty value.
    """
    assert db_operator.sql.startswith("INSERT") or db_operator.sql.startswith(
        "TRUNCATE"
    )

You can check for more than one class with isinstance() by passing a tuple of classes for the second argument. And not only do I want to check for SqlSensor tasks, I need to make sure that the SQL query may start with SELECT (as well as the other 2 strings), which means modifying the assert statement. Some research turned up that you can also pass a tuple to str.startswith() to check for multiple strings! Implementing those requirements now gives us this:

from airflow.operators.postgres_operator import PostgresOperator
from airflow.sensors.sql_sensor import SqlSensor

@mark.parametrize(
    "db_operator",
    [x for x in dag.tasks if isinstance(x, (PostgresOperator, SqlSensor))],
    ids=[
        x
        for x, y in zip(dag.task_ids, dag.tasks)
        if isinstance(y, (PostgresOperator, SqlSensor))
    ],
)
def test_database_operators_have_sql(db_operator):
    """For all tasks involving a SQL query, verify that sql attribute returns
    non-empty value.
    """
    assert db_operator.sql.startswith(("INSERT", "SELECT", "TRUNCATE"))

Ah, simplicity!

Bonus lesson learned:

The first time I ran the modified test, I had accidentally omitted SqlSensor in the second argument (first list comprehension) of @mark.parametrize()… When I ran the test (worth noting that the DAG does not have any PostgresOperator tasks in it), it was being skipped! So if there are no tasks that meet the criteria, pytest automatically marks the test as skipped. Nice!

Creating Dynamic Number of Tasks at DAG Runtime

There is one DAG that I wrote–and it’s one of the first DAGs that I ever put into production–that has tasks that are dynamically allocated. What I mean is that there are a number of very similar tasks that I need to perform as quickly as possible, thus they need to run in parallel.

The approaches I took with this pairing of tasks is very different though. One creates a dynamic number of tasks based on a previous task in the same DAG. The other uses a Variable so that I can control how many tasks run based on the server’s hardware.

Despite the different approaches, the implementation is virtually identical.

Dynamically Created Tasks

There is a single previous task that saves a file containing a list of directories from a remote server on the airflow server. Based on the number of directories returned, a dynamic number of tasks are created–one task per directory. These dynamically created tasks fetch a bunch of files from each of the directories (so these tasks are almost entirely network intensive).

directories = acquire_directories_from_file()
for index, directory in enumerate(directories, 1):
    extract_dirs = PythonOperator(
        task_id=f"extract_dir_{index:02}",
        python_callable=extract.fetch_remote_files,
        op_args=[directory],
        provide_context=True,
        on_failure_callback=send_slack_message_failed,
    )
    pre_extract >> extract_dirs >> post_extract

There is a finite number of active running tasks defined in airflow.cfg (I believe the default is 16 with the LocalExecutor). So while there might be more than that, the others will run (selected randomly as best I can tell) whenever there are less than 16 tasks running.

Variable Defined Tasks

On the other hand, I have tasks that process those downloaded files & load them into a database. This process is more local disk & CPU intensive, so the number of tasks is based on the number of CPUs.

chunks = int(Variable.get("chunks"))
for chunk in range(chunks):
    transform_chunks = PythonOperator(
        task_id=f"transform_chunk_{chunk + 1:02}",
        python_callable=transform.transform,
        op_args=[chunk],
        provide_context=True,
        on_failure_callback=send_slack_message_failed,
    )
    pre_transform >> transform_chunks >> post_transform

Again, virtually identical in structure, but the first one is based around the results of a previous step (& actually vary by a small number day in & day out), whereas the second one almost never changes but can be modified in a trivial manner if the need arises.

Update on 2019-Dec-3

There is an interesting artifact that can occur when starting a new DAG (& some other scenarios) where you are depending on a list for some dynamic tasks. However, the easiest example I can come up with this is using a list of files (whether created or downloaded in a previous task). If there are no files there (either because the DAG has never run before or those files are deleted by the previous DAG run), then you can use a placeholder to hold the DAG together until those (temporary) files are actually present.

from pathlib import Path

local_path = Path.home()

def get_temp_files():
    """Return a real list of file(s) or if not present, return a bogus file name.
    """
    return [f for f in local_path.glob("*.csv") if f.is_file()] or [
        local_path / "placeholder.csv"
    ]

for csv_file in get_temp_files():
    load_csv_file = PythonOperator(
        task_id=f"load_{csv_file}",
        python_callable=load_csv,
        op_args=[csv_file],
        provide_context=True,
    )

The key part in get_temp_files() is the or operator in that if the first part is True (meaning real files exist), then it stops evaluating any further & returns that list of files. However, if they are not present, then the second part is true which points to a non-existent file.

Using pytest.fixture to Test Airflow Context

I’ve been wanting to figure out a way that will allow me to test tasks that reference Airflow context (via provide_context=True), such as BranchPythonOperator orPythonOperator. What I’ve come up with is a fixture that returns a dictionary. (In my example below, I perhaps went a little overboard with configuring it to return all of the various date-related values instead of a statically defined date, DAG name, & task name.)

There are 2 key components to creating a working context kwargs dictionary for your tests:

  1. Create a context fixture that dynamically populates many key values in the context dictionary.
  2. Use @mark.parametrize with indirect=True to allow dynamically assigned dates, DAG name, & task name. (It could easily be enhanced to include the task operator, too.) Then call the context fixture inside the test function as **context.

Create context Fixture

Configuring the fixture to accept parameters is easy, but it requires using the very benign sounding request object. (So don’t go thinking that you can call the parameter whatever you’d like… And you can only pass one object, so if you want to have more than one–as you’ll see below–you’ll need to “wrap” multiple values inside of a tuple.)

from datetime import datetime, timedelta
from pytest import fixture, mark

@fixture
def context(request):
    """Given a string in YYYY-MM-DD format, DAG name, & task name, fabricate an airflow
    context.
    """
    test_date, dag_name, task_name = request.param  # <== Unpacking all parameters
    today = datetime.strptime(test_date, "%Y-%m-%d")
    yesterday = today - timedelta(1)
    tomorrow = today + timedelta(1)
    ds = (today).strftime("%Y-%m-%d")
    ds_nodash = (today).strftime("%Y%m%d")
    yesterday_ds = (yesterday).strftime("%Y-%m-%d")
    yesterday_ds_nodash = (yesterday).strftime("%Y%m%d")
    tomorrow_ds = (tomorrow).strftime("%Y-%m-%d")
    tomorrow_ds_nodash = (tomorrow).strftime("%Y%m%d")
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    now_t = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    return {
        "dag": f"<DAG: {dag_name}>",
        "ds": ds,
        "ds_nodash": ds_nodash,
        "ts": f"{ds}T00:00:00",
        "ts_nodash": f"{ds_nodash}T000000",
        "yesterday_ds": yesterday_ds,
        "yesterday_ds_nodash": yesterday_ds_nodash,
        "tomorrow_ds": tomorrow_ds,
        "tomorrow_ds_nodash": tomorrow_ds_nodash,
        "END_DATE": ds,
        "end_date": ds,
        "dag_run": f"<DagRun {task_name} @ {now}: scheduled__{now_t}, externally triggered: False>",
        "run_id": f"scheduled__{ds}T00:00:00",
        "execution_date": today,
        "prev_execution_date": yesterday,
        "next_execution_date": tomorrow,
        "latest_date": ds,
        "macros": "<module 'airflow.macros' from '/anaconda3/lib/python3.6/site-packages/airflow/macros/__init__.py'>",
        "params": {},
        "tables": None,
        "task": f"<Task(BranchPythonOperator): {task_name}>",
        "task_instance": f"<TaskInstance: {dag_name}.verify_error_existence {now} [running]>",
        "ti": f"<TaskInstance: {dag_name}.verify_error_existence {now} [running]>",
        "task_instance_key_str": f"{dag_name}__{task_name}__{ds_nodash}",
        "conf": "<module 'airflow.configuration' from '/anaconda3/lib/python3.6/site-packages/airflow/configuration.py'>",
        "test_mode": False,
        "var": {"value": None, "json": None},
        "templates_dict": None,
    }

And I might as well show the actual function under test–and it is being called by a BranchPythonOperator, which means that needs to return (at least) 2 different strings (aka task_ids).

def verify_error_existence(**context):
    """Get log file names from database and collect error details. If no errors are found,
    send Slack message. Otherwise format errors to send both email & Slack messages
    containing error details.
    """
    task_date = context.get("ds")
    rows_found = get_log_file_names(task_date)
    error_details = collect_results(rows_found)
    if len(error_details) == 0:
        return "send_all_clear_slack_message"
    else:
        return "format_error_details"

Use indirect=True Within @mark.parametrize to Pass Values to context Fixture and Call

The magic bits of indirect=True are that the parameters being passed will NOT be used in the test function, but passed upstream to the fixture!

@mark.parametrize(
    "context",
    [("2019-07-01", "legacy_ETL_verify", "verify_error_existence")], # <== Tuple to pass to fixture
    ids=["date_without_any_errors"],
    indirect=True,  # <== Pass the tuple upstream
)
def test_verify_error_existence_all_clear(context):
    results = my_function(**context)  # <== call as kwargs
    assert results == "send_all_clear_slack_message"

It’s worth noting that if I just wanted to pass any non-specific date, I could just pass datetime.date.today().strftime("%Y-%m-%d") as the first value in the tuple.

The Mess That Is PostgreSQL Foreign Data Wrappers, tds_fdw, & diacritic characters

This post may seem off-topic, however, after spending considerable time over the past week grappling with getting data containing text with diacritic remarks–in my case, German names & email addresses that contain ß, ä, ö, and/or ü–capturing some notes about the issue would be worth it in case I ever need to deal with this again!

I’m not going to do a deep dive into Foreign Data Wrappers (FDWs), but I will say that they are perhaps my favorite feature of PostgreSQL. And when combined with Airflow, FDWs have allowed me to keep many of my DAGs simple by just using PostgresHook & PostgresOperator for all SQL interactions, rather than having to deal with other database operators and/or write my own custom operators to move data from other databases into PostgreSQL.

So the messy part of this is that MS SQL–at least the SQL Server databases that I have to interact with–don’t (appear to) do UTF-8 encoding. That means that instead of being able to assume that some text fields, like name & email, if they happen to contain characters with the aforementioned diacritic marks, almost certainly are not going to be inserted properly into PostgreSQL as long as those columns in the foreign table have a data type of text or character varying (varchar).

The errors seen in psql were similar to:

[Code: 0, SQL State: 22021] ERROR: invalid byte sequence for encoding "UTF8": 0xe9 0x73 0x40

With the 3 hex values depending on the character to be rendered.

There were 3 things I did to resolve this issue:

  1. Exclude the table from IMPORT FOREIGN SCHEMA statement.
  2. Manually CREATE FOREIGN TABLE and set the data type for the name & email columns to BYTEA (binary format).
  3. Modify any queries in my DAG(s) to use CONVERT_FROM() string function for name & email columns.

Exclude (or Limit) Tables When Importing Foreign Schema

IMPORT FOREIGN SCHEMA dbo
    EXCEPT (customer)
    FROM SERVER mssql INTO mssrvr;

This command will import ALL tables found in the remote database schema into my local schema, called mssrvr. However, that could be overkill as you may import far more tables than you really need to. So you can limit the import to be only a specified list of tables:

IMPORT FOREIGN SCHEMA dbo
    LIMIT TO (products, transactions)
    FROM SERVER mssql INTO mssrvr;

Either way, you want to make sure that the customer table is not imported!

Manually Create Foreign Table

There’s nothing revolutionary here, just replace the data type for the necessary column(s).

CREATE FOREIGN TABLE customer (
    id integer NOT NULL
    , name bytea   -- Changed from text to bytea
    , email bytea  -- Changed from text to bytea
    , city text
    , state text
    , country text
    , username text
    , created timestamp without time zone
    , ...
)
    SERVER mssql
    OPTIONS (schema_name 'dbo', table_name 'customer');

Modify SQL Query to Convert Fields

Simply casting a bytea object to text isn’t going to work here. We need to actually convert it with the convert_from() function. MS SQL (or at least older versions of it) default to LATIN-1 encoding.

INSERT INTO customer_extract (
    id, name, email, city, state, country, username, created, ...)
(
    SELECT
        id
        , convert_from(name, 'latin-1') AS name
        , convert_from(email, 'latin-1') AS email
        , city
        , state
        , country
        , username
        , created
        , ...
    FROM mssrvr.customer
);

And that does it. However, a word of warning: convert_from() still doesn’t like all values as I can only retrieve data on the remote database that were created from mid-2013 on. I might investigate further, and if I figure it out, I’ll update this post.