Get all rows for a failed custom SQL UnexpectedRowsExpectation

Some of you have asked for a way to fetch all rows from a failed custom SQL UnexpectedRowsExpectation. We currently limit the number of rows in the result object to 200 to optimize performance. We realize that fetching every row can be useful, and while we think about how best to incorporate this functionality into GX, here’s a script you can use in the meantime.

Step 1: The imports

First, let’s get all the imports out of the way:

from sqlalchemy import text
from typing import NamedTuple
import os

import great_expectations as gx
from great_expectations import ValidationDefinition
from great_expectations.checkpoint import CheckpointResult
from great_expectations.core import ExpectationSuiteValidationResult

Step 2: Getting the SQL query

Next, we’ll write a few helper functions to keep the code organized.

Expectation configurations, stored in Validation Results, include the SQL query for each UnexpectedRowsExpectation. We’ll use this query to fetch unexpected rows.

We need a way to identify which Expectation we’re interested in, so we’ll use the description as a human-friendly identifier.

def get_status_and_query(res: ExpectationSuiteValidationResult, description: str) -> tuple[bool, str]:
    """get_status_and_query will get the success status and query text for an UnexpectedRowsExpectation matching the description
    Args:
        res: the result for a single validation
        description: query description prefix which uniquely identifies the query
    Returns:
        tuple indicating whether the Expectation was successful or not as well as the query text
    """
    for result in res.results:
        if (result.expectation_config.description and
                result.expectation_config.description.startswith(description) and
                result.expectation_config.type == "unexpected_rows_expectation"
        ):
            return result.success, result.expectation_config.kwargs["unexpected_rows_query"]
    raise RuntimeError("unable to find Expectation matching description")
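
To make the prefix matching concrete, here is a self-contained sketch that runs the same lookup logic against stand-in objects. The `SimpleNamespace` objects, the descriptions, and the queries are made up for illustration and only mimic the attributes the helper reads; they are not real GX classes. `find_status_and_query` and `make_result` are hypothetical names used only in this sketch.

```python
from types import SimpleNamespace


def find_status_and_query(results, description: str) -> tuple[bool, str]:
    """Same lookup as get_status_and_query, but over a plain list of results."""
    for result in results:
        config = result.expectation_config
        if (config.description
                and config.description.startswith(description)
                and config.type == "unexpected_rows_expectation"):
            return result.success, config.kwargs["unexpected_rows_query"]
    raise RuntimeError("unable to find Expectation matching description")


def make_result(success: bool, description, expectation_type: str, query: str = ""):
    """Build a stand-in (NOT a real GX object) exposing only the attributes used above."""
    config = SimpleNamespace(
        description=description,
        type=expectation_type,
        kwargs={"unexpected_rows_query": query},
    )
    return SimpleNamespace(success=success, expectation_config=config)


results = [
    # an Expectation without a description is skipped by the lookup
    make_result(True, None, "expect_column_values_to_not_be_null"),
    make_result(False, "Negative fares: fare_amount must be >= 0",
                "unexpected_rows_expectation",
                "SELECT * FROM {batch} WHERE fare_amount < 0"),
]

success, query = find_status_and_query(results, "Negative fares")
print(success, query)
```

Because the first match is returned, keep descriptions distinct enough that the prefix you pass matches only one Expectation.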

Step 3: Getting Validation results

Each Checkpoint may consist of one or more Validation Definitions. The CheckpointResult object contains a dictionary (run_results) with an ExpectationSuiteValidationResult for each of the Validation Definitions. The following function lets us get the results for the Validation Definition that we care about.

def get_validation_result(validation: ValidationDefinition, res: CheckpointResult) -> ExpectationSuiteValidationResult:
    """get_validation_result gets a Validation Result for a single Validation Definition from Checkpoint results
    Note that in most cases we expect there to be only one Validation Definition per Checkpoint
    Args:
        validation: the Validation Definition for which we want the result
        res: Checkpoint Result
    Returns:
        ExpectationSuiteValidationResult representing results for the specified Validation Definition
    """
    validation_result_ids = res.run_results.keys()
    for validation_result_id in validation_result_ids:
        if (validation_result_id.expectation_suite_identifier.name == validation.suite.name and
                validation.asset.name in str(validation_result_id.batch_identifier)
        ):
            return res.run_results[validation_result_id]
    raise RuntimeError("unable to find ExpectationSuiteValidationResult matching Validation Definition")
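
One thing to watch in the check above is that it tests whether the asset name appears anywhere in the batch identifier string. The sketch below illustrates the consequence with made-up identifier strings (the real format of `batch_identifier` is not shown in this post, so treat these values as assumptions): a short asset name can also match the identifier of a longer, similarly named asset. `asset_matches` is a hypothetical helper that mirrors the substring test.

```python
def asset_matches(asset_name: str, batch_identifier: object) -> bool:
    """Mirror of the substring test used in get_validation_result."""
    return asset_name in str(batch_identifier)


# Hypothetical identifier strings, invented for this example
orders_batch = "my_datasource-orders"
archive_batch = "my_datasource-orders_archive"

print(asset_matches("orders", orders_batch))          # the intended match
print(asset_matches("orders", archive_batch))         # also matches, "orders" is a substring
print(asset_matches("orders_archive", orders_batch))  # no match
```

With a single Validation Definition per Checkpoint, which the docstring notes is the common case, this ambiguity does not arise.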

Step 4: Getting the records

In the final function, we’ll fetch the unexpected rows using the previous two helpers and the connection GX Cloud creates to your Data Source.

Note how we substitute the {batch} placeholder with the fully qualified (i.e., schema + table) name of your Data Asset. If you’re not using {batch} in your UnexpectedRowsExpectation, this won’t matter. If you are, be aware that the query will fetch all unexpected rows in the entire table, not just those in the most recent batch. We’ve found that this is sufficient in most situations, but if it isn’t for you, let us know in the comments below.
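
As a minimal sketch of the substitution described above, the snippet below swaps the placeholder for a table name. The query and the `public.taxi_trips` name are invented for illustration, and `substitute_batch` is a hypothetical helper. This sketch uses `str.replace`, which is a choice made here: `str.format` may raise if the SQL contains other curly braces, as this example query does.

```python
def substitute_batch(query: str, qualified_name: str) -> str:
    """Replace the literal '{batch}' placeholder with a schema-qualified table name."""
    return query.replace("{batch}", qualified_name)


# Hypothetical query and table name; note the extra '{' inside the LIKE pattern
query = "SELECT * FROM {batch} WHERE payload LIKE '{%' AND fare_amount < 0"
print(substitute_batch(query, "public.taxi_trips"))
```

A query without the placeholder passes through unchanged, so the substitution is safe to apply unconditionally.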

def get_all_unexpected_records(
        validation: ValidationDefinition, res: CheckpointResult, description: str) -> list[NamedTuple]:
    """get_all_unexpected_records gets all unexpected records for an unexpected rows Expectation matching the
    description

    Args:
        validation: the Validation Definition containing the Expectation
        res: the output of checkpoint.run
        description: a prefix of the description of the Expectation for which unexpected records should be returned

    Returns:
        a list of NamedTuple containing the unexpected records
    """

    validation_result = get_validation_result(validation, res)
    asset_name = validation.asset.qualified_name

    success, query = get_status_and_query(res=validation_result, description=description)

    # no need to execute the query if expectation succeeded
    if success:
        return []

    # if the query contains the '{batch}' placeholder, we'll replace it with the table;
    # str.replace is used instead of str.format so other braces in the SQL are left untouched
    query = query.replace("{batch}", asset_name)
    data_source = validation.data_source
    engine = data_source.get_engine()

    with engine.connect() as conn:
        rows = conn.execute(text(query)).fetchall()

    return rows

Step 5: Putting it all together

Finally, let’s run a Checkpoint and get all unexpected rows.

## Code specific to running the checkpoint and getting all results
os.environ["GX_CLOUD_ORGANIZATION_ID"] = "<YOUR ORG ID>"
os.environ["GX_CLOUD_ACCESS_TOKEN"] = "<YOUR ACCESS TOKEN>"

context = gx.get_context()
# you can generate this snippet via our UI
checkpoint = context.checkpoints.get("<YOUR CHECKPOINT>")

res = checkpoint.run()

# assuming there is only one validation in the Checkpoint
validation = checkpoint.validation_definitions[0]

# When creating an UnexpectedRowsExpectation, you provide it with a description. Passing the full text or prefix of
# this description to the method below will allow us to select which query to execute
print(get_all_unexpected_records(validation=validation, res=res, description="<PREFIX FROM DESCRIPTION>"))
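
If the result set is large, printing it is rarely practical. As one possible follow-up, the sketch below writes rows to CSV using only the standard library. It assumes each row exposes `_fields` the way a named tuple does; the `Trip` rows and the `rows_to_csv` helper are hypothetical stand-ins for what `get_all_unexpected_records` returns, so check this against the row type you actually get back.

```python
import csv
import io
from collections import namedtuple
from typing import Iterable, TextIO


def rows_to_csv(rows: Iterable[tuple], out: TextIO) -> int:
    """Write named-tuple-like rows to `out` as CSV and return the number of rows written."""
    writer = csv.writer(out)
    count = 0
    for row in rows:
        if count == 0:
            # take the header from the first row's field names
            writer.writerow(row._fields)
        writer.writerow(row)
        count += 1
    return count


# Stand-in rows, invented for this example
Trip = namedtuple("Trip", ["trip_id", "fare_amount"])
rows = [Trip(1, -3.5), Trip(2, -10.0)]

# swap in open("unexpected_rows.csv", "w", newline="") to write a real file
buffer = io.StringIO()
written = rows_to_csv(rows, buffer)
print(written)
print(buffer.getvalue())
```

An empty list, which is what the function returns when the Expectation succeeds, produces no output at all, not even a header.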

Hope you’ve found this guide helpful, and let us know what you think in the comments.