Skip to content

Commit a5eaefd

Browse files
author
Tom Softreck
committed
update
1 parent 9496a69 commit a5eaefd

File tree

3 files changed

+229
-200
lines changed

3 files changed

+229
-200
lines changed

src/dialogchain/engine/connector.py

Lines changed: 78 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,11 @@
44
This module handles the creation and management of source and destination connectors.
55
"""
66

7-
from typing import Dict, Any, Optional, Type, Union
8-
import logging
9-
import importlib
10-
from urllib.parse import urlparse, parse_qs
7+
from typing import Dict, Any, Optional, Type, Union, List
8+
from urllib.parse import urlparse, parse_qs, ParseResult
119

1210
from ..connectors import Source, Destination, ConnectorError
1311

14-
logger = logging.getLogger(__name__)
15-
1612
class ConnectorManager:
1713
"""Manages the creation and lifecycle of connectors."""
1814

@@ -63,7 +59,6 @@ def register_source(self, scheme: str, source_class: Type[Source]) -> None:
6359
source_class: Source class to register
6460
"""
6561
self.source_types[scheme] = source_class
66-
logger.debug(f"Registered source connector: {scheme} -> {source_class.__name__}")
6762

6863
def register_destination(self, scheme: str, dest_class: Type[Destination]) -> None:
6964
"""Register a destination connector class.
@@ -73,7 +68,6 @@ def register_destination(self, scheme: str, dest_class: Type[Destination]) -> No
7368
dest_class: Destination class to register
7469
"""
7570
self.destination_types[scheme] = dest_class
76-
logger.debug(f"Registered destination connector: {scheme} -> {dest_class.__name__}")
7771

7872
def create_source(self, config: Union[str, Dict[str, Any]]) -> Source:
7973
"""Create a source connector from a URI or config dictionary.
@@ -90,10 +84,9 @@ def create_source(self, config: Union[str, Dict[str, Any]]) -> Source:
9084
"""
9185
if isinstance(config, str):
9286
return self._create_source_from_uri(config)
93-
elif isinstance(config, dict):
87+
if isinstance(config, dict):
9488
return self._create_source_from_config(config)
95-
else:
96-
raise ValueError(f"Invalid source config type: {type(config)}")
89+
raise ValueError(f"Invalid source config type: {type(config)}")
9790

9891
def create_destination(self, config: Union[str, Dict[str, Any]]) -> Destination:
9992
"""Create a destination connector from a URI or config dictionary.
@@ -110,171 +103,98 @@ def create_destination(self, config: Union[str, Dict[str, Any]]) -> Destination:
110103
"""
111104
if isinstance(config, str):
112105
return self._create_destination_from_uri(config)
113-
elif isinstance(config, dict):
106+
if isinstance(config, dict):
114107
return self._create_destination_from_config(config)
115-
else:
116-
raise ValueError(f"Invalid destination config type: {type(config)}")
108+
raise ValueError(f"Invalid destination config type: {type(config)}")
117109

118110
def _create_source_from_uri(self, uri: str) -> Source:
119-
"""Create a source connector from a URI.
120-
121-
Args:
122-
uri: Source URI (e.g., 'timer:5s', 'imap://user:pass@server')
123-
124-
Returns:
125-
Configured Source instance
126-
"""
127-
if '://' not in uri and ':' in uri:
128-
# Handle URIs without slashes (e.g., 'timer:5s')
129-
scheme, path = uri.split(':', 1)
130-
uri = f"{scheme}://{path}"
131-
132-
parsed = self._parse_uri(uri)
133-
scheme = parsed['scheme']
134-
135-
if scheme not in self.source_types:
136-
raise ValueError(f"Unsupported source type: {scheme}")
137-
138-
source_class = self.source_types[scheme]
139-
try:
140-
return source_class(uri, **parsed['options'])
141-
except Exception as e:
142-
raise ConnectorError(f"Failed to create source {scheme}: {e}") from e
111+
"""Create a source from a URI string."""
112+
config = self._parse_uri_to_config(uri)
113+
return self._create_source_from_config(config)
143114

144115
def _create_destination_from_uri(self, uri: str) -> Destination:
145-
"""Create a destination connector from a URI.
146-
147-
Args:
148-
uri: Destination URI (e.g., 'http://example.com', 'file:/path/to/file')
149-
150-
Returns:
151-
Configured Destination instance
152-
"""
153-
parsed = self._parse_uri(uri)
154-
scheme = parsed['scheme']
116+
"""Create a destination from a URI string."""
117+
config = self._parse_uri_to_config(uri)
118+
return self._create_destination_from_config(config)
119+
120+
def _parse_uri_to_config(self, uri: str) -> Dict[str, Any]:
121+
"""Parse a URI into a configuration dictionary."""
122+
parsed = urlparse(uri)
123+
scheme = parsed.scheme.lower()
155124

