Skip to content

Commit fba158b

Browse files
author
Tom Softreck
committed
update
1 parent 862f035 commit fba158b

4 files changed

Lines changed: 163 additions & 58 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "dialogchain"
3-
version = "0.1.15"
3+
version = "0.1.16"
44
description = "DialogChain - A flexible and extensible dialog processing framework"
55
authors = ["Tom Sapletta <info@softreck.dev>"]
66
readme = "README.md"

src/dialogchain/cli.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,28 +55,46 @@ def run(config, env_file, route, dry_run, verbose):
5555
# Run routes
5656
try:
5757
if route:
58-
# Find the specific route by name
5958
route_config = next((r for r in config_data.get('routes', []) if r.get('name') == route), None)
6059
if not route_config:
6160
click.echo(f"❌ Route '{route}' not found in configuration", err=True)
6261
return
6362

63+
if verbose:
64+
click.echo(f"🚀 Starting route: {route}")
65+
6466
# Create source and destination
65-
source = engine.create_source(route_config['from'])
66-
destination = engine.create_destination(route_config['to'])
67+
source = engine.create_source(route_config.get('from'))
68+
destination = engine.create_destination(route_config.get('to'))
69+
70+
if verbose:
71+
click.echo(f"🔌 Source: {type(source).__name__}")
72+
click.echo(f"🎯 Destination: {type(destination).__name__}")
6773

6874
# Run the specific route
69-
asyncio.run(engine.run_route(route_config, source, destination))
75+
asyncio.run(engine.run_route_config(route_config, source, destination))
76+
77+
if verbose:
78+
click.echo(f"✅ Completed route: {route}")
79+
7080
else:
81+
if verbose:
82+
click.echo("🚀 Starting all routes...")
7183
# Run all routes
7284
asyncio.run(engine.run_all_routes())
85+
if verbose:
86+
click.echo("✅ All routes completed")
87+
7388
except KeyboardInterrupt:
74-
click.echo("\n🛑 Shutting down...")
89+
click.echo("\n🛑 Operation cancelled by user")
7590
except Exception as e:
76-
click.echo(f"❌ Error: {e}", err=True)
91+
click.echo(f"❌ Error running routes: {e}", err=True)
7792
if verbose:
7893
import traceback
7994
traceback.print_exc()
95+
return 1
96+
97+
return 0
8098

8199

82100
def update_env_file(env_path, required_vars):

src/dialogchain/connectors.py

Lines changed: 68 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,30 @@ def __init__(self, interval: str):
102102
self.interval = self._parse_interval(interval)
103103

104104
async def receive(self) -> AsyncIterator[Dict[str, Any]]:
105-
"""Yield timer events"""
106-
while True:
107-
yield {
108-
"type": "timer_event",
109-
"timestamp": datetime.now().isoformat(),
110-
"interval": self.interval,
111-
}
112-
await asyncio.sleep(self.interval)
105+
"""Yield timer events
106+
107+
Yields:
108+
Dictionary with timer event data
109+
"""
110+
logger = setup_logger(__name__)
111+
logger.info(f"⏱️ Timer source started with interval: {self.interval}s")
112+
113+
try:
114+
while True:
115+
event = {
116+
"type": "timer_event",
117+
"timestamp": datetime.now().isoformat(),
118+
"interval": self.interval,
119+
}
120+
logger.debug(f"⏱️ Timer event: {event}")
121+
yield event
122+
await asyncio.sleep(self.interval)
123+
except asyncio.CancelledError:
124+
logger.info("⏱️ Timer source cancelled")
125+
raise
126+
except Exception as e:
127+
logger.error(f"⏱️ Timer source error: {e}")
128+
raise
113129

