Skip to content

Commit 7d7ccce

Browse files
author
Tom Softreck
committed
update
1 parent 88f3a50 commit 7d7ccce

4 files changed

Lines changed: 467 additions & 35 deletions

File tree

src/dialogchain/core/connector.py

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
"""
2+
Connector management for DialogChain engine.
3+
4+
This module handles the creation and management of source and destination connectors.
5+
"""
6+
7+
from typing import Dict, Any, Optional, Type, List, Union
8+
import importlib
9+
import logging
10+
11+
from dialogchain.connectors import Source, Destination
12+
from dialogchain.connectors.sources import (
13+
RTSPSource, TimerSource, FileSource, IMAPSource, GRPCSource
14+
)
15+
from dialogchain.connectors.destinations import (
16+
HTTPDestination, EmailDestination, FileDestination as FileDest,
17+
LogDestination, MQTTDestination, GRPCDestination
18+
)
19+
20+
logger = logging.getLogger(__name__)
21+
22+
23+
class ConnectorManager:
24+
"""Manages the creation and lifecycle of connectors."""
25+
26+
# Default source mappings
27+
DEFAULT_SOURCES = {
28+
'rtsp': RTSPSource,
29+
'timer': TimerSource,
30+
'file': FileSource,
31+
'imap': IMAPSource,
32+
'grpc': GRPCSource,
33+
}
34+
35+
# Default destination mappings
36+
DEFAULT_DESTINATIONS = {
37+
'http': HTTPDestination,
38+
'https': HTTPDestination,
39+
'smtp': EmailDestination,
40+
'file': FileDest,
41+
'log': LogDestination,
42+
'mqtt': MQTTDestination,
43+
'grpc': GRPCDestination,
44+
}
45+
46+
def __init__(
47+
self,
48+
source_types: Optional[Dict[str, Type[Source]]] = None,
49+
destination_types: Optional[Dict[str, Type[Destination]]] = None
50+
):
51+
"""Initialize the connector manager.
52+
53+
Args:
54+
source_types: Additional or override source types
55+
destination_types: Additional or override destination types
56+
"""
57+
self.source_types = {**self.DEFAULT_SOURCES, **(source_types or {})}
58+
self.destination_types = {**self.DEFAULT_DESTINATIONS, **(destination_types or {})}
59+
60+
def create_source(self, uri: str) -> Source:
61+
"""Create a source connector from a URI.
62+
63+
Args:
64+
uri: Source URI (e.g., 'timer:5s', 'imap://user:pass@server')
65+
66+
Returns:
67+
Configured Source instance
68+
69+
Raises:
70+
ValueError: If the URI scheme is not recognized
71+
"""
72+
if '://' not in uri and ':' in uri:
73+
# Handle URIs without slashes (e.g., 'timer:5s')
74+
scheme, path = uri.split(':', 1)
75+
uri = f"{scheme}://{path}"
76+
77+
parsed = self._parse_uri(uri)
78+
scheme = parsed['scheme']
79+
80+
if scheme not in self.source_types:
81+
raise ValueError(f"Unsupported source type: {scheme}")
82+
83+
source_class = self.source_types[scheme]
84+
return source_class(uri)
85+
86+
def create_destination(self, uri: str) -> Destination:
87+
"""Create a destination connector from a URI.
88+
89+
Args:
90+
uri: Destination URI (e.g., 'http://example.com', 'log:info')
91+
92+
Returns:
93+
Configured Destination instance
94+
95+
Raises:
96+
ValueError: If the URI scheme is not recognized
97+
"""
98+
if '://' not in uri and ':' in uri:
99+
# Handle URIs without slashes (e.g., 'log:info')
100+
scheme, path = uri.split(':', 1)
101+
uri = f"{scheme}://{path}"
102+
103+
parsed = self._parse_uri(uri)
104+
scheme = parsed['scheme']
105+
106+
if scheme not in self.destination_types:
107+
raise ValueError(f"Unsupported destination type: {scheme}")
108+
109+
dest_class = self.destination_types[scheme]
110+
return dest_class(uri)
111+
112+
def _parse_uri(self, uri: str) -> Dict[str, str]:
113+
"""Parse a URI into its components.
114+
115+
Args:
116+
uri: The URI to parse
117+
118+
Returns:
119+
Dictionary with parsed URI components (scheme, netloc, path, etc.)
120+
"""
121+
if '://' not in uri and ':' in uri:
122+
# Handle URIs without slashes (e.g., 'timer:5s')
123+
scheme, path = uri.split(':', 1)
124+
return {
125+
'scheme': scheme,
126+
'netloc': '',
127+
'path': path,
128+
'full': uri
129+
}
130+
131+
# Use urllib.parse for standard URIs
132+
from urllib.parse import urlparse
133+
parsed = urlparse(uri)
134+
135+
return {
136+
'scheme': parsed.scheme,
137+
'netloc': parsed.netloc,
138+
'path': parsed.path.lstrip('/'),
139+
'params': parsed.params,
140+
'query': parsed.query,
141+
'fragment': parsed.fragment,
142+
'full': uri
143+
}

