This post will help you get a Validator from a Spark DataFrame in order to run expectations on it.
Assumptions:
- You have a 0.13.0 (or higher) release of Great Expectations installed.
- You want to use the new-style Datasources and expectations that are implemented using the new Modular Expectations API.
- You have a Spark DataFrame (we will assume variable name “df”) and you want to validate or profile it (a minimal example DataFrame is sketched right after this list).
What used to be called a “batch” in the old API has been replaced with a Validator. A Validator knows how to validate a particular batch of data on a particular Execution Engine against a particular Expectation Suite. In interactive mode, the Validator can store and update an Expectation Suite while you conduct Data Discovery or Exploratory Data Analysis, letting you build or edit Expectations as you go.
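If you do not already have a DataFrame handy and just want to follow along, a minimal one like the sketch below will do (the data and column names are invented for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A tiny stand-in for your real data - replace with your own DataFrame.
df = spark.createDataFrame(
    [("a", 1), ("b", 2), ("c", 3)],
    ["some_column_name", "id"],
)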
You can read more about the core classes that make GE run here: Datasources, Execution Engines, Data Connectors.
Make sure that you have configured a Datasource with an execution_engine of class SparkDFExecutionEngine and a data_connector of class RuntimeDataConnector:
datasources:
  my_spark_datasource:
    class_name: Datasource
    execution_engine:
      class_name: SparkDFExecutionEngine
    data_connectors:
      my_runtime_data_connector:
        module_name: great_expectations.datasource.data_connector
        class_name: RuntimeDataConnector
        runtime_keys:
          - some_key_maybe_pipeline_stage
          - some_other_key_maybe_run_id
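If you prefer to configure the Datasource in code rather than editing great_expectations.yml, a sketch along these lines should produce an equivalent configuration. It assumes that DataContext.add_datasource accepts the same nested keys as the YAML above - double-check against the docs for your GE version:

import great_expectations as ge

context = ge.data_context.DataContext()

# Register a datasource mirroring the YAML configuration above.
context.add_datasource(
    "my_spark_datasource",
    class_name="Datasource",
    execution_engine={"class_name": "SparkDFExecutionEngine"},
    data_connectors={
        "my_runtime_data_connector": {
            "module_name": "great_expectations.datasource.data_connector",
            "class_name": "RuntimeDataConnector",
            "runtime_keys": [
                "some_key_maybe_pipeline_stage",
                "some_other_key_maybe_run_id",
            ],
        }
    },
)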
SparkDFExecutionEngine is the class that translates Expectations into Spark operations.
Data Connectors facilitate access to an external data store, such as a database, filesystem, or cloud storage. In this case there is no external data to read, since you already have the Spark DataFrame in memory. Because the architecture of Great Expectations requires a Data Connector, we created the RuntimeDataConnector class, which, instead of facilitating access to external data, allows you to pass a DataFrame directly to GE. We will explain the runtime_keys section of the connector’s config a bit later.
Now that your Datasource is configured, let’s move to the code that instantiates a Validator.
First, instantiate a DataContext (from your config file, or without a config file):
import great_expectations as ge
context = ge.data_context.DataContext()
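If your great_expectations directory is not discoverable from the current working directory, you can point the DataContext at it explicitly (the path below is just a placeholder):

# Hypothetical path - replace with the location of your great_expectations directory.
context = ge.data_context.DataContext(
    context_root_dir="/path/to/my/project/great_expectations"
)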
Now you can instantiate a Validator that will be able to run expectations on your DataFrame. A Validator needs to have a batch of data and an Expectation Suite. Let’s get a suite that you created in your Data Context:
suite = context.get_expectation_suite("my_suite_name")
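If the suite does not exist yet, you can create an empty one first. This sketch assumes you are happy to overwrite any existing suite with the same name:

# Create (or overwrite) an empty suite to start from.
suite = context.create_expectation_suite(
    "my_suite_name", overwrite_existing=True
)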
Specify the batch that the Validator will be validating. You pass your DataFrame using the batch_data argument. The datasource_name and data_connector_name arguments point to the Datasource and the Data Connector that you configured. The data_asset_name and partition_request arguments let you attach metadata to your DataFrame.
NOTE: data_asset_name can currently be set only to the predefined string “IN_MEMORY_DATA_ASSET”. We will fix this very soon and allow you to specify your own name. The attributes inside partition_request are optional - you can use them to attach additional metadata to your DataFrame. When configuring the Data Connector, you used runtime_keys to define which keys are allowed.
from great_expectations.core.batch import BatchRequest

batch_request = BatchRequest(
    datasource_name="my_spark_datasource",
    data_connector_name="my_runtime_data_connector",
    batch_data=df,
    data_asset_name="IN_MEMORY_DATA_ASSET",
    partition_request={
        "partition_identifiers": {
            "some_key_maybe_pipeline_stage": "ingestion step 1",
            "some_other_key_maybe_run_id": "run 18",
        }
    },
)
my_validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite=suite,
)
You have a Validator that can run expectations on your DataFrame. You can use it exactly the way you used batches in the old API:
- Call expectation methods on it (e.g., my_validator.expect_column_values_to_be_unique("some_column_name")); see the sketch after this list.
- Use it with a Validation Operator or a Checkpoint.
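For example, a minimal interactive session might look like the sketch below. The column name is a placeholder, and save_expectation_suite is assumed to behave as it did for batches in the old API:

# Run an expectation immediately against the DataFrame.
result = my_validator.expect_column_values_to_be_unique("some_column_name")
print(result.success)

# Persist the expectations added in this session back to the suite,
# keeping even those that failed against this particular batch.
my_validator.save_expectation_suite(discard_failed_expectations=False)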