GX Core only reads last file in directory when using add_batch_definition_whole_directory()

Hello,

I have an S3 prefix containing a set of .parquet files. I’m trying to use GX Core to read these files into a single Batch to do some validations.

I’m using add_batch_definition_whole_directory(), because the documentation says:

A whole directory Batch Definition returns all of the records in a data file as a single Batch.

Unfortunately, no matter what I try, the resulting Batch Definition only contains the last file in the directory.

Here is the code:

# omitting imports

BUCKET_NAME = 'foobar'
DATA_PATH = "foo/bar/"
DATA_ASSET_NAME = 'foobar'  # arbitrary
BATCH_DEFINITION_NAME = 'foobar_contents'  # arbitrary

context = get_context()

data_source = context.data_sources.add_spark_s3(
    name=BUCKET_NAME,
    bucket=BUCKET_NAME
)

file_data_asset = data_source.add_directory_parquet_asset(
    name=DATA_ASSET_NAME,
    s3_prefix=DATA_PATH,
    s3_recursive_file_discovery=True,
    recursive_file_lookup=True,
    data_directory=DATA_PATH
)

batch_definition = file_data_asset.add_batch_definition_whole_directory(
    name=BATCH_DEFINITION_NAME,
)

batch = batch_definition.get_batch()  # retrieving the batch (this step was omitted in the original snippet)

spark_df = batch.data.dataframe
print(f"Spark input files: {spark_df.inputFiles()}")
# Spark input files: ['s3a://foo/bar/foobar_contents/part-00007-longhashcode.zstd.parquet']
print(f"Number of input files: {len(spark_df.inputFiles())}")
# Number of input files: 1

When I go on to add expectations and validations, I hit the same issue: GX believes there is only one row of data (each parquet file in this example contains a single row). It reads the contents of that row correctly, but of course I want the entire directory to be read.
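For example, a simple row-count check against the batch reflects only the last file (the expectation and threshold here are just illustrative):

import great_expectations.expectations as gxe

# each parquet file holds one row, so the whole directory should contain more than one row
result = batch.validate(gxe.ExpectTableRowCountToBeBetween(min_value=2))
print(result.success)  # False - only the last file was read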

Any help would be appreciated. Thank you.

hey @lcoogan, thanks for reporting this. It’s caused by a bug on our end which impacts whole directory Batch Definitions on directory assets for S3, GCS, and ABS data sources. I’ve filed a bug report, but don’t have a timeline for the fix. Directory assets work correctly for SparkFilesystemDatasource, so for now a potential workaround is to run GX on the same filesystem as your data (see the sketch below). Sorry we don’t have a better answer for you right now.
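Pointing GX at a local copy of the files would look roughly like this, reusing the names from your snippet (a minimal sketch, assuming the GX Core 1.x filesystem data source API; the data source name and /mnt/data path are placeholders):

local_data_source = context.data_sources.add_spark_filesystem(
    name="local_parquet",        # placeholder name
    base_directory="/mnt/data"   # local copy of the S3 prefix (placeholder path)
)

local_asset = local_data_source.add_directory_parquet_asset(
    name=DATA_ASSET_NAME,
    data_directory="foo/bar/"    # relative to base_directory
)

local_batch_definition = local_asset.add_batch_definition_whole_directory(
    name=BATCH_DEFINITION_NAME,
)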

Hi @joshstauffer, thanks for the response. Good to hear that a bug report has been filed.

For anyone else running into this issue, my workaround has been to create a dataframe using PySpark directly:

import atexit

from pyspark.sql import DataFrame, SparkSession

def load_prefix(bucket: str, prefix: str) -> DataFrame:
    """Read every parquet file under an S3 prefix into a single DataFrame."""
    path = f"s3a://{bucket}/{prefix}"
    return spark.read.parquet(path)

spark = SparkSession.builder \
    .appName("GX") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "2g") \
    .getOrCreate()

# stop the Spark session cleanly when the script exits
atexit.register(spark.stop)

df = load_prefix(BUCKET_NAME, DATA_PATH)

Then, you can pass the dataframe into GX at runtime:

validation_result = validation_definition.run(batch_parameters={"dataframe": df})
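For this run call to work, validation_definition has to point at a dataframe asset rather than the directory asset. A rough sketch of that setup, assuming the GX Core 1.x dataframe data source API (the names and the example expectation are placeholders):

import great_expectations as gx
import great_expectations.expectations as gxe

# dataframe-backed data source, asset, and batch definition
df_data_source = context.data_sources.add_spark(name="spark_runtime")
df_asset = df_data_source.add_dataframe_asset(name="foobar_dataframe")
df_batch_definition = df_asset.add_batch_definition_whole_dataframe(name="foobar_whole_df")

# expectation suite and validation definition
suite = context.suites.add(gx.ExpectationSuite(name="foobar_suite"))
suite.add_expectation(gxe.ExpectTableRowCountToBeBetween(min_value=2))

validation_definition = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="foobar_validation",
        data=df_batch_definition,
        suite=suite,
    )
)

With that in place, the run call above receives the full DataFrame and validates every row under the prefix.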