I am currently reading Parquet files from AWS S3 and validating them with Great Expectations on AWS MWAA (Airflow). Right now I use the dataframe approach (each file is read into a Spark dataframe first and validated), but I want Great Expectations to access the files directly from S3 without my reading them into a dataframe. Can you suggest what changes are needed?
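For context, this is roughly what I am hoping the read side would turn into. I believe GX 1.x has S3-backed Spark data sources that let the asset point straight at the bucket instead of at a dataframe, but I have only pieced this together from the docs, so the method names (add_spark_s3, add_parquet_asset, add_batch_definition_path) and the placeholder names I use here may not be exactly right:

import great_expectations as gx

context = gx.get_context()

# S3-backed Spark data source instead of an in-memory dataframe source
# (my guess at the GX 1.x fluent API; source/asset names below are placeholders)
data_source = context.data_sources.add_spark_s3(
    name="biorelate_s3_source",
    bucket="my-bucket",
    boto3_options={},
)

# Asset that points at Parquet files in the bucket, so no spark.read is needed
data_asset = data_source.add_parquet_asset(name="authors_parquet_asset")

# Batch definition resolving a single file path under the bucket
batch_definition = data_asset.add_batch_definition_path(
    name="authors_batch",
    path="uploads/2025/01/Authors_Clinical_Trial.parquet",
)

# The suite / validation definition / checkpoint setup would stay as in my code below,
# but checkpoint.run() should no longer need batch_parameters={"dataframe": ...}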
Below is my code for reference:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import great_expectations as gx
import great_expectations.expectations as gxe
from great_expectations.checkpoint import UpdateDataDocsAction
import os
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel
import logging
# S3 project configuration
s3_gx_project = "s3://bucket/great_expectations/"

# Default arguments for the DAG
default_args = {
    "owner": "airflow",
    "start_date": datetime(2024, 2, 10),
    "retries": 1,
}

# Define the DAG
dag = DAG(
    "biorelate_validation_author_data_new",
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
)
def validate_data_spark(file_type, s3_bucket, s3_key, data_source_name):
    """Validates data using Spark and Great Expectations, then uploads the data docs to S3."""
    spark = (
        SparkSession.builder
        .appName(f"GreatExpectationsValidation_{data_source_name}")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4")
        .config("spark.executor.memory", "4G")
        .config("spark.executor.instances", "2")
        .config("spark.executor.cores", "2")
        .config("spark.executor.memoryOverhead", "1024M")
        .config("spark.executor.heartbeatInterval", "60s")
        .config("spark.memory.offHeap.enabled", "true")
        .config("spark.memory.offHeap.size", "1024M")
        .config("spark.network.timeout", "600s")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.sql.files.maxPartitionBytes", "128MB")
        .config("spark.sql.shuffle.partitions", "20")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.hadoop.fs.s3a.fast.upload", "true")
        .getOrCreate()
    )
    try:
        # Read data using Spark
        if file_type.lower() == "csv":
            dataframe = spark.read.option("header", True).csv(f"s3a://{s3_bucket}/{s3_key}")
        elif file_type.lower() == "parquet":
            dataframe = spark.read.parquet(f"s3a://{s3_bucket}/{s3_key}")
        else:
            raise ValueError(f"Unsupported file type: {file_type}")
        dataframe = dataframe.persist(StorageLevel.DISK_ONLY)

        # Initialize Great Expectations context
        # context = gx.get_context(mode="file", project_root_dir=s3_gx_project)
        context = gx.get_context()

        # Create or get data source
        data_source = context.data_sources.add_or_update_spark(name=data_source_name)

        # Create or get data asset
        data_asset_name = f"{data_source_name}_data_asset"
        data_asset = data_source.add_dataframe_asset(name=data_asset_name)

        # Create or get batch definition
        batch_definition_name = f"{data_source_name}_batch_definition"
        batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_definition_name)

        # Create expectation suite
        suite_name = f"{data_source_name}_expectation_suite"
        suite = gx.ExpectationSuite(name=suite_name)
        suite.add_expectation(gxe.ExpectColumnValuesToNotBeNull(column="source_type"))
        suite.add_expectation(gxe.ExpectColumnValuesToNotBeNull(column="document_id"))
        suite.add_expectation(gxe.ExpectColumnValuesToNotBeNull(column="full_name"))
        suite.add_expectation(gxe.ExpectColumnValuesToNotBeNull(column="affiliations"))
        suite.add_expectation(gxe.ExpectColumnToExist(column="source_type"))
        suite.add_expectation(gxe.ExpectColumnToExist(column="document_id"))
        suite.add_expectation(gxe.ExpectColumnToExist(column="full_name"))
        suite.add_expectation(gxe.ExpectColumnToExist(column="email"))
        suite.add_expectation(gxe.ExpectColumnToExist(column="affiliations"))
        suite.add_expectation(
            gxe.ExpectColumnValuesToBeInSet(
                column="source_type",
                value_set=["Clinical Trial", "NIH Grant", "Drug Label", "Patent", "PMC", "Preprint", "PubMed"],
            )
        )
        context.suites.add_or_update(suite)

        # Create validation definition
        validation_definition_name = f"{data_source_name}_validation_definition"
        validation_definition = gx.ValidationDefinition(
            data=batch_definition,
            suite=suite,
            name=validation_definition_name,
        )
        context.validation_definitions.add_or_update(validation_definition)

        # Create checkpoint
        checkpoint_name = f"{data_source_name}_checkpoint"
        actions = [gx.checkpoint.actions.UpdateDataDocsAction(name="update_my_site")]
        checkpoint = gx.Checkpoint(
            name=checkpoint_name,
            validation_definitions=[validation_definition],
            actions=actions,
            result_format={"result_format": "COMPLETE"},
        )
        context.checkpoints.add_or_update(checkpoint)

        # Validate data
        batch = batch_definition.get_batch(batch_parameters={"dataframe": dataframe})
        validation_results = checkpoint.run(batch_parameters={"dataframe": dataframe})
        if not validation_results.success:
            logging.warning(f"Data validation failed for {data_source_name}")

        # Generate separate data docs per task
        site_name = f"s3_biorelate_{data_source_name}_site"
        s3_data_docs_prefix = f"great_expectations/uncommitted/data_docs/{data_source_name}/"
        sites = context.list_data_docs_sites()
        if site_name in sites:
            context.delete_data_docs_site(site_name=site_name)
        site_config = {
            "class_name": "SiteBuilder",
            "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
            "store_backend": {
                "class_name": "TupleS3StoreBackend",
                "bucket": "my-bucket",
                "prefix": s3_data_docs_prefix,
            },
        }
        context.add_data_docs_site(site_name=site_name, site_config=site_config)
        context.build_data_docs(site_names=site_name)

        # Log the Data Docs URL
        data_docs_url = context.open_data_docs()
        logging.info(f"Data Docs URL for {data_source_name}: {data_docs_url}")

        dataframe.unpersist()
    except Exception as e:
        logging.error(f"Error while processing {data_source_name}: {str(e)}")
    finally:
        spark.stop()
validate_clinical_trial_author = PythonOperator(
    task_id="validate_clinical_trial_author",
    python_callable=validate_data_spark,
    op_kwargs={
        "file_type": "parquet",
        "s3_bucket": "my-bucket",
        "s3_key": "uploads/2025/01/Authors_Clinical_Trial.parquet",
        "data_source_name": "biorelate_clinical_trial_author",
    },
    dag=dag,
)
validate_clinical_trial_author
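If the S3-based approach sketched above works, I am assuming the Airflow task wiring stays the same and only the body of validate_data_spark changes, with the validation step reducing to something like this (again, just my guess):

# No Spark session, no spark.read, and no batch_parameters to pass
validation_results = checkpoint.run()
if not validation_results.success:
    logging.warning(f"Data validation failed for {data_source_name}")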