How do I create custom Expectations in GX Core 1.2.0?

Hi Team,

We are trying to create an automation framework for testing data that covers business requirements related to data migration and data reconciliation. We have already built a basic framework using the Great Expectations library in Azure Databricks. We are currently stuck at the point where we need to create custom Expectations for additional business requirements. I can see the “Customize an Expectation class” page in the documentation, but it doesn’t cover our needs. I am looking to create a custom Expectation by inheriting from parent classes such as BatchExpectation, QueryExpectation and ColumnMapExpectation. I understand that I need to override the “_validate()” method, but I am not sure which method parameters I can use or how to implement the logic inside the overridden method. Can anyone please help us here?

In our framework, we have implemented all three types of Data Sources (SQL, filesystem and DataFrames) and route to them depending on the source or target data configuration.

Required solution: We need to do row-to-row validation of historical data in batches. I want to create an Expectation that takes both the source and target table names (or DataFrames) as parameters and performs the comparison. I don’t need the row-to-row validation logic itself; I need help figuring out what has to be done in the custom Expectation’s “_validate()” method.

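```python
from typing_extensions import override
from great_expectations.expectations.expectation import BatchExpectation

class ExpectTableValuesToEqual(BatchExpectation):
    @override
    def _validate(self, *args, **kwargs):  # What parameters should I add here?
        # If I am using a SQL Data Source and a table asset, what logic needs to be implemented here?
        return ...  # What should I return?
```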

The row-to-row validation above is just one requirement; we have others where we need to use ColumnMapExpectation and ColumnPairMapExpectation.
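For context on the map-style classes mentioned above: as far as I understand the pattern, ColumnMapExpectation and ColumnPairMapExpectation usually work differently from BatchExpectation. A metric evaluates a condition for each row, and the base class aggregates those booleans against a `mostly` threshold, so `_validate()` is typically inherited rather than overridden. The plain-Python sketch below models that two-step idea without GX. `evaluate_pair_condition` is a hypothetical helper, the result key names are chosen to resemble GX's result fields, and null handling is deliberately left out.

```python
from typing import Any, Callable, Dict, List, Sequence


# Hypothetical helper (not a GX API): models how a map-style expectation
# turns a per-row condition into a pass/fail result.
def evaluate_pair_condition(
    column_a: Sequence[Any],
    column_b: Sequence[Any],
    condition: Callable[[Any, Any], bool],
    mostly: float = 1.0,
) -> Dict[str, Any]:
    if len(column_a) != len(column_b):
        raise ValueError("both columns must have the same number of rows")

    # Step 1 (the metric's job): evaluate the condition row by row.
    row_passes: List[bool] = [condition(a, b) for a, b in zip(column_a, column_b)]

    # Step 2 (the expectation's job): aggregate and compare against `mostly`.
    element_count = len(row_passes)
    unexpected_index_list = [i for i, ok in enumerate(row_passes) if not ok]
    if element_count == 0:
        success = True  # Sketch choice: an empty batch passes.
    else:
        passing = element_count - len(unexpected_index_list)
        success = passing / element_count >= mostly

    return {
        "success": success,
        "result": {
            "element_count": element_count,
            "unexpected_count": len(unexpected_index_list),
            "unexpected_index_list": unexpected_index_list,
        },
    }


# Usage: source vs. target amounts that should match, tolerating 25% mismatches.
result = evaluate_pair_condition(
    [10, 20, 30, 40], [10, 20, 31, 40], condition=lambda a, b: a == b, mostly=0.75
)
# 3 of 4 rows match, so result["success"] is True and unexpected_index_list is [2].
```

Keeping the per-row condition separate from the aggregation mirrors the split between a metric and its Expectation, which is why the map-style classes rarely need a custom `_validate()`.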

Any input or help is greatly appreciated. Thanks in advance.

Thanks,
Dinesh

I recommend taking a look at this page in our docs. Although it’s pre-1.x, there’s valuable information that could help you.

Additionally, we have a repository with custom expectation examples where you may find something that fits your needs.

Here’s an example from that doc showing the parameters for the _validate method:

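```python
from __future__ import annotations

from typing import Dict

from great_expectations.execution_engine import ExecutionEngine


# Method of a BatchExpectation subclass, shown unindented.
def _validate(
    self,
    metrics: Dict,
    runtime_configuration: dict | None = None,
    execution_engine: ExecutionEngine | None = None,
):
    # Metric values are resolved before _validate runs; here they are
    # simply read out of the `metrics` dict by metric name.
    unique_columns = metrics.get("table.columns.unique")
    batch_columns = metrics.get("table.columns")
    # Expectation parameters are read from the expectation's configuration.
    strict = self.configuration.kwargs.get("strict")

    # Columns present in one collection but not the other.
    duplicate_columns = unique_columns.symmetric_difference(batch_columns)

    if strict is True:
        # Strict mode: any difference fails the expectation.
        success = len(duplicate_columns) == 0
    else:
        # Lenient mode: passes as long as fewer columns are flagged than the batch has.
        success = len(duplicate_columns) < len(batch_columns)

    # Return a dict with a "success" flag and a "result" payload.
    return {
        "success": success,
        "result": {"observed_value": {"duplicate_columns": duplicate_columns}},
    }
```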
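Because `_validate` only reads precomputed values out of `metrics` and its own configuration, its decision logic can be lifted into a plain function and unit-tested without a Data Source. Here is a minimal sketch of that idea, mirroring the example above. `evaluate_duplicate_columns` is a hypothetical helper, not part of GX, and it returns a sorted list instead of a set so the result stays JSON-serializable.

```python
from typing import Any, Dict


# Hypothetical helper (not a GX API): the same decision logic as the
# _validate example, taking the metrics dict and the `strict` flag directly.
def evaluate_duplicate_columns(metrics: Dict[str, Any], strict: bool) -> Dict[str, Any]:
    unique_columns = set(metrics["table.columns.unique"])
    batch_columns = list(metrics["table.columns"])

    # Columns present in one collection but not the other.
    duplicate_columns = unique_columns.symmetric_difference(batch_columns)

    # `strict is True` mirrors the original check exactly.
    if strict is True:
        success = len(duplicate_columns) == 0
    else:
        success = len(duplicate_columns) < len(batch_columns)

    return {
        "success": success,
        # Sorted list instead of a set so the payload is JSON-serializable.
        "result": {"observed_value": {"duplicate_columns": sorted(duplicate_columns)}},
    }


# Usage: a hand-built metrics dict standing in for what GX would resolve.
sample_metrics = {
    "table.columns.unique": {"id", "name"},
    "table.columns": ["id", "name", "email"],
}
result = evaluate_duplicate_columns(sample_metrics, strict=True)
# result["success"] is False and duplicate_columns is ["email"].
```

With the logic isolated like this, the `_validate` override itself can shrink to reading `metrics` and `self.configuration` and delegating to the helper.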

To answer your question about what would be returned:

  • This method returns a dictionary with a success key indicating whether the validation passed or failed.
  • The result key carries supporting details such as the observed value; for a row-to-row comparison, this is where you would report details about any mismatched rows so failures can be investigated.
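For the row-to-row case, the returned dictionary could look like the one built below. This is a sketch under stated assumptions: `compare_rows` and the `details` key are hypothetical, the key column is assumed to be unique in both tables, and the rows are passed in as plain Python lists, which only suits small batches. In a real `_validate`, the data would presumably come from metrics resolved by the execution engine rather than being loaded directly.

```python
from typing import Any, Dict, Mapping, Sequence


# Hypothetical helper (not a GX API): compares two row sets keyed by `key`
# and shapes the outcome like a _validate return value.
def compare_rows(
    source_rows: Sequence[Mapping[str, Any]],
    target_rows: Sequence[Mapping[str, Any]],
    key: str,
) -> Dict[str, Any]:
    # Assumes `key` is unique; a duplicate key would overwrite the earlier row.
    source_by_key = {row[key]: dict(row) for row in source_rows}
    target_by_key = {row[key]: dict(row) for row in target_rows}

    missing_in_target = sorted(k for k in source_by_key if k not in target_by_key)
    extra_in_target = sorted(k for k in target_by_key if k not in source_by_key)
    mismatched_rows = [
        {"key": k, "source": source_by_key[k], "target": target_by_key[k]}
        for k in sorted(source_by_key.keys() & target_by_key.keys())
        if source_by_key[k] != target_by_key[k]
    ]

    problem_count = len(missing_in_target) + len(extra_in_target) + len(mismatched_rows)
    return {
        "success": problem_count == 0,
        "result": {
            "observed_value": problem_count,
            "details": {
                "missing_in_target": missing_in_target,
                "extra_in_target": extra_in_target,
                "mismatched_rows": mismatched_rows,
            },
        },
    }


# Usage: row 2 differs and row 3 exists only in the target.
source = [{"id": 1, "amount": 10}, {"id": 2, "amount": 20}]
target = [{"id": 1, "amount": 10}, {"id": 2, "amount": 25}, {"id": 3, "amount": 30}]
outcome = compare_rows(source, target, key="id")
# outcome["success"] is False and outcome["result"]["observed_value"] is 2.
```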