DLT Integration
dagster-odp provides an enhanced integration with DLT (Data Load Tool) for configuration-driven data ingestion. Using the GitHub API ingestion example from our Chess Data Analysis Tutorial as a reference, this guide explains the components and capabilities of ODP's DLT integration.
Prerequisites
This guide assumes familiarity with DLT's core concepts and operations. If you're new to DLT, we recommend reading How DLT Works before continuing.
Understanding DLT Concepts
Before diving in, let's clarify some terminology:
- DLT Source: A Python module that defines how to extract data from an external system (like an API or database)
- DLT Resource: A logical subset of data from a source (like a specific API endpoint or database table). NOTE: This is different from Dagster/ODP resources!
- DLT Pipeline: Defines how data moves from source to destination, including schema management
- DLT Destination: Where the data gets loaded (like BigQuery, DuckDB, or filesystem)
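To make these terms concrete, here is a minimal, hypothetical sketch in plain DLT (the source name, endpoint, and repository are illustrative only, not part of the tutorial) showing how a source, a resource, a pipeline, and a destination fit together:

```python
import dlt
from dlt.sources.helpers import requests  # DLT's bundled requests helper

@dlt.source
def github_issues_source(repo: str):
    """DLT source: groups the resources that extract data from one system (the GitHub API)."""

    @dlt.resource(write_disposition="append")
    def issues():
        """DLT resource: one logical subset of the source's data (a single endpoint)."""
        response = requests.get(f"https://api.github.com/repos/{repo}/issues")
        yield response.json()

    return issues

# DLT pipeline: moves data from the source to a destination (DuckDB here).
pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="duckdb",  # DLT destination
    dataset_name="github_data",
)
print(pipeline.run(github_issues_source(repo="dagster-io/dagster")))
```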
ODP's Enhanced DLT Integration
ODP provides several improvements over Dagster's built-in DLT integration:
- Granular Asset Creation: While Dagster's integration creates a single asset per pipeline, ODP:
  - Creates individual assets for each DLT object
  - Automatically generates source assets as Dagster external assets
  - Sets up proper dependencies between assets
- Configuration-Driven:
  - Pipeline creation through YAML/JSON configuration
  - Automatic secrets management
  - Parameter validation using Pydantic
Implementation Components
The following sections break down how ODP's DLT integration works, using our tutorial's GitHub API pipeline as an example.
Project Structure
A typical ODP project using DLT has the following structure, with clear separation between ODP configuration and DLT implementation:
```
my_project/
├── odp_config/
│   ├── dagster_config.yaml    # Resource configuration
│   └── workflows/
│       └── dlt_workflow.yaml  # Pipeline definition
└── dlt_project/
    ├── github/                # DLT source implementation
    │   ├── __init__.py
    │   └── source.py
    ├── .dlt/
    │   └── secrets.toml       # DLT credentials
    └── schemas/
        └── export/
            └── github.schema.yaml  # Generated DLT schema
```
Schema Generation
ODP uses DLT's schema file to create appropriate Dagster assets. The schema can be generated by adding schema export to a DLT pipeline:
```python
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data",
    export_schema_path="schemas/export",  # Enables schema generation
)
```
Running this pipeline creates a schema file that ODP uses to:
- Determine what Dagster assets to create
- Set up proper dependencies between assets
- Configure asset materializations
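For example, a single local run of the pipeline above is enough to produce the exported schema. The `github_source` call below is a hypothetical stand-in for the source function defined in your project:

```python
# Hypothetical stand-in for the @dlt.source function in dlt_project/github/source.py.
source = github_source(repo="dagster-io/dagster")

# Because export_schema_path is set, this run also writes
# schemas/export/github.schema.yaml, which ODP reads when building assets.
load_info = pipeline.run(source)
print(load_info)
```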
Secrets Management
DLT expects secrets as environment variables in a specific format: SECTION__KEY=value. For instance, an `access_token` key under a `[sources.github]` section of `secrets.toml` corresponds to the environment variable `SOURCES__GITHUB__ACCESS_TOKEN`.
ODP handles this automatically by:
- Reading secrets from `.dlt/secrets.toml`
- Converting them to DLT's expected format
- Setting them as environment variables
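As an illustration of the mapping (a minimal sketch of the convention, not ODP's actual implementation), nested TOML sections are joined with double underscores and upper-cased:

```python
import os
import tomllib  # Python 3.11+; older versions can use the third-party "toml" package

def to_env_vars(table: dict, prefix: tuple = ()) -> dict:
    """Flatten nested TOML tables into DLT-style environment variable names."""
    env = {}
    for key, value in table.items():
        if isinstance(value, dict):
            env.update(to_env_vars(value, prefix + (key,)))
        else:
            env["__".join(prefix + (key,)).upper()] = str(value)
    return env

with open(".dlt/secrets.toml", "rb") as f:
    secrets = tomllib.load(f)

for name, value in to_env_vars(secrets).items():
    # e.g. access_token under [sources.github] becomes SOURCES__GITHUB__ACCESS_TOKEN
    os.environ[name] = value
```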
Resource Configuration
The DLT resource in `dagster_config.yaml` tells ODP where to find the DLT implementation:

```yaml
resources:
  - resource_kind: dlt
    params:
      project_dir: dlt_project  # Root directory containing DLT code
```
Single DLT Resource
Each Dagster code location can have only one DLT resource. However, multiple DLT pipelines can be organized in subdirectories within the project_dir.
Asset Configuration
DLT assets are defined in workflow files. Here's an example configuration with explanations of each component:
```yaml
assets:
  - asset_key: github/api/pull_requests  # Creates source asset github/api
    task_type: dlt
    description: "Fetch GitHub pull requests"
    group_name: data_ingestion
    params:
      # Source configuration
      source_module: github.source  # Relative to project_dir
      schema_file_path: schemas/export/github.schema.yaml

      # DLT source parameters (passed to the DLT source function)
      source_params:
        repo: "dagster-io/dagster"
        since: "{{#date}}{{context.partition_key}}|%Y-%m-%d{{/date}}"

      # DLT destination configuration
      destination: duckdb  # Any supported DLT destination
      destination_params:  # Destination-specific configuration
        database: data/github.db

      # DLT pipeline configuration
      pipeline_params:
        dataset_name: github_data

      # DLT load configuration
      run_params:  # Controls data loading behavior
        write_disposition: append
```
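Conceptually, the configuration above maps roughly onto the following plain-DLT calls. This is only a sketch to show where each parameter block ends up, not ODP's actual wiring; `github_source` stands in for the configured source function:

```python
import dlt

# destination / destination_params
destination = dlt.destinations.duckdb("data/github.db")

# pipeline_params
pipeline = dlt.pipeline(
    pipeline_name="github_api_pull_requests",  # illustrative name only
    destination=destination,
    dataset_name="github_data",
)

# source_params are passed to the source function as keyword arguments
source = github_source(repo="dagster-io/dagster", since="2024-01-01")

# run_params control how the data is loaded
pipeline.run(source, write_disposition="append")
```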
Asset Configuration Reference
source_module
Specifies the Python path to the DLT source, relative to `project_dir`. For example, `github.source` resolves to `dlt_project/github/source.py`.
The module should contain:
- A source function decorated with `@dlt.source`
- Resource methods that yield data
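A minimal sketch of what `dlt_project/github/source.py` could look like (the endpoint, fields, and filtering details are hypothetical; the tutorial's actual source differs):

```python
import dlt
from dlt.sources.helpers import requests

@dlt.source
def github_source(repo: str, since: str | None = None):
    """Source function: receives source_params from the workflow file as keyword arguments."""

    @dlt.resource(write_disposition="append", primary_key="id")
    def pull_requests():
        """Resource: yields pull request records from the GitHub API."""
        params = {"state": "all"}
        if since:
            params["since"] = since  # hypothetical filter built from the partition key
        response = requests.get(f"https://api.github.com/repos/{repo}/pulls", params=params)
        yield response.json()

    return pull_requests
```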
schema_file_path
- Path to the schema file, relative to `project_dir`
- Generated when you run your DLT pipeline with `export_schema_path`
- ODP uses this to:
  - Create individual assets for each DLT object
  - Set up correct dependencies
  - Configure materializations
source_params
Parameters passed directly to the DLT source function. Each source has its own configuration options:
- Refer to DLT Verified Sources for source-specific parameters
- Supports ODP variables for dynamic values, such as the partition-key date expression in the example above
- No need for `config.toml`; all configuration can be passed here
destination
Any DLT-supported destination can be used. ODP adds standardized materialization metadata for:
| Destination | Metadata Field | Format |
|---|---|---|
| BigQuery | `destination_table_id` | `dataset.table` |
| DuckDB | `destination_table_id` | `schema.table` |
| Filesystem | `destination_file_uri` | File path |
Downstream assets can access this metadata via the handlebar syntax, replacing `/` with `_` in the asset key:
```yaml
assets:
  - asset_key: process_pull_requests
    task_type: duckdb_query
    depends_on: ["github/api/pull_requests"]
    params:
      query: SELECT * FROM {{github_api_pull_requests.destination_table_id}}
```
destination_params
Configuration for the chosen destination. Common parameters:
```yaml
# BigQuery
destination_params:
  dataset: my_dataset
  project: my-project

# DuckDB
destination_params:
  database: path/to/db.duckdb

# Filesystem
destination_params:
  bucket_url: gs://my-bucket/path/  # or a local path
```
For destination-specific configuration, refer to DLT Destinations.
pipeline_params
Pipeline-specific configuration passed to DLT's pipeline creation. See DLT Pipeline Configuration for available options:

```yaml
pipeline_params:
  dataset_name: github_data       # Logical grouping of tables
  pipeline_name: github_pipeline  # Optional: pipeline identifier
```
run_params
Controls how DLT loads data:
```yaml
run_params:
  write_disposition: append  # append, replace, or merge
  merge_key: id              # For merge disposition
```
Asset Generation
In ODP, a DLT asset configuration corresponds to one DLT resource (like a specific API endpoint or database table) but can generate multiple Dagster assets based on that resource's output. Given an asset key like `abc/github/pull_requests`, here's what ODP creates:
- Source Asset: an external Dagster asset (`abc/github`) representing the data source
  - Created automatically from the asset key prefix
  - Functions as a logical grouping for related DLT objects
  - Allows dependency management at the source level
- DLT Object Assets: individual Dagster assets for each object produced by the DLT resource
  - Named by combining the source asset prefix with the DLT object name
  - Created automatically based on the DLT schema
  - Enables granular dependencies on specific DLT objects
  - All of these assets come from a single DLT resource (the pull requests endpoint); see the illustration after this list for an example with nested data
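As a hypothetical illustration (the nested object names depend entirely on what the API returns and how DLT unnests it), a pull requests resource whose records contain nested lists such as labels and reviewers could yield assets like:
- `abc/github` (external source asset)
- `abc/github/pull_requests`
- `abc/github/pull_requests__labels`
- `abc/github/pull_requests__reviewers`
DLT stores nested lists in child tables named by joining the parent and child with a double underscore, and ODP creates one asset per such object.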
One Resource, Many Assets
While you define one DLT asset in your ODP configuration targeting a specific DLT resource (like an API endpoint), that resource might produce multiple related data objects. ODP automatically creates separate Dagster assets for each object, allowing downstream tasks to depend on exactly the data they need.
Multiple DLT Pipelines
While you can only have one DLT resource (and thus one `project_dir`), you can organize multiple DLT pipelines in subdirectories:
```
dlt_project/
├── github/              # GitHub API pipeline
│   ├── __init__.py
│   └── source.py
├── stripe/              # Stripe API pipeline
│   ├── __init__.py
│   └── source.py
└── schemas/
    └── export/
        ├── github.schema.yaml
        └── stripe.schema.yaml
```
Each pipeline can have its own:
- Source implementation
- Schema file
- Secrets in `.dlt/secrets.toml`
- Configuration in workflow files
Best Practices
- Asset Organization
  - Use meaningful prefixes in asset keys to group related data
  - Create separate assets for different DLT resources
  - Use asset dependencies to manage relationships
- Configuration
  - Keep secrets in `secrets.toml`
  - Pass dynamic configuration through `source_params`
- Project Structure
  - Organize related DLT pipelines in subdirectories
  - Keep schema files in the DLT directory
Common Issues
- Missing Schema File
  Solution: Run your DLT pipeline with `export_schema_path` to generate the schema
- Secrets Not Available
  Solution: Ensure secrets are properly configured in `.dlt/secrets.toml`
- Invalid Asset Keys
  Solution: DLT asset keys must have at least two parts (e.g., `github_api/resource`)
This enhanced DLT integration makes it easier to build maintainable data pipelines while keeping the benefits of both Dagster's asset-based paradigm and DLT's powerful data ingestion capabilities.