Hello everyone,
I am running Airflow 2.6.1 in Docker. Within a DAG I have imported the Great Expectations (GX) Python library and am running around 40 validations on DataFrames.
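For context, gx_preparation (called further down) is my own helper: roughly, it loads the latest files from MinIO and registers each DataFrame as an asset on a Fluent datasource, then returns the asset names together with the datasource. A simplified sketch of the idea (not the real code; the loader and the asset naming below are only illustrative):

# Simplified sketch of gx_preparation - illustrative only, not the real helper.
def gx_preparation(s3_client, context, latest_version, latest_file_name, normalised_file_list):
    # Register a Fluent pandas datasource on the context
    data_source = context.sources.add_pandas(name="minio_dataframes")
    asset_names = []
    for file_name in normalised_file_list:
        df = load_dataframe_from_minio(s3_client, file_name)  # hypothetical loader
        asset_name = f"{latest_version}/{file_name}"           # illustrative naming
        # (on newer GX versions the DataFrame is passed to build_batch_request instead)
        data_source.add_dataframe_asset(name=asset_name, dataframe=df)
        asset_names.append(asset_name)
    return asset_names, data_source

The context setup and the validation loop in the DAG look like this: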
import great_expectations as gx
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig, S3StoreBackendDefaults

# Data Docs are written to MinIO via the S3-compatible store backend.
# GX_BUCKET_NAME and BOTO3_OPTIONS are defined elsewhere in the DAG.
config_data_docs_sites = {
    "s3_site": {
        "class_name": "SiteBuilder",
        "store_backend": {
            "class_name": "TupleS3StoreBackend",
            "bucket": "great-expectations",
            "prefix": "data_docs",
            "boto3_options": BOTO3_OPTIONS,
        },
    },
}

data_context_config = DataContextConfig(
    store_backend_defaults=S3StoreBackendDefaults(default_bucket_name=GX_BUCKET_NAME),
    data_docs_sites=config_data_docs_sites,
)
context = BaseDataContext(project_config=data_context_config)
asset_names, data_source = gx_preparation(
    s3_client, context, latest_version, latest_file_name, normalised_file_list
)

failed = False
for asset_name in asset_names:
    data_asset = data_source.get_asset(asset_name)
    my_batch_request = data_asset.build_batch_request()

    # Pick the expectation suite and the unexpected-index columns for this asset
    if asset_name.endswith('df1'):
        expectation = 'Exp_Abteilung'
        column_names = ["Hauptabteilung", "Nebenabteilung"]
    elif asset_name.endswith('df2'):
        expectation = 'Exp_Person'
        column_names = ["Person", "PID"]
    elif asset_name.endswith('df3'):
        expectation = 'Exp_Ausruestung'
        column_names = ["Ausrüstung", "KID"]
    else:
        expectation = 'Exp_Combined'
        column_names = []

    # One SimpleCheckpoint per asset, each running a single validation
    checkpoint = gx.checkpoint.SimpleCheckpoint(
        name=f"{asset_name.replace('/', '_')}_test",
        data_context=context,
        validations=[
            {
                "batch_request": my_batch_request,
                "expectation_suite_name": expectation,
            },
        ],
        run_name_template=latest_file_name + "_" + latest_version,
        runtime_configuration={
            "result_format": {
                "result_format": "COMPLETE",
                "unexpected_index_column_names": column_names,
                "return_unexpected_index_query": True,
                "include_unexpected_rows": True,
            },
        },
    )

    result = checkpoint.run()
    if not result["success"]:
        failed = True
All of the stores are inside a MinIO bucket. The issue is the speed of these validations: the roughly 40 checkpoint runs take about 4 minutes in total to complete. Is there a better way to do this and improve the speed?
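In case it is relevant: BOTO3_OPTIONS only points boto3 at the MinIO endpoint, roughly like this (placeholder values, the real options are not shown here):

# Illustrative placeholders - the real options just point the S3 store backends at MinIO
BOTO3_OPTIONS = {
    "endpoint_url": "http://minio:9000",
    "aws_access_key_id": "...",
    "aws_secret_access_key": "...",
}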