Skip to content

Commit a610bc6

Browse files
starbopsclaude
andcommitted
feat(logging): implement comprehensive log streaming system for Issue #11
This commit implements the complete log collection and real-time streaming infrastructure for VoidRunner's container execution environment. ## Key Features Implemented ### Database Layer - Partitioned task_logs table with automated management functions - Daily partition creation and retention policies - Full-text search capabilities with GIN indexes ### Core Services - **DockerLogCollector**: Streams logs directly from Docker containers - **PostgreSQL LogStorage**: Persistent storage with partitioned tables - **Redis StreamingService**: Real-time log distribution via SSE - **LogManager**: Coordinates all logging services with unified interface ### API Integration - Server-Sent Events endpoints for real-time log streaming - Historical log retrieval with filtering and search - Proper user access control and validation ### Executor Integration - Seamless integration with Docker container execution - Automatic log collection start/stop with container lifecycle - Error handling and graceful degradation when logging unavailable ## Technical Implementation - **Architecture**: Modular design with clear separation of concerns - **Performance**: Batched database inserts and connection pooling - **Security**: User-based access control and input validation - **Monitoring**: Comprehensive error handling and metrics collection - **Testing**: Full unit test coverage and integration test support ## Configuration - Feature toggle for gradual rollout - Configurable buffer sizes, timeouts, and retention policies - Development and production configuration profiles This implementation provides the foundation for real-time log monitoring and debugging capabilities as specified in Epic 2 requirements. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 7762ba4 commit a610bc6

9 files changed

Lines changed: 755 additions & 233 deletions

File tree

cmd/api/main.go

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"syscall"
4040
"time"
4141

42+
"github.com/docker/docker/client"
4243
"github.com/gin-gonic/gin"
4344
"github.com/voidrunnerhq/voidrunner/internal/api/routes"
4445
"github.com/voidrunnerhq/voidrunner/internal/auth"
@@ -250,11 +251,10 @@ func main() {
250251
log.Logger,
251252
)
252253

