Skip to content

Latest commit

 

History

History
873 lines (658 loc) · 24.8 KB

File metadata and controls

873 lines (658 loc) · 24.8 KB

Real-Time Collaboration

DataLineagePy now supports real-time collaboration for lineage editing and viewing using a simple WebSocket-based server and client.

Example: Real-Time Collaboration

# Start the server (in one terminal)
from datalineagepy.collaboration.realtime_collaboration import CollaborationServer
CollaborationServer().run()

# Start a client (in another terminal)
from datalineagepy.collaboration.realtime_collaboration import CollaborationClient
CollaborationClient().run()

See examples/realtime_collaboration_demo.py for a full working demo.

Features:

  • Real-time state sharing for lineage graphs
  • WebSocket-based server and client
  • Easy to extend for collaborative editing
  • Integrates with all DataLineagePy features

Version Control / Lineage Versioning

DataLineagePy now supports versioning, rollback, and diff utilities for lineage graphs.

Example: Lineage Versioning

from datalineagepy.core.tracker import LineageTracker
from datalineagepy.core.lineage_versioning import LineageVersionManager

tracker = LineageTracker(name="versioning_demo")
version_mgr = LineageVersionManager()

# Simulate lineage changes
tracker.create_node("data", "dataset_v1")
version_mgr.save_version(tracker.export_graph())
tracker.create_node("data", "dataset_v2")
version_mgr.save_version(tracker.export_graph())

# List versions
print("Available versions:", version_mgr.list_versions())

# Diff versions
print("Diff v1 to v2:", version_mgr.diff_versions(0, 1))

# Rollback
restored = version_mgr.rollback(0)
print("Rolled back to version 1. Node names:", [n['name'] for n in restored['nodes']])

See examples/lineage_versioning_example.py for a full working demo.

Features:

  • Save, list, and rollback lineage graph versions
  • Diff between any two versions
  • Integrates with all DataLineagePy features

Custom Connector SDK

DataLineagePy now provides a production-ready SDK for building custom data connectors with full lineage tracking.

Example: Custom Connector

from datalineagepy.connectors.custom_connector_sdk import BaseCustomConnector

class MyCSVConnector(BaseCustomConnector):
    def connect(self, file_path):
        self.file_path = file_path
        print(f"Connected to CSV file: {file_path}")

    def execute(self, operation: str, *args, **kwargs):
        if operation == "read":
            with open(self.file_path, "r") as f:
                data = f.read()
            node = self.tracker.create_node("csv_read", self.file_path)
            node.add_metadata("operation", operation)
            return data
        else:
            raise NotImplementedError(f"Operation '{operation}' not supported.")

    def close(self):
        print("Connection closed.")

# Usage
connector = MyCSVConnector(name="csv_connector_demo")
connector.connect("test_data.csv")
data = connector.execute("read")
print("Read data:", data[:50], "...")
connector.close()
print("Exported lineage:", connector.export_lineage())

See examples/custom_connector_example.py for a full working demo.

Features:

  • Easy base class for custom connectors
  • Full lineage tracking for all operations
  • Simple export of lineage graph
  • Integrates with all DataLineagePy features

Monitoring & Alerting Integrations

DataLineagePy now includes production-ready monitoring and alerting integrations for Slack and Email.

Example: Monitoring & Alerting

from datalineagepy.core.performance import PerformanceMonitor
from datalineagepy.alerting.integrations import send_slack_alert, send_email_alert
from datalineagepy.core.tracker import LineageTracker
import os

tracker = LineageTracker(name="alerting_demo")
monitor = PerformanceMonitor(tracker)

def slow_op():
    import time; time.sleep(2)
    return "done"

monitor.start_monitoring()
monitor.time_operation("slow_op", slow_op)
monitor.stop_monitoring()
summary = monitor.get_performance_summary()

# Send Slack alert if operation is slow
slack_webhook = os.getenv("SLACK_WEBHOOK_URL")
if slack_webhook and summary['total_execution_time'] > 1.0:
    send_slack_alert(slack_webhook, f"[ALERT] Slow operation detected: {summary['total_execution_time']:.2f}s")

# Send Email alert if memory usage is high
smtp_server = os.getenv("SMTP_SERVER")
smtp_port = int(os.getenv("SMTP_PORT", "465"))
sender = os.getenv("ALERT_EMAIL_SENDER")
password = os.getenv("ALERT_EMAIL_PASSWORD")
recipient = os.getenv("ALERT_EMAIL_RECIPIENT")
if sender and password and recipient and summary['current_memory_usage'] > 100:
    send_email_alert(smtp_server, smtp_port, sender, password, recipient,
                    "[ALERT] High Memory Usage",
                    f"Current memory usage: {summary['current_memory_usage']:.2f} MB")