156-
if scheme not in self.destination_types:
157-
raise ValueError(f"Unsupported destination type: {scheme}")
125+
# Create base config
126+
config = {
127+
'uri': uri,
128+
'scheme': scheme,
129+
'path': parsed.path or ''
130+
}
158131

159-
dest_class = self.destination_types[scheme]
160-
try:
161-
return dest_class(uri, **parsed['options'])
162-
except Exception as e:
163-
raise ConnectorError(f"Failed to create destination {scheme}: {e}") from e
132+
# Add query parameters
133+
if parsed.query:
134+
query_params = parse_qs(parsed.query, keep_blank_values=True)
135+
for key, value in query_params.items():
136+
config[key] = value[0] if len(value) == 1 else value
137+
138+
# Add optional components
139+
if parsed.netloc:
140+
config['netloc'] = parsed.netloc
141+
if parsed.params:
142+
config['params'] = dict(parse_qs(parsed.params))
143+
if parsed.fragment:
144+
config['fragment'] = parsed.fragment
145+
if parsed.username:
146+
config['username'] = parsed.username
147+
if parsed.password:
148+
config['password'] = parsed.password
149+
if parsed.hostname:
150+
config['hostname'] = parsed.hostname
151+
if parsed.port is not None:
152+
config['port'] = parsed.port
153+
154+
return config
164155

165156
def _create_source_from_config(self, config: Dict[str, Any]) -> Source:
166-
"""Create a source connector from a config dictionary.
167-
168-
Args:
169-
config: Source configuration dictionary
170-
171-
Returns:
172-
Configured Source instance
173-
"""
174-
source_type = config.get('type')
175-
if not source_type:
176-
raise ValueError("Source config must include 'type'")
177-
178-
if source_type not in self.source_types:
179-
# Try to dynamically import the source
180-
try:
181-
module_name, _, class_name = source_type.rpartition('.')
182-
if not module_name:
183-
raise ValueError(f"Invalid source type format: {source_type}")
184-
185-
module = importlib.import_module(module_name)
186-
source_class = getattr(module, class_name)
187-
self.register_source(source_type, source_class)
188-
except (ImportError, AttributeError) as e:
189-
raise ValueError(f"Unknown source type: {source_type}") from e
190-
191-
source_class = self.source_types[source_type]
157+
"""Create a source from a config dictionary."""
158+
source_type = self._get_connector_type(config, 'source')
192159
try:
193-
return source_class(**{k: v for k, v in config.items() if k != 'type'})
160+
return self.source_types[source_type](config)
194161
except Exception as e:
195-
raise ConnectorError(f"Failed to create source {source_type}: {e}") from e
162+
raise ConnectorError(f"Failed to create source from config: {e}") from e
196163

197164
def _create_destination_from_config(self, config: Dict[str, Any]) -> Destination:
198-
"""Create a destination connector from a config dictionary.
199-
200-
Args:
201-
config: Destination configuration dictionary
202-
203-
Returns:
204-
Configured Destination instance
205-
"""
206-
dest_type = config.get('type')
207-
if not dest_type:
208-
raise ValueError("Destination config must include 'type'")
209-
210-
if dest_type not in self.destination_types:
211-
# Try to dynamically import the destination
212-
try:
213-
module_name, _, class_name = dest_type.rpartition('.')
214-
if not module_name:
215-
raise ValueError(f"Invalid destination type format: {dest_type}")
216-
217-
module = importlib.import_module(module_name)
218-
dest_class = getattr(module, class_name)
219-
self.register_destination(dest_type, dest_class)
220-
except (ImportError, AttributeError) as e:
221-
raise ValueError(f"Unknown destination type: {dest_type}") from e
222-
223-
dest_class = self.destination_types[dest_type]
165+
"""Create a destination from a config dictionary."""
166+
dest_type = self._get_connector_type(config, 'destination')
224167
try:
225-
return dest_class(**{k: v for k, v in config.items() if k != 'type'})
168+
return self.destination_types[dest_type](config)
226169
except Exception as e:
227-
raise ConnectorError(f"Failed to create destination {dest_type}: {e}") from e
170+
raise ConnectorError(f"Failed to create destination from config: {e}") from e
228171

229-
@staticmethod
230-
def _parse_uri(uri: str) -> Dict[str, Any]:
231-
"""Parse a URI into its components and query parameters.
232-
233-
Args:
234-
uri: URI to parse
235-
236-
Returns:
237-
Dictionary with 'scheme', 'netloc', 'path', 'params', 'query', 'fragment',
238-
and 'options' (query parameters as a dict)
239-
"""
240-
if '://' not in uri and ':' in uri:
241-
# Handle simple scheme:path format
242-
scheme, path = uri.split(':', 1)
243-
return {
244-
'scheme': scheme,
245-
'netloc': '',
246-
'path': path,
247-
'params': '',
248-
'query': '',
249-
'fragment': '',
250-
'options': {}
251-
}
252-
253-
parsed = urlparse(uri)
254-
options = {}
255-
256-
# Parse query parameters
257-
if parsed.query:
258-
query_params = parse_qs(parsed.query, keep_blank_values=True)
259-
options = {k: v[0] if len(v) == 1 else v for k, v in query_params.items()}
172+
def _get_connector_type(self, config: Dict[str, Any], connector_kind: str) -> str:
173+
"""Get the connector type from config and validate it exists."""
174+
if not isinstance(config, dict):
175+
raise ValueError(f"{connector_kind.capitalize()} config must be a dictionary")
260176

