-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathproducer_consumer.py
More file actions
277 lines (222 loc) · 12 KB
/
producer_consumer.py
File metadata and controls
277 lines (222 loc) · 12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
from concurrent.futures import ThreadPoolExecutor, as_completed
from rich.progress import Progress, BarColumn, TextColumn, TimeElapsedColumn, TaskID
import time
import random
import queue
import threading
import signal
import sys
from loguru import logger
import os
# Configure loguru with filters: one catch-all sink plus per-component sinks
# selected via record["extra"] bindings created with logger.bind(...).
logger.remove() # Remove default handler (stderr) so only the sinks added below receive records
# Catch-all sink: every record, regardless of component/worker bindings.
# NOTE(review): added before main() runs os.makedirs("logs") — loguru creates
# missing parent directories itself, but confirm this holds for the loguru version in use.
logger.add("logs/app.log", rotation="10 MB", retention="7 days", level="DEBUG")
# Remove the single worker.log - we'll add individual worker logs dynamically
# Component-scoped sinks: only records bound with component="producer"/"consumer" pass the filter.
logger.add("logs/producer.log", filter=lambda record: record["extra"].get("component") == "producer", level="DEBUG")
logger.add("logs/consumer.log", filter=lambda record: record["extra"].get("component") == "consumer", level="DEBUG")
# Global event for graceful shutdown; polled by every worker loop.
shutdown_event = threading.Event()
# Global progress instance for signal handler; set by main() while the Progress context is active.
progress_instance = None
def signal_handler(signum, frame):
    """SIGINT handler: announce the shutdown and trip the global shutdown event."""
    active_progress = progress_instance
    if active_progress:
        # Route the banner through the live progress console so the bars are not corrupted.
        active_progress.console.print("\n[yellow]Shutdown signal received. Gracefully shutting down...[/yellow]")
    logger.info("Shutdown signal received")
    # Every producer/consumer loop polls this event and exits cooperatively.
    shutdown_event.set()
def consumer_worker(worker_id: int, task_queue: queue.Queue, progress: Progress, rich_task_id: TaskID, stop_event: threading.Event):
    """Consume tasks from task_queue until stop_event or the global shutdown_event is set.

    Each consumed task simulates a random number of work steps, then advances
    this worker's rich progress bar by one.

    Args:
        worker_id: 1-based id used for the per-worker log file and log binding.
        task_queue: shared queue of task ids produced by the producer workers.
        progress: shared rich Progress instance.
        rich_task_id: this worker's progress-bar task id.
        stop_event: consumer-local stop signal (set by the consumer manager).

    Returns:
        Number of tasks this worker completed.
    """
    # Per-worker sink: only records bound with this worker's id land in its file.
    worker_log_id = logger.add(f"logs/consumer_worker_{worker_id}.log",
                               filter=lambda record: record["extra"].get("worker_id") == worker_id,
                               level="DEBUG")
    # Bind the worker id so records route to both consumer.log-style filters and the sink above.
    worker_logger = logger.bind(worker_id=worker_id)
    worker_logger.info("Consumer-Worker-{} started", worker_id)
    completed_tasks = 0
    try:
        while not stop_event.is_set() and not shutdown_event.is_set():
            try:
                task = task_queue.get(timeout=0.1)
            except queue.Empty:
                # Fix: use the bound worker_logger (the original used the unbound
                # `logger`, so this message never reached the per-worker log file).
                worker_logger.debug("Worker {} found no tasks in queue, waiting...", worker_id)
                time.sleep(1)
                continue
            worker_logger.debug("Processing task {}", task)
            # Simulate a variable amount of work; bail out mid-task on shutdown.
            work_steps = random.randint(5, 15)
            for _ in range(work_steps):
                if stop_event.is_set() or shutdown_event.is_set():
                    break
                time.sleep(random.uniform(0.01, 0.05))
            progress.update(rich_task_id, advance=1)
            completed_tasks += 1
            task_queue.task_done()
            worker_logger.debug("Completed task {}", task)
        worker_logger.info("Consumer-Worker-{} stopping after completing {} tasks", worker_id, completed_tasks)
    finally:
        # Fix: always detach the per-worker sink, even if the loop raised,
        # so an exception cannot leak a loguru handler.
        logger.remove(worker_log_id)
    return completed_tasks
