Hi everyone,
I am using a checkpoint that has multiple validation definitions, where each validation definition represents one batch. When I run results = checkpoint.run(), it internally processes all of the batches, but I only get results for the last batch.
great_expectation_version: 1.3.8
os_version: windows
Steps to reproduce the issue:
import great_expectations as gx
from great_expectations import expectations as gxe

context = gx.get_context(mode="file")
context.enable_analytics(enable=False)

## Define Data Source, Data Asset, and Batch Definitions
"""
I have created a data source named pandas_data_source and added a parquet asset named company_hierarchy to it.
I am dealing with multiple parquet files present in S3 that do not share a common prefix,
e.g. 000000000000.parquet, 000000000001.parquet.
In this example, I have added 2 batch definitions to the data asset. I used a batch_request to get the
batch identifiers list and then added the batch definitions to the data asset using add_batch_definition_path.
"""
data_source_name = "pandas_data_source"
bucket_name = "abc"
s3_prefix = "companyhierarchy/parquet_test/"
data_source = context.data_sources.add_or_update_pandas_s3(
    name=data_source_name, bucket=bucket_name
)
data_asset_name = "company_hierarchy"
data_asset = data_source.add_parquet_asset(name=data_asset_name, s3_prefix=s3_prefix)
batch_definition_name = "company_hierarchy_batch_definition"  # NOTE(review): currently unused below

# Discover every parquet file under the prefix and register one
# whole-file batch definition per file.
batch_request = data_asset.build_batch_request()
batch_list = data_asset.get_batch_identifiers_list(batch_request=batch_request)
for i, batch in enumerate(batch_list):
    # Each identifier's "path" is the full S3 key; the batch definition
    # only needs the file name, so keep the last path component.
    data_asset.add_batch_definition_path(
        name=f"parquet_{i}",
        path=batch.get("path").split("/")[-1],
    )
## Define Expectation Suite
# Build a suite with a single not-null expectation on the key column
# and persist it in the context.
suite_name = "company_hierarchy_expectation_suite"
suite = gx.ExpectationSuite(name=suite_name)
not_null_check = gxe.ExpectColumnValuesToNotBeNull(column="company_ciq_key")
suite.add_expectation(not_null_check)
context.suites.add_or_update(suite)
## Define Validation Definitions
"""
I have created a validation definition for each batch definition and added it to the context."""
# NOTE: data_asset.batch_definitions holds BatchDefinition objects, not the
# batch-identifier dicts fetched earlier, so give it a distinct name.
batch_definitions = data_asset.batch_definitions
validation_definition_name = "company_hierarchy_validation_definition"
validation_list = []
for i, batch_definition in enumerate(batch_definitions):
    # One validation definition per batch definition; the suffix keeps names unique.
    validation_definition = gx.ValidationDefinition(
        data=batch_definition,
        suite=suite,
        name=f"{validation_definition_name}{i}",
    )
    validation_list.append(
        context.validation_definitions.add_or_update(validation_definition)
    )
