This article is for comments to: How to instantiate a Data Context on an EMR Spark cluster — great_expectations documentation
Please comment +1 if this How to is important to you.
+1
It seems that using the S3GlobReaderBatchKwargsGenerator translates an s3:// path into s3a://, which prevents Spark from opening the file within the EMRFS context.
I might be doing something wrong; having documentation would help show whether or not it's a bug.
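If the generator is indeed emitting s3a:// paths, one possible workaround is to rewrite the scheme back to s3:// before Spark opens the file. This is a hypothetical helper I wrote for illustration, not part of great_expectations:

```python
# Sketch of a possible workaround: EMRFS-backed Spark on EMR expects s3://
# URIs, so if the batch kwargs come back with an s3a:// scheme, rewrite the
# prefix before handing the path to Spark.
# (Hypothetical helper; name and approach are assumptions, not GE API.)
def to_emrfs_uri(path: str) -> str:
    """Rewrite an s3a:// URI to s3:// for EMRFS-backed Spark on EMR."""
    if path.startswith("s3a://"):
        return "s3://" + path[len("s3a://"):]
    return path

print(to_emrfs_uri("s3a://mybucket/data/file.parquet"))  # s3://mybucket/data/file.parquet
```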
+1
Just joined so I could +1 this! I’ve been playing with ge for a few weeks locally now. Good work so far, very impressed! I see a lot of potential to assist us with our data quality problems and the next step would be to try it on a bigger scale on our AWS instance.
Keep up the good work
I have been successfully using it on EMR with this config, running via spark-submit:
datasources={
    "my_data_source": {
        "data_asset_type": {
            "class_name": "SparkDFDataset",
            "module_name": "great_expectations.dataset",
        },
        "spark_config": dict(pyspark.SparkConf().getAll()),
        "class_name": "SparkDFDatasource",
        "module_name": "great_expectations.datasource",
        "batch_kwargs_generators": {
            # Hive not yet supported https://github.com/great-expectations/great_expectations/pull/1811
            # "spark_sql_query": {
            #     "class_name": "QueryBatchKwargsGenerator",
            #     "queries": {"demographic_features": "SELECT * FROM data.dataseg"},
            # }
            "example": {
                "class_name": "S3GlobReaderBatchKwargsGenerator",
                "bucket": YOUR_BUCKET_LOCATION,
                "assets": {
                    "example_asset": {
                        "prefix": "example_prefix/",
                        "regex_filter": ".*parquet",
                    }
                },
            },
        },
    },
},
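For context on the "spark_config" line in the config above: SparkConf().getAll() returns the active session's settings as a list of (key, value) tuples, and dict() turns them into the mapping the datasource config expects. A minimal stand-in sketch (the pairs here are placeholders, not real EMR settings):

```python
# Stand-in for pyspark.SparkConf().getAll(), which returns (key, value)
# tuples for the active Spark configuration. dict() converts them into the
# mapping shape used by "spark_config" in the datasource config.
conf_pairs = [("spark.app.name", "ge_validation"), ("spark.master", "yarn")]
spark_config = dict(conf_pairs)
print(spark_config["spark.master"])  # yarn
```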
Also for reference, this is the URL for the most up-to-date docs:
Thank you @Dandandan! I just updated the link from the original post. Glad to hear it is working well for you!
When I try to run it in EMR from a notebook and create a context using a config similar to the one defined above, I get the following error:
context = BaseDataContext(project_config=project_config)
An error was encountered:
Invalid status code '400' from http://ip.ec2.internal:8998/sessions/2/statements/4 with error payload: {"msg":"requirement failed: Session isn't active."}
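For what it's worth, that error comes from Livy (the notebook talks to Spark through the Livy endpoint on port 8998), and it usually means the Livy session timed out or died before the statement ran, rather than anything specific to great_expectations. One commonly suggested mitigation is raising the session timeout via an EMR configuration classification; the value below is illustrative only, so check the Livy docs for your EMR release:

```json
[
  {
    "Classification": "livy-conf",
    "Properties": {
      "livy.server.session.timeout": "5h"
    }
  }
]
```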
Below is my DataContext config:
project_config = DataContextConfig(
    config_version=2,
    plugins_directory=None,
    config_variables_file_path=None,
    datasources={
        "my_spark_datasource": {
            "data_asset_type": {
                "class_name": "SparkDFDataset",
                "module_name": "great_expectations.dataset",
            },
            "class_name": "SparkDFDatasource",
            "module_name": "great_expectations.datasource",
            "batch_kwargs_generators": {
                "example": {
                    "class_name": "S3GlobReaderBatchKwargsGenerator",
                    "bucket": "mybucket",
                    "assets": {
                        "example_asset": {
                            "prefix": "data/",
                            "regex_filter": ".*csv",
                        }
                    },
                },
            },
        }
    },
    stores={
        "expectations_S3_store": {
            "class_name": "ExpectationsStore",
            "store_backend": {
                "class_name": "TupleS3StoreBackend",
                "bucket": "mybucket",
                "prefix": "my_expectations_store_prefix",
            },
        },
        "validations_S3_store": {
            "class_name": "ValidationsStore",
            "store_backend": {
                "class_name": "TupleS3StoreBackend",
                "bucket": "mybucket",
                "prefix": "my_validations_store_prefix",
            },
        },
        "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
    },
    expectations_store_name="expectations_S3_store",
    validations_store_name="validations_S3_store",
    evaluation_parameter_store_name="evaluation_parameter_store",
    data_docs_sites={
        "s3_site": {
            "class_name": "SiteBuilder",
            "store_backend": {
                "class_name": "TupleS3StoreBackend",
                "bucket": "mybucket",
                "prefix": "my_optional_data_docs_prefix",
            },
            "site_index_builder": {
                "class_name": "DefaultSiteIndexBuilder",
                "show_cta_footer": True,
            },
        }
    },
    validation_operators={
        "action_list_operator": {
            "class_name": "ActionListValidationOperator",
            "action_list": [
                {
                    "name": "store_validation_result",
                    "action": {"class_name": "StoreValidationResultAction"},
                },
                {
                    "name": "store_evaluation_params",
                    "action": {"class_name": "StoreEvaluationParametersAction"},
                },
                {
                    "name": "update_data_docs",
                    "action": {"class_name": "UpdateDataDocsAction"},
                },
            ],
        }
    },
    anonymous_usage_statistics={"enabled": True},
)
context = BaseDataContext(project_config=project_config)
Any help would be much appreciated.
When I leave this line out, I get the session-inactive error. @Dandandan, are you able to share why you added this in?