Synapse
Install dlt with Synapseβ
To install the dlt library with Synapse dependencies:
pip install "dlt[synapse]"
Setup guideβ
Prerequisitesβ
Microsoft ODBC Driver for SQL Server
The Microsoft ODBC Driver for SQL Server must be installed to use this destination. This can't be included with
dlt
's python dependencies, so you must install it separately on your system. You can find the official installation instructions here.Supported driver versions:
ODBC Driver 18 for SQL Server
π‘ Older driver versions don't work properly because they don't support the
LongAsMax
keyword that was introduced inODBC Driver 18 for SQL Server
. Synapse does not support the legacy "long data types", and requires "max data types" instead.dlt
uses theLongAsMax
keyword to automatically do the conversion.
Azure Synapse Workspace and dedicated SQL pool
You need an Azure Synapse workspace with a dedicated SQL pool to load data into. If you don't have one yet, you can use this quickstart.
Stepsβ
1. Initialize a project with a pipeline that loads to Synapse by running
dlt init chess synapse
2. Install the necessary dependencies for Synapse by running
pip install -r requirements.txt
This will install dlt
with the synapse extra that contains all dependencies required for the Synapse destination.
3. Create a loader user
Execute the following SQL statements to set up the loader user. Change the password and replace yourpool
with the name of your dedicated SQL pool:
-- on master database, using a SQL admin account
CREATE LOGIN loader WITH PASSWORD = 'your_loader_password';
-- on yourpool database
CREATE USER loader FOR LOGIN loader;
-- DDL permissions
GRANT CREATE SCHEMA ON DATABASE :: yourpool TO loader;
GRANT CREATE TABLE ON DATABASE :: yourpool TO loader;
GRANT CREATE VIEW ON DATABASE :: yourpool TO loader;
-- DML permissions
GRANT ADMINISTER DATABASE BULK OPERATIONS TO loader; -- only required when loading from staging Storage Account
Optionally, you can create a WORKLOAD GROUP
and add the loader
user as a member to manage workload isolation. See the instructions on setting up a loader user for an example of how to do this.
4. Enter your credentials into .dlt/secrets.toml
.
Example, replace with your database connection info:
[destination.synapse.credentials]
database = "yourpool"
username = "loader"
password = "your_loader_password"
host = "your_synapse_workspace_name.sql.azuresynapse.net"
Equivalently, you can also pass a connection string as follows:
# keep it at the top of your toml file! before any section starts
destination.synapse.credentials = "synapse://loader:your_loader_password@your_synapse_workspace_name.azuresynapse.net/yourpool"
To pass credentials directly you can use the credentials
argument of dlt.destinations.synapse(...)
:
pipeline = dlt.pipeline(
pipeline_name='chess',
destination=dlt.destinations.synapse(
credentials='synapse://loader:your_loader_password@your_synapse_workspace_name.azuresynapse.net/yourpool'
),
dataset_name='chess_data'
)
To use Active Directory Principal, you can use the sqlalchemy.engine.URL.create
method to create the connection URL using your Active Directory Service Principal credentials. First create the connection string as:
conn_str = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"
f"SERVER={server_name};"
f"DATABASE={database_name};"
f"UID={service_principal_id}@{tenant_id};"
f"PWD={service_principal_secret};"
f"Authentication=ActiveDirectoryServicePrincipal"
)
Next, create the connection URL:
connection_url = URL.create(
"mssql+pyodbc",
query={"odbc_connect": conn_str}
)
Once you have the connection URL, you can directly use it in your pipeline configuration or convert it to a string.
pipeline = dlt.pipeline(
pipeline_name='chess',
destination=dlt.destinations.synapse(
credentials=connection_url.render_as_string(hide_password=False)
),
dataset_name='chess_data'
)
Write dispositionβ
All write dispositions are supported.
β The
staging-optimized
replace
strategy is not implemented for Synapse.
Data loadingβ
Data is loaded via INSERT
statements by default.
π‘ Multi-row
INSERT INTO ... VALUES
statements are not possible in Synapse, because it doesn't support the Table Value Constructor.dlt
usesINSERT INTO ... SELECT ... UNION
statements as described here to work around this limitation.
Supported file formatsβ
- insert-values is used by default
- parquet is used when staging is enabled
Data type limitationsβ
- Synapse cannot load
TIME
columns fromparquet
files.dlt
will fail such jobs permanently. Use theinsert_values
file format instead, or convertdatetime.time
objects tostr
ordatetime.datetime
, to loadTIME
columns. - Synapse does not have a complex/JSON/struct data type. The
dlt
complex
data type is mapped to thenvarchar
type in Synapse.
Table index typeβ
The table index type of the created tables can be configured at the resource level with the synapse_adapter
:
from dlt.destinations.adapters import synapse_adapter
info = pipeline.run(
synapse_adapter(
data=your_resource,
table_index_type="clustered_columnstore_index",
)
)
Possible values:
heap
: create HEAP tables that do not have an index (default)clustered_columnstore_index
: create CLUSTERED COLUMNSTORE INDEX tables
β Important:
- Set
default_table_index_type
to"clustered_columnstore_index"
if you want to change the default (see additional destination options).- CLUSTERED COLUMNSTORE INDEX tables do not support the
varchar(max)
,nvarchar(max)
, andvarbinary(max)
data types. If you don't specify theprecision
for columns that map to any of these types,dlt
will use the maximum lengthsvarchar(4000)
,nvarchar(4000)
, andvarbinary(8000)
.- While Synapse creates CLUSTERED COLUMNSTORE INDEXES by default,
dlt
creates HEAP tables by default. HEAP is a more robust choice because it supports all data types and doesn't require conversions.- When using the
insert-from-staging
replace
strategy, the staging tables are always created as HEAP tablesβany configuration of the table index types is ignored. The HEAP strategy makes sense for staging tables for reasons explained here.dlt
system tables are always created as HEAP tables, regardless of any configuration. This is in line with Microsoft's recommendation that "for small lookup tables, less than 60 million rows, consider using HEAP or clustered index for faster query performance."- Child tables, if any, inherit the table index type of their parent table.
Supported column hintsβ
Synapse supports the following column hints:
primary_key
- creates aPRIMARY KEY NONCLUSTERED NOT ENFORCED
constraint on the columnunique
- creates aUNIQUE NOT ENFORCED
constraint on the column
β These hints are disabled by default. This is because the
PRIMARY KEY
andUNIQUE
constraints are tricky in Synapse: they are not enforced and can lead to inaccurate results if the user does not ensure all column values are unique. For the column hints to take effect, thecreate_indexes
configuration needs to be set toTrue
, see additional destination options.
Staging supportβ
Synapse supports Azure Blob Storage (both standard and ADLS Gen2) as a file staging destination. dlt
first uploads Parquet files to the blob container, and then instructs Synapse to read the Parquet file and load its data into a Synapse table using the COPY INTO statement.
Please refer to the Azure Blob Storage filesystem documentation to learn how to configure credentials for the staging destination. By default, dlt
will use these credentials for both the write into the blob container, and the read from it to load into Synapse. Managed Identity authentication can be enabled through the staging_use_msi
option (see additional destination options).
To run Synapse with staging on Azure Blob Storage:
# Create a dlt pipeline that will load
# chess player data to the snowflake destination
# via staging on Azure Blob Storage
pipeline = dlt.pipeline(
pipeline_name='chess_pipeline',
destination='synapse',
staging='filesystem', # add this to activate the staging location
dataset_name='player_data'
)
Additional destination optionsβ
The following settings can optionally be configured:
[destination.synapse]
default_table_index_type = "heap"
create_indexes = "false"
staging_use_msi = "false"
[destination.synapse.credentials]
port = "1433"
connect_timeout = 15
port
and connect_timeout
can also be included in the connection string:
# keep it at the top of your toml file! before any section starts
destination.synapse.credentials = "synapse://loader:your_loader_password@your_synapse_workspace_name.azuresynapse.net:1433/yourpool?connect_timeout=15"
Descriptions:
default_table_index_type
sets the table index type that is used if no table index type is specified on the resource.create_indexes
determines ifprimary_key
andunique
column hints are applied.staging_use_msi
determines if the Managed Identity of the Synapse workspace is used to authorize access to the staging Storage Account. Ensure the Managed Identity has the Storage Blob Data Reader role (or a higher-privileged role) assigned on the blob container if you set this option to"true"
.port
used for the ODBC connection.connect_timeout
sets the timeout for thepyodbc
connection attempt, in seconds.
dbt supportβ
Integration with dbt is currently not supported.
Syncing of dlt
stateβ
This destination fully supports dlt state sync.
Additional Setup guidesβ
- Load data from Capsule CRM to Azure Synapse in python with dlt
- Load data from MySQL to Azure Synapse in python with dlt
- Load data from Pinterest to Azure Synapse in python with dlt
- Load data from Adobe Analytics to Azure Synapse in python with dlt
- Load data from Sentry to Azure Synapse in python with dlt
- Load data from Coinbase to Azure Synapse in python with dlt
- Load data from Trello to Azure Synapse in python with dlt
- Load data from Keap to Azure Synapse in python with dlt
- Load data from Bitbucket to Azure Synapse in python with dlt
- Load data from Mux to Azure Synapse in python with dlt