How to instantiate a Data Context on a Databricks Spark cluster

This article is for comments to:

https://docs.greatexpectations.io/en/latest/guides/how_to_guides/configuring_data_contexts/how_to_instantiate_a_data_context_on_a_databricks_spark_cluster.html

Please comment +1 if this How-to Guide is important to you.

Currently the above linked instructions for the EMR Spark cluster are sufficient for Databricks. Please reply here if they do not work for you, and if they do please like this post!

Thanks for the instructions… a few more details would help.

Hi @mjboothaus! We just released a document on instantiating a data context with Databricks: https://docs.greatexpectations.io/en/latest/guides/how_to_guides/configuring_data_contexts/how_to_instantiate_a_data_context_on_a_databricks_spark_cluster.html
Check it out and let us know if it answers your questions! If not, please reply here with questions.

Hi @anthony

Just had a query. I tried to follow the steps mentioned. I am getting the following error:

ModuleNotFoundError: No module named 'black'

Do you have any guidance on this?
I am using Databricks runtime 7 with Spark 3.

Thanks and Regards
Saurav Chakraborty

Hi @Saurav! In the latest release, 0.12.5, we moved black to a purely dev dependency; however, it is still used within Great Expectations. Until we are able to release the next version, you can run pip install black. Apologies for the confusion.

@Saurav - FYI we just released Great Expectations v0.12.6 which adds black back into our requirements.txt file. So you should be able to use this newest version without issue.
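
For anyone hitting the same ModuleNotFoundError, a quick check like the following can confirm whether black is importable in the notebook's Python environment, both before and after upgrading. This is only a sketch: the helper name missing_modules is made up for illustration and is not part of Great Expectations, and it is meant for top-level module names only.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# An empty list means black can be imported; ["black"] means it still
# needs to be installed (for example with pip install black).
print(missing_modules(["black"]))
```

If the list is not empty after upgrading, the cluster may still be using an older environment, so restarting the Python process or reattaching the notebook is worth trying.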

It's working now. Thanks!

I’m trying to populate the “batch_kwargs_generators” field in the example code and I’m getting an error that I find hard to decipher.

Right now, I have the following:

import pyspark

datasource_spark = {
    "data": {
        "data_asset_type": {
            "class_name": "SparkDFDataset",
            "module_name": "great_expectations.dataset",
        },
        "spark_config": dict(pyspark.SparkConf().getAll()),
        "class_name": "SparkDFDatasource",
        "module_name": "great_expectations.datasource",
        "batch_kwargs_generators": {
            "data": {
                "class_name": "QueryBatchKwargsGenerator",
                "queries": {
                    "testquery": "SELECT * FROM knmi_weather_table",
                },
            }
        },
    },
}

And I’m using this in the DataContextConfig like so:

project_config = DataContextConfig(
    config_version=2,
    plugins_directory=None,
    config_variables_file_path=None,
    datasources=datasource_spark,
    stores={
   .....

I based this approach on the example found here.

Whereas the following query works:

my_batch = context.get_batch(
  batch_kwargs={
    "datasource": "data",
    "query": "SELECT * FROM knmi_weather_table LIMIT 100"
  },
  expectation_suite_name="my_new_suite"
)

Running the command below

my_batch = context.get_batch(
  batch_kwargs={
    "datasource": "data",
  },
  expectation_suite_name="my_new_suite"
)

fails with the error “BatchKwargsError: Unrecognized batch_kwargs for spark_source”.

I’m not sure how to remedy this, or which kwargs are not accepted.

Thanks in advance, and please let me know if I can provide additional info!

Best,

Jasper.

The “query” batch kwarg is the likely cause of the error. Great Expectations validates only DataFrames on Spark. Validating the result sets of queries works only with Datasources of type SqlAlchemyDatasource.
Using a QueryBatchKwargsGenerator won’t work with a Spark Datasource for the same reason.

This notebook shows an example of validating DataFrames in Spark:

Hi Eugene,

Thanks for your reply.

I’m not entirely sure I follow you. The ‘query’ option does work. What doesn’t work is trying this approach.

Thanks!

Best,

Jasper.

Jasper, my bad - I gave you a wrong answer. We do support queries in Spark Datasources.

The “BatchKwargsError: Unrecognized batch_kwargs for spark_source” error is raised in SparkDFDatasource, because it cannot find “path”, “query” or “dataset” in the batch_kwargs.
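
The check described above can be illustrated with a small sketch. This is not the actual SparkDFDatasource code; the function name has_recognized_key is hypothetical, and the key list is taken only from the explanation in this post.

```python
# Keys that, per the explanation above, the Spark datasource looks for.
RECOGNIZED_KEYS = ("path", "query", "dataset")


def has_recognized_key(batch_kwargs):
    """Return True if at least one of the expected keys is present."""
    return any(key in batch_kwargs for key in RECOGNIZED_KEYS)


# The call that failed only passed the datasource name.
failing = {"datasource": "data"}
# The call that worked also passed a query.
working = {
    "datasource": "data",
    "query": "SELECT * FROM knmi_weather_table LIMIT 100",
}

print(has_recognized_key(failing))
print(has_recognized_key(working))
```

Running the same check on your own batch_kwargs before calling get_batch can show quickly whether one of those keys is missing.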

Since you configured a QueryBatchKwargsGenerator to manage your queries, you should use it to generate the batch_kwargs that you need, so the snippet should look like this:

my_batch_kwargs = context.build_batch_kwargs("name_of_my_spark_datasource", "name_of_generator", "name_of_my_query")
my_batch = context.get_batch(
  batch_kwargs=my_batch_kwargs,
  expectation_suite_name="my_new_suite"
)
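
To work out which names to put in place of the three placeholders, one option is to read them from the datasources dictionary itself. The sketch below uses a trimmed copy of the configuration posted earlier in this thread; the helper generator_query_names is hypothetical and only walks a plain dictionary, so it runs without Great Expectations or Spark.

```python
# Trimmed copy of the datasource configuration from earlier in the thread.
datasources = {
    "data": {
        "class_name": "SparkDFDatasource",
        "batch_kwargs_generators": {
            "data": {
                "class_name": "QueryBatchKwargsGenerator",
                "queries": {"testquery": "SELECT * FROM knmi_weather_table"},
            }
        },
    }
}


def generator_query_names(datasources):
    """List (datasource, generator, query) name triples found in the config."""
    triples = []
    for ds_name, ds_cfg in datasources.items():
        generators = ds_cfg.get("batch_kwargs_generators", {})
        for gen_name, gen_cfg in generators.items():
            for query_name in gen_cfg.get("queries", {}):
                triples.append((ds_name, gen_name, query_name))
    return triples


for triple in generator_query_names(datasources):
    print(triple)
```

For that configuration the datasource and the generator are both named "data" and the query is named "testquery", so those would presumably be the three arguments to pass to context.build_batch_kwargs.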

Hello,

Can you share example code showing how to use Spark SQL in batch_kwargs_generators? Thank you.

@jmp Please see the “Additional notes” section in this guide and the code snippet in my previous comment.

@here, I have initialised the Data Context using the BaseDataContext class, based on the above example.
I can successfully run the test suite, but I am unable to see the test suite files and docs in the S3 bucket. I configured the stores validations_S3_store and expectations_S3_store, and s3_site for the docs.
I need to get the list of configured stores, but BaseDataContext doesn’t seem to offer a way to list them.

My environment: Databricks + AWS

1. How can I make sure the stores are configured correctly?
2. If an error occurs in the backend while uploading these files to S3, how can I see that information?
3. Is it possible to initialise DataContext, instead of the BaseDataContext class, with project_config?

Thanks
Dinakar S

Hi @dinakar_sundar! Thank you for using Great Expectations.

  1. You can list stores several ways using BaseDataContext:
     • context.stores
     • context.list_stores()
     • context.list_active_stores()
  2. Errors while uploading to S3 will raise an exception. You will see these in your Databricks notebook or in your logs, depending on your setup.
  3. Currently it is not possible to pass a project_config to DataContext. Hopefully with the above you are able to use BaseDataContext instead! Are there other features of the DataContext specifically that you would like to use that are missing from BaseDataContext?
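
As a rough illustration of the first point, the sketch below summarizes which stores a context has configured. It assumes that context.stores behaves like a mapping from store name to store object. The helper summarize_stores and the FakeStore stand-in are made up for this example so that it runs without Great Expectations or S3 access.

```python
from types import SimpleNamespace


def summarize_stores(context):
    """Map each configured store name to the class name of its store object."""
    return {name: type(store).__name__ for name, store in context.stores.items()}


# Hypothetical stand-in for a real context; in a notebook you would pass
# your own BaseDataContext instance instead.
class FakeStore:
    pass


fake_context = SimpleNamespace(
    stores={
        "expectations_S3_store": FakeStore(),
        "validations_S3_store": FakeStore(),
    }
)

print(summarize_stores(fake_context))
```

With a real context, the class names in the result give a quick hint about whether each store is the type you intended to configure.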

Thanks @anthony, I am now able to see that the active stores are configured correctly. I’m now trying to use the data profiler. Does the profiling process take quite some time? I’m in a Databricks environment.

Hi @anthony, we are now trying to instantiate the BaseDataContext by passing the project config. The project configuration is set up as per the recommendations, but we are now getting the issue below. The same code worked a couple of months ago.

The spark context has stopped and the driver is restarting. Your notebook will be automatically reattached.

Databricks runtime version: 6.4
Spark version: 2.4.5
great_expectations: 0.13.14

I am getting the same error in Databricks. Did you ever get it working, @dinakar_sundar?

“The spark context has stopped and the driver is restarting. Your notebook will be automatically reattached.”

Hi @Anthony. Thanks for the instructions. I followed them (V3) and was able to perform validations successfully using a Databricks notebook. Afterwards, I tested whether the same code runs via databricks-connect.
The only part I had to adapt was

project_config = DataContextConfig(
    datasources={"databricks_spark_dataframe": my_spark_datasource_config},
    store_backend_defaults=FilesystemStoreBackendDefaults(root_directory="/some/directory/on/your/pc")
)

so that the FilesystemStoreBackendDefaults points to a location on my local computer.

Afterwards, I tried running a validation and got the following error:

validator.expect_column_values_to_be_null(column="country_code")
Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]
Traceback (most recent call last):
  File "/Users/lauferj/miniconda3/envs/fdprc-dev-ge/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3343, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-2-b50f07675790>", line 1, in <module>
    validator.expect_column_values_to_be_null(column="country_code")
  File "/Users/lauferj/miniconda3/envs/fdprc-dev-ge/lib/python3.7/site-packages/great_expectations/validator/validator.py", line 277, in inst_expectation
    raise err
  File "/Users/lauferj/miniconda3/envs/fdprc-dev-ge/lib/python3.7/site-packages/great_expectations/validator/validator.py", line 239, in inst_expectation
    runtime_configuration=basic_runtime_configuration,
  File "/Users/lauferj/miniconda3/envs/fdprc-dev-ge/lib/python3.7/site-packages/great_expectations/expectations/expectation.py", line 637, in validate
    runtime_configuration=runtime_configuration,
  File "/Users/lauferj/miniconda3/envs/fdprc-dev-ge/lib/python3.7/site-packages/great_expectations/validator/validator.py", line 471, in graph_validate
    metrics = self.resolve_validation_graph(graph, metrics, runtime_configuration)
  File "/Users/lauferj/miniconda3/envs/fdprc-dev-ge/lib/python3.7/site-packages/great_expectations/validator/validator.py", line 516, in resolve_validation_graph
    runtime_configuration=runtime_configuration,
  File "/Users/lauferj/miniconda3/envs/fdprc-dev-ge/lib/python3.7/site-packages/great_expectations/validator/validator.py", line 557, in _resolve_metrics
    metrics_to_resolve, metrics, runtime_configuration
  File "/Users/lauferj/miniconda3/envs/fdprc-dev-ge/lib/python3.7/site-packages/great_expectations/execution_engine/execution_engine.py", line 296, in resolve_metrics
    resolved_metrics.update(self.resolve_metric_bundle(metric_fn_bundle))
  File "/Users/lauferj/miniconda3/envs/fdprc-dev-ge/lib/python3.7/site-packages/great_expectations/execution_engine/sparkdf_execution_engine.py", line 558, in resolve_metric_bundle
    res = df.agg(*aggregate_cols).collect()
  File "/Users/lauferj/miniconda3/envs/fdprc-dev-ge/lib/python3.7/site-packages/pyspark/sql/dataframe.py", line 610, in collect
    with SCCallSiteSync(self._sc) as css:
  File "/Users/lauferj/miniconda3/envs/fdprc-dev-ge/lib/python3.7/site-packages/pyspark/traceback_utils.py", line 72, in __enter__
    self._context._jsc.setCallSite(self._call_site)
AttributeError: 'NoneType' object has no attribute 'setCallSite'

Does great_expectations also support databricks-connect? If yes, do you know how to fix this problem?
Thanks in advance!

Johannes