def producer_worker(worker_id: int, task_queue: queue.Queue, stop_event: threading.Event, progress: Progress, rich_task_id: TaskID):
    """Producer worker thread that adds tasks to the queue.

    Periodically (80% of iterations) enqueues a small batch of unique task ids
    until stop_event or the global shutdown_event is set.

    Args:
        worker_id: 1-based id used for the per-worker log file, log binding,
            and the "<worker_id>-<count>" task-id prefix.
        task_queue: shared queue the consumer workers drain.
        stop_event: producer-local stop signal.
        progress: shared rich Progress instance.
        rich_task_id: this producer's progress-bar task id.

    Returns:
        Number of tasks this worker enqueued.
    """
    # Per-worker sink: only records bound with this producer worker's id land in its file.
    producer_worker_log_id = logger.add(f"logs/producer_worker_{worker_id}.log",
                                        filter=lambda record: record["extra"].get("producer_worker_id") == worker_id,
                                        level="DEBUG")
    producer_worker_logger = logger.bind(producer_worker_id=worker_id)
    producer_worker_logger.info("Producer-Worker-{} started", worker_id)
    task_count = 0
    try:
        while not stop_event.is_set() and not shutdown_event.is_set():
            # Simulate checking for new tasks (e.g., from database, file system, API, etc.)
            if random.random() < 0.8:  # 80% chance of finding new tasks
                batch_size = random.randint(2, 5)
                for _ in range(batch_size):
                    task_count += 1
                    # Use worker_id to create unique task IDs across producer workers
                    unique_task_id = f"{worker_id}-{task_count}"
                    task_queue.put(unique_task_id)
                    producer_worker_logger.debug("Added task {} to queue", unique_task_id)
                    progress.update(rich_task_id, advance=1)
            time.sleep(random.uniform(0.05, 0.2))  # Check interval
        producer_worker_logger.info("Producer-Worker-{} stopping after adding {} tasks", worker_id, task_count)
    finally:
        # Fix: always detach the per-worker sink, even if the loop raised,
        # so an exception cannot leak a loguru handler.
        logger.remove(producer_worker_log_id)
    return task_count
def producer(task_queue: queue.Queue, num_workers: int, stop_event: threading.Event, progress: Progress, producer_task_ids: list):
    """Producer thread that manages multiple producer workers.

    Runs num_workers producer_worker loops until stop_event (or the global
    shutdown_event) is set, then collects their per-worker task counts.

    Args:
        task_queue: shared queue the workers enqueue into.
        num_workers: number of producer workers to run.
        stop_event: external stop signal (set by main on shutdown).
        progress: shared rich Progress instance.
        producer_task_ids: one progress-bar task id per worker, indexed by worker.

    Returns:
        Total number of tasks enqueued by all workers.
    """
    # Use global logger with producer context
    producer_logger = logger.bind(component="producer")
    producer_logger.info("Producer started with {} workers", num_workers)
    producer_worker_stop_event = threading.Event()
    if num_workers == 1:
        producer_logger.warning("Only one producer worker specified, using single-threaded mode")
        # Fix: pass the caller's stop_event directly. The original passed the
        # fresh producer_worker_stop_event, which is never set on this path,
        # so a single worker could only ever stop via the global shutdown_event.
        total_tasks_added = producer_worker(1, task_queue, stop_event, progress, producer_task_ids[0])
    else:
        producer_logger.debug("Starting producer workers")
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = {}
            for i in range(num_workers):
                future = executor.submit(producer_worker, i + 1, task_queue, producer_worker_stop_event, progress, producer_task_ids[i])
                futures[future] = i + 1  # Store future with worker ID
            # Wait for stop signal, then relay it to the workers.
            while not stop_event.is_set() and not shutdown_event.is_set():
                time.sleep(0.1)
            producer_logger.info("Signaling producer workers to stop")
            producer_worker_stop_event.set()
            total_tasks_added = 0
            for future in as_completed(futures):
                worker_id = futures[future]
                result = future.result()
                producer_logger.debug("Producer worker {} finished: {}", worker_id, result)
                total_tasks_added += result
    producer_logger.info("Producer finished - total {} tasks added by all workers", total_tasks_added)
    return total_tasks_added
def consumer(task_queue: queue.Queue, num_workers: int, producer_stop_event: threading.Event, progress: Progress, consumer_task_ids: list):
    """Consumer thread that manages worker threads and progress display.

    Runs num_workers consumer_worker loops, stops them once the producer has
    stopped (or on global shutdown), and collects their completion counts.
    NOTE(review): workers are signaled to stop as soon as the producer stops,
    even if tasks remain in the queue — remaining tasks are left undrained.

    Args:
        task_queue: shared queue the workers drain.
        num_workers: number of consumer workers to run.
        producer_stop_event: set when the producer side has been told to stop.
        progress: shared rich Progress instance.
        consumer_task_ids: one progress-bar task id per worker, indexed by worker.

    Returns:
        Total number of tasks completed by all workers.
    """
    # Use global logger with consumer context
    consumer_logger = logger.bind(component="consumer")
    consumer_logger.info("Consumer started with {} workers", num_workers)
    worker_stop_event = threading.Event()
    if num_workers == 1:
        consumer_logger.warning("Only one consumer worker specified, using single-threaded mode")
        # Fix: pass producer_stop_event directly. The original passed the fresh
        # worker_stop_event, which is never set on this path, so a single worker
        # ignored the producer stopping and only obeyed the global shutdown_event.
        total_tasks_completed = consumer_worker(1, task_queue, progress, consumer_task_ids[0], producer_stop_event)
    else:
        consumer_logger.debug("Starting consumer workers")
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = {}
            for i in range(num_workers):
                future = executor.submit(consumer_worker, i + 1, task_queue, progress, consumer_task_ids[i], worker_stop_event)
                futures[future] = i + 1  # Store future with worker ID
            # Wait for producer to finish or shutdown signal
            while not producer_stop_event.is_set() and not shutdown_event.is_set():
                time.sleep(0.1)
            if shutdown_event.is_set():
                consumer_logger.debug("Shutdown event set, stopping workers")
            else:
                # Fix: the original had further elif branches (queue empty / fallback)
                # that were unreachable — the loop above only exits when one of the
                # two events is set, so here the producer must have stopped.
                consumer_logger.debug("Producer stopped, signaling workers to stop")
            # Signal workers to stop
            worker_stop_event.set()
            total_tasks_completed = 0
            for future in as_completed(futures):
                worker_id = futures[future]
                result = future.result()
                consumer_logger.debug("Consumer worker {} finished: {}", worker_id, result)
                total_tasks_completed += result
    consumer_logger.info("Consumer finished - total {} tasks completed by all workers", total_tasks_completed)
    return total_tasks_completed
