Running checkpoint.run is returning error StoreBackendError: Store already has the following key

Hi

I am migrating gx library from V0 to v1 and having issues checkpoint.run. We use 1 checkpoint per schema with many entities in the same schema. Validation definition is added at execution time. What am I doing wrong as I see the checkpoint is recognized by gx when initializing context.

StoreBackendError: Store already has the following key: (‘mdp_process_curated’,).
File , line 23
17 checkpoint = gx_context.checkpoints.add_or_update(checkpoint)
18 # results = validation_definition.run(
19 # checkpoint_id = checkpoint.id,
20 # batch_parameters = batch_parameters,
21 # result_format = result_format
22 # )
—> 23 results = checkpoint.run(batch_parameters=batch_parameters)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-a5684cda-aee3-4f3e-b024-77acc14fa94d/lib/python3.12/site-packages/great_expectations/data_context/store/_store_backend.py:146, in StoreBackend._add(self, key, value, **kwargs)
144 def _add(self, key, value, **kwargs):
145 if self.has_key(key):
→ 146 raise StoreBackendError(f"Store already has the following key: {key}.") # noqa: TRY003 # FIXME CoP
147 return self.set(key=key, value=value, **kwargs)

Here is my code

Checkpoints file
{
  "actions": [
    {
      "name": "update_data_docs",
      "site_names": [],
      "type": "update_data_docs"
    }
  ],
  "name": "mdp_process_curated",
  "result_format": "SUMMARY",
  "validation_definitions": []
}


Expectation file
{
  "name": "mdp_suite.curated.reference_fund_history",
  "expectations": [
    {
      "type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "fund_code"
      },
      "meta": {}
    }

  ],
  "id": null,
  "meta": {
    "great_expectations_version": "0.18.9"
  }
}
import great_expectations as gx
from pathlib import Path
import json
import yaml

from great_expectations.data_context import FileDataContext, EphemeralDataContext
from great_expectations.datasource.fluent.spark_datasource import SparkDatasource
from great_expectations.checkpoint import Checkpoint, CheckpointResult
from great_expectations import ValidationDefinition, ExpectationSuite
from datetime import date
import datetime
import re
import os

def get_or_add_checkpoint(gx_ctx: FileDataContext, checkpoint_name: str, result_format: str = "SUMMARY") -> Checkpoint:
    """
    Return the Checkpoint instance for the given checkpoint name. If the Checkpoint does not already exist, create an instance. 

    Args:
        gx_ctx (FileDataContext) : the Great Expectations Data Context
        checkpoint_name (str) : the name of the Checkpoint
        result_format (str) : the Expectations Result format
    Returns:
        the Checkpoint instance
    Raises:
        ValueError if checkpoint_name is empty or result_format is incorrect
    """
    try:
        if not checkpoint_name:
            raise ValueError("checkpoint_name cannot be empty")
        VALID_RESULT_FORMATS = ["BOOLEAN_ONLY", "BASIC", "SUMMARY", "COMPLETE"]
        if result_format not in VALID_RESULT_FORMATS:
            raise ValueError(f"{result_format=} is incorrect. Should be one of the valid result formats: " + ",".join(VALID_RESULT_FORMATS))

        if checkpoint_name in [checkpoint.name for checkpoint in gx_ctx.checkpoints.all()]:
            checkpoint = gx_ctx.checkpoints.get(checkpoint_name)
        else:
            checkpoint_content = read_gx_file_contents(gx_ctx.root_directory, checkpoint_name, "checkpoints")
            checkpoint = gx_ctx.checkpoints.add_or_update(gx.Checkpoint(**checkpoint_content))

        return checkpoint
    except Exception as err:
        raise RuntimeError(f"An error occurred in get_or_add_checkpoint: {str(err)}")

