Skip to content

avtomatika-ai/avtomatika-worker

Avtomatika Worker SDK

Official SDK for building workers compatible with the Avtomatika orchestrator. It automates low-level tasks: polling, heartbeats, S3 payload management, and graceful shutdown.

🚀 Key Features

  • Language: Python 3.11+
  • Protocol: Based on RXON (Reverse Axon Protocol) for Hierarchical Logic Networks (Holarchy).
  • Communication Model:
    • PULL: Workers poll tasks from orchestrators (works behind NAT/Firewall).
    • WebSocket: Real-time command channel (cancellation, custom commands).
  • Zero Trust Security:
    • Mandatory HMAC SHA256 signing for all messages using WORKER_TOKEN.
    • Identity Chain and Origin Worker ID support for provenance tracking.
    • Replay protection with timestamp validation.
  • Traffic Optimization:
    • 3-Tier Skills: Supported (catalog), Available (dynamic limits), and Hot (cached).
    • Stable Hashing: Sends full skill catalog only when changed, using skills_hash for light heartbeats.
  • S3 Streaming: High-performance data transfer using obstore. No OOM on large files.
  • Hardware Awareness: Built-in monitoring for CPU, RAM, and NVIDIA GPUs (via psutil and GPUtil).

🛡 Resilience & Connectivity

  • Independent Managers: Connection to each orchestrator is managed by a separate background task. One server failure doesn't affect others.
  • Infinite Retries: Exponential backoff for registration if the orchestrator is unavailable.
  • Non-blocking Startup: The worker starts polling as soon as it registers successfully with at least one orchestrator.
  • Graceful Shutdown: Handles SIGTERM and SIGINT properly, waiting for active tasks to finish.

🛠 Installation

pip install avtomatika-worker[s3,pydantic]

For development:

pip install -e .[test,dev]

💻 Quick Start

from avtomatika_worker import Worker, TaskFiles

worker = Worker()

@worker.skill("hello_world")
async def my_skill(params: dict, files: TaskFiles):
    """Simple skill that says hello."""
    return {"message": f"Hello, {params.get('name', 'World')}!"}

@worker.on_command("reboot")
async def handle_reboot(command):
    print("Rebooting worker...")

if __name__ == "__main__":
    worker.run()

⚙️ Configuration

Controlled via environment variables:

  • ORCHESTRATORS_CONFIG: JSON list of orchestrator configs (URLs, priorities, weights).
  • ORCHESTRATOR_URL: Simple fallback if only one orchestrator is used (default: http://localhost:8080).
  • WORKER_TOKEN: Secret for HMAC signing (Zero Trust).
  • S3_ENDPOINT_URL, S3_ACCESS_KEY, S3_SECRET_KEY, S3_DEFAULT_BUCKET: Storage settings for large payloads.
  • STRICT_EVENT_VALIDATION: (Default: True) Validates events against schemas before emitting.
  • LOG_LEVEL: Logging verbosity (DEBUG, INFO, WARNING, ERROR).
  • MAX_CONCURRENT_TASKS: Global limit for concurrent task execution.

📜 License

Mozilla Public License v. 2.0.

About

Official Python SDK for building workers and IoT devices compatible with the Avtomatika orchestrator. Automates polling, S3 payload offloading, and heartbeats.

Topics

Resources

License

Contributing

Security policy

Stars

Watchers

Forks

Contributors