-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgeneric_config.py
More file actions
144 lines (123 loc) · 5.04 KB
/
generic_config.py
File metadata and controls
144 lines (123 loc) · 5.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import os
from enum import Enum
from typing import Optional, List, Dict, Any
from urllib.parse import urlsplit

from dotenv import load_dotenv
from pydantic import BaseModel

# Load .env values (python-dotenv) before the model classes below are defined:
# their os.getenv(...) field defaults are evaluated once, when each class body
# executes at import time, so this call has to come first.
load_dotenv()


class ServiceType(str, Enum):
    """Identifiers of the service kinds the distributed processor knows about.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value.
    """

    # Document-oriented services
    PDF_DOCLING = "pdf_docling"
    IMAGE_PROCESSING = "image_processing"
    TEXT_ANALYSIS = "text_analysis"

    # Media-oriented services
    AUDIO_TRANSCRIPTION = "audio_transcription"
    VIDEO_PROCESSING = "video_processing"

    # Extend with further members as new service kinds are introduced.


class NatsConfig(BaseModel):
    """Generic NATS configuration supporting multiple service types.

    The environment-backed defaults (``NATS_URL``, ``NATS_TOKEN``,
    ``NATS_CONNECT_TIMEOUT``, ``NATS_MAX_RECONNECT_ATTEMPTS``) are read once,
    when this class body executes, not on every instantiation.
    """

    # Server URL; falls back to a local server on port 4222.
    url: str = os.getenv("NATS_URL", "nats://localhost:4222")
    # Optional auth token; None when NATS_TOKEN is not set.
    token: Optional[str] = os.getenv("NATS_TOKEN")

    # Connection settings
    connect_timeout: int = int(os.getenv("NATS_CONNECT_TIMEOUT", "10"))
    max_reconnect_attempts: int = int(os.getenv("NATS_MAX_RECONNECT_ATTEMPTS", "10"))

    # Stream settings
    max_payload_size: int = 8 * 1024 * 1024  # 8MB default
    max_stream_size: int = 1024 * 1024 * 1024  # 1GB default

    @property
    def connection_url(self) -> str:
        """Return ``url`` with ``token`` embedded as the user-info part.

        Returns:
            * ``url`` unchanged when no token is set, or when the URL already
              contains credentials (``user[:password]@``) of its own.
            * Otherwise ``<scheme>://<token>@<host>:<port>`` followed by any
              path/query the URL carried. The original scheme is preserved
              (``nats`` is assumed for a bare ``host[:port]``) and port
              ``4222`` is appended when the URL does not name one.

        Raises:
            ValueError: if ``urlsplit`` rejects the URL (for example an
                unbalanced ``[`` in an IPv6 literal).
        """
        if not self.token:
            return self.url

        # A bare "host:port" would be misparsed (the host taken for a scheme),
        # so give it the default scheme before splitting.
        raw_url = self.url if "://" in self.url else f"nats://{self.url}"
        parts = urlsplit(raw_url)

        _, at_sign, host_port = parts.netloc.rpartition("@")
        if at_sign:
            # The URL already embeds credentials; do not overwrite them.
            return self.url

        # The text after a bracketed IPv6 literal (or the whole value when
        # there are no brackets) contains ":" only if a port was given.
        if ":" not in host_port.rpartition("]")[2]:
            host_port = f"{host_port}:4222"

        return parts._replace(netloc=f"{self.token}@{host_port}").geturl()


class ServiceConfig(BaseModel):
    """Settings for one service type: its stream, subject namespace and limits."""

    service_type: ServiceType
    stream_name: str
    subject_prefix: str
    worker_queue_group: str  # For load balancing workers

    # Service-specific settings
    max_processing_time: int = 600  # 10 minutes default
    max_retries: int = 3
    retry_delay: int = 5

    def get_process_subject(self, request_id: str = "*") -> str:
        """Return ``<subject_prefix>.process.<request_id>``.

        With the default ``request_id`` of ``"*"`` the result ends in
        ``.process.*``.
        """
        prefix = self.subject_prefix
        return f"{prefix}.process.{request_id}"

    def get_result_subject(self, request_id: str = "*") -> str:
        """Return ``<subject_prefix>.result.<request_id>``.

        With the default ``request_id`` of ``"*"`` the result ends in
        ``.result.*``.
        """
        prefix = self.subject_prefix
        return f"{prefix}.result.{request_id}"

    def get_status_subject(self, request_id: str = "*") -> str:
        """Return ``<subject_prefix>.status.<request_id>``.

        With the default ``request_id`` of ``"*"`` the result ends in
        ``.status.*``.
        """
        prefix = self.subject_prefix
        return f"{prefix}.status.{request_id}"