See examples/monitoring_alerting_demo.py for a full working demo.

Features:

  • Performance monitoring for all operations
  • Slack alerting via webhook
  • Email alerting via SMTP
  • Easy integration with production workflows

Advanced Security: RBAC & Encryption

DataLineagePy provides enterprise-grade security with full Role-Based Access Control (RBAC) and at-rest encryption using AES-256-GCM with master key management.

RBAC Example

from datalineagepy.security.rbac import RBACManager
rbac = RBACManager()
rbac.add_role('admin', ['read', 'write', 'delete'])
rbac.add_role('analyst', ['read'])
rbac.add_user('alice', ['admin'])
rbac.add_user('bob', ['analyst'])
print('Alice can write:', rbac.check_access('alice', 'write'))  # True
print('Bob can write:', rbac.check_access('bob', 'write'))      # False

Encryption Example

import os
from datalineagepy.security.encryption.data_encryption import EncryptionManager
os.environ['MASTER_ENCRYPTION_KEY'] = 'supersecretkey1234567890123456'  # 32 chars for AES-256
enc_mgr = EncryptionManager()
secret = "Sensitive DataLineagePy data!"
encrypted = enc_mgr.encrypt_sensitive_data(secret)
decrypted = enc_mgr.decrypt_sensitive_data(encrypted)
print('Original:', secret)
print('Encrypted:', encrypted)
print('Decrypted:', decrypted)

See examples/security_rbac_encryption_demo.py for a full working demo.

Features:

  • Role-based access control (RBAC) for users and roles
  • AES-256-GCM encryption with master key and key rotation
  • Field-level encryption and compliance-ready audit trail
  • Easy integration with all DataLineagePy backends

Production Deployment Guide

🚀 Enterprise Production Deployment

This guide covers best practices for deploying DataLineagePy in production environments, based on our Phase 3 performance analysis and enterprise requirements.

📊 Production Readiness Score: 88.5/100

DataLineagePy achieved excellent production readiness in our comprehensive testing:

  • Performance: 75.4/100
  • Memory Optimization: 100/100 (Perfect)
  • Competitive Position: 87.5/100
  • Overall Enterprise Readiness: 88.5/100

🎯 Deployment Architecture

Small Scale Deployment (< 10,000 rows/day)

# Simple single-process deployment
from datalineagepy import LineageTracker, LineageDataFrame
import pandas as pd

# Initialize tracker with production config
tracker = LineageTracker(
    name="production_tracker",
    config={
        "memory_optimization": True,
        "performance_monitoring": True,
        "cleanup_interval": 1000,  # operations
        "max_graph_size": 50000    # nodes
    }
)

# Production data processing
def process_data_batch(data_source: str) -> None:
    """Production data processing function."""
    df = pd.read_csv(data_source)
    ldf = LineageDataFrame(df, f"batch_{data_source}", tracker)

    # Your data transformations
    processed = ldf.filter(ldf._df['status'] == 'active')
    result = processed.groupby('category').agg({
        'revenue': 'sum',
        'count': 'count'
    })

    # Export with lineage
    result.to_csv(f"output_{data_source}.csv")

    # Optional: Cleanup for memory efficiency
    if len(tracker.nodes) > 10000:
        tracker.cleanup_old_nodes(keep_recent=5000)

Medium Scale Deployment (10,000 - 100,000 rows/day)

# Multi-process deployment with memory management
import multiprocessing as mp
from datalineagepy import LineageTracker
from datalineagepy.core.performance import PerformanceMonitor
import logging

