Is there any support for multithreading?

Following up on the thread above: are there any plans to support multithreading or parallel execution of validations?

Hi, @jon84doe - thanks for reaching out!

I checked in with Engineering on this topic. Adding support for multithreading/parallel execution is not currently in the scope of our upcoming 1.0 release.

I was able to get a bit of history on this from developers who have been at GX a while: back in 0.13.x, we accepted a community contribution that added some multithreading capability as an experimental feature. It might be of interest to you. However, Engineering’s recollection is that it covered a niche use case and wasn’t widely used in the community after merging.

Thank you, I will take a look at the 0.13.x version. Appreciated.

I’ve managed to get checkpoints running in parallel, but not down to the level of individual expectations just yet. I thought I’d share.

We are validating tables within Postgres databases and have a bunch of checkpoints being written by different developers to validate different tables, or table combinations. For now we are running locally and just copying the data_docs static content somewhere until we get further along with the publishing side. This simplicity should help others adapt. Our config is stored on the local filesystem.

Note: I am still continually confused by the old GX CLI and JSON config approach versus the lightweight Python-only approach that seems to be suggested (as of v0.18.x). I feel like the CLI and the separate JSON approach is no longer the way, and I have run with that assumption. I have also made a few assumptions below about what I need and don’t need; these are based on brief experience and may not be correct.

Environment: local file config, local file data docs.

Things to address for concurrency support:

  • Datasource keeps writing jargon back to great_expectations.yml file (datasource info, queries, etc) even though we never use it. As well as adding noise, this causes contention when threaded.
  • Adding/getting table assets … we don’t want every checkpoint implementation to care if a table asset has been defined already or not.
  • We make sure checkpoints don’t generate data docs, just the validation result and we then generate data docs once at the end to avoid contention.

We wanted to disable config being written back to great_expectations.yml altogether, so we added this utility in a separate file, common/context.py.

import great_expectations as gx
from great_expectations.data_context import FileDataContext

class FileDataContextReadOnlyConfig(FileDataContext):
    """ Extends FileDataContext to prevent saving of project config to great_expectations.yml
    """
    def _save_project_config(self):
        # Prevent GX from saving datasource information to great_expectations.yml!!
        # Otherwise it saves every query, batch, etc to the file when we don't need it or ever read it.
        # Also by saving to file, this prevents parallel processing and has conflicts
        pass

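def get_read_only_config_context():
    context = gx.get_context()
    # Swap the class of the existing instance so the no-op _save_project_config above is used.
    # This assumes gx.get_context() returned a FileDataContext (a file-backed project).
    context.__class__ = FileDataContextReadOnlyConfig
    return context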
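
To make the `__class__` reassignment above easier to follow, here is a minimal sketch of the same trick without GX. `Context`, `ReadOnlyContext` and `add_datasource` are hypothetical stand-ins invented for illustration, not GX classes or methods; the only point is that swapping the class of an existing instance changes which `_save_project_config` gets called.

```python
class Context:
    """Hypothetical stand-in for a context that persists its config on every change."""

    def __init__(self):
        self.saves = 0

    def _save_project_config(self):
        self.saves += 1

    def add_datasource(self, name):
        # Mimics a mutating call that saves the config as a side effect.
        self._save_project_config()
        return name


class ReadOnlyContext(Context):
    def _save_project_config(self):
        # No-op override: nothing is written back.
        pass


ctx = Context()
ctx.add_datasource("a")          # one save is recorded
ctx.__class__ = ReadOnlyContext  # swap the class of the existing instance
ctx.add_datasource("b")          # the save is now skipped
print(ctx.saves)
```

Reassigning `__class__` is a blunt tool: it only works because the subclass adds no state of its own, and it relies on a private method name that could change between releases.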

We also wanted to bundle the context and datasource, among other things, to pass to our individual checkpoint implementations. We wrapped these up and also added a convenience method, get_or_add_table_asset. This lives in a file common/checkpoints.py.

from dataclasses import dataclass, field
from threading import Lock

from great_expectations.data_context import AbstractDataContext
from great_expectations.datasource.fluent import PostgresDatasource


@dataclass
class CheckpointRequest:
    context: AbstractDataContext
    datasource: PostgresDatasource
    # Add more options in here you want to pass along (we had a flag for a quick_run for example)
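    # Note: default=Lock() is evaluated once, so every CheckpointRequest instance shares this one lock.
    _lock: Lock = field(default=Lock(), init=False)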

    def get_or_add_table_asset(self, table: str):
        asset_name = f"{table} table"
        with self._lock:
            try:
                return self.datasource.get_asset(asset_name)
            except LookupError:
                return self.datasource.add_table_asset(
                    name=asset_name,
                    table_name=table
                )
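
As a rough illustration of why the lock matters in get_or_add_table_asset, the sketch below runs the same get-or-add pattern from many threads against a fake datasource. `FakeDatasource` is a hypothetical stand-in written for this example (it is not a GX class); it only mimics the two calls used above and counts how often an asset is added.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Lock


class FakeDatasource:
    """Hypothetical stand-in for a datasource; not part of GX."""

    def __init__(self):
        self.assets = {}
        self.add_calls = 0

    def get_asset(self, name):
        if name not in self.assets:
            raise LookupError(name)
        return self.assets[name]

    def add_table_asset(self, name, table_name):
        self.add_calls += 1
        self.assets[name] = {"name": name, "table_name": table_name}
        return self.assets[name]


lock = Lock()
datasource = FakeDatasource()


def get_or_add_table_asset(table):
    asset_name = f"{table} table"
    # The lock makes "look up, then add if missing" a single atomic step.
    with lock:
        try:
            return datasource.get_asset(asset_name)
        except LookupError:
            return datasource.add_table_asset(name=asset_name, table_name=table)


# 50 concurrent requests for the same table should add the asset only once.
with ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(get_or_add_table_asset, ["my_table"] * 50))

print(datasource.add_calls)
```

Without the lock, two threads could both miss on the lookup and both try to add the same asset.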

And we have a helper function to run a bunch of lambdas concurrently, in a file common/concurrency.py.

from concurrent.futures import ThreadPoolExecutor


def run_in_parallel(funcs, max_workers=None):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(func) for func in funcs]
        for future in futures:
            future.result()  # wait for task to complete and raise exception if the task threw one
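
A small sketch of how this helper behaves when one task fails, using only the standard library. The task functions `ok` and `failing` are made up for the example. As far as I can tell, the first exception is re-raised to the caller, but the other tasks that were already submitted still run to completion, because leaving the `with` block waits for the pool to shut down.

```python
from concurrent.futures import ThreadPoolExecutor


def run_in_parallel(funcs, max_workers=None):
    # Same helper as above, repeated so this example is self-contained.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(func) for func in funcs]
        for future in futures:
            future.result()


completed = []
error = None


def ok(label):
    completed.append(label)


def failing():
    raise ValueError("validation failed")


try:
    run_in_parallel([lambda: ok("one"), failing, lambda: ok("two")])
except ValueError as exc:
    error = str(exc)

print(error, sorted(completed))
```

If you need to see every failure rather than just the first one, you would have to catch exceptions per future and collect them; the helper as written does not do that.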

Now in the root of our project we have code that looks like this:

from checkpoints.my_checkpoint_one import my_checkpoint_one
from checkpoints.my_checkpoint_two import my_checkpoint_two
from common.checkpoints import CheckpointRequest
from common.concurrency import run_in_parallel
from common.context import get_read_only_config_context


context = get_read_only_config_context()
datasource = context.sources.add_or_update_postgres(name="my-datasource-name",
                                                    connection_string="${my_jdbc_full_url_from_config_yml}")

request = CheckpointRequest(
    context=context,
    datasource=datasource,
)

run_in_parallel([
    lambda: my_checkpoint_one(req=request),
    lambda: my_checkpoint_two(req=request),
])

# Build and open data docs
context.build_data_docs()
context.open_data_docs()

An example checkpoint implementation, say checkpoints/my_checkpoint_one.py:

from great_expectations.checkpoint import Checkpoint
from great_expectations.core import ExpectationConfiguration
from great_expectations.core.expectation_suite import ExpectationSuite

from common.checkpoints import CheckpointRequest

name = "my_table row checks"

def my_checkpoint_one(req: CheckpointRequest):
    batch_request = req.get_or_add_table_asset("my_table").build_batch_request()
    suite = ExpectationSuite(name)
    suite.add_expectation(expectation_configuration=ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "my_column"}
    ))
    # Add more expectations here ...

    Checkpoint(
        name=f"{name}-checkpoint",
        run_name_template=f"{name}-%Y%m%d-%H%M%S",
        data_context=req.context,
        validator=req.context.get_validator(
            batch_request=batch_request,
            expectation_suite=suite,
        ),
        action_list=[
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"},
            }
        ],
    ).run()

I’m sure I’m not doing everything the correct “gx way”, but this got me going and some heavy-hitting checks could run in parallel. Some of my checkpoint files actually run multiple checkpoints and make use of the run_in_parallel helper themselves too.
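
For the nested case mentioned above, where a checkpoint file itself calls run_in_parallel, a minimal sketch might look like this. The `checkpoint`, `my_checkpoint_file_a` and `my_checkpoint_file_b` functions are placeholders invented for the example and stand in for real checkpoint runs.

```python
from concurrent.futures import ThreadPoolExecutor


def run_in_parallel(funcs, max_workers=None):
    # Same helper as in common/concurrency.py, repeated to keep this self-contained.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(func) for func in funcs]
        for future in futures:
            future.result()


ran = []


def checkpoint(name):
    # Placeholder for building and running a real Checkpoint.
    ran.append(name)


def my_checkpoint_file_a():
    # This "file" fans out into two checkpoints of its own.
    run_in_parallel([lambda: checkpoint("a1"), lambda: checkpoint("a2")])


def my_checkpoint_file_b():
    checkpoint("b1")


run_in_parallel([my_checkpoint_file_a, my_checkpoint_file_b])
print(sorted(ran))
```

Each call creates its own thread pool, so an inner call never waits on a worker from the outer pool. The trade-off is that the total number of threads can grow with the nesting depth.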