114130
def _parse_interval(self, interval_str: str) -> float:
115131
"""Parse interval string to seconds
@@ -749,52 +765,56 @@ async def send(self, message: Any) -> None:
749765

750766

751767
class LogDestination(Destination):
752-
"""Log destination for both console and file logging"""
768+
"""Log destination for both console and file logging
769+
770+
URI format: log:info or log:debug or log:error
771+
"""
753772

754773
def __init__(self, uri: str):
755774
parsed = urlparse(uri)
756-
# Handle different URI formats:
757-
# - log:relative/path
758-
# - log:///absolute/path
759-
# - log://hostname/absolute/path
760-
# - log://relative/path
761-
if parsed.scheme == 'log' and not parsed.netloc and not parsed.path.startswith('//'):
762-
# Case: log:relative/path
763-
self.log_file = parsed.path
764-
elif parsed.scheme == 'log' and parsed.netloc and not parsed.path:
765-
# Case: log://hostname (without path)
766-
self.log_file = parsed.netloc
767-
elif parsed.scheme == 'log' and (parsed.netloc or parsed.path):
768-
# Case: log:///path or log://hostname/path
769-
self.log_file = parsed.path.lstrip('/') or parsed.netloc
770-
else:
771-
self.log_file = None
775+
# Extract log level from URI (e.g., 'log:info' -> 'info')
776+
self.log_level = parsed.path.upper() if parsed.path else 'INFO'
777+
if self.log_level not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
778+
self.log_level = 'INFO'
779+
780+
self.logger = setup_logger('dialogchain.destination.log')
781+
self.logger.info(f" Log destination initialized with level: {self.log_level}")
772782

773783
async def send(self, message: Any) -> None:
774-
"""Log message to console and optionally to a file"""
775-
log_msg = f"📝 {datetime.now().isoformat()}: {message}"
776-
777-
# Always print to console
778-
print(log_msg)
784+
"""Log message with the specified log level
779785
780-
# Also log using the logger
781-
logger.info(log_msg)
782-
783-
if self.log_file:
784-
try:
785-
# Ensure directory exists
786-
log_dir = os.path.dirname(self.log_file)
787-
if log_dir and not os.path.exists(log_dir):
788-
os.makedirs(log_dir, exist_ok=True)
789-
790-
# Make sure to use absolute path
791-
abs_log_file = os.path.abspath(self.log_file)
792-
with open(abs_log_file, "a", encoding="utf-8") as f:
793-
f.write(log_msg + "\n")
794-
except Exception as e:
795-
error_msg = f"❌ Log file error: {e}"
796-
print(error_msg)
797-
logger.error(error_msg)
786+
Args:
787+
message: The message to log. Can be a string or any JSON-serializable object.
788+
789+
Raises:
790+
Exception: If there's an error processing the message
791+
"""
792+
try:
793+
# Convert message to string if it's not already
794+
if not isinstance(message, (str, bytes)):
795+
try:
796+
message = json.dumps(message, indent=2, default=str)
797+
except (TypeError, ValueError) as e:
798+
self.logger.warning(f"Could not convert message to JSON: {e}")
799+
message = str(message)
800+
801+
# Log with the appropriate level
802+
if self.log_level == 'DEBUG':
803+
self.logger.debug(message)
804+
elif self.log_level == 'INFO':
805+
self.logger.info(message)
806+
elif self.log_level == 'WARNING':
807+
self.logger.warning(message)
808+
elif self.log_level == 'ERROR':
809+
self.logger.error(message)
810+
elif self.log_level == 'CRITICAL':
811+
self.logger.critical(message)
812+
else:
813+
self.logger.info(message) # Default to info
814+
815+
except Exception as e:
816+
self.logger.error(f"Error in LogDestination.send: {e}")
817+
raise
798818

799819

800820
class GRPCDestination(Destination):

src/dialogchain/engine.py

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,21 +62,33 @@ def parse_uri(uri: str) -> Tuple[str, str]:
6262

6363
class DialogChainEngine:
6464
def __init__(self, config: Dict[str, Any], verbose: bool = False):
65+
# Initialize logger first
66+
self.logger = setup_logger(__name__)
67+
self.logger.info("🔧 Initializing DialogChainEngine")
68+
6569
self.config = config
6670
self.verbose = verbose
6771
self.routes = config.get("routes", [])
6872
self.running_processes = {}
6973
self._is_running = False
7074
self._tasks = []
75+
76+
self.logger.info(f"🔧 Loaded {len(self.routes)} routes from config")
77+
if self.verbose:
78+
self.logger.info(f"🔧 Verbose mode enabled")
7179

7280
# Validate the configuration on initialization
81+
self.logger.info("🔧 Validating configuration...")
7382
errors = self.validate_config()
7483
if errors:
7584
error_msg = "Invalid configuration:\n" + "\n".join(
7685
f"- {error}" for error in errors
7786
)
87+
self.logger.error(f"❌ Configuration validation failed: {error_msg}")
7888
raise ValueError(error_msg)
7989

90+
self.logger.info("✅ Engine initialized successfully")
91+
8092
@property
8193
def is_running(self) -> bool:
8294
"""Return whether the engine is currently running."""
@@ -109,7 +121,7 @@ async def start(self):
109121
await destination.connect()
110122

111123
# Create and store task
112-
task = asyncio.create_task(self.run_route(route, source, destination))
124+
task = asyncio.create_task(self.run_route_config(route, source, destination))
113125
self._tasks.append(task)
114126

115127
self.log("Engine started successfully")
@@ -176,7 +188,7 @@ async def run_all_routes(self):
176188

177189
# Create and start task for this route
178190
task = asyncio.create_task(
179-
self.run_route(route_config, source, destination)
191+
self.run_route_config(route_config, source, destination)
180192
)
181193
tasks.append(task)
182194

@@ -194,7 +206,7 @@ async def run_all_routes(self):
194206
self.log(f"Starting {len(tasks)} routes...")
195207
await asyncio.gather(*tasks, return_exceptions=True)
196208

197-
async def run_route(self, route: Dict[str, Any], source: Source, destination: Destination):
209+
async def run_route_config(self, route: Dict[str, Any], source: Source, destination: Destination):
198210
"""Run a specific route with the given source and destination
199211
200212
Args:
@@ -206,8 +218,20 @@ async def run_route(self, route: Dict[str, Any], source: Source, destination: De
206218
It runs the route configuration with the provided source and destination.
207219
"""
208220
route_name = route.get("name", "unnamed")
221+
logger.info(f"🔧 Starting route: {route_name}")
222+
logger.info(f"🔧 Route config: {route}")
223+
logger.info(f"🔧 Source type: {type(source).__name__}")
224+
logger.info(f"🔧 Destination type: {type(destination).__name__}")
209225
self.log(f"Starting route: {route_name}")
210226

227+
if not source:
228+
logger.error(f"❌ No source provided for route: {route_name}")
229+
return
230+
231+
if not destination:
232+
logger.error(f"❌ No destination provided for route: {route_name}")
233+
return
234+
211235
try:
212236
# Connect to source if not already connected
213237
if hasattr(source, 'connect') and callable(source.connect) and not getattr(source, 'is_connected', False):
@@ -447,6 +471,49 @@ def create_destination(self, uri: str) -> Destination:
447471
return HTTPDestination(uri, **config)
448472
elif scheme == "file":
449473
return FileDestination(path, **config)
474+
elif scheme == "log":
475+
# Handle log destination (e.g., log:info, log:error)
476+
log_level = path.upper() if path else "INFO"
477+
478+
# Create a simple destination that logs messages
479+
class LogDestination:
480+
def __init__(self, level):
481+
self.level = level
482+
self.sent_messages = []
483+
484+
async def send(self, message):
485+
log_message = f"[Log Destination] {message}"
486+
if self.level == "DEBUG":
487+
self.logger.debug(log_message)
488+
elif self.level == "INFO":
489+
self.logger.info(log_message)
490+
elif self.level == "WARNING":
491+
self.logger.warning(log_message)
492+
elif self.level == "ERROR":
493+
self.logger.error(log_message)
494+
elif self.level == "CRITICAL":
495+
self.logger.critical(log_message)
496+
else:
497+
self.logger.info(log_message) # Default to info
498+
499+
# Store the message for testing/verification
500+
self.sent_messages.append(message)
501+
return True
502+
503+
async def connect(self):
504+
pass
505+
506+
async def disconnect(self):
507+
pass
508+
509+
async def __aenter__(self):
510+
return self
511+
512+
async def __aexit__(self, exc_type, exc_val, exc_tb):
513+
pass
514+
515+
return LogDestination(log_level)
516+
450517
elif scheme == "mock":
451518
# For testing purposes
452519
from unittest.mock import AsyncMock

0 commit comments

Comments
 (0)