Batch definitions and Spark dataframes

Hi,

I’m currently looking into the GX pre-release, but I’m having some trouble fitting it to my use case. Essentially, each time our data pipeline runs, it will ingest data that

  • has one load timestamp
  • has an arbitrary number of ‘domain dates’

For example, we might get a load today containing the last 7 ‘domain dates’. I thought of setting it up as follows:

Data asset: Entire table
Data batches: One per unique (load_date,domain_date) pairing

But now I’ve seen that, at least in this version of 1.0, batch definitions aren’t supported for Spark dataframes at all, and as far as I can tell, current batch definitions only support partitioning on a single date column.

With that out of the way, I have three questions:

i, Are batch definitions for (Spark) dataframes on their way?
ii, Will batch definitions allow for more granular batches, e.g. splitting by two columns?
iii, For my use case, do you have any suggestions on how to set it up in terms of data assets etc.?

Thanks!

Hi,

As of 1.0.0a5 (released August 6th), add_batch_definition_whole_dataframe has landed for Spark dataframes. Using it, you can add your dataframe for validation.

Since we currently only offer whole-dataframe batch definitions, you will need to filter your dataframe yourself, e.g. something like my_dataframe_for_validation = my_original_dataframe.filter("load_date == 'my_load_date' AND domain_date == 'my_domain_date'")
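If you want one validation per (load_date, domain_date) pair, one option is to loop over the distinct pairs in the load and validate each filtered dataframe separately. A rough sketch, assuming my_original_dataframe and the column names from your description:

from pyspark.sql import functions as F

# One filtered dataframe per (load_date, domain_date) pair present in the load.
pairs = (
    my_original_dataframe
    .select("load_date", "domain_date")
    .distinct()
    .collect()
)
for row in pairs:
    my_dataframe_for_validation = my_original_dataframe.filter(
        (F.col("load_date") == row["load_date"])
        & (F.col("domain_date") == row["domain_date"])
    )
    # Pass my_dataframe_for_validation as the "dataframe" batch parameter
    # when running the checkpoint (see the full snippet below).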

We are currently discussing quality of life improvements for dataframe users after the initial 1.0 release since we’ve been more focused on SQL users lately. I don’t know what form this will take or the timing.

You can install the latest prerelease using:

pip install --pre great_expectations

Here’s a code snippet I used to run a Spark validation. I’m creating everything from scratch, but you can retrieve existing configs if you’ve already set them up, e.g. data_source = context.data_sources.get("my_spark_data_source_name").

import great_expectations as gx
from pyspark.sql import SparkSession

# Get a context.
context = gx.get_context()

# Read in data
csv = "data/sampled_yellow_tripdata_2019-01.csv"
spark = SparkSession.builder.appName("Read CSV").getOrCreate()
spark_df = spark.read.csv(csv, header=True, inferSchema=True)
# filter dataframe however you want

# Create a suite
expectation = gx.expectations.ExpectColumnValuesToBeBetween(
    column="passenger_count", min_value=1, max_value=6
)
suite = context.suites.add(gx.ExpectationSuite(name="suite"))
suite.add_expectation(expectation)

# Configure data to validate
datasource = context.data_sources.add_spark(name="my_spark")
asset = datasource.add_dataframe_asset(name="my_asset")
bd = asset.add_batch_definition_whole_dataframe(name="my_bd")

# Configure and run the validation
vd = gx.ValidationDefinition(
    name="my_vd",
    data=bd,
    suite=suite,
)
checkpoint = gx.Checkpoint(
    name="my_checkpoint", 
    validation_definitions=[vd], 
    actions=[gx.checkpoint.UpdateDataDocsAction(name="update_data_docs")]
)

# Run the checkpoint and look at the results
results = checkpoint.run(batch_parameters={"dataframe": spark_df})
context.open_data_docs()
print(results)
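
If you’ve already added the data source, asset, batch definition, and suite to a context that persists them (e.g. a File Data Context), you can retrieve them instead of recreating everything. A rough sketch using the names from the snippet above:

# Assumes these were previously added to a persistent context.
data_source = context.data_sources.get("my_spark")
asset = data_source.get_asset("my_asset")
bd = asset.get_batch_definition("my_bd")
suite = context.suites.get("suite")

# Validate a freshly filtered dataframe against the saved suite.
batch = bd.get_batch(batch_parameters={"dataframe": spark_df})
results = batch.validate(suite)
print(results)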