src/dialogchain/core/engine.py

Lines changed: 86 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,50 @@
22

33
import asyncio
44
import signal
5-
from typing import Dict, List, Any, Optional, AsyncIterator
6-
from dataclasses import dataclass, field
75
import logging
6+
from typing import Dict, List, Any, Optional, AsyncIterator, Callable, Awaitable
7+
from dataclasses import dataclass, field
88

99
from ..exceptions import DialogChainError, ConfigurationError
10-
from .routes import Route, RouteConfig
11-
from .tasks import TaskManager
12-
from ..connectors import Source, Destination
13-
from ..processors import Processor
10+
from .route import Route
11+
from .connector import ConnectorManager
12+
from .processor import MessageProcessor
1413

1514
logger = logging.getLogger(__name__)
1615

17-
@dataclass
16+
1817
class DialogChainEngine:
1918
"""Main engine class for DialogChain processing."""
2019

21-
config: Dict[str, Any]
22-
routes: List[Route] = field(default_factory=list)
23-
tasks: List[asyncio.Task] = field(default_factory=list)
24-
_is_running: bool = False
25-
verbose: bool = False
26-
27-
def __post_init__(self):
28-
"""Initialize the engine with configuration."""
29-
self.task_manager = TaskManager()
20+
def __init__(
21+
self,
22+
config: Dict[str, Any],
23+
verbose: bool = False,
24+
connector_manager: Optional[ConnectorManager] = None,
25+
processor_factory: Optional[Callable[[Dict[str, Any]], Awaitable[Any]]] = None
26+
):
27+
"""Initialize the DialogChain engine.
28+
29+
Args:
30+
config: Engine configuration dictionary
31+
verbose: Enable verbose logging
32+
connector_manager: Optional pre-configured connector manager
33+
processor_factory: Optional custom processor factory function
34+
"""
35+
self.config = config
36+
self.verbose = verbose
37+
self._is_running = False
38+
self._routes: List[Route] = []
39+
self._tasks: List[asyncio.Task] = []
40+
41+
# Set up logging
3042
self._setup_logging()
43+
44+
# Initialize components
45+
self.connector_manager = connector_manager or ConnectorManager()
46+
self.processor_factory = processor_factory or self._create_processor
47+
48+
# Load and validate configuration
3149
self._load_config()
3250

3351
def _setup_logging(self):
@@ -37,6 +55,7 @@ def _setup_logging(self):
3755
level=log_level,
3856
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
3957
)
58+
logger.setLevel(log_level)
4059

4160
def _load_config(self):
4261
"""Load and validate configuration."""
@@ -46,13 +65,32 @@ def _load_config(self):
4665
# Load routes from config
4766
for route_config in self.config.get('routes', []):
4867
try:
49-
route = Route.from_config(route_config)
50-
self.routes.append(route)
68+
route = self._create_route(route_config)
69+
self._routes.append(route)
5170
logger.info(f"Loaded route: {route.name}")
5271
except Exception as e:
53-
logger.error(f"Failed to load route: {e}")
72+
logger.error(f"Failed to load route: {e}", exc_info=True)
5473
if self.verbose:
55-
logger.exception("Route loading error")
74+
raise
75+
76+
async def _create_processor(self, config: Dict[str, Any]) -> Any:
77+
"""Create a processor from configuration.
78+
79+
This is a default implementation that can be overridden by providing
80+
a custom processor_factory to the engine constructor.
81+
"""
82+
# Import here to avoid circular imports
83+
from ..processors import create_processor
84+
return create_processor(config)
85+
86+
def _create_route(self, config: Dict[str, Any]) -> Route:
87+
"""Create a route from configuration."""
88+
return Route.from_config(
89+
config,
90+
create_source=self.connector_manager.create_source,
91+
create_processor=self.processor_factory,
92+
create_destination=self.connector_manager.create_destination
93+
)
5694

