I found the root cause and identified a workaround.
The issue is with the logic used to filter rows, controlled by the ignore_row_if parameter. It can take one of the following values:
- both_values_are_missing (default)
- either_value_is_missing
- neither
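To make the three settings concrete, here is a hypothetical pure-Python illustration of the row-keeping rule each value implies (this is not Great Expectations' actual implementation, just a sketch of the semantics described above):

```python
# Sketch: which (A, B) row pairs each ignore_row_if setting keeps.
# keep_row is a hypothetical helper, not a Great Expectations function.
def keep_row(a, b, ignore_row_if="both_values_are_missing"):
    if ignore_row_if == "both_values_are_missing":
        # Drop a row only when both columns are missing (the default).
        return not (a is None and b is None)
    if ignore_row_if == "either_value_is_missing":
        # Drop a row when at least one column is missing.
        return a is not None and b is not None
    if ignore_row_if == "neither":
        # Keep every row; the DataFrame is left unmodified.
        return True
    raise ValueError(f"unknown ignore_row_if: {ignore_row_if}")

rows = [(3, 1), (None, 2), (None, None)]
print([keep_row(a, b) for a, b in rows])                             # → [True, True, False]
print([keep_row(a, b, "either_value_is_missing") for a, b in rows])  # → [True, False, False]
print([keep_row(a, b, "neither") for a, b in rows])                  # → [True, True, True]
```

With the default, only the all-missing row is dropped; with neither, nothing is dropped, which is what the workaround below relies on.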
With the default value, some rows are filtered out, which modifies the DataFrame. This happens in sparkdf_execution_engine.py, inside the get_domain_records method.
The modified DataFrame then causes errors later, such as the one we saw for df1.select(df2.col("a")).
This likely results from operations being applied to DataFrames of different sizes. I suspect the issue comes from this part of the code:
data = df.withColumn("__unexpected", unexpected_condition)
filtered = data.filter(F.col("__unexpected") == True).drop(F.col("__unexpected"))
Workaround:
Set ignore_row_if to neither. This prevents rows from being filtered out, so the DataFrame is not modified and the code and tests run as expected.
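As a minimal sketch, the workaround amounts to passing ignore_row_if="neither" in the expectation's kwargs. The column names below are hypothetical, and the surrounding suite/validator wiring is assumed rather than shown:

```python
# Expectation configuration sketch; the expectation type follows
# Great Expectations' snake_case naming convention.
expectation_config = {
    "expectation_type": "expect_column_pair_values_a_to_be_greater_than_b",
    "kwargs": {
        "column_A": "a",  # hypothetical column names
        "column_B": "b",
        # The workaround: never drop rows, so the DataFrame handed to
        # downstream operations is unchanged.
        "ignore_row_if": "neither",
    },
}
print(expectation_config["kwargs"]["ignore_row_if"])  # → neither
```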
I think the issue also depends on the access mode of the Databricks cluster. I can successfully run ExpectColumnPairValuesAToBeGreaterThanB expectations when the access mode is "Dedicated (formerly: Single user)". If the access mode is changed to "Standard (formerly: Shared)", I get exceptions of the type "[CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column".
I am currently using Databricks Runtime 17.0, but the same behavior existed with previous runtimes too.