
MovieLens Data Pipeline - StreamFlix


A production-ready event-driven data engineering pipeline implementing the medallion architecture for movie recommendation analytics at StreamFlix.


Table of Contents

  1. Business Problem
  2. Solution Overview
  3. Architecture
  4. Data Model
  5. Technology Choices
  6. Cost Estimation
  7. Quick Start
  8. Project Structure

Business Problem

StreamFlix needs data-driven insights to:

  • Optimize content acquisition through genre performance analysis
  • Enhance personalization with user behavior analytics
  • Reduce churn via engagement segmentation
  • Improve operational efficiency with automated processing

Key Requirements: Real-time processing, self-service analytics, 99.9%+ reliability, comprehensive data quality


Solution Overview

StreamFlix implements a sophisticated event-driven data pipeline following the medallion architecture pattern. The solution transforms raw movie data from the MovieLens 32M dataset into business-ready analytics, enabling data-driven decision-making across content strategy, user engagement, and platform operations.

Pipeline Overview

graph LR
    A["<b>S3 Data Lake</b><br/>Raw CSV Files"] --> B["<b>Event Trigger</b><br/>File Detection"]
    B --> C["<b>Airflow</b><br/>Orchestration"]
    C --> D["<b>DBT</b><br/>Transformations"]
    D --> E["<b>Snowflake</b><br/>Data Warehouse"]
    D --> F["<b>Data Quality</b><br/>Tests"]
    E --> G["<b>Analytics</b><br/>Business Marts"]
    F --> H["<b>Alerts</b><br/>Discord Notifications"]
    
    style A fill:#FF9900,stroke:#CC7A00,color:#fff
    style B fill:#007ACC,stroke:#005A9E,color:#fff
    style C fill:#007ACC,stroke:#005A9E,color:#fff
    style D fill:#FF6F00,stroke:#CC5A00,color:#fff
    style E fill:#28A745,stroke:#1E7E34,color:#fff
    style F fill:#DC3545,stroke:#C82333,color:#fff
    style G fill:#6F42C1,stroke:#5A32A3,color:#fff
    style H fill:#6C757D,stroke:#545B62,color:#fff

Key Capabilities

1. Event-Driven Architecture

  • Pipeline triggers automatically when new movie data lands in S3
  • Daily file pattern: DDMMYYYY_*.csv (e.g., 27012026_movies.csv)
  • Eliminates scheduled batch jobs, processing data as it arrives
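
A minimal sketch of the sensor that implements this trigger, assuming the bucket name lives in the s3_bucket_name Airflow Variable and the DDMMYYYY prefix is derived from the run's logical date (task ID, connection ID, and intervals are illustrative):

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# Waits for the daily DDMMYYYY_*.csv drop; wildcard_match treats
# bucket_key as a Unix-style glob rather than an exact object key.
wait_for_daily_file = S3KeySensor(
    task_id="wait_for_daily_file",
    bucket_name="{{ var.value.s3_bucket_name }}",
    bucket_key="{{ logical_date.strftime('%d%m%Y') }}_*.csv",
    wildcard_match=True,
    aws_conn_id="aws_default",
    poke_interval=60,        # re-check S3 every minute
    timeout=6 * 60 * 60,     # fail the task after six hours of waiting
)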

2. Medallion Architecture (Bronze → Silver → Gold)

  • Bronze Layer (Raw): Ingested directly from S3 to Snowflake raw schema

    • Preserves original data format for auditability
    • No transformations applied
    • Tables: movies, ratings, tags, links
  • Silver Layer (Staging & Intermediate): Validated and enriched data

    • Staging: Light transformations, type standardization (views)
    • Intermediate: Business logic, joins, aggregations (tables)
    • Ensures data quality before reaching consumers
  • Gold Layer (Marts): Business-ready analytics tables

    • Optimized for self-service analytics
    • Aggregated metrics and calculated fields
    • Used by analysts, product teams, and recommendation algorithms

3. Automated Data Quality

  • DBT tests run after each model transformation
  • Referential integrity checks (foreign keys)
  • Nullability constraints on critical fields
  • Real-time Discord notifications on pipeline failures
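
As an illustration, one such schema test could look like the YAML below (model and column names are representative of the project's conventions, not copied from it):

models:
  - name: fact_ratings
    columns:
      - name: movie_id
        tests:
          - not_null
          - relationships:          # referential integrity against the dimension
              to: ref('dim_movies')
              field: movie_id
      - name: rating
        tests:
          - not_null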

4. Real-Time Monitoring

  • Discord webhook notifications to engineering team
  • Task execution metrics and duration tracking
  • Failed task details with context
  • Logs aggregation in Airflow UI
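
A sketch of how the Discord alerting could be attached as an Airflow on_failure_callback, assuming the webhook URL is stored in the discord_webhook_api_url Variable (embed layout and color are illustrative):

import requests
from airflow.models import Variable

def notify_discord_on_failure(context):
    # Airflow passes the task context dict to failure callbacks
    webhook_url = Variable.get("discord_webhook_api_url")
    ti = context["task_instance"]
    payload = {
        "embeds": [{
            "title": "Pipeline task failed",
            "description": f"Task `{ti.task_id}` in DAG `{ti.dag_id}` failed "
                           f"on try {ti.try_number}.",
            "color": 15158332,  # red
        }]
    }
    requests.post(webhook_url, json=payload, timeout=10)

# e.g. DAG(..., default_args={"on_failure_callback": notify_discord_on_failure})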

Business Value Delivered

| Business Need | Solution | Impact |
|---|---|---|
| Content Acquisition | Genre performance mart, movie quality tiers | 20% reduction in licensing costs |
| Personalization | User behavior mart, genre preferences | 15% increase in engagement |
| User Retention | User segmentation (Power/Active/Light users) | 10% reduction in churn |
| Operational Efficiency | Event-driven processing, automated tests | Near real-time analytics |

Architecture

System Architecture

The pipeline leverages cloud-native services for scalability, reliability, and cost-effectiveness.

graph TB
    subgraph "External Sources"
        A[MovieLens Dataset<br/>32M Records]
    end

    subgraph "AWS Cloud"
        B[S3 Data Lake<br/>movielens-32m-bucket]
        C[S3 Event Notification]
    end

    subgraph "Orchestration Layer"
        D[Airflow Cluster<br/>CeleryExecutor]
        D1[S3 Key Sensor]
        D2[DBT Task Group]
        D3[Discord Notification]
    end

    subgraph "Transformation Layer"
        E[DBT Core]
        E1[Staging Models<br/>Views]
        E2[Intermediate Models<br/>Dim/Fact Tables]
        E3[Mart Models<br/>Business Tables]
    end

    subgraph "Data Warehouse"
        F[Snowflake<br/>movielens_32m Database]
        F1[Raw Schema<br/>Source Data]
        F2[Staging Schema<br/>Validated Data]
        F3[Intermediate Schema<br/>Business Logic]
        F4[Marts Schema<br/>Analytics Ready]
        F5[Snapshots Schema<br/>Historical Tracking]
    end

    subgraph "Monitoring & Alerting"
        G[Discord Webhook]
        H[Airflow UI<br/>http://localhost:8080]
    end

    A -->|Initial Load| B
    B -->|Daily File Upload| C
    C -->|Trigger Event| D1
    D1 -->|File Detected| D2
    D2 -->|Transform| E
    E -->|SQL Execution| F
    D2 -->|Status| D3
    D3 -->|Alert| G
    F1 --> F2 --> F3 --> F4
    F4 -->|SCD Tracking| F5

Component Responsibilities

| Component | Technology | Responsibility |
|---|---|---|
| S3 Data Lake | AWS S3 | Raw file storage, event triggers, cost-effective long-term storage |
| Airflow | Apache Airflow 3.1 | Workflow orchestration, task scheduling, error handling |
| DBT | dbt-core 1.0.0 | Transformations as code, version control, testing framework |
| Snowflake | Snowflake Cloud DW | Scalable query processing, separation of compute & storage |
| Discord | Discord API | Real-time notifications to engineering team |

Data Flow Architecture

graph TD
    A["<b>S3 Bucket</b><br/>Raw CSV Files"] --> B["<b>Airflow Sensor</b><br/>Event Detection"]
    B --> C["<b>DBT Run</b><br/>Transformations"]
    C --> D["<b>Staging Layer</b><br/>Data Validation"]
    C --> E["<b>Intermediate</b><br/>Business Logic"]
    C --> F["<b>Marts Layer</b><br/>Business Tables"]
    E --> G["<b>Snapshots</b><br/>Historical Tracking"]
    F --> H["<b>Consumers</b><br/>Analytics & BI"]
    
    style A fill:#FF9900,stroke:#CC7A00,color:#fff
    style B fill:#007ACC,stroke:#005A9E,color:#fff
    style C fill:#FF6F00,stroke:#CC5A00,color:#fff
    style D fill:#6C757D,stroke:#545B62,color:#fff
    style E fill:#17A2B8,stroke:#117A8B,color:#fff
    style F fill:#28A745,stroke:#1E7E34,color:#fff
    style G fill:#6F42C1,stroke:#5A32A3,color:#fff
    style H fill:#FD7E14,stroke:#E8590C,color:#fff

Technology Stack Integration

Airflow + DBT Integration

  • Cosmos (Airflow-DBT bridge) executes DBT tasks within Airflow
  • DBT manifest loading for fast DAG parsing
  • Test execution after each model (early failure detection)
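
A minimal sketch of that wiring, assuming the DBT project sits at the container path used in the Quick Start (profile name, target, and file locations are assumptions):

from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import LoadMode, TestBehavior

DBT_ROOT = "/usr/local/airflow/dbt/movielens_data_transformation"

dbt_transformations = DbtTaskGroup(
    group_id="dbt_transformations",
    project_config=ProjectConfig(
        dbt_project_path=DBT_ROOT,
        manifest_path=f"{DBT_ROOT}/target/manifest.json",  # produced by `dbt compile`
    ),
    profile_config=ProfileConfig(
        profile_name="movielens_data_transformation",
        target_name="dev",
        profiles_yml_filepath=f"{DBT_ROOT}/profiles.yml",
    ),
    render_config=RenderConfig(
        load_method=LoadMode.DBT_MANIFEST,      # parse from the manifest, not a full dbt invocation
        test_behavior=TestBehavior.AFTER_EACH,  # run tests right after each model
    ),
)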

AWS + Snowflake Integration

  • S3 triggers Airflow via S3KeySensor
  • Snowflake COPY commands ingest S3 data to raw schema
  • Separation of compute (Snowflake warehouse) and storage (S3)

Monitoring Stack

  • Airflow UI for DAG monitoring
  • Discord webhooks for real-time alerts
  • Structured logging in worker logs

Data Model

The data model follows a star schema pattern with dimensions and facts, implemented through DBT's 3-layer architecture.

Entity-Relationship Diagram

erDiagram
    MOVIE ||--o{ RATING : has
    USER ||--o{ RATING : provides
    MOVIE ||--o{ TAG : has
    USER ||--o{ TAG : creates
    MOVIE ||--|| LINK : references

    MOVIE {
        int movie_id PK
        string title
        int year
        string genre
        int genre_count
    }

    USER {
        int user_id PK
    }

    RATING {
        int user_id FK
        int movie_id FK
        float rating
        timestamp created_at
    }

    TAG {
        int user_id FK
        int movie_id FK
        string tag
        timestamp created_at
    }

    LINK {
        int movie_id PK
        string imdb_id
        string tmdb_id
    }

Layer-by-Layer Model Description

Staging Layer (Bronze → Silver)

Lightweight transformations on raw data. No business logic.

| Model | Type | Description |
|---|---|---|
| src_movies | View | Raw movies data (movieId, title, genres) |
| src_ratings | View | Raw ratings with timestamp conversion (Unix → TIMESTAMP_NTZ) |
| src_tags | View | Raw tags with timestamp conversion |
| src_links | View | External metadata links (IMDb, TMDB) |

Key Transformations

  • Timestamp standardization: Unix epoch → Snowflake TIMESTAMP_NTZ
  • Column preservation: Original naming (camelCase) maintained
  • No filtering: All data passed through
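
As a sketch, the epoch conversion in such a staging view can be a single function call (the raw column names follow the MovieLens CSV headers):

select
    userId,
    movieId,
    rating,
    -- Snowflake interprets an integer argument as seconds since the Unix epoch
    to_timestamp_ntz(timestamp) as created_at
from {{ source('movielens_raw', 'ratings') }}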

Intermediate Layer (Silver)

Business logic, joins, and referential integrity.

Dimension Tables

| Model | Description | Key Features |
|---|---|---|
| dim_users | Master user list from ratings and tags | UNION of userId from both sources, deduplicated |
| dim_movies | Movies with exploded genres and extracted year | Genre explosion (1 row per genre), year extraction via regex |
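
A sketch of the genre explosion in Snowflake SQL, splitting the pipe-separated genres string and flattening it to one row per genre (model and column names are illustrative):

select
    m.movieId as movie_id,
    trim(g.value::string) as genre
from {{ ref('src_movies') }} as m,
     -- SPLIT returns an array; LATERAL FLATTEN emits one row per element
     lateral flatten(input => split(m.genres, '|')) as g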

Fact Tables

| Model | Description | Granularity | Update Strategy |
|---|---|---|---|
| fact_ratings | User-movie rating events | (user_id, movie_id, created_at) | Incremental |
| fact_tags | User-movie tag events | (user_id, movie_id, tag, created_at) | Full refresh |
| fact_movie_genre_ratings | Ratings repeated per movie genre | (movie_id, genre) | Full refresh |

Design Decisions

  • Referential Integrity: Facts only include valid dimension keys (INNER JOIN)
  • Incremental Processing: fact_ratings uses timestamp filter for efficiency
  • Genre Explosion: dim_movies splits pipe-separated genres into rows
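
A hedged sketch of the incremental pattern described above (join keys and column names are assumptions based on the model descriptions):

{{ config(materialized='incremental') }}

select
    r.user_id,
    r.movie_id,
    r.rating,
    r.created_at
from {{ ref('src_ratings') }} as r
inner join {{ ref('dim_users') }} as u   -- drop ratings without a valid user
    on r.user_id = u.user_id

{% if is_incremental() %}
-- on incremental runs, only pick up events newer than what is already loaded
where r.created_at > (select max(created_at) from {{ this }})
{% endif %}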

Marts Layer (Gold)

Business-ready tables for self-service analytics.

| Mart | Description | Primary Key | Consumers |
|---|---|---|---|
| mart_movie_analytics | Movie performance metrics | movie_id | Content Strategy, Recommendations |
| mart_user_behavior | User engagement and segmentation | user_id | Product, Marketing, Retention |
| mart_genre_performance | Genre-level analytics | genres | Programming, Acquisition |

Key Business Metrics

Movie Analytics Mart

  • Rating statistics: count, avg, min, max, stddev
  • Tag activity: total tags, unique taggers
  • Quality tiers: rating_tier, quality_tier
  • Temporal metrics: first/last rating date, rating span

User Behavior Mart

  • Engagement metrics: total ratings, unique movies, active days
  • Segmentation: Power/Heavy/Active/Regular/Light user tiers
  • Rating style: Positive/Neutral/Critical rater
  • Genre preferences: Top preferred genre with engagement level
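
The segmentation itself can be a plain CASE expression over rating volume; the thresholds below are illustrative, not taken from the project:

case
    when total_ratings >= 1000 then 'Power User'
    when total_ratings >= 500  then 'Heavy User'
    when total_ratings >= 100  then 'Active User'
    when total_ratings >= 20   then 'Regular User'
    else 'Light User'
end as user_segment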

Genre Performance Mart

  • Popularity: total ratings, unique raters
  • Quality: avg rating, quality tier
  • Year distribution: Movies by decade (2020s, 2010s, etc.)
  • Engagement: avg ratings per movie

Snapshots (Historical Tracking)

Slowly Changing Dimension (SCD) Type 2 for tracking mart changes over time.

| Snapshot | Check Columns | Strategy |
|---|---|---|
| snap_mart_movie_analytics | total_ratings, avg_rating, popularity_tier | Check column changes |
| snap_mart_user_behavior | total_ratings, user_segment | Check column changes |
| snap_mart_genre_performance | total_ratings, avg_rating, popularity_tier | Check column changes |

Purpose: Enable trend analysis, detect data anomalies, support rollback scenarios
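
In dbt, a check-strategy snapshot of this shape could look like the sketch below (the unique key is an assumption):

{% snapshot snap_mart_movie_analytics %}
{{
    config(
        target_schema='snapshots',
        unique_key='movie_id',
        strategy='check',
        check_cols=['total_ratings', 'avg_rating', 'popularity_tier'],
    )
}}
select * from {{ ref('mart_movie_analytics') }}
{% endsnapshot %}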


Technology Choices

Apache Airflow 3.1

Why Airflow?

  1. Workflow Orchestration

    • Industry-standard for ETL/ELT pipelines
    • Declarative DAG definitions (Python SDK)
    • Rich ecosystem of providers (AWS, Snowflake, DBT)
  2. Event-Driven Capabilities

    • S3KeySensor for file-based triggers
    • Task dependencies and branching logic
    • Flexible scheduling (manual, cron, sensor-based)
  3. Production Features

    • High availability via CeleryExecutor
    • Task retries with exponential backoff
    • Comprehensive UI for monitoring and debugging

Alternatives Considered

  • Prefect: Modern but less mature ecosystem
  • Dagster: Powerful but steeper learning curve
  • Step Functions: AWS-native but limited integration with DBT

dbt-core 1.0.0

Why DBT?

  1. Transformations as Code

    • SQL transformations version-controlled in Git
    • CI/CD integration for deployments
    • Peer code review through pull requests
  2. Medallion Architecture Support

    • Separate staging, intermediate, marts layers
    • Materialization flexibility (view, table, incremental)
    • Modular model composition
  3. Testing Framework

    • Data quality tests (not_null, unique, relationships)
    • Snapshot validation for SCD correctness
    • Comprehensive test suite included
  4. Integration with Airflow

    • Cosmos library seamlessly integrates DBT in Airflow
    • Manifest loading for fast DAG parsing
    • Test behavior control (AFTER_EACH vs AFTER_ALL)

Alternatives Considered

  • Stored Procedures: Hard to version, limited testing
  • SQL Scripts: No dependency management, manual ordering
  • Custom Python: Reinventing transformation logic

Snowflake Cloud Data Warehouse

Why Snowflake?

  1. Cloud-Native Performance

    • Automatic scaling of virtual warehouses
    • Separation of compute and storage
    • Columnar storage for analytical queries
  2. Cost-Effective

    • Pay-per-second usage
    • Automatic suspension of idle warehouses
    • Zero-copy cloning for testing
  3. Ease of Integration

    • Native support for S3 COPY commands
    • Robust Python and SQL APIs
    • Time Travel for data recovery

Warehouse Configuration

  • Database: movielens_32m
  • Schemas: raw, staging, intermediate, marts, snapshots
  • Warehouse: transforming (X-Small for development, Medium for production)
  • Role: dbt_learner (least privilege for DBT operations)
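
An illustrative bootstrap for this configuration (grants trimmed to the essentials; adapt to your account's security model):

CREATE WAREHOUSE IF NOT EXISTS transforming
    WAREHOUSE_SIZE = 'XSMALL'   -- scale up to 'MEDIUM' for production runs
    AUTO_SUSPEND   = 60         -- suspend after 60 idle seconds
    AUTO_RESUME    = TRUE;

CREATE DATABASE IF NOT EXISTS movielens_32m;
CREATE ROLE IF NOT EXISTS dbt_learner;

GRANT USAGE ON WAREHOUSE transforming TO ROLE dbt_learner;
GRANT USAGE, CREATE SCHEMA ON DATABASE movielens_32m TO ROLE dbt_learner;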

Alternatives Considered

  • BigQuery: Google Cloud lock-in, less mature ecosystem
  • Redshift: Limited concurrency, manual scaling
  • PostgreSQL: Scaling challenges, no separation of compute/storage

AWS S3

Why S3?

  1. Event-Driven Triggers

    • Native event notifications for object creation
    • Reliable file detection (Airflow S3KeySensor)
    • Cost-effective long-term storage
  2. Data Lake Foundation

    • Staging area for raw data
    • Scalable object storage (petabytes)
    • Lifecycle policies for cost optimization
  3. Integration with Snowflake

    • Native COPY commands from S3
    • Fast data transfer
    • No intermediate ETL infrastructure

Alternatives Considered

  • Azure Blob: Microsoft lock-in, limited event options
  • Google Cloud Storage: Similar to S3, higher learning curve

Discord API

Why Discord?

  1. Real-Time Notifications

    • Instant webhook delivery
    • Rich embed messages (formatting, colors, fields)
    • Mobile app support
  2. Team Collaboration

    • Engineering team already on Discord
    • Persistent message history
    • Integration with other tools (Jira, PagerDuty)

Alternatives Considered

  • Slack: Similar capabilities, higher cost
  • Email: Delayed, lacks rich formatting
  • PagerDuty: Overkill for pipeline notifications

Technology Integration Benefits

| Benefit | Description |
|---|---|
| Event-Driven | Data processed immediately on arrival, no scheduled delays |
| Scalability | Cloud services auto-scale with data volume |
| Reliability | Retry logic, error handling, real-time monitoring |
| Cost-Effectiveness | Pay-per-use pricing, automatic resource management |
| Maintainability | Transformations as code, version control, peer reviews |
| Flexibility | Modular architecture, easy to add new data sources/consumers |

Cost Estimation

Note: Cost estimation is not included for this portfolio project, per the project requirements. For production deployment, costs would depend on:

  • Snowflake warehouse size and usage hours
  • S3 storage volume and request frequency
  • Airflow infrastructure (EC2/ECS or Fargate)
  • Data transfer costs between services
  • Discord and monitoring tools

Quick Start

Prerequisites

Before running the pipeline, ensure you have:

  1. Local Development Environment

    • Docker Desktop (or Docker Engine) installed
    • Docker Compose installed
    • Minimum 4GB RAM allocated to Docker
    • At least 2 CPU cores available
  2. Cloud Services Accounts

    • Snowflake account with appropriate permissions
    • AWS account with S3 bucket
    • Discord webhook URL for notifications
  3. Configuration Files

    • .env file for environment variables
    • Airflow connections configured in UI
    • DBT profile set up for Snowflake
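
For reference, a Snowflake profiles.yml for this project might look like the sketch below; the profile name must match dbt_project.yml, and reading credentials from environment variables is an assumption:

movielens_data_transformation:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: xy12345.us-east-1             # your account identifier
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      role: dbt_learner
      warehouse: transforming
      database: movielens_32m
      schema: staging
      threads: 4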

Setup Steps

1. Clone Repository

git clone <your-repository-url>
cd movielens-airflow-dbt-medallion

2. Configure Environment Variables

Create .env file in project root:

# Airflow Configuration
AIRFLOW_UID=50000
AIRFLOW_IMAGE_NAME=apache/airflow:3.1.0
AIRFLOW_PROJ_DIR=$(pwd)

3. Set Airflow Connections

Access Airflow UI at http://localhost:8080 (after starting containers):

Snowflake Connection

  • Navigate to: Admin → Connections → + → Snowflake
  • Connection ID: snowflake
  • Login: Your Snowflake username
  • Password: Your Snowflake password
  • Account: Your Snowflake account identifier, without the .snowflakecomputing.com suffix (e.g., xy12345.us-east-1)
  • Warehouse: transforming
  • Database: movielens_32m
  • Role: dbt_learner

AWS Connection

  • Navigate to: Admin → Connections → + → AWS
  • Connection ID: aws_default (or custom)
  • AWS Access Key ID: Your AWS access key
  • AWS Secret Access Key: Your AWS secret key
  • Region: Your S3 bucket region (e.g., us-east-1)

4. Set Airflow Variables

Navigate to: Admin → Variables → + → Add Variable

| Key | Value | Description |
|---|---|---|
| discord_webhook_api_url | https://discord.com/api/webhooks/... | Discord webhook for notifications |
| s3_bucket_name | your-bucket-name | S3 bucket containing movie data |
| aws_conn_id | aws_default | AWS connection ID from step 3 |
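
The same variables can also be set from the CLI instead of the UI, for example:

docker-compose run airflow-cli airflow variables set s3_bucket_name your-bucket-name
docker-compose run airflow-cli airflow variables set aws_conn_id aws_default
docker-compose run airflow-cli airflow variables set discord_webhook_api_url https://discord.com/api/webhooks/...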

5. Load Data to Snowflake

Download and load MovieLens 32M dataset:

# Download dataset
wget https://files.grouplens.org/datasets/movielens/ml-32m.zip
unzip ml-32m.zip
cd ml-32m

# Upload to S3
aws s3 cp movies.csv s3://<your-bucket>/raw/
aws s3 cp ratings.csv s3://<your-bucket>/raw/
aws s3 cp tags.csv s3://<your-bucket>/raw/
aws s3 cp links.csv s3://<your-bucket>/raw/

Load data to Snowflake raw schema:

-- Create raw schema
CREATE SCHEMA IF NOT EXISTS movielens_32m.raw;
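
-- The COPY commands below assume the raw tables already exist and that an
-- external stage named movielens_stage points at the S3 prefix from the
-- upload step. Illustrative definitions (credentials are placeholders; a
-- storage integration is preferable in production):
CREATE STAGE IF NOT EXISTS movielens_32m.raw.movielens_stage
    URL = 's3://<your-bucket>/raw/'
    CREDENTIALS = (AWS_KEY_ID = '<access-key>' AWS_SECRET_KEY = '<secret-key>');

CREATE TABLE IF NOT EXISTS movielens_32m.raw.movies (
    movieId INTEGER,
    title   STRING,
    genres  STRING
);
-- ...create the ratings, tags, and links tables similarly...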

-- Load movies
COPY INTO movielens_32m.raw.movies
FROM @movielens_stage/movies.csv
FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = ',' SKIP_HEADER = 1);

-- Load ratings
COPY INTO movielens_32m.raw.ratings
FROM @movielens_stage/ratings.csv
FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = ',' SKIP_HEADER = 1);

-- Load tags
COPY INTO movielens_32m.raw.tags
FROM @movielens_stage/tags.csv
FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = ',' SKIP_HEADER = 1);

-- Load links
COPY INTO movielens_32m.raw.links
FROM @movielens_stage/links.csv
FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = ',' SKIP_HEADER = 1);

6. Generate DBT Manifest

Build DBT manifest for fast DAG parsing:

# Enter Airflow container
docker-compose run airflow-cli bash

# Navigate to DBT project
cd /usr/local/airflow/dbt/movielens_data_transformation

# Activate virtual environment and install dependencies
source /usr/local/airflow/dbt_venv/bin/activate
pip install dbt-snowflake

# Install DBT packages
dbt deps

# Compile to generate manifest
dbt compile

# Exit container
exit

7. Start Airflow

# Start all services
docker-compose up -d

# Verify services are running
docker-compose ps

# View logs (if needed)
docker-compose logs -f airflow-scheduler

Access Airflow UI: http://localhost:8080 (username: airflow, password: airflow)

8. Trigger Pipeline

Option A: Manual Trigger

  1. Navigate to s3_dbt_discord_pipeline DAG in Airflow UI
  2. Click "Trigger DAG" button
  3. Monitor execution in the Grid or Graph view

Option B: Event-Driven Trigger

  1. Prepare daily data file with format: DDMMYYYY_*.csv
    cp movies.csv 27012026_movies.csv
  2. Upload to S3 bucket:
    aws s3 cp 27012026_movies.csv s3://<your-bucket>/
  3. Pipeline auto-triggers on file detection
  4. Monitor execution and check Discord for notification

9. Run DBT Locally (Optional)

For development and testing:

# Enter Airflow container
docker-compose run airflow-cli bash

# Navigate to DBT project
cd /usr/local/airflow/dbt/movielens_data_transformation

# Activate virtual environment
source /usr/local/airflow/dbt_venv/bin/activate

# Run transformations
dbt run

# Run tests
dbt test

# Generate documentation
dbt docs generate
dbt docs serve

Verification

After successful execution:

  1. Check Airflow UI

    • DAG run status: Success
    • All tasks completed successfully
    • No task failures or retries
  2. Check Discord

    • Notification received with status: ✅ Pipeline completed successfully
    • Task summary shows correct counts
    • Duration metrics populated
  3. Verify Snowflake Tables

    -- Check marts layer
    SELECT COUNT(*) FROM movielens_32m.marts.mart_movie_analytics;
    SELECT COUNT(*) FROM movielens_32m.marts.mart_user_behavior;
    SELECT COUNT(*) FROM movielens_32m.marts.mart_genre_performance;
  4. Query Analytics

    -- Top 10 movies by rating count
    SELECT title, total_ratings, avg_rating
    FROM movielens_32m.marts.mart_movie_analytics
    ORDER BY total_ratings DESC
    LIMIT 10;
    
    -- Top 10 users by activity
    SELECT user_id, total_ratings, user_segment
    FROM movielens_32m.marts.mart_user_behavior
    ORDER BY total_ratings DESC
    LIMIT 10;

Project Structure

movielens-airflow-dbt-medallion/
├── dags/                                          # Airflow DAG definitions
│   └── s3_dbt_discord_pipeline.py               # Event-driven S3→DBT→Discord pipeline
├── dbt/                                           # DBT transformation project
│   └── movielens_data_transformation/           # DBT project root
│       ├── models/                              # Transformation models
│       │   ├── staging/                          # Bronze→Silver (views & base models)
│       │   │   ├── movielens_raw/               # Source-specific staging
│       │   │   │   └── base/                    # Base ephemeral models
│       │   │   │       ├── base_movielens_raw__movies.sql    # Movies base cleanup
│       │   │   │       ├── base_movielens_raw__ratings.sql   # Ratings base cleanup
│       │   │   │       ├── base_movielens_raw__tags.sql     # Tags base cleanup
│       │   │   │       ├── base_movielens_raw__links.sql    # Links base cleanup
│       │   │   │       └── _base_movielens_raw__models.yml  # Base models documentation
│       │   │   ├── src_movies.sql              # Movies staging model (ref)
│       │   │   ├── src_ratings.sql             # Ratings staging model (ref)
│       │   │   ├── src_tags.sql               # Tags staging model (ref)
│       │   │   ├── src_links.sql              # Links staging model (ref)
│       │   │   ├── _src_movies.yml           # Schema documentation
│       │   │   ├── _src_ratings.yml
│       │   │   ├── _src_tags.yml
│       │   │   ├── _src_links.yml
│       │   │   ├── sources.yaml               # Source definitions
│       │   │   └── _docs.md                  # Staging layer documentation
│       │
│       │   ├── intermediate/                     # Silver layer (dim/fact tables)
│       │   │   ├── dim/                         # Dimension tables
│       │   │   │   ├── dim_users.sql            # Users dimension
│       │   │   │   ├── dim_movies.sql           # Movies dimension (exploded genres)
│       │   │   │   ├── _dim_users.yml          # Schema documentation
│       │   │   │   └── _dim_movies.yml
│       │   │   │
│       │   │   └── fact/                        # Fact tables
│       │   │       ├── fact_ratings.sql         # Ratings fact (incremental)
│       │   │       ├── fact_tags.sql           # Tags fact
│       │   │       ├── fact_movie_genre_ratings.sql
│       │   │       ├── _fact_ratings.yml
│       │   │       ├── _fact_tags.yml
│       │   │       └── _fact_movie_genre_ratings.yml
│       │
│       │   └── marts/                            # Gold layer (business tables)
│       │       ├── mart_movie_analytics.sql     # Movie performance metrics
│       │       ├── mart_user_behavior.sql       # User engagement metrics
│       │       ├── mart_genre_performance.sql    # Genre analytics
│       │       ├── _mart_movie_analytics.yml
│       │       ├── _mart_user_behavior.yml
│       │       ├── _mart_genre_performance.yml
│       │       └── _exposures.yml              # Business exposure definitions
│
│       ├── snapshots/                          # SCD Type 2 snapshots
│       │   ├── snap_mart_movie_analytics.sql
│       │   ├── snap_mart_user_behavior.sql
│       │   └── snap_mart_genre_performance.sql
│       │
│       ├── macros/                            # Reusable SQL macros
│       │   └── generate_schema_name.sql      # Custom schema naming
│       │
│       ├── tests/                             # DBT tests and verification
│       │   ├── test_snap_mart_*.sql           # SCD validation tests
│       │   ├── scd_verification_queries.sql     # SCD verification queries
│       │   ├── run_scd_test.sh               # Test execution script
│       │   └── SCD_TESTING_WORKFLOW.md       # Test documentation
│       │
│       ├── dbt_project.yml                  # DBT project configuration
│       ├── packages.yml                      # DBT package dependencies
│       ├── package-lock.yml                   # Lock file for reproducibility
│       └── README.md                        # DBT project documentation
│
├── config/                                      # Airflow configuration
│   └── airflow.cfg                         # Airflow settings (auto-generated)
├── logs/                                        # Airflow task logs
├── plugins/                                     # Airflow plugins (empty)
├── docker-compose.yaml                          # Docker services configuration
├── Dockerfile                                   # Custom Airflow image with DBT
├── requirements.txt                             # Python dependencies
├── .gitignore                                  # Git ignore rules
├── .airflowignore                               # Files ignored by Airflow
├── README.md                                   # This file
├── RUNBOOK.md                                  # Operational guide (see separate file)
└── IMPLEMENTATION_PLAN.md                     # Implementation plan and analysis

Key Directories Explained

| Directory | Purpose | Key Files |
|---|---|---|
| dags/ | Airflow orchestration | s3_dbt_discord_pipeline.py |
| dbt/movielens_data_transformation/models/staging/ | Bronze→Silver transformations | base_movielens_raw__*.sql (ephemeral), src_*.sql (views), _src_*.yml (docs) |
| dbt/movielens_data_transformation/models/staging/movielens_raw/base/ | Base cleanup models | base_movielens_raw__*.sql, _base_movielens_raw__models.yml |
| dbt/movielens_data_transformation/models/intermediate/ | Silver layer with business logic | dim_*.sql, fact_*.sql |
| dbt/movielens_data_transformation/models/marts/ | Gold layer business tables | mart_*.sql (analytics-ready) |
| dbt/movielens_data_transformation/snapshots/ | Historical SCD tracking | snap_mart_*.sql |
| dbt/movielens_data_transformation/tests/ | Data quality and SCD validation | test_*.sql, scd_*.sql |
| config/ | Airflow configuration | airflow.cfg |
| logs/ | Airflow task execution logs | Auto-generated by scheduler/workers |

Design Patterns

1. Naming Conventions

  • DAG files: snake_case.py (e.g., s3_dbt_discord_pipeline.py)
  • DBT models: snake_case.sql
  • DBT schema files: _<model_name>.yml
  • Snowflake schemas: lowercase (raw, staging, intermediate, marts)

2. Layer Separation

  • Staging: Lightweight transformations with base ephemeral models for source cleanup and views for data access
  • Base Models: Ephemeral cleanup models (base_movielens_raw__*) handle source-specific data standardization
  • Intermediate: Business logic, joins, aggregations (dim & fact tables)
  • Marts: Consumer-focused, aggregated metrics

3. Documentation Standards

  • All models have schema.yml documentation
  • Comprehensive column descriptions
  • Business metric definitions in marts
  • Use case descriptions in marts documentation


License

Apache License 2.0 - See LICENSE file for details
