Some of you have been asking for ways to fetch all rows from a custom SQL UnexpectedRowsExpectation. We currently limit the number of rows in the result object to 200 to optimize performance. We do realize that it may be useful to fetch all rows, and while we think about how best to incorporate this functionality into GX, here’s a script you can use to do this.
Step 1: The imports
First, let’s get all the imports out of the way:
from sqlalchemy import text
from typing import NamedTuple
import os
import great_expectations as gx
from great_expectations import ValidationDefinition
from great_expectations.checkpoint import CheckpointResult
from great_expectations.core import ExpectationSuiteValidationResult
Step 2: Getting the SQL query
Next, we’ll write a few helper functions to keep the code organized.
Expectation configurations, stored in Validation Results, include the SQL query for each UnexpectedRowsExpectation. We’ll use this query to fetch unexpected rows.
We need a way to identify which Expectation we’re interested in, so we’ll use the description as a human-friendly identifier.
def get_status_and_query(res: ExpectationSuiteValidationResult, description: str) -> tuple[bool, str]:
    """get_status_and_query will get the success status and query text for an UnexpectedRowsExpectation matching the description

    Args:
        res: the result for a single validation
        description: query description prefix which uniquely identifies the query

    Returns:
        tuple indicating whether the Expectation was successful or not as well as the query text
    """
    for result in res.results:
        if (result.expectation_config.description and
                result.expectation_config.description.startswith(description) and
                result.expectation_config.type == "unexpected_rows_expectation"
        ):
            return result.success, result.expectation_config.kwargs["unexpected_rows_query"]
    raise RuntimeError("unable to find Expectation matching description")
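Because the lookup returns the first Expectation whose description starts with the given prefix, a prefix shared by two Expectations silently selects whichever comes first. Here’s a minimal sketch of a stricter lookup that insists on exactly one match. `find_unique_by_prefix` and `fake_result` are hypothetical helpers, and the `SimpleNamespace` objects are stand-ins that mimic only the attributes used here, not real GX objects.

```python
from types import SimpleNamespace


def find_unique_by_prefix(results, prefix):
    """Return the single result whose description starts with prefix.

    Hypothetical helper: raises if zero or several descriptions match.
    """
    matches = [
        r for r in results
        if r.expectation_config.description
        and r.expectation_config.description.startswith(prefix)
        and r.expectation_config.type == "unexpected_rows_expectation"
    ]
    if len(matches) != 1:
        raise RuntimeError(
            f"expected exactly one match for {prefix!r}, found {len(matches)}"
        )
    return matches[0]


def fake_result(description, query, success=False):
    # Stand-in that mimics only the attributes read by the lookup above
    config = SimpleNamespace(
        description=description,
        type="unexpected_rows_expectation",
        kwargs={"unexpected_rows_query": query},
    )
    return SimpleNamespace(success=success, expectation_config=config)


results = [
    fake_result("Orders with negative totals", "SELECT * FROM {batch} WHERE total < 0"),
    fake_result("Orders without a customer", "SELECT * FROM {batch} WHERE customer_id IS NULL"),
]

match = find_unique_by_prefix(results, "Orders with negative")
print(match.expectation_config.kwargs["unexpected_rows_query"])
```

Note that the shorter prefix "Orders with" would match both descriptions here (the second one begins with "Orders without"), so the stricter lookup raises instead of guessing.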
Step 3: Getting Validation results
Each Checkpoint may consist of one or more Validation Definitions. The CheckpointResult object contains a dictionary with one ExpectationSuiteValidationResult per Validation Definition. The following function returns the result for the Validation Definition that we care about.
def get_validation_result(validation: ValidationDefinition, res: CheckpointResult) -> ExpectationSuiteValidationResult:
    """get_validation_result gets a Validation Result for a single Validation Definition from Checkpoint results

    Note that in most cases we expect there to be only one Validation Definition per Checkpoint

    Args:
        validation: the Validation Definition for which we want the result
        res: Checkpoint Result

    Returns:
        ExpectationSuiteValidationResult representing results for the specified Validation Definition
    """
    validation_result_ids = res.run_results.keys()
    for validation_result_id in validation_result_ids:
        if (validation_result_id.expectation_suite_identifier.name == validation.suite.name and
                validation.asset.name in str(validation_result_id.batch_identifier)
        ):
            return res.run_results[validation_result_id]
    raise RuntimeError("unable to find ExpectationSuiteValidationResult matching Validation Definition")
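The asset check above is a substring match on the batch identifier, so an asset named `orders` would also match an identifier that mentions `orders_archive`. If a Checkpoint has several Validation Definitions that share a suite, it may be worth confirming that exactly one key matches. The sketch below uses hypothetical stand-in classes (`SuiteId`, `ResultKey`) and made-up batch identifier strings; the real key objects and identifier format in GX may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SuiteId:
    # Stand-in for the suite identifier attached to a run_results key
    name: str


@dataclass(frozen=True)
class ResultKey:
    # Stand-in for a run_results key; only the attributes used above
    expectation_suite_identifier: SuiteId
    batch_identifier: str


def matching_keys(run_results, suite_name, asset_name):
    """Return every key that passes the same suite + substring test as above."""
    return [
        key for key in run_results
        if key.expectation_suite_identifier.name == suite_name
        and asset_name in str(key.batch_identifier)
    ]


run_results = {
    ResultKey(SuiteId("orders_suite"), "warehouse-orders"): "result for orders",
    ResultKey(SuiteId("orders_suite"), "warehouse-orders_archive"): "result for orders_archive",
}

keys = matching_keys(run_results, "orders_suite", "orders")
print(f"{len(keys)} key(s) matched")
```

With this made-up data both keys match, which is the situation in which returning the first match could pick the wrong result.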
Step 4: Getting the records
In the final function, we’ll fetch unexpected rows by using the previous two functions and the connection GX Cloud creates to your Data Sources.
Note how we substitute the {batch} placeholder with the fully qualified (i.e., schema + table) name for your Data Asset. If you’re not utilizing {batch} in your UnexpectedRowsExpectation, this won’t matter. If you are, then you should know that this will result in fetching all unexpected rows in the entire table, not just ones in the most recent batch. We’ve found that this is sufficient in most situations, but if not, let us know in the comments below.
def get_all_unexpected_records(
        validation: ValidationDefinition, res: CheckpointResult, description: str) -> list[NamedTuple]:
    """get_all_unexpected_records gets all unexpected records for an unexpected rows Expectation matching the
    description

    Args:
        validation: the Validation Definition containing the Expectation
        res: the output of checkpoint.run
        description: a prefix of the description of the Expectation for which unexpected records should be returned

    Returns:
        a list of NamedTuple containing the unexpected records
    """
    validation_result = get_validation_result(validation, res)
    asset_name = validation.asset.qualified_name
    success, query = get_status_and_query(res=validation_result, description=description)

    # no need to execute the query if expectation succeeded
    if success:
        return []

    # if the query contains the '{batch}' placeholder, we'll replace it with the table
    query = query.format(batch=asset_name)
    data_source = validation.data_source
    engine = data_source.get_engine()
    with engine.connect() as conn:
        rows = conn.execute(text(query)).fetchall()
    return rows
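The `{batch}` substitution can be looked at in isolation. `str.format` treats every pair of braces as a replacement field, so a query that contains other literal braces (a JSON literal, for example) would raise an error; replacing only the `{batch}` token avoids that. This is a sketch of an alternative, not how GX itself performs the substitution. `substitute_batch` and the table name `public.orders` are hypothetical.

```python
def substitute_batch(query: str, qualified_table: str) -> str:
    """Replace only the literal '{batch}' token, leaving any other braces untouched."""
    return query.replace("{batch}", qualified_table)


plain = "SELECT * FROM {batch} WHERE total < 0"
with_json = "SELECT * FROM {batch} WHERE payload = '{\"status\": \"bad\"}'"

print(substitute_batch(plain, "public.orders"))
print(substitute_batch(with_json, "public.orders"))

# For a query whose only braces are the placeholder, both approaches agree
assert plain.format(batch="public.orders") == substitute_batch(plain, "public.orders")

# str.format fails once the query contains other braces
try:
    with_json.format(batch="public.orders")
except (KeyError, IndexError, ValueError) as exc:
    print(f"str.format failed: {type(exc).__name__}")
```

If your queries never contain braces other than `{batch}`, the `format` call in the function above behaves the same way.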
Step 5: Putting it all together
Finally, let’s run a Checkpoint and get all unexpected rows.
## Code specific to running the checkpoint and getting all results
os.environ["GX_CLOUD_ORGANIZATION_ID"] = "<YOUR ORG ID>"
os.environ["GX_CLOUD_ACCESS_TOKEN"] = "<YOUR ACCESS TOKEN>"
context = gx.get_context()
# you can generate this snippet via our UI
checkpoint = context.checkpoints.get("<YOUR CHECKPOINT>")
res = checkpoint.run()
# assuming there is only one validation in the Checkpoint
validation = checkpoint.validation_definitions[0]
# When creating an UnexpectedRowsExpectation, you provide it with a description. Passing the full text or prefix of
# this description to the method below will allow us to select which query to execute
print(get_all_unexpected_records(validation=validation, res=res, description="<PREFIX FROM DESCRIPTION>"))
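Printing works for a handful of rows, but a full result set is usually easier to inspect in a file. Here’s a minimal sketch of writing the rows to CSV with the standard library. It assumes each row is tuple-like and exposes `_fields` with the column names, as named tuples and SQLAlchemy 1.4+ rows do. `write_rows_to_csv`, the `Row` type, and the sample data are hypothetical stand-ins for the list returned above.

```python
import csv
import io
from collections import namedtuple


def write_rows_to_csv(rows, fileobj):
    """Write tuple-like rows to fileobj as CSV; return the number of data rows written."""
    if not rows:
        return 0
    writer = csv.writer(fileobj)
    # Header comes from the first row's column names
    writer.writerow(rows[0]._fields)
    writer.writerows(rows)
    return len(rows)


# Stand-in data shaped like the list of rows returned by the script
Row = namedtuple("Row", ["order_id", "total"])
rows = [Row(1, -5.0), Row(2, -12.5)]

buffer = io.StringIO()
count = write_rows_to_csv(rows, buffer)
print(f"wrote {count} rows")
print(buffer.getvalue())
```

To write to disk instead of an in-memory buffer, pass a file opened with `open("unexpected_rows.csv", "w", newline="")`.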
Hope you’ve found this guide helpful, and let us know what you think in the comments.