Skip to content

Commit 2eba64e

Browse files
committed
initial changes to record args & kwargs
1 parent d93ae1d commit 2eba64e

3 files changed

Lines changed: 80 additions & 2 deletions

File tree

taskbadger/celery.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
KWARG_PREFIX = "taskbadger_"
2020
TB_KWARGS_ARG = f"{KWARG_PREFIX}kwargs"
21-
IGNORE_ARGS = {TB_KWARGS_ARG, f"{KWARG_PREFIX}task", f"{KWARG_PREFIX}task_id"}
21+
IGNORE_ARGS = {TB_KWARGS_ARG, f"{KWARG_PREFIX}task", f"{KWARG_PREFIX}task_id", f"{KWARG_PREFIX}record_task_args"}
2222
TB_TASK_ID = f"{KWARG_PREFIX}task_id"
2323

2424
TERMINAL_STATES = {
@@ -124,6 +124,8 @@ def apply_async(self, *args, **kwargs):
124124
if Badger.is_configured():
125125
headers["taskbadger_track"] = True
126126
headers[TB_KWARGS_ARG] = tb_kwargs
127+
if "record_task_args" in tb_kwargs:
128+
headers["taskbadger_record_task_args"] = tb_kwargs.pop("record_task_args")
127129

128130
result = super().apply_async(*args, **kwargs)
129131

@@ -187,6 +189,12 @@ def task_publish_handler(sender=None, headers=None, body=None, **kwargs):
187189
kwargs["status"] = StatusEnum.PENDING
188190
name = kwargs.pop("name", headers["task"])
189191

192+
global_record_task_args = celery_system and celery_system.record_task_args
193+
if headers.get("taskbadger_record_task_args", global_record_task_args):
194+
data = kwargs.setdefault("data", {})
195+
data["celery_task_args"] = body[0]
196+
data["celery_task_kwargs"] = body[1]
197+
190198
task = create_task_safe(name, **kwargs)
191199
if task:
192200
meta = {TB_TASK_ID: task.id}

taskbadger/systems/celery.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
class CelerySystemIntegration(System):
77
identifier = "celery"
88

9-
def __init__(self, auto_track_tasks=True, includes=None, excludes=None):
9+
def __init__(self, auto_track_tasks=True, includes=None, excludes=None, record_task_args=False):
1010
"""
1111
Args:
1212
auto_track_tasks: Automatically track all Celery tasks regardless of whether they are using the
@@ -16,10 +16,12 @@ def __init__(self, auto_track_tasks=True, includes=None, excludes=None):
1616
matches both an include and an exclude, it will be excluded.
1717
excludes: A list of task names to exclude from tracking. As with `includes`, these can be either
1818
the full task name or a regular expression. Exclusions take precedence over inclusions.
19+
record_task_args: Record the arguments passed to each task.
1920
"""
2021
self.auto_track_tasks = auto_track_tasks
2122
self.includes = includes
2223
self.excludes = excludes
24+
self.record_task_args = record_task_args
2325

2426
if auto_track_tasks:
2527
# Importing this here ensures that the Celery signal handlers are registered

tests/test_celery.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,74 @@ def add_with_task_args(self, a, b):
111111
create.assert_called_once_with("new_name", value_max=10, actions=actions, status=StatusEnum.PENDING)
112112

113113

114+
def test_celery_record_args(celery_session_app, celery_session_worker, bind_settings):
115+
@celery_session_app.task(bind=True, base=Task)
116+
def add_with_task_args(self, a, b):
117+
assert self.taskbadger_task is not None
118+
return a + b
119+
120+
celery_session_worker.reload()
121+
122+
with (
123+
mock.patch("taskbadger.celery.create_task_safe") as create,
124+
mock.patch("taskbadger.celery.update_task_safe"),
125+
mock.patch("taskbadger.celery.get_task"),
126+
):
127+
create.return_value = task_for_test()
128+
129+
result = add_with_task_args.apply_async(
130+
(2, 2),
131+
taskbadger_name="new_name",
132+
taskbadger_value_max=10,
133+
taskbadger_kwargs={"data": {"foo": "bar"}},
134+
taskbadger_record_task_args=True,
135+
)
136+
assert result.get(timeout=10, propagate=True) == 4
137+
138+
create.assert_called_once_with(
139+
"new_name",
140+
value_max=10,
141+
data={"foo": "bar", "celery_task_args": (2, 2), "celery_task_kwargs": {}},
142+
status=StatusEnum.PENDING,
143+
)
144+
145+
146+
def test_celery_record_task_kwargs(celery_session_app, celery_session_worker, bind_settings):
147+
@celery_session_app.task(bind=True, base=Task)
148+
def add_with_task_args(self, a, b, c=0):
149+
assert self.taskbadger_task is not None
150+
return a + b + c
151+
152+
celery_session_worker.reload()
153+
154+
with (
155+
mock.patch("taskbadger.celery.create_task_safe") as create,
156+
mock.patch("taskbadger.celery.update_task_safe"),
157+
mock.patch("taskbadger.celery.get_task"),
158+
):
159+
create.return_value = task_for_test()
160+
161+
actions = [Action("stale", integration=EmailIntegration(to="test@test.com"))]
162+
result = add_with_task_args.delay(
163+
2,
164+
2,
165+
c=3,
166+
taskbadger_name="new_name",
167+
taskbadger_value_max=10,
168+
taskbadger_kwargs={"actions": actions},
169+
taskbadger_record_task_args=True,
170+
)
171+
assert result.get(timeout=10, propagate=True) == 7
172+
173+
create.assert_called_once_with(
174+
"new_name",
175+
value_max=10,
176+
data={"celery_task_args": (2, 2), "celery_task_kwargs": {"c": 3}},
177+
actions=actions,
178+
status=StatusEnum.PENDING,
179+
)
180+
181+
114182
def test_celery_task_with_args_in_decorator(celery_session_app, celery_session_worker, bind_settings):
115183
@celery_session_app.task(
116184
bind=True,

0 commit comments

Comments
 (0)