diff --git a/dist/index.html b/dist/index.html index 7cf9141..3a18234 100644 --- a/dist/index.html +++ b/dist/index.html @@ -12,8 +12,8 @@ - - + +
@@ -52,6 +52,7 @@ Agents Usage Skills + Workflows Chat Logs Monitor @@ -100,6 +101,7 @@ + diff --git a/lib/workflow-worker-runner.js b/lib/workflow-worker-runner.js new file mode 100644 index 0000000..83b4340 --- /dev/null +++ b/lib/workflow-worker-runner.js @@ -0,0 +1,68 @@ +const { spawnSync } = require('node:child_process'); +const { upsertWorkerStatus } = require('./workflow-worker-status'); + +function currentTimestamp(now) { + return typeof now === 'function' ? now() : (now || new Date().toISOString()); +} + +function statusForExit(result = {}) { + if (result.signal) return 'stopped'; + if (result.status === 0) return 'idle'; + if (result.status === 130 || result.status === 143) return 'stopped'; + return 'error'; +} + +function noteForExit(result = {}) { + if (result.signal) return `terminated by ${result.signal}`; + return `exited ${Number.isInteger(result.status) ? result.status : 1}`; +} + +function writeWorkerStatus(options, status, note) { + const snapshot = upsertWorkerStatus({ + repoDir: options.repoDir, + file: options.file, + id: options.id, + label: options.label, + provider: options.provider, + sessionId: options.sessionId, + status, + note, + updatedAt: currentTimestamp(options.now), + }); + if (typeof options.onStatus === 'function') options.onStatus(snapshot); + return snapshot; +} + +function runWorkerCommand(options) { + if (!options || typeof options !== 'object') throw new Error('options are required'); + const command = String(options.command || '').trim(); + if (!command) throw new Error('command is required'); + + writeWorkerStatus(options, 'generating', options.startNote || 'started'); + + const runner = options.spawnSync || spawnSync; + const result = runner(command, options.args || [], { + cwd: options.cwd || options.repoDir || process.cwd(), + env: options.env || process.env, + stdio: options.stdio || 'inherit', + shell: Boolean(options.shell), + }); + + if (result.error) { + writeWorkerStatus(options, 'error', result.error.message || 'failed to launch'); + return { exitCode: 1, signal: result.signal || null, error: result.error }; + } + + const finalStatus = statusForExit(result); + writeWorkerStatus(options, finalStatus, noteForExit(result)); + + return { + exitCode: Number.isInteger(result.status) ? result.status : (result.signal ? 143 : 1), + signal: result.signal || null, + }; +} + +module.exports = { + runWorkerCommand, + statusForExit, +}; diff --git a/lib/workflow-worker-status.js b/lib/workflow-worker-status.js new file mode 100644 index 0000000..44a49a1 --- /dev/null +++ b/lib/workflow-worker-status.js @@ -0,0 +1,93 @@ +const fs = require('fs'); +const path = require('path'); +const { normalizeWorkerStatus } = require('./workflows'); + +function normalizeString(value) { + return String(value || '').trim(); +} + +function resolveWorkerStatusPath(repoDir, file) { + const resolvedRepoDir = path.resolve(repoDir || process.cwd()); + const rawFile = normalizeString(file); + if (!rawFile || path.isAbsolute(rawFile)) { + throw new Error('worker status file must be a repo-local relative path'); + } + const resolvedPath = path.resolve(resolvedRepoDir, rawFile); + const relative = path.relative(resolvedRepoDir, resolvedPath); + if (relative.startsWith('..') || path.isAbsolute(relative)) { + throw new Error('worker status file resolves outside repo'); + } + return resolvedPath; +} + +function readSnapshot(snapshotPath) { + try { + const raw = fs.readFileSync(snapshotPath, 'utf8'); + const parsed = JSON.parse(raw); + if (Array.isArray(parsed)) return { workers: parsed }; + if (parsed && typeof parsed === 'object') { + return { ...parsed, workers: Array.isArray(parsed.workers) ? parsed.workers : [] }; + } + } catch (error) { + if (error.code !== 'ENOENT') throw error; + } + return { workers: [] }; +} + +function buildWorkerPatch({ id, label, provider, status, sessionId, updatedAt, now, note }) { + const workerId = normalizeString(id); + if (!workerId) throw new Error('worker id is required'); + + const patch = { + id: workerId, + status: normalizeWorkerStatus(status), + updated_at: normalizeString(updatedAt || now || new Date().toISOString()), + }; + const normalizedLabel = normalizeString(label); + const normalizedProvider = normalizeString(provider); + const normalizedSessionId = normalizeString(sessionId); + const normalizedNote = normalizeString(note); + if (normalizedLabel) patch.label = normalizedLabel; + if (normalizedProvider) patch.provider = normalizedProvider; + if (normalizedSessionId) patch.session_id = normalizedSessionId; + if (normalizedNote) patch.note = normalizedNote; + return patch; +} + +function writeSnapshotAtomic(snapshotPath, snapshot) { + fs.mkdirSync(path.dirname(snapshotPath), { recursive: true }); + const tmpPath = `${snapshotPath}.${process.pid}.${Date.now()}.tmp`; + fs.writeFileSync(tmpPath, `${JSON.stringify(snapshot, null, 2)}\n`); + fs.renameSync(tmpPath, snapshotPath); +} + +function upsertWorkerStatus(options) { + const snapshotPath = resolveWorkerStatusPath(options.repoDir, options.file); + const snapshot = readSnapshot(snapshotPath); + const patch = buildWorkerPatch(options); + const workers = new Map(); + + for (const worker of snapshot.workers) { + if (worker && typeof worker === 'object' && normalizeString(worker.id)) { + workers.set(normalizeString(worker.id), { ...worker }); + } + } + + workers.set(patch.id, { + ...(workers.get(patch.id) || {}), + ...patch, + }); + + const nextSnapshot = { + ...snapshot, + updated_at: patch.updated_at, + workers: Array.from(workers.values()).sort((a, b) => String(a.id).localeCompare(String(b.id))), + }; + writeSnapshotAtomic(snapshotPath, nextSnapshot); + return nextSnapshot; +} + +module.exports = { + resolveWorkerStatusPath, + upsertWorkerStatus, +}; diff --git a/lib/workflows.js b/lib/workflows.js new file mode 100644 index 0000000..4c546f3 --- /dev/null +++ b/lib/workflows.js @@ -0,0 +1,355 @@ +const fs = require('fs'); +const os = require('os'); +const path = require('path'); +const yaml = require('js-yaml'); + +const DEFAULT_REPO_CANDIDATES = [ + path.join(os.homedir(), 'worktrees', 'MiraRepo_runtime_main'), + path.join(os.homedir(), 'repos', 'MiraRepo'), +]; + +function resolveMiraRepoDir(explicitDir = process.env.MIRA_REPO_DIR) { + const candidates = [explicitDir, ...DEFAULT_REPO_CANDIDATES].filter(Boolean); + for (const candidate of candidates) { + const resolved = path.resolve(candidate); + if (fs.existsSync(path.join(resolved, 'docs', 'workflows'))) return resolved; + } + return path.resolve(candidates[0] || DEFAULT_REPO_CANDIDATES[0]); +} + +function toPosixRelative(root, filePath) { + return path.relative(root, filePath).split(path.sep).join('/'); +} + +function listFiles(dir, predicate = () => true) { + try { + return fs.readdirSync(dir) + .map((name) => path.join(dir, name)) + .filter((filePath) => { + try { return fs.statSync(filePath).isFile() && predicate(filePath); } + catch { return false; } + }) + .sort(); + } catch { + return []; + } +} + +function readYamlFile(filePath) { + const raw = fs.readFileSync(filePath, 'utf8'); + const data = yaml.load(raw) || {}; + return { raw, data }; +} + +function getNested(obj, pathParts) { + let current = obj; + for (const part of pathParts) { + if (!current || typeof current !== 'object') return undefined; + current = current[part]; + } + return current; +} + +function normalizeString(value) { + return String(value || '').trim(); +} + +function normalizeKey(value) { + return normalizeString(value).toLowerCase(); +} + +function pickWorkflowId(filePath, definition) { + return normalizeString(definition.id) || path.basename(filePath).replace(/\.(ya?ml)$/i, ''); +} + +function findRunbook(repoDir, id, definition) { + const linkedRunbook = getNested(definition, ['links', 'runbook']); + const candidates = [ + linkedRunbook && path.resolve(repoDir, linkedRunbook), + path.join(repoDir, 'docs', 'workflows', 'runbooks', `${id}.md`), + ].filter(Boolean); + + for (const candidate of candidates) { + if (candidate.startsWith(repoDir) && fs.existsSync(candidate)) { + return candidate; + } + } + return null; +} + +function findLatestReport(repoDir, id) { + const reportsDir = path.join(repoDir, 'reports', 'workflows'); + const reports = listFiles(reportsDir, (filePath) => /\.(md|txt|json|ya?ml)$/i.test(filePath)) + .map((filePath) => { + let stat; + try { stat = fs.statSync(filePath); } catch { return null; } + const base = path.basename(filePath).toLowerCase(); + const score = base.includes(normalizeKey(id)) ? 2 : 0; + return { filePath, mtimeMs: stat.mtimeMs, score }; + }) + .filter(Boolean) + .filter((item) => item.score > 0) + .sort((a, b) => b.score - a.score || b.mtimeMs - a.mtimeMs); + return reports[0]?.filePath || null; +} + +function findCronJob(definition, id, cronJobs = []) { + const explicitIds = [ + getNested(definition, ['execution', 'hermes_cron_job_id']), + getNested(definition, ['execution', 'cron_job_id']), + definition.hermes_cron_job_id, + definition.cron_job_id, + ].map(normalizeKey).filter(Boolean); + + const names = [id, definition.name].map(normalizeKey).filter(Boolean); + + return cronJobs.find((job) => explicitIds.includes(normalizeKey(job.id))) + || cronJobs.find((job) => names.includes(normalizeKey(job.name)) || names.includes(normalizeKey(job.id))) + || cronJobs.find((job) => names.some((name) => normalizeKey(job.name).includes(name))) + || null; +} + +function classifyWorkflow({ runbookPath, cron }) { + const warnings = []; + const deliver = normalizeKey(cron?.deliver); + const lastDeliveryError = normalizeString(cron?.lastDeliveryError || cron?.deliveryError); + const lastStatus = normalizeKey(cron?.lastStatus); + const cronStatus = normalizeKey(cron?.status); + + if (lastDeliveryError) { + if (deliver === 'local' && /deliver=origin|origin/i.test(lastDeliveryError)) { + warnings.push('last_delivery_error appears stale because current delivery is local'); + } else { + return { status: 'active_failure', warnings: [lastDeliveryError] }; + } + } + + if (['failed', 'failure', 'error'].includes(lastStatus) || ['failed', 'failure', 'error'].includes(cronStatus)) { + return { status: 'active_failure', warnings: [`cron status indicates ${lastStatus || cronStatus}`] }; + } + + if (warnings.length) return { status: 'stale_warning', warnings }; + if (!runbookPath) return { status: 'missing_runbook', warnings: ['runbook is missing'] }; + if (!cron) return { status: 'missing_cron', warnings: ['cron job is not linked'] }; + return { status: 'ok', warnings }; +} + +const WORKER_STATUS_KEYS = ['starting', 'generating', 'waiting_approval', 'idle', 'error', 'stopped', 'unknown']; + +function normalizeWorkerStatus(status) { + const value = normalizeKey(status).replace(/[\s-]+/g, '_'); + if (['starting', 'booting', 'queued'].includes(value)) return 'starting'; + if (['generating', 'running', 'busy', 'in_progress', 'working'].includes(value)) return 'generating'; + if (['waiting_approval', 'approval_required', 'blocked', 'paused_for_approval'].includes(value)) return 'waiting_approval'; + if (['idle', 'ready', 'complete', 'completed', 'done', 'success', 'ok'].includes(value)) return 'idle'; + if (['error', 'failed', 'failure', 'crashed', 'timeout'].includes(value)) return 'error'; + if (['stopped', 'cancelled', 'canceled', 'killed', 'terminated'].includes(value)) return 'stopped'; + return 'unknown'; +} + +function normalizeWorker(worker, index = 0, { fallback = true } = {}) { + const id = normalizeString(worker.id || worker.name || (fallback ? `worker-${index + 1}` : '')); + return { + id, + label: normalizeString(worker.label || worker.title || worker.name || (fallback ? worker.id || `Worker ${index + 1}` : '')), + provider: normalizeString(worker.provider || worker.type || worker.agent || (fallback ? 'unknown' : '')), + status: normalizeWorkerStatus(worker.status || worker.state), + sessionId: normalizeString(worker.session_id || worker.sessionId || worker.providerSessionId), + updatedAt: normalizeString(worker.updated_at || worker.updatedAt || worker.last_seen_at || worker.lastSeenAt), + note: normalizeString(worker.note || worker.message || worker.reason), + }; +} + +function normalizeWorkerList(rawWorkers, options = {}) { + if (!Array.isArray(rawWorkers)) return []; + return rawWorkers + .filter((worker) => worker && typeof worker === 'object') + .map((worker, index) => normalizeWorker(worker, index, options)); +} + +function resolveRepoLocalPath(repoDir, filePath) { + const rawPath = normalizeString(filePath); + if (!rawPath || path.isAbsolute(rawPath)) return null; + const resolvedRepoDir = path.resolve(repoDir); + const resolvedPath = path.resolve(resolvedRepoDir, rawPath); + const relative = path.relative(resolvedRepoDir, resolvedPath); + if (relative.startsWith('..') || path.isAbsolute(relative)) return null; + return resolvedPath; +} + +function readWorkerStatusSnapshot(repoDir, definition) { + const statusFile = getNested(definition, ['execution', 'worker_status_file']) + || getNested(definition, ['execution', 'workerStatusFile']) + || definition.worker_status_file + || definition.workerStatusFile; + const snapshotPath = resolveRepoLocalPath(repoDir, statusFile); + if (!snapshotPath || !fs.existsSync(snapshotPath)) return []; + + try { + const snapshot = readYamlFile(snapshotPath).data; + if (Array.isArray(snapshot)) return snapshot; + if (Array.isArray(snapshot?.workers)) return snapshot.workers; + return []; + } catch { + return []; + } +} + +function mergeWorkerSnapshots(baseWorkers, runtimeWorkers) { + const merged = new Map(); + for (const worker of baseWorkers) merged.set(worker.id, worker); + for (const runtimeWorker of runtimeWorkers) { + if (!runtimeWorker.id) continue; + const previous = merged.get(runtimeWorker.id) || {}; + merged.set(runtimeWorker.id, { + ...previous, + ...runtimeWorker, + label: runtimeWorker.label || previous.label || runtimeWorker.id, + provider: runtimeWorker.provider || previous.provider || 'unknown', + }); + } + return Array.from(merged.values()); +} + +function normalizeWorkers(definition, { repoDir } = {}) { + const rawWorkers = getNested(definition, ['execution', 'workers']) || definition.workers || []; + const baseWorkers = normalizeWorkerList(rawWorkers); + if (!repoDir) return baseWorkers; + + const runtimeWorkers = normalizeWorkerList(readWorkerStatusSnapshot(repoDir, definition), { fallback: false }); + if (!runtimeWorkers.length) return baseWorkers; + return mergeWorkerSnapshots(baseWorkers, runtimeWorkers); +} + +function emptyWorkerSummary() { + return WORKER_STATUS_KEYS.reduce((summary, status) => { + summary[status] = 0; + return summary; + }, { total: 0 }); +} + +function summarizeWorkers(workflows) { + const summary = emptyWorkerSummary(); + for (const workflow of workflows) { + for (const worker of workflow.workers || []) { + summary.total += 1; + const status = WORKER_STATUS_KEYS.includes(worker.status) ? worker.status : 'unknown'; + summary[status] += 1; + } + } + return summary; +} + +function workflowSummary(workflows) { + const summary = { + total: workflows.length, + ok: 0, + active_failure: 0, + stale_warning: 0, + missing_runbook: 0, + missing_cron: 0, + unknown: 0, + workers: emptyWorkerSummary(), + }; + for (const workflow of workflows) { + const key = workflow.status || 'unknown'; + summary[key] = (summary[key] || 0) + 1; + } + summary.workers = summarizeWorkers(workflows); + return summary; +} + +function buildWorkflowIndex({ repoDir = resolveMiraRepoDir(), cronJobs = [] } = {}) { + const resolvedRepoDir = path.resolve(repoDir); + const definitionsDir = path.join(resolvedRepoDir, 'docs', 'workflows', 'definitions'); + const definitionFiles = listFiles(definitionsDir, (filePath) => /\.ya?ml$/i.test(filePath)); + + const workflows = definitionFiles.map((definitionPath) => { + let parsed; + try { + parsed = readYamlFile(definitionPath); + } catch (error) { + const id = path.basename(definitionPath).replace(/\.(ya?ml)$/i, ''); + return { + id, + name: id, + definitionPath: toPosixRelative(resolvedRepoDir, definitionPath), + runbookPath: null, + latestReportPath: null, + cron: null, + status: 'active_failure', + warnings: [`definition parse failed: ${error.message}`], + }; + } + + const definition = parsed.data; + const id = pickWorkflowId(definitionPath, definition); + const runbookPath = findRunbook(resolvedRepoDir, id, definition); + const latestReportPath = findLatestReport(resolvedRepoDir, id); + const cron = findCronJob(definition, id, cronJobs); + const health = classifyWorkflow({ runbookPath, cron }); + const workers = normalizeWorkers(definition, { repoDir: resolvedRepoDir }); + + return { + id, + name: normalizeString(definition.name) || id, + businessGoal: normalizeString(definition.business_goal || definition.businessGoal), + schedule: getNested(definition, ['schedule', 'human_readable']) + || getNested(definition, ['schedule', 'expression']) + || normalizeString(cron?.schedule), + riskLevel: getNested(definition, ['risk', 'level']) || definition.risk_level || 'unknown', + definitionPath: toPosixRelative(resolvedRepoDir, definitionPath), + runbookPath: runbookPath ? toPosixRelative(resolvedRepoDir, runbookPath) : null, + latestReportPath: latestReportPath ? toPosixRelative(resolvedRepoDir, latestReportPath) : null, + cron: cron ? { + id: cron.id, + name: cron.name, + status: cron.status, + schedule: cron.schedule, + nextRun: cron.nextRun, + lastRun: cron.lastRun, + lastStatus: cron.lastStatus, + deliver: cron.deliver, + lastDeliveryError: cron.lastDeliveryError, + } : null, + workers, + status: health.status, + warnings: health.warnings, + }; + }); + + workflows.sort((a, b) => a.id.localeCompare(b.id)); + return { + repoDir: resolvedRepoDir, + workflows, + summary: workflowSummary(workflows), + }; +} + +function getWorkflowDetail({ repoDir = resolveMiraRepoDir(), id, cronJobs = [] } = {}) { + const index = buildWorkflowIndex({ repoDir, cronJobs }); + const workflow = index.workflows.find((item) => item.id === id); + if (!workflow) return null; + const definitionAbs = path.resolve(index.repoDir, workflow.definitionPath); + const runbookAbs = workflow.runbookPath ? path.resolve(index.repoDir, workflow.runbookPath) : null; + const reportAbs = workflow.latestReportPath ? path.resolve(index.repoDir, workflow.latestReportPath) : null; + + return { + ...workflow, + repoDir: index.repoDir, + definitionRaw: fs.existsSync(definitionAbs) ? fs.readFileSync(definitionAbs, 'utf8') : '', + runbookRaw: runbookAbs && fs.existsSync(runbookAbs) ? fs.readFileSync(runbookAbs, 'utf8') : '', + latestReportRaw: reportAbs && fs.existsSync(reportAbs) ? fs.readFileSync(reportAbs, 'utf8').slice(0, 12000) : '', + }; +} + +module.exports = { + buildWorkflowIndex, + classifyWorkflow, + findCronJob, + getWorkflowDetail, + normalizeWorkerStatus, + normalizeWorkers, + resolveMiraRepoDir, + summarizeWorkers, +}; diff --git a/package.json b/package.json index 32d454d..f1124a2 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,9 @@ "serve": "node server.js", "setup": "bash install.sh", "test": "node --test", - "reset-password": "node scripts/reset-password.js" + "worker-status": "node scripts/workflow-worker-status.js", + "reset-password": "node scripts/reset-password.js", + "worker-run": "node scripts/workflow-worker-run.js" }, "dependencies": { "@pydantic/genai-prices": "^0.0.56", diff --git a/scripts/workflow-worker-run.js b/scripts/workflow-worker-run.js new file mode 100644 index 0000000..de3f799 --- /dev/null +++ b/scripts/workflow-worker-run.js @@ -0,0 +1,95 @@ +#!/usr/bin/env node +const { runWorkerCommand } = require('../lib/workflow-worker-runner'); + +function usage() { + return `Usage: node scripts/workflow-worker-run.js --repo