# Analytics Output Port Design

## Status: Approved

## Date: 2025-01-21

## Problem Statement

When connecting a component's `rawOutput` (which contains complex nested JSON) to the Analytics Sink, OpenSearch hits the default field limit of 1000 fields. This is because:

1. **Dynamic mapping explosion**: Elasticsearch/OpenSearch creates a field for every unique JSON path
2. **Nested structures**: Arrays with objects like `issues[0].metadata.schema` create many paths
3. **Varying schemas**: Different scanner outputs accumulate unique field paths over time

Example error:

```
illegal_argument_exception: Limit of total fields [1000] has been exceeded
```
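
The explosion is easy to demonstrate by counting the unique mapping paths that dynamic mapping would create for a sample `rawOutput` fragment. The helper and the sample document below are invented for illustration only:

```typescript
// Count the unique field paths OpenSearch dynamic mapping would create
// for a JSON document. Array elements are mapped under the same path,
// so arrays contribute the union of their elements' paths.
function countFieldPaths(value: unknown, prefix = ''): Set<string> {
  const paths = new Set<string>();
  if (Array.isArray(value)) {
    for (const item of value) {
      countFieldPaths(item, prefix).forEach((p) => paths.add(p));
    }
  } else if (value !== null && typeof value === 'object') {
    for (const [key, child] of Object.entries(value as Record<string, unknown>)) {
      const path = prefix ? `${prefix}.${key}` : key;
      paths.add(path);
      countFieldPaths(child, path).forEach((p) => paths.add(p));
    }
  }
  return paths;
}

// Hypothetical rawOutput fragment: every distinct metadata key in any
// array element adds another mapping path to the index.
const rawOutput = {
  issues: [
    { metadata: { schema: 'public', table: 'users' } },
    { metadata: { rule: 'DB_RLS_DISABLED', severity: 'CRITICAL' } },
  ],
};

console.log(countFieldPaths(rawOutput).size); // → 6
```

Two findings with partially different metadata keys already produce six paths; across many scanners and runs this is how the 1000-field limit is reached.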

## Solution

### Design Decisions

1. **Each component owns its analytics schema**
   - Components output structured `list<json>` through dedicated ports (`findings`, `results`, `secrets`, `issues`)
   - Component authors define the structure appropriate for their tool
   - No generic "one schema fits all" approach

2. **Analytics Sink accepts `list<json>`**
   - Input type: `z.array(z.record(z.string(), z.unknown()))`
   - Each item in the array is indexed as a separate document
   - Rejects arbitrary nested objects (the input must be an array)

3. **Same timestamp for all findings in a batch**
   - All findings from one component execution share the same `@timestamp`
   - Captured once at the start of indexing, applied to all documents

4. **Nested `shipsec` context**
   - Workflow context stored under the `shipsec.*` namespace
   - Prevents field name collisions with component data
   - Clear separation: component fields at the root, system fields under `shipsec`

5. **Nested objects serialized before indexing**
   - Any nested object or array within a finding is JSON-stringified
   - Prevents field explosion from dynamic mapping
   - Trade-off: can't query inside serialized fields directly, but indexing no longer fails on the field limit

6. **No `data` wrapper**
   - The original PRD design wrapped component output in a `data` field
   - New design: finding fields are at the top level for easier querying
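
A minimal sketch of decisions 2 and 5 together. Function names here are illustrative, not the actual sink implementation; the zod schema from decision 2 is shown as a comment so the sketch stays dependency-free:

```typescript
// Decision 2: the sink's input contract — a list of JSON objects
// (expressed in the sink as z.array(z.record(z.string(), z.unknown()))).
function assertListOfJson(input: unknown): Record<string, unknown>[] {
  if (!Array.isArray(input)) {
    throw new Error('Analytics Sink expects list<json>');
  }
  return input as Record<string, unknown>[];
}

// Decision 5: JSON-stringify nested values so each finding reaches the
// index as a flat document of scalars, keeping dynamic mapping bounded.
function flattenFinding(
  finding: Record<string, unknown>,
): Record<string, unknown> {
  const flat: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(finding)) {
    flat[key] =
      value !== null && typeof value === 'object' ? JSON.stringify(value) : value;
  }
  return flat;
}

const findings = assertListOfJson([
  {
    check_id: 'DB_RLS_DISABLED',
    severity: 'CRITICAL',
    metadata: { schema: 'public', table: 'users' },
  },
]);

console.log(flattenFinding(findings[0]).metadata);
// → '{"schema":"public","table":"users"}'
```

Scalars pass through untouched, so `check_id` and `severity` remain directly queryable while `metadata` collapses into a single string field.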

### Document Structure

**Before (PRD design):**

```json
{
  "workflow_id": "...",
  "workflow_name": "...",
  "run_id": "...",
  "node_ref": "...",
  "component_id": "...",
  "@timestamp": "...",
  "asset_key": "...",
  "data": {
    "check_id": "DB_RLS_DISABLED",
    "severity": "CRITICAL",
    "metadata": { "schema": "public", "table": "users" }
  }
}
```

**After (new design):**

```json
{
  "check_id": "DB_RLS_DISABLED",
  "severity": "CRITICAL",
  "title": "RLS Disabled on Table: users",
  "resource": "public.users",
  "metadata": "{\"schema\":\"public\",\"table\":\"users\"}",
  "scanner": "supabase-scanner",
  "asset_key": "abcdefghij1234567890",
  "finding_hash": "a1b2c3d4e5f67890",

  "shipsec": {
    "organization_id": "org_123",
    "run_id": "shipsec-run-xxx",
    "workflow_id": "d1d33161-929f-4af4-9a64-xxx",
    "workflow_name": "Supabase Security Audit",
    "component_id": "core.analytics.sink",
    "node_ref": "analytics-sink-1"
  },

  "@timestamp": "2025-01-21T10:30:00.000Z"
}
```

### Component Output Ports

Components should use their existing structured list outputs:

| Component        | Port      | Type                                         | Notes                     |
| ---------------- | --------- | -------------------------------------------- | ------------------------- |
| Nuclei           | `results` | `z.array(z.record(z.string(), z.unknown()))` | Scanner + asset_key added |
| TruffleHog       | `results` | `z.array(z.record(z.string(), z.unknown()))` | Scanner + asset_key added |
| Supabase Scanner | `results` | `z.array(z.record(z.string(), z.unknown()))` | Scanner + asset_key added |

All `results` ports include:

- `scanner`: Scanner identifier (e.g., `'nuclei'`, `'trufflehog'`, `'supabase-scanner'`)
- `asset_key`: Primary asset identifier from the finding
- `finding_hash`: Stable hash for deduplication (16-char hex from SHA-256)

### Finding Hash for Deduplication

The `finding_hash` enables tracking findings across workflow runs:

**Generation:**

```typescript
import { createHash } from 'crypto';

function generateFindingHash(...fields: (string | undefined | null)[]): string {
  const normalized = fields.map((f) => (f ?? '').toLowerCase().trim()).join('|');
  return createHash('sha256').update(normalized).digest('hex').slice(0, 16);
}
```

**Key fields per scanner:**

| Scanner          | Hash Fields                          |
| ---------------- | ------------------------------------ |
| Nuclei           | `templateId + host + matchedAt`      |
| TruffleHog       | `DetectorType + Redacted + filePath` |
| Supabase Scanner | `check_id + projectRef + resource`   |
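
For example, Nuclei's hash combines `templateId`, `host`, and `matchedAt`; because the generator lowercases and trims each field, the hash is stable across runs even if a scanner's casing or whitespace varies. The finding values below are made up, and the generator is repeated so the sketch is self-contained:

```typescript
import { createHash } from 'crypto';

// Same generator as above, repeated so this example runs standalone.
function generateFindingHash(...fields: (string | undefined | null)[]): string {
  const normalized = fields.map((f) => (f ?? '').toLowerCase().trim()).join('|');
  return createHash('sha256').update(normalized).digest('hex').slice(0, 16);
}

// Hypothetical Nuclei finding: templateId + host + matchedAt.
const hash = generateFindingHash(
  'http-missing-security-headers',
  'API.example.com ', // casing and trailing whitespace are normalized away
  'https://api.example.com/login',
);

console.log(hash.length); // → 16
```

The same three fields always produce the same 16-character hex string, which is what makes cross-run grouping by `finding_hash` possible.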

**Use cases:**

- **New vs recurring**: Is this finding appearing for the first time?
- **First-seen / last-seen**: When did we first detect this? Is it still present?
- **Resolution tracking**: Findings that stop appearing may be resolved
- **Deduplication**: Remove duplicates in dashboards across runs

### `shipsec` Context Fields

The indexer automatically adds these fields under `shipsec`:

| Field             | Description                                   |
| ----------------- | --------------------------------------------- |
| `organization_id` | Organization that owns the workflow           |
| `run_id`          | Unique identifier for this workflow execution |
| `workflow_id`     | ID of the workflow definition                 |
| `workflow_name`   | Human-readable workflow name                  |
| `component_id`    | Component type (e.g., `core.analytics.sink`)  |
| `node_ref`        | Node reference in the workflow graph          |
| `asset_key`       | Auto-detected or specified asset identifier   |
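
A sketch of how the indexer might attach this context, combining decisions 3 and 4: one timestamp captured per batch, system fields namespaced under `shipsec`. The `WorkflowContext` shape mirrors the table above, but the type and function names are illustrative, not the actual indexer code:

```typescript
interface WorkflowContext {
  organization_id: string;
  run_id: string;
  workflow_id: string;
  workflow_name: string;
  component_id: string;
  node_ref: string;
}

// Decision 3: one @timestamp per batch. Decision 4: context under `shipsec`.
function toDocuments(
  findings: Record<string, unknown>[],
  ctx: WorkflowContext,
): Record<string, unknown>[] {
  const timestamp = new Date().toISOString(); // captured once for the whole batch
  return findings.map((finding) => ({
    ...finding,        // component fields stay at the root
    shipsec: ctx,      // system fields namespaced to avoid collisions
    '@timestamp': timestamp,
  }));
}

const docs = toDocuments(
  [{ check_id: 'DB_RLS_DISABLED' }, { check_id: 'DB_NO_BACKUPS' }],
  {
    organization_id: 'org_123',
    run_id: 'shipsec-run-xxx',
    workflow_id: 'wf-1',
    workflow_name: 'Supabase Security Audit',
    component_id: 'core.analytics.sink',
    node_ref: 'analytics-sink-1',
  },
);

console.log(docs[0]['@timestamp'] === docs[1]['@timestamp']); // → true
```

Because `shipsec` and `@timestamp` are spread in after the finding's own fields, a component field named `shipsec` would be overwritten rather than colliding with a system field.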

### Querying in OpenSearch

With this structure, users can:

- Filter by organization: `shipsec.organization_id: "org_123"`
- Filter by workflow: `shipsec.workflow_id: "xxx"`
- Filter by run: `shipsec.run_id: "xxx"`
- Filter by asset: `asset_key: "api.example.com"`
- Filter by scanner: `scanner: "nuclei"`
- Filter by component-specific fields: `severity: "CRITICAL"`
- Aggregate by severity: `terms` aggregation on the `severity` field
- Track finding history: filter by `finding_hash`, sort by `@timestamp`
- Find recurring findings: group by `finding_hash`, count occurrences
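
As an illustration, the severity breakdown for a single run could be issued as a `terms` aggregation. The request body below is sketched as a plain object; the field suffix is an assumption — with default dynamic mapping, strings are indexed as `text` with a `.keyword` subfield, so the aggregation targets `severity.keyword` (an explicit index template, as noted under Future Considerations, could map `severity` as `keyword` directly):

```typescript
// Severity breakdown for one run, expressed as an OpenSearch
// request body. Run ID and field names follow the examples above.
const severityBreakdown = {
  size: 0, // aggregation only, no hits needed
  query: {
    bool: {
      filter: [{ term: { 'shipsec.run_id': 'shipsec-run-xxx' } }],
    },
  },
  aggs: {
    by_severity: {
      terms: { field: 'severity.keyword' },
    },
  },
};

console.log(Object.keys(severityBreakdown.aggs)); // → [ 'by_severity' ]
```

The same shape works for the recurring-findings query: swap the `terms` field for `finding_hash.keyword` and read the bucket doc counts.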

### Trade-offs

| Decision                 | Pro                       | Con                                        |
| ------------------------ | ------------------------- | ------------------------------------------ |
| Serialize nested objects | Prevents field explosion  | Can't query inside serialized fields       |
| `shipsec` namespace      | No field collision        | Slightly more verbose queries              |
| No generic schema        | Better fit per component  | Less consistency across components         |
| Same timestamp per batch | Accurate (same scan time) | Can't distinguish individual finding times |

### Implementation Files

1. `/worker/src/utils/opensearch-indexer.ts` - Add `shipsec` context, serialize nested objects
2. `/worker/src/components/core/analytics-sink.ts` - Accept `list<json>`, consistent timestamp
3. Component files - Ensure structured output, add `results` port where missing

### Backward Compatibility

- Existing workflows connecting `rawOutput` to the Analytics Sink will still work
- The Analytics Sink continues to accept any data type for backward compatibility
- New `list<json>` processing only triggers when the input is an array

### Future Considerations

1. **Index templates**: Create an OpenSearch index template with explicit mappings for `shipsec.*` fields
2. **Field discovery**: Build UI to show available fields from indexed data
3. **Schema validation**: Optional strict mode to validate findings against an expected schema