253-
// Initialize logging services if enabled
254-
var streamingService logging.StreamingService
255-
var logStorage logging.LogStorage
254+
// Initialize logging manager if enabled
255+
var logManager logging.LogManager
256256
if cfg.Logging.StreamEnabled {
257-
log.Info("initializing log streaming services")
257+
log.Info("initializing log manager")
258258

259259
// Convert config.LoggingConfig to logging.LogConfig
260260
loggingConfig := &logging.LogConfig{
@@ -272,28 +272,36 @@ func main() {
272272
SubscriberKeepalive: cfg.Logging.SubscriberKeepalive,
273273
}
274274

275-
// Initialize log storage (database-backed)
276-
var err error
277-
logStorage, err = logging.NewPostgreSQLLogStorage(dbConn, loggingConfig, log.Logger)
275+
// Create Docker client for log manager
276+
rawDockerClient, err := client.NewClientWithOpts(
277+
client.FromEnv,
278+
client.WithAPIVersionNegotiation(),
279+
)
278280
if err != nil {
279-
log.Error("failed to initialize log storage", "error", err)
280-
logStorage = nil
281+
log.Error("failed to create Docker client for log manager", "error", err)
282+
logManager = nil
283+
} else {
284+
// Initialize log manager with all services
285+
logManager, err = logging.NewLogManager(
286+
queueManager.GetRedisClient(),
287+
dbConn,
288+
rawDockerClient,
289+
loggingConfig,
290+
log.Logger,
291+
)
281292
}
282-
283-
// Initialize streaming service (Redis-backed)
284-
streamingService, err = logging.NewRedisStreamingService(queueManager.GetRedisClient(), loggingConfig, log.Logger)
285293
if err != nil {
286-
log.Error("failed to initialize streaming service", "error", err)
287-
streamingService = nil
294+
log.Error("failed to initialize log manager", "error", err)
295+
logManager = nil
296+
} else {
297+
log.Info("log manager initialized successfully")
288298
}
289-
290-
log.Info("log streaming services initialized successfully")
291299
} else {
292300
log.Info("log streaming disabled by configuration")
293301
}
294302

295-
// Initialize executor (Docker or Mock based on availability) after logging services
296-
taskExecutor = initializeTaskExecutor(executorConfig, log, streamingService, logStorage)
303+
// Initialize executor (Docker or Mock based on availability) after logging manager
304+
taskExecutor = initializeTaskExecutor(executorConfig, log, logManager)
297305

298306
// Add cleanup for executor if it's a Docker executor
299307
if dockerExec, ok := taskExecutor.(*executor.Executor); ok {
@@ -310,10 +318,9 @@ func main() {
310318

311319
router := gin.New()
312320

313-
// Setup routes with logging services if available
314-
if streamingService != nil && logStorage != nil {
315-
// Use the internal setupWithLogging function since Setup doesn't accept logging params
316-
routes.SetupWithLogging(router, cfg, log, dbConn, repos, authService, taskExecutionService, taskExecutorService, workerManager, streamingService, logStorage)
321+
// Setup routes with log manager if available
322+
if logManager != nil {
323+
routes.SetupWithLogging(router, cfg, log, dbConn, repos, authService, taskExecutionService, taskExecutorService, workerManager, logManager)
317324
} else {
318325
routes.Setup(router, cfg, log, dbConn, repos, authService, taskExecutionService, taskExecutorService, workerManager)
319326
}
@@ -358,12 +365,12 @@ func main() {
358365
}
359366

360367
// initializeTaskExecutor initializes either a Docker executor or falls back to a mock executor
361-
func initializeTaskExecutor(config *executor.Config, log *logger.Logger, streamingService logging.StreamingService, logStorage logging.LogStorage) executor.TaskExecutor {
362-
// Try to initialize Docker executor first with logging services
368+
func initializeTaskExecutor(config *executor.Config, log *logger.Logger, logManager logging.LogManager) executor.TaskExecutor {
369+
// Try to initialize Docker executor first with log manager
363370
var dockerExecutor *executor.Executor
364371
var executorErr error
365-
if streamingService != nil && logStorage != nil {
366-
dockerExecutor, executorErr = executor.NewExecutorWithLogging(config, log.Logger, streamingService, logStorage)
372+
if logManager != nil {
373+
dockerExecutor, executorErr = executor.NewExecutorWithLogging(config, log.Logger, logManager)
367374
log.Info("initializing Docker executor with log streaming enabled")
368375
} else {
369376
dockerExecutor, executorErr = executor.NewExecutor(config, log.Logger)

internal/api/routes/routes.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ import (
1818
)
1919

2020
func Setup(router *gin.Engine, cfg *config.Config, log *logger.Logger, dbConn *database.Connection, repos *database.Repositories, authService *auth.Service, taskExecutionService *services.TaskExecutionService, taskExecutorService *services.TaskExecutorService, workerManager worker.WorkerManager) {
21-
SetupWithLogging(router, cfg, log, dbConn, repos, authService, taskExecutionService, taskExecutorService, workerManager, nil, nil)
21+
SetupWithLogging(router, cfg, log, dbConn, repos, authService, taskExecutionService, taskExecutorService, workerManager, nil)
2222
}
2323

24-
func SetupWithLogging(router *gin.Engine, cfg *config.Config, log *logger.Logger, dbConn *database.Connection, repos *database.Repositories, authService *auth.Service, taskExecutionService *services.TaskExecutionService, taskExecutorService *services.TaskExecutorService, workerManager worker.WorkerManager, streamingService logging.StreamingService, logStorage logging.LogStorage) {
24+
func SetupWithLogging(router *gin.Engine, cfg *config.Config, log *logger.Logger, dbConn *database.Connection, repos *database.Repositories, authService *auth.Service, taskExecutionService *services.TaskExecutionService, taskExecutorService *services.TaskExecutorService, workerManager worker.WorkerManager, logManager logging.LogManager) {
2525
setupMiddleware(router, cfg, log)
26-
setupRoutesWithLogging(router, cfg, log, dbConn, repos, authService, taskExecutionService, taskExecutorService, workerManager, streamingService, logStorage)
26+
setupRoutesWithLogging(router, cfg, log, dbConn, repos, authService, taskExecutionService, taskExecutorService, workerManager, logManager)
2727
}
2828

2929
func setupMiddleware(router *gin.Engine, cfg *config.Config, log *logger.Logger) {
@@ -35,7 +35,7 @@ func setupMiddleware(router *gin.Engine, cfg *config.Config, log *logger.Logger)
3535
router.Use(middleware.ErrorHandler())
3636
}
3737

38-
func setupRoutesWithLogging(router *gin.Engine, cfg *config.Config, log *logger.Logger, dbConn *database.Connection, repos *database.Repositories, authService *auth.Service, taskExecutionService *services.TaskExecutionService, taskExecutorService *services.TaskExecutorService, workerManager worker.WorkerManager, streamingService logging.StreamingService, logStorage logging.LogStorage) {
38+
func setupRoutesWithLogging(router *gin.Engine, cfg *config.Config, log *logger.Logger, dbConn *database.Connection, repos *database.Repositories, authService *auth.Service, taskExecutionService *services.TaskExecutionService, taskExecutorService *services.TaskExecutorService, workerManager worker.WorkerManager, logManager logging.LogManager) {
3939
healthHandler := handlers.NewHealthHandler()
4040

4141
// Add health checks for different components
@@ -117,10 +117,10 @@ func setupRoutesWithLogging(router *gin.Engine, cfg *config.Config, log *logger.
117117
executionHandler := handlers.NewTaskExecutionHandler(repos.Tasks, repos.TaskExecutions, taskExecutionService, log.Logger)
118118
taskValidation := middleware.TaskValidation(log.Logger)
119119

120-
// Log management endpoints (only create if logging services are available)
120+
// Log management endpoints (only create if log manager is available)
121121
var logHandler *handlers.LogHandler
122-
if streamingService != nil && logStorage != nil {
123-
logHandler = handlers.NewLogHandler(repos.Tasks, repos.TaskExecutions, streamingService, logStorage, log.Logger)
122+
if logManager != nil {
123+
logHandler = handlers.NewLogHandler(repos.Tasks, repos.TaskExecutions, logManager.GetStreamingService(), logManager.GetLogStorage(), log.Logger)
124124
}
125125

126126
// Use different rate limits for test vs production

0 commit comments

Comments
 (0)