261-
# Add username/password from netloc if present
262-
if '@' in parsed.netloc:
263-
auth_part, netloc = parsed.netloc.rsplit('@', 1)
264-
if ':' in auth_part:
265-
username, password = auth_part.split(':', 1)
266-
options['username'] = username
267-
options['password'] = password
177+
connector_type = config.get('type') or config.get('scheme')
178+
if not connector_type:
179+
raise ValueError(f"{connector_kind.capitalize()} config must include 'type' or 'scheme'")
268180

269-
return {
270-
'scheme': parsed.scheme,
271-
'netloc': parsed.netloc,
272-
'path': parsed.path,
273-
'params': parsed.params,
274-
'query': parsed.query,
275-
'fragment': parsed.fragment,
276-
'options': options
277-
}
181+
if connector_type not in getattr(self, f"{connector_kind}_types"):
182+
raise ValueError(f"Unknown {connector_kind} type: {connector_type}")
183+
184+
return connector_type
185+
186+
def get_source_schemes(self) -> List[str]:
187+
"""Get a list of registered source schemes."""
188+
return list(self.source_types.keys())
189+
190+
def get_destination_schemes(self) -> List[str]:
191+
"""Get a list of registered destination schemes."""
192+
return list(self.destination_types.keys())
193+
194+
async def close(self) -> None:
195+
"""Close all connectors and release resources."""
196+
# No resources to clean up in the base implementation
197+
pass
278198

279199
# Default connector manager instance
280200
default_connector_manager = ConnectorManager()

src/dialogchain/engine/route.py

Lines changed: 22 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -317,47 +317,18 @@ async def _handle_error(self, error: Exception, message: Any = None) -> bool:
317317
return handled
318318

319319
async def _safe_receive(self, source: Source) -> Any:
320-
"""Safely receive data from a source with retries."""
321-
for attempt in range(self.retry_attempts + 1):
322-
try:
323-
return await asyncio.wait_for(
324-
source.receive(),
325-
timeout=self.timeout
326-
)
327-
except asyncio.TimeoutError:
328-
if attempt == self.retry_attempts:
329-
logger.error(f"Timeout receiving from source in route {self.name}")
330-
raise
331-
await asyncio.sleep(self.retry_delay)
332-
except Exception as e:
333-
logger.error(f"Error receiving from source in route {self.name}: {e}")
334-
if attempt == self.retry_attempts:
335-
raise
336-
await asyncio.sleep(self.retry_delay)
337-
return None
338-
339-
async def _safe_send(self, destination: Destination, data: Any) -> None:
340-
"""Safely send data to a destination with retries."""
341-
for attempt in range(self.retry_attempts + 1):
342-
try:
343-
await asyncio.wait_for(
344-
destination.send(data),
345-
timeout=self.timeout
346-
)
347-
return
348-
except asyncio.TimeoutError:
349-
if attempt == self.retry_attempts:
350-
logger.error(f"Timeout sending to destination in route {self.name}")
351-
raise
352-
await asyncio.sleep(self.retry_delay)
353-
except Exception as e:
354-
logger.error(f"Error sending to destination in route {self.name}: {e}")
355-
if attempt == self.retry_attempts:
356-
raise
357-
await asyncio.sleep(self.retry_delay)
358-
359-
async def _safe_receive(self, source: Source) -> Any:
360-
"""Safely receive data from a source with retries."""
320+
"""Safely receive data from a source with retries.
321+
322+
Args:
323+
source: The source to receive data from
324+
325+
Returns:
326+
The received data, or None if all retries are exhausted
327+
328+
Raises:
329+
asyncio.TimeoutError: If the operation times out after all retries
330+
Exception: If an error occurs after all retries
331+
"""
361332
for attempt in range(self.retry_attempts + 1):
362333
try:
363334
return await asyncio.wait_for(
@@ -377,7 +348,16 @@ async def _safe_receive(self, source: Source) -> Any:
377348
return None
378349

379350
async def _safe_send(self, destination: Destination, data: Any) -> None:
380-
"""Safely send data to a destination with retries."""
351+
"""Safely send data to a destination with retries.
352+
353+
Args:
354+
destination: The destination to send data to
355+
data: The data to send
356+
357+
Raises:
358+
asyncio.TimeoutError: If the operation times out after all retries
359+
Exception: If an error occurs after all retries
360+
"""
381361
for attempt in range(self.retry_attempts + 1):
382362
try:
383363
await asyncio.wait_for(

0 commit comments

Comments
 (0)