Advanced patterns and implementation details for streaming flow execution.
- PHP Async Adaptation
- Streaming Patterns
- SSE Implementation
- Performance Optimization
- Advanced Patterns
Langflow uses Python's native async/await for streaming. PHP has no built-in async/await syntax, so we use Generators as a functional equivalent.
async def execute_flow():
    async for event in stream:
        yield event

public function executeFlow(): Generator {
    // SplQueue::dequeue() throws on an empty queue, so check isEmpty() first
    while (!$this->queue->isEmpty()) {
        yield $this->queue->dequeue();
    }
}

| Langflow (Python) | Claude-PHP-Agent (PHP) |
|---|---|
| `asyncio.Queue` | `SplQueue` |
| `await queue.get()` | `$queue->dequeue()` |
| `queue.put_nowait()` | `$queue->enqueue()` |
| Async subscribers | Iterator pattern |
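As a minimal sketch of the mapping in the table, the following drains an `SplQueue` through a generator. The helper name `drainQueue` and the sample events are illustrative assumptions, not part of the library.

```php
<?php
/**
 * Hypothetical helper: yields queued events in FIFO order until the queue is empty.
 */
function drainQueue(SplQueue $queue): Generator
{
    // SplQueue::dequeue() throws a RuntimeException on an empty queue,
    // so test isEmpty() before each read.
    while (!$queue->isEmpty()) {
        yield $queue->dequeue();
    }
}

$queue = new SplQueue();
$queue->enqueue(['type' => 'token', 'data' => ['token' => 'Hello']]);
$queue->enqueue(['type' => 'end', 'data' => ['status' => 'completed']]);

$types = [];
foreach (drainQueue($queue) as $event) {
    $types[] = $event['type']; // events arrive in the order they were enqueued
}
```

Unlike `await queue.get()`, this loop does not wait for new items: it stops as soon as the queue is empty, so the producer has to fill the queue before (or between) reads.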
Generators provide several advantages for streaming:
- Memory Efficiency: Values are produced on-demand
- Lazy Evaluation: Only compute what's needed
- State Management: Generator maintains execution state
- Iterable: Works with `foreach` loops naturally
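The points above can be illustrated with a small sketch: an infinite generator is safe to consume because values are only computed when the consumer asks for them. The helpers `naturals` and `take` are hypothetical names introduced for this example.

```php
<?php
/** Hypothetical infinite source: counts upward forever, one value per request. */
function naturals(): Generator
{
    $n = 1;
    while (true) {
        yield $n++; // state ($n) is kept between resumptions
    }
}

/** Hypothetical helper: stops pulling from the source after $limit values. */
function take(iterable $source, int $limit): Generator
{
    if ($limit <= 0) {
        return;
    }
    $taken = 0;
    foreach ($source as $value) {
        yield $value;
        if (++$taken >= $limit) {
            return; // stop before asking the source for another value
        }
    }
}

// Only three values are ever computed, although the source is unbounded.
$firstThree = iterator_to_array(take(naturals(), 3), false);
```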
// Generator function
function streamTokens(array $tokens): Generator {
    foreach ($tokens as $token) {
        yield $token; // Suspend execution, return value
    }
}

// Consumer
foreach (streamTokens(['Hello', ' ', 'world']) as $token) {
    echo $token; // Values produced one at a time
}

Stream tokens directly as they're received:
foreach ($executor->executeWithStreaming($agent, $task) as $event) {
    if ($event['type'] === 'token') {
        echo $event['data']['token'];
        flush(); // Immediate output
    }
}

Use Case: CLI applications, real-time console output

Pros: Lowest latency, simple

Cons: No buffering, many small writes
Accumulate tokens before displaying:
$buffer = '';
$bufferSize = 10; // Measured in bytes, because strlen() counts bytes

foreach ($executor->executeWithStreaming($agent, $task) as $event) {
    if ($event['type'] === 'token') {
        $buffer .= $event['data']['token'];
        if (strlen($buffer) >= $bufferSize) {
            echo $buffer;
            flush();
            $buffer = '';
        }
    }
}
echo $buffer; // Flush remaining

Use Case: Web applications with network latency

Pros: Fewer writes, better network efficiency

Cons: Slightly higher latency
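The buffering logic can also be packaged as a reusable generator so that the output code stays a plain `foreach`. This is a sketch under assumptions: `bufferTokens` is a hypothetical helper, and the event shape (`type`, `data.token`) is taken from the examples in this document.

```php
<?php
/**
 * Hypothetical helper: groups token text into chunks of at least $minBytes bytes.
 * Non-token events are ignored in this sketch.
 */
function bufferTokens(iterable $events, int $minBytes): Generator
{
    $buffer = '';
    foreach ($events as $event) {
        if (($event['type'] ?? null) !== 'token') {
            continue;
        }
        $buffer .= $event['data']['token'];
        if (strlen($buffer) >= $minBytes) {
            yield $buffer;
            $buffer = '';
        }
    }
    if ($buffer !== '') {
        yield $buffer; // flush whatever is left at end of stream
    }
}

// Stand-in for $executor->executeWithStreaming($agent, $task)
$events = [
    ['type' => 'token', 'data' => ['token' => 'Hel']],
    ['type' => 'token', 'data' => ['token' => 'lo ']],
    ['type' => 'progress', 'data' => ['percent' => 50.0]],
    ['type' => 'token', 'data' => ['token' => 'world']],
];

$chunks = iterator_to_array(bufferTokens($events, 5), false);
```

Because the helper accepts any `iterable`, the same function should work on an array in tests and on a live generator in production.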
Only stream specific event types:
$streamTypes = ['token', 'progress', 'tool_start'];

foreach ($executor->executeWithStreaming($agent, $task) as $event) {
    if (in_array($event['type'], $streamTypes, true)) {
        processEvent($event);
    }
}

Use Case: Filtered event streams, reduced bandwidth

Pros: Lower bandwidth, focused events

Cons: May miss important events
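One way to reduce the risk of missing important events is to let a small set of critical types through regardless of the filter. The sketch below assumes a hypothetical `filterEvents` helper and treats `error` and `end` as the critical types; adjust that list to whatever your consumers cannot afford to lose.

```php
<?php
/**
 * Hypothetical helper: yields events whose type is in $allowedTypes,
 * plus any type listed in $alwaysPass.
 */
function filterEvents(
    iterable $events,
    array $allowedTypes,
    array $alwaysPass = ['error', 'end']
): Generator {
    foreach ($events as $event) {
        $type = $event['type'];
        if (in_array($type, $allowedTypes, true) || in_array($type, $alwaysPass, true)) {
            yield $event;
        }
    }
}

// Stand-in for $executor->executeWithStreaming($agent, $task)
$events = [
    ['type' => 'token', 'data' => ['token' => 'Hi']],
    ['type' => 'tool_end', 'data' => ['tool' => 'search']],
    ['type' => 'error', 'data' => ['error' => 'timeout']],
    ['type' => 'end', 'data' => ['status' => 'failed']],
];

$keptTypes = [];
foreach (filterEvents($events, ['token']) as $event) {
    $keptTypes[] = $event['type'];
}
```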
Transform events before consumption:
// yield is only valid inside a function, so the loop is wrapped in a generator
function transformedStream($executor, $agent, $task): Generator {
    foreach ($executor->executeWithStreaming($agent, $task) as $event) {
        yield transformEvent($event);
    }
}

function transformEvent(array $event): array {
    return match ($event['type']) {
        'token' => ['type' => 'text', 'content' => $event['data']['token']],
        'progress' => ['type' => 'status', 'percent' => $event['data']['percent']],
        default => $event
    };
}

Use Case: API format conversion, frontend adaptation

Pros: Decoupled formats, reusable transformations

Cons: Additional processing overhead
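Because each transformation is itself a generator, several of them can be chained lazily, with each event passing through every stage before the next one is read. The following sketch assumes a hypothetical `mapEvents` helper and two illustrative transformations.

```php
<?php
/** Hypothetical helper: applies $transform to each event as it is pulled. */
function mapEvents(iterable $events, callable $transform): Generator
{
    foreach ($events as $event) {
        yield $transform($event);
    }
}

// Stage 1 (illustrative): convert token events to a frontend-friendly shape.
$toText = fn(array $e): array => $e['type'] === 'token'
    ? ['type' => 'text', 'content' => $e['data']['token']]
    : $e;

// Stage 2 (illustrative): upper-case text content, leave other events untouched.
$upper = fn(array $e): array => $e['type'] === 'text'
    ? ['type' => 'text', 'content' => strtoupper($e['content'])]
    : $e;

// Stand-in for $executor->executeWithStreaming($agent, $task)
$source = [
    ['type' => 'token', 'data' => ['token' => 'hi']],
    ['type' => 'end', 'data' => ['status' => 'completed']],
];

$result = iterator_to_array(mapEvents(mapEvents($source, $toText), $upper), false);
```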
Stream to multiple consumers simultaneously:
class EventBroadcaster {
    private array $consumers = [];

    public function addConsumer(callable $consumer): void {
        $this->consumers[] = $consumer;
    }

    public function broadcast(array $event): void {
        foreach ($this->consumers as $consumer) {
            $consumer($event);
        }
    }
}

$broadcaster = new EventBroadcaster();
$broadcaster->addConsumer(fn($e) => logEvent($e));
$broadcaster->addConsumer(fn($e) => updateUI($e));
$broadcaster->addConsumer(fn($e) => saveToDatabase($e));

foreach ($executor->executeWithStreaming($agent, $task) as $event) {
    $broadcaster->broadcast($event);
}

Use Case: Multiple UI components, logging + display

Pros: Separation of concerns, extensible

Cons: Higher memory usage
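With several consumers attached, an exception in one of them would otherwise abort the whole stream. A possible variation, sketched here with a hypothetical `SafeBroadcaster` class that is not part of the library, isolates each consumer and counts its failures instead.

```php
<?php
/** Hypothetical variant of the broadcaster that isolates consumer failures. */
final class SafeBroadcaster
{
    /** @var array<string, callable> consumers keyed by name */
    private array $consumers = [];

    /** @var array<string, int> failure count per consumer name */
    private array $failures = [];

    public function addConsumer(string $name, callable $consumer): void
    {
        $this->consumers[$name] = $consumer;
        $this->failures[$name] = 0;
    }

    public function broadcast(array $event): void
    {
        foreach ($this->consumers as $name => $consumer) {
            try {
                $consumer($event);
            } catch (Throwable $e) {
                // Record the failure and keep delivering to the other consumers.
                $this->failures[$name]++;
            }
        }
    }

    public function failures(): array
    {
        return $this->failures;
    }
}

$seen = [];
$broadcaster = new SafeBroadcaster();
$broadcaster->addConsumer('broken', function (array $e): void {
    throw new RuntimeException('consumer failed');
});
$broadcaster->addConsumer('collector', function (array $e) use (&$seen): void {
    $seen[] = $e['type'];
});

$broadcaster->broadcast(['type' => 'token', 'data' => ['token' => 'Hi']]);
$broadcaster->broadcast(['type' => 'end', 'data' => ['status' => 'completed']]);
```

Swallowing exceptions is a trade-off: it keeps the stream alive, but failures are only visible if something inspects `failures()` or logs inside the `catch` block.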
<?php
// Set SSE headers
header('Content-Type: text/event-stream');
header('Cache-Control: no-cache');
header('Connection: keep-alive');
header('X-Accel-Buffering: no'); // Disable nginx buffering

// Disable PHP output buffering (several buffers may be nested)
while (ob_get_level() > 0) {
    ob_end_clean();
}

foreach ($executor->streamSSE($agent, $task) as $sseData) {
    echo $sseData; // Already formatted as SSE
    flush();
}

Each event is sent as an `event:` line and a `data:` line, followed by a blank line:

event: token.received
data: {"type":"token.received","data":{"token":"Hello"},"timestamp":1234567.89}

event: progress.update
data: {"type":"progress.update","data":{"percent":50.0},"timestamp":1234567.90}

event: end
data: {"type":"end","data":{"status":"completed"},"timestamp":1234567.91}
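To make the wire format concrete, here is a minimal sketch of how one such frame could be built. `formatSseFrame` is a hypothetical function written for illustration; the payload layout (`type`, `data`, `timestamp`) mirrors the sample output, but the library's actual formatter may differ.

```php
<?php
/**
 * Hypothetical formatter: builds one SSE frame for an event.
 */
function formatSseFrame(string $type, array $data, float $timestamp): string
{
    $payload = json_encode(
        ['type' => $type, 'data' => $data, 'timestamp' => $timestamp],
        JSON_THROW_ON_ERROR
    );

    // json_encode() without JSON_PRETTY_PRINT emits no raw newlines,
    // so a single "data:" line is enough. The trailing blank line
    // (the second "\n") tells the client the frame is complete.
    return "event: {$type}\ndata: {$payload}\n\n";
}

$frame = formatSseFrame('token.received', ['token' => 'Hello'], microtime(true));
```

A payload that did contain raw newlines would have to be split across several `data:` lines, which is one reason JSON is a convenient encoding here.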
const eventSource = new EventSource('/stream?task=' + encodeURIComponent(task));

// Listen to specific events. The names must match the `event:` field sent by the server.
eventSource.addEventListener('token.received', (e) => {
  const data = JSON.parse(e.data);
  appendToken(data.data.token);
});

eventSource.addEventListener('progress.update', (e) => {
  const data = JSON.parse(e.data);
  updateProgressBar(data.data.percent);
});

eventSource.addEventListener('end', (e) => {
  console.log('Stream complete');
  eventSource.close();
});

eventSource.onerror = (e) => {
  console.error('SSE error:', e);
  eventSource.close(); // Also prevents the browser's automatic reconnect
};

- Always set proper headers: including `Cache-Control`
- Disable output buffering: both PHP and the web server
- Flush regularly: after each event
- Handle client disconnects: check the connection status
- Include heartbeats: prevent timeouts (optional)
- Close the connection properly: on both client and server side
$lastHeartbeat = time();

foreach ($executor->streamSSE($agent, $task) as $sseData) {
    echo $sseData;
    flush();

    // Send heartbeat every 15 seconds.
    // Note: this check only runs when an event arrives, so a long gap
    // between events produces no heartbeat.
    if (time() - $lastHeartbeat > 15) {
        echo ": heartbeat\n\n";
        flush();
        $lastHeartbeat = time();
    }

    // Check if client disconnected
    if (connection_aborted()) {
        break;
    }
}

// High throughput (more memory)
$queue = new EventQueue(maxSize: 500);
// Memory constrained (may drop events)
$queue = new EventQueue(maxSize: 50);
// Balanced
$queue = new EventQueue(maxSize: 100); // Default

Monitor dropped events:

$stats = $queue->getStats();
if ($stats['dropped_events'] > 0) {
    // Consider increasing queue size
}

Only register events you need:
// All events (higher overhead)
$manager->registerDefaultEvents();
// Minimal events (lower overhead)
$manager->registerEvent('on_token', FlowEvent::TOKEN_RECEIVED);
$manager->registerEvent('on_end', FlowEvent::FLOW_COMPLETED);

Keep listeners lightweight:
// ❌ Bad: Heavy processing in listener
$manager->subscribe(function($event) {
    processComplexData($event); // Blocks delivery of subsequent events
    saveToDatabase($event);     // I/O operation
});

// ✅ Good: Lightweight listener
// The closure must capture $eventQueue explicitly with use ().
$manager->subscribe(function($event) use ($eventQueue) {
    $eventQueue->enqueue($event); // Fast enqueue
});

// Process in separate thread/process
while ($event = $eventQueue->dequeue()) {
    processComplexData($event);
}

Process events in batches:
$batch = [];
$batchSize = 10;

foreach ($executor->executeWithStreaming($agent, $task) as $event) {
    $batch[] = $event;
    if (count($batch) >= $batchSize) {
        processBatch($batch);
        $batch = [];
    }
}

// Process remaining events, if any
if ($batch !== []) {
    processBatch($batch);
}

Trigger garbage collection periodically during long streams:
$eventCount = 0;

foreach ($executor->executeWithStreaming($agent, $task) as $event) {
    processEvent($event);
    $eventCount++;

    // Every 100 events, force collection of circular references
    // left behind by processed events
    if ($eventCount % 100 === 0) {
        gc_collect_cycles();
    }
}

Estimate remaining time based on completed iterations:
$startTime = microtime(true);
$totalIterations = 10;

foreach ($executor->executeWithStreaming($agent, $task) as $event) {
    if ($event['type'] === 'iteration_end') {
        $current = $event['data']['iteration'];
        if ($current < 1) {
            continue; // Avoid division by zero before the first completed iteration
        }
        $elapsed = microtime(true) - $startTime;
        $avgTime = $elapsed / $current;
        $remaining = ($totalIterations - $current) * $avgTime;

        echo sprintf(
            "Progress: %d/%d - ETA: %.1fs\n",
            $current,
            $totalIterations,
            $remaining
        );
    }
}

Record events for replay:
$recorder = new EventRecorder();

foreach ($executor->executeWithStreaming($agent, $task) as $event) {
    $recorder->record($event);
    processEvent($event);
}

// Replay later
foreach ($recorder->replay() as $event) {
    processEvent($event);
}

class EventRecorder {
    private array $events = [];

    public function record(array $event): void {
        $this->events[] = $event;
    }

    public function replay(): Generator {
        foreach ($this->events as $event) {
            yield $event;
        }
    }

    public function save(string $path): void {
        file_put_contents($path, json_encode($this->events));
    }
}

Stream different content based on conditions:
function conditionalStream(
    StreamingFlowExecutor $executor,
    AgentInterface $agent,
    string $task,
    bool $verbose
): Generator {
    foreach ($executor->executeWithStreaming($agent, $task) as $event) {
        if ($verbose) {
            yield $event; // All events
        } else {
            // Only essential events
            if (in_array($event['type'], ['token', 'end'], true)) {
                yield $event;
            }
        }
    }
}

Aggregate multiple events into summaries:
class EventAggregator {
    private int $tokenCount = 0;
    private array $toolCalls = [];

    public function aggregate(array $event): void {
        match ($event['type']) {
            'token' => $this->tokenCount++,
            'tool_start' => $this->toolCalls[] = $event['data']['tool'],
            default => null
        };
    }

    public function getSummary(): array {
        return [
            'tokens' => $this->tokenCount,
            'tools' => array_count_values($this->toolCalls)
        ];
    }
}

$aggregator = new EventAggregator();

foreach ($executor->executeWithStreaming($agent, $task) as $event) {
    $aggregator->aggregate($event);
}

$summary = $aggregator->getSummary();
// Example result: ['tokens' => 150, 'tools' => ['calculator' => 2, 'search' => 1]]

Implement error recovery in streaming:
$maxRetries = 3;
$retryCount = 0;

while ($retryCount < $maxRetries) {
    try {
        foreach ($executor->executeWithStreaming($agent, $task) as $event) {
            if ($event['type'] === 'error') {
                throw new RuntimeException($event['data']['error']);
            }
            processEvent($event);
        }
        break; // Success
    } catch (RuntimeException $e) {
        $retryCount++;
        if ($retryCount >= $maxRetries) {
            throw $e;
        }
        sleep(1); // Fixed one-second delay before retrying
    }
}

use Psr\Log\LogLevel;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
$logger = new Logger('flow');
$logger->pushHandler(new StreamHandler('php://stdout', LogLevel::DEBUG));
$executor = new StreamingFlowExecutor($eventManager, $eventQueue, $logger);

foreach ($executor->executeWithStreaming($agent, $task) as $event) {
    if ($event['type'] === 'progress') {
        $stats = $queue->getStats();
        if ($stats['utilization'] > 80) {
            error_log("Warning: Queue utilization at {$stats['utilization']}%");
        }
    }
}

$timings = [];
foreach ($executor->executeWithStreaming($agent, $task) as $event) {
    $timings[] = [
        'type' => $event['type'],
        'timestamp' => $event['data']['timestamp'] ?? microtime(true)
    ];
}

// Analyze timing gaps
for ($i = 1; $i < count($timings); $i++) {
    $gap = $timings[$i]['timestamp'] - $timings[$i-1]['timestamp'];
    if ($gap > 1.0) { // More than 1 second
        echo "Large gap: {$gap}s between events\n";
    }
}

- Event Reference - Complete event documentation
- Main Documentation - Overview and quick start
- Examples - Working code examples