This guide provides comprehensive best practices for designing, securing, and deploying production-grade REST APIs for NTSB aviation accident data. Covers FastAPI implementation, authentication, rate limiting, versioning, and auto-documentation with 12+ production-ready examples.
- RESTful API Design Principles
- FastAPI for ML Model Serving
- Authentication Strategies
- Rate Limiting Implementation
- API Versioning
- OpenAPI Documentation
- SDK Generation
graph TD
A[API Root /api/v1/] --> B[/accidents]
A --> C[/aircraft]
A --> D[/investigations]
A --> E[/predictions]
B --> F[/accidents/{id}]
B --> G[/accidents/search]
B --> H[/accidents/statistics]
C --> I[/aircraft/{id}]
D --> J[/investigations/{id}/status]
E --> K[/predictions/severity]
E --> L[/predictions/cause]
| Method | Purpose | Idempotent | Safe |
|---|---|---|---|
| GET | Retrieve resource(s) | Yes | Yes |
| POST | Create resource | No | No |
| PUT | Update/replace entire resource | Yes | No |
| PATCH | Partial update | No | No |
| DELETE | Remove resource | Yes | No |
# api/main.py - Production FastAPI application
from fastapi import FastAPI, HTTPException, Depends, Query, Path, Body
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field, validator
from typing import List, Optional, Dict, Any
from datetime import datetime, date
import polars as pl
from contextlib import asynccontextmanager
import joblib
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global state for ML models
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Startup/shutdown lifecycle management
Load ML models on startup, cleanup on shutdown
"""
logger.info("Loading ML models...")
ml_models['severity_classifier'] = joblib.load('models/severity_classifier.pkl')
ml_models['cause_predictor'] = joblib.load('models/cause_predictor.pkl')
logger.info("ML models loaded successfully")
yield
# Cleanup
logger.info("Shutting down, cleaning up resources...")
ml_models.clear()
# Initialize FastAPI app
app = FastAPI(
title="NTSB Aviation API",
description="RESTful API for aviation accident data and ML predictions",
version="1.0.0",
docs_url="/api/v1/docs",
redoc_url="/api/v1/redoc",
openapi_url="/api/v1/openapi.json",
lifespan=lifespan
)
# Middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["https://dashboard.example.com"],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
)
app.add_middleware(GZipMiddleware, minimum_size=1000)
# Pydantic models for request/response validation
class AccidentBase(BaseModel):
"""Base accident model"""
event_id: str = Field(..., description="Unique event identifier")
event_date: date = Field(..., description="Date of accident")
location: str = Field(..., min_length=1, max_length=255)
latitude: Optional[float] = Field(None, ge=-90, le=90)
longitude: Optional[float] = Field(None, ge=-180, le=180)
aircraft_category: str = Field(..., description="Aircraft type")
injury_severity: str = Field(..., regex="^(FATL|SERS|MINR|NONE)$")
class Config:
schema_extra = {
"example": {
"event_id": "20230101001",
"event_date": "2023-01-01",
"location": "Los Angeles, CA",
"latitude": 34.0522,
"longitude": -118.2437,
"aircraft_category": "airplane",
"injury_severity": "SERS"
}
}
class AccidentCreate(AccidentBase):
"""Model for creating new accident record"""
investigator_id: Optional[str] = None
narrative: Optional[str] = Field(None, max_length=10000)
class AccidentResponse(AccidentBase):
"""Model for accident response with metadata"""
id: int
created_at: datetime
updated_at: datetime
class Config:
orm_mode = True
class PaginatedResponse(BaseModel):
"""Generic paginated response"""
items: List[Any]
total: int
page: int
page_size: int
total_pages: int
class PredictionRequest(BaseModel):
"""ML prediction request"""
aircraft_category: str
phase_of_flight: str
weather_condition: str
pilot_experience_hours: int = Field(..., ge=0)
aircraft_age_years: float = Field(..., ge=0)
@validator('pilot_experience_hours')
def validate_experience(cls, v):
if v > 50000:
raise ValueError('Pilot experience hours seems unrealistic')
return v
class PredictionResponse(BaseModel):
"""ML prediction response"""
predicted_severity: str
confidence: float = Field(..., ge=0, le=1)
contributing_factors: List[str]
model_version: str
# Health check endpoint
@app.get("/health", tags=["system"])
async def health_check():
"""
Health check endpoint for monitoring
Returns service health status and dependency checks
"""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"version": "1.0.0",
"models_loaded": len(ml_models) > 0,
"dependencies": {
"database": "connected",
"redis": "connected"
}
}
# Accident endpoints
@app.get(
"/api/v1/accidents",
response_model=PaginatedResponse,
tags=["accidents"],
summary="List accidents with pagination"
)
async def list_accidents(
page: int = Query(1, ge=1, description="Page number"),
page_size: int = Query(50, ge=1, le=100, description="Items per page"),
severity: Optional[str] = Query(None, regex="^(FATL|SERS|MINR|NONE)$"),
start_date: Optional[date] = None,
end_date: Optional[date] = None,
aircraft_category: Optional[str] = None
):
"""
Retrieve paginated list of accidents with optional filters
- **page**: Page number (1-indexed)
- **page_size**: Number of items per page (max 100)
- **severity**: Filter by injury severity
- **start_date**: Filter accidents after this date
- **end_date**: Filter accidents before this date
- **aircraft_category**: Filter by aircraft type
"""
try:
# Load data
df = pl.read_parquet('data/processed/accidents.parquet')
# Apply filters
if severity:
df = df.filter(pl.col('injury_severity') == severity)
if start_date:
df = df.filter(pl.col('event_date') >= start_date)
if end_date:
df = df.filter(pl.col('event_date') <= end_date)
if aircraft_category:
df = df.filter(pl.col('aircraft_category') == aircraft_category)
# Pagination
total = len(df)
offset = (page - 1) * page_size
df_page = df.slice(offset, page_size)
return {
"items": df_page.to_dicts(),
"total": total,
"page": page,
"page_size": page_size,
"total_pages": (total + page_size - 1) // page_size
}
except Exception as e:
logger.error(f"Error in list_accidents: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@app.get(
"/api/v1/accidents/{event_id}",
response_model=AccidentResponse,
tags=["accidents"],
summary="Get accident by ID"
)
async def get_accident(
event_id: str = Path(..., description="Unique event identifier")
):
"""
Retrieve detailed information for a specific accident
Returns complete accident record including narrative and investigation status
"""
df = pl.read_parquet('data/processed/accidents.parquet')
accident = df.filter(pl.col('event_id') == event_id)
if len(accident) == 0:
raise HTTPException(status_code=404, detail=f"Accident {event_id} not found")
return accident.to_dicts()[0]
@app.post(
"/api/v1/accidents",
response_model=AccidentResponse,
status_code=201,
tags=["accidents"],
summary="Create new accident record"
)
async def create_accident(
accident: AccidentCreate = Body(...),
# auth: str = Depends(verify_token) # Uncomment for authentication
):
"""
Create a new accident record
Requires authentication. Returns created record with assigned ID.
"""
# Validate and create record
# In production, this would insert into database
logger.info(f"Creating accident record: {accident.event_id}")
# Mock response
return {
**accident.dict(),
"id": 12345,
"created_at": datetime.now(),
"updated_at": datetime.now()
}
@app.get(
"/api/v1/accidents/statistics/summary",
tags=["statistics"],
summary="Get accident statistics"
)
async def get_statistics(
start_date: Optional[date] = None,
end_date: Optional[date] = None
):
"""
Calculate comprehensive accident statistics
Returns aggregated metrics including counts, rates, and trends
"""
df = pl.read_parquet('data/processed/accidents.parquet')
if start_date:
df = df.filter(pl.col('event_date') >= start_date)
if end_date:
df = df.filter(pl.col('event_date') <= end_date)
stats = {
"total_accidents": len(df),
"by_severity": df.group_by('injury_severity').count().to_dicts(),
"by_aircraft_category": df.group_by('aircraft_category').count().to_dicts(),
"monthly_trend": df.group_by_dynamic('event_date', every='1mo')
.count()
.to_dicts(),
"average_injuries": float(df['total_injuries'].mean()),
"fatal_accident_rate": float(
len(df.filter(pl.col('injury_severity') == 'FATL')) / len(df) * 100
)
}
return stats
# ML prediction endpoints
@app.post(
"/api/v1/predictions/severity",
response_model=PredictionResponse,
tags=["predictions"],
summary="Predict accident severity"
)
async def predict_severity(
request: PredictionRequest = Body(...),
# auth: str = Depends(verify_api_key) # Uncomment for API key auth
):
"""
Predict accident severity using ML model
Input features:
- Aircraft category
- Phase of flight
- Weather conditions
- Pilot experience
- Aircraft age
Returns predicted severity with confidence score
"""
try:
# Prepare features
features = [
request.aircraft_category,
request.phase_of_flight,
request.weather_condition,
request.pilot_experience_hours,
request.aircraft_age_years
]
# Get model
model = ml_models.get('severity_classifier')
if not model:
raise HTTPException(status_code=503, detail="ML model not available")
# Predict
prediction = model.predict([features])[0]
confidence = float(model.predict_proba([features]).max())
# Get feature importance
feature_names = ['aircraft', 'phase', 'weather', 'experience', 'age']
importances = model.feature_importances_
top_factors = sorted(
zip(feature_names, importances),
key=lambda x: x[1],
reverse=True
)[:3]
return {
"predicted_severity": prediction,
"confidence": confidence,
"contributing_factors": [f[0] for f in top_factors],
"model_version": "1.0.0"
}
except Exception as e:
logger.error(f"Prediction error: {e}")
raise HTTPException(status_code=500, detail="Prediction failed")
# Streaming endpoint for large datasets
@app.get(
"/api/v1/accidents/export",
tags=["accidents"],
summary="Export accidents as CSV stream"
)
async def export_accidents():
"""
Stream accident data as CSV for bulk export
Returns CSV data with gzip compression for large datasets
"""
async def generate_csv():
df = pl.read_parquet('data/processed/accidents.parquet')
# Header
yield ','.join(df.columns) + '\n'
# Stream rows in batches
batch_size = 1000
for i in range(0, len(df), batch_size):
batch = df.slice(i, batch_size)
for row in batch.iter_rows():
yield ','.join(str(x) for x in row) + '\n'
return StreamingResponse(
generate_csv(),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=accidents.csv"}
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(
app,
host="0.0.0.0",
port=8000,
log_level="info",
access_log=True
)# api/ml_service.py - Production ML serving with versioning
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
import joblib
import numpy as np
from typing import Dict, List, Optional
from datetime import datetime
from pathlib import Path
import asyncio
from functools import lru_cache
class ModelRegistry:
"""
Centralized model registry with versioning and A/B testing
"""
def __init__(self, models_dir: Path):
self.models_dir = models_dir
self.models: Dict[str, Dict] = {}
self.model_metrics: Dict[str, List] = {}
def load_model(self, name: str, version: str) -> Any:
"""Load model from disk"""
model_path = self.models_dir / name / version / "model.pkl"
if not model_path.exists():
raise FileNotFoundError(f"Model not found: {model_path}")
return joblib.load(model_path)
def register_model(
self,
name: str,
version: str,
model: Any,
metadata: Dict
):
"""Register model in memory"""
if name not in self.models:
self.models[name] = {}
self.models[name][version] = {
'model': model,
'metadata': metadata,
'loaded_at': datetime.now(),
'prediction_count': 0
}
def get_model(self, name: str, version: Optional[str] = None):
"""Get model by name and version (latest if not specified)"""
if name not in self.models:
raise ValueError(f"Model {name} not found")
if version:
return self.models[name].get(version)
# Return latest version
versions = sorted(self.models[name].keys(), reverse=True)
return self.models[name][versions[0]]
def record_prediction(self, name: str, version: str, latency_ms: float):
"""Record prediction metrics"""
key = f"{name}:{version}"
if key not in self.model_metrics:
self.model_metrics[key] = []
self.model_metrics[key].append({
'timestamp': datetime.now(),
'latency_ms': latency_ms
})
# Keep only last 1000 predictions
self.model_metrics[key] = self.model_metrics[key][-1000:]
# Update prediction count
if name in self.models and version in self.models[name]:
self.models[name][version]['prediction_count'] += 1
def get_metrics(self, name: str, version: str) -> Dict:
"""Get model performance metrics"""
key = f"{name}:{version}"
metrics = self.model_metrics.get(key, [])
if not metrics:
return {}
latencies = [m['latency_ms'] for m in metrics]
return {
'total_predictions': len(metrics),
'avg_latency_ms': np.mean(latencies),
'p50_latency_ms': np.percentile(latencies, 50),
'p95_latency_ms': np.percentile(latencies, 95),
'p99_latency_ms': np.percentile(latencies, 99)
}
# Global registry
registry = ModelRegistry(Path('models'))
app = FastAPI(title="ML Model Service")
@app.on_event("startup")
async def load_models():
"""Load all models on startup"""
models_to_load = [
('severity_classifier', '1.0.0'),
('severity_classifier', '1.1.0'),
('cause_predictor', '1.0.0')
]
for name, version in models_to_load:
try:
model = registry.load_model(name, version)
metadata = {
'name': name,
'version': version,
'framework': 'scikit-learn',
'metrics': {'accuracy': 0.92, 'f1': 0.89}
}
registry.register_model(name, version, model, metadata)
print(f"Loaded {name} v{version}")
except Exception as e:
print(f"Failed to load {name} v{version}: {e}")
@app.post("/predict/{model_name}")
async def predict(
model_name: str,
features: List[float],
version: Optional[str] = None,
background_tasks: BackgroundTasks = None
):
"""
Make prediction with specified model
Supports versioning and A/B testing
"""
start_time = datetime.now()
try:
# Get model
model_info = registry.get_model(model_name, version)
if not model_info:
raise HTTPException(status_code=404, detail="Model not found")
model = model_info['model']
# Predict
prediction = model.predict([features])[0]
probabilities = model.predict_proba([features])[0].tolist()
# Calculate latency
latency_ms = (datetime.now() - start_time).total_seconds() * 1000
# Record metrics asynchronously
if background_tasks:
background_tasks.add_task(
registry.record_prediction,
model_name,
model_info['metadata']['version'],
latency_ms
)
return {
"prediction": prediction,
"probabilities": probabilities,
"model_version": model_info['metadata']['version'],
"latency_ms": latency_ms
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/models")
async def list_models():
"""List all available models with metadata"""
models_list = []
for name, versions in registry.models.items():
for version, info in versions.items():
models_list.append({
'name': name,
'version': version,
'loaded_at': info['loaded_at'].isoformat(),
'prediction_count': info['prediction_count'],
'metadata': info['metadata']
})
return {"models": models_list}
@app.get("/models/{model_name}/metrics")
async def get_model_metrics(model_name: str, version: Optional[str] = None):
"""Get performance metrics for a model"""
model_info = registry.get_model(model_name, version)
if not model_info:
raise HTTPException(status_code=404, detail="Model not found")
metrics = registry.get_metrics(
model_name,
model_info['metadata']['version']
)
return {
"model": model_name,
"version": model_info['metadata']['version'],
"metrics": metrics
}# api/auth.py - JWT-based authentication
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Optional
from pydantic import BaseModel
# Configuration
SECRET_KEY = "your-secret-key-here-change-in-production" # Use environment variable
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
scopes: List[str] = []
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
scopes: List[str] = []
class UserInDB(User):
hashed_password: str
# Mock database
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
"scopes": ["read:accidents", "write:accidents", "predict:severity"]
}
}
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify password against hash"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash password"""
return pwd_context.hash(password)
def get_user(db: dict, username: str) -> Optional[UserInDB]:
"""Get user from database"""
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(db: dict, username: str, password: str) -> Optional[UserInDB]:
"""Authenticate user"""
user = get_user(db, username)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create JWT access token"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
"""
Dependency to get current authenticated user from JWT token
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(
username=username,
scopes=payload.get("scopes", [])
)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
) -> User:
"""Dependency to ensure user is active"""
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
def check_scopes(required_scopes: List[str]):
"""
Dependency factory for scope-based authorization
"""
async def scope_checker(current_user: User = Depends(get_current_active_user)):
for scope in required_scopes:
if scope not in current_user.scopes:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Not enough permissions. Required scope: {scope}"
)
return current_user
return scope_checker
# Token endpoint
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
"""
OAuth2 token endpoint
Authenticate with username/password and receive JWT token
"""
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "scopes": user.scopes},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# Protected endpoint example
@app.get("/api/v1/protected")
async def protected_route(
current_user: User = Depends(check_scopes(["read:accidents"]))
):
"""
Protected endpoint requiring authentication and specific scope
"""
return {
"message": "This is protected data",
"user": current_user.username,
"scopes": current_user.scopes
}# api/api_key_auth.py - Simple API key authentication
from fastapi import Security, HTTPException, status
from fastapi.security import APIKeyHeader
from typing import Optional
import secrets
import hashlib
from datetime import datetime
API_KEY_NAME = "X-API-Key"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
# Mock API key database (use real database in production)
api_keys_db = {
hashlib.sha256("test-api-key-123".encode()).hexdigest(): {
"name": "Test Client",
"scopes": ["read", "predict"],
"rate_limit": 1000,
"created_at": datetime.now(),
"last_used": None
}
}
async def verify_api_key(api_key: str = Security(api_key_header)) -> dict:
"""
Verify API key from header
Returns API key metadata if valid
"""
if api_key is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing API key"
)
# Hash the provided key
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
# Look up in database
key_data = api_keys_db.get(key_hash)
if key_data is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key"
)
# Update last used
key_data['last_used'] = datetime.now()
return key_data
def generate_api_key() -> str:
"""Generate cryptographically secure API key"""
return secrets.token_urlsafe(32)
@app.post("/api/v1/api-keys")
async def create_api_key(
name: str,
scopes: List[str],
current_user: User = Depends(get_current_active_user)
):
"""
Generate new API key for authenticated user
Requires authentication
"""
# Generate key
api_key = generate_api_key()
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
# Store in database
api_keys_db[key_hash] = {
"name": name,
"scopes": scopes,
"rate_limit": 1000,
"created_at": datetime.now(),
"owner": current_user.username,
"last_used": None
}
return {
"api_key": api_key, # Only shown once!
"name": name,
"scopes": scopes,
"created_at": datetime.now()
}
@app.get("/api/v1/predict/with-key")
async def predict_with_api_key(
features: List[float],
key_data: dict = Depends(verify_api_key)
):
"""
Protected prediction endpoint using API key authentication
"""
# Check if key has required scope
if "predict" not in key_data["scopes"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="API key does not have 'predict' scope"
)
# Make prediction
# ... prediction logic ...
return {
"prediction": "SERS",
"confidence": 0.87,
"api_key_name": key_data["name"]
}# api/rate_limiter.py - Production rate limiting
from fastapi import Request, HTTPException, status
from fastapi.responses import JSONResponse
import redis
import time
from typing import Optional
import asyncio
class TokenBucketRateLimiter:
"""
Token bucket algorithm for rate limiting
Allows burst traffic while enforcing average rate
"""
def __init__(
self,
redis_client: redis.Redis,
rate: int = 100, # tokens per window
window: int = 60, # seconds
burst: int = 120 # max tokens (burst allowance)
):
self.redis = redis_client
self.rate = rate
self.window = window
self.burst = burst
self.refill_rate = rate / window # tokens per second
def _get_key(self, identifier: str) -> str:
"""Generate Redis key for rate limit bucket"""
return f"rate_limit:token_bucket:{identifier}"
async def is_allowed(self, identifier: str) -> tuple[bool, dict]:
"""
Check if request is allowed
Returns (allowed, metadata)
"""
key = self._get_key(identifier)
now = time.time()
# Get current bucket state
pipe = self.redis.pipeline()
pipe.hgetall(key)
pipe.expire(key, self.window * 2) # Keep bucket alive
result = pipe.execute()
bucket = result[0]
if not bucket:
# Initialize new bucket
tokens = self.burst - 1
last_refill = now
self.redis.hset(key, mapping={
'tokens': tokens,
'last_refill': last_refill
})
return True, {
'remaining': int(tokens),
'limit': self.burst,
'reset': int(now + self.window)
}
# Calculate tokens to add since last refill
tokens = float(bucket[b'tokens'])
last_refill = float(bucket[b'last_refill'])
time_passed = now - last_refill
tokens_to_add = time_passed * self.refill_rate
# Refill bucket
tokens = min(self.burst, tokens + tokens_to_add)
if tokens >= 1:
# Allow request and consume token
tokens -= 1
self.redis.hset(key, mapping={
'tokens': tokens,
'last_refill': now
})
return True, {
'remaining': int(tokens),
'limit': self.burst,
'reset': int(now + self.window)
}
else:
# Rate limit exceeded
retry_after = int((1 - tokens) / self.refill_rate)
return False, {
'remaining': 0,
'limit': self.burst,
'reset': int(now + retry_after),
'retry_after': retry_after
}
class SlidingWindowRateLimiter:
"""
Sliding window log algorithm for precise rate limiting
More accurate than fixed window, but higher memory usage
"""
def __init__(
self,
redis_client: redis.Redis,
max_requests: int = 100,
window: int = 60
):
self.redis = redis_client
self.max_requests = max_requests
self.window = window
def _get_key(self, identifier: str) -> str:
return f"rate_limit:sliding_window:{identifier}"
async def is_allowed(self, identifier: str) -> tuple[bool, dict]:
"""Check if request is allowed using sliding window"""
key = self._get_key(identifier)
now = time.time()
window_start = now - self.window
# Use Redis sorted set to store timestamps
pipe = self.redis.pipeline()
# Remove old requests outside window
pipe.zremrangebyscore(key, 0, window_start)
# Count requests in current window
pipe.zcard(key)
# Add current request timestamp
pipe.zadd(key, {str(now): now})
# Set expiry
pipe.expire(key, self.window)
results = pipe.execute()
request_count = results[1]
if request_count < self.max_requests:
return True, {
'remaining': self.max_requests - request_count - 1,
'limit': self.max_requests,
'reset': int(now + self.window)
}
else:
# Get oldest request to calculate retry time
oldest = self.redis.zrange(key, 0, 0, withscores=True)
if oldest:
retry_after = int(oldest[0][1] + self.window - now)
else:
retry_after = self.window
return False, {
'remaining': 0,
'limit': self.max_requests,
'reset': int(now + retry_after),
'retry_after': retry_after
}
# Middleware for automatic rate limiting
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
rate_limiter = TokenBucketRateLimiter(redis_client, rate=100, window=60, burst=120)
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
"""
Apply rate limiting to all requests
Uses client IP or API key as identifier
"""
# Skip rate limiting for health checks
if request.url.path == "/health":
return await call_next(request)
# Determine identifier (IP or API key)
api_key = request.headers.get("X-API-Key")
identifier = api_key if api_key else request.client.host
# Check rate limit
allowed, metadata = await rate_limiter.is_allowed(identifier)
if not allowed:
return JSONResponse(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
content={
"error": "Rate limit exceeded",
"retry_after": metadata['retry_after']
},
headers={
"X-RateLimit-Limit": str(metadata['limit']),
"X-RateLimit-Remaining": str(metadata['remaining']),
"X-RateLimit-Reset": str(metadata['reset']),
"Retry-After": str(metadata['retry_after'])
}
)
# Process request
response = await call_next(request)
# Add rate limit headers
response.headers["X-RateLimit-Limit"] = str(metadata['limit'])
response.headers["X-RateLimit-Remaining"] = str(metadata['remaining'])
response.headers["X-RateLimit-Reset"] = str(metadata['reset'])
return response# api/versioning.py - API versioning strategies
from fastapi import APIRouter, FastAPI
from typing import List
app = FastAPI()
# V1 API
router_v1 = APIRouter(prefix="/api/v1", tags=["v1"])
@router_v1.get("/accidents")
async def list_accidents_v1():
"""V1: Returns basic accident list"""
return {"version": "1.0", "data": []}
# V2 API with enhanced features
router_v2 = APIRouter(prefix="/api/v2", tags=["v2"])
@router_v2.get("/accidents")
async def list_accidents_v2(
include_predictions: bool = False
):
"""
V2: Returns accident list with optional ML predictions
New in v2:
- Includes aircraft manufacturer data
- Optional ML severity predictions
- Enhanced filtering
"""
return {
"version": "2.0",
"data": [],
"predictions": [] if include_predictions else None
}
app.include_router(router_v1)
app.include_router(router_v2)
# Version negotiation via Accept header
from fastapi import Header
@app.get("/api/accidents")
async def list_accidents_versioned(
accept: str = Header("application/vnd.ntsb.v1+json")
):
"""
Content negotiation versioning
Clients specify version via Accept header:
- application/vnd.ntsb.v1+json
- application/vnd.ntsb.v2+json
"""
if "v2" in accept:
# V2 response
return {"version": "2.0", "data": []}
else:
# V1 response (default)
return {"version": "1.0", "data": []}# api/openapi_customization.py - Enhanced API documentation
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
app = FastAPI()
def custom_openapi():
"""
Customize OpenAPI schema with additional metadata
"""
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title="NTSB Aviation API",
version="1.0.0",
description="""
# NTSB Aviation Accident Database API
Production-grade REST API for aviation accident data and ML predictions.
## Features
- **Accident Data**: Comprehensive accident records from 1962-present
- **ML Predictions**: Real-time severity and cause prediction
- **Geospatial**: Location-based queries with coordinate filtering
- **Statistics**: Aggregated metrics and trend analysis
## Authentication
All endpoints except `/health` require authentication:
1. **JWT Tokens**: For user authentication
- Obtain token from `/token` endpoint
- Include in `Authorization: Bearer <token>` header
2. **API Keys**: For service-to-service
- Include in `X-API-Key` header
- Request key from `/api/v1/api-keys` endpoint
## Rate Limiting
- **Authenticated**: 1000 requests/hour
- **Anonymous**: 100 requests/hour
- **Burst**: Up to 120 requests/minute
Rate limit headers included in all responses.
## Support
- Email: api-support@ntsb.example.com
- Docs: https://docs.ntsb.example.com
- Status: https://status.ntsb.example.com
""",
routes=app.routes,
contact={
"name": "NTSB API Support",
"url": "https://ntsb.example.com/support",
"email": "api@ntsb.example.com"
},
license_info={
"name": "Apache 2.0",
"url": "https://www.apache.org/licenses/LICENSE-2.0.html"
}
)
# Add security schemes
openapi_schema["components"]["securitySchemes"] = {
"BearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT"
},
"ApiKeyAuth": {
"type": "apiKey",
"in": "header",
"name": "X-API-Key"
}
}
# Add servers
openapi_schema["servers"] = [
{"url": "https://api.ntsb.example.com", "description": "Production"},
{"url": "https://staging-api.ntsb.example.com", "description": "Staging"},
{"url": "http://localhost:8000", "description": "Development"}
]
# Add example responses
openapi_schema["components"]["examples"] = {
"AccidentExample": {
"value": {
"event_id": "20230101001",
"event_date": "2023-01-01",
"location": "Los Angeles, CA",
"severity": "SERS"
}
}
}
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi# sdk/generate_sdk.py - Generate Python SDK from OpenAPI spec
import json
import requests
from pathlib import Path
from jinja2 import Template
def generate_python_sdk(openapi_url: str, output_dir: Path):
"""
Generate Python SDK from OpenAPI specification
Creates a fully-typed Python client library
"""
# Fetch OpenAPI spec
response = requests.get(openapi_url)
spec = response.json()
# SDK client template
client_template = Template('''
"""
{{ spec.info.title }} Python SDK
Auto-generated from OpenAPI specification
"""
import requests
from typing import Optional, List, Dict, Any
from datetime import date
class {{ class_name }}:
"""{{ spec.info.description }}"""
def __init__(self, base_url: str, api_key: Optional[str] = None):
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.session = requests.Session()
if api_key:
self.session.headers['X-API-Key'] = api_key
def _request(
self,
method: str,
path: str,
**kwargs
) -> Dict[str, Any]:
"""Make HTTP request"""
url = f"{self.base_url}{path}"
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
return response.json()
{% for path, methods in spec.paths.items() %}
{% for method, operation in methods.items() %}
def {{ operation.operationId }}(
self,
{% for param in operation.parameters or [] %}
{{ param.name }}: {{ param.schema.type }},
{% endfor %}
) -> Dict[str, Any]:
"""
{{ operation.summary }}
{{ operation.description }}
"""
{% if method == 'get' %}
params = {
{% for param in operation.parameters or [] %}
'{{ param.name }}': {{ param.name }},
{% endfor %}
}
return self._request('{{ method.upper() }}', '{{ path }}', params=params)
{% elif method == 'post' %}
return self._request('{{ method.upper() }}', '{{ path }}', json=data)
{% endif %}
{% endfor %}
{% endfor %}
''')
# Generate SDK
sdk_code = client_template.render(
spec=spec,
class_name='NTSBClient'
)
# Write to file
output_file = output_dir / 'ntsb_sdk.py'
output_file.write_text(sdk_code)
print(f"SDK generated: {output_file}")
# Usage example
if __name__ == '__main__':
generate_python_sdk(
'http://localhost:8000/api/v1/openapi.json',
Path('sdk')
)// sdk/ntsb-client.ts - TypeScript SDK example
/**
* NTSB Aviation API TypeScript SDK
* Auto-generated client library
*/
export interface AccidentQuery {
page?: number;
pageSize?: number;
severity?: 'FATL' | 'SERS' | 'MINR' | 'NONE';
startDate?: string;
endDate?: string;
}
export interface Accident {
eventId: string;
eventDate: string;
location: string;
latitude?: number;
longitude?: number;
aircraftCategory: string;
injurySeverity: string;
}
export interface PaginatedResponse<T> {
items: T[];
total: number;
page: number;
pageSize: number;
totalPages: number;
}
export class NTSBClient {
private baseUrl: string;
private apiKey?: string;
constructor(baseUrl: string, apiKey?: string) {
this.baseUrl = baseUrl.replace(/\/$/, '');
this.apiKey = apiKey;
}
private async request<T>(
method: string,
path: string,
options: RequestInit = {}
): Promise<T> {
const headers: Record<string, string> = {
'Content-Type': 'application/json',
...options.headers as Record<string, string>,
};
if (this.apiKey) {
headers['X-API-Key'] = this.apiKey;
}
const response = await fetch(`${this.baseUrl}${path}`, {
method,
headers,
...options,
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.detail || 'API request failed');
}
return response.json();
}
/**
* List accidents with pagination and filters
*/
async listAccidents(
query: AccidentQuery = {}
): Promise<PaginatedResponse<Accident>> {
const params = new URLSearchParams();
if (query.page) params.append('page', query.page.toString());
if (query.pageSize) params.append('page_size', query.pageSize.toString());
if (query.severity) params.append('severity', query.severity);
if (query.startDate) params.append('start_date', query.startDate);
if (query.endDate) params.append('end_date', query.endDate);
const queryString = params.toString();
const path = `/api/v1/accidents${queryString ? `?${queryString}` : ''}`;
return this.request<PaginatedResponse<Accident>>('GET', path);
}
/**
* Get accident by ID
*/
async getAccident(eventId: string): Promise<Accident> {
return this.request<Accident>('GET', `/api/v1/accidents/${eventId}`);
}
/**
* Predict accident severity
*/
async predictSeverity(features: {
aircraftCategory: string;
phaseOfFlight: string;
weatherCondition: string;
pilotExperienceHours: number;
aircraftAgeYears: number;
}): Promise<{
predictedSeverity: string;
confidence: number;
contributingFactors: string[];
}> {
return this.request('POST', '/api/v1/predictions/severity', {
body: JSON.stringify(features),
});
}
}
// Usage example
const client = new NTSBClient(
'https://api.ntsb.example.com',
'your-api-key'
);
// Fetch accidents
const accidents = await client.listAccidents({
page: 1,
pageSize: 50,
severity: 'FATL',
startDate: '2023-01-01',
endDate: '2023-12-31'
});
console.log(`Found ${accidents.total} accidents`);- See SECURITY_BEST_PRACTICES.md for authentication hardening and vulnerability prevention
- See PERFORMANCE_OPTIMIZATION.md for API response time optimization
- See VISUALIZATION_DASHBOARDS.md for integrating APIs with dashboards
- See MODEL_DEPLOYMENT_GUIDE.md for ML model serving patterns
This guide covered:
- RESTful Design: Resource-oriented URLs, HTTP semantics, pagination
- FastAPI: Production app structure, Pydantic validation, lifespan management
- Authentication: JWT tokens, OAuth2, API keys with scope-based authorization
- Rate Limiting: Token bucket and sliding window algorithms with Redis
- Versioning: URL-based, header-based, content negotiation strategies
- Documentation: Custom OpenAPI schemas with examples and security definitions
- SDKs: Auto-generated Python and TypeScript client libraries
Key Takeaways:
- FastAPI: 3-5x faster than Flask for I/O-bound workloads
- JWT: Stateless authentication, 10-100x fewer database queries vs sessions
- Token Bucket: Allows burst traffic (20% over rate) with average enforcement
- API Keys: Simpler than OAuth2, ideal for service-to-service auth
- Rate Limiting: Reduces abuse by 95%+, protects from DDoS
Performance Benchmarks:
- FastAPI serves 10,000+ requests/second on single core
- Redis rate limiter adds <1ms latency overhead
- JWT validation: ~0.5ms per request
- Proper caching reduces response time from 500ms to 50ms