If you follow a tutorial on configuring a Datasource connected to a database (MySQL, Snowflake, etc.), it will ask you to build a configuration with a SimpleSqlAlchemyDatasource. Although the SimpleSqlAlchemyDatasource offers a number of convenience methods that simplify configuration, it does not allow additional DataConnectors to be added. If you need to connect to data using a query or use additional DataConnectors, use the generic Datasource class instead. Below is a Datasource configuration that is equivalent to a SimpleSqlAlchemyDatasource.
name: my_datasource
class_name: Datasource
execution_engine:
  class_name: SqlAlchemyExecutionEngine
  connection_string: postgresql+psycopg2://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<DATABASE>  # or credentials go here
data_connectors:
  default_inferred_data_connector_name:
    class_name: InferredAssetSqlDataConnector
    name: whole_table
This configuration behaves the same way as a SimpleSqlAlchemyDatasource: you can send a BatchRequest with the name of the table that you would like to retrieve as a batch. In the following BatchRequest, the table name is taxi_data.
from great_expectations.core.batch import BatchRequest

batch_request = BatchRequest(
    datasource_name="my_datasource",
    data_connector_name="default_inferred_data_connector_name",
    data_asset_name="taxi_data",  # this is the name of the table you want to retrieve
)
You can also add more DataConnectors to this configuration, such as the RuntimeDataConnector in the following example:
name: my_datasource
class_name: Datasource
execution_engine:
  class_name: SqlAlchemyExecutionEngine
  connection_string: postgresql+psycopg2://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<DATABASE>  # or credentials go here
data_connectors:
  default_inferred_data_connector_name:
    class_name: InferredAssetSqlDataConnector
    name: whole_table
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - default_identifier_name
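The same structure can also be written as a Python dictionary, which makes the nesting explicit. The following is a minimal sketch using only the standard library; the helper name with_runtime_connector is hypothetical (it is not part of Great Expectations) and simply shows how a RuntimeDataConnector entry sits alongside the inferred connector under data_connectors.

```python
import copy

# Base configuration mirroring the single-connector Datasource; the
# connection string keeps its placeholders and must be filled in before use.
base_config = {
    "name": "my_datasource",
    "class_name": "Datasource",
    "execution_engine": {
        "class_name": "SqlAlchemyExecutionEngine",
        "connection_string": "postgresql+psycopg2://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<DATABASE>",
    },
    "data_connectors": {
        "default_inferred_data_connector_name": {
            "class_name": "InferredAssetSqlDataConnector",
            "name": "whole_table",
        },
    },
}


def with_runtime_connector(config, connector_name, batch_identifiers):
    """Hypothetical helper: return a copy of config with a RuntimeDataConnector added."""
    new_config = copy.deepcopy(config)  # leave the caller's dictionary untouched
    new_config["data_connectors"][connector_name] = {
        "class_name": "RuntimeDataConnector",
        "batch_identifiers": list(batch_identifiers),
    }
    return new_config


datasource_config = with_runtime_connector(
    base_config,
    "default_runtime_data_connector_name",
    ["default_identifier_name"],
)
print(sorted(datasource_config["data_connectors"]))
```

Assuming a Data Context is available as context, a dictionary like this would typically be registered with context.add_datasource(**datasource_config); that call is left out of the sketch because it needs a Great Expectations installation and a reachable database.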
With the RuntimeDataConnector, you can retrieve data from the Datasource with a query by using a RuntimeBatchRequest. For a RuntimeBatchRequest, the user must specify the data_asset_name and the batch_identifiers that uniquely identify the data (e.g. a run_id from an Airflow DAG run).
from great_expectations.core.batch import RuntimeBatchRequest

batch_request = RuntimeBatchRequest(
    datasource_name="my_datasource",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="default_name",  # this can be anything that identifies this data
    runtime_parameters={"query": "SELECT * from taxi_data LIMIT 10"},
    batch_identifiers={"default_identifier_name": "identifier"},
)
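Because the user supplies the batch_identifiers, it can help to check them before building the request. The sketch below assumes that the keys passed in batch_identifiers should match the names listed under the connector's batch_identifiers in the configuration. The helper runtime_request_kwargs, the asset name taxi_sample, and the run identifier value are all hypothetical and only illustrate the idea; the helper returns plain keyword arguments and does not call Great Expectations itself.

```python
def runtime_request_kwargs(query, asset_name, batch_identifiers, allowed_identifiers):
    """Hypothetical helper: assemble keyword arguments for a RuntimeBatchRequest."""
    # Reject identifier keys that the connector configuration does not list.
    unknown = set(batch_identifiers) - set(allowed_identifiers)
    if unknown:
        raise ValueError(f"Unconfigured batch identifiers: {sorted(unknown)}")
    return {
        "datasource_name": "my_datasource",
        "data_connector_name": "default_runtime_data_connector_name",
        "data_asset_name": asset_name,
        "runtime_parameters": {"query": query},
        "batch_identifiers": dict(batch_identifiers),
    }


request_kwargs = runtime_request_kwargs(
    query="SELECT * from taxi_data LIMIT 10",
    asset_name="taxi_sample",  # illustrative name, anything that identifies the data
    batch_identifiers={"default_identifier_name": "airflow_run_2021_01_01"},
    allowed_identifiers=["default_identifier_name"],
)
print(request_kwargs["batch_identifiers"])
```

With Great Expectations installed, these keyword arguments could then be passed on as RuntimeBatchRequest(**request_kwargs).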