diff --git a/tests/e2e/README.md b/tests/e2e/README.md new file mode 100644 index 0000000..9395e7b --- /dev/null +++ b/tests/e2e/README.md @@ -0,0 +1,404 @@ +# Kruize Optimizer E2E Tests + +Self-contained end-to-end test framework for Kruize Optimizer that handles complete deployment and testing workflow. + +## Overview + +This E2E test framework: +- ✅ Clones required repositories into [`tests/e2e/.repos`](tests/e2e/.repos) +- ✅ Uses sparse benchmark checkout for sysbench manifests only +- ✅ Creates Kubernetes cluster (Kind/OpenShift) +- ✅ Deploys Prometheus for monitoring +- ✅ Enables cluster monitoring immediately after Prometheus installation +- ✅ Deploys sysbench workload +- ✅ Deploys using operator mode or manifest mode +- ✅ Runs comprehensive E2E tests +- ✅ Cleans up all resources + +**No external script dependencies** - everything is handled in Python! + +## Architecture + +``` +tests/e2e/ +├── run_e2e_tests.py # Main test runner +├── config/ +│ ├── test_config.yaml # Test configuration +│ └── kind-config.yaml # Kind cluster config +├── utils/ +│ ├── deployment_manager.py # Deployment orchestration +│ ├── cluster_utils.py # Kubernetes operations +│ ├── kruize_utils.py # API clients +│ └── log_utils.py # Log parsing +└── tests/ + ├── test_01_complete_workflow.py # 10 tests + ├── test_02_profiles.py # 10 tests + ├── test_03_bulk_jobs.py # 11 tests + └── test_04_webhook.py # 11 tests +``` + +## Prerequisites + +### System Requirements +- Python 3.8+ +- Docker +- kubectl +- Kind (for Kind cluster) or OpenShift CLI (for OpenShift) +- Git + +### Python Dependencies +```bash +pip install -r requirements.txt +``` + +Required packages: +- pytest +- pytest-html +- requests +- pyyaml +- kubernetes + +## Quick Start + +### Run E2E Tests on Kind Cluster (Operator Mode) +```bash +cd tests/e2e +python run_e2e_tests.py --cluster-type kind --mode operator +``` + +### Run E2E Tests on OpenShift (Operator Mode) +```bash +python run_e2e_tests.py --cluster-type openshift --mode operator +``` + +### Skip Cleanup (for debugging) +```bash +python run_e2e_tests.py --cluster-type kind --mode operator --skip-cleanup +``` + +## Configuration + +Edit [`tests/e2e/config/test_config.yaml`](tests/e2e/config/test_config.yaml) to customize: + +```yaml +cluster: + type: kind + name: kruize-e2e-test + namespace: monitoring + +workload: + name: test-sysbench + namespace: default + image: quay.io/kruize/sysbench:latest + +images: + kruize_operator: quay.io/kruize/kruize-operator:latest + kruize_optimizer: quay.io/kruize/kruize-optimizer:0.0.1 + kruize: quay.io/kruize/autotune_operator:latest + kruize_ui: quay.io/kruize/kruize-ui:latest +``` + +## Test Suites + +### 1. Complete Workflow Tests (`test_01_complete_workflow.py`) +Tests the entire deployment and initialization workflow: +- ✅ Cluster accessibility +- ✅ Namespace creation +- ✅ Operator deployment +- ✅ Database initialization +- ✅ Kruize service availability +- ✅ Optimizer service availability +- ✅ Benchmark deployment +- ✅ Health checks +- ✅ Service endpoints +- ✅ Pod status verification + +### 2. Profile Tests (`test_02_profiles.py`) +Tests profile installation and verification: +- ✅ Metric profile installation +- ✅ Metadata profile installation +- ✅ Layer installation +- ✅ Profile listing via API +- ✅ Profile verification in logs +- ✅ Default profiles loaded +- ✅ Custom profiles support + +### 3. Bulk Job Tests (`test_03_bulk_jobs.py`) +Tests bulk job triggering and webhook workflow: +- ✅ Bulk job triggering +- ✅ Webhook callback handling +- ✅ Experiment auto-creation +- ✅ Workload monitoring +- ✅ Job status tracking +- ✅ Experiment validation +- ✅ Recommendation generation +- ✅ End-to-end workflow + +### 4. Webhook Tests (`test_04_webhook.py`) +Tests webhook negative scenarios: +- ✅ Invalid JSON payload +- ✅ Null payload +- ✅ Missing required fields +- ✅ Invalid data types +- ✅ Empty arrays +- ✅ Malformed requests +- ✅ Error handling +- ✅ Response validation + +## Deployment Modes + +### Operator Mode (Default) +Deploys using kruize-operator which manages all components: +- Kruize database (PostgreSQL) +- Kruize service +- Kruize optimizer +- Kruize UI + +```bash +python run_e2e_tests.py --mode operator +``` + +### Manifest Mode +Deploys Kruize via the autotune manifest flow and deploys optimizer separately using this project's kustomize files: +```bash +python run_e2e_tests.py --mode manifest +``` + +## Cluster Types + +### Kind (Default) +Local Kubernetes cluster using Docker: +```bash +python run_e2e_tests.py --cluster-type kind +``` + +### OpenShift +Red Hat OpenShift cluster: +```bash +python run_e2e_tests.py --cluster-type openshift +``` + +### Minikube +Local Kubernetes cluster using VM: +```bash +python run_e2e_tests.py --cluster-type minikube +``` + +## Workflow + +The E2E test runner follows this workflow: + +1. **Setup Phase** + - Clone autotune repository into [`tests/e2e/.repos`](tests/e2e/.repos) + - Sparse checkout only the sysbench benchmark manifests into [`tests/e2e/.repos`](tests/e2e/.repos) + - Create Kubernetes cluster (Kind/OpenShift) + - Create namespaces + +2. **Deployment Phase** + - Deploy Prometheus monitoring + - Enable monitoring immediately after Prometheus installation + - Deploy sysbench workload + - Label sysbench for auto-experiment creation + - Deploy via operator mode, or deploy Kruize plus optimizer in manifest mode + - Wait for all required pods to be ready + +3. **Port Forward Phase** (Kind only) + - Setup port-forward for kruize service (8080) + - Setup port-forward for optimizer service (8081) + +4. **Wait Phase** + - Wait for optimizer to create experiments (configurable, default 120s) + +5. **Test Phase** + - Run pytest test suites + - Generate HTML test report + +6. **Cleanup Phase** + - Terminate port-forward processes + - Delete Kubernetes cluster (Kind) + - Remove cloned repositories + - Clean up temporary files + +## Test Reports + +After running tests, an HTML report is generated: +``` +test-report-{cluster_type}-{mode}.html +``` + +Example: +- `test-report-kind-operator.html` +- `test-report-openshift-operator.html` + +## Debugging + +### View Logs +```bash +# Kruize logs +kubectl logs -l app=kruize -n monitoring + +# Optimizer logs +kubectl logs -l app=kruize-optimizer -n monitoring + +# Operator logs +kubectl logs deployment/kruize-operator -n monitoring +``` + +### Skip Cleanup +Keep cluster running after tests for debugging: +```bash +python run_e2e_tests.py --skip-cleanup +``` + +### Run Specific Tests +```bash +# Run only workflow tests +pytest tests/test_01_complete_workflow.py -v + +# Run only webhook tests +pytest tests/test_04_webhook.py -v + +# Run specific test +pytest tests/test_01_complete_workflow.py::test_cluster_accessible -v +``` + +## Troubleshooting + +### Cluster Creation Fails +```bash +# Check Docker is running +docker ps + +# Check Kind is installed +kind version + +# Delete existing cluster +kind delete cluster --name kruize-test +``` + +### Prometheus Deployment Fails +```bash +# Check Prometheus pods +kubectl get pods -n monitoring + +# Check Prometheus logs +kubectl logs -l app=prometheus -n monitoring +``` + +### Operator Deployment Fails +```bash +# Check operator logs +kubectl logs deployment/kruize-operator -n monitoring + +# Check CRDs +kubectl get crd kruizes.kruize.io + +# Check operator status +kubectl get deployment kruize-operator -n monitoring +``` + +### Port Forward Issues (Kind) +```bash +# Kill existing port-forwards +pkill -f "kubectl port-forward" + +# Manually setup port-forward +kubectl port-forward service/kruize 8080:8080 -n monitoring +``` + +### Tests Fail +```bash +# Check all pods are running +kubectl get pods -n monitoring +kubectl get pods -n default + +# Check services +kubectl get svc -n monitoring + +# Check logs +kubectl logs -l app=kruize-optimizer -n monitoring --tail=100 +``` + +## CI/CD Integration + +### GitHub Actions Example +```yaml +name: E2E Tests + +on: [push, pull_request] + +jobs: + e2e-tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Setup Python + uses: actions/setup-python@v4 + with: + python-version: '3.9' + + - name: Install dependencies + run: | + pip install -r tests/e2e/requirements.txt + + - name: Run E2E tests + run: | + cd tests/e2e + python run_e2e_tests.py --cluster-type kind --mode operator + + - name: Upload test report + if: always() + uses: actions/upload-artifact@v3 + with: + name: test-report + path: tests/e2e/test-report-*.html +``` + +## Development + +### Adding New Tests + +1. Create test file in `tests/` directory +2. Use pytest fixtures for setup/teardown +3. Use utility classes from `utils/` for common operations +4. Follow naming convention: `test_XX_description.py` + +Example: +```python +import pytest +from utils.kruize_utils import OptimizerAPIClient + +def test_new_feature(optimizer_client): + """Test new optimizer feature""" + response = optimizer_client.get_status() + assert response.status_code == 200 +``` + +### Adding New Deployment Steps + +Edit `utils/deployment_manager.py` to add new deployment logic: +```python +def deploy_new_component(self): + """Deploy new component""" + logger.info("Deploying new component...") + # Add deployment logic +``` + +## Contributing + +1. Fork the repository +2. Create feature branch +3. Add tests for new features +4. Ensure all tests pass +5. Submit pull request + +## License + +Apache License 2.0 + +## Support + +For issues and questions: +- GitHub Issues: https://github.com/kruize/kruize-optimizer/issues +- Slack: #kruize on Kubernetes Slack diff --git a/tests/e2e/config/kind-config.yaml b/tests/e2e/config/kind-config.yaml new file mode 100644 index 0000000..24e391f --- /dev/null +++ b/tests/e2e/config/kind-config.yaml @@ -0,0 +1,20 @@ +# Kind cluster configuration for E2E tests +kind: Cluster +apiVersion: kind.x-k8s.io/v1alpha4 +nodes: + - role: control-plane + extraPortMappings: + # Expose Kruize service + - containerPort: 30080 + hostPort: 8080 + protocol: TCP + # Expose Optimizer service + - containerPort: 30081 + hostPort: 8081 + protocol: TCP + # Expose Prometheus + - containerPort: 30090 + hostPort: 9090 + protocol: TCP + + diff --git a/tests/e2e/config/test_config.yaml b/tests/e2e/config/test_config.yaml new file mode 100644 index 0000000..e6ff2e0 --- /dev/null +++ b/tests/e2e/config/test_config.yaml @@ -0,0 +1,53 @@ +# E2E Test Configuration for Kruize Optimizer + +# Cluster configuration +cluster: + type: kind # kind or openshift + name: kruize-e2e-test + namespace: monitoring + # Namespace mapping for optimizer pod based on cluster type + optimizer_namespace: + kind: monitoring + openshift: openshift-tuning + +# Image configuration +images: + kruize_operator: "quay.io/kruize/kruize-operator:latest" + kruize_optimizer: "quay.io/kruize/kruize-optimizer:0.0.1" + kruize: "quay.io/kruize/autotune_operator:latest" + kruize_ui: "quay.io/kruize/kruize-ui:latest" + +# Test timeouts (in seconds) +timeouts: + pod_ready: 300 + operator_init: 120 + profile_install: 60 + bulk_job_trigger: 180 + webhook_callback: 120 + +# Optimizer configuration +optimizer: + bulk_scheduler_interval: "2m" # Faster for testing + bulk_scheduler_startup_delay: "30s" + measurement_duration: "15min" + target_labels: + kruize/autotune: "enabled" + +# Test workload configuration +workload: + name: "test-sysbench" + namespace: "default" + image: "quay.io/kruize/sysbench:latest" + +# API endpoints +api: + kruize_port: 8080 + optimizer_port: 8081 + +# Logging +logging: + level: INFO # DEBUG, INFO, WARNING, ERROR + save_pod_logs: true + log_dir: "test_results" + + diff --git a/tests/e2e/requirements.txt b/tests/e2e/requirements.txt new file mode 100644 index 0000000..5664a9b --- /dev/null +++ b/tests/e2e/requirements.txt @@ -0,0 +1,8 @@ +# Python dependencies for E2E tests +pytest>=7.4.0 +pytest-timeout>=2.1.0 +pytest-html>=3.2.0 +pyyaml>=6.0 +requests>=2.31.0 +kubernetes>=27.2.0 +urllib3>=2.0.0 \ No newline at end of file diff --git a/tests/e2e/run_e2e_tests.py b/tests/e2e/run_e2e_tests.py new file mode 100755 index 0000000..9d18e3f --- /dev/null +++ b/tests/e2e/run_e2e_tests.py @@ -0,0 +1,370 @@ +#!/usr/bin/env python3 +""" +E2E Test Runner for Kruize Optimizer + +Self-contained test runner that: +1. Clones required repos under `tests/e2e` +2. Creates Kind/OpenShift cluster +3. Deploys Prometheus +4. Deploys via operator or manifest mode +5. Deploys benchmarks (sysbench) +6. Runs E2E tests +7. Cleans up resources + +Usage: + python run_e2e_tests.py --cluster-type kind --mode operator + python run_e2e_tests.py --cluster-type openshift --mode manifest +""" +import argparse +import logging +import sys +import time +from pathlib import Path + +import pytest +import yaml + +from utils.deployment_manager import DeploymentManager +from utils.cluster_utils import ClusterManager +from utils.kruize_utils import KruizeAPIClient, OptimizerAPIClient + +# Setup logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class E2ETestRunner: + """Main E2E test runner""" + + def __init__(self, cluster_type: str, mode: str, config_file: Path): + self.cluster_type = cluster_type + self.mode = mode # 'operator' or 'manifest' + self.config = self.load_config(config_file) + + # Setup paths + self.test_dir = Path(__file__).parent + self.project_root = self.test_dir.parent.parent + + # Initialize managers + namespace = self.config.get('cluster', {}).get('namespace', 'monitoring') + work_dir = self.test_dir / ".repos" + self.deployment_mgr = DeploymentManager(cluster_type, namespace, work_dir) + self.cluster_mgr = None + + # Port forward processes + self.port_forward_processes = [] + + def load_config(self, config_file: Path) -> dict: + """Load test configuration""" + with open(config_file) as f: + return yaml.safe_load(f) + + def setup_cluster(self): + """Setup Kubernetes cluster""" + logger.info(f"Setting up {self.cluster_type} cluster...") + + if self.cluster_type == "kind": + cluster_name = self.config.get('cluster', {}).get('name', 'kruize-test') + kind_config = self.test_dir / "config" / "kind-config.yaml" + + # Delete existing cluster if any + self.deployment_mgr.delete_kind_cluster(cluster_name) + + # Create new cluster + self.deployment_mgr.create_kind_cluster(cluster_name, kind_config) + + elif self.cluster_type == "openshift": + logger.info("Using existing OpenShift cluster") + # Assume OpenShift cluster is already running + + else: + raise ValueError(f"Unsupported cluster type: {self.cluster_type}") + + # Initialize cluster manager + cluster_name = self.config.get('cluster', {}).get('name', 'kruize-test') + namespace = self.config.get('cluster', {}).get('namespace', 'monitoring') + self.cluster_mgr = ClusterManager(self.cluster_type, cluster_name, namespace) + + logger.info("Cluster setup complete") + + def deploy_components(self): + """Deploy all required components""" + logger.info("Deploying components...") + + # Clone repositories + self.deployment_mgr.clone_repositories() + + namespace = self.config.get('cluster', {}).get('namespace', 'monitoring') + app_namespace = self.config.get('workload', {}).get('namespace', 'default') + + logger.info(f"Creating namespace: {namespace}") + self.deployment_mgr.create_namespace(namespace) + + if app_namespace != namespace: + logger.info(f"Creating namespace: {app_namespace}") + self.deployment_mgr.create_namespace(app_namespace) + + # Deploy Prometheus first + self.deployment_mgr.deploy_prometheus() + + # Wait for Prometheus to be ready + logger.info("Waiting for Prometheus to be ready...") + time.sleep(30) + + # Enable cluster monitoring immediately after Prometheus installation + if self.cluster_type in ["kind", "minikube"]: + self.deployment_mgr.enable_kube_state_metrics_labels() + elif self.cluster_type == "openshift": + self.deployment_mgr.enable_user_workload_monitoring() + + # Deploy benchmarks before Kruize so workloads already exist + self.deploy_benchmarks() + + # Deploy kruize-operator or kruize + if self.mode == "operator": + self.deploy_operator_mode() + else: + self.deploy_manifest_mode() + + logger.info("All components deployed successfully") + + def deploy_operator_mode(self): + """Deploy using operator""" + logger.info("Deploying in operator mode...") + + operator_image = self.config.get('images', {}).get('kruize_operator') + optimizer_image = self.config.get('images', {}).get('kruize_optimizer') + + self.deployment_mgr.deploy_operator(operator_image, optimizer_image) + + # Wait for all operator-managed pods + namespace = self.config.get('cluster', {}).get('namespace', 'monitoring') + + logger.info("Waiting for kruize-db pod...") + self.deployment_mgr.wait_for_pod_ready("app=kruize-db", namespace) + + logger.info("Waiting for kruize pod...") + self.deployment_mgr.wait_for_pod_ready("app=kruize", namespace) + + logger.info("Waiting for kruize-optimizer pod...") + self.deployment_mgr.wait_for_pod_ready("app=kruize-optimizer", namespace) + + logger.info("Waiting for kruize-ui pod...") + self.deployment_mgr.wait_for_pod_ready("app=kruize-ui-nginx", namespace) + + logger.info("Operator mode deployment complete") + + def deploy_manifest_mode(self): + """Deploy using manifests (without operator)""" + logger.info("Deploying in manifest mode...") + + kruize_image = self.config.get('images', {}).get('kruize') + kruize_ui_image = self.config.get('images', {}).get('kruize_ui') + optimizer_image = self.config.get('images', {}).get('kruize_optimizer') + + self.deployment_mgr.deploy_kruize_manifest_mode( + kruize_image, + kruize_ui_image, + optimizer_image + ) + + namespace = self.config.get('cluster', {}).get('namespace', 'monitoring') + + logger.info("Waiting for kruize pod...") + self.deployment_mgr.wait_for_pod_ready("app=kruize", namespace) + + logger.info("Waiting for kruize-db pod...") + self.deployment_mgr.wait_for_pod_ready("app=kruize-db", namespace) + + logger.info("Waiting for kruize-optimizer pod...") + self.deployment_mgr.wait_for_pod_ready("app=kruize-optimizer", namespace) + + logger.info("Manifest mode deployment complete") + + def deploy_benchmarks(self): + """Deploy benchmark workloads""" + logger.info("Deploying benchmarks...") + + app_namespace = self.config.get('workload', {}).get('namespace', 'default') + + logger.info("Deploying sysbench...") + self.deployment_mgr.deploy_benchmarks("sysbench", app_namespace) + self.deployment_mgr.label_workloads( + ["sysbench"], + "kruize/autotune=enabled", + app_namespace + ) + + logger.info("Benchmarks deployed successfully") + + def setup_port_forwards(self): + """Setup port forwarding for services""" + if self.cluster_type != "kind": + logger.info("Port forwarding not needed for non-Kind clusters") + return + + logger.info("Setting up port forwards...") + + namespace = self.config.get('cluster', {}).get('namespace', 'monitoring') + + # Port forward kruize service + kruize_port = self.config.get('kruize_port', 8080) + process = self.deployment_mgr.setup_port_forward( + "kruize", kruize_port, 8080, namespace + ) + self.port_forward_processes.append(process) + + # Port forward optimizer service + optimizer_port = self.config.get('optimizer_port', 8081) + process = self.deployment_mgr.setup_port_forward( + "kruize-optimizer", optimizer_port, 8080, namespace + ) + self.port_forward_processes.append(process) + + logger.info("Port forwards established") + + def run_tests(self): + """Run pytest tests""" + logger.info("Running E2E tests...") + + # Set environment variables for tests + import os + os.environ['CLUSTER_TYPE'] = self.cluster_type + os.environ['DEPLOYMENT_MODE'] = self.mode + os.environ['KRUIZE_URL'] = f"localhost:{self.config.get('kruize_port', 8080)}" + os.environ['OPTIMIZER_URL'] = f"localhost:{self.config.get('optimizer_port', 8081)}" + + # Run pytest + test_dir = self.test_dir / "tests" + pytest_args = [ + str(test_dir), + "-v", + "--tb=short", + f"--html=test-report-{self.cluster_type}-{self.mode}.html", + "--self-contained-html" + ] + + result = pytest.main(pytest_args) + + return result + + def cleanup(self): + """Cleanup resources""" + logger.info("Cleaning up resources...") + + # Kill port forward processes + for process in self.port_forward_processes: + try: + process.terminate() + process.wait(timeout=5) + except Exception as e: + logger.warning(f"Error terminating port-forward: {e}") + + # Delete cluster if Kind + if self.cluster_type == "kind": + cluster_name = self.config.get('kind_cluster_name', 'kruize-test') + self.deployment_mgr.delete_kind_cluster(cluster_name) + + # Cleanup work directory + self.deployment_mgr.cleanup() + + logger.info("Cleanup complete") + + def run(self): + """Main execution flow""" + try: + logger.info("=" * 60) + logger.info("Starting Kruize Optimizer E2E Tests") + logger.info(f"Cluster Type: {self.cluster_type}") + logger.info(f"Deployment Mode: {self.mode}") + logger.info("=" * 60) + + # Setup cluster + self.setup_cluster() + + # Deploy components + self.deploy_components() + + # Setup port forwards + self.setup_port_forwards() + + # Wait for optimizer to create experiments + wait_time = self.config.get('optimizer_wait_duration', 120) + logger.info(f"Waiting {wait_time}s for optimizer to create experiments...") + time.sleep(wait_time) + + # Run tests + result = self.run_tests() + + logger.info("=" * 60) + if result == 0: + logger.info("E2E Tests PASSED") + else: + logger.error("E2E Tests FAILED") + logger.info("=" * 60) + + return result + + except Exception as e: + logger.error(f"E2E test execution failed: {e}", exc_info=True) + return 1 + + finally: + # Always cleanup + if not self.config.get('skip_cleanup', False): + self.cleanup() + + +def main(): + """Main entry point""" + parser = argparse.ArgumentParser( + description="Run Kruize Optimizer E2E Tests" + ) + + parser.add_argument( + '--cluster-type', + choices=['kind', 'openshift', 'minikube'], + default='kind', + help='Kubernetes cluster type' + ) + + parser.add_argument( + '--mode', + choices=['operator', 'manifest'], + default='operator', + help='Deployment mode' + ) + + parser.add_argument( + '--config', + type=Path, + default=Path(__file__).parent / "config" / "test_config.yaml", + help='Test configuration file' + ) + + parser.add_argument( + '--skip-cleanup', + action='store_true', + help='Skip cleanup after tests' + ) + + args = parser.parse_args() + + # Load config and update with CLI args + runner = E2ETestRunner(args.cluster_type, args.mode, args.config) + + if args.skip_cleanup: + runner.config['skip_cleanup'] = True + + # Run tests + result = runner.run() + + sys.exit(result) + + +if __name__ == "__main__": + main() + diff --git a/tests/e2e/test_webhook_responses.py b/tests/e2e/test_webhook_responses.py new file mode 100644 index 0000000..1d81f8c --- /dev/null +++ b/tests/e2e/test_webhook_responses.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python3 +""" +Quick test script to check actual webhook responses +""" +import requests +import json + +BASE_URL = "http://localhost:8080" + +def test_response(name, payload, data=None): + """Test a webhook call and print the response""" + print(f"\n{'='*60}") + print(f"Test: {name}") + print(f"{'='*60}") + + if data: + print(f"Payload (raw): {data}") + response = requests.post( + f"{BASE_URL}/webhook", + data=data, + headers={'Content-Type': 'application/json'} + ) + else: + print(f"Payload (json): {json.dumps(payload, indent=2)}") + response = requests.post( + f"{BASE_URL}/webhook", + json=payload, + headers={'Content-Type': 'application/json'} + ) + + print(f"Status Code: {response.status_code}") + print(f"Response: {response.text}") + print(f"Headers: {dict(response.headers)}") + +# Test 1: Invalid JSON +test_response("Invalid JSON", None, data="{invalid json}") + +# Test 2: Null payload +test_response("Null payload", None, data="null") + +# Test 3: Empty array +test_response("Empty array", []) + +# Test 4: Missing summary +test_response("Missing summary", [{}]) + +# Test 5: Null jobId +test_response("Null jobId", [{ + "summary": { + "jobId": None, + "status": "COMPLETED", + "totalExperiments": 10, + "processedExperiments": 8, + "existingExperiments": 2 + } +}]) + +# Test 6: Empty jobId +test_response("Empty jobId", [{ + "summary": { + "jobId": "", + "status": "COMPLETED", + "totalExperiments": 10, + "processedExperiments": 8, + "existingExperiments": 2 + } +}]) + +# Test 7: Whitespace jobId +test_response("Whitespace jobId", [{ + "summary": { + "jobId": " ", + "status": "COMPLETED", + "totalExperiments": 10, + "processedExperiments": 8, + "existingExperiments": 2 + } +}]) + +# Test 8: Valid payload +test_response("Valid payload", [{ + "summary": { + "jobId": "test-valid-job-999", + "status": "COMPLETED", + "totalExperiments": 5, + "processedExperiments": 5, + "existingExperiments": 0 + } +}]) + +# Test 9: Without Content-Type +print(f"\n{'='*60}") +print(f"Test: Without Content-Type header") +print(f"{'='*60}") +payload = [{ + "summary": { + "jobId": "test-job-123", + "status": "COMPLETED", + "totalExperiments": 10, + "processedExperiments": 8, + "existingExperiments": 2 + } +}] +print(f"Payload: {json.dumps(payload, indent=2)}") +response = requests.post(f"{BASE_URL}/webhook", json=payload) +print(f"Status Code: {response.status_code}") +print(f"Response: {response.text}") + +# Made with Bob diff --git a/tests/e2e/tests/__init__.py b/tests/e2e/tests/__init__.py new file mode 100644 index 0000000..7c8bd9f --- /dev/null +++ b/tests/e2e/tests/__init__.py @@ -0,0 +1,4 @@ +""" +E2E test modules for Kruize Optimizer +""" + diff --git a/tests/e2e/tests/test_01_complete_workflow.py b/tests/e2e/tests/test_01_complete_workflow.py new file mode 100644 index 0000000..f1ec82e --- /dev/null +++ b/tests/e2e/tests/test_01_complete_workflow.py @@ -0,0 +1,454 @@ +""" +E2E Test 01: Complete Optimizer Workflow + +This test verifies the complete end-to-end workflow after deployment: +1. Verify optimizer pod is running +2. Verify optimizer service has started +3. Verify profiles are installed via API (metric, metadata, layers) +4. Verify profiles match configsReferenceIndex.json +5. Verify profile installation messages in logs +6. Verify bulk jobs are triggered with autotune label +7. Verify job completion with job_id and status + +This test assumes the cluster is already deployed using optimizer_demo.sh +""" +import pytest +import requests +import logging +import yaml +import os +import sys +import time +import json +import re + +# Add parent directory to path for imports +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from utils.cluster_utils import ClusterManager +from utils.kruize_utils import KruizeAPIClient, OptimizerAPIClient, verify_profiles_installed +from utils.log_utils import ( + parse_optimizer_logs, + check_log_for_message, + extract_job_ids_from_logs, + verify_profile_installation_logs +) + +logger = logging.getLogger(__name__) + + +@pytest.fixture(scope="module") +def config(): + """Load test configuration""" + config_path = os.path.join(os.path.dirname(__file__), '..', 'config', 'test_config.yaml') + with open(config_path, 'r') as f: + return yaml.safe_load(f) + + +@pytest.fixture(scope="module") +def cluster_manager(config): + """Create cluster manager""" + return ClusterManager( + cluster_type=config['cluster']['type'], + cluster_name=config['cluster']['name'], + namespace=config['cluster']['namespace'] + ) + + +@pytest.fixture(scope="module") +def kruize_client(config): + """Create Kruize API client""" + base_url = f"http://localhost:{config['api']['kruize_port']}" + client = KruizeAPIClient(base_url) + + # Wait for service to be available + if not client.wait_for_service(max_retries=30): + pytest.fail("Kruize service did not become available") + + return client + + +@pytest.fixture(scope="module") +def optimizer_client(config): + """Create Optimizer API client""" + base_url = f"http://localhost:{config['api']['optimizer_port']}" + client = OptimizerAPIClient(base_url) + + # Wait for service to be available + if not client.wait_for_service(max_retries=30): + pytest.fail("Optimizer service did not become available") + + return client + + +class TestCompleteWorkflow: + """Test complete optimizer workflow""" + + def test_01_optimizer_pod_running(self, cluster_manager, config): + """ + Test: Verify kruize-optimizer pod is running + Expected: Pod exists and is in Running state + """ + logger.info("Test: Verify optimizer pod is running") + + # Get the appropriate namespace based on cluster type + cluster_type = config['cluster']['type'] + optimizer_namespace = config['cluster']['optimizer_namespace'].get(cluster_type, 'monitoring') + + logger.info(f"Looking for optimizer pod in namespace: {optimizer_namespace}") + + # Check if optimizer pod is ready + assert cluster_manager.wait_for_pod_ready("app=kruize-optimizer", namespace=optimizer_namespace, timeout=60), \ + "Optimizer pod did not become ready" + + # Get pod name + pod_name = cluster_manager.get_pod_name("app=kruize-optimizer", namespace=optimizer_namespace) + assert pod_name is not None, "Could not find optimizer pod" + + logger.info(f"✓ Optimizer pod is running: {pod_name}") + + def test_02_optimizer_service_started(self, cluster_manager, config): + """ + Test: Verify optimizer service has started + Expected: Logs contain "Kruize Optimizer Service is STARTED!" + """ + logger.info("Test: Verify optimizer service started") + + # Get the appropriate namespace based on cluster type + cluster_type = config['cluster']['type'] + optimizer_namespace = config['cluster']['optimizer_namespace'].get(cluster_type, 'monitoring') + + # Get optimizer pod logs + pod_name = cluster_manager.get_pod_name("app=kruize-optimizer", namespace=optimizer_namespace) + logs = cluster_manager.get_all_pod_logs(pod_name, namespace=optimizer_namespace) + + # Check for service started message + assert check_log_for_message(logs, "Kruize Optimizer Service is STARTED!"), \ + "Optimizer service did not start properly" + + logger.info("✓ Optimizer service started successfully") + + def test_03_load_configs_reference(self): + """ + Test: Load configsReferenceIndex.json for validation + Expected: File exists and contains expected structure + """ + logger.info("Test: Load configs reference index") + + config_path = os.path.join( + os.path.dirname(__file__), + '..', '..', '..', + 'src', 'main', 'resources', 'configs', + 'configsReferenceIndex.json' + ) + + assert os.path.exists(config_path), f"Config file not found: {config_path}" + + with open(config_path, 'r') as f: + self.configs_reference = json.load(f) + + # Validate structure + assert 'metadata_profiles' in self.configs_reference, "Missing metadata_profiles" + assert 'metric_profiles' in self.configs_reference, "Missing metric_profiles" + assert 'layers' in self.configs_reference, "Missing layers" + + logger.info(f"✓ Loaded configs reference: {len(self.configs_reference['metadata_profiles'])} metadata, " + f"{len(self.configs_reference['metric_profiles'])} metric, " + f"{len(self.configs_reference['layers'])} layers") + + def test_04_profiles_installed_via_api(self, kruize_client): + """ + Test: Verify profiles are installed via Kruize API and match configsReferenceIndex.json + Expected: All profiles from config file are installed + """ + logger.info("Test: Verify profiles via API match config reference") + + # Load config reference if not already loaded + if not hasattr(self, 'configs_reference'): + config_path = os.path.join( + os.path.dirname(__file__), + '..', '..', '..', + 'src', 'main', 'resources', 'configs', + 'configsReferenceIndex.json' + ) + with open(config_path, 'r') as f: + self.configs_reference = json.load(f) + + # Get installed profiles from API + metric_profiles = kruize_client.list_metric_profiles() + metadata_profiles = kruize_client.list_metadata_profiles() + layers = kruize_client.list_layers() + + logger.info(f"API returned: {len(metric_profiles)} metric profiles, " + f"{len(metadata_profiles)} metadata profiles, " + f"{len(layers)} layers") + + # Verify metric profiles + expected_metric = [p['name'] for p in self.configs_reference['metric_profiles']] + installed_metric = [p.get('profile_name', p.get('name', '')) for p in metric_profiles] + + for expected_name in expected_metric: + assert expected_name in installed_metric, \ + f"Metric profile '{expected_name}' not found in API response" + + logger.info(f"✓ All {len(expected_metric)} metric profiles installed") + + # Verify metadata profiles + expected_metadata = [p['name'] for p in self.configs_reference['metadata_profiles']] + installed_metadata = [p.get('profile_name', p.get('name', '')) for p in metadata_profiles] + + for expected_name in expected_metadata: + assert expected_name in installed_metadata, \ + f"Metadata profile '{expected_name}' not found in API response" + + logger.info(f"✓ All {len(expected_metadata)} metadata profiles installed") + + # Verify layers + expected_layers = self.configs_reference['layers'] + installed_layers = [layer.get('layer_name', layer.get('name', '')) for layer in layers] + + for expected_name in expected_layers: + assert expected_name in installed_layers, \ + f"Layer '{expected_name}' not found in API response" + + logger.info(f"✓ All {len(expected_layers)} layers installed") + + def test_05_profiles_in_optimizer_logs(self, cluster_manager, config): + """ + Test: Verify specific profile installation messages in optimizer logs + Expected: Logs contain "Metadata profile: Installed: " for each profile + """ + logger.info("Test: Verify profile installation messages in logs") + + # Load config reference if not already loaded + if not hasattr(self, 'configs_reference'): + config_path = os.path.join( + os.path.dirname(__file__), + '..', '..', '..', + 'src', 'main', 'resources', 'configs', + 'configsReferenceIndex.json' + ) + with open(config_path, 'r') as f: + self.configs_reference = json.load(f) + + # Get the appropriate namespace based on cluster type + cluster_type = config['cluster']['type'] + optimizer_namespace = config['cluster']['optimizer_namespace'].get(cluster_type, 'monitoring') + + # Get optimizer pod logs + pod_name = cluster_manager.get_pod_name("app=kruize-optimizer", namespace=optimizer_namespace) + logs = cluster_manager.get_all_pod_logs(pod_name, namespace=optimizer_namespace) + + # Verify metadata profile installation logs + for profile in self.configs_reference['metadata_profiles']: + profile_name = profile['name'] + expected_log = f"Metadata profile: Installed: {profile_name}" + assert check_log_for_message(logs, expected_log), \ + f"Log message not found: '{expected_log}'" + logger.info(f"✓ Found log: {expected_log}") + + # Verify metric profile installation logs + for profile in self.configs_reference['metric_profiles']: + profile_name = profile['name'] + expected_log = f"Metric profile: Installed: {profile_name}" + assert check_log_for_message(logs, expected_log), \ + f"Log message not found: '{expected_log}'" + logger.info(f"✓ Found log: {expected_log}") + + # Verify layer installation logs + for layer_name in self.configs_reference['layers']: + expected_log = f"Layer: Installed: {layer_name}" + assert check_log_for_message(logs, expected_log), \ + f"Log message not found: '{expected_log}'" + logger.info(f"✓ Found log: {expected_log}") + + logger.info("✓ All profile installation messages verified in logs") + + def test_06_workloads_deployed(self, cluster_manager, config): + """ + Test: Verify sysbench workload is deployed + Expected: Sysbench workload pod is running + """ + logger.info("Test: Verify workloads are deployed") + + workload_namespace = config['workload']['namespace'] + + # Check for sysbench + sysbench_ready = cluster_manager.wait_for_pod_ready( + "app=sysbench", + namespace=workload_namespace, + timeout=60 + ) + + if sysbench_ready: + logger.info("✓ Sysbench workload is running") + else: + logger.warning("⚠️ Sysbench workload not found or not ready") + + assert sysbench_ready, "Sysbench workload is not running" + + def test_07_bulk_job_triggered_with_autotune_label(self, cluster_manager, config): + """ + Test: Verify bulk job is triggered with autotune label filter + Expected: Logs show bulk API call with "kruize/autotune": "enabled" label + """ + logger.info("Test: Verify bulk job with autotune label") + + # Get the appropriate namespace based on cluster type + cluster_type = config['cluster']['type'] + optimizer_namespace = config['cluster']['optimizer_namespace'].get(cluster_type, 'monitoring') + + # Get optimizer pod logs + pod_name = cluster_manager.get_pod_name("app=kruize-optimizer", namespace=optimizer_namespace) + logs = cluster_manager.get_all_pod_logs(pod_name, namespace=optimizer_namespace) + + # Check for bulk API call with autotune label + from utils.log_utils import check_bulk_job_with_autotune_label + has_autotune_label = check_bulk_job_with_autotune_label(logs) + + assert has_autotune_label, \ + 'Bulk API call does not include "kruize/autotune": "enabled" label' + + logger.info('✓ Bulk job triggered with "kruize/autotune": "enabled" label') + + def test_07b_bulk_job_completion(self, cluster_manager, config): + """ + Test: Verify at least one bulk job has completed successfully + Expected: Logs contain job_id and completion status with Total/Processed/Existing counts + """ + logger.info("Test: Verify bulk job completion") + + # Get the appropriate namespace based on cluster type + cluster_type = config['cluster']['type'] + optimizer_namespace = config['cluster']['optimizer_namespace'].get(cluster_type, 'monitoring') + + # Get optimizer pod logs + pod_name = cluster_manager.get_pod_name("app=kruize-optimizer", namespace=optimizer_namespace) + logs = cluster_manager.get_all_pod_logs(pod_name, namespace=optimizer_namespace) + + # Extract job information from logs + job_info_list = extract_job_ids_from_logs(logs) + + assert len(job_info_list) > 0, "No bulk jobs found in logs" + logger.info(f"Found {len(job_info_list)} job(s) in logs") + + # Check for at least one completed job + completed_jobs = [job for job in job_info_list if job['status'] == 'completed'] + + assert len(completed_jobs) > 0, "No completed jobs found in logs" + + # Log details of completed jobs + for job in completed_jobs: + logger.info(f"✓ Job {job['job_id']} completed: " + f"Total={job['total']}, Processed={job['processed']}, Existing={job['existing']}") + + # Verify at least one job processed some experiments + jobs_with_processed = [] + for job in completed_jobs: + processed = job.get('processed', 0) + if isinstance(processed, int) and processed > 0: + jobs_with_processed.append(job) + + assert len(jobs_with_processed) > 0, "No jobs processed any experiments" + + logger.info(f"✓ {len(completed_jobs)} job(s) completed successfully") + + def test_08_webhook_callback_received(self, optimizer_client, config): + """ + Test: Verify webhook callback is received + Expected: Experiment counters are updated + """ + logger.info("Test: Verify webhook callback") + + # Get current state + jobs_overview = optimizer_client.get_jobs_overview() + total_experiments = jobs_overview.get('totalExperiments', 0) + processed_experiments = jobs_overview.get('totalExperimentsProcessed', 0) + + logger.info(f"Total experiments: {total_experiments}") + logger.info(f"Processed experiments: {processed_experiments}") + + # If experiments are already processed, test passes + if processed_experiments > 0: + logger.info(f"✓ Webhook callbacks received (processed: {processed_experiments})") + return + + # Otherwise wait for webhook + logger.info("Waiting for webhook callback...") + timeout = config['timeouts']['webhook_callback'] + + start_time = time.time() + webhook_received = False + + while time.time() - start_time < timeout: + current_jobs = optimizer_client.get_jobs_overview() + current_processed = current_jobs.get('totalExperimentsProcessed', 0) + + if current_processed > 0: + webhook_received = True + logger.info(f"✓ Webhook received! Processed: {current_processed}") + break + + logger.debug(f"Waiting... (processed: {current_processed})") + time.sleep(10) + + if not webhook_received: + logger.warning(f"⚠️ No webhook received within {timeout}s") + logger.warning("This may be expected if experiments haven't been created yet") + + def test_09_optimizer_logs_no_errors(self, cluster_manager, config): + """ + Test: Verify optimizer logs don't contain unexpected errors + Expected: No critical errors in logs + """ + logger.info("Test: Verify no critical errors in logs") + + # Get the appropriate namespace based on cluster type + cluster_type = config['cluster']['type'] + optimizer_namespace = config['cluster']['optimizer_namespace'].get(cluster_type, 'monitoring') + + # Get optimizer pod logs + pod_name = cluster_manager.get_pod_name("app=kruize-optimizer", namespace=optimizer_namespace) + logs = cluster_manager.get_all_pod_logs(pod_name, namespace=optimizer_namespace) + + # Parse logs + log_info = parse_optimizer_logs(logs) + + # Check for errors (allow some expected errors) + allowed_errors = [ + "connection refused", # Expected during startup + "not found", # Expected if resources don't exist yet + ] + + critical_errors = [e for e in log_info['errors'] + if not any(allowed in e.lower() for allowed in allowed_errors)] + + if critical_errors: + logger.warning("⚠️ Found some errors in logs:") + for error in critical_errors[:5]: # Show first 5 + logger.warning(f" {error}") + + # Don't fail test on errors, just warn + logger.info("✓ Log check complete") + + def test_10_health_endpoints(self, optimizer_client): + """ + Test: Verify health endpoints are accessible + Expected: Liveness and readiness endpoints return 200 + """ + logger.info("Test: Verify health endpoints") + + # Test liveness + response = requests.get(f"{optimizer_client.base_url}/q/health/live") + assert response.status_code == 200, f"Liveness check failed: {response.status_code}" + logger.info("✓ Liveness endpoint OK") + + # Test readiness + response = requests.get(f"{optimizer_client.base_url}/q/health/ready") + assert response.status_code == 200, f"Readiness check failed: {response.status_code}" + logger.info("✓ Readiness endpoint OK") + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-s"]) + diff --git a/tests/e2e/tests/test_04_webhook.py b/tests/e2e/tests/test_04_webhook.py new file mode 100644 index 0000000..39f1cb5 --- /dev/null +++ b/tests/e2e/tests/test_04_webhook.py @@ -0,0 +1,294 @@ + +""" +E2E Test 04: Webhook Negative Test Scenarios + +This test module focuses on testing the webhook endpoint with various +invalid/malformed payloads to ensure proper error handling. +""" +import pytest +import requests +import logging +import yaml +import os +import sys + +# Add parent directory to path for imports +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from utils.kruize_utils import OptimizerAPIClient + +logger = logging.getLogger(__name__) + + +@pytest.fixture(scope="module") +def config(): + """Load test configuration""" + config_path = os.path.join(os.path.dirname(__file__), '..', 'config', 'test_config.yaml') + with open(config_path, 'r') as f: + return yaml.safe_load(f) + + +@pytest.fixture(scope="module") +def optimizer_client(config): + """Create Optimizer API client""" + # Use the configured optimizer port from config (default 8081) + optimizer_port = config.get('api', {}).get('optimizer_port', 8081) + base_url = f"http://localhost:{optimizer_port}" + return OptimizerAPIClient(base_url) + + +class TestWebhookNegativeScenarios: + """Test webhook endpoint with invalid/malformed payloads""" + + def test_webhook_invalid_json(self, optimizer_client): + """ + Test: Send invalid JSON to webhook endpoint + Expected: 400 Bad Request + """ + logger.info("Test: Webhook with invalid JSON") + + response = requests.post( + f"{optimizer_client.base_url}/webhook", + data="{invalid json}", + headers={'Content-Type': 'application/json'} + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + logger.info("✓ Invalid JSON rejected with 400") + + def test_webhook_null_payload(self, optimizer_client): + """ + Test: Send null payload to webhook endpoint + Expected: 400 Bad Request + """ + logger.info("Test: Webhook with null payload") + + response = requests.post( + f"{optimizer_client.base_url}/webhook", + data="null", + headers={'Content-Type': 'application/json'} + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + logger.info("✓ Null payload rejected with 400") + + def test_webhook_empty_array(self, optimizer_client): + """ + Test: Send empty array to webhook endpoint + Expected: 400 Bad Request + """ + logger.info("Test: Webhook with empty array") + + response = requests.post( + f"{optimizer_client.base_url}/webhook", + json=[], + headers={'Content-Type': 'application/json'} + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + logger.info("✓ Empty array rejected with 400") + + def test_webhook_missing_summary(self, optimizer_client): + """ + Test: Send payload without summary field + Expected: 400 Bad Request + """ + logger.info("Test: Webhook with missing summary") + + payload = [{}] + response = requests.post( + f"{optimizer_client.base_url}/webhook", + json=payload, + headers={'Content-Type': 'application/json'} + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + logger.info("✓ Missing summary rejected with 400") + + def test_webhook_null_job_id(self, optimizer_client): + """ + Test: Send payload with null jobID + Expected: 400 Bad Request + """ + logger.info("Test: Webhook with null jobID") + + payload = [{ + "summary": { + "jobID": None, + "status": "COMPLETED", + "total_experiments": 10, + "processed_experiments": 8, + "existing_experiments": 2 + } + }] + + response = requests.post( + f"{optimizer_client.base_url}/webhook", + json=payload, + headers={'Content-Type': 'application/json'} + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + logger.info("✓ Null jobID rejected with 400") + + def test_webhook_empty_job_id(self, optimizer_client): + """ + Test: Send payload with empty jobID + Expected: 400 Bad Request + """ + logger.info("Test: Webhook with empty jobID") + + payload = [{ + "summary": { + "jobID": "", + "status": "COMPLETED", + "total_experiments": 10, + "processed_experiments": 8, + "existing_experiments": 2 + } + }] + + response = requests.post( + f"{optimizer_client.base_url}/webhook", + json=payload, + headers={'Content-Type': 'application/json'} + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + logger.info("✓ Empty jobID rejected with 400") + + def test_webhook_whitespace_job_id(self, optimizer_client): + """ + Test: Send payload with whitespace-only jobID + Expected: 400 Bad Request + """ + logger.info("Test: Webhook with whitespace jobID") + + payload = [{ + "summary": { + "jobID": " ", + "status": "COMPLETED", + "total_experiments": 10, + "processed_experiments": 8, + "existing_experiments": 2 + } + }] + + response = requests.post( + f"{optimizer_client.base_url}/webhook", + json=payload, + headers={'Content-Type': 'application/json'} + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + logger.info("✓ Whitespace jobID rejected with 400") + + def test_webhook_malformed_summary(self, optimizer_client): + """ + Test: Send payload with malformed summary (string instead of object) + Expected: 400 Bad Request + """ + logger.info("Test: Webhook with malformed summary") + + payload = [{ + "summary": "this should be an object" + }] + + response = requests.post( + f"{optimizer_client.base_url}/webhook", + json=payload, + headers={'Content-Type': 'application/json'} + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + logger.info("✓ Malformed summary rejected with 400") + + def test_webhook_missing_content_type(self, optimizer_client): + """ + Test: Send webhook without Content-Type header + Expected: 400 or 415 (Unsupported Media Type) + """ + logger.info("Test: Webhook without Content-Type header") + + payload = [{ + "summary": { + "jobID": "test-job-123", + "status": "COMPLETED", + "total_experiments": 10, + "processed_experiments": 8, + "existing_experiments": 2 + } + }] + + response = requests.post( + f"{optimizer_client.base_url}/webhook", + json=payload + # No Content-Type header + ) + + # Accept either 400 or 415 as valid responses + assert response.status_code in [400, 415], f"Expected 400 or 415, got {response.status_code}" + logger.info(f"✓ Missing Content-Type handled with {response.status_code}") + + def test_webhook_valid_payload_accepted(self, optimizer_client): + """ + Test: Send valid payload to ensure endpoint works correctly + Expected: 200 OK + + This positive test ensures the endpoint isn't broken and can accept valid requests. + """ + logger.info("Test: Webhook with valid payload (positive control)") + + payload = [{ + "summary": { + "jobID": "test-valid-job-999", + "status": "COMPLETED", + "total_experiments": 5, + "processed_experiments": 5, + "existing_experiments": 0 + } + }] + + response = requests.post( + f"{optimizer_client.base_url}/webhook", + json=payload, + headers={'Content-Type': 'application/json'} + ) + + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + logger.info("✓ Valid payload accepted with 200") + + def test_webhook_multiple_payloads_one_invalid(self, optimizer_client): + """ + Test: Send multiple payloads where one is invalid + Expected: 400 Bad Request (entire request should be rejected) + """ + logger.info("Test: Webhook with multiple payloads (one invalid)") + + payload = [ + { + "summary": { + "jobID": "test-job-valid", + "status": "COMPLETED", + "total_experiments": 5, + "processed_experiments": 5, + "existing_experiments": 0 + } + }, + { + "summary": { + "jobID": None, # Invalid + "status": "COMPLETED", + "total_experiments": 3, + "processed_experiments": 3, + "existing_experiments": 0 + } + } + ] + + response = requests.post( + f"{optimizer_client.base_url}/webhook", + json=payload, + headers={'Content-Type': 'application/json'} + ) + diff --git a/tests/e2e/utils/__init__.py b/tests/e2e/utils/__init__.py new file mode 100644 index 0000000..83d7feb --- /dev/null +++ b/tests/e2e/utils/__init__.py @@ -0,0 +1,5 @@ +""" +Utility modules for Kruize Optimizer E2E tests +""" + + diff --git a/tests/e2e/utils/cluster_utils.py b/tests/e2e/utils/cluster_utils.py new file mode 100644 index 0000000..66210db --- /dev/null +++ b/tests/e2e/utils/cluster_utils.py @@ -0,0 +1,219 @@ +""" +Cluster utility functions for E2E tests +""" +import subprocess +import time +import logging +from typing import Optional, Dict, List + +logger = logging.getLogger(__name__) + + +class ClusterManager: + """Manages Kubernetes cluster operations for E2E tests""" + + def __init__(self, cluster_type: str, cluster_name: str, namespace: str): + self.cluster_type = cluster_type + self.cluster_name = cluster_name + self.namespace = namespace + self.kubectl_cmd = "oc" if cluster_type == "openshift" else "kubectl" + + def run_command(self, cmd: List[str], check: bool = True, capture_output: bool = True) -> subprocess.CompletedProcess: + """Run a shell command""" + logger.debug(f"Running command: {' '.join(cmd)}") + result = subprocess.run(cmd, capture_output=capture_output, text=True, check=check) + if result.returncode != 0 and check: + logger.error(f"Command failed: {result.stderr}") + return result + + def create_kind_cluster(self, config_file: str) -> bool: + """Create a Kind cluster""" + try: + logger.info(f"Creating Kind cluster: {self.cluster_name}") + self.run_command(["kind", "create", "cluster", "--name", self.cluster_name, "--config", config_file]) + logger.info("Kind cluster created successfully") + return True + except subprocess.CalledProcessError as e: + logger.error(f"Failed to create Kind cluster: {e}") + return False + + def delete_kind_cluster(self) -> bool: + """Delete a Kind cluster""" + try: + logger.info(f"Deleting Kind cluster: {self.cluster_name}") + self.run_command(["kind", "delete", "cluster", "--name", self.cluster_name]) + logger.info("Kind cluster deleted successfully") + return True + except subprocess.CalledProcessError as e: + logger.error(f"Failed to delete Kind cluster: {e}") + return False + + def create_namespace(self, namespace: Optional[str] = None) -> bool: + """Create a namespace""" + ns = namespace or self.namespace + try: + logger.info(f"Creating namespace: {ns}") + self.run_command([self.kubectl_cmd, "create", "namespace", ns]) + return True + except subprocess.CalledProcessError: + logger.warning(f"Namespace {ns} may already exist") + return True + + def delete_namespace(self, namespace: Optional[str] = None) -> bool: + """Delete a namespace""" + ns = namespace or self.namespace + try: + logger.info(f"Deleting namespace: {ns}") + self.run_command([self.kubectl_cmd, "delete", "namespace", ns, "--ignore-not-found=true"]) + return True + except subprocess.CalledProcessError as e: + logger.error(f"Failed to delete namespace: {e}") + return False + + def wait_for_pod_ready(self, pod_label: str, namespace: Optional[str] = None, timeout: int = 300) -> bool: + """Wait for pod to be ready""" + ns = namespace or self.namespace + logger.info(f"Waiting for pod with label {pod_label} in namespace {ns} to be ready (timeout: {timeout}s)") + + try: + cmd = [ + self.kubectl_cmd, "wait", "--for=condition=Ready", + f"pod", "-l", pod_label, + "-n", ns, + f"--timeout={timeout}s" + ] + self.run_command(cmd) + logger.info(f"Pod with label {pod_label} is ready") + return True + except subprocess.CalledProcessError as e: + logger.error(f"Pod did not become ready within {timeout}s: {e}") + return False + + def get_pod_name(self, pod_label: str, namespace: Optional[str] = None) -> Optional[str]: + """Get pod name by label""" + ns = namespace or self.namespace + try: + result = self.run_command([ + self.kubectl_cmd, "get", "pod", + "-l", pod_label, + "-n", ns, + "-o", "jsonpath={.items[0].metadata.name}" + ]) + pod_name = result.stdout.strip() + return pod_name if pod_name else None + except subprocess.CalledProcessError: + return None + + def get_pod_logs(self, pod_name: str, namespace: Optional[str] = None, tail: int = 100) -> str: + """Get pod logs""" + ns = namespace or self.namespace + try: + result = self.run_command([ + self.kubectl_cmd, "logs", + pod_name, + "-n", ns, + f"--tail={tail}" + ]) + return result.stdout + except subprocess.CalledProcessError as e: + logger.error(f"Failed to get logs for pod {pod_name}: {e}") + return "" + + def get_all_pod_logs(self, pod_name: str, namespace: Optional[str] = None) -> str: + """Get all pod logs""" + ns = namespace or self.namespace + try: + result = self.run_command([ + self.kubectl_cmd, "logs", + pod_name, + "-n", ns + ]) + return result.stdout + except subprocess.CalledProcessError as e: + logger.error(f"Failed to get logs for pod {pod_name}: {e}") + return "" + + def apply_manifest(self, manifest_file: str) -> bool: + """Apply a Kubernetes manifest""" + try: + logger.info(f"Applying manifest: {manifest_file}") + self.run_command([self.kubectl_cmd, "apply", "-f", manifest_file]) + return True + except subprocess.CalledProcessError as e: + logger.error(f"Failed to apply manifest: {e}") + return False + + def apply_kustomize(self, kustomize_dir: str) -> bool: + """Apply kustomize directory""" + try: + logger.info(f"Applying kustomize: {kustomize_dir}") + self.run_command([self.kubectl_cmd, "apply", "-k", kustomize_dir]) + return True + except subprocess.CalledProcessError as e: + logger.error(f"Failed to apply kustomize: {e}") + return False + + def delete_kustomize(self, kustomize_dir: str) -> bool: + """Delete resources from kustomize directory""" + try: + logger.info(f"Deleting kustomize: {kustomize_dir}") + self.run_command([self.kubectl_cmd, "delete", "-k", kustomize_dir, "--ignore-not-found=true"]) + return True + except subprocess.CalledProcessError as e: + logger.error(f"Failed to delete kustomize: {e}") + return False + + def port_forward(self, service_name: str, local_port: int, remote_port: int, namespace: Optional[str] = None) -> subprocess.Popen: + """Start port forwarding (returns process handle)""" + ns = namespace or self.namespace + logger.info(f"Starting port-forward for {service_name}: {local_port}:{remote_port}") + + cmd = [ + self.kubectl_cmd, "port-forward", + f"service/{service_name}", + f"{local_port}:{remote_port}", + "-n", ns + ] + + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + time.sleep(2) # Give port-forward time to establish + return process + + def get_service_url(self, service_name: str, namespace: Optional[str] = None) -> Optional[str]: + """Get service URL (for OpenShift routes or NodePort services)""" + ns = namespace or self.namespace + + if self.cluster_type == "openshift": + try: + result = self.run_command([ + "oc", "get", "route", service_name, + "-n", ns, + "-o", "jsonpath={.spec.host}" + ]) + host = result.stdout.strip() + return f"http://{host}" if host else None + except subprocess.CalledProcessError: + return None + else: + # For Kind, we use localhost with NodePort + return f"http://localhost:8080" + + def check_deployment_ready(self, deployment_name: str, namespace: Optional[str] = None, timeout: int = 300) -> bool: + """Check if deployment is ready""" + ns = namespace or self.namespace + logger.info(f"Checking if deployment {deployment_name} is ready") + + try: + cmd = [ + self.kubectl_cmd, "wait", "--for=condition=Available", + f"deployment/{deployment_name}", + "-n", ns, + f"--timeout={timeout}s" + ] + self.run_command(cmd) + logger.info(f"Deployment {deployment_name} is ready") + return True + except subprocess.CalledProcessError as e: + logger.error(f"Deployment did not become ready within {timeout}s: {e}") + return False + diff --git a/tests/e2e/utils/deployment_manager.py b/tests/e2e/utils/deployment_manager.py new file mode 100644 index 0000000..5bb49b0 --- /dev/null +++ b/tests/e2e/utils/deployment_manager.py @@ -0,0 +1,389 @@ +""" +Deployment Manager for E2E Tests + +Handles complete deployment workflow without external script dependencies: +- Clone required repos (autotune, selected benchmark manifests) +- Create Kind/OpenShift cluster +- Deploy Prometheus +- Deploy in manifest mode or via operator +- Deploy benchmarks (sysbench) +- Wait for all components to be ready +""" +import subprocess +import logging +import time +import os +import shutil +from pathlib import Path +from typing import Optional, Dict, List + +logger = logging.getLogger(__name__) + + +class DeploymentManager: + """Manages complete E2E test deployment""" + + def __init__(self, cluster_type: str, namespace: str, work_dir: Optional[Path] = None): + self.cluster_type = cluster_type + self.namespace = namespace + self.work_dir = work_dir or Path(__file__).resolve().parent.parent / ".repos" + self.kubectl_cmd = "oc" if cluster_type == "openshift" else "kubectl" + + # Repository URLs + self.autotune_repo = "https://github.com/kruize/autotune.git" + self.benchmarks_repo = "https://github.com/kruize/benchmarks.git" + + # Paths + self.autotune_dir = self.work_dir / "autotune" + self.benchmarks_dir = self.work_dir / "benchmarks" + self.prometheus_script = None + self.benchmark_manifest_paths = { + "sysbench": [ + Path("sysbench/manifests/sysbench.yaml"), + ], + } + self.benchmark_deployments = { + "sysbench": ["sysbench"], + } + + def run_command(self, cmd, check=True, capture_output=True, cwd=None, env=None): + """Run shell command""" + logger.debug(f"Running: {' '.join(cmd) if isinstance(cmd, list) else cmd}") + + # Use current environment and add any custom env vars + run_env = os.environ.copy() + if env: + run_env.update(env) + + if isinstance(cmd, str): + result = subprocess.run(cmd, shell=True, capture_output=capture_output, + text=True, check=check, cwd=cwd, env=run_env) + else: + result = subprocess.run(cmd, capture_output=capture_output, text=True, + check=check, cwd=cwd, env=run_env) + + if result.returncode != 0 and check: + logger.error(f"Command failed: {result.stderr}") + + return result + + def clone_repositories(self): + """Clone required repositories""" + logger.info("Cloning required repositories...") + self.work_dir.mkdir(parents=True, exist_ok=True) + + # Clone autotune (for Prometheus scripts) + if not self.autotune_dir.exists(): + logger.info(f"Cloning autotune to {self.autotune_dir}") + self.run_command([ + "git", "clone", "--depth", "1", + self.autotune_repo, + str(self.autotune_dir) + ]) + + # Clone benchmarks repo metadata only, then sparse checkout the required manifests + if not self.benchmarks_dir.exists(): + logger.info(f"Cloning selected benchmark manifests to {self.benchmarks_dir}") + self.run_command([ + "git", "clone", "--depth", "1", "--filter=blob:none", "--sparse", + self.benchmarks_repo, + str(self.benchmarks_dir) + ]) + sparse_paths = sorted({ + str(path.parent) for paths in self.benchmark_manifest_paths.values() for path in paths + }) + self.run_command( + ["git", "sparse-checkout", "set", *sparse_paths], + cwd=self.benchmarks_dir + ) + + # Set Prometheus script path + if self.cluster_type == "kind": + self.prometheus_script = self.autotune_dir / "scripts" / "prometheus_on_kind.sh" + elif self.cluster_type == "minikube": + self.prometheus_script = self.autotune_dir / "scripts" / "prometheus_on_minikube.sh" + elif self.cluster_type == "openshift": + self.prometheus_script = self.autotune_dir / "scripts" / "prometheus_on_openshift.sh" + + logger.info("Repositories cloned successfully") + + def create_kind_cluster(self, cluster_name: str, config_file: Optional[Path] = None): + """Create Kind cluster""" + logger.info(f"Creating Kind cluster: {cluster_name}") + + cmd = ["kind", "create", "cluster", "--name", cluster_name] + + if config_file and config_file.exists(): + cmd.extend(["--config", str(config_file)]) + + self.run_command(cmd) + logger.info("Kind cluster created successfully") + + def delete_kind_cluster(self, cluster_name: str): + """Delete Kind cluster""" + logger.info(f"Deleting Kind cluster: {cluster_name}") + self.run_command(["kind", "delete", "cluster", "--name", cluster_name], check=False) + + def create_namespace(self, namespace: Optional[str] = None): + """Create namespace""" + ns = namespace or self.namespace + logger.info(f"Creating namespace: {ns}") + + self.run_command([ + self.kubectl_cmd, "create", "namespace", ns + ], check=False) # Don't fail if already exists + + def deploy_kruize_manifest_mode(self, kruize_image: Optional[str] = None, + kruize_ui_image: Optional[str] = None, + optimizer_image: Optional[str] = None): + """Deploy kruize and optimizer in manifest mode""" + logger.info("Deploying kruize in manifest mode...") + + deploy_script = self.autotune_dir / "deploy.sh" + + if not deploy_script.exists(): + raise FileNotFoundError(f"deploy.sh script not found: {deploy_script}") + + deploy_script.chmod(0o755) + + cluster_type_arg = self.cluster_type + if self.cluster_type == "kind": + cluster_type_arg = "kind" + elif self.cluster_type == "minikube": + cluster_type_arg = "minikube" + elif self.cluster_type == "openshift": + cluster_type_arg = "openshift" + + cmd = f"bash {deploy_script} -c {cluster_type_arg} -m crc" + + if kruize_image: + cmd += f" -i {kruize_image}" + if kruize_ui_image: + cmd += f" -u {kruize_ui_image}" + + logger.info("Running kruize manifest deployment") + result = self.run_command( + cmd, + check=False, + capture_output=True, + cwd=self.autotune_dir + ) + + if result.returncode != 0: + logger.error(f"Kruize deployment failed with exit code {result.returncode}") + logger.error(f"STDOUT: {result.stdout}") + logger.error(f"STDERR: {result.stderr}") + raise RuntimeError(f"Kruize deployment failed: {result.stderr}") + + logger.info("Deploying optimizer manifest from project kustomize files") + self.deploy_optimizer_manifest(optimizer_image) + logger.info("Kruize and optimizer deployed successfully in manifest mode") + + def deploy_optimizer_manifest(self, optimizer_image: Optional[str] = None): + """Deploy kruize-optimizer using this project's kustomize files""" + project_root = Path(__file__).parent.parent.parent.parent + + if self.cluster_type == "openshift": + overlay_dir = project_root / "deployment" / "overlays" / "openshift" + else: + overlay_dir = project_root / "deployment" / "overlays" / "kind" + + if not overlay_dir.exists(): + raise FileNotFoundError(f"Overlay directory not found: {overlay_dir}") + + if optimizer_image: + logger.info(f"Requested optimizer image override: {optimizer_image}") + + self.run_command([ + self.kubectl_cmd, "apply", "-k", str(overlay_dir) + ]) + + def deploy_prometheus(self): + """Deploy Prometheus using autotune scripts""" + logger.info("Deploying Prometheus...") + + if not self.prometheus_script or not self.prometheus_script.exists(): + raise FileNotFoundError(f"Prometheus script not found: {self.prometheus_script}") + + # Make script executable + self.prometheus_script.chmod(0o755) + + # Run Prometheus deployment script with -as flags + # -a = non-interactive mode, -s = start + logger.info(f"Running Prometheus script: {self.prometheus_script} -as") + result = self.run_command( + f"bash {self.prometheus_script} -as", + check=False, + capture_output=True, + cwd=self.prometheus_script.parent + ) + + if result.returncode != 0: + logger.error(f"Prometheus deployment failed with exit code {result.returncode}") + logger.error(f"STDOUT: {result.stdout}") + logger.error(f"STDERR: {result.stderr}") + raise RuntimeError(f"Prometheus deployment failed: {result.stderr}") + + logger.info("Prometheus deployed successfully") + + def deploy_operator(self, operator_image: Optional[str] = None, + optimizer_image: Optional[str] = None): + """Deploy kruize-operator using kustomize""" + logger.info("Deploying kruize-operator...") + + # Get project root (3 levels up from this file) + project_root = Path(__file__).parent.parent.parent.parent + + # Determine overlay based on cluster type + if self.cluster_type == "openshift": + overlay_dir = project_root / "deployment" / "overlays" / "openshift" + else: + overlay_dir = project_root / "deployment" / "overlays" / "kind" + + if not overlay_dir.exists(): + raise FileNotFoundError(f"Overlay directory not found: {overlay_dir}") + + # Apply kustomize + logger.info(f"Applying kustomize from: {overlay_dir}") + self.run_command([ + self.kubectl_cmd, "apply", "-k", str(overlay_dir) + ]) + + # Wait for operator to be ready + logger.info("Waiting for operator to be ready...") + self.run_command([ + self.kubectl_cmd, "wait", "--for=condition=Available", + "deployment/kruize-operator", + "-n", self.namespace, + "--timeout=300s" + ]) + + logger.info("Kruize operator deployed successfully") + + def deploy_benchmarks(self, benchmark_name: str, app_namespace: str): + """Deploy benchmarks (sysbench)""" + logger.info(f"Deploying benchmark: {benchmark_name}") + + manifest_paths = self.benchmark_manifest_paths.get(benchmark_name, []) + if not manifest_paths: + logger.warning(f"No benchmark manifests configured for: {benchmark_name}") + return + + resolved_manifests = [self.benchmarks_dir / manifest_path for manifest_path in manifest_paths] + missing_manifests = [str(manifest) for manifest in resolved_manifests if not manifest.exists()] + if missing_manifests: + logger.warning(f"Benchmark manifests not found for {benchmark_name}: {missing_manifests}") + return + + for manifest_file in resolved_manifests: + logger.info(f"Applying: {manifest_file}") + self.run_command([ + self.kubectl_cmd, "apply", "-f", str(manifest_file), + "-n", app_namespace + ], check=False) + + self.wait_for_benchmark_deployments(benchmark_name, app_namespace) + logger.info(f"Benchmark {benchmark_name} deployed successfully") + + def wait_for_benchmark_deployments(self, benchmark_name: str, namespace: str, timeout: int = 300): + """Wait for benchmark deployments to become available""" + deployment_names = self.benchmark_deployments.get(benchmark_name, []) + if not deployment_names: + logger.info(f"No deployment readiness checks configured for benchmark: {benchmark_name}") + return + + for deployment_name in deployment_names: + logger.info( + f"Waiting for benchmark deployment {deployment_name} in namespace {namespace}" + ) + result = self.run_command([ + self.kubectl_cmd, "wait", "--for=condition=Available", + f"deployment/{deployment_name}", + "-n", namespace, + f"--timeout={timeout}s" + ], check=False, capture_output=True) + + if result.returncode != 0: + logger.warning( + f"Benchmark deployment {deployment_name} was not ready: {result.stderr}" + ) + + def wait_for_pod_ready(self, label: str, namespace: Optional[str] = None, timeout: int = 300): + """Wait for pod to be ready""" + ns = namespace or self.namespace + logger.info(f"Waiting for pod with label {label} in namespace {ns}") + + self.run_command([ + self.kubectl_cmd, "wait", "--for=condition=Ready", + "pod", "-l", label, + "-n", ns, + f"--timeout={timeout}s" + ]) + + def enable_kube_state_metrics_labels(self): + """Enable kube state metrics labels for Kind/Minikube""" + if self.cluster_type in ["kind", "minikube"]: + logger.info("Enabling kube state metrics labels...") + + script_path = self.autotune_dir / "scripts" / "enable_kube_state_metrics_labels.sh" + + if script_path.exists(): + script_path.chmod(0o755) + self.run_command(f"bash {script_path}", cwd=script_path.parent, check=False) + + def enable_user_workload_monitoring(self): + """Enable user workload monitoring for OpenShift""" + if self.cluster_type == "openshift": + logger.info("Enabling user workload monitoring...") + + script_path = self.autotune_dir / "scripts" / "enable_user_workload_monitoring_openshift.sh" + + if script_path.exists(): + script_path.chmod(0o755) + self.run_command(f"bash {script_path}", cwd=script_path.parent, check=False) + + def label_workloads(self, deployment_names: List[str], label: str, namespace: str): + """Add the same label to multiple deployments""" + for deployment_name in deployment_names: + self.label_workload(deployment_name, label, namespace) + + def label_workload(self, deployment_name: str, label: str, namespace: str): + """Add label to deployment""" + logger.info(f"Labeling deployment {deployment_name} with {label}") + + # Label the deployment (deployment should exist after wait in deploy_benchmarks) + result = self.run_command([ + self.kubectl_cmd, "label", "deployment", deployment_name, + label, "--overwrite", + "-n", namespace + ], check=False, capture_output=True) + + if result.returncode != 0: + logger.warning(f"Failed to label deployment {deployment_name}: {result.stderr}") + logger.warning("Deployment may not exist yet or may not be a deployment resource") + + def setup_port_forward(self, service_name: str, local_port: int, + remote_port: int, namespace: Optional[str] = None): + """Setup port forwarding (returns process)""" + ns = namespace or self.namespace + logger.info(f"Setting up port-forward for {service_name}: {local_port}:{remote_port}") + + cmd = [ + self.kubectl_cmd, "port-forward", + f"service/{service_name}", + f"{local_port}:{remote_port}", + "-n", ns + ] + + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + time.sleep(3) # Give port-forward time to establish + + return process + + def cleanup(self): + """Cleanup work directory""" + if self.work_dir.exists(): + logger.info(f"Cleaning up work directory: {self.work_dir}") + shutil.rmtree(self.work_dir, ignore_errors=True) + +# Made with Bob diff --git a/tests/e2e/utils/kruize_utils.py b/tests/e2e/utils/kruize_utils.py new file mode 100644 index 0000000..f0b56b7 --- /dev/null +++ b/tests/e2e/utils/kruize_utils.py @@ -0,0 +1,226 @@ +""" +Kruize API utility functions for E2E tests +""" +import requests +import logging +import time +from typing import Dict, List, Optional, Any + +logger = logging.getLogger(__name__) + + +class KruizeAPIClient: + """Client for interacting with Kruize APIs""" + + def __init__(self, base_url: str, timeout: int = 30): + self.base_url = base_url.rstrip('/') + self.timeout = timeout + self.session = requests.Session() + + def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response: + """Make HTTP request to Kruize API""" + url = f"{self.base_url}{endpoint}" + logger.debug(f"{method} {url}") + + try: + response = self.session.request(method, url, timeout=self.timeout, **kwargs) + logger.debug(f"Response status: {response.status_code}") + return response + except requests.exceptions.RequestException as e: + logger.error(f"Request failed: {e}") + raise + + def list_datasources(self) -> Dict: + """Call listDatasources API""" + response = self._make_request('GET', '/datasources') + return response.json() if response.status_code == 200 else {} + + def list_metric_profiles(self) -> List[Dict]: + """Call listMetricProfiles API""" + response = self._make_request('GET', '/listMetricProfiles') + return response.json() if response.status_code == 200 else [] + + def list_metadata_profiles(self) -> List[Dict]: + """Call listMetadataProfiles API""" + response = self._make_request('GET', '/listMetadataProfiles') + return response.json() if response.status_code == 200 else [] + + def list_layers(self) -> List[Dict]: + """Call listLayers API""" + response = self._make_request('GET', '/listLayers') + return response.json() if response.status_code == 200 else [] + + def health_check(self) -> bool: + """Check if Kruize service is healthy""" + try: + response = self._make_request('GET', '/q/health/live') + return response.status_code == 200 + except: + return False + + def wait_for_service(self, max_retries: int = 30, retry_interval: int = 2) -> bool: + """Wait for Kruize service to be available""" + logger.info(f"Waiting for Kruize service at {self.base_url}") + + for attempt in range(max_retries): + try: + if self.health_check(): + logger.info("Kruize service is available") + return True + except: + pass + + logger.debug(f"Attempt {attempt + 1}/{max_retries}: Service not ready yet") + time.sleep(retry_interval) + + logger.error(f"Kruize service did not become available after {max_retries} attempts") + return False + + +class OptimizerAPIClient: + """Client for interacting with Optimizer APIs""" + + def __init__(self, base_url: str, timeout: int = 30): + self.base_url = base_url.rstrip('/') + self.timeout = timeout + self.session = requests.Session() + + def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response: + """Make HTTP request to Optimizer API""" + url = f"{self.base_url}{endpoint}" + logger.debug(f"{method} {url}") + + try: + response = self.session.request(method, url, timeout=self.timeout, **kwargs) + logger.debug(f"Response status: {response.status_code}") + return response + except requests.exceptions.RequestException as e: + logger.error(f"Request failed: {e}") + raise + + def get_status(self) -> Dict: + """Get optimizer status""" + response = self._make_request('GET', '/status') + return response.json() if response.status_code == 200 else {} + + def get_jobs_overview(self) -> Dict: + """Get jobs overview""" + response = self._make_request('GET', '/jobs') + return response.json() if response.status_code == 200 else {} + + def send_webhook(self, payload: List[Dict]) -> requests.Response: + """Send webhook payload to optimizer""" + return self._make_request('POST', '/webhook', json=payload, headers={'Content-Type': 'application/json'}) + + def health_check(self) -> bool: + """Check if Optimizer service is healthy""" + try: + response = self._make_request('GET', '/q/health/live') + return response.status_code == 200 + except: + return False + + def wait_for_service(self, max_retries: int = 30, retry_interval: int = 2) -> bool: + """Wait for Optimizer service to be available""" + logger.info(f"Waiting for Optimizer service at {self.base_url}") + + for attempt in range(max_retries): + try: + if self.health_check(): + logger.info("Optimizer service is available") + return True + except: + pass + + logger.debug(f"Attempt {attempt + 1}/{max_retries}: Service not ready yet") + time.sleep(retry_interval) + + logger.error(f"Optimizer service did not become available after {max_retries} attempts") + return False + + +def verify_profiles_installed(kruize_client: KruizeAPIClient) -> Dict[str, bool]: + """Verify that all required profiles are installed""" + results = { + 'metric_profiles': False, + 'metadata_profiles': False, + 'layers': False + } + + try: + # Check metric profiles + metric_profiles = kruize_client.list_metric_profiles() + if metric_profiles and len(metric_profiles) > 0: + logger.info(f"Found {len(metric_profiles)} metric profile(s)") + results['metric_profiles'] = True + else: + logger.warning("No metric profiles found") + + # Check metadata profiles + metadata_profiles = kruize_client.list_metadata_profiles() + if metadata_profiles and len(metadata_profiles) > 0: + logger.info(f"Found {len(metadata_profiles)} metadata profile(s)") + results['metadata_profiles'] = True + else: + logger.warning("No metadata profiles found") + + # Check layers + layers = kruize_client.list_layers() + if layers and len(layers) > 0: + logger.info(f"Found {len(layers)} layer(s)") + results['layers'] = True + else: + logger.warning("No layers found") + + except Exception as e: + logger.error(f"Error verifying profiles: {e}") + + return results + + +def wait_for_job_trigger(optimizer_client: OptimizerAPIClient, initial_count: int, timeout: int = 180) -> bool: + """Wait for a new bulk job to be triggered""" + logger.info(f"Waiting for job trigger (initial count: {initial_count}, timeout: {timeout}s)") + + start_time = time.time() + while time.time() - start_time < timeout: + try: + jobs_overview = optimizer_client.get_jobs_overview() + current_count = jobs_overview.get('jobsTriggered', 0) + + if current_count > initial_count: + logger.info(f"New job triggered! Count: {initial_count} -> {current_count}") + return True + + logger.debug(f"Jobs triggered: {current_count} (waiting for > {initial_count})") + except Exception as e: + logger.debug(f"Error checking job status: {e}") + + time.sleep(5) + + logger.error(f"No new job triggered within {timeout}s") + return False + + +def wait_for_webhook_callback(optimizer_client: OptimizerAPIClient, initial_processed: int, timeout: int = 120) -> bool: + """Wait for webhook callback to be received""" + logger.info(f"Waiting for webhook callback (initial processed: {initial_processed}, timeout: {timeout}s)") + + start_time = time.time() + while time.time() - start_time < timeout: + try: + jobs_overview = optimizer_client.get_jobs_overview() + current_processed = jobs_overview.get('totalExperimentsProcessed', 0) + + if current_processed > initial_processed: + logger.info(f"Webhook received! Processed: {initial_processed} -> {current_processed}") + return True + + logger.debug(f"Experiments processed: {current_processed} (waiting for > {initial_processed})") + except Exception as e: + logger.debug(f"Error checking webhook status: {e}") + + time.sleep(5) + + logger.error(f"No webhook callback received within {timeout}s") + return False diff --git a/tests/e2e/utils/log_utils.py b/tests/e2e/utils/log_utils.py new file mode 100644 index 0000000..078ee7d --- /dev/null +++ b/tests/e2e/utils/log_utils.py @@ -0,0 +1,227 @@ +""" +Log parsing utility functions for E2E tests +""" +import re +import logging +from typing import List, Dict, Optional, Any + +logger = logging.getLogger(__name__) + + +def parse_optimizer_logs(logs: str) -> Dict[str, Any]: + """Parse optimizer logs to extract key information""" + result = { + 'initialized': False, + 'profiles_installed': { + 'metric': False, + 'metadata': False, + 'layers': False + }, + 'jobs_triggered': 0, + 'webhooks_received': 0, + 'errors': [] + } + + # Check for initialization + if 'Bulk scheduler initialized' in logs or 'INFO_BULK_SCHEDULER_INITIALIZED' in logs: + result['initialized'] = True + + # Check for profile installation + if 'Installing metric profile' in logs or 'Metric profile installed' in logs: + result['profiles_installed']['metric'] = True + + if 'Installing metadata profile' in logs or 'Metadata profile installed' in logs: + result['profiles_installed']['metadata'] = True + + if 'Installing layer' in logs or 'Layer installed' in logs: + result['profiles_installed']['layers'] = True + + # Count job triggers + job_trigger_pattern = r'Starting scheduled bulk API call|Calling bulk API' + result['jobs_triggered'] = len(re.findall(job_trigger_pattern, logs, re.IGNORECASE)) + + # Count webhook callbacks + webhook_pattern = r'Received webhook|Processing webhook' + result['webhooks_received'] = len(re.findall(webhook_pattern, logs, re.IGNORECASE)) + + # Extract errors + error_lines = [line for line in logs.split('\n') if 'ERROR' in line or 'Exception' in line] + result['errors'] = error_lines[:10] # Limit to first 10 errors + + return result + + +def check_log_for_message(logs: str, message: str, case_sensitive: bool = False) -> bool: + """Check if logs contain a specific message""" + if case_sensitive: + return message in logs + else: + return message.lower() in logs.lower() + + +def extract_job_ids(logs: str) -> List[str]: + """Extract job IDs from logs""" + # Pattern to match job IDs like "job-123", "job-abc-456", etc. + pattern = r'job[-_][a-zA-Z0-9\-]+' + job_ids = re.findall(pattern, logs) + return list(set(job_ids)) # Return unique job IDs + + +def extract_job_ids_from_logs(logs: str) -> List[Dict[str, str]]: + """ + Extract job IDs and their completion status from logs + Returns list of dicts with job_id, status, total, processed, existing + """ + job_info = [] + + # Pattern for "Bulk API call successful. Response: {"job_id":""}" + job_trigger_pattern = r'Bulk API call successful\. Response: \{"job_id":"([a-f0-9\-]+)"\}' + triggered_jobs = re.findall(job_trigger_pattern, logs) + + # Pattern for "Job completed. Total: X, Processed: Y, Existing: Z" + job_complete_pattern = r'Job ([a-f0-9\-]+) completed\. Total: (\d+), Processed: (\d+), Existing: (\d+)' + completed_jobs = re.findall(job_complete_pattern, logs) + + # Build job info from completed jobs + for job_id, total, processed, existing in completed_jobs: + job_info.append({ + 'job_id': job_id, + 'status': 'completed', + 'total': int(total), + 'processed': int(processed), + 'existing': int(existing) + }) + + # Add triggered but not yet completed jobs + completed_ids = {job['job_id'] for job in job_info} + for job_id in triggered_jobs: + if job_id not in completed_ids: + job_info.append({ + 'job_id': job_id, + 'status': 'triggered', + 'total': None, + 'processed': None, + 'existing': None + }) + + return job_info + + +def verify_profile_installation_logs(logs: str, expected_profiles: Dict[str, List]) -> Dict[str, bool]: + """ + Verify that profile installation messages exist in logs + + Args: + logs: The log content to search + expected_profiles: Dict with keys 'metadata_profiles', 'metric_profiles', 'layers' + Each containing list of profile names or dicts with 'name' key + + Returns: + Dict with profile names as keys and boolean values indicating if found in logs + """ + results = {} + + # Check metadata profiles + if 'metadata_profiles' in expected_profiles: + for profile in expected_profiles['metadata_profiles']: + profile_name = profile['name'] if isinstance(profile, dict) else profile + log_message = f"Metadata profile: Installed: {profile_name}" + results[f"metadata:{profile_name}"] = check_log_for_message(logs, log_message) + + # Check metric profiles + if 'metric_profiles' in expected_profiles: + for profile in expected_profiles['metric_profiles']: + profile_name = profile['name'] if isinstance(profile, dict) else profile + log_message = f"Metric profile: Installed: {profile_name}" + results[f"metric:{profile_name}"] = check_log_for_message(logs, log_message) + + # Check layers + if 'layers' in expected_profiles: + for layer in expected_profiles['layers']: + layer_name = layer if isinstance(layer, str) else layer['name'] + log_message = f"Layer: Installed: {layer_name}" + results[f"layer:{layer_name}"] = check_log_for_message(logs, log_message) + + return results + + +def check_bulk_job_with_autotune_label(logs: str) -> bool: + """ + Check if bulk API call includes autotune label filter + Returns True if found + """ + # Pattern to match the autotune label in bulk API payload + pattern = r'"kruize/autotune"\s*:\s*"enabled"' + return bool(re.search(pattern, logs)) + + +def count_log_occurrences(logs: str, pattern: str) -> int: + """Count occurrences of a pattern in logs""" + return len(re.findall(pattern, logs, re.IGNORECASE)) + + +def get_log_lines_with_pattern(logs: str, pattern: str, context_lines: int = 2) -> List[str]: + """Get log lines matching a pattern with context""" + lines = logs.split('\n') + matching_lines = [] + + for i, line in enumerate(lines): + if re.search(pattern, line, re.IGNORECASE): + # Get context lines before and after + start = max(0, i - context_lines) + end = min(len(lines), i + context_lines + 1) + context = lines[start:end] + matching_lines.extend(context) + matching_lines.append('---') # Separator + + return matching_lines + + +def verify_no_errors(logs: str, allowed_errors: Optional[List[str]] = None) -> tuple[bool, List[str]]: + """Verify that logs don't contain unexpected errors""" + allowed_errors = allowed_errors or [] + + error_lines = [line for line in logs.split('\n') if 'ERROR' in line or 'Exception' in line] + + # Filter out allowed errors + unexpected_errors = [] + for error_line in error_lines: + is_allowed = any(allowed in error_line for allowed in allowed_errors) + if not is_allowed: + unexpected_errors.append(error_line) + + return len(unexpected_errors) == 0, unexpected_errors + + +def extract_api_calls(logs: str) -> Dict[str, int]: + """Extract and count API calls from logs""" + api_calls = { + 'bulk_api': 0, + 'list_datasources': 0, + 'list_metric_profiles': 0, + 'list_metadata_profiles': 0, + 'list_layers': 0 + } + + # Count bulk API calls + api_calls['bulk_api'] = count_log_occurrences(logs, r'Calling bulk API|bulkCreateExperiments') + + # Count profile API calls + api_calls['list_datasources'] = count_log_occurrences(logs, r'listDatasources|/datasources') + api_calls['list_metric_profiles'] = count_log_occurrences(logs, r'listMetricProfiles|/listMetricProfiles') + api_calls['list_metadata_profiles'] = count_log_occurrences(logs, r'listMetadataProfiles|/listMetadataProfiles') + api_calls['list_layers'] = count_log_occurrences(logs, r'listLayers|/listLayers') + + return api_calls + + +def save_logs_to_file(logs: str, filename: str): + """Save logs to a file""" + try: + with open(filename, 'w') as f: + f.write(logs) + logger.info(f"Logs saved to {filename}") + except Exception as e: + logger.error(f"Failed to save logs to {filename}: {e}") + +# Made with Bob