Whenever I try to validate an expectation that takes the parameters column_A and column_B, I get the following error:
[CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "col_1". It's probably because of illegal references like df1.select(df2.col("a")). SQLSTATE: 42704
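For context, the illegal reference the message describes is selecting a column object that belongs to a different DataFrame. A minimal sketch of that pattern (my understanding of the error text, not my actual code):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([(1,)], ["a"])
df2 = spark.createDataFrame([(2,)], ["a"])

# Selecting df2's column through df1 raises an AnalysisException;
# on Spark Connect (e.g. Databricks shared clusters) the error class
# is [CANNOT_RESOLVE_DATAFRAME_COLUMN], SQLSTATE 42704
df1.select(df2["a"])

My code does not do this anywhere, which is why the error is confusing.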
My code:
import great_expectations as gx
import great_expectations.expectations as gxe

# `spark` is the SparkSession provided by the Databricks runtime
df = spark.sql("SELECT * FROM samples.sales_schema.sales")

context = gx.get_context()

data_source_name = "sales"
data_source = context.data_sources.add_spark(name=data_source_name)

data_asset_name = "sales_data_asset"
data_asset = data_source.add_dataframe_asset(name=data_asset_name)

batch_parameters = {"dataframe": df}
batch_definition_name = "sales_batch_definition"
batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_definition_name)
batch = batch_definition.get_batch(batch_parameters=batch_parameters)

exp = gxe.ExpectColumnPairValuesToBeEqual(column_A="col_1", column_B="col_2", mostly=0.5)
batch.validate(exp)
My environment:
Databricks Runtime: 15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12)
great_expectations[spark]: 1.3.2
The same issue also happens for ExpectColumnPairValuesAToBeGreaterThanB and ExpectColumnPairValuesToBeInSet.
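For reference, here is how I construct the other two (a minimal sketch; the value_pairs_set below is a placeholder, not my real data):

import great_expectations.expectations as gxe

exp_gt = gxe.ExpectColumnPairValuesAToBeGreaterThanB(column_A="col_1", column_B="col_2")
exp_in_set = gxe.ExpectColumnPairValuesToBeInSet(
    column_A="col_1",
    column_B="col_2",
    value_pairs_set=[(1.0, 1.0), (2.0, 2.0)],  # placeholder pairs, not my real data
)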
bidek56
What does your df look like? Does it have col_1 and col_2?
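A quick way to check from the notebook (plain PySpark, using the df from your post):

# Confirm both columns are actually present on the dataframe being validated
df.printSchema()
print("col_1" in df.columns, "col_2" in df.columns)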
Han
This works for me.
Could you print out your dataframe? Also, check whether there are missing quotation marks.
Yes, it does; all other expectations that take only one column work for me.
I checked the quotation marks, but it still gives me the same error. It only fails for expectations with two column inputs.
This local PySpark example works fine for me:
import os

import great_expectations as gx
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col

os.environ["GX_ANALYTICS_ENABLED"] = "false"


def check_frame():
    spark = SparkSession.builder.master("local").getOrCreate()
    # col_2 is a copy of col_1, so the pair-equality expectation should pass
    df: DataFrame = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "col_1")
    ).withColumn("col_2", col("col_1"))

    # Round-trip through SQL to mirror the spark.sql() call in the original post
    df.createOrReplaceTempView("tbl2")
    df2 = spark.sql("SELECT * FROM tbl2")
    df2.show()

    context = gx.get_context()
    data_source = context.data_sources.add_spark(name="data_source_name")
    data_asset = data_source.add_dataframe_asset(name="data_asset_name")

    batch_parameters = {"dataframe": df2}
    batch_definition = data_asset.add_batch_definition_whole_dataframe("sales_batch_definition")
    batch = batch_definition.get_batch(batch_parameters=batch_parameters)

    exp = gx.expectations.ExpectColumnPairValuesToBeEqual(column_A="col_1", column_B="col_2", mostly=0.5)
    batch.validate(exp)


if __name__ == '__main__':
    check_frame()
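To see whether the validation actually passes, the result can be captured instead of discarded (a small addition inside check_frame; success is the pass/fail flag on the result object GX returns):

    result = batch.validate(exp)
    print(result.success)  # expect True here, since col_2 is a copy of col_1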