Hi
I am migrating the gx library from v0 to v1 and having issues with checkpoint.run. We use one checkpoint per schema, with many entities in the same schema, and the validation definition is added at execution time. What am I doing wrong? I can see the checkpoint is recognized by gx when initializing the context.
StoreBackendError: Store already has the following key: ('mdp_process_curated',).
File , line 23
17 checkpoint = gx_context.checkpoints.add_or_update(checkpoint)
18 # results = validation_definition.run(
19 # checkpoint_id = checkpoint.id,
20 # batch_parameters = batch_parameters,
21 # result_format = result_format
22 # )
—> 23 results = checkpoint.run(batch_parameters=batch_parameters)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-a5684cda-aee3-4f3e-b024-77acc14fa94d/lib/python3.12/site-packages/great_expectations/data_context/store/_store_backend.py:146, in StoreBackend._add(self, key, value, **kwargs)
144 def _add(self, key, value, **kwargs):
145 if self.has_key(key):
→ 146 raise StoreBackendError(f"Store already has the following key: {key}.") # noqa: TRY003 # FIXME CoP
147 return self.set(key=key, value=value, **kwargs)
Here is my code
Checkpoints file
{
"actions": [
{
"name": "update_data_docs",
"site_names": [],
"type": "update_data_docs"
}
],
"name": "mdp_process_curated",
"result_format": "SUMMARY",
"validation_definitions": []
}
Expectation file
{
"name": "mdp_suite.curated.reference_fund_history",
"expectations": [
{
"type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "fund_code"
},
"meta": {}
}
],
"id": null,
"meta": {
"great_expectations_version": "0.18.9"
}
}
import great_expectations as gx
from pathlib import Path
import json
import yaml
from great_expectations.data_context import FileDataContext, EphemeralDataContext
from great_expectations.datasource.fluent.spark_datasource import SparkDatasource
from great_expectations.checkpoint import Checkpoint, CheckpointResult
from great_expectations import ValidationDefinition, ExpectationSuite
from datetime import date
import datetime
import re
import os
def get_or_add_checkpoint(gx_ctx: FileDataContext, checkpoint_name: str, result_format: str = "SUMMARY") -> Checkpoint:
    """
    Return the Checkpoint instance for the given checkpoint name.

    If the Checkpoint is already registered in the context's checkpoint store it
    is returned as-is; otherwise its definition is loaded from the project's
    ``checkpoints/<name>.json`` file and registered via ``add_or_update``.

    Args:
        gx_ctx (FileDataContext) : the Great Expectations Data Context
        checkpoint_name (str) : the name of the Checkpoint
        result_format (str) : the Expectations Result format

    Returns:
        the Checkpoint instance

    Raises:
        ValueError: if checkpoint_name is empty or result_format is incorrect
        RuntimeError: if the checkpoint could not be loaded or registered
    """
    # Validate arguments BEFORE the try block so the documented ValueError
    # actually reaches the caller instead of being re-wrapped as RuntimeError.
    if not checkpoint_name:
        raise ValueError("checkpoint_name cannot be empty")
    VALID_RESULT_FORMATS = ["BOOLEAN_ONLY", "BASIC", "SUMMARY", "COMPLETE"]
    if result_format not in VALID_RESULT_FORMATS:
        raise ValueError(f"{result_format=} is incorrect. Should be one of the valid result formats: " + ",".join(VALID_RESULT_FORMATS))
    try:
        existing_names = {checkpoint.name for checkpoint in gx_ctx.checkpoints.all()}
        if checkpoint_name in existing_names:
            return gx_ctx.checkpoints.get(checkpoint_name)
        checkpoint_content = read_gx_file_contents(gx_ctx.root_directory, checkpoint_name, "checkpoints")
        # Keep the instance RETURNED by add_or_update: it carries the store id,
        # so later checkpoint.save()/run() updates the existing store key instead
        # of trying to re-add it (which raises StoreBackendError:
        # "Store already has the following key: ...").
        return gx_ctx.checkpoints.add_or_update(gx.Checkpoint(**checkpoint_content))
    except Exception as err:
        # Chain the original exception so the root cause stays visible.
        raise RuntimeError(f"An error occurred in get_or_add_checkpoint: {str(err)}") from err
def get_or_add_datasource(gx_ctx: FileDataContext, datasource_name: str) -> SparkDatasource:
    """
    Return the SparkDatasource instance for the given data source name,
    creating it in the context if it does not already exist.

    Args:
        gx_ctx (FileDataContext) : the Great Expectations Data Context
        datasource_name (str) : the name of the Data Source

    Returns:
        the SparkDatasource instance

    Raises:
        ValueError: if datasource_name is empty
        RuntimeError: if the data source could not be fetched or created
    """
    # Validate BEFORE the try block so the documented ValueError is raised
    # directly rather than re-wrapped as RuntimeError.
    if not datasource_name:
        raise ValueError("datasource_name cannot be empty")
    try:
        existing_names = {ds['name'] for ds in gx_ctx.list_datasources()}
        if datasource_name in existing_names:
            return gx_ctx.data_sources.get(name=datasource_name)
        # persist=False: the Spark session/dataframes are managed by the caller
        # (batch parameters are supplied at run time).
        return gx_ctx.data_sources.add_spark(name=datasource_name, persist=False)
    except Exception as err:
        # Chain the original exception so the root cause stays visible.
        raise RuntimeError(f"An error occurred in get_or_add_datasource: {str(err)}") from err
