I need to validate data that lives in Databricks Unity Catalog and then send the validation results to DataHub.
I am trying this:
import great_expectations as gx
from great_expectations.checkpoint import Checkpoint

# File-backed Data Context stored on DBFS
context_root_dir = "/dbfs/gx/"
context = gx.get_context(context_root_dir=context_root_dir)

# In-memory Spark datasource wrapping the Unity Catalog table
dataframe_datasource = context.sources.add_or_update_spark(
    name="my_spark_in_memory_datasource"
)

df = spark.read.table("ecomm_checkout_catalog.checkout_schema.checkout_orders_data")

dataframe_asset = dataframe_datasource.add_dataframe_asset(
    name="checkout_schema.checkout_orders_data",
    dataframe=df,
)
batch_request1 = dataframe_asset.build_batch_request(dataframe=df)
expectation_suite_name = "my_data_expectations_suite"
context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)

validator = context.get_validator(
    batch_request=batch_request1,
    expectation_suite_name=expectation_suite_name,
)
validator.expect_column_values_to_not_be_null(column="orderid")
validator.save_expectation_suite(discard_failed_expectations=False)
my_checkpoint_name = "my_databricks_checkpoint"
checkpoint = Checkpoint(
    name=my_checkpoint_name,
    run_name_template="%Y%m%d-%H%M%S-my-run-name-template",
    data_context=context,
    batch_request=batch_request1,
    expectation_suite_name=expectation_suite_name,
    action_list=[
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        {
            "name": "update_data_docs",
            "action": {"class_name": "UpdateDataDocsAction"},
        },
        {
            "name": "datahub_action",
            "action": {
                "module_name": "datahub.integrations.great_expectations.action",
                "class_name": "DataHubValidationAction",
                "server_url": "http://35.240.254.***:8080",
            },
        },
    ],
)
context.add_or_update_checkpoint(checkpoint=checkpoint)
checkpoint_result = checkpoint.run()
Error:
DataHubValidationAction does not recognize this GE data asset type - <class 'great_expectations.validator.validator.Validator'>. This is either using v2-api or execution engine other than sqlalchemy.
Metadata not sent to datahub. No datasets found.
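From the error message it looks like DataHubValidationAction only understands validations run through the v3 API against a SQLAlchemy execution engine, so my in-memory Spark DataFrame asset is being ignored. Below is a minimal sketch of what I think the SQLAlchemy-backed alternative would look like, assuming the databricks-sql-connector SQLAlchemy dialect is installed; the datasource name, token, host, and warehouse HTTP path are placeholders, not my real values:

# Hypothetical SQL datasource over the same Unity Catalog table
# (the connection string details are placeholders to adapt to the workspace)
sql_datasource = context.sources.add_sql(
    name="databricks_sql_datasource",
    connection_string=(
        "databricks://token:<personal-access-token>@<workspace-host>:443"
        "?http_path=<warehouse-http-path>"
        "&catalog=ecomm_checkout_catalog&schema=checkout_schema"
    ),
)
table_asset = sql_datasource.add_table_asset(
    name="checkout_orders_data",
    table_name="checkout_orders_data",
)
batch_request2 = table_asset.build_batch_request()

Is switching to a SQL-based datasource like this the intended way to get validation results into DataHub, or is there a way to make the Spark DataFrame approach work?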