GX-Databricks: Datasource - Data Asset - Validator

Hello,

I use the notebooks produced by the CLI and am trying to adapt them to run in Databricks, so I run the following:

import great_expectations as gx
import pandas as pd
import great_expectations.jupyter_ux

from great_expectations.core.batch import BatchRequest
from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.exceptions import DataContextError
from great_expectations.cli.datasource import sanitize_yaml_and_save_datasource, check_if_datasource_name_exists
from great_expectations.data_context import FileDataContext

from pyspark.sql import SparkSession
from great_expectations.datasource import SparkDatasource
from great_expectations.execution_engine.sparkdf_execution_engine import SparkDFExecutionEngine

context_root_dir = 'path'

context = FileDataContext.create(project_root_dir=context_root_dir)

datasource_name = "my_datasource"

example_yaml = f"""
name: {datasource_name}
class_name: Datasource
execution_engine:
  class_name: SparkDFExecutionEngine
data_connectors:
  default_inferred_data_connector_name:
    class_name: InferredAssetFilesystemDataConnector
    base_directory: ..
    default_regex:
      group_names:
        - data_asset_name
      pattern: (.*)
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    assets:
      my_runtime_asset_name:
        batch_identifiers:
          - runtime_batch_identifier_name
"""

context.test_yaml_config(yaml_config=example_yaml)
sanitize_yaml_and_save_datasource(context, example_yaml, overwrite_existing=True)

context.list_datasources()

df_spark = spark.table(f'{table_name_to_validate}')

df_pandas = df_spark.toPandas()

df_pandas_gx = gx.from_pandas(df_pandas)

dataframe_datasource = SparkDatasource(name="my_datasource")

data_asset = datasource_name.add_dataframe_asset(
    name="dataframe_asset",
    dataframe=df_pandas_gx
)

The line defining dataframe_datasource, either as SparkDatasource(name="my_datasource") or as
dataframe_datasource = context.sources.add_or_update_spark(name="case_list_datasource"),
produces the error:
ImportError: cannot import name 'SparkDatasource' from 'great_expectations.datasource'

so the datasource is not properly created, the data asset cannot be added, and consequently the validator cannot run.

In the CLI, the datasource and data asset are declared manually, so what is the corresponding command in Python?

Thank you,
Katerina

Hi @kat, thanks for your question!

Use of the GX CLI is deprecated - you’ll want to use our current Fluent Data Source approach to create your Data Sources & Assets. (Also, I’d recommend upgrading to the latest version of GX if you have not already).

To get started with the current way to create your Data Sources and Data Assets, please check out the Fluent Data Sources guides in our documentation.
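At a high level, the Fluent workflow replaces the YAML/CLI datasource setup with a few Python calls. As a minimal sketch (against a recent 0.17/0.18 release, with placeholder names, assuming df is an existing Spark dataframe):

import great_expectations as gx

# Get (or create) a Data Context
context = gx.get_context()

# Register a Spark Data Source on the context (placeholder name)
data_source = context.sources.add_or_update_spark(name="my_spark_datasource")

# Register the dataframe as a Data Asset on that Data Source
data_asset = data_source.add_dataframe_asset(name="my_dataframe_asset", dataframe=df)

# The Batch Request is what you hand to a Validator or Checkpoint
batch_request = data_asset.build_batch_request()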

Hi @kat,

I use Databricks in conjunction with ADLS.
Here's the code I use for this.

The class in our functions .whl file:

import great_expectations as gx


class GX_Context:
    def __init__(self, root_dir, connection_string, filepath_prefix=""):
        self.root_dir = root_dir
        self.connection_string = connection_string
        self.filepath_prefix = filepath_prefix

        if not isinstance(self.root_dir, str) or not self.root_dir:
            raise ValueError("root_dir must be a non-empty string")
        if not isinstance(self.connection_string, str) or not self.connection_string:
            raise ValueError("connection_string must be a non-empty string")

    def get_gx_context(self) -> gx.data_context.EphemeralDataContext:
        project_config = gx.data_context.types.base.DataContextConfig(
            store_backend_defaults=gx.data_context.types.base.FilesystemStoreBackendDefaults(
                root_directory=self.root_dir
            ),
            data_docs_sites={
                "az_site": {
                    "class_name": "SiteBuilder",
                    "store_backend": {
                        "class_name": "TupleAzureBlobStoreBackend",
                        "container": r"\$web",
                        # "filepath_prefix": self.filepath_prefix,
                        "connection_string": self.connection_string,
                    },
                    "site_index_builder": {
                        "class_name": "DefaultSiteIndexBuilder",
                        "show_cta_footer": True,
                    },
                }
            },
        )

        context = gx.get_context(project_config=project_config)

        return context

And in the Databricks notebook I use this:

from mbti_utility.gx_context import GX_Context

# Great Expectations
    context_root_dir = f"/dbfs{project.MNT_PATH}/DataQuality/GX/"
    context_connection_string = dbutils.secrets.get(scope='MS-eu-SDL-DEV-kv', key='connection-string-rsv0sdldev0das')
    data_source_name = f"{get_hive_table_db_name()}".lower()
    data_asset_name = f"{get_product_name()}".lower()
    checkpoint_name = data_source_name + "_" + data_asset_name

try:
    # Create or load great expectations context
    context = GX_Context(context_root_dir, context_connection_string).get_gx_context()

    # Create batch request
    batch_request = (context
        .sources
        .add_or_update_spark(name=data_source_name)
        .add_dataframe_asset(name=data_asset_name, dataframe=df_Staging)
        .build_batch_request())

    # Run checkpoint
    checkpoint_result = context.get_checkpoint(checkpoint_name).run(run_name=checkpoint_name, batch_request=batch_request)

    # Check the validation result
    if checkpoint_result.success:
        print("The validation succeeded")
    else:
        raise ValueError("The validation failed: " + checkpoint_result["run_results"][list(checkpoint_result["run_results"].keys())[0]]["actions_results"]["update_data_docs"]["az_site"].replace(".blob.", ".z6.web.").replace("/$web", ""))

except Exception as exception:
    handle_exception(exception, dbutils.notebook.entry_point.getDbutils().notebook().getContext())
    raise exception

To get the expectation suite JSON files and the checkpoint YML files, you can run the profiler on the dataframes like this:
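(In the snippet below, onboarding_suite_name and onboarding_checkpoint_name are assumed to be defined earlier in the notebook; the definitions here are just illustrative placeholders.)

# Illustrative placeholders; use your own naming convention
onboarding_suite_name = data_source_name + "_" + data_asset_name + "_onboarding_suite"
onboarding_checkpoint_name = data_source_name + "_" + data_asset_name + "_onboarding_checkpoint"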

try:
    # Create or load great expectations context
    context = GX_Context(context_root_dir, context_connection_string).get_gx_context()

    # Create batch request
    batch_request = (context
        .sources
        .add_or_update_spark(name=data_source_name)
        .add_dataframe_asset(name=data_asset_name, dataframe=df_Staging)
        .build_batch_request())

    # Profiler
    # Run the default onboarding profiler on the batch request
    onboarding_data_assistant_result = (context
        .assistants
        .onboarding
        .run(
            batch_request=batch_request,
            exclude_column_names=[],
            estimation="exact", # default: "exact"; "flag_outliers"
        )
    )

    # Get the expectation suite from the onboarding result
    onboarding_suite = (onboarding_data_assistant_result
        .get_expectation_suite(
            expectation_suite_name=onboarding_suite_name
        )
    )

    # Persist the expectation suite with the suite name specified above
    context.add_or_update_expectation_suite(expectation_suite=onboarding_suite)

    # Create and persist checkpoint to reuse for multiple batches
    context.add_or_update_checkpoint(
        name = onboarding_checkpoint_name,
        batch_request=batch_request,
        expectation_suite_name=onboarding_suite_name,
    )

    # Run Onboarding checkpoint
    control_tower_ccu_datasources_block_checkpoint_result = context.get_checkpoint(onboarding_checkpoint_name).run()

    # Check the validation result
    if control_tower_ccu_datasources_block_checkpoint_result.success:
        print("The validation succeeded")
    else:
        dbutils.notebook.exit("The validation failed: " + control_tower_ccu_datasources_block_checkpoint_result["run_results"][list(control_tower_ccu_datasources_block_checkpoint_result["run_results"].keys())[0]]["actions_results"]["update_data_docs"]["az_site"])
        

except Exception as exception:
    handle_exception(exception, dbutils.notebook.entry_point.getDbutils().notebook().getContext())
    raise exception

Hope that helps?

Yes, this helped me overcome one of the errors I was getting!
Thanks a lot!

Yes, thank you very much, the second part for Databricks helped me get a better idea.

Hi Rachel,

thanks for your comment, but the links you directed me to are not very helpful; they don't have examples.

What I am trying to do right now is to pass a dataframe into a batch request, but I keep receiving the error message "InvalidBatchRequestError: Validator could not be created because BatchRequest returned an empty batch_list."

My code is the following:

from great_expectations.datasource.fluent import SparkDatasource
from great_expectations.core.batch import BatchRequest
from pyspark.sql.functions import *
from great_expectations.data_context import FileDataContext

import great_expectations as gx
import great_expectations.jupyter_ux

context_root_dir  = 'path'
context = FileDataContext.create(project_root_dir=context_root_dir)

datasource_name = SparkDatasource(name="my_datasource")

df_spark = spark.table(f'{table_name_to_validate}')

data_asset = datasource_name.add_dataframe_asset(
    name="dataframe_asset",
    dataframe=df_spark)

my_batch_request = data_asset.build_batch_request(dataframe=df_spark)

batch_request_parameters = {'datasource_name': 'my_datasource',  'data_connector_name': 'default_inferred_data_connector_name', 'data_asset_name': 'dataframe_asset', 'limit': 1000}

my_batch_request = BatchRequest(**batch_request_parameters)

context.get_batch_list(batch_request=my_batch_request)

and the result is an empty batch list.

Could you please explain what the mistake is?

Hi @kat, apologies for the delay, I’ve been out of office.

I think your problem is caused by overwriting the Batch Request in your last three lines of code: using my_batch_request = data_asset.build_batch_request() should be sufficient, since .build_batch_request() already returns a BatchRequest object.
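Something like this should work (a minimal sketch reusing your variable names on GX 0.18; note that the Data Source is created through the context rather than by instantiating SparkDatasource directly):

import great_expectations as gx
from great_expectations.data_context import FileDataContext

context = FileDataContext.create(project_root_dir=context_root_dir)

# Register the Data Source on the context instead of constructing SparkDatasource yourself
datasource = context.sources.add_or_update_spark(name="my_datasource")

df_spark = spark.table(table_name_to_validate)

data_asset = datasource.add_dataframe_asset(
    name="dataframe_asset",
    dataframe=df_spark,
)

# build_batch_request() already returns the BatchRequest you need;
# don't overwrite it with a manually constructed BatchRequest
my_batch_request = data_asset.build_batch_request()

context.get_batch_list(batch_request=my_batch_request)  # should now return one batch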

Our Databricks walkthrough does include examples of how to connect to a Spark dataframe in Databricks using GX, but perhaps this direct code snippet will be more helpful. Below, I've included an example of how to:

  • connect to a Spark dataframe with GX using the current Fluent Data Sources approach,
  • add two Expectations using a Validator,
  • and run the Expectations using a Checkpoint.

Using GX 0.18.15 in a Databricks notebook:

import great_expectations as gx

context = gx.get_context()

# Create the dataframe from CSV files on disk.
df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-01.csv.gz")

# Or you could also define your dataframe reading directly from the table using Spark.
# df = spark.table("samples.nyctaxi.trips")

data_source = context.sources.add_or_update_spark(
    name="spark_datasource",
)

data_asset = data_source.add_dataframe_asset(
    name="spark_dataframe",
    dataframe=df,
)

batch_request = data_asset.build_batch_request()

expectation_suite_name = "expectation_suite"

context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)

validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)

validator.expect_column_values_to_not_be_null(
    column="tpep_pickup_datetime"
)

validator.expect_column_values_to_be_between(
    column="fare_amount", min_value=0, max_value=1000
)

checkpoint = context.add_or_update_checkpoint(
    name="checkpoint",
    validator=validator,
)

checkpoint_result = checkpoint.run()
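After the run, you can inspect the result and, optionally, rebuild Data Docs (a small optional addition to the snippet above):

# Overall pass/fail for the Checkpoint run
print(checkpoint_result.success)

# Optionally (re)build the Data Docs sites configured in the Data Context
context.build_data_docs()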

Does that help clear up the GX Spark dataframe workflow?