Skip to content

Commit 862f035

Browse files
author
Tom Softreck
committed
update
1 parent bd0154c commit 862f035

3 files changed

Lines changed: 69 additions & 42 deletions

File tree

src/dialogchain/cli.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,28 @@ def run(config, env_file, route, dry_run, verbose):
5555
# Run routes
5656
try:
5757
if route:
58-
asyncio.run(engine.run_route(route))
58+
# Find the specific route by name
59+
route_config = next((r for r in config_data.get('routes', []) if r.get('name') == route), None)
60+
if not route_config:
61+
click.echo(f"❌ Route '{route}' not found in configuration", err=True)
62+
return
63+
64+
# Create source and destination
65+
source = engine.create_source(route_config['from'])
66+
destination = engine.create_destination(route_config['to'])
67+
68+
# Run the specific route
69+
asyncio.run(engine.run_route(route_config, source, destination))
5970
else:
71+
# Run all routes
6072
asyncio.run(engine.run_all_routes())
6173
except KeyboardInterrupt:
6274
click.echo("\n🛑 Shutting down...")
6375
except Exception as e:
6476
click.echo(f"❌ Error: {e}", err=True)
77+
if verbose:
78+
import traceback
79+
traceback.print_exc()
6580

6681

6782
def update_env_file(env_path, required_vars):

src/dialogchain/engine.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
Processor, create_processor, FilterProcessor, TransformProcessor,
1717
AggregateProcessor, DebugProcessor, ExternalProcessor
1818
)
19+
20+
# For backward compatibility
21+
ProcessorType = type(Processor)
1922
from .connectors import (
2023
Source, Destination, RTSPSource, FileSource,
2124
HTTPDestination, FileDestination, IMAPSource, TimerSource, GRPCSource,
@@ -159,9 +162,35 @@ async def run_all_routes(self):
159162
"""Run all routes concurrently"""
160163
tasks = []
161164
for route_config in self.routes:
162-
task = asyncio.create_task(self.run_route_config(route_config))
163-
tasks.append(task)
165+
try:
166+
# Create source and destination for this route
167+
from_uri = self.resolve_variables(route_config.get('from', ''))
168+
to_uri = self.resolve_variables(route_config.get('to', ''))
169+
170+
if not from_uri or not to_uri:
171+
self.log(f"Skipping route '{route_config.get('name', 'unnamed')}': missing 'from' or 'to' URI")
172+
continue
173+
174+
source = self.create_source(from_uri)
175+
destination = self.create_destination(to_uri)
176+
177+
# Create and start task for this route
178+
task = asyncio.create_task(
179+
self.run_route(route_config, source, destination)
180+
)
181+
tasks.append(task)
182+
183+
except Exception as e:
184+
self.log(f"Error setting up route '{route_config.get('name', 'unnamed')}': {e}")
185+
if self.verbose:
186+
import traceback
187+
traceback.print_exc()
188+
continue
164189

190+
if not tasks:
191+
self.log("No valid routes to run")
192+
return
193+
165194
self.log(f"Starting {len(tasks)} routes...")
166195
await asyncio.gather(*tasks, return_exceptions=True)
167196

tests/unit/test_http_destination.py

Lines changed: 22 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,16 @@
44
from unittest.mock import AsyncMock, patch
55
from dialogchain.connectors import HTTPDestination
66

7+
class AsyncMockContext:
8+
def __init__(self, return_value):
9+
self.return_value = return_value
10+
11+
async def __aenter__(self):
12+
return self.return_value
13+
14+
async def __aexit__(self, exc_type, exc_val, exc_tb):
15+
pass
16+
717
class TestHTTPDestination:
818
"""Test HTTP destination connector."""
919

@@ -15,23 +25,14 @@ def http_dest(self):
1525
@pytest.mark.asyncio
1626
async def test_send_http_request(self, http_dest, capsys):
1727
"""Test sending an HTTP request."""
18-
# Create a mock response for successful request
28+
# Create a mock response
1929
mock_response = AsyncMock()
2030
mock_response.status = 200
2131
mock_response.text = AsyncMock(return_value="OK")
2232

23-
# Create a mock post method
24-
async def mock_post(*args, **kwargs):
25-
return mock_response
26-
27-
# Create a mock session
28-
mock_session = AsyncMock()
29-
mock_session.post.side_effect = mock_post
30-
mock_session.__aenter__.return_value = mock_session
31-
mock_session.__aexit__.return_value = None
32-
33-
# Patch the ClientSession
34-
with patch('aiohttp.ClientSession', return_value=mock_session):
33+
# Patch the ClientSession.post method directly
34+
with patch('aiohttp.ClientSession.post',
35+
return_value=AsyncMockContext(mock_response)) as mock_post:
3536
# Test with dict message
3637
await http_dest.send({"key": "value"})
3738

@@ -40,7 +41,7 @@ async def mock_post(*args, **kwargs):
4041
assert "🌐 HTTP sent to http://example.com/webhook" in captured.out
4142

4243
# Verify the post was called correctly
43-
mock_session.post.assert_called_once_with(
44+
mock_post.assert_called_once_with(
4445
'http://example.com/webhook',
4546
json={"key": "value"}
4647
)
@@ -53,18 +54,9 @@ async def test_send_string_message(self, http_dest, capsys):
5354
mock_response.status = 200
5455
mock_response.text = AsyncMock(return_value="OK")
5556

56-
# Create a mock post method
57-
async def mock_post(*args, **kwargs):
58-
return mock_response
59-
60-
# Create a mock session
61-
mock_session = AsyncMock()
62-
mock_session.post.side_effect = mock_post
63-
mock_session.__aenter__.return_value = mock_session
64-
mock_session.__aexit__.return_value = None
65-
66-
# Patch the ClientSession
67-
with patch('aiohttp.ClientSession', return_value=mock_session):
57+
# Patch the ClientSession.post method directly
58+
with patch('aiohttp.ClientSession.post',
59+
return_value=AsyncMockContext(mock_response)) as mock_post:
6860
# Test with string message
6961
await http_dest.send("test message")
7062

@@ -73,7 +65,7 @@ async def mock_post(*args, **kwargs):
7365
assert "🌐 HTTP sent to http://example.com/webhook" in captured.out
7466

7567
# Verify the post was called correctly
76-
mock_session.post.assert_called_once_with(
68+
mock_post.assert_called_once_with(
7769
'http://example.com/webhook',
7870
json={"data": "test message"}
7971
)
@@ -86,18 +78,9 @@ async def test_http_error(self, http_dest, capsys):
8678
mock_response.status = 400
8779
mock_response.text = AsyncMock(return_value="Bad Request")
8880

89-
# Create a mock post method
90-
async def mock_post(*args, **kwargs):
91-
return mock_response
92-
93-
# Create a mock session
94-
mock_session = AsyncMock()
95-
mock_session.post.side_effect = mock_post
96-
mock_session.__aenter__.return_value = mock_session
97-
mock_session.__aexit__.return_value = None
98-
99-
# Patch the ClientSession
100-
with patch('aiohttp.ClientSession', return_value=mock_session):
81+
# Patch the ClientSession.post method directly
82+
with patch('aiohttp.ClientSession.post',
83+
return_value=AsyncMockContext(mock_response)) as mock_post:
10184
# Test with dict message
10285
await http_dest.send({"key": "value"})
10386

0 commit comments

Comments
 (0)