def get_expectations_suite_names(gx_ctx: FileDataContext) -> list:
    """
    Return all Expectation Suites configured for the GX project
    (represented by the GX data context).

    NOTE(review): despite the function name, ``gx_ctx.suites.all()`` returns
    the ExpectationSuite objects themselves, not their names; callers needing
    names should read ``.name`` on each element. The return annotation is
    corrected from ``ExpectationSuite`` to ``list`` to match actual behavior.

    Args:
        gx_ctx (FileDataContext) : the Great Expectations Data Context

    Returns:
        the list of ExpectationSuite objects associated with the given Data Context

    Raises:
        RuntimeError: if the suites could not be listed
    """
    try:
        return gx_ctx.suites.all()
    except Exception as err:
        # Chain the original exception so the root cause stays visible.
        raise RuntimeError(f"An error occurred in get_expectations_suite_names: {str(err)}") from err
def read_gx_file_contents(ctx_root_dir: str, object_name: str, object_type: str) -> dict:
    """
    Read a Great Expectations JSON object file and return its contents as a dict.

    Dots in ``object_name`` are treated as path separators, so e.g.
    ``mdp_suite.curated.x`` under ``expectations`` resolves to
    ``<root>/expectations/mdp_suite/curated/x.json``.

    Args:
        ctx_root_dir (str) : the root directory of the Great Expectations project
        object_name (str) : the name of the Great Expectations object (e.g., checkpoint name / expectation suite name)
        object_type (str) : the type of the Great Expectations object (e.g., "checkpoints", "expectations")

    Returns:
        the file content as a dictionary

    Raises:
        FileNotFoundError: if the resolved file does not exist
        json.JSONDecodeError: if the file content is not valid JSON
    """
    extension_type = '.json'
    file_path = os.path.join(ctx_root_dir, object_type, object_name.replace('.', '/') + extension_type)
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"GX file not found at path: {file_path}")
    # Both checkpoints and expectations are stored as .json files; the original
    # yaml.safe_load branch parsed the same JSON text (JSON is a YAML subset),
    # so a single json.load preserves behavior and drops the yaml dependency here.
    with open(file_path, 'r') as file:
        return json.load(file)
def create_validation_definition(gx_ctx: FileDataContext, datasource_name: str, suite_name: str, data_asset_name: str) -> ValidationDefinition:
    """
    Create (or replace) and return a Validation Definition binding an
    expectation suite to a whole-dataframe batch definition on the given asset.

    Args:
        gx_ctx (FileDataContext) : the Great Expectations Data Context
        datasource_name (str) : the name of the data source
        suite_name (str) : the Expectations Suite name
        data_asset_name (str) : the name of the data asset

    Returns:
        the ValidationDefinition instance (registered in the context store)

    Raises:
        RuntimeError: if any step of the setup fails
    """
    try:
        data_source = get_or_add_datasource(gx_ctx, datasource_name=datasource_name)
        # Recreate the dataframe asset so repeated runs do not collide on the
        # asset name within the data source.
        if data_asset_name in data_source.get_asset_names():
            data_source.delete_asset(data_asset_name)
        data_asset = data_source.add_dataframe_asset(name=data_asset_name)
        batch_definition = data_asset.add_batch_definition_whole_dataframe(f"{suite_name}_batch")
        # The suite already lives in the store; the previous
        # add_or_update(get(...)) round trip was a no-op, and the local was
        # misspelled ("exptectation_suite") — fetch it once.
        expectation_suite = gx_ctx.suites.get(suite_name)
        validation_definition = gx_ctx.validation_definitions.add_or_update(
            gx.ValidationDefinition(
                name=f"{suite_name}_validation",
                data=batch_definition,
                suite=expectation_suite,
            )
        )
        return validation_definition
    except Exception as err:
        # Chain the original exception so the root cause stays visible.
        raise RuntimeError(f"An error occurred in create_validation_definition: {str(err)}") from err
# --- Driver -----------------------------------------------------------------
schema_name = 'curated'
table_name = 'reference_fund_history'
datasource_name = f'{schema_name}_source'
checkpoint_name = f'mdp_process_{schema_name}'
run_id = f"{table_name}"
result_format = 'SUMMARY'
suite_name = 'mdp_suite.curated.reference_fund_history'

# read_table is an external helper (presumably returns a Spark DataFrame in
# this Databricks environment — TODO confirm; it is not defined in this file).
df = read_table(f"{schema_name}.{table_name}")
batch_parameters = {"dataframe": df}

gx_context = gx.get_context(context_root_dir='/Workspace/testing/')
checkpoint = get_or_add_checkpoint(gx_ctx=gx_context, checkpoint_name=checkpoint_name)
validation_definition = create_validation_definition(gx_context, datasource_name, suite_name, f"{schema_name}.{table_name}")

# Attach the runtime validation definition and PERSIST the change. Without
# save(), run() works with a checkpoint the store does not recognize as an
# update of the existing entry and attempts to re-add it by name, raising
# StoreBackendError: Store already has the following key: ('mdp_process_curated',).
checkpoint.validation_definitions = [validation_definition]
checkpoint.save()

# NOTE(review): removed the stray trailing ``` markdown fence that made this
# line a syntax error.
results = checkpoint.run(batch_parameters=batch_parameters)