Unable to configure Great Expectations checks to validate Parquet files directly on S3

I am currently validating Parquet input files stored in AWS S3, orchestrated with AWS MWAA (Airflow). At the moment I use the DataFrame approach: I read each file into a Spark DataFrame and hand that DataFrame to Great Expectations. I would like GX to access the files directly from S3 instead, without me reading them into a DataFrame first. Can you suggest the changes needed? (I have sketched what I am imagining at the bottom of the post.)

Below is my code for reference

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import great_expectations as gx
import great_expectations.expectations as gxe
from great_expectations.checkpoint import UpdateDataDocsAction
import os
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel
import logging

# S3 project configuration
s3_gx_project = "s3://bucket/great_expectations/"

# Default arguments for the DAG
default_args = {
    "owner": "airflow",
    "start_date": datetime(2024, 2, 10),
    "retries": 1,
}

# Define the DAG
dag = DAG(
    "biorelate_validation_author_data_new",
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
)

def validate_data_spark(file_type, s3_bucket, s3_key, data_source_name):
    """Validates data using Spark and Great Expectations, then uploads the data docs to S3."""
    spark = (
        SparkSession.builder
        .appName(f"GreatExpectationsValidation_{data_source_name}")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4")
        .config("spark.executor.memory", "4G")
        .config("spark.executor.instances", "2")
        .config("spark.executor.cores", "2")
        .config("spark.executor.memoryOverhead", "1024M")
        .config("spark.executor.heartbeatInterval", "60s")
        .config("spark.memory.offHeap.enabled", "true")
        .config("spark.memory.offHeap.size", "1024M")
        .config("spark.network.timeout", "600s")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.sql.files.maxPartitionBytes", "128MB")
        .config("spark.sql.shuffle.partitions", "20")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.hadoop.fs.s3a.fast.upload", "true")
        .getOrCreate()
    )

    try:
        # Read data using Spark
        if file_type.lower() == "csv":
            dataframe = spark.read.option("header", True).csv(f"s3a://{s3_bucket}/{s3_key}")
        elif file_type.lower() == "parquet":
            dataframe = spark.read.parquet(f"s3a://{s3_bucket}/{s3_key}")
        else:
            raise ValueError(f"Unsupported file type: {file_type}")

        dataframe = dataframe.persist(StorageLevel.DISK_ONLY)

        # Initialize Great Expectations Context
        # context = gx.get_context(mode="file", project_root_dir=s3_gx_project)
        context = gx.get_context()

        # Create or get data source
        data_source = context.data_sources.add_or_update_spark(name=data_source_name)

        # Create or get data asset
        data_asset_name = f"{data_source_name}_data_asset"
        data_asset = data_source.add_dataframe_asset(name=data_asset_name)

        # Create or get batch definition
        batch_definition_name = f"{data_source_name}_batch_definition"
        batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_definition_name)

        # Create expectation suite
        suite_name = f"{data_source_name}_expectation_suite"
        suite = gx.ExpectationSuite(name=suite_name)
        suite.add_expectation(gxe.ExpectColumnValuesToNotBeNull(column="source_type"))
        suite.add_expectation(gxe.ExpectColumnValuesToNotBeNull(column="document_id"))
        suite.add_expectation(gxe.ExpectColumnValuesToNotBeNull(column="full_name"))
        suite.add_expectation(gxe.ExpectColumnValuesToNotBeNull(column="affiliations"))
        suite.add_expectation(gxe.ExpectColumnToExist(column="source_type"))
        suite.add_expectation(gxe.ExpectColumnToExist(column="document_id"))
        suite.add_expectation(gxe.ExpectColumnToExist(column="full_name"))
        suite.add_expectation(gxe.ExpectColumnToExist(column="email"))
        suite.add_expectation(gxe.ExpectColumnToExist(column="affiliations"))
        suite.add_expectation(
            gxe.ExpectColumnValuesToBeInSet(
                column="source_type",
                value_set=["Clinical Trial", "NIH Grant", "Drug Label", "Patent", "PMC", "Preprint", "PubMed"],
            )
        )
        context.suites.add_or_update(suite)

        # Create validation definition
        validation_definition_name = f"{data_source_name}_validation_definition"
        validation_definition = gx.ValidationDefinition(
            data=batch_definition,
            suite=suite,
            name=validation_definition_name,
        )
        context.validation_definitions.add_or_update(validation_definition)

        # Create checkpoint
        checkpoint_name = f"{data_source_name}_checkpoint"
        actions = [UpdateDataDocsAction(name="update_my_site")]
        checkpoint = gx.Checkpoint(
            name=checkpoint_name,
            validation_definitions=[validation_definition],
            actions=actions,
            result_format={"result_format": "COMPLETE"},
        )
        context.checkpoints.add_or_update(checkpoint)

        # Validate Data
        batch = batch_definition.get_batch(batch_parameters={"dataframe": dataframe})
        validation_results = checkpoint.run(batch_parameters={"dataframe": dataframe})

        if not validation_results.success:
            logging.warning(f"Data validation failed for {data_source_name}")

        # Generate separate data docs per task
        site_name = f"s3_biorelate_{data_source_name}_site"
        s3_data_docs_prefix = f"great_expectations/uncommitted/data_docs/{data_source_name}/"

        sites = context.list_data_docs_sites()
        if site_name in sites:
            context.delete_data_docs_site(site_name=site_name)

        site_config = {
            "class_name": "SiteBuilder",
            "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
            "store_backend": {
                "class_name": "TupleS3StoreBackend",
                "bucket": "my-bucket",
                "prefix": s3_data_docs_prefix,
            },
        }
        context.add_data_docs_site(site_name=site_name, site_config=site_config)
        context.build_data_docs(site_names=[site_name])
    
        # Log the Data Docs URL
        data_docs_url = context.open_data_docs()
        logging.info(f"Data Docs URL for {data_source_name}: {data_docs_url}")

        dataframe.unpersist()

    except Exception as e:
        logging.error(f"Error while processing {data_source_name}: {str(e)}")
    finally:
        spark.stop()

validate_clinical_trial_author = PythonOperator(
    task_id="validate_clinical_trial_author",
    python_callable=validate_data_spark,
    op_kwargs={
        "file_type": "parquet",
        "s3_bucket": "my-bucket",
        "s3_key": "uploads/2025/01/Authors_Clinical_Trial.parquet",
        "data_source_name": "biorelate_clinical_trial_author",
    },
    dag=dag,
)

validate_clinical_trial_author
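
What I am hoping for is roughly the shape below, where GX points at the S3 location itself and I drop the spark.read / persist step entirely. I have not managed to get this working; the method names (add_spark_s3, add_parquet_asset, add_batch_definition_path) and their parameters are my best guess from the fluent data source docs, so please treat this as a sketch of intent rather than working code:

import great_expectations as gx

context = gx.get_context()

# Point GX at the bucket directly instead of a pre-built Spark DataFrame
# (add_spark_s3 and its parameters are assumptions on my part)
data_source = context.data_sources.add_spark_s3(
    name="biorelate_s3",
    bucket="my-bucket",
    boto3_options={},  # region/credentials would go here
)

# Parquet asset plus a batch definition for one file under the uploads prefix
data_asset = data_source.add_parquet_asset(name="clinical_trial_authors")
batch_definition = data_asset.add_batch_definition_path(
    name="authors_clinical_trial",
    path="uploads/2025/01/Authors_Clinical_Trial.parquet",
)

# The existing expectation suite / validation definition / checkpoint would then
# be built on this batch definition, with no batch_parameters={"dataframe": ...}
batch = batch_definition.get_batch()

Is add_spark_s3 (or the pandas equivalent) the right entry point for this, and does it work from MWAA, or is there a different recommended way to validate Parquet files in place on S3?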