How to validate Spark DataFrames in 0.13

This post will help you get a Validator from a Spark DataFrame in order to run expectations on it.

Assumptions:

  1. You have a 0.13.0 (or higher) release of Great Expectations installed.
  2. You want to use the new-style Datasources and expectations that are implemented using the new Modular Expectations API.
  3. You have a Spark DataFrame (we will assume variable name “df”) and you want to validate or profile it.
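If you do not already have a DataFrame to work with and just want to follow along, a minimal sketch like the one below is enough (the column names are arbitrary examples, not anything required by GE):

from pyspark.sql import SparkSession

# Build (or reuse) a local SparkSession and create a small example DataFrame.
spark = SparkSession.builder.appName("ge_example").getOrCreate()
df = spark.createDataFrame(
    [(1, "alpha"), (2, "beta"), (3, "gamma")],
    ["id", "some_column_name"],
)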

What used to be called a “batch” in the old API has been replaced with the Validator. A Validator knows how to validate a particular batch of data on a particular Execution Engine against a particular Expectation Suite. In interactive mode, the Validator can store and update an Expectation Suite while you conduct Data Discovery or Exploratory Data Analysis to build or edit expectations.

You can read more about the core classes that make GE run here: Datasources, Execution Engines, Data Connectors.

Make sure that you have configured a Datasource with an execution_engine of class SparkDFExecutionEngine and a data_connector of class RuntimeDataConnector:

datasources:
  my_spark_datasource:
    class_name: Datasource
    execution_engine:
      class_name: SparkDFExecutionEngine
    data_connectors:
      my_runtime_data_connector:
          module_name: great_expectations.datasource.data_connector
          class_name: RuntimeDataConnector
          runtime_keys:
              - some_key_maybe_pipeline_stage
              - some_other_key_maybe_run_id

SparkDFExecutionEngine is a class that can translate Expectations to Spark.

Data Connectors facilitate access to an external data store, such as a database, filesystem, or cloud storage. In this case there is no external data to read, since you already have the Spark DataFrame in memory. Because the architecture of Great Expectations requires a Data Connector, we created a RuntimeDataConnector class that, instead of facilitating access to external data, allows you to pass a DataFrame directly to GE. We will explain the “runtime_keys” section in the connector’s config a bit later.
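If you would rather register this Datasource from Python code than edit great_expectations.yml, a sketch along these lines should work (it mirrors the YAML above and passes the same configuration to DataContext.add_datasource as keyword arguments):

import great_expectations as ge

context = ge.data_context.DataContext()

# Same configuration as the YAML above, expressed as a Python dict.
datasource_config = {
    "name": "my_spark_datasource",
    "class_name": "Datasource",
    "execution_engine": {"class_name": "SparkDFExecutionEngine"},
    "data_connectors": {
        "my_runtime_data_connector": {
            "module_name": "great_expectations.datasource.data_connector",
            "class_name": "RuntimeDataConnector",
            "runtime_keys": [
                "some_key_maybe_pipeline_stage",
                "some_other_key_maybe_run_id",
            ],
        }
    },
}
context.add_datasource(**datasource_config)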

Now that your Datasource is configured, let’s move to the code that instantiates a Validator.

First, instantiate a DataContext (from your great_expectations.yml config file, or in code without a config file):

import great_expectations as ge

context = ge.data_context.DataContext()

Now you can instantiate a Validator that will be able to run expectations on your DataFrame. A Validator needs to have a batch of data and an Expectation Suite. Let’s get a suite that you created in your Data Context:

suite = context.get_expectation_suite("my_suite_name")
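If the suite does not exist in your Data Context yet, you can create an empty one first (a sketch using the same example suite name):

# Create an empty suite once; afterwards get_expectation_suite will find it.
suite = context.create_expectation_suite(
    "my_suite_name", overwrite_existing=False
)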

Specify the batch that the Validator will be validating. You pass your DataFrame using the batch_data argument. The datasource_name and data_connector_name arguments point to the Datasource and the Data Connector that you configured. The data_asset_name and partition_request arguments help you attach metadata to your DataFrame.

NOTE: For now, data_asset_name can only be set to the predefined string “IN_MEMORY_DATA_ASSET”. We will fix this very soon and allow you to specify your own name. The attributes inside partition_request are optional - you can use them to attach additional metadata to your DataFrame. When configuring the Data Connector, you used runtime_keys to define which keys are allowed.

from great_expectations.core.batch import BatchRequest

batch_request = BatchRequest(
    datasource_name="my_spark_datasource",
    data_connector_name="my_runtime_data_connector",
    batch_data=df,
    data_asset_name="IN_MEMORY_DATA_ASSET",
    partition_request={
        "partition_identifiers": {
            "some_key_maybe_pipeline_stage": "ingestion step 1",
            "some_other_key_maybe_run_id": "run 18",
        }
    },
)

my_validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite=suite,
)

You have a Validator that can run expectations on your DataFrame. You can use it exactly the way you used batches in the old API:

  1. Call expectation methods on it (e.g., my_validator.expect_column_values_to_be_unique("some_column_name")), as sketched below.
  2. Use it with a Validation Operator or a Checkpoint.
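For example, a short interactive session might look like the sketch below (the column name is an arbitrary example, and saving the suite back to the Data Context assumes you want to keep the expectations you just ran):

# Run an expectation interactively; the result comes back immediately.
result = my_validator.expect_column_values_to_be_unique("some_column_name")
print(result.success)

# Persist the expectations added in this session back to the suite.
my_validator.save_expectation_suite(discard_failed_expectations=False)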

Hi, thank you for doing this! I tried this on great_expectations 0.13.21 and BatchRequest has no parameter batch_data. I am trying to get GE running on PySpark on EMR with API v3.


So I also ran into the same issue. My recommendation is to search the Slack channel for your error message and also use the search in the docs. This is what I was able to figure out.

In the config portion:

datasources:
  my_spark_datasource:
    class_name: Datasource
    execution_engine:
      class_name: SparkDFExecutionEngine
    data_connectors:
      my_runtime_data_connector:
          module_name: great_expectations.datasource.data_connector
          class_name: RuntimeDataConnector
          batch_identifiers:
              - some_key

If you're creating your own Python object for the config, keep in mind that batch_identifiers is a list, not a dictionary.
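To illustrate, the same connector expressed as a Python dict would look roughly like this (a sketch mirroring the YAML above):

datasource_config = {
    "name": "my_spark_datasource",
    "class_name": "Datasource",
    "execution_engine": {"class_name": "SparkDFExecutionEngine"},
    "data_connectors": {
        "my_runtime_data_connector": {
            "module_name": "great_expectations.datasource.data_connector",
            "class_name": "RuntimeDataConnector",
            # A list of allowed identifier names, not a dict of values.
            "batch_identifiers": ["some_key"],
        }
    },
}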

Next, for the BatchRequest you need to do something like this:

from great_expectations.core.batch import RuntimeBatchRequest

batch_request = RuntimeBatchRequest(
    datasource_name="my_spark_datasource",
    data_connector_name="my_runtime_data_connector",
    runtime_parameters={"batch_data": df},
    batch_identifiers={"some_key": "some_value"},
    data_asset_name="IN_MEMORY_DATA_ASSET",
)
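From there, getting a Validator should work the same way as in the original post (a sketch, assuming the same example suite name):

import great_expectations as ge

context = ge.data_context.DataContext()
suite = context.get_expectation_suite("my_suite_name")

# get_validator accepts a RuntimeBatchRequest the same way as a BatchRequest.
my_validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite=suite,
)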

Right now I’m figuring out how to get GE to use my custom version of PySpark, but this should be enough to get you past that error.

Good Luck.

I got a KeyError where the key is the name of my datasource (which is configured as a SparkDFDatasource).
Do you have any suggestions for my case?