1818from taskiq .context import Context
1919from taskiq .exceptions import NoResultError
2020from taskiq .message import TaskiqMessage
21+ from taskiq .receiver .observer import ReceiverObserver
2122from taskiq .receiver .params_parser import parse_params
2223from taskiq .result import TaskiqResult
2324from taskiq .state import TaskiqState
@@ -35,6 +36,7 @@ def __init__(
3536 self ,
3637 broker : AsyncBroker ,
3738 executor : Executor | None = None ,
39+ observer : ReceiverObserver | None = None ,
3840 validate_params : bool = True ,
3941 max_async_tasks : "int | None" = None ,
4042 max_prefetch : int = 0 ,
@@ -54,6 +56,7 @@ def __init__(
5456 self .dependency_graphs : dict [str , DependencyGraph ] = {}
5557 self .propagate_exceptions = propagate_exceptions
5658 self .on_exit = on_exit
59+ self .observer = observer
5760 self .ack_time = ack_type or AcknowledgeType .WHEN_SAVED
5861 self .known_tasks : set [str ] = set ()
5962 self .max_tasks_to_execute = max_tasks_to_execute
@@ -92,6 +95,11 @@ async def callback( # noqa: C901, PLR0912
9295 taskiq_msg = self .broker .formatter .loads (message = message_data )
9396 taskiq_msg .parse_labels ()
9497 except Exception as exc :
98+ if self .observer is not None :
99+ self .observer .on_deserialize_error (
100+ raw = message_data ,
101+ error = exc ,
102+ )
95103 logger .warning (
96104 "Cannot parse message: %s. Skipping execution.\n %s" ,
97105 message_data ,
@@ -102,6 +110,11 @@ async def callback( # noqa: C901, PLR0912
102110 logger .debug (f"Received message: { taskiq_msg } " )
103111 task = self .broker .find_task (taskiq_msg .task_name )
104112 if task is None :
113+ if self .observer is not None :
114+ self .observer .on_task_not_found (
115+ taskiq_msg .task_name ,
116+ )
117+
105118 logger .warning (
106119 'task "%s" is not found. Maybe you forgot to import it?' ,
107120 taskiq_msg .task_name ,
@@ -363,6 +376,7 @@ async def prefetcher(
363376 break
364377 try :
365378 await self .sem_prefetch .acquire ()
379+
366380 if (
367381 self .max_tasks_to_execute
368382 and fetched_tasks >= self .max_tasks_to_execute
@@ -376,13 +390,20 @@ async def prefetcher(
376390 # and continue the loop. So it will check if finished event was set.
377391 if not done :
378392 self .sem_prefetch .release ()
393+
379394 continue
380395 # We're done, so now we need to check
381396 # whether task has returned an error.
382397 message = current_message .result ()
383398 current_message = asyncio .create_task (iterator .__anext__ ()) # type: ignore
384399 fetched_tasks += 1
385400 await queue .put (message )
401+
402+ if self .observer is not None :
403+ self .observer .on_prefetch_queue_size (
404+ queue .qsize (),
405+ )
406+
386407 except (asyncio .CancelledError , StopAsyncIteration ):
387408 break
388409 # We don't want to fetch new messages if we are shutting down.
@@ -413,17 +434,35 @@ def task_cb(task: "asyncio.Task[Any]") -> None:
413434 :param task: finished task
414435 """
415436 tasks .discard (task )
437+ if self .observer is not None :
438+ self .observer .on_active_tasks_count (
439+ len (tasks ),
440+ )
441+
416442 if self .sem is not None :
417443 self .sem .release ()
418444
445+ if self .observer is not None :
446+ self .observer .on_semaphore_status (
447+ self .sem ._value # noqa
448+ )
449+
419450 while True :
420451 try :
421452 # Waits for semaphore to be released.
422453 if self .sem is not None :
423454 await self .sem .acquire ()
455+ if self .observer is not None :
456+ self .observer .on_semaphore_status (
457+ self .sem ._value # noqa
458+ )
424459
425460 self .sem_prefetch .release ()
426461 message = await queue .get ()
462+ if self .observer is not None :
463+ self .observer .on_prefetch_queue_size (
464+ queue .qsize () # noqa
465+ )
427466 if message is QUEUE_DONE :
428467 # asyncio.wait will throw an error if there is nothing to wait for
429468 if tasks :
@@ -438,7 +477,10 @@ def task_cb(task: "asyncio.Task[Any]") -> None:
438477 self .callback (message = message , raise_err = False ),
439478 )
440479 tasks .add (task )
441-
480+ if self .observer is not None :
481+ self .observer .on_active_tasks_count (
482+ len (tasks ),
483+ )
442484 # We want the task to remove itself from the set when it's done.
443485 #
444486 # Because if we won't save it anywhere,
0 commit comments