How to configure an EMR Spark Datasource

This article is for comments to: How to instantiate a Data Context on an EMR Spark cluster — great_expectations documentation

Please comment +1 if this How to is important to you.


+1
It seems the S3GlobReaderBatchKwargsGenerator translates an s3:// path into s3a://, which breaks Spark's ability to open the file within the EMRFS context.
I might be doing something wrong; having documentation would help uncover whether it's a bug or not.
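
In case it helps anyone reproduce or work around it, this is the kind of untested rewrite I would try on the generated batch kwargs before fetching the batch. It assumes a `context` built like in the posts below, and the key names ("path", "s3") as well as the datasource/generator/asset names are my assumptions about the V2 batch_kwargs layout, not something I have confirmed:

# Untested sketch: rewrite an s3a:// URI back to s3:// so EMRFS handles it.
# Assumes a BaseDataContext configured with the S3GlobReaderBatchKwargsGenerator
# under datasource "my_data_source", generator "example", asset "example_asset".
batch_kwargs = context.build_batch_kwargs("my_data_source", "example", "example_asset")

for key in ("path", "s3"):  # whichever key the generator populated (assumption)
    value = batch_kwargs.get(key)
    if isinstance(value, str) and value.startswith("s3a://"):
        batch_kwargs[key] = "s3://" + value[len("s3a://"):]

context.create_expectation_suite("my_suite", overwrite_existing=True)
batch = context.get_batch(batch_kwargs, expectation_suite_name="my_suite")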


+1
Just joined so I could +1 this! I’ve been playing with ge for a few weeks locally now. Good work so far, very impressed! I see a lot of potential to assist us with our data quality problems and the next step would be to try it on a bigger scale on our AWS instance.
Keep up the good work :+1:


I have been successfully using it on EMR with this config, running via spark-submit:

datasources={
    "my_data_source": {
        "data_asset_type": {
            "class_name": "SparkDFDataset",
            "module_name": "great_expectations.dataset",
        },
        "spark_config": dict(pyspark.SparkConf().getAll()),
        "class_name": "SparkDFDatasource",
        "module_name": "great_expectations.datasource",
        "batch_kwargs_generators": {
            # Hive not yet supported https://github.com/great-expectations/great_expectations/pull/1811
            # "spark_sql_query": {
            #     "class_name": "QueryBatchKwargsGenerator",
            #     "queries": {"demographic_features": "SELECT * FROM data.dataseg"},
            # }
            "example": {
                "class_name": "S3GlobReaderBatchKwargsGenerator",
                "bucket": YOUR_BUCKET_LOCATION,
                "assets": {
                    "example_asset": {
                        "prefix": "example_prefix/",
                        "regex_filter": ".*parquet",
                    }
                },
            },
        },
    },
},
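
For completeness, here is a rough, end-to-end sketch of how a datasources block like this can be dropped into a DataContextConfig and used to fetch a batch through the S3 glob generator. It assumes the V2 (batch_kwargs) API, a script launched with spark-submit on EMR (so pyspark.SparkConf() reflects the cluster settings), and placeholder names throughout (my-bucket, example_prefix/, my_suite); the in-memory stores are only there to keep the sketch self-contained, so swap in the S3-backed stores shown later in this thread for anything real:

import pyspark
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig

YOUR_BUCKET_LOCATION = "my-bucket"  # placeholder

project_config = DataContextConfig(
    config_version=2,
    plugins_directory=None,
    config_variables_file_path=None,
    datasources={
        "my_data_source": {
            "data_asset_type": {
                "class_name": "SparkDFDataset",
                "module_name": "great_expectations.dataset",
            },
            # Reuse the Spark configuration of the spark-submit job
            "spark_config": dict(pyspark.SparkConf().getAll()),
            "class_name": "SparkDFDatasource",
            "module_name": "great_expectations.datasource",
            "batch_kwargs_generators": {
                "example": {
                    "class_name": "S3GlobReaderBatchKwargsGenerator",
                    "bucket": YOUR_BUCKET_LOCATION,
                    "assets": {
                        "example_asset": {
                            "prefix": "example_prefix/",
                            "regex_filter": ".*parquet",
                        }
                    },
                },
            },
        },
    },
    # In-memory stores keep the sketch self-contained; use TupleS3StoreBackend
    # stores (as in the config further down this thread) for real use.
    stores={
        "expectations_store": {
            "class_name": "ExpectationsStore",
            "store_backend": {"class_name": "InMemoryStoreBackend"},
        },
        "validations_store": {
            "class_name": "ValidationsStore",
            "store_backend": {"class_name": "InMemoryStoreBackend"},
        },
        "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
    },
    expectations_store_name="expectations_store",
    validations_store_name="validations_store",
    evaluation_parameter_store_name="evaluation_parameter_store",
    validation_operators={
        "action_list_operator": {
            "class_name": "ActionListValidationOperator",
            "action_list": [
                {
                    "name": "store_validation_result",
                    "action": {"class_name": "StoreValidationResultAction"},
                },
            ],
        }
    },
    anonymous_usage_statistics={"enabled": False},
)

context = BaseDataContext(project_config=project_config)

# Ask the generator for batch kwargs, fetch the batch as a SparkDFDataset,
# and run it through the validation operator.
batch_kwargs = context.build_batch_kwargs("my_data_source", "example", "example_asset")
context.create_expectation_suite("my_suite", overwrite_existing=True)
batch = context.get_batch(batch_kwargs, expectation_suite_name="my_suite")
results = context.run_validation_operator("action_list_operator", assets_to_validate=[batch])
print(results["success"])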

Also for reference, this is the up-to-date URL with the most recent docs:

https://docs.greatexpectations.io/en/latest/guides/how_to_guides/configuring_data_contexts/how_to_instantiate_a_data_context_on_an_emr_spark_cluster.html#how-to-instantiate-a-data-context-on-an-emr-spark-cluster


Thank you @Dandandan! I just updated the link in the original post. Glad to hear it is working well for you!

When I try to run it in EMR from a notebook and create a context using a config similar to the one defined above, I get the following error:

context = BaseDataContext(project_config=project_config)

An error was encountered:
Invalid status code '400' from http://ip.ec2.internal:8998/sessions/2/statements/4 with error payload: {"msg":"requirement failed: Session isn't active."}

Below is my DataContextConfig:

project_config = DataContextConfig(
    config_version=2,
    plugins_directory=None,
    config_variables_file_path=None,
    datasources={
        "my_spark_datasource": {
            "data_asset_type": {
                "class_name": "SparkDFDataset",
                "module_name": "great_expectations.dataset",
            },
            "class_name": "SparkDFDatasource",
            "module_name": "great_expectations.datasource",
            "batch_kwargs_generators": {
                "example": {
                    "class_name": "S3GlobReaderBatchKwargsGenerator",
                    "bucket": "mybucket",
                    "assets": {
                        "example_asset": {
                            "prefix": "data/",
                            "regex_filter": ".*csv",
                        }
                    },
                },
            },
        }
    },
    stores={
        "expectations_S3_store": {
            "class_name": "ExpectationsStore",
            "store_backend": {
                "class_name": "TupleS3StoreBackend",
                "bucket": "mybucket",
                "prefix": "my_expectations_store_prefix",
            },
        },
        "validations_S3_store": {
            "class_name": "ValidationsStore",
            "store_backend": {
                "class_name": "TupleS3StoreBackend",
                "bucket": "mybucket",
                "prefix": "my_validations_store_prefix",
            },
        },
        "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
    },
    expectations_store_name="expectations_S3_store",
    validations_store_name="validations_S3_store",
    evaluation_parameter_store_name="evaluation_parameter_store",
    data_docs_sites={
        "s3_site": {
            "class_name": "SiteBuilder",
            "store_backend": {
                "class_name": "TupleS3StoreBackend",
                "bucket":  "mybucket",
                "prefix":  "my_optional_data_docs_prefix",
            },
            "site_index_builder": {
                "class_name": "DefaultSiteIndexBuilder",
                "show_cta_footer": True,
            },
        }
    },
    validation_operators={
        "action_list_operator": {
            "class_name": "ActionListValidationOperator",
            "action_list": [
                {
                    "name": "store_validation_result",
                    "action": {"class_name": "StoreValidationResultAction"},
                },
                {
                    "name": "store_evaluation_params",
                    "action": {"class_name": "StoreEvaluationParametersAction"},
                },
                {
                    "name": "update_data_docs",
                    "action": {"class_name": "UpdateDataDocsAction"},
                },
            ],
        }
    },
    anonymous_usage_statistics={
        "enabled": True
    }
)

context = BaseDataContext(project_config=project_config)

Any help would be much appreciated.

When I leave this line out I get the session-inactive error. @Dandandan, are you able to share why you added this in?
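
If the line in question is the spark_config entry from the earlier config (that is an assumption on my part), one way to build it from a notebook-backed session, where Livy has usually already created a live SparkSession rather than a fresh spark-submit job, might look like the untested sketch below; the variable names are mine:

from pyspark.sql import SparkSession

# Untested sketch: reuse the configuration of the already-running Spark session
# (the one Livy created for the notebook) rather than a fresh, empty SparkConf.
spark = SparkSession.builder.getOrCreate()
spark_config = dict(spark.sparkContext.getConf().getAll())

# ...which would then go into the datasource entry of the DataContextConfig above:
# "my_spark_datasource": {
#     ...
#     "spark_config": spark_config,
#     ...
# }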