A production-style market data ingestion and validation framework for historical OHLCV bars (Daily + 1-Minute) using Alpaca market data. The pipeline writes curated, partitioned Parquet datasets and provides a structured QA layer with artifact-based observability and optional strict enforcement suitable for CI gating and trading research workflows.
```
Alpaca API
    ↓
Alpaca Client (retry + pagination)
    ↓
Normalization Layer (UTC + dedupe)
    ↓
Partitioned Parquet (symbol/year or symbol/date)
    ↓
DuckDB Analytics + QA Observability
```
- Auth via `.env`
- Supports: `symbol`, `start`, `end`, `timeframe` (`1Day`, `1Min`), `feed` (default: `iex`)
- Automatic pagination
- Exponential backoff for:
  - HTTP 429 (rate limiting)
  - HTTP 5xx (server errors)
  - Network timeouts
- Returns results as a pandas DataFrame
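The retry behavior can be sketched as a small backoff helper. This is a minimal illustration, not the repository's actual client code; the `fetch_page` callable and the assumption that errors surface as `TimeoutError` / `RuntimeError` with a `status` attribute are hypothetical:

```python
import random
import time

RETRYABLE_STATUSES = {429, 500, 502, 503, 504}

def with_backoff(fetch_page, max_retries=5, base_delay=1.0):
    """Call fetch_page(), retrying rate limits, server errors, and
    timeouts with exponential backoff plus jitter."""
    for attempt in range(max_retries):
        try:
            return fetch_page()
        except TimeoutError:
            pass  # network timeout: always retryable
        except RuntimeError as exc:
            # Assume the client surfaces an HTTP status on the exception.
            if getattr(exc, "status", None) not in RETRYABLE_STATUSES:
                raise
        # Exponential backoff: base * 2^attempt, plus small jitter.
        time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.1))
    return fetch_page()  # final attempt propagates any error
```

A non-retryable error (e.g. HTTP 404) is re-raised immediately, so bad requests fail fast instead of burning the retry budget.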
- Iterates over a 50-ticker universe
- Fetches daily bars in monthly windows
- Normalizes into a canonical schema (UTC enforced)
- Deterministic dedupe on primary key: `(symbol, ts_utc, timeframe)`
- Writes partitioned Parquet: `data/curated/bars_daily/symbol=XYZ/year=YYYY/`
- Safe to re-run (idempotent)
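The normalization and dedupe steps can be sketched in pandas. Column names follow the canonical schema described in this README, but the function itself is illustrative, not the repo's implementation:

```python
import pandas as pd

PRIMARY_KEY = ["symbol", "ts_utc", "timeframe"]

def normalize_bars(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce timestamps to UTC, sort, and drop duplicate primary keys."""
    out = df.copy()
    out["ts_utc"] = pd.to_datetime(out["ts_utc"], utc=True)
    out = out.sort_values(PRIMARY_KEY)
    # Deterministic dedupe: keep the first row per (symbol, ts_utc, timeframe).
    out = out.drop_duplicates(subset=PRIMARY_KEY, keep="first")
    return out.reset_index(drop=True)
```

Because the sort and `keep="first"` policy are deterministic, re-running the backfill over overlapping windows produces the same curated rows.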
- Day-windowed ingestion for intraday OHLCV bars
- Failure isolation per `(symbol, date)`
- Restartable + rate-limit safe
- Writes partitioned Parquet: `data/curated/bars_1m/symbol=XYZ/date=YYYY-MM-DD/`
- Implemented via `pyarrow.parquet.write_to_dataset`
- Uses `existing_data_behavior="delete_matching"` to safely overwrite partitions
- DuckDB-compatible queries over glob paths
- Duplicate primary key detection
- OHLC integrity checks
- Timestamp sanity / ordering checks
- Gap detection (daily + intraday)
- Coverage metrics (calendar-aware)
- Deterministic artifacts per QA run under `artifacts/qa/<run_id>/`:
  - `qa_summary_by_symbol.csv`
  - `qa_summary_global.csv`
  - `qa_coverage_by_symbol.csv`
- Optional strict enforcement mode (CI-friendly)
The repository includes a framework-level QA export command that computes integrity metrics and writes deterministic artifacts per run.
Artifacts are written to:

```
artifacts/qa/<run_id>/
  qa_summary_by_symbol.csv
  qa_summary_global.csv
  qa_coverage_by_symbol.csv
```
Exports QA metrics but does not fail the pipeline.
```
python -m src.ingestion.qa_export \
  --timeframe all \
  --start 2025-11-01 \
  --end 2025-12-01
```

You may also run for a single timeframe:

```
python -m src.ingestion.qa_export \
  --timeframe 1D \
  --start 2025-11-01 \
  --end 2025-12-01
```

```
python -m src.ingestion.qa_export \
  --timeframe 1Min \
  --start 2025-11-01 \
  --end 2025-12-01
```

Strict mode fails the pipeline if integrity thresholds are exceeded:
```
python -m src.ingestion.qa_export \
  --timeframe all \
  --start 2025-11-01 \
  --end 2025-12-01 \
  --strict \
  --max-duplicate-keys 0 \
  --max-ohlc-violations 0
```

| Condition | Exit Code |
|---|---|
| PASS (within thresholds) | 0 |
| WARN (coverage/gap thresholds exceeded) | 0 |
| FAIL (strict integrity violation) | 1 |
Strict mode ensures:
- Duplicate primary keys fail the pipeline
- OHLC violations fail the pipeline
- QA artifacts are still written for debugging
- CI jobs automatically fail when data integrity is compromised
```yaml
- name: Strict QA
  run: |
    python -m src.ingestion.qa_export \
      --timeframe all \
      --start 2025-11-01 \
      --end 2025-12-01 \
      --strict \
      --max-duplicate-keys 0 \
      --max-ohlc-violations 0
```

If integrity rules are violated, the CI job exits with code 1.
In trading and backtesting systems:
- Duplicate bars distort indicators (EMA, RSI, VWAP, etc.)
- OHLC violations corrupt return calculations
- Silent ingestion errors invalidate research
- Coverage gaps can bias signal evaluation
The qa_export CLI formalizes dataset observability and enables production-grade guardrails before any modeling or execution layer consumes the data.
The project operates on a curated universe (currently 50 tickers) optimized for stable historical availability.
Document:
- Selection logic (liquidity + large-cap bias)
- Exclusion rules
- Intended expansion strategy
Tip: include this as `docs/universe.md` so it's easy to audit and evolve.
Reusable Alpaca historical market data client:
src/ingestion/alpaca_client.py
```python
from src.ingestion.alpaca_client import AlpacaMarketDataClient

client = AlpacaMarketDataClient.from_env()
df = client.fetch_bars(
    symbol="AAPL",
    start="2025-01-01T00:00:00Z",
    end="2025-02-01T00:00:00Z",
    timeframe="1Day",
)
print(df.head())
```

Create a virtual environment and install dependencies:

```
python -m venv venv
.\venv\Scripts\activate
pip install -r requirements.txt
```

If you install new packages and want to pin exact versions:

```
pip freeze > requirements.txt
```

Create a `.env` file in the project root:

```
ALPACA_API_KEY=your_key_here
ALPACA_SECRET_KEY=your_secret_here
ALPACA_DATA_FEED=iex
```

Project layout:

```
data/
  curated/
    bars_daily/
    bars_1m/
artifacts/
  qa/
reports/
src/
  ingestion/
  qa/
  utils/
notebooks/
```
```
python -m src.ingestion.backfill_daily --start 2023-01-01 --end 2026-01-01
```

Output layout:

```
data/curated/bars_daily/symbol=XYZ/year=YYYY/part-*.parquet
```
Windowed ingestion of 1-minute OHLCV bars (IEX-compatible) into curated Parquet partitions.
Designed for:
- Idempotent re-runs
- Restartable backfills
- Rate-limit safe API usage
- Failure isolation
- DuckDB/Pandas/backtesting compatibility
| Column | Type | Description |
|---|---|---|
| symbol | string | Ticker symbol |
| ts_utc | timestamp (UTC) | Bar open time in UTC |
| open | float | Open price |
| high | float | High price |
| low | float | Low price |
| close | float | Close price |
| volume | float/int | Trade volume |
| source | string | e.g., alpaca_iex |
| timeframe | string | "1Min" |
| date | string | YYYY-MM-DD derived from ts_utc |
Primary key (idempotency):
(symbol, ts_utc, timeframe)
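A lightweight contract check over the schema and key above might look like this (illustrative; the repo's actual validator may differ):

```python
import pandas as pd

REQUIRED_COLUMNS = [
    "symbol", "ts_utc", "open", "high", "low", "close",
    "volume", "source", "timeframe", "date",
]

def check_schema(df: pd.DataFrame) -> list[str]:
    """Return a list of schema problems (empty list means the frame conforms)."""
    problems = [f"missing column: {c}" for c in REQUIRED_COLUMNS if c not in df.columns]
    # The canonical schema requires ts_utc to be timezone-aware UTC.
    if "ts_utc" in df.columns and getattr(df["ts_utc"].dtype, "tz", None) is None:
        problems.append("ts_utc is not timezone-aware UTC")
    return problems
```

Returning a list of problems (rather than raising on the first one) lets a QA run report every schema defect in a single pass.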
```
data/curated/bars_1m/
  symbol=XYZ/
    date=YYYY-MM-DD/
      part-*.parquet
```
```
python -m src.ingestion.backfill_1m \
  --start 2024-01-01 \
  --end 2024-04-01
```

Optional flags:

```
--symbols configs/tickers_50.txt
--max-symbols 5
--sleep-ms 100
--no-progress
```

Failed (symbol, date) requests are recorded in:

```
reports/backfill_1m_failures_YYYYMMDD_HHMMSS.csv
```
Backfill continues even when individual symbol/day fetches fail.
Example: validate per-symbol ranges and row counts directly over Parquet:
```python
import duckdb

con = duckdb.connect()
df = con.execute("""
    SELECT symbol,
           MIN(ts_utc) AS min_ts,
           MAX(ts_utc) AS max_ts,
           COUNT(*) AS row_count
    FROM 'data/curated/bars_daily/**/*.parquet'
    GROUP BY symbol
    ORDER BY symbol
""").df()
print(df.head())
```

Validates curated market data after ingestion runs (daily and 1-minute).
Ensures:
- Primary key uniqueness
- OHLC integrity
- Timestamp ordering
- Coverage / gap detection
- Repeatable reporting
- Optional strict enforcement mode
| Timeframe | Path |
|---|---|
| Daily (1D) | data/curated/bars_daily/**/*.parquet |
| 1-Minute | data/curated/bars_1m/**/*.parquet |
Expected columns:
symbol, ts_utc, open, high, low, close, volume, source, timeframe
Primary key:
(symbol, ts_utc, timeframe)
Detects duplicates on:
(symbol, ts_utc, timeframe)
- Reports per symbol
- Strict mode fails if duplicates exist
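The per-symbol duplicate report can be sketched in pandas; a minimal illustration of the idea, not the QA module's exact code:

```python
import pandas as pd

KEY = ["symbol", "ts_utc", "timeframe"]

def duplicate_report(df: pd.DataFrame) -> pd.DataFrame:
    """Count duplicated primary-key rows per symbol."""
    # keep=False marks every member of a duplicated key group, so the
    # counts reflect total offending rows, not just the extras.
    dup_mask = df.duplicated(subset=KEY, keep=False)
    return (
        df.loc[dup_mask]
        .groupby("symbol")
        .size()
        .rename("duplicate_rows")
        .reset_index()
    )
```

An empty report means the primary-key contract holds; in strict mode a non-empty report would trigger a FAIL.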
Validates:
- `high >= max(open, close)`
- `low <= min(open, close)`
- `high >= low`
- `open, high, low, close > 0`
- `volume >= 0`
- Required fields non-null

Timestamp checks:

- `ts_utc` non-null
- Out-of-order detection per symbol
- Calendar-aware expected sessions (XNYS default)
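The OHLC integrity rules reduce to vectorized masks; a minimal sketch, not the repository's validator:

```python
import pandas as pd

def ohlc_violations(df: pd.DataFrame) -> pd.Series:
    """Boolean mask marking rows that violate OHLC integrity rules."""
    return (
        (df["high"] < df[["open", "close"]].max(axis=1))          # high below open/close
        | (df["low"] > df[["open", "close"]].min(axis=1))         # low above open/close
        | (df["high"] < df["low"])                                # inverted range
        | (df[["open", "high", "low", "close"]] <= 0).any(axis=1) # non-positive prices
        | (df["volume"] < 0)                                      # negative volume
    )
```

Summing the mask gives the violation count that strict mode compares against `--max-ohlc-violations`.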
Gap metrics computed within (symbol, session_date) using timestamp deltas:
- `gap_count`
- `gap_segments`
- `max_gap_len`
This prevents overnight/weekend closures from being counted as gaps.
Note on Alpaca IEX feed: missing minute bars may reflect “no trades” rather than corruption. Tune thresholds accordingly.
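Session-scoped gap detection can be sketched with timestamp deltas on one (symbol, session_date) group at a time; because deltas never cross group boundaries, closures between sessions cannot register as gaps. The function is illustrative:

```python
import pandas as pd

def gap_metrics(ts: pd.Series,
                expected_step: pd.Timedelta = pd.Timedelta(minutes=1)) -> dict:
    """Gap metrics for one (symbol, session_date) group of bar timestamps."""
    deltas = ts.sort_values().diff().dropna()
    # Each delta larger than the expected step implies missing bars:
    # a 3-minute delta on a 1-minute grid means 2 missing bars.
    gaps = (deltas / expected_step - 1).astype(int)
    return {
        "gap_count": int(gaps[gaps > 0].sum()),      # total missing bars
        "gap_segments": int((gaps > 0).sum()),       # contiguous gap runs
        "max_gap_len": int(gaps.max()) if len(gaps) else 0,
    }
```

Applied per group via `groupby(["symbol", "session_date"])`, this yields the `gap_count` / `gap_segments` / `max_gap_len` metrics listed above.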
QA exports are written to:
```
artifacts/qa/<run_id>/
  qa_summary_by_symbol.csv
  qa_summary_global.csv
  qa_coverage_by_symbol.csv
```
Global summary includes:
- total_rows / unique_rows / duplicate_rows
- total_gap_count
- total_ohlc_violation_count
- pct_symbols_below_coverage_threshold
- overall_status (PASS / WARN / FAIL)
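The coverage rollup can be sketched as observed sessions over expected sessions; the 95% threshold below is an assumed default, not the repo's configured value:

```python
def coverage_pct(observed_sessions: int, expected_sessions: int) -> float:
    """Share of expected trading sessions actually present, in percent."""
    if expected_sessions == 0:
        return 0.0
    return 100.0 * observed_sessions / expected_sessions

def pct_symbols_below_threshold(coverage_by_symbol: dict,
                                threshold: float = 95.0) -> float:
    """Percent of symbols whose coverage falls below the threshold."""
    if not coverage_by_symbol:
        return 0.0
    below = sum(1 for v in coverage_by_symbol.values() if v < threshold)
    return 100.0 * below / len(coverage_by_symbol)
```

Expected sessions come from the trading calendar (XNYS by default), which is what makes the metric calendar-aware rather than a naive day count.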
Run IDs are deterministic from:
dataset_name + interval + start_ts + end_ts + calendar
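Deriving the run ID by hashing those fields gives determinism for free; a minimal sketch (the real scheme may differ in separator, hash, or length):

```python
import hashlib

def make_run_id(dataset_name: str, interval: str, start_ts: str,
                end_ts: str, calendar: str) -> str:
    """Stable run ID: identical inputs always hash to the same ID."""
    payload = "|".join([dataset_name, interval, start_ts, end_ts, calendar])
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
```

Determinism means re-running the same QA window overwrites its own artifact directory instead of scattering near-duplicate runs.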
When strict mode is enabled:
- Duplicate primary keys → FAIL (exit code 1)
- OHLC violations over threshold → FAIL (exit code 1)
- Artifacts still written for debugging
- CI-compatible gating
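The gating itself reduces to an exit-code decision taken after artifacts are written; an illustrative sketch of the policy, not the CLI's code:

```python
def strict_exit_code(duplicate_keys: int, ohlc_violations: int,
                     max_duplicate_keys: int = 0,
                     max_ohlc_violations: int = 0) -> int:
    """Exit 1 on integrity violations; WARN-level issues still exit 0."""
    if duplicate_keys > max_duplicate_keys or ohlc_violations > max_ohlc_violations:
        return 1
    return 0

# At the end of a strict run, after artifacts are written:
# sys.exit(strict_exit_code(dups, ohlc))
```

Writing artifacts before deciding the exit code is what keeps failed runs debuggable: CI goes red, but the CSVs explaining why are already on disk.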
```
python -m src.ingestion.qa_export \
  --timeframe all \
  --start 2025-11-01 \
  --end 2025-12-01 \
  --strict \
  --max-duplicate-keys 0 \
  --max-ohlc-violations 0
```

Run unit tests:

```
pytest -q
```

Tests cover:
- Normalization logic (canonical schema + UTC + sorting)
- Duplicate key detection
- OHLC violation detection
- Strict enforcement path
Notebook:
notebooks/01_duckdb_sanity_queries.ipynb
Validates:
- Schema presence and timestamp parsing
- Coverage + completeness
- Duplicate primary key contract
- Basic validity constraints
- Example analytics with window functions
Exports:
reports/duckdb_sanity_summary.csv
- Fully session-aware expected-minute index comparison
- Extended-hours handling
- Per-run historical QA trend table
- Machine-readable JSON summary export
- CI auto-fail policies based on `qa_summary_global.csv`