def main(num_producer_workers: int = 1, num_consumer_workers: int = 1):
    """Wire up logging, signal handling, the shared Progress display, and the
    producer/consumer threads; run until the global shutdown_event is set.

    Args:
        num_producer_workers: number of producer worker threads.
        num_consumer_workers: number of consumer worker threads.
    """
    global progress_instance
    # Add console handler for main function only: the filter excludes any record
    # carrying a worker/component binding so worker logs stay out of the console.
    main_logger_id = logger.add(sys.stderr, level="INFO",
                                format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>main</cyan> | {message}",
                                filter=lambda record: "worker_id" not in record["extra"] and record["extra"].get("component") is None and "producer_worker_id" not in record["extra"])
    logger.info("Application starting")
    # Set up signal handler for graceful shutdown (Ctrl+C sets shutdown_event).
    signal.signal(signal.SIGINT, signal_handler)
    # Create logs directory if it doesn't exist
    # NOTE(review): module-level logger.add() calls already targeted logs/ at
    # import time — loguru creates missing directories, so this is belt-and-braces.
    os.makedirs("logs", exist_ok=True)
    # Create task queue and stop event for producer
    task_queue = queue.Queue()
    for i in range(20):  # Reduced initial tasks since producer will add more
        task_queue.put(f"initial-{i}")
    logger.info("Initialized queue with 20 initial tasks")
    producer_stop_event = threading.Event()
    try:
        logger.info("Starting producer ({}) workers and consumer ({}) workers threads", num_producer_workers, num_consumer_workers)
        # Create a single shared Progress instance; all worker bars render in it.
        with Progress(
            TextColumn("[bold]{task.description}", justify="right"),
            BarColumn(),
            TextColumn("{task.completed} tasks"),
            TimeElapsedColumn(),
        ) as progress:
            progress_instance = progress  # Make it accessible to signal handler
            # Create progress tasks for producer workers (total=None -> indeterminate bar).
            producer_task_ids = []
            for i in range(num_producer_workers):
                task_id = progress.add_task(
                    description=f"[green]Producer-{i+1}",
                    total=None
                )
                producer_task_ids.append(task_id)
            # Create progress tasks for consumer workers
            consumer_task_ids = []
            for i in range(num_consumer_workers):
                task_id = progress.add_task(
                    description=f"[blue]Consumer-{i+1}",
                    total=None
                )
                consumer_task_ids.append(task_id)
            # Start producer and consumer threads using ThreadPoolExecutor
            with ThreadPoolExecutor(max_workers=2) as main_executor:
                # Start producer thread with multiple workers
                producer_future = main_executor.submit(producer, task_queue, num_producer_workers, producer_stop_event, progress, producer_task_ids)
                # Start consumer thread
                consumer_future = main_executor.submit(consumer, task_queue, num_consumer_workers, producer_stop_event, progress, consumer_task_ids)
                # Wait for shutdown signal (set by the SIGINT handler).
                while not shutdown_event.is_set():
                    time.sleep(0.1)
                # Stop producer; the consumer observes producer_stop_event and stops its workers.
                producer_stop_event.set()
                logger.info("Stopping producer...")
                # Wait for both to complete (.result() re-raises worker exceptions here).
                producer_result = producer_future.result()
                consumer_result = consumer_future.result()
        # Clear the global reference
        progress_instance = None
        # Log results after Progress context exits to avoid interference
        logger.info("Producer result: {}", producer_result)
        logger.info("Consumer result: {}", consumer_result)
        logger.info("All tasks completed!")
    except KeyboardInterrupt:
        # NOTE(review): with the SIGINT handler installed above, Ctrl+C should not
        # raise KeyboardInterrupt on the main thread — this is a last-resort fallback.
        logger.warning("Forced shutdown...")
        shutdown_event.set()
    finally:
        progress_instance = None
        logger.info("Application shutdown complete")
        logger.remove(main_logger_id)
if __name__ == "__main__":
    # Defaults: one producer worker and one consumer worker (single-threaded mode in each manager).
    main()