## Define Checkpoint
"""
I have created a checkpoint named company_hierarchy_checkpoint and added the validation definitions to it.
I have run the checkpoint and printed the results."""
checkpoint_name = "company_hierarchy_checkpoint"
checkpoint = gx.Checkpoint(
    name=checkpoint_name,
    # Pass the list directly; copying it with `[i for i in ...]` was unnecessary.
    validation_definitions=validation_list,
    actions=[],
    result_format="COMPLETE",
)
checkpoint = context.checkpoints.add_or_update(checkpoint)
run_results = checkpoint.run()
print(run_results)
The output is shown below; it contains only the last batch's results:
C:\Users\vijaykuma_g\Downloads\Python_GX\bifrost_gx\Lib\site-packages\botocore\auth.py:425: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).
datetime_now = datetime.datetime.utcnow()
Calculating Metrics: 100%|█████████████████████████████████████████████████| 8/8 [00:00<00:00, 519.64it/s]
C:\Users\vijaykuma_g\Downloads\Python_GX\bifrost_gx\Lib\site-packages\botocore\auth.py:425: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).
datetime_now = datetime.datetime.utcnow()
Calculating Metrics: 100%|████████████████████████████████████████████████| 8/8 [00:00<00:00, 1062.29it/s]
run_id={"run_name": null, "run_time": "2025-03-06T11:26:59.594365+05:30"} run_results={ValidationResultIdentifier::company_hierarchy_expectation_suite/__none__/20250306T055659.594365Z/pandas_data_source-company_hierarchy: {
"success": true,
"results": [
{
"success": true,
"expectation_config": {
"type": "expect_column_values_to_not_be_null",
"kwargs": {
"batch_id": "pandas_data_source-company_hierarchy",
"column": "company_ciq_key"
},
"meta": {},
"id": "8041c44c-f63b-413d-8952-d564ed1c31c7"
},
"result": {
"element_count": 1430,
"unexpected_count": 0,
"unexpected_percent": 0.0,
"partial_unexpected_list": [],
"partial_unexpected_counts": [],
"partial_unexpected_index_list": [],
"unexpected_list": [],
"unexpected_index_list": [],
"unexpected_index_query": "df.filter(items=[], axis=0)"
},
"meta": {},
"exception_info": {
"raised_exception": false,
"exception_traceback": null,
"exception_message": null
}
}
],
"suite_name": "company_hierarchy_expectation_suite",
"suite_parameters": {},
"statistics": {
"evaluated_expectations": 1,
"successful_expectations": 1,
"unsuccessful_expectations": 0,
"success_percent": 100.0
},
"meta": {
"great_expectations_version": "1.3.8",
"batch_spec": {
"path": "s3a://abc/companyhierarchy/parquet_test/000000000001.parquet",
"reader_method": "read_parquet",
"reader_options": {}
},
"batch_markers": {
"ge_load_time": "20250306T055704.101052Z",
"pandas_data_fingerprint": "576c16cf06cad62af80e7beb22e2acd2"
},
"active_batch_definition": {
"datasource_name": "pandas_data_source",
"data_connector_name": "fluent",
"data_asset_name": "company_hierarchy",
"batch_identifiers": {
"path": "companyhierarchy/parquet_test/000000000001.parquet"
},
"batching_regex": "(?P<path>companyhierarchy/parquet_test/000000000001.parquet$)"
},
"validation_id": "b95b0a8b-f822-4a29-b508-83f9cc0f3f22",
"checkpoint_id": "55707263-4c24-4c78-85ea-ebc373caad19",
"run_id": {
"run_name": null,
"run_time": "2025-03-06T11:26:59.594365+05:30"
},
"validation_time": "2025-03-06T05:56:59.594365+00:00",
"batch_parameters": null
},
"id": null
}} checkpoint_config=Checkpoint(name='company_hierarchy_checkpoint', validation_definitions=[ValidationDefinition(name='company_hierarchy_validation_definition0', data=BatchDefinition(id=UUID('96170c88-4e33-4065-a8b8-5a86fc7b3040'), name='parquet_0', partitioner=FileNamePartitionerPath(regex=re.compile('000000000000.parquet$'), param_names=(), sort_ascending=True)), suite={
"name": "company_hierarchy_expectation_suite",
"id": "8cb53edb-3cc1-44c6-afba-4c0d25740001",
"expectations": [
{
"type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "company_ciq_key"
},
"meta": {},
"id": "8041c44c-f63b-413d-8952-d564ed1c31c7"
}
],
"meta": {
"great_expectations_version": "1.3.8"
},
"notes": null
}, id='9571f177-011d-476f-a9ae-11efde589e28'), ValidationDefinition(name='company_hierarchy_validation_definition1', data=BatchDefinition(id=UUID('2380d3d9-1297-4cb4-9a67-940bd55a6613'), name='parquet_1', partitioner=FileNamePartitionerPath(regex=re.compile('000000000001.parquet$'), param_names=(), sort_ascending=True)), suite={
"name": "company_hierarchy_expectation_suite",
"id": "8cb53edb-3cc1-44c6-afba-4c0d25740001",
"expectations": [
{
"type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "company_ciq_key"
},
"meta": {},
"id": "8041c44c-f63b-413d-8952-d564ed1c31c7"
}
],
"meta": {
"great_expectations_version": "1.3.8"
},
"notes": null
}, id='b95b0a8b-f822-4a29-b508-83f9cc0f3f22')], actions=[], result_format=<ResultFormat.COMPLETE: 'COMPLETE'>, id='55707263-4c24-4c78-85ea-ebc373caad19') success=True
In checkpoint.py I found the dictionary that is used to hold Dict[ValidationResultIdentifier, ExpectationSuiteValidationResult].
It seems that for every batch it generates the same ValidationResultIdentifier (the dictionary key), overwriting the previous value. Ultimately, run_results contains the results of only one batch.
def _run_validation_definitions(
    self,
    batch_parameters: Dict[str, Any] | None,
    expectation_parameters: SuiteParameterDict | None,
    result_format: ResultFormatUnion,
    run_id: RunIdentifier,
) -> Dict[ValidationResultIdentifier, ExpectationSuiteValidationResult]:
    """Run every validation definition on this checkpoint and collect the
    results keyed by ValidationResultIdentifier.

    Bug fixed: multiple validation definitions pointing at the same data
    asset can share one ``batch_id`` (e.g. path-based batch definitions on
    a single parquet asset, as in the report above), so ``_build_result_key``
    produced the same key for every batch and each result silently
    overwrote the previous one — only the last batch survived. When a key
    collision is detected, the key is rebuilt with the validation
    definition's name appended to the batch identifier so every result
    remains addressable in the returned mapping.
    """
    run_results: Dict[ValidationResultIdentifier, ExpectationSuiteValidationResult] = {}
    for validation_definition in self.validation_definitions:
        validation_result = validation_definition.run(
            checkpoint_id=self.id,
            batch_parameters=batch_parameters,
            expectation_parameters=expectation_parameters,
            result_format=result_format,
            run_id=run_id,
        )
        key = self._build_result_key(
            validation_definition=validation_definition,
            run_id=run_id,
            batch_identifier=validation_result.batch_id,
        )
        if key in run_results:
            # Same suite/run/batch_id across validation definitions:
            # disambiguate instead of overwriting the earlier result.
            key = self._build_result_key(
                validation_definition=validation_definition,
                run_id=run_id,
                batch_identifier=(
                    f"{validation_result.batch_id}-{validation_definition.name}"
                ),
            )
        run_results[key] = validation_result
    return run_results
Can someone help me configure this correctly? Thanks in advance.