Skip to content

Commit 0343759

Browse files
feat: add queue_wait_seconds, task_errors_by_type, and pre_send timestamp to PrometheusMiddleware
1 parent 8f7c4a1 commit 0343759

3 files changed

Lines changed: 78 additions & 5 deletions

File tree

taskiq/abc/broker.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,13 @@
3232
from taskiq.utils import maybe_awaitable
3333
from taskiq.warnings import TaskiqDeprecationWarning
3434

35-
if sys.version_info >= (3, 11):
35+
if sys.version_info >= (
36+
3,
37+
11,
38+
): # Check which python version are we running to import correctly
3639
from typing import Self
3740
else:
38-
from typing_extensions import Self
41+
from typing_extensions import Self # pragma: no cover
3942

4043

4144
if TYPE_CHECKING: # pragma: no cover
@@ -46,6 +49,7 @@
4649
_FuncParams = ParamSpec("_FuncParams")
4750
_ReturnType = TypeVar("_ReturnType")
4851

52+
# an event handler can be either a sync or an async function that has one parameter of type TaskiqState
4953
EventHandler: TypeAlias = Callable[[TaskiqState], Awaitable[None] | None]
5054

5155
logger = getLogger("taskiq")

taskiq/cli/worker/process_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ def __init__(
169169
for path_to_watch in watch_paths:
170170
logger.debug(f"Watching directory: {path_to_watch}")
171171
observer.schedule(
172-
FileWatcher(
172+
FileWatcher( # type: ignore
173173
callback=schedule_workers_reload,
174174
path=Path(path_to_watch),
175175
use_gitignore=not args.no_gitignore,

taskiq/middlewares/prometheus_middleware.py

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1+
import datetime
12
import os
23
from logging import getLogger
34
from pathlib import Path
45
from tempfile import gettempdir
56
from typing import Any
6-
77
from taskiq.abc.middleware import TaskiqMiddleware
88
from taskiq.message import TaskiqMessage
99
from taskiq.result import TaskiqResult
@@ -43,7 +43,7 @@ def __init__(
4343
logger.debug("Initializing metrics")
4444

4545
try:
46-
from prometheus_client import Counter, Histogram # noqa: PLC0415
46+
from prometheus_client import Counter, Histogram, Gauge # noqa: PLC0415
4747
except ImportError as exc:
4848
raise ImportError(
4949
"Cannot initialize metrics. Please install 'taskiq[metrics]'.",
@@ -74,6 +74,24 @@ def __init__(
7474
"Time of function execution",
7575
["task_name"],
7676
)
77+
78+
self.in_flight_tasks = Gauge(
79+
"in_flight_tasks",
80+
"Number of tasks in flight",
81+
["task_name"],
82+
multiprocess_mode="livesum",
83+
)
84+
self.queue_wait_seconds = Histogram(
85+
"queue_wait_seconds",
86+
"time task spent in message queue",
87+
["task_name"],
88+
)
89+
self.task_errors_by_type = Counter(
90+
"task_errors_by_type",
91+
"Number of errors raised in tasks by their type",
92+
["task_name", "error_type"],
93+
)
94+
7795
self.server_port = server_port
7896
self.server_addr = server_addr
7997

@@ -104,6 +122,24 @@ def startup(self) -> None:
104122
except OSError as exc:
105123
logger.debug("Cannot start prometheus server: %s", exc)
106124

125+
def pre_send(
126+
self,
127+
message: "TaskiqMessage",
128+
) -> "TaskiqMessage":
129+
"""
130+
Function to track the time a task spend in queue.
131+
132+
This function tracks the time a task spends in a queue until it is executed.
133+
134+
:param message: current message.
135+
:return: message
136+
"""
137+
if not message.labels.get("_taskiq_enqueue_timestamp"):
138+
message.labels["_taskiq_enqueue_timestamp"] = datetime.datetime.now(
139+
datetime.UTC,
140+
).isoformat() # Might conside using timezones too
141+
return message
142+
107143
def pre_execute(
108144
self,
109145
message: "TaskiqMessage",
@@ -117,9 +153,41 @@ def pre_execute(
117153
:param message: current message.
118154
:return: message
119155
"""
156+
if message.labels.get(
157+
"_taskiq_enqueue_timestamp",
158+
): # Handle case where the sender doesn't use the prometheus middleware
159+
time_delta = datetime.datetime.now(
160+
datetime.UTC,
161+
) - datetime.datetime.fromisoformat(
162+
message.labels["_taskiq_enqueue_timestamp"],
163+
)
164+
time_delta = max(0, time_delta.total_seconds())
165+
self.queue_wait_seconds.labels(message.task_name).observe(
166+
time_delta,
167+
)
168+
169+
self.in_flight_tasks.labels(message.task_name).inc()
120170
self.received_tasks.labels(message.task_name).inc()
121171
return message
122172

173+
def on_error(
174+
self,
175+
message: TaskiqMessage,
176+
result: TaskiqResult[Any], # pylint: disable=unused-argument
177+
exception: BaseException,
178+
) -> None:
179+
"""
180+
This function tracks the number of errors raised by tasks.
181+
182+
:param message: the received task message
183+
:param result: the result of task
184+
:param exception: exception raised
185+
"""
186+
self.task_errors_by_type.labels(
187+
message.task_name,
188+
type(exception).__name__,
189+
).inc()
190+
123191
def post_execute(
124192
self,
125193
message: "TaskiqMessage",
@@ -135,6 +203,7 @@ def post_execute(
135203
self.found_errors.labels(message.task_name).inc()
136204
else:
137205
self.success_tasks.labels(message.task_name).inc()
206+
self.in_flight_tasks.labels(message.task_name).dec()
138207
self.execution_time.labels(message.task_name).observe(result.execution_time)
139208

140209
def post_save(

0 commit comments

Comments
 (0)