def get_or_add_datasource(gx_ctx: FileDataContext, datasource_name: str) -> SparkDatasource:
    """
    Return an SparkDatasource instance for the given data source name. Create the data source does not already exist. 

    Args:
        gx_ctx (FileDataContext) : the Great Expectations Data Context
        datasource_name (str) : the name of the Data Source
    Returns:
        the SparkDatasource instance
    Raises:
        ValueError if datasource_name is empty
    """
    try:
        if not datasource_name:
            raise ValueError("datasource_name cannot be empty")
        if datasource_name in [ds['name'] for ds in gx_ctx.list_datasources()]:
            return gx_ctx.data_sources.get(name=datasource_name)
        else:
            return gx_ctx.data_sources.add_spark(name=datasource_name,persist=False)
    except Exception as err:
        raise RuntimeError(f"An error occurred in get_or_add_datasource: {str(err)}")

def get_expectations_suite_names(gx_ctx: FileDataContext) -> ExpectationSuite:
    """
    Return the list of expectations suite names that have been configured for the GX project (represented by the GX data context). 
    This function is a simple wrapper over the GX DataContext.list_expectation_suite_names() method. 

    Args:
        gx_ctx (FileDataContext) : the Great Expectations Data Context
    Returns:
        the list of Expectations Suite names associated with the given Data Context
    """
    try:
        return gx_ctx.suites.all()
    except Exception as err:
        raise RuntimeError(f"An error occurred in get_expectations_suite_names: {str(err)}") 


def read_gx_file_contents(ctx_root_dir: str, object_name: str, object_type: str) -> dict:
    """
    Reads great expectations json/yml file object contents and returns content.

    Args:
        ctx_root_dir (str) : the root directory of the Great Expectations project
        object_name (str) : the name of the Great Expectations object (e.g., checkpoint name/ expectation suite name)
        object_type (str) : the type of the Great Expectations object (e.g., "checkpoints", "expectations")
    Returns:
        the file content as dictionary
    """
    extension_type = '.json'
    file_path = os.path.join(ctx_root_dir, object_type, object_name.replace('.',r'/') + extension_type)
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"GX file not found at path: {file_path}")
    with open(file_path, 'r') as file:
        if object_type == 'checkpoints':
            file_content_dict = yaml.safe_load(file)
        else:
            file_content = file.read()
            file_content_dict = json.loads(file_content)

    return file_content_dict

def create_validation_definition(gx_ctx: FileDataContext,datasource_name: str,suite_name: str,data_asset_name: str) -> ValidationDefinition:
    """
    Create and return a Validation Definition instance.

    Args:
        gx_ctx (FileDataContext) : the Great Expectations Data Context
        datasource_name (str) : the name of the data source
        suite_name (str) : the Expectations Suite name
        data_asset_name (str) : the name of the data asset

    Returns:
        the Validation Definition instance
    """
    try:
        data_source = get_or_add_datasource(gx_ctx, datasource_name=datasource_name)
        if data_asset_name in data_source.get_asset_names():
            data_source.delete_asset(data_asset_name)
        
        data_asset = data_source.add_dataframe_asset(name=data_asset_name)
        batch_definition = data_asset.add_batch_definition_whole_dataframe(f"{suite_name}_batch")
        exptectation_suite = gx_ctx.suites.add_or_update(gx_ctx.suites.get(suite_name))

        validation_definition = gx_ctx.validation_definitions.add_or_update(
            gx.ValidationDefinition(
                name=f"{suite_name}_validation",
                data=batch_definition,
                suite=exptectation_suite,
            )
        )

        return validation_definition
    except Exception as err:
        raise RuntimeError(f"An error occurred in create_validation_definition: {str(err)}") 
    

schema_name = 'curated'
table_name = 'reference_fund_history'
datasource_name = f'{schema_name}_source'
checkpoint_name = f'mdp_process_{schema_name}'
run_id = f"{table_name}"
result_format = 'SUMMARY'
suite_name = 'mdp_suite.curated.reference_fund_history'

df = read_table(f"{schema_name}.{table_name}")
batch_parameters = {"dataframe": df}

gx_context = gx.get_context(context_root_dir='/Workspace/testing/')
checkpoint = get_or_add_checkpoint(gx_ctx=gx_context, checkpoint_name=checkpoint_name)
validation_definition = create_validation_definition(gx_context, datasource_name, suite_name, f"{schema_name}.{table_name}")

checkpoint.validation_definitions = [validation_definition]

results = checkpoint.run(batch_parameters=batch_parameters)```