Add query to filter based on some columns for an Athena datasource

I am trying to write some expectations on an Athena table partitioned by date, passing the date as a parameter so that I do not run my expectations over the whole table. I want to run expectations on the table every day after our ETL jobs add data to it.

I cannot find anything on this other than the thread How to add a RuntimeDataConnector to a SimpleSqlAlchemyDatasource configuration. Other threads on Slack and here say row filtering is only allowed for the Pandas datasource, so I cannot use it for AWS Athena.

Using great-expectations==0.13.31, I ran the following command:

great_expectations --v3-api checkpoint new my_checkpoint

In the generated notebook I have this:

yaml_config = f"""
name: {my_checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-my-run-name-template"
validations:
  - batch_request:
      datasource_name: athena_db
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: data_asset_name_redacted
      data_connector_query:
        index: -1
    expectation_suite_name: my_suite_name_redacted
"""

I tried adding the following:

validations:
  - batch_request:
      datasource_name: athena_db
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: TABLE_NAME
      runtime_parameters:
        query: select * from DB_NAME.TABLE_NAME where date = '2021-05-01'
    expectation_suite_name: SUITE_NAME

I got this exception:

~/miniconda3/envs/great_exp/lib/python3.8/site-packages/great_expectations/data_context/data_context.py in get_batch_list(self, datasource_name, data_connector_name, data_asset_name, batch_request, batch_data, data_connector_query, batch_identifiers, limit, index, custom_filter_function, batch_spec_passthrough, sampling_method, sampling_kwargs, splitter_method, splitter_kwargs, runtime_parameters, query, path, batch_filter_parameters, **kwargs)
   1596         if batch_request:
   1597             # TODO: Raise a warning if any parameters besides batch_requests are specified
-> 1598             return datasource.get_batch_list_from_batch_request(
   1599                 batch_request=batch_request
   1600             )

~/miniconda3/envs/great_exp/lib/python3.8/site-packages/great_expectations/datasource/new_datasource.py in get_batch_list_from_batch_request(self, batch_request)
    157                 batch_spec,
    158                 batch_markers,
--> 159             ) = data_connector.get_batch_data_and_metadata(
    160                 batch_definition=batch_definition,
    161                 runtime_parameters=runtime_parameters,

TypeError: get_batch_data_and_metadata() got an unexpected keyword argument 'runtime_parameters'

Questions

  • How do I add these to this configuration so that I can filter by date?
  • I tried using database_name.table_name but it was not recognized. How do I qualify by database_name? In Athena we have separate databases for the staging and production environments, so I need to be able to qualify with database_name.

Hi @aseembansal! So sorry for the delay.

As for your first question: in order to use a query in this way, you will need to use a RuntimeDataConnector. Currently, it appears that you are using an Inferred DataConnector instead. Unfortunately, our docs for Athena are in the process of being updated, but you can see an example if you look at one of our other docs for SQL (under the "Using a SQL query" tab).
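As a rough sketch, a datasource with a RuntimeDataConnector added alongside the inferred one might look like the following. The connection string is a placeholder assumption (Athena via PyAthena/SQLAlchemy; your region, database, and staging bucket will differ), and the batch identifier name is arbitrary:

```yaml
name: athena_db
class_name: Datasource
execution_engine:
  class_name: SqlAlchemyExecutionEngine
  # placeholder connection string; substitute your region, database, and S3 staging dir
  connection_string: awsathena+rest://@athena.REGION.amazonaws.com/DB_NAME?s3_staging_dir=s3://YOUR_BUCKET/path/
data_connectors:
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - default_identifier_name
  default_inferred_data_connector_name:
    class_name: InferredAssetSqlDataConnector
```

With a RuntimeDataConnector in place, `runtime_parameters: query: ...` in a batch request is routed to a connector that actually accepts it, which is what the TypeError above was complaining about.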

As for your second question, I am unsure whether this is currently possible with Athena on Great Expectations. Could you confirm whether it works once your Data Connector is properly configured? If it is not possible, we would be happy to support a community contribution to enable this feature.
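One thing worth noting: since the RuntimeDataConnector passes your query through as raw SQL, you may be able to qualify the table with the database name inside the query itself rather than in `data_asset_name`. A hedged sketch of the checkpoint validation, assuming the runtime connector above is configured (names here mirror your redacted placeholders):

```yaml
validations:
  - batch_request:
      datasource_name: athena_db
      data_connector_name: default_runtime_data_connector_name
      data_asset_name: TABLE_NAME
      runtime_parameters:
        # database qualification happens in the SQL itself
        query: select * from DB_NAME.TABLE_NAME where date = '2021-05-01'
      batch_identifiers:
        default_identifier_name: "2021-05-01"
    expectation_suite_name: SUITE_NAME
```

Here `data_asset_name` is just a label for the batch, so switching between staging and production could be done by templating `DB_NAME` in the query, but I have not verified this end to end on Athena.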

Additionally, you are correct that row filtering is currently only possible for Pandas, though we hope to enable this for SQL in the not-too-distant future.