Overview of dagster-odp
dagster-odp (open data platform) extends Dagster's capabilities by providing a configuration-driven approach to building data pipelines. It's designed to streamline pipeline development and management for teams already using or familiar with Dagster.
What is dagster-odp?
dagster-odp acts as an abstraction layer, translating user-defined configurations into Dagster primitives such as assets, resources, sensors, schedules, partitions, and asset checks. This approach offers several advantages:
- Configuration-driven pipeline creation: Data analysts and scientists can create pipelines using pre-defined or custom tasks through YAML configuration, reducing the need for direct Python coding.
- Declarative pipeline definitions: ODP separates pipeline logic from task implementation, facilitating easier maintenance and iteration of data workflows.
- Task reusability: Tasks can be defined once and reused across multiple pipelines, promoting code efficiency and standardization.
- Integration with popular tools: ODP provides built-in support for popular data tools, including DLT for data ingestion and Soda for data quality checks.
Key Integrations
ODP provides enhanced integrations with popular data tools:
- DLT: Granular asset creation and automatic dependency management for data ingestion
- DBT: Configuration-driven transformation with automatic asset creation and variable management
- Soda: Data quality monitoring through configuration-based asset checks
Building pipelines with ODP
ODP uses two main types of configuration files to define and manage pipelines:
1. Resource Configuration
The dagster_config.yaml file defines Dagster resources that interface with external services and tools.
Example resource configuration:
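The exact entries depend on which resources your pipelines use; the snippet below is a minimal, hypothetical sketch, where the resource_kind value and parameter names are illustrative rather than a definitive schema:

```yaml
# Hypothetical sketch of dagster_config.yaml; key and parameter names are illustrative.
resources:
  - resource_kind: bigquery      # which resource implementation to instantiate
    params:
      project: my_project        # passed through to the underlying resource
      location: us-central1
```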
2. Workflow Configuration
Files in the workflows directory define your pipeline components using assets, jobs, and automation. These files can be in YAML or JSON format and specify what your pipeline does and how it runs.
Example workflow configuration:
```yaml
assets:
  - asset_key: raw_data
    task_type: gcs_file_to_bq
    params:
      source_file_uri: gs://my-bucket/raw_data.csv
      destination_table_id: my_project.my_dataset.raw_data

jobs:
  - job_id: daily_etl
    asset_selection: [raw_data]
    triggers:
      - trigger_id: daily_run
        trigger_type: schedule
        params:
          schedule_kind: cron
          schedule_params:
            cron_schedule: "0 1 * * *"
```
This workflow configuration creates a simple ETL pipeline that:
- Loads data from GCS to BigQuery
- Runs automatically every day at 1 AM
All of this is achieved without writing any Python code. You can split workflow configurations across multiple files for better organization, and they can reference resources defined in your dagster_config.yaml.
3. Leverage Pre-built Components
ODP comes with a variety of pre-built components, with more on the way, to accelerate pipeline development (see the sketch after the lists below):
Pre-defined tasks
- GCS to BigQuery data transfer
- BigQuery to GCS export
- DuckDB operations
- Shell command execution
Integrated libraries
- DLT (Data Load Tool) for data ingestion from various sources
- Soda for data quality checks
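As a rough illustration, a pre-built task plugs into a workflow file the same way as the gcs_file_to_bq example above. The task_type and parameter names below are assumptions for illustration, so check the task reference for the exact names:

```yaml
# Illustrative sketch; the task_type and parameter names are assumed, not verified.
assets:
  - asset_key: nightly_backup
    task_type: shell_command          # hypothetical name for the shell execution task
    params:
      command: "bash scripts/backup.sh"
```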
4. Create Custom Tasks
While ODP provides many pre-built components, it also allows data engineers to create custom tasks for specific needs. Here's how you would write a custom task to anonymize data:
```python
from typing import Dict

from dagster_odp.tasks.manager import BaseTask, odp_task
from google.cloud import bigquery


@odp_task("anonymize_pii", required_resources=["bigquery"])
class AnonymizePIITask(BaseTask):
    source_table: str
    destination_table: str
    columns_to_hash: list[str]

    def run(self) -> Dict:
        # Anonymization logic goes here.
        # Access the BigQuery resource via self._resources["bigquery"].
        pass
```
Use the custom task in your pipeline configuration:
```yaml
assets:
  - asset_key: anonymized_user_data
    task_type: anonymize_pii
    params:
      source_table: my_project.raw_data.users
      destination_table: my_project.processed_data.anonymized_users
      columns_to_hash: ["email", "phone_number", "social_security_number"]
```
5. Use Dynamic Configuration
ODP supports various configuration variables and features (see the sketch after this list):
- Time-based partitioning for incremental processing
- Variable substitution using context, resource, and parent asset information
- Date formatting helpers for working with different date formats
- Sensor context passing for event-driven pipelines
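To give a feel for how this looks in practice, here is a rough sketch of a partitioned asset whose source path is built from a run-context placeholder. The placeholder syntax ({{context.partition_key}}) is an assumption for illustration and should be checked against the configuration reference:

```yaml
# Illustrative sketch; the placeholder syntax is assumed, not verified.
assets:
  - asset_key: daily_events
    task_type: gcs_file_to_bq
    params:
      # the partition key (e.g. 2024-01-15) is substituted at run time
      source_file_uri: gs://my-bucket/events/{{context.partition_key}}.csv
      destination_table_id: my_project.my_dataset.daily_events
```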
Why Use dagster-odp?
1. Empower Non-Engineers
By using a configuration-based approach, ODP allows data analysts and scientists to create and modify pipelines without deep engineering knowledge.
2. Standardize Pipeline Structure
ODP encourages a standardized approach to pipeline development within an organization. This leads to more consistent, maintainable, and understandable data workflows.
3. Accelerate Development
With pre-built components and integrations, ODP significantly reduces the time and effort required to create new data pipelines. Teams can focus on the "what" (pipeline business logic) rather than the "how" (task implementation details).
4. Improve Maintainability
The declarative nature of ODP configurations makes it easier to understand, modify, and version control pipelines. This improves long-term maintainability of data workflows.
5. Scale with Flexibility
As your data needs grow, ODP allows you to easily extend functionality through custom tasks while maintaining the simplicity of configuration-based pipeline creation for all team members.
Next Steps
Now that you're familiar with dagster-odp's concepts and benefits, you're ready to start building your first pipeline. Check out our:
- Quickstart Guide to get started
- Tutorials for more advanced real-world use cases
- Concepts section for a deeper understanding of ODP's components
- Integrations documentation to learn about tool-specific features