Chess Data Analysis Tutorial (Advanced) - Part 3: Data Transformations with DBT
In this final part of our tutorial, we'll implement the analytical layer of our pipeline using DBT and export the results. We'll be working with this portion of our pipeline:
We'll create DBT models to:
- Calculate monthly statistics for each player's games
- Join this with player profile information
- Export the final results to a CSV file
Understanding ODP's DBT Integration
ODP's integration with DBT is designed to be both simple and flexible. Here's how it works:
Basic DBT Integration
The simplest way to use DBT with ODP is to just define a DBT resource in your dagster_config.yaml. ODP will create a Dagster DBT asset that will:
- Automatically discover all DBT models in your project
- Create corresponding Dagster assets for each model
- Set up the correct dependencies between models based on DBT's ref() and source() functions
- Make these assets available for materialization in Dagster
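In its most minimal form, that resource entry looks roughly like this (a sketch using the same fields as the full configuration we add later in this part):

resources:
  - resource_kind: dbt
    params:
      project_dir: chess_dbt   # path to the DBT project
      profile: chess_dbt       # profile name from profiles.yml
      target: dev              # target within that profile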
When to Define DBT Tasks
While the basic integration works well for simple cases, there are times when you'll want to explicitly define a DBT task in your workflow files:
- Variable Passing: When you need to pass variables to your DBT models (like a target date or filtering criteria)
- Partitioning: When the model needs to be included in a partition configuration for incremental processing
- Custom Scheduling: When you need specific scheduling or triggering logic for the model
In our tutorial, we define a DBT task for monthly_player_stats and include it in our partitioning because:
- It needs the target_month variable from our partition key
- It uses incremental processing based on that partition
For monthly_player_profiles, we let ODP automatically create the asset since it doesn't require any special handling.
Setting Up DBT
Let's start with the DBT setup:
- In the chess_analysis directory, initialize DBT, choosing duckdb as your database when prompted (a command sketch follows below).
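A sketch of the initialization, assuming the dbt-duckdb adapter is installed and the project is named chess_dbt to match the rest of this tutorial:

cd chess_analysis
dbt init chess_dbt
# when prompted for the database adapter, choose duckdb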
- Create a profiles.yml file in the chess_dbt directory:

profiles.yml

chess_dbt:
  outputs:
    dev:
      path: /Users/your_username/chess_analysis/data/chess.db
      type: duckdb

Important: Replace /Users/your_username/chess_analysis/data/chess.db with the full absolute path to your DuckDB database file. DBT requires a full path here - relative paths won't work.

Profile Selection: The profile and target specified in your DBT resource configuration must match your profiles.yml. In this case, we're using profile chess_dbt and target dev.
- Add the DBT and DuckDB resources to odp_config/dagster_config.yaml:

dagster_config.yaml

resources:
  - resource_kind: dlt
    params:
      project_dir: chess_dlt
  - resource_kind: soda
    params:
      project_dir: odp_config/soda
      checks_dir: scans
  - resource_kind: dbt
    params:
      project_dir: chess_dbt # Path to your DBT project
      profile: chess_dbt # Must match profiles.yml
      target: dev # Target from profiles.yml
  - resource_kind: duckdb
    params:
      database_path: data/chess.db # Needed for the export task

The DBT resource parameters explained:
- project_dir: Location of your DBT project relative to where Dagster runs
- profile: Name of the DBT profile to use (must match profiles.yml)
- target: The specific target within the profile (e.g., dev, prod)
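As an optional sanity check before moving on, you can ask DBT to validate the profile and connection. This is a sketch, assuming a recent dbt version that also looks for profiles.yml in the current working directory:

cd chess_dbt
dbt debug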
Configuring DBT Sources
First, let's clean up the default DBT models:
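A sketch of the cleanup, assuming the default dbt init scaffold that places example models under models/example:

rm -rf models/example   # run from inside the chess_dbt directory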
Now create models/sources.yml
to tell DBT about our source tables:
version: 2

sources:
  - name: chess_data
    schema: chess_games
    tables:
      - name: players_games
        meta:
          dagster:
            asset_key: ["chess_com", "chess_data", "players_games"]
      - name: players_profiles
        meta:
          dagster:
            asset_key: ["chess_com", "chess_data", "players_profiles"]
The meta.dagster.asset_key entry is crucial here - it tells Dagster which assets correspond to which DBT sources. This enables:
- Automatic dependency management
- Asset lineage tracking
- Triggering DBT models
Creating DBT Models
We'll create two DBT models: one for calculating monthly statistics and another for combining those stats with player information.
Monthly player stats
First, create models/monthly_player_stats.sql:
{{ config(
materialized='incremental',
unique_key=['player_id', 'game_month'],
meta={'dagster': {'group': 'data_transformation'}}
) }}
{% set target_month = var('target_month', none) %}
WITH deduped_games AS (
SELECT DISTINCT
uuid,
white__aid,
black__aid,
end_time,
white__result,
black__result,
accuracies__white,
accuracies__black
FROM {{ source('chess_data', 'players_games') }}
),
player_games AS (
SELECT
white__aid AS player_id,
DATE_TRUNC('month', CAST(end_time AS DATE)) AS game_month,
uuid AS game_id,
'white' AS color,
white__result AS result,
CAST(accuracies__white AS FLOAT) AS accuracy
FROM deduped_games
UNION ALL
SELECT
black__aid AS player_id,
DATE_TRUNC('month', CAST(end_time AS DATE)) AS game_month,
uuid AS game_id,
'black' AS color,
black__result AS result,
CAST(accuracies__black AS FLOAT) AS accuracy
FROM deduped_games
)
SELECT
player_id,
game_month,
COUNT(DISTINCT game_id) AS total_games,
SUM(CASE WHEN result = 'win' THEN 1 ELSE 0 END) AS total_wins,
ROUND(AVG(CASE WHEN result = 'win' THEN 1 ELSE 0 END) * 100, 2) AS win_percentage,
ROUND(AVG(CASE WHEN color = 'white' THEN accuracy ELSE NULL END), 2) AS avg_white_accuracy,
ROUND(AVG(CASE WHEN color = 'black' THEN accuracy ELSE NULL END), 2) AS avg_black_accuracy
FROM player_games
{% if is_incremental() %}
WHERE game_month = DATE '{{ target_month }}'
{% endif %}
GROUP BY 1, 2
Let's examine the key components of this model:

- Model Configuration
  - We use materialized='incremental' to make this an incremental model, processing only new data each run
  - The group in the metadata specifies which Dagster group this DBT model should belong to
  - The target_month variable from our partition key controls which data to process (see the standalone CLI example after this list)
- Incremental Processing Strategy
  - The WHERE clause in the incremental model ensures we only process games from the current partition
  - This approach is particularly efficient when running on cloud data warehouses like BigQuery or Snowflake, as it minimizes the amount of data processed and costs
  - Each partition's results are appended to the table, building up our historical analysis over time
- Handling Duplicate Games
  - Remember that in Part 2, our Soda check warned us about duplicate game records
  - The deduped_games CTE uses SELECT DISTINCT so that each game, identified by its uuid, is counted only once
  - We handle this at the transformation layer rather than at ingestion to preserve the raw data as received from the API
- Data Transformation
  - We use a UNION ALL to combine games where the player played as white or black
  - We calculate various statistics including total games, wins, win percentage, and average accuracy
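As referenced above, you can sanity-check this model outside of Dagster by passing the same variable on the dbt CLI. A sketch, run from the chess_dbt directory, with an illustrative partition date:

dbt run --select monthly_player_stats --vars '{"target_month": "2022-01-01"}'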
Monthly player profiles
We then create models/monthly_player_profiles.sql:
{{ config(
materialized='table',
meta={'dagster': {'group': 'data_transformation'}}
) }}
WITH player_profiles AS (
SELECT
aid AS player_id,
name,
country,
title,
followers
FROM {{ source('chess_data', 'players_profiles') }}
)
SELECT
pp.player_id,
pp.name AS player_name,
pp.country,
pp.title,
pp.followers,
mps.game_month,
mps.total_games,
mps.total_wins,
mps.win_percentage,
mps.avg_white_accuracy,
mps.avg_black_accuracy
FROM {{ ref('monthly_player_stats') }} mps
JOIN player_profiles pp ON mps.player_id = pp.player_id
ORDER BY pp.player_id, mps.game_month
This model:
- Joins our monthly statistics with player profile information
- Is materialized as a table since it's our final output
- Includes both player attributes and their monthly performance metrics
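Once both models have been built at least once, recent dbt versions (1.5+) can preview the joined output directly, which is a quick way to eyeball the result before wiring up the export. A sketch:

dbt show --select monthly_player_profiles --limit 5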
Configuring ODP for DBT Integration
Let's update our monthly_load.yaml
to include DBT transformations:
assets:
  - asset_key: chess_com/chess_data/players_games
    task_type: dlt
    description: chess.com source
    group_name: data_ingestion
    params:
      source_module: chess.source
      schema_file_path: schemas/export/chess.schema.yaml
      source_params:
        players:
          - magnuscarlsen
          - hikaru
          - ghandeevam2003
          - fabianocaruana
          - gukeshdommaraju
          - chesswarrior7197
          - firouzja2003
          - lovevae
          - lachesisq
          - thevish
        start_month: "{{#date}}{{context.partition_key}}|%Y/%m{{/date}}"
        end_month: "{{#date}}{{context.partition_key}}|%Y/%m{{/date}}"
      destination: duckdb
      destination_params:
        credentials: data/chess.db
      pipeline_params:
        dataset_name: chess_games
      run_params:
        write_disposition: append

  - asset_key: players_games_analysis
    task_type: dbt
    params:
      selection: monthly_player_stats
      dbt_vars:
        target_month: '{{context.partition_key}}'

  - asset_key: player_profiles_export
    task_type: duckdb_table_to_file
    description: "Export player data to csv"
    group_name: data_export
    depends_on: [monthly_player_profiles]
    params:
      source_table_id: monthly_player_profiles
      destination_file_uri: data/monthly_player_profiles.csv

partitions:
  - assets: ["chess_com/chess_data/players_games", "players_games_analysis"]
    params:
      start: "2022-01-01"
      schedule_type: MONTHLY
DBT Task Configuration
- asset_key: players_games_analysis
  task_type: dbt
  params:
    selection: monthly_player_stats
    dbt_vars:
      target_month: '{{context.partition_key}}'
- task_type: Tells ODP this is a DBT task
- selection: Specifies which DBT model(s) to run. This field supports the DBT selection syntax that can include one or many model names, tags or paths.
- dbt_vars: Variables passed to your DBT models. These are accessible in DBT using var()
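Because selection uses dbt's node selection syntax, values other than a single model name also work. A few illustrative examples (the tags here are hypothetical, not part of this tutorial):

selection: "monthly_player_stats monthly_player_profiles"   # two specific models
selection: "tag:monthly"                                     # every model with a given tag
selection: "monthly_player_stats+"                           # a model plus everything downstream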
Auto-created vs. Explicit Assets
Notice that we:
- Explicitly defined players_games_analysis for monthly_player_stats
- Referenced monthly_player_profiles only in dependencies
This is because:
- monthly_player_stats needs partition information and variables
- monthly_player_profiles works fine with Dagster's automatic asset creation
Running the Complete Pipeline
Now we can run our complete pipeline:
- Start the Dagster development server:
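The exact invocation depends on how your project was set up in the earlier parts of this tutorial; with a standard Dagster workspace it is typically just:

dagster dev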
- Navigate to the Jobs page in the UI and find the monthly_chess_data_ingestion job.
- Launch a new run, selecting the partition you want to process.
- After the job completes, you can find your results in data/monthly_player_profiles.csv.
The complete pipeline in the Dagster UI will look like this:
Examining the Results
The final CSV file will contain rich analytics about each player's monthly performance:
- Basic player information (name, country, title)
- Total games played each month
- Win/loss statistics
- Average accuracy with white and black pieces
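For a quick look at the export without opening a spreadsheet, you can query the CSV directly. A sketch using the DuckDB CLI, assuming it is installed:

duckdb -c "SELECT player_name, game_month, total_games, win_percentage FROM read_csv_auto('data/monthly_player_profiles.csv') ORDER BY game_month, player_name LIMIT 10;"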
Conclusion
In this tutorial series, we've built a complete data pipeline using dagster-odp that:
- Ingests data from an external API using DLT
- Implements data quality checks with Soda
- Transforms data using DBT
- Manages incremental processing and partitioned loads
- Exports results for analysis
Key advantages of using ODP for this pipeline include:
- Configuration-driven development requiring minimal code
- Built-in integrations with modern data tools
- Automatic handling of dependencies between components
- Easy scheduling and partitioning of data loads
- Robust data quality checking
This pipeline can serve as a template for similar data processing needs, whether you're working with chess data or any other domain requiring regular data ingestion, transformation, and quality control.