A scalable, production-ready distributed task processing system built with Go, PostgreSQL, and Redis.
- Horizontal Scaling: Deploy multiple worker processes across different machines
- Priority Queue: Redis-based job queuing with priority support and atomic operations
- Persistent Storage: PostgreSQL for reliable job storage and worker management
- REST API: Comprehensive HTTP API for job submission, monitoring, and management
- Built-in Handlers: Ready-to-use job processors for common tasks
- Retry Logic: Automatic retry with exponential backoff for failed jobs
- Worker Health: Heartbeat monitoring and automatic worker registration
- Graceful Shutdown: Safe handling of in-flight jobs during shutdown
- Real-time Stats: Job processing metrics and queue monitoring
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   API Gateway   │    │      Redis      │    │   PostgreSQL    │
│   (Port 8080)   │◄──►│     (Queue)     │    │    (Storage)    │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                      │                      │
         │             ┌─────────────────┐             │
         └────────────►│    Worker 1     │◄────────────┘
                       └─────────────────┘
                       ┌─────────────────┐
                       │    Worker 2     │◄────────────┘
                       └─────────────────┘
                       ┌─────────────────┐
                       │    Worker N     │◄────────────┘
                       └─────────────────┘
# Install dependencies
brew install postgresql redis go
# Start services
brew services start postgresql
brew services start redis
# Create database
createdb distributed_tasks
# Build all components
make build
# Start all services in background
make start-all
# Or start services individually
make start-worker # Terminal 1
make start-api # Terminal 2
# Submit a test job
curl -X POST http://localhost:8080/api/jobs \
-H "Content-Type: application/json" \
-d '{
"type": "math",
"payload": {
"operation": "multiply",
"a": 6,
"b": 7
},
"priority": 10,
"queue_name": "default"
}'
# Check job status
curl http://localhost:8080/api/jobs/{job_id}
# Monitor queue stats
curl http://localhost:8080/api/stats
The system includes several production-ready job handlers:
Echoes back the job payload for testing and debugging.
{
"type": "echo",
"payload": {
"message": "Hello, World!",
"data": [1, 2, 3]
}
}
Performs arithmetic operations with proper error handling.
{
"type": "math",
"payload": {
"operation": "add|subtract|multiply|divide",
"a": 10,
"b": 5
}
}
Makes HTTP requests with configurable methods and headers.
{
"type": "http_request",
"payload": {
"url": "https://api.example.com/data",
"method": "GET",
"headers": {
"Authorization": "Bearer token123"
}
}
}
Simulates work with configurable delays (useful for testing and rate limiting).
{
"type": "delay",
"payload": {
"duration": "5s",
"message": "Processing delayed task"
}
}
Generates greetings in multiple languages.
{
"type": "hello_world",
"payload": {
"name": "Alice",
"language": "spanish"
}
}
Always fails, for testing error handling and retry logic.
{
"type": "fail",
"payload": {
"reason": "Testing failure scenarios",
"retryable": true
}
}
- POST /api/jobs - Submit a new job
- GET /api/jobs/{id} - Get job details
- PUT /api/jobs/{id} - Update job
- DELETE /api/jobs/{id} - Delete job
- GET /api/jobs - List jobs with filtering
- GET /api/queues - List all queues
- GET /api/queues/{name} - Get queue details
- DELETE /api/queues/{name} - Purge queue
- GET /api/workers - List active workers
- GET /api/workers/{id} - Get worker details
- GET /health - Health check
- GET /api/stats - System statistics
- GET /docs - API documentation
Configure the system using environment variables:
# Database
export DB_HOST=localhost
export DB_PORT=5432
export DB_NAME=distributed_tasks
export DB_USER=postgres
export DB_PASSWORD=""
# Redis
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_PASSWORD=""
# Worker Configuration
export WORKER_MAX_CONCURRENCY=10
export WORKER_QUEUES=default,high_priority,background
export WORKER_POLL_INTERVAL=1s
export WORKER_JOB_TIMEOUT=30m
export WORKER_MAX_RETRIES=3
export WORKER_RETRY_DELAY=5s
# API Gateway
export API_HOST=0.0.0.0
export API_PORT=8080
export API_READ_TIMEOUT=30s
export API_WRITE_TIMEOUT=30s
# Logging
export LOG_LEVEL=info
export LOG_FORMAT=json
# Start multiple workers with different concurrency
WORKER_MAX_CONCURRENCY=5 ./bin/worker &
WORKER_MAX_CONCURRENCY=3 ./bin/worker &
# Machine 1 - High priority jobs
WORKER_QUEUES=high_priority,default ./bin/worker
# Machine 2 - Background processing
WORKER_QUEUES=background,default ./bin/worker
# Machine 3 - Mixed workload
WORKER_QUEUES=default,high_priority,background ./bin/worker
FROM golang:1.19-alpine AS builder
WORKDIR /app
COPY . .
RUN apk add --no-cache make && make build
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY --from=builder /app/bin/worker .
COPY --from=builder /app/bin/api-gateway .
CMD ["./worker"]
distributed/
├── cmd/ # Application entry points
│ ├── api-gateway/ # REST API server
│ └── worker/ # Worker process
├── internal/ # Internal packages
│ ├── api/ # HTTP handlers and routes
│ ├── config/ # Configuration management
│ ├── health/ # Health checks
│ ├── metrics/ # Metrics collection
│ ├── queue/ # Redis queue implementation
│ ├── storage/ # PostgreSQL storage
│ └── worker/ # Worker logic and handlers
├── pkg/ # Public packages
│ ├── errors/ # Error handling utilities
│ ├── logger/ # Structured logging
│ └── models/ # Data models and types
├── deployments/ # Deployment configurations
├── docs/ # Documentation
└── monitoring/ # Monitoring and observability
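package main

import (
	"context"
	"strings"
	"time"

	"eskev/distributed/internal/worker"
	"eskev/distributed/pkg/models"
)

// CustomHandler implements a custom job type.
type CustomHandler struct{}

// GetJobType returns the job type this handler processes.
func (h *CustomHandler) GetJobType() string {
	return "image_resize"
}

// Process validates the payload, resizes the image, and reports the result.
func (h *CustomHandler) Process(ctx context.Context, job *models.Job) (*worker.JobResult, error) {
	// Extract parameters from job.Payload. Checked assertions avoid a panic
	// on a missing or mistyped field; JSON numbers decode as float64.
	imageURL, okURL := job.Payload["image_url"].(string)
	widthF, okW := job.Payload["width"].(float64)
	heightF, okH := job.Payload["height"].(float64)
	if !okURL || !okW || !okH {
		return &worker.JobResult{
			Success:   false,
			Error:     "invalid payload: image_url (string), width and height (numbers) are required",
			Retryable: false, // a malformed payload will not succeed on retry
		}, nil
	}
	width, height := int(widthF), int(heightF)

	// Process the image (your custom logic here). resizeImage is a
	// placeholder for your own function.
	outputURL, err := resizeImage(imageURL, width, height)
	if err != nil {
		return &worker.JobResult{
			Success:   false,
			Error:     err.Error(),
			Retryable: h.IsRetryable(err),
		}, nil
	}

	return &worker.JobResult{
		Success: true,
		Result: map[string]interface{}{
			"input_url":    imageURL,
			"output_url":   outputURL,
			"width":        width,
			"height":       height,
			"processed_at": time.Now(),
		},
	}, nil
}

// GetTimeout returns the maximum time allowed for one job.
func (h *CustomHandler) GetTimeout() time.Duration {
	return 5 * time.Minute
}

// IsRetryable reports whether a failed job should be retried.
func (h *CustomHandler) IsRetryable(err error) bool {
	// Retry on network errors, not on invalid image formats
	return !strings.Contains(err.Error(), "invalid format")
}

// Register the handler
func main() {
	// cfg, store, queue, and log are assumed to be initialized beforehand
	// (configuration, PostgreSQL storage, Redis queue, and logger).
	w := worker.NewWorker(cfg, store, queue, log)
	w.RegisterHandler(&CustomHandler{})
	w.Start()
}
The system provides comprehensive monitoring: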
# Worker Statistics
curl http://localhost:8080/api/workers/{worker_id}/stats
# Queue Depth Monitoring
curl http://localhost:8080/api/queues/default/size
# Job Processing Rates
curl http://localhost:8080/api/stats/processing_rates
# Failed Job Analysis
curl "http://localhost:8080/api/jobs?status=failed&limit=10"
- Worker Concurrency: Adjust WORKER_MAX_CONCURRENCY based on CPU cores and job types
- Queue Strategy: Use multiple queues for different job priorities
- Database: Optimize PostgreSQL for your job volume
- Redis: Configure Redis persistence and memory settings
- Timeouts: Set appropriate timeouts for different job types
- Deploy multiple API gateway instances behind a load balancer
- Run workers across multiple machines for fault tolerance
- Use Redis Cluster for queue high availability
- Set up PostgreSQL replication for data durability
- Metrics: Prometheus + Grafana
- Logging: ELK Stack or structured JSON logs
- Alerting: Monitor queue depth, worker health, job failure rates
- Tracing: Add distributed tracing for complex job workflows
make build # Build all binaries
make start-worker # Start worker (blocking)
make start-api # Start API gateway (blocking)
make start-all # Start all services in background
make stop # Stop background services
make clean # Clean build artifacts
make help # Show all commands
MIT License - see LICENSE file for details.
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
For questions and support, please open an issue on GitHub.