GX-Databricks:Datasource-Data asset - Validator

Hi @kat ,

I use Databricks in conjunction with ADLS.
Here’s the Code I use for this:

Class in our functions .whl file:

import great_expectations as gx


class GX_Context:
    def __init__(self, root_dir, connection_string, filepath_prefix=""):
        self.root_dir = root_dir
        self.connection_string = connection_string
        self.filepath_prefix = filepath_prefix

        if not isinstance(self.root_dir, str) or not self.root_dir:
            raise ValueError("root_dir must be a non-empty string")
        if not isinstance(self.connection_string, str) or not self.connection_string:
            raise ValueError("connection_string must be a non-empty string")

    def get_gx_context(self) -> gx.data_context.EphemeralDataContext:
        project_config = gx.data_context.types.base.DataContextConfig(
            store_backend_defaults=gx.data_context.types.base.FilesystemStoreBackendDefaults(
                root_directory=self.root_dir
            ),
            data_docs_sites={
                "az_site": {
                    "class_name": "SiteBuilder",
                    "store_backend": {
                        "class_name": "TupleAzureBlobStoreBackend",
                        "container": r"\$web",
                        # "filepath_prefix": self.filepath_prefix,
                        "connection_string": self.connection_string,
                    },
                    "site_index_builder": {
                        "class_name": "DefaultSiteIndexBuilder",
                        "show_cta_footer": True,
                    },
                }
            },
        )

        context = gx.get_context(project_config=project_config)

        return context

An in the Databricks notebook I use this:

from mbti_utility.gx_context import GX_Context

# Great Expectations
    context_root_dir = f"/dbfs{project.MNT_PATH}/DataQuality/GX/"
    context_connection_string = dbutils.secrets.get(scope='MS-eu-SDL-DEV-kv', key='connection-string-rsv0sdldev0das')
    data_source_name = f"{get_hive_table_db_name()}".lower()
    data_asset_name = f"{get_product_name()}".lower()
    checkpoint_name = data_source_name + "_" + data_asset_name

try:
    # Create or load great expectations context
    context = GX_Context(context_root_dir, context_connection_string).get_gx_context()

    # Create batch request
    batch_request = (context
        .sources
        .add_or_update_spark(name=data_source_name)
        .add_dataframe_asset(name=data_asset_name, dataframe=df_Staging)
        .build_batch_request())

    # Run checkpoint
    checkpoint_result = context.get_checkpoint(checkpoint_name).run(run_name=checkpoint_name, batch_request=batch_request)

    # Check the validation result
    if checkpoint_result.success:
        print("The validation succeeded")
    else:
        raise ValueError("The validation failed: " + checkpoint_result["run_results"][list(checkpoint_result["run_results"].keys())[0]]["actions_results"]["update_data_docs"]["az_site"].replace(".blob.", ".z6.web.").replace("/$web", ""))

except Exception as exception:
    handle_exception(exception, dbutils.notebook.entry_point.getDbutils().notebook().getContext())
    raise exception

To get the expectation-suite JSON files and the checkpoint YML files you can do a profiling on the dataframes like this:

try:
    # Create or load great expectations context
    context = GX_Context(context_root_dir, context_connection_string).get_gx_context()

    # Create batch request
    batch_request = (context
    .sources
    .add_or_update_spark(name=data_source_name)
    .add_dataframe_asset(name=data_asset_name, dataframe=df_Staging)
    .build_batch_request()
  )

    # Profiler
    # Run the default onboarding profiler on the batch request
    onboarding_data_assistant_result = (context
        .assistants
        .onboarding
        .run(
            batch_request=batch_request,
            exclude_column_names=[],
            estimation="exact", # default: "exact"; "flag_outliers"
        )
    )

    # Get the expectation suite from the onboarding result
    onboarding_suite = (onboarding_data_assistant_result
        .get_expectation_suite(
            expectation_suite_name=onboarding_suite_name
        )
    )

    # Perist expectation suite with the specified suite name from above
    context.add_or_update_expectation_suite(expectation_suite=onboarding_suite)

    # Create and persist checkpoint to reuse for multiple batches
    context.add_or_update_checkpoint(
        name = onboarding_checkpoint_name,
        batch_request=batch_request,
        expectation_suite_name=onboarding_suite_name,
    )

    # Run Onboarding checkpoint
    control_tower_ccu_datasources_block_checkpoint_result = context.get_checkpoint(onboarding_checkpoint_name).run()

    # Check the validation result
    if result.success:
        print("The validation succeeded")
    else:
        dbutils.notebook.exit("The validation failed : " + control_tower_ccu_datasources_block_checkpoint_result["run_results"][list(control_tower_ccu_datasources_block_checkpoint_result["run_results"].keys())[0]]["actions_results"]["update_data_docs"]["az_site"])
        

except Exception as exception:
    handle_exception(exception, dbutils.notebook.entry_point.getDbutils().notebook().getContext())
    raise exception

Hope that helps?