Unable to run 'ExpectColumnPairValuesToBeEqual' with Spark on Databricks

When I try to validate any expectation that takes the parameters column_A and column_B, I get the following error:

[CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "col_1". It's probably because of illegal references like df1.select(df2.col(\"a\")). SQLSTATE: 42704
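For context, the message is describing a cross-DataFrame reference, i.e. using a column object from one DataFrame against another. A minimal sketch of that pattern (toy names, not from my code below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([(1,)], ["a"])
df2 = spark.createDataFrame([(2,)], ["a"])

# df1.select(df2["a"])  # illegal: df2's column resolved against df1; on Spark Connect
#                       # this raises [CANNOT_RESOLVE_DATAFRAME_COLUMN]
df1.select(df1["a"])    # legal: the column is referenced on its own DataFrame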

My code:

df = spark.sql(f"SELECT * FROM samples.sales_schema.sales")
context = gx.get_context()
data_source_name = sales
data_source = context.data_sources.add_spark(name=data_source_name)
data_asset_name = sales_data_asset
data_asset = data_source.add_dataframe_asset(name=data_asset_name)
batch_parameters = {“dataframe”: df}
batch_definition_name = f"sales_batch_definition"
batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_definition_name)
batch = batch_definition.get_batch(batch_parameters=batch_parameters)
Exp = gxe.ExpectColumnPairValuesToBeEqual(column_A = col_1, column_B = col_2, mostly=0.5)
batch.validate(Exp)

My environment:
Databricks Runtime: 15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12)
great_expectations[spark]: 1.3.2

The same issue also occurs with ExpectColumnPairValuesAToBeGreaterThanB and ExpectColumnPairValuesToBeInSet.

What does your df look like? Does it have col_1 and col_2?

This works for me.

Could you print out your dataframe? Also, check whether a quotation mark is missing somewhere.
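For example (df standing for whatever dataframe you pass to GX):

df.printSchema()   # shows the actual column names and types
print(df.columns)  # should include 'col_1' and 'col_2'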

Yes, it does. All other expectations that take only one column work for me.

I checked the quotation marks, but I still get the same error. Expectations that take a single column as input work fine.
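For instance, a single-column expectation like this one runs fine on the same batch (ExpectColumnValuesToNotBeNull is just an illustration of a one-column expectation, not necessarily the exact one I ran):

exp_single = gxe.ExpectColumnValuesToNotBeNull(column="col_1")
batch.validate(exp_single)  # completes without the resolution error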

This local PySpark example works fine for me.

import great_expectations as gx
from pyspark.sql.functions import col
from pyspark.sql import SparkSession, DataFrame

import os
os.environ["GX_ANALYTICS_ENABLED"] = "false"  # turn off GX usage analytics for this local run

def check_frame():
    spark = SparkSession.builder.master("local").getOrCreate()

    # col_2 is an exact copy of col_1, so ExpectColumnPairValuesToBeEqual should pass.
    df: DataFrame = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "col_1")
    ).withColumn("col_2", col("col_1"))

    df.createOrReplaceTempView("tbl2")
    df2 = spark.sql("SELECT * FROM tbl2")
    df2.show()  # show() already prints the frame; wrapping it in print() just adds "None"

    context = gx.get_context()
    data_source = context.data_sources.add_spark(name="data_source_name")
    data_asset = data_source.add_dataframe_asset(name="data_asset_name")
    batch_parameters = {"dataframe": df}  # note: this passes df, not the spark.sql result df2
    batch_definition = data_asset.add_batch_definition_whole_dataframe("sales_batch_definition")
    batch = batch_definition.get_batch(batch_parameters=batch_parameters)
    exp = gx.expectations.ExpectColumnPairValuesToBeEqual(column_A="col_1", column_B="col_2", mostly=0.5)
    batch.validate(exp)

if __name__ == '__main__':
    check_frame()
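Since the same GX calls pass on local Spark, the difference may lie in how Databricks resolves columns: on 15.4, shared and serverless compute may run over Spark Connect, which is where the CANNOT_RESOLVE_DATAFRAME_COLUMN error class typically originates, so a dataframe coming from spark.sql can behave differently than a locally built one. As a quick sanity check on the Databricks side (a sketch reusing the names from the original code, not a confirmed fix):

df = spark.sql("SELECT * FROM samples.sales_schema.sales")
print(df.columns)                               # confirm the columns GX will look for
assert {"col_1", "col_2"}.issubset(df.columns)  # fail fast if either is missing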