This guide provides an end-to-end setup for:
- initializing ingestion and Delta Live Tables (DLT) pipelines,
- evaluating and selecting the best model,
- deploying the champion model to streaming inference,
- implementing automated scheduled retraining (with drift-triggered retraining on top).
It is written for this repository's Databricks Asset Bundles (DAB) layout.
Target workflow:
- Ingest market data into Bronze.
- Transform Bronze -> Silver -> Gold features in DLT.
- Train multiple candidate models (tournament).
- Select and promote champion in MLflow Model Registry.
- Score streaming data with the champion model.
- Retrain weekly (plus drift-triggered retraining if needed).
Core bundle resources:
- resources/ingestion_workflow.yml -> financial_api_ingestion_workflow
- resources/streaming_pipeline.yml -> financial_streaming_pipeline
- resources/retraining_workflow.yml -> financial_retraining_workflow
- resources/drift_monitoring.yml -> drift_monitoring_job
```powershell
.venv\Scripts\Activate.ps1
uv pip install -e ".[dev,test,streaming]"
databricks auth login
databricks bundle validate -t dev
```

Branch protection is active. Never push directly to `main`. Always work on a `feature/*`, `fix/*`, `chore/*`, or `hotfix/*` branch and open a PR. See `protect_main.yml` and `CICD_EXECUTION_GRAPH.md` for the full flow.
Run once (as principal with CREATE CATALOG permission):
`setup_uc_infrastructure.sql`
This script provisions:
- catalogs: `mlops_dev` / `mlops_acc` / `mlops_prd` (one per environment)
- `financial_transactions` schema in each catalog
- volumes: `packages`, `streaming_landing`
Grant on each environment (dev, acc, prd):
- `USE CATALOG`
- `USE SCHEMA`
- `CREATE TABLE`
- `SELECT`, `MODIFY` (recommended)
- `READ VOLUME` on `streaming_landing`
- `WRITE VOLUME` on `streaming_landing` (for data writers)
The DLT Bronze source path is configured as:
dbfs:/Volumes/<catalog>/<schema>/streaming_landing/trades
Create the expected landing subfolder and upload sample files (dev):
```sql
-- Run in Databricks SQL
LIST 'dbfs:/Volumes/mlops_dev/financial_transactions/streaming_landing/';
```

If `trades/` is missing, create it by uploading at least one JSON file to:

`dbfs:/Volumes/mlops_dev/financial_transactions/streaming_landing/trades/`
Expected JSON schema fields:
- `trade_id` (string)
- `symbol` (string)
- `price` (double)
- `volume` (double)
- `timestamp` (timestamp-compatible)
- `exchange` (string)
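To seed the landing folder, here is a minimal sketch that writes newline-delimited sample records matching the fields above; the output filename and the symbol/price values are illustrative, not taken from the repo:

```python
import json
import uuid
from datetime import datetime, timezone

def make_sample_trades(n=5):
    """Build sample trade records matching the expected Bronze JSON schema."""
    symbols = ["AAPL", "MSFT", "GOOG"]
    return [
        {
            "trade_id": str(uuid.uuid4()),
            "symbol": symbols[i % len(symbols)],
            "price": round(100.0 + i * 0.5, 2),
            "volume": float(100 + i),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "exchange": "NASDAQ",
        }
        for i in range(n)
    ]

def write_jsonl(records, path):
    """Write one JSON object per line, a layout JSON file ingestion typically accepts."""
    with open(path, "w") as f:
        for rec in records:
            f.write(json.dumps(rec) + "\n")

write_jsonl(make_sample_trades(), "sample_trades.json")
```

Upload the resulting file into the `trades/` subfolder of the landing volume.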
```shell
databricks bundle deploy -t dev
databricks bundle run financial_streaming_pipeline -t dev
```

Validate medallion outputs:
```sql
SHOW TABLES IN mlops_dev.financial_transactions;
SELECT COUNT(*) AS bronze_count FROM mlops_dev.financial_transactions.bronze_trades;
SELECT COUNT(*) AS silver_count FROM mlops_dev.financial_transactions.silver_trades;
SELECT COUNT(*) AS gold_count FROM mlops_dev.financial_transactions.gold_trade_features;
```

If you changed the Gold dataset semantics (streaming table -> materialized view style), you may need:

```sql
DROP TABLE IF EXISTS mlops_dev.financial_transactions.gold_trade_features;
```

Then rerun the pipeline.
Run the tournament + deployment workflow:

```shell
databricks bundle run financial_retraining_workflow -t dev
```

This executes:
- `train_tournament.py`
- `deploy_anomaly_model.py`
Use these criteria in order:
- Primary metric: `pr_auc` (best for anomaly class imbalance).
- Secondary metrics: `f1_score`, `precision`, `recall`.
- Promotion threshold: the challenger must beat the champion by a configured minimum delta.
- Stability checks: compare the train/validation gap and recent batch behavior.
Promote only if all are true:
- the challenger improves `pr_auc` beyond the threshold,
- no critical regression in secondary metrics,
- artifacts/params/metrics are fully logged in MLflow,
- the model passes a smoke inference test on recent Gold data.
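The metric part of this gate can be sketched as a pure function; the minimum-delta and regression-tolerance values below are illustrative assumptions, not values taken from the repo config:

```python
def should_promote(champion, challenger, min_delta=0.01, max_regression=0.02):
    """Promotion gate: primary-metric improvement plus no critical
    regression on secondary metrics.

    champion/challenger: dicts of metric name -> value.
    """
    # Primary criterion: pr_auc must improve by at least min_delta.
    if challenger["pr_auc"] - champion["pr_auc"] < min_delta:
        return False
    # Secondary criterion: no metric may drop by more than max_regression.
    for metric in ("f1_score", "precision", "recall"):
        if champion[metric] - challenger[metric] > max_regression:
            return False
    return True

champ = {"pr_auc": 0.80, "f1_score": 0.74, "precision": 0.70, "recall": 0.78}
chall = {"pr_auc": 0.83, "f1_score": 0.75, "precision": 0.71, "recall": 0.77}
print(should_promote(champ, chall))  # -> True
```

The MLflow-logging and smoke-inference checks would still run separately before the alias is moved.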
Model deployment is handled by `deploy_anomaly_model.py` in the retraining workflow.
Operationally:
- Champion alias is updated in Model Registry.
- Streaming inference path reads champion version.
- New data in Gold is scored with active champion.
Post-deploy validations:
- champion alias points to expected model version,
- inference endpoint/job can load model artifacts,
- anomaly outputs are produced for fresh records,
- rollback path is tested (`rollback_model.py`).
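A rollback can be as simple as repointing the champion alias to the previously promoted version. Here is a minimal sketch against a hypothetical in-memory registry; the real `rollback_model.py` would presumably do the equivalent through the MLflow client:

```python
def rollback_champion(aliases, history):
    """Repoint the 'champion' alias to the previous promoted version.

    aliases: dict mapping alias name -> model version
    history: list of promoted versions, oldest first
    """
    current = aliases["champion"]
    idx = history.index(current)
    if idx == 0:
        raise RuntimeError("No earlier version to roll back to")
    aliases["champion"] = history[idx - 1]
    return aliases["champion"]

aliases = {"champion": 7}
print(rollback_champion(aliases, [3, 5, 7]))  # -> 5
```

Keeping the promotion history explicit makes the rollback deterministic and easy to test.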
The current retraining schedule in `resources/retraining_workflow.yml` is nightly: `0 0 0 * * ?` (midnight UTC, every day).
To run weekly (example: Monday 02:00 UTC), change to:
```yaml
schedule:
  quartz_cron_expression: "0 0 2 ? * MON *"
  timezone_id: UTC
  pause_status: ${var.schedule_pause_status}
```

Then deploy:
```shell
databricks bundle deploy -t dev
```

For production:
- set `schedule_pause_status` to `UNPAUSED` in `prd`,
- keep `dev` and `acc` paused unless testing schedule behavior.
Use hybrid automation:
- Baseline cadence: weekly retraining job.
- Drift monitor (`drift_monitoring_job`) runs every 30 minutes.
- Escalation policy:
  - if drift exceeds the threshold, trigger retraining immediately,
  - else wait for the next weekly cycle.
This balances model freshness and compute cost.
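The escalation policy can be sketched as a small decision function; the drift threshold and the way the drift score is produced are illustrative assumptions:

```python
from datetime import datetime, timedelta

def retraining_action(drift_score, last_retrain, now, threshold=0.2, cadence_days=7):
    """Hybrid policy: a drift breach triggers immediate retraining,
    otherwise retrain only when the weekly baseline cadence is due."""
    if drift_score > threshold:
        return "retrain_now"  # drift-triggered escalation
    if now - last_retrain >= timedelta(days=cadence_days):
        return "retrain_now"  # weekly baseline cadence reached
    return "wait"

# Example: low drift, retrained 2 days ago -> wait for the weekly cycle.
now = datetime(2024, 1, 10)
print(retraining_action(0.05, now - timedelta(days=2), now))  # -> wait
```

The drift monitor would call this every 30 minutes and trigger `financial_retraining_workflow` when it returns `"retrain_now"`.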
Recommended promotion path:
- dev: iterate quickly, fix pipeline/model issues.
- acc: run full regression and acceptance checks.
- prd: promote only the validated bundle revision and model.
Commands:
```shell
databricks bundle deploy -t acc
databricks bundle run financial_streaming_pipeline -t acc
databricks bundle run financial_retraining_workflow -t acc
databricks bundle deploy -t prd
databricks bundle run financial_streaming_pipeline -t prd
```

- Do not manually create DLT target tables; let DLT own them.
- Run preflight table-name conflict checks from `setup_uc_infrastructure.sql`.
- Keep schema and source path stable across environments.
- Log all model metadata and git SHA for traceability.
- Test rollback after every champion promotion.
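For the traceability point above, here is a minimal sketch of collecting the git SHA to attach as model metadata (e.g. as MLflow tags); the tag names are assumptions, and the helper falls back to "unknown" when not run inside a git checkout:

```python
import subprocess

def build_model_tags(model_name):
    """Collect run metadata to log alongside the trained model."""
    try:
        sha = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            capture_output=True, text=True, check=True,
        ).stdout.strip()
    except (OSError, subprocess.CalledProcessError):
        sha = "unknown"  # not running inside a git checkout
    return {"model_name": model_name, "git_sha": sha}

print(build_model_tags("anomaly_detector"))
```

Logging the SHA with every tournament run lets you map any registered model version back to the exact bundle revision that produced it.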
Final checklist:
- UC infra exists in dev/acc/prd.
- Landing volume path exists and receives data.
- DLT run succeeds end-to-end (Bronze/Silver/Gold).
- Tournament run completes and champion is promoted.
- Streaming inference validates on fresh data.
- Weekly retraining schedule is configured and unpaused in target env.
- Drift monitor is active and alerting thresholds are validated.