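We run validations from a thread pool and share a single GX (Great Expectations) context across all threads. The context, data source, data asset, and batch definition are created once, in the constructor: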
class QCValidation:
    """Run Great Expectations (GX) validations against in-memory DataFrames.

    One GX context, one non-persisted Spark data source, one DataFrame asset
    and one whole-DataFrame batch definition are created per instance and
    reused for every validation.

    NOTE(review): these GX objects are shared, mutable state and are not
    assumed to be thread-safe. ``self._gx_lock`` is created here so that the
    batch-building/validation section can be serialized when one instance is
    used from several threads. The alternative is one ``QCValidation`` per
    thread. TODO confirm that the context in use accepts repeated
    ``add_spark`` calls with the same data source name before relying on that.
    """

    def __init__(self):
        # Local import: this block must not rely on a module-level import that
        # may not exist elsewhere in the file.
        import threading

        # Create the GX context once and reuse it for every validation.
        self.context = gx.get_context()

        # Register a Spark data source on the context; persist=False keeps it
        # out of the context's saved configuration.
        self.data_source_name = "gx_data_source"
        self.data_source = self.context.data_sources.add_spark(
            name=self.data_source_name, persist=False
        )

        # A DataFrame asset: the actual DataFrame is supplied per batch at
        # validation time, not here.
        self.data_asset_name = "gx_dataframe_data_asset"
        self.data_asset = self.data_source.add_dataframe_asset(name=self.data_asset_name)

        # A batch definition that treats the whole supplied DataFrame as one batch.
        self.batch_definition_name = "gx_batch_definition"
        self.batch_definition = self.data_asset.add_batch_definition_whole_dataframe(
            self.batch_definition_name
        )

        # Serializes use of the shared GX objects above across threads.
        self._gx_lock = threading.Lock()
Each worker thread then calls `execute_data_validation` to run a validation. When several threads call `execute_data_validation` at the same time, errors are reported. Does GX support multithreaded use, and if so, what is the recommended programming pattern for it? Thanks!
def execute_data_validation(self, source_df, iteration):
    """Validate *source_df* against the fixed ``col1`` expectation suite.

    A fresh ``ExpectationSuite`` is built on every call, so no suite object is
    shared between threads.

    Args:
        source_df: DataFrame passed to the batch definition as the
            ``"dataframe"`` batch parameter (presumably a Spark DataFrame,
            given the Spark data source; TODO confirm).
        iteration: Caller-supplied label, used only in error messages.

    Returns:
        Whatever ``validate_dataframe`` returns (the validation result).

    Raises:
        Any exception re-raised by ``validate_dataframe``.
    """
    # Expectation suite definition: col1 must exist and contain no nulls.
    suite_definition = {
        "name": "Normalized_debtcontentitem_gx_suite",
        "expectations": [
            {"type": "expect_column_to_exist", "kwargs": {"column": "col1"}},
            {"type": "expect_column_values_to_not_be_null", "kwargs": {"column": "col1"}},
        ],
        "meta": {"great_expectations_version": "0.16.15"},
    }
    suite = gx.ExpectationSuite(**suite_definition)

    batch_parameters = {"dataframe": source_df}
    result_format = {
        "result_format": "COMPLETE",
        "return_unexpected_index_query": True,
    }
    # Return the result instead of discarding it, so callers can inspect it.
    return self.validate_dataframe(batch_parameters, suite, result_format, iteration)
def validate_dataframe(self, batch_parameters, suite: gx.ExpectationSuite, result_format: "dict | None" = None, iteration=None):
    """Build a batch from *batch_parameters* and validate it against *suite*.

    The shared ``self.batch_definition`` is used under a per-instance lock. This
    lets one instance be called from a thread pool without concurrent threads
    interleaving ``get_batch``/``validate`` on the same GX objects. The cost is
    that the GX section runs one call at a time. For true parallelism, use
    separate GX objects per thread instead (NOTE(review): verify your GX
    version supports that setup).

    Args:
        batch_parameters: Passed to ``get_batch``; here ``{"dataframe": df}``.
        suite: Expectation suite to validate against.
        result_format: Passed through to ``batch.validate``; ``None`` leaves
            the choice to GX.
        iteration: Caller-supplied label, used only in the error message.

    Returns:
        The result of ``batch.validate``, with its ``id`` set to ``suite.id``.

    Raises:
        Re-raises any exception from batch creation or validation, after
        printing it together with *iteration*.
    """
    import threading

    # dict.setdefault is atomic in CPython, so concurrent first calls agree on a
    # single lock. A ``_gx_lock`` created earlier (e.g. in __init__) is reused.
    gx_lock = vars(self).setdefault("_gx_lock", threading.Lock())
    try:
        # Only one thread at a time may build a batch from, and validate
        # against, the shared batch definition.
        with gx_lock:
            batch = self.batch_definition.get_batch(batch_parameters=batch_parameters)
            validation_results = batch.validate(suite, result_format=result_format)
        validation_results.id = suite.id
        return validation_results
    except Exception as e:
        print(f"Iteration {iteration}: Error in validation suite: {str(e)}")
        # Bare raise re-raises the active exception with its original traceback.
        raise