I’m trying out the new 1.0 release on Databricks and going through the related documentation.
What’s the recommended approach for working with GX 1.0 in Databricks and validating data in the Unity Catalog tables?
Hi Toivo, we’re working on providing gx cloud integration with databricks sql warehouses; we can let you know when it launches!
Awesome, I’m looking forward to it!
I got things working with GX Core using SQLAlchemy
Had some problems with numpy and SQLAlchemy versioning but they were solved by explicitly defining the versions, I’ll include the errors after the code.
Here’s my code:
%pip install great_expectations[databricks]
%pip uninstall numpy
%pip install "numpy<2"
%pip install "sqlalchemy>=2"
%restart_python # Not sure if this is actually needed
import great_expectations as gx
context = gx.get_context()
token = ...
host = ...
port = ...
http_path = ...
catalog = ...
schema = ...
datasource = context.data_sources.add_or_update_databricks_sql(
name="my datasource",
connection_string=f"databricks://token:{token}@{host}:{port}?http_path={http_path}&catalog={catalog}&schema={schema}",
)
asset_name = "yellow_tripdata_sample_2019_01"
database_table_name = "yellow_tripdata_sample_2019_01"
table_data_asset = datasource.add_table_asset(
table_name=database_table_name, name=asset_name
)
full_table_batch_definition = table_data_asset.add_batch_definition_whole_table(
name="FULL_TABLE"
)
full_table_batch = full_table_batch_definition.get_batch()
preset_expectation = gx.expectations.ExpectColumnMaxToBeBetween(
column="passenger_count", min_value=1, max_value=6
)
suite_name = "my_expectation_suite"
suite = gx.ExpectationSuite(name=suite_name)
suite = context.suites.add(suite)
suite.add_expectation(preset_expectation)
definition_name = "my_validation_definition"
validation_definition = gx.ValidationDefinition(
data=full_table_batch_definition, suite=suite, name=definition_name
)
validation_definition = context.validation_definitions.add(validation_definition)
validation_results = validation_definition.run()
validation_definitions = [validation_definition]
action_list = []
checkpoint_name = "my_checkpoint"
checkpoint = gx.Checkpoint(
name=checkpoint_name,
validation_definitions=validation_definitions,
actions=action_list,
result_format={"result_format": "COMPLETE"},
)
context.checkpoints.add(checkpoint)
checkpoint_result = checkpoint.run()
Note that this does not save the Checkpoints and Data Sources etc to disk and can’t be easily reused later for automagically validating data.
I’m still having trouble figuring out how to do that. Help would be appreciated with that, though I’m under the impression that it’s quite hard when working with the clusters.
Errors that I got related to Numpy version + link to a StackOverflow thread on the topic:
ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject
Error related to SQLAlchemy:
TestConnectionError: Attempt to connect to datasource failed: due to SQLAlchemyCreateEngineError(‘Unable to create SQLAlchemy Engine: due to AttributeError(“module 'sqlalchemy.types' has no attribute 'Uuid'”)’)
This is fixed in SQLAlchemy 2.0, see discussion here.
Seems like the errors have nothing to do with GX and for whatever reason pip just decided to install incompatible versions.