Hi @kat,
I use Databricks in conjunction with ADLS.
Here's the code I use for this:
The class below lives in our functions .whl file:
import great_expectations as gx


class GX_Context:
    def __init__(self, root_dir, connection_string, filepath_prefix=""):
        self.root_dir = root_dir
        self.connection_string = connection_string
        self.filepath_prefix = filepath_prefix
        if not isinstance(self.root_dir, str) or not self.root_dir:
            raise ValueError("root_dir must be a non-empty string")
        if not isinstance(self.connection_string, str) or not self.connection_string:
            raise ValueError("connection_string must be a non-empty string")

    def get_gx_context(self) -> gx.data_context.EphemeralDataContext:
        # Stores (expectations, checkpoints, validations) live on the mounted filesystem under root_dir,
        # Data Docs are published to the $web container of the storage account.
        project_config = gx.data_context.types.base.DataContextConfig(
            store_backend_defaults=gx.data_context.types.base.FilesystemStoreBackendDefaults(
                root_directory=self.root_dir
            ),
            data_docs_sites={
                "az_site": {
                    "class_name": "SiteBuilder",
                    "store_backend": {
                        "class_name": "TupleAzureBlobStoreBackend",
                        "container": r"\$web",  # the backslash keeps GX from treating $web as a config variable
                        # "filepath_prefix": self.filepath_prefix,
                        "connection_string": self.connection_string,
                    },
                    "site_index_builder": {
                        "class_name": "DefaultSiteIndexBuilder",
                        "show_cta_footer": True,
                    },
                }
            },
        )
        context = gx.get_context(project_config=project_config)
        return context
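If you want to sanity-check the class outside of a full checkpoint run, something like this should work as a quick smoke test (the mount path and secret names here are just placeholders for whatever you use):

# Quick smoke test of the context (paths and secret names below are placeholders)
connection_string = dbutils.secrets.get(scope='<your-key-vault-scope>', key='<your-connection-string-secret>')
context = GX_Context("/dbfs/mnt/<your-mount>/DataQuality/GX/", connection_string).get_gx_context()

print(context.list_expectation_suite_names())    # suites already stored under root_dir
print(context.list_checkpoints())                # checkpoints already stored under root_dir
context.build_data_docs(site_names=["az_site"])  # (re)builds the static site in the $web container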
And in the Databricks notebook I use it like this:
from mbti_utility.gx_context import GX_Context

# Great Expectations
context_root_dir = f"/dbfs{project.MNT_PATH}/DataQuality/GX/"
context_connection_string = dbutils.secrets.get(scope='MS-eu-SDL-DEV-kv', key='connection-string-rsv0sdldev0das')
data_source_name = f"{get_hive_table_db_name()}".lower()
data_asset_name = f"{get_product_name()}".lower()
checkpoint_name = data_source_name + "_" + data_asset_name

try:
    # Create or load Great Expectations context
    context = GX_Context(context_root_dir, context_connection_string).get_gx_context()

    # Create batch request for the staging dataframe
    batch_request = (context
                     .sources
                     .add_or_update_spark(name=data_source_name)
                     .add_dataframe_asset(name=data_asset_name, dataframe=df_Staging)
                     .build_batch_request())

    # Run the checkpoint that was persisted during profiling (see below)
    checkpoint_result = context.get_checkpoint(checkpoint_name).run(run_name=checkpoint_name, batch_request=batch_request)

    # Check the validation result
    if checkpoint_result.success:
        print("The validation succeeded")
    else:
        # Build a Data Docs link that points to the static-website endpoint instead of the blob endpoint
        run_id = list(checkpoint_result["run_results"].keys())[0]
        docs_url = (checkpoint_result["run_results"][run_id]["actions_results"]["update_data_docs"]["az_site"]
                    .replace(".blob.", ".z6.web.")
                    .replace("/$web", ""))
        raise ValueError("The validation failed: " + docs_url)
except Exception as exception:
    handle_exception(exception, dbutils.notebook.entry_point.getDbutils().notebook().getContext())
    raise exception
To generate the expectation suite JSON files and the checkpoint YAML files, you can run a profiling pass on the dataframes like this:
# Suite and checkpoint names for the profiling run
# (example definitions: re-using checkpoint_name so the regular run above finds them; adjust to your convention)
onboarding_suite_name = checkpoint_name
onboarding_checkpoint_name = checkpoint_name

try:
    # Create or load Great Expectations context
    context = GX_Context(context_root_dir, context_connection_string).get_gx_context()

    # Create batch request for the staging dataframe
    batch_request = (context
                     .sources
                     .add_or_update_spark(name=data_source_name)
                     .add_dataframe_asset(name=data_asset_name, dataframe=df_Staging)
                     .build_batch_request()
                     )

    # Profiler
    # Run the default onboarding profiler on the batch request
    onboarding_data_assistant_result = (context
                                        .assistants
                                        .onboarding
                                        .run(
                                            batch_request=batch_request,
                                            exclude_column_names=[],
                                            estimation="exact",  # default: "exact"; alternative: "flag_outliers"
                                        )
                                        )

    # Get the expectation suite from the onboarding result
    onboarding_suite = (onboarding_data_assistant_result
                        .get_expectation_suite(
                            expectation_suite_name=onboarding_suite_name
                        )
                        )

    # Persist expectation suite with the specified suite name from above
    context.add_or_update_expectation_suite(expectation_suite=onboarding_suite)

    # Create and persist checkpoint to reuse for multiple batches
    context.add_or_update_checkpoint(
        name=onboarding_checkpoint_name,
        batch_request=batch_request,
        expectation_suite_name=onboarding_suite_name,
    )

    # Run onboarding checkpoint
    onboarding_checkpoint_result = context.get_checkpoint(onboarding_checkpoint_name).run()

    # Check the validation result
    if onboarding_checkpoint_result.success:
        print("The validation succeeded")
    else:
        run_id = list(onboarding_checkpoint_result["run_results"].keys())[0]
        dbutils.notebook.exit("The validation failed: " + onboarding_checkpoint_result["run_results"][run_id]["actions_results"]["update_data_docs"]["az_site"])
except Exception as exception:
    handle_exception(exception, dbutils.notebook.entry_point.getDbutils().notebook().getContext())
    raise exception
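The onboarding profiler tends to generate a lot of expectations, so it is worth reviewing the suite before reusing the checkpoint. Something along these lines lists what was generated (again just a sketch, using the names from above):

# Review what the onboarding profiler generated before reusing the checkpoint (sketch)
suite = context.get_expectation_suite(onboarding_suite_name)
for expectation in suite.expectations:
    print(expectation.expectation_type, expectation.kwargs)

# After removing or adjusting expectations, persist the edited suite again
context.add_or_update_expectation_suite(expectation_suite=suite)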
Hope that helps!