We currently set up our CSV batch requests with “inferSchema: True”. However, we sometimes receive a data file where a datetime column contains all nulls. As one might expect, the engine does not identify the column as containing datetimes but rather strings. This then leads to our expect_column_max_to_be_between expectation erroring, as it can’t compare a datetime with a string.
Is there a way we can specify the schema we are expecting, so that the column will be treated as a datetime even when all values are null? (We’re struggling to find any documentation for the correct syntax.)
- batch_request:
    datasource_name: blob_datasource
    data_connector_name: inbound_blob_data_connector
    data_asset_name: Patient
    batch_spec_passthrough:
      reader_method: csv
      reader_options:
        header: True
        inferSchema: True
We are using the SparkDFExecutionEngine execution engine.
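For what it’s worth, the behaviour is easy to reproduce outside GX with plain Spark; a minimal sketch, with an illustrative file name:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# With inferSchema, a column containing only nulls is inferred as StringType,
# which is what breaks the datetime comparison in the expectation.
df = spark.read.csv("Patient.csv", header=True, inferSchema=True)
df.printSchema()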
Many thanks
Update: We have tried the following:
- batch_request:
    datasource_name: blob_datasource
    data_connector_name: inbound_blob_data_connector
    data_asset_name: Patient
    batch_spec_passthrough:
      reader_method: csv
      reader_options:
        header: True
        inferSchema: False
        enforceSchema: False # Have also tried True
        schema:
          FieldName1: StringType
          FieldName2: StringType
          FieldName3: DateType
          FieldName4: BooleanType
          FieldName5: StringType
But are seeing the error:
Exception occurred during checkpoint: 'fields'
I have no idea if this works but consider writing the data connector in Python and explicitly specifying a schema for the CSV.
Something like this:
from pyspark.sql.types import StructField, StructType, TimestampType

schema = StructType([
    StructField("event_time", TimestampType(), True),
    # Add other fields as necessary
    # StructField("column_name", DataType(), True/False),
])

asset = datasource.add_csv_asset(
    asset_name,
    filepath_or_buffer=path_to_data,
    schema=schema,
)
If I remember correctly, GX just directly passes any additional arguments to Spark so this would translate to roughly the Spark code below:
df = spark.read.csv(
    path_to_data,
    schema=schema,
)
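In plain Spark the schema option can also be given as a DDL-formatted string rather than a StructType, which might be easier to pass through the YAML reader_options; I suspect (though I haven’t verified this) that the 'fields' error above comes from the schema being supplied as a plain mapping. A rough sketch of the DDL-string form, reusing your field names:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
path_to_data = "Patient.csv"  # illustrative path

# spark.read.csv accepts a DDL-formatted schema string as well as a StructType.
df = spark.read.csv(
    path_to_data,
    header=True,
    schema="FieldName1 STRING, FieldName2 STRING, FieldName3 DATE, "
           "FieldName4 BOOLEAN, FieldName5 STRING",
)
df.printSchema()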
Alternatively, you can create a Custom Expectation that can handle this special case with only nulls in the column.
Thanks Toivo. I’ll give that a try.
I suppose I’ll need to translate our YAML config into Python. (Is the YAML route no longer recommended?)
Yes, as far as I know, GX recommends developing and maintaining Checkpoints and the rest of the configuration programmatically in Python. Under the hood, GX then stores the configuration as YAML, which is easier to, for example, push to production.
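For example, registering a Checkpoint from Python can look roughly like this (the names are placeholders, and the exact call depends on your GX version):

import great_expectations as gx

context = gx.get_context()

# "patient_checkpoint" and "patient_suite" are placeholder names; the
# batch_request comes from a data asset, as shown later in this thread.
checkpoint = context.add_or_update_checkpoint(
    name="patient_checkpoint",
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": "patient_suite",
        }
    ],
)
result = checkpoint.run()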
I personally use Jupyter Notebooks when working with GX. It enables getting immediate feedback when something I’ve written doesn’t work. For example, when there’s something wrong with connecting to the data or when the data I’m working with doesn’t actually conform to the Expectations I’m writing.
Thanks Toivo. I have converted our YAML config into Python (working with Jupyter), and am already seeing benefits from that.
Now to the issue under discussion here…
Our code is currently using Pandas, but it looks like your solution is for Spark. I have been exploring two routes:
1. Making your code work with Pandas.
2. Moving to Spark.
Unfortunately I’ve been struggling with both. Can you help with route 1 at all, or would you recommend route 2?
Alright, great to hear that!
Yes, I’m quite proficient with Pandas as well. I have no idea why my initial code used Spark.
As for whether to use Pandas or Spark, it depends on your data volume.
Definitely use Pandas for small datasets (see below). For data that size, Spark is overkill: it just makes the validations run longer and leaves you with fewer supported Expectations, without any benefit.
Switch to Spark when Pandas is not enough.
As for what is a “small dataset”, let’s say less than 5-10 million rows.
(This assumes the file, plus Pandas overhead, fits in the memory you have available.)
That is a rough estimate, and I’d like to have a benchmark to verify it.
I haven’t found a good benchmark comparing Pandas and Spark yet, but I’ll look into it and get back to you.
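That said, for route 1 the trick on the Pandas side is to tell pandas.read_csv up front which columns are datetimes, so that an all-null column still comes back as datetime64 rather than object; a minimal sketch with illustrative names:

import pandas as pd

# parse_dates forces the column to a datetime dtype even when every value is null.
df = pd.read_csv(
    "Patient.csv",           # illustrative path
    parse_dates=["Field2"],  # the datetime column
)
print(df.dtypes)

The resulting DataFrame should then be usable with a dataframe asset on a Pandas datasource, mirroring the Spark approach below.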
Well, for better or worse, we’re now using Spark - and your code has solved my original problem.
(I’m actually using spark.read.csv directly, so I can’t comment on whether it works to specify the schema in add_csv_asset.)
My code, in case it helps others:
import great_expectations as gx
from pyspark.sql import SparkSession
from pyspark.sql.types import BooleanType, StringType, StructField, StructType, TimestampType

context = gx.get_context()
spark = SparkSession.builder.getOrCreate()
datasource = context.sources.add_spark("Spark")

schema = StructType([
    StructField("Field1", StringType(), True),
    StructField("Field2", TimestampType(), True),
    StructField("Field3", BooleanType(), True),
])

asset = datasource.add_dataframe_asset(name=file_name)
spark_data_frame = spark.read.csv(f"{csv_base_path}/{file_name}.csv", header=True, schema=schema)
batch_request = asset.build_batch_request(dataframe=spark_data_frame)
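For completeness, a rough sketch of how the batch request is then used (the suite name is a placeholder, and the expectation arguments will depend on your data):

import datetime

context.add_or_update_expectation_suite(expectation_suite_name="patient_suite")
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="patient_suite",
)
# Field2 is the timestamp column from the schema above; with the schema enforced,
# the comparison no longer mixes datetimes and strings.
validator.expect_column_max_to_be_between(
    "Field2",
    min_value=datetime.datetime(2000, 1, 1),
    max_value=datetime.datetime(2030, 1, 1),
)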
Many thanks for your help.