Hello,
I have an S3 prefix containing a set of .parquet files. I’m trying to use GX Core to read these files into a single Batch so I can run some validations.
I’m using add_batch_definition_whole_directory(), because the documentation says:
A whole directory Batch Definition returns all of the records in a data file as a single Batch.
Unfortunately, no matter what I try, the resulting Batch only contains the last file in the directory.
Here is the code:
# omitting other imports
from great_expectations import get_context

BUCKET_NAME = 'foobar'
DATA_PATH = "foo/bar/"
DATA_ASSET_NAME = 'foobar'  # arbitrary
BATCH_DEFINITION_NAME = 'foobar_contents'  # arbitrary

context = get_context()

# Spark-backed S3 Data Source
data_source = context.data_sources.add_spark_s3(
    name=BUCKET_NAME,
    bucket=BUCKET_NAME
)

# Directory parquet asset pointing at the prefix holding the .parquet files
file_data_asset = data_source.add_directory_parquet_asset(
    name=DATA_ASSET_NAME,
    s3_prefix=DATA_PATH,
    s3_recursive_file_discovery=True,
    recursive_file_lookup=True,
    data_directory=DATA_PATH
)

# Whole-directory Batch Definition, which should cover every file under the prefix
batch_definition = file_data_asset.add_batch_definition_whole_directory(
    name=BATCH_DEFINITION_NAME,
)
batch = batch_definition.get_batch()  # retrieve the whole-directory Batch
spark_df = batch.data.dataframe
print(f"Spark input files: {spark_df.inputFiles()}")
# Spark input files: ['s3a://foo/bar/foobar_contents/part-00007-longhashcode.zstd.parquet']
print(f"Number of input files: {len(spark_df.inputFiles())}")
# Number of input files: 1
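For reference, here is a quick sanity check, just a sketch, that reads the prefix directly with Spark to confirm it really holds multiple parquet files. It assumes a SparkSession named spark is available with working S3 credentials, and reuses the BUCKET_NAME / DATA_PATH values from above:

# Sanity check outside GX: read the same prefix directly with Spark.
# The `spark` session and the s3a path format are assumptions here.
direct_df = (
    spark.read
    .option("recursiveFileLookup", "true")
    .parquet(f"s3a://{BUCKET_NAME}/{DATA_PATH}")
)
print(f"Direct read input files: {len(direct_df.inputFiles())}")
# I would expect this to list every part-*.parquet file under the prefix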
When I go on to add expectations and validations, I hit the same issue: GX believes there is only one row of data (each parquet file in this example contains a single row). It does read the contents of that row correctly, but of course I want the entire directory to be read.
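To illustrate, this is roughly the kind of check I am running (a sketch; ExpectTableRowCountToEqual and the expected value of 8 are just illustrative stand-ins for my real expectations):

# Illustrative expectation: the observed row count comes back as 1
# even though the directory holds several one-row parquet files.
import great_expectations as gx

expectation = gx.expectations.ExpectTableRowCountToEqual(value=8)  # 8 is a placeholder
validation_result = batch.validate(expectation)
print(validation_result.success)  # False
print(validation_result.result)   # observed_value is 1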
Any help would be appreciated. Thank you.