class GenericDistributedConfig(BaseModel):
    """Main configuration for the generic distributed processing system.

    Bundles the shared NATS settings with one ``ServiceConfig`` per service
    type. The default ``services`` mapping has entries for PDF_DOCLING,
    IMAGE_PROCESSING, TEXT_ANALYSIS and AUDIO_TRANSCRIPTION only;
    ``ServiceType.VIDEO_PROCESSING`` has no default entry.
    """

    nats: NatsConfig = NatsConfig()

    # Service configurations
    services: Dict[ServiceType, ServiceConfig] = {
        ServiceType.PDF_DOCLING: ServiceConfig(
            service_type=ServiceType.PDF_DOCLING,
            stream_name="PDF_PROCESSING",
            subject_prefix="pdf.docling",
            worker_queue_group="pdf_docling_workers",
        ),
        ServiceType.IMAGE_PROCESSING: ServiceConfig(
            service_type=ServiceType.IMAGE_PROCESSING,
            stream_name="IMAGE_PROCESSING",
            subject_prefix="image.process",
            worker_queue_group="image_workers",
        ),
        ServiceType.TEXT_ANALYSIS: ServiceConfig(
            service_type=ServiceType.TEXT_ANALYSIS,
            stream_name="TEXT_ANALYSIS",
            subject_prefix="text.analyze",
            worker_queue_group="text_workers",
        ),
        ServiceType.AUDIO_TRANSCRIPTION: ServiceConfig(
            service_type=ServiceType.AUDIO_TRANSCRIPTION,
            stream_name="AUDIO_TRANSCRIPTION",
            subject_prefix="audio.transcribe",
            worker_queue_group="audio_workers",
        ),
    }

    def get_service_config(self, service_type: ServiceType) -> ServiceConfig:
        """Return the configuration registered for ``service_type``.

        Raises:
            KeyError: if ``services`` has no entry for ``service_type``. The
                message lists the service types that are configured.
        """
        try:
            return self.services[service_type]
        except KeyError:
            configured = ", ".join(
                str(getattr(key, "value", key)) for key in self.services
            )
            raise KeyError(
                f"No service configuration registered for {service_type!r}; "
                f"configured service types: {configured}"
            ) from None

    def get_all_stream_names(self) -> List[str]:
        """Return the stream name of every configured service, in mapping order."""
        return [service.stream_name for service in self.services.values()]

    def get_all_subjects(self) -> List[str]:
        """Return the process/result/status wildcard subjects of every service.

        For each configured service, in mapping order, the list holds
        ``<prefix>.process.*``, ``<prefix>.result.*`` and
        ``<prefix>.status.*``.
        """
        subjects: List[str] = []
        for service in self.services.values():
            # Called without a request id, the ServiceConfig helpers produce
            # the "*" forms, so the subject format is defined in one place.
            subjects.extend(
                (
                    service.get_process_subject(),
                    service.get_result_subject(),
                    service.get_status_subject(),
                )
            )
        return subjects


# Global config instance, created once at import time with every default in
# place; the convenience function and the legacy class below read from it.
config = GenericDistributedConfig()
# Convenience functions for backward compatibility
def get_pdf_docling_config() -> ServiceConfig:
    """Return the global config's entry for the PDF Docling service."""
    service_type = ServiceType.PDF_DOCLING
    return config.get_service_config(service_type)


# Legacy config for existing code
class NatsConfig_Legacy(BaseModel):
    """Legacy NATS config for backward compatibility.

    ``url`` and ``token`` default to the values the global ``config`` held
    when this class was defined; ``stream_name`` and ``subject_prefix`` keep
    their old single-stream defaults.
    """

    url: str = config.nats.url
    token: Optional[str] = config.nats.token
    stream_name: str = "DOCUMENTS"  # Old default
    subject_prefix: str = "docs"  # Old default

    @property
    def connection_url(self) -> str:
        """Return the connection URL built from this instance's url and token.

        Delegates to ``NatsConfig.connection_url`` so both classes format the
        URL the same way, and so overriding ``url`` or ``token`` on a legacy
        instance is reflected in the result instead of being ignored.
        """
        return NatsConfig(url=self.url, token=self.token).connection_url


# Export both the generic API and the legacy class for compatibility
__all__ = [
    "ServiceType",
    "NatsConfig",
    "ServiceConfig",
    "GenericDistributedConfig",
    "config",
    "get_pdf_docling_config",
    "NatsConfig_Legacy",
]