Can we specify the data schema in YAML config?

We currently set up our CSV batch requests with “inferSchema: True”. However, we sometimes receive a data file where a datetime column contains all nulls. As one might expect, the engine does not identify the column as containing datetimes but rather strings. This then leads to our expect_column_max_to_be_between expectation erroring, as it can’t compare a datetime with a string.

Is there a way we can specify the schema we are expecting, so that the column will be treated as a datetime even when all values are null? (We’re struggling to find any documentation for the correct syntax.)

  - batch_request:
      datasource_name: blob_datasource
      data_connector_name: inbound_blob_data_connector
      data_asset_name: Patient
      batch_spec_passthrough:
        reader_method: csv
        reader_options:
          header: True
          inferSchema: True

We are using the SparkDFExecutionEngine execution engine.
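
For illustration, here is a minimal sketch of the behaviour we are seeing, reproduced with plain PySpark outside of GX (the file and column names are just placeholders):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # A delivery in which the datetime column happens to be entirely empty.
    df = spark.read.csv("Patient.csv", header=True, inferSchema=True)
    df.printSchema()
    # The all-null column is inferred as StringType rather than TimestampType,
    # which is what breaks expect_column_max_to_be_between downstream.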

Many thanks

Update: We have tried the following:

  - batch_request:
      datasource_name: blob_datasource
      data_connector_name: inbound_blob_data_connector
      data_asset_name: Patient
      batch_spec_passthrough:
        reader_method: csv
        reader_options:
          header: True
          inferSchema: False
          enforceSchema: False    # Have also tried True
          schema:
            FieldName1: StringType
            FieldName2: StringType
            FieldName3: DateType
            FieldName4: BooleanType
            FieldName5: StringType

But we are seeing the error:

Exception occurred during checkpoint: 'fields'

I have no idea if this works, but consider writing the data connector in Python and explicitly specifying a schema for the CSV.
Something like this:

from pyspark.sql.types import StructField, StructType, TimestampType

schema = StructType([
    StructField("event_time", TimestampType(), True),  # nullable timestamp column
    # Add other fields as necessary:
    # StructField("column_name", DataType(), True/False),
])
asset = datasource.add_csv_asset(
    asset_name,
    filepath_or_buffer=path_to_data,
    schema=schema,
)

If I remember correctly, GX just passes any additional arguments straight through to Spark, so this would translate to roughly the Spark code below:

df = spark.read.csv(
    path_to_data,
    schema=schema,
)
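
For what it's worth, Spark's CSV reader also accepts the schema as a DDL-formatted string, which can be easier to keep in a config file; the column names below are just examples:

df = spark.read.csv(
    path_to_data,
    schema="event_time TIMESTAMP, patient_id STRING",
)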

Alternatively, you could create a Custom Expectation that handles this special case where the column contains only nulls.
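
I haven't tried it for this exact case, but a rough sketch of that route, based on GX's column-map Custom Expectation template with the Spark engine (the class and metric names here are made up), could look like this:

import pyspark.sql.functions as F

from great_expectations.execution_engine import SparkDFExecutionEngine
from great_expectations.expectations.expectation import ColumnMapExpectation
from great_expectations.expectations.metrics import (
    ColumnMapMetricProvider,
    column_condition_partial,
)


class ColumnValuesCastableToTimestamp(ColumnMapMetricProvider):
    condition_metric_name = "column_values.castable_to_timestamp"

    @column_condition_partial(engine=SparkDFExecutionEngine)
    def _spark(cls, column, **kwargs):
        # Nulls are excluded from the map domain by default, so an all-null
        # column has nothing left to fail; non-null values must cast cleanly.
        return F.to_timestamp(column).isNotNull()


class ExpectColumnValuesToBeCastableToTimestamp(ColumnMapExpectation):
    """Passes on an all-null column; otherwise checks that values cast to timestamps."""

    map_metric = "column_values.castable_to_timestamp"
    success_keys = ("mostly",)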

Thanks Toivo. I’ll give that a try.

I suppose I’ll need to translate our YAML config into Python. (Is the YAML route no longer recommended?)

Yes, as far as I know, GX recommends developing and maintaining Checkpoints and other configuration programmatically in Python. Under the hood, GX then stores the configurations as YAML, which is easier to, for example, push to production.

I personally use Jupyter Notebooks when working with GX. It enables getting immediate feedback when something I’ve written doesn’t work. For example, when there’s something wrong with connecting to the data or when the data I’m working with doesn’t actually conform to the Expectations I’m writing.

Thanks Toivo. I have converted our YAML config into Python (working with Jupyter), and am already seeing benefits from that.

Now to the issue under discussion here…

Our code is currently using Pandas, but it looks like your solution is for Spark. I have been exploring two routes:

  1. Making your code work with Pandas
  2. Moving to Spark.

Unfortunately I’ve been struggling with both. Can you help with 1 at all, or would you recommend 2?

Alright, great to hear that!

Yes, I’m quite proficient with Pandas as well. I have no idea why my initial code used Spark :smiley:

As for whether to use Pandas or Spark, it depends on your data volume.
Definitely use Pandas for small datasets (see below): there, Spark is overkill, validations take longer to run, and fewer Expectations are supported, with no benefit in return.
Switch to Spark when Pandas is not enough.

As for what counts as a “small dataset”, let’s say less than 5-10 million rows (assuming the file is smaller than the memory the cluster has, allowing for Pandas overhead).
That is a rough estimate and I’d like to have a benchmark to verify it.
I haven’t found a good benchmark comparing Pandas and Spark yet, but I’ll look into it and get back to you.
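
If you do stay on Pandas for now, route 1 could look roughly like the sketch below: read the CSV with pandas, force the datetime column with parse_dates, and hand the resulting DataFrame to a dataframe asset. This is untested, and the datasource, asset, and column names are just placeholders:

    import pandas as pd

    datasource = context.sources.add_pandas("Pandas")
    asset = datasource.add_dataframe_asset(name="Patient")

    # parse_dates should give a datetime64 column (all NaT) even when every
    # value is empty, instead of leaving it as strings.
    df = pd.read_csv(path_to_data, parse_dates=["Field2"])
    batch_request = asset.build_batch_request(dataframe=df)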

Well, for better or worse, we’re now using Spark - and your code has solved my original problem.

(I’m actually using spark.read.csv directly, so I can’t comment on whether specifying the schema in add_csv_asset works.)

My code, in case it helps others:

    from pyspark.sql.types import BooleanType, StringType, StructField, StructType, TimestampType

    datasource = context.sources.add_spark("Spark")

    # Explicit schema so the datetime column is typed as a timestamp even when
    # every value in a given file is null.
    schema = StructType([
        StructField("Field1", StringType(), True),
        StructField("Field2", TimestampType(), True),
        StructField("Field3", BooleanType(), True),
    ])

    asset = datasource.add_dataframe_asset(name=file_name)
    spark_data_frame = spark.read.csv(f"{csv_base_path}/{file_name}.csv", header=True, schema=schema)
    batch_request = asset.build_batch_request(dataframe=spark_data_frame)
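
If it helps, the batch request can then be used in the usual way to build a validator and run the original check; the suite name and bounds below are just placeholders:

    from datetime import datetime

    context.add_or_update_expectation_suite("patient_suite")
    validator = context.get_validator(
        batch_request=batch_request,
        expectation_suite_name="patient_suite",
    )
    validator.expect_column_max_to_be_between(
        column="Field2",
        min_value=datetime(2000, 1, 1),
        max_value=datetime(2030, 1, 1),
    )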

Many thanks for your help.