5795
async def start(self):
5896
"""Start the engine and all routes."""
@@ -63,16 +101,23 @@ async def start(self):
63101
self._is_running = True
64102
logger.info("Starting DialogChain engine...")
65103

66-
try:
67-
for route in self.routes:
68-
task = asyncio.create_task(self._run_route(route))
69-
self.tasks.append(task)
70-
71-
logger.info("Engine started successfully")
72-
except Exception as e:
73-
logger.error(f"Failed to start engine: {e}")
74-
await self.stop()
75-
raise
104+
# Set up signal handlers for graceful shutdown
105+
loop = asyncio.get_running_loop()
106+
for sig in (signal.SIGINT, signal.SIGTERM):
107+
loop.add_signal_handler(sig, lambda: asyncio.create_task(self.stop()))
108+
109+
# Start all routes
110+
for route in self._routes:
111+
try:
112+
await route.start()
113+
self._tasks.append(asyncio.create_task(route._run_loop()))
114+
logger.info(f"Started route: {route.name}")
115+
except Exception as e:
116+
logger.error(f"Failed to start route {route.name}: {e}", exc_info=True)
117+
if self.verbose:
118+
raise
119+
120+
logger.info("DialogChain engine started successfully")
76121

77122
async def stop(self):
78123
"""Stop the engine and clean up resources."""
@@ -83,15 +128,21 @@ async def stop(self):
83128
self._is_running = False
84129

85130
# Cancel all running tasks
86-
for task in self.tasks:
131+
for task in self._tasks:
87132
if not task.done():
88133
task.cancel()
89134

90-
if self.tasks:
91-
await asyncio.gather(*self.tasks, return_exceptions=True)
135+
# Stop all routes
136+
stop_tasks = [route.stop() for route in self._routes]
137+
if stop_tasks:
138+
await asyncio.gather(*stop_tasks, return_exceptions=True)
92139

93-
self.tasks.clear()
94-
logger.info("Engine stopped")
140+
# Wait for tasks to complete
141+
if self._tasks:
142+
await asyncio.gather(*self._tasks, return_exceptions=True)
143+
144+
self._tasks.clear()
145+
logger.info("DialogChain engine stopped")
95146

96147
async def _run_route(self, route: Route):
97148
"""Run a single route."""

src/dialogchain/core/processor.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
"""
2+
Message processing for DialogChain engine.
3+
4+
This module handles the processing of messages through a series of processors.
5+
"""
6+
7+
from typing import Any, Dict, List, Optional, Callable, Awaitable
8+
import asyncio
9+
import logging
10+
11+
from dialogchain.processors import Processor
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
class MessageProcessor:
17+
"""Handles processing of messages through a series of processors."""
18+
19+
def __init__(self, processors: List[Processor] = None):
20+
"""Initialize the message processor.
21+
22+
Args:
23+
processors: List of processors to apply to messages
24+
"""
25+
self.processors = processors or []
26+
27+
async def process(self, message: Any) -> Any:
28+
"""Process a message through all processors.
29+
30+
Args:
31+
message: The message to process
32+
33+
Returns:
34+
The processed message, or None if the message was filtered out
35+
"""
36+
processed_message = message
37+
38+
for processor in self.processors:
39+
try:
40+
processed_message = await processor.process(processed_message)
41+
if processed_message is None:
42+
logger.debug("Message was filtered out by processor")
43+
return None
44+
except Exception as e:
45+
logger.error(f"Error in processor {processor.__class__.__name__}: {e}", exc_info=True)
46+
raise
47+
48+
return processed_message
49+
50+
@classmethod
51+
def from_config(cls, config: Dict[str, Any], create_processor: Callable) -> 'MessageProcessor':
52+
"""Create a message processor from a configuration.
53+
54+
Args:
55+
config: Processor configuration
56+
create_processor: Function to create a processor from a config
57+
58+
Returns:
59+
Configured MessageProcessor instance
60+
"""
61+
processors = []
62+
63+
for proc_config in config.get('processors', []):
64+
try:
65+
processor = create_processor(proc_config)
66+
if processor:
67+
processors.append(processor)
68+
except Exception as e:
69+
logger.error(f"Error creating processor: {e}", exc_info=True)
70+
raise
71+
72+
return cls(processors=processors)

0 commit comments

Comments
 (0)