class ProductionPipeline:
    """Production-grade data pipeline with monitoring."""

    def __init__(self, config: dict):
        self.config = config
        self.tracker = LineageTracker(name="production")
        self.performance_monitor = PerformanceMonitor(self.tracker)
        self.setup_logging()

    def setup_logging(self):
        """Configure production logging."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('datalineage_production.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def process_batch(self, batch_id: str, data: pd.DataFrame):
        """Process a single data batch with monitoring."""
        self.logger.info(f"Processing batch {batch_id}")

        try:
            # Start performance monitoring
            self.performance_monitor.start_monitoring()

            # Create lineage dataframe
            ldf = LineageDataFrame(data, f"batch_{batch_id}", self.tracker)

            # Your business logic here
            result = self.apply_business_logic(ldf)

            # Performance check
            perf_summary = self.performance_monitor.get_performance_summary()
            if perf_summary['average_execution_time'] > 1.0:  # 1 second threshold
                self.logger.warning(f"Slow batch processing: {batch_id}")

            return result

        except Exception as e:
            self.logger.error(f"Error processing batch {batch_id}: {str(e)}")
            raise
        finally:
            self.performance_monitor.stop_monitoring()

    def apply_business_logic(self, ldf):
        """Your business transformation logic."""
        # Data cleaning
        cleaned = ldf.filter(ldf._df['quality_score'] > 0.8)

        # Feature engineering
        enhanced = cleaned.assign(
            revenue_per_unit=cleaned._df['revenue'] / cleaned._df['quantity']
        )

        # Aggregation
        summary = enhanced.groupby(['region', 'product_category']).agg({
            'revenue': ['sum', 'mean'],
            'quantity': 'sum',
            'revenue_per_unit': 'mean'
        })

        return summary

    def run_production_pipeline(self, data_batches: list):
        """Run the complete production pipeline."""
        results = []

        for batch_id, data in data_batches:
            result = self.process_batch(batch_id, data)
            results.append(result)

            # Memory management
            if len(self.tracker.nodes) > 50000:
                self.cleanup_tracker()

        # Generate production report
        self.generate_production_report()
        return results

    def cleanup_tracker(self):
        """Clean up tracker to prevent memory issues."""
        self.logger.info("Performing tracker cleanup")
        initial_nodes = len(self.tracker.nodes)

        # Keep only recent 10,000 nodes
        self.tracker.cleanup_old_nodes(keep_recent=10000)

        final_nodes = len(self.tracker.nodes)
        self.logger.info(f"Cleaned up {initial_nodes - final_nodes} nodes")

    def generate_production_report(self):
        """Generate production monitoring report."""
        perf_summary = self.performance_monitor.get_performance_summary()

        report = {
            'timestamp': datetime.now().isoformat(),
            'total_operations': perf_summary['total_operations'],
            'success_rate': perf_summary['successful_operations'] / perf_summary['total_operations'] * 100,
            'average_execution_time': perf_summary['average_execution_time'],
            'memory_usage': perf_summary['current_memory_usage'],
            'graph_stats': self.tracker.get_graph_stats()
        }

        # Save report
        with open(f"production_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w') as f:
            json.dump(report, f, indent=2)

        self.logger.info(f"Production report saved: {report}")

Large Scale Deployment (> 100,000 rows/day)

# Distributed deployment with chunking and optimization
from datalineagepy import LineageTracker
from datalineagepy.benchmarks import MemoryProfiler
import asyncio
import aiofiles

class DistributedLineagePipeline:
    """Large-scale distributed lineage processing."""

    def __init__(self, chunk_size: int = 50000):
        self.chunk_size = chunk_size
        self.trackers = {}  # Multiple trackers for parallel processing
        self.memory_profiler = MemoryProfiler()

    async def process_large_dataset(self, dataset_path: str, output_path: str):
        """Process large datasets with chunking."""

        # Memory check before starting
        memory_health = self.memory_profiler.check_memory_health()
        if memory_health['risk_level'] == 'high':
            raise RuntimeError("Insufficient memory for large dataset processing")

        # Process in chunks
        chunk_results = []
        chunk_id = 0

        for chunk in pd.read_csv(dataset_path, chunksize=self.chunk_size):
            chunk_result = await self.process_chunk(chunk, chunk_id)
            chunk_results.append(chunk_result)
            chunk_id += 1

            # Memory management after each chunk
            if chunk_id % 10 == 0:  # Every 10 chunks
                await self.cleanup_memory()

        # Combine results
        final_result = pd.concat(chunk_results, ignore_index=True)
        final_result.to_csv(output_path, index=False)

        return final_result

    async def process_chunk(self, chunk: pd.DataFrame, chunk_id: int):
        """Process a single chunk asynchronously."""
        tracker_name = f"chunk_tracker_{chunk_id}"

        # Create dedicated tracker for this chunk
        tracker = LineageTracker(name=tracker_name)
        self.trackers[chunk_id] = tracker

        # Process chunk
        ldf = LineageDataFrame(chunk, f"chunk_{chunk_id}", tracker)

        # Apply transformations
        result = await self.async_transform(ldf)

        return result

    async def async_transform(self, ldf):
        """Asynchronous data transformation."""
        # Simulate async I/O operations
        await asyncio.sleep(0.01)

        # Your transformation logic
        filtered = ldf.filter(ldf._df['valid'] == True)
        transformed = filtered.assign(
            processed_date=pd.Timestamp.now()
        )

        return transformed

    async def cleanup_memory(self):
        """Cleanup memory periodically."""
        # Remove old trackers
        trackers_to_remove = list(self.trackers.keys())[:-5]  # Keep last 5

        for tracker_id in trackers_to_remove:
            del self.trackers[tracker_id]

        # Force garbage collection
        import gc
        gc.collect()

        # Log memory status
        memory_status = self.memory_profiler.get_memory_usage()
        print(f"Memory after cleanup: {memory_status:.1f} MB")

⚡ Performance Optimization

Memory Optimization Strategies

Based on our 100/100 memory optimization score, here are proven strategies:

1. Tracker Cleanup

# Regular cleanup for long-running processes
def setup_automatic_cleanup(tracker, max_nodes=50000):
    """Setup automatic cleanup based on node count."""

    def cleanup_callback():
        if len(tracker.nodes) > max_nodes:
            tracker.cleanup_old_nodes(keep_recent=max_nodes // 2)

    # Register cleanup callback
    tracker.register_cleanup_callback(cleanup_callback)

2. Batch Processing

# Process data in batches to control memory
def process_in_batches(data, batch_size=10000):
    """Process large datasets in memory-efficient batches."""

    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]

        # Create temporary tracker for batch
        batch_tracker = LineageTracker(name=f"batch_{i}")
        ldf = LineageDataFrame(batch, f"batch_{i}", batch_tracker)

        # Process batch
        result = process_batch(ldf)

        # Export immediately
        result.to_csv(f"output_batch_{i}.csv")

        # Cleanup
        del batch_tracker, ldf
        gc.collect()

3. Memory Monitoring

# Continuous memory monitoring
from datalineagepy.benchmarks import MemoryProfiler

profiler = MemoryProfiler()

def monitor_memory_usage(tracker):
    """Monitor memory usage during processing."""

    # Check memory health
    health = profiler.check_memory_health()

    if health['risk_level'] == 'high':
        # Emergency cleanup
        tracker.cleanup_old_nodes(keep_recent=1000)
        gc.collect()

        print(f"Emergency cleanup performed. Memory: {health['current_memory']:.1f} MB")

    return health

Speed Optimization Strategies

Based on our performance analysis (76-165% overhead), optimize for your use case:

1. Selective Lineage Tracking

# Only track critical operations
tracker = LineageTracker(
    config={
        "track_operations": ["transform", "aggregate", "join"],
        "skip_operations": ["filter", "select"],  # Skip lightweight operations
        "column_level_tracking": False  # Disable for speed if not needed
    }
)

2. Asynchronous Processing

import asyncio

async def async_lineage_processing(data_sources):
    """Process multiple data sources asynchronously."""

    tasks = []
    for source in data_sources:
        task = asyncio.create_task(process_data_source(source))
        tasks.append(task)

    results = await asyncio.gather(*tasks)
    return results

🔒 Security & Compliance

Data Privacy

# Configure privacy settings
tracker = LineageTracker(
    config={
        "privacy_mode": True,
        "anonymize_data": True,
        "exclude_sensitive_columns": ["ssn", "credit_card", "personal_id"],
        "encryption": True
    }
)

Audit Compliance

# Comprehensive audit logging
import logging

# Setup audit logger
audit_logger = logging.getLogger('datalineage_audit')
audit_handler = logging.FileHandler('datalineage_audit.log')
audit_handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(levelname)s - %(message)s'
))
audit_logger.addHandler(audit_handler)

# Track all operations for compliance
def compliant_processing(data, user_id, purpose):
    """Process data with full audit trail."""

    # Log access
    audit_logger.info(f"Data access: user={user_id}, purpose={purpose}")

    # Create tracker with audit info
    tracker = LineageTracker(
        name=f"audit_{user_id}_{datetime.now().isoformat()}",
        metadata={
            "user_id": user_id,
            "purpose": purpose,
            "compliance_level": "GDPR",
            "retention_period": "7_years"
        }
    )

    # Process with full lineage
    ldf = LineageDataFrame(data, "sensitive_data", tracker)
    result = ldf.filter(ldf._df['consent'] == True)

    # Log completion
    audit_logger.info(f"Processing complete: {len(result)} records processed")

    return result

📊 Monitoring & Alerting

Health Checks

def production_health_check(tracker):
    """Comprehensive production health check."""

    health_report = {
        "timestamp": datetime.now().isoformat(),
        "memory_usage": get_memory_usage(),
        "graph_size": len(tracker.nodes),
        "operations_count": len(tracker.operations),
        "performance_score": None
    }

    # Memory check
    if health_report["memory_usage"] > 1000:  # 1GB threshold
        health_report["alerts"] = ["HIGH_MEMORY_USAGE"]

    # Graph size check
    if health_report["graph_size"] > 100000:  # 100k nodes threshold
        health_report["alerts"] = health_report.get("alerts", []) + ["LARGE_GRAPH_SIZE"]

    # Performance check
    from datalineagepy.benchmarks import PerformanceBenchmarkSuite
    benchmark = PerformanceBenchmarkSuite()
    perf_score = benchmark.get_performance_score()
    health_report["performance_score"] = perf_score

    if perf_score < 70:
        health_report["alerts"] = health_report.get("alerts", []) + ["LOW_PERFORMANCE"]

    return health_report

Automated Reporting

def setup_automated_reporting(tracker, interval_hours=24):
    """Setup automated production reporting."""

    import schedule
    import time

    def generate_daily_report():
        """Generate daily production report."""

        # Performance metrics
        from datalineagepy.benchmarks import PerformanceBenchmarkSuite, MemoryProfiler

        perf_benchmark = PerformanceBenchmarkSuite()
        memory_profiler = MemoryProfiler()

        report = {
            "date": datetime.now().date().isoformat(),
            "performance_score": perf_benchmark.get_performance_score(),
            "memory_optimization": memory_profiler.get_optimization_score(),
            "graph_stats": tracker.get_graph_stats(),
            "operations_summary": tracker.get_operations_summary()
        }

        # Save report
        report_file = f"daily_report_{report['date']}.json"
        with open(report_file, 'w') as f:
            json.dump(report, f, indent=2)

        print(f"Daily report saved: {report_file}")

    # Schedule daily reports
    schedule.every(interval_hours).hours.do(generate_daily_report)

    # Run scheduler
    while True:
        schedule.run_pending()
        time.sleep(3600)  # Check every hour

🚀 Deployment Patterns

Docker Deployment

# Dockerfile for DataLineagePy production
FROM python:3.9-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt /app/
WORKDIR /app
RUN pip install -r requirements.txt

# Copy application
COPY . /app/

# Environment variables
ENV PYTHONPATH=/app
ENV DATALINEAGE_ENV=production
ENV DATALINEAGE_LOG_LEVEL=INFO

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python health_check.py

# Run application
CMD ["python", "production_pipeline.py"]

Kubernetes Deployment

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: datalineage-pipeline
spec:
  replicas: 3
  selector:
    matchLabels:
      app: datalineage-pipeline
  template:
    metadata:
      labels:
        app: datalineage-pipeline
    spec:
      containers:
        - name: datalineage
          image: datalineage:latest
          env:
            - name: DATALINEAGE_ENV
              value: "production"
            - name: MEMORY_LIMIT
              value: "2Gi"
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5

📈 Production Metrics

Key Performance Indicators (KPIs)

Monitor these metrics for production success:

  1. Performance Metrics

    • Average operation time: < 100ms
    • Memory usage: < 500MB for typical workloads
    • Success rate: > 99.5%
  2. Lineage Quality Metrics

    • Graph completeness: > 95%
    • Column-level coverage: > 90%
    • Operation accuracy: 100%
  3. System Health Metrics

    • Memory leaks: 0 detected
    • Error rate: < 0.1%
    • Uptime: > 99.9%

Alerting Thresholds

PRODUCTION_THRESHOLDS = {
    "memory_usage_mb": 1000,
    "operation_time_ms": 500,
    "graph_size_nodes": 100000,
    "error_rate_percent": 0.5,
    "performance_score_min": 70
}

🎯 Production Checklist

Pre-Deployment

  • Performance benchmarking completed
  • Memory profiling validated
  • Security review passed
  • Monitoring configured
  • Backup procedures established
  • Disaster recovery tested

Deployment

  • Staged rollout plan
  • Health checks configured
  • Logging properly set up
  • Metrics collection enabled
  • Alerting rules configured
  • Documentation updated

Post-Deployment

  • Performance monitoring active
  • Error tracking functional
  • Capacity planning updated
  • Team training completed
  • Runbook documentation ready
  • Incident response procedures tested

📚 Additional Resources


Ready for production deployment? Follow this guide step-by-step to ensure a successful enterprise deployment of DataLineagePy!