If you follow a tutorial on configuring a Datasource connected to any database (MySQL, Snowflake, etc.), it will ask you to build a configuration with a `SimpleSqlAlchemyDatasource`. Although the `SimpleSqlAlchemyDatasource` offers a number of convenience methods that simplify the configuration process, it does not allow additional `DataConnectors` to be added. If you need to connect to data using a query, or want to use additional `DataConnectors`, the following will be helpful. Below is a `Datasource` configuration equivalent to the `SimpleSqlAlchemyDatasource`:
```yaml
name: my_datasource
class_name: Datasource
execution_engine:
  class_name: SqlAlchemyExecutionEngine
  connection_string: postgresql+psycopg2://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<DATABASE>  # or credentials go here
data_connectors:
  default_inferred_data_connector_name:
    class_name: InferredAssetSqlDataConnector
    name: whole_table
```
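Before saving this configuration to your project, you can sanity-check it with the Data Context's `test_yaml_config` helper, which instantiates the component and reports the data assets it can see. A minimal sketch, assuming a Data Context has already been initialized in your project and using the placeholder connection string from above:

```python
import great_expectations as ge

context = ge.get_context()

yaml_config = """
name: my_datasource
class_name: Datasource
execution_engine:
  class_name: SqlAlchemyExecutionEngine
  connection_string: postgresql+psycopg2://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<DATABASE>
data_connectors:
  default_inferred_data_connector_name:
    class_name: InferredAssetSqlDataConnector
    name: whole_table
"""

# Instantiates the Datasource without persisting it, and prints the
# tables the InferredAssetSqlDataConnector discovered.
context.test_yaml_config(yaml_config)
```

Once the reported assets look right, the same configuration can be saved with `context.add_datasource(...)`.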
This configuration behaves the same way as a `SimpleSqlAlchemyDatasource`: you can send in a `BatchRequest` with the name of the table you would like to retrieve as a Batch (the table name is `taxi_data` in the following `BatchRequest`):
```python
batch_request = BatchRequest(
    datasource_name="my_datasource",
    data_connector_name="default_inferred_data_connector_name",
    data_asset_name="taxi_data",  # this is the name of the table you want to retrieve
)
```
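To confirm the request works end to end, you can load the Batch into a Validator. A minimal sketch, assuming a Data Context exists and an Expectation Suite named `my_suite` has already been created (the suite name is a placeholder):

```python
import great_expectations as ge
from great_expectations.core.batch import BatchRequest

context = ge.get_context()

batch_request = BatchRequest(
    datasource_name="my_datasource",
    data_connector_name="default_inferred_data_connector_name",
    data_asset_name="taxi_data",
)

# get_validator loads the Batch and binds it to an Expectation Suite
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="my_suite",  # placeholder suite name
)

# Print the first few rows of the taxi_data table as a quick check
print(validator.head())
```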
You can also add additional `DataConnectors` to this configuration, like the `RuntimeDataConnector` in the following example:
```yaml
name: my_datasource
class_name: Datasource
execution_engine:
  class_name: SqlAlchemyExecutionEngine
  connection_string: postgresql+psycopg2://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<DATABASE>  # or credentials go here
data_connectors:
  default_inferred_data_connector_name:
    class_name: InferredAssetSqlDataConnector
    name: whole_table
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - default_identifier_name
```
With the `RuntimeDataConnector`, you can retrieve data from the `Datasource` using a query by sending a `RuntimeBatchRequest`. For a `RuntimeBatchRequest`, you must specify the `data_asset_name` and the `batch_identifiers` that uniquely identify the data (e.g. a `run_id` from an Airflow DAG run).
```python
batch_request = RuntimeBatchRequest(
    datasource_name="my_datasource",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="default_name",  # this can be anything that identifies this data
    runtime_parameters={"query": "SELECT * from taxi_data LIMIT 10"},
    batch_identifiers={"default_identifier_name": "identifier"},
)
```