Skip to content

Commit 5fd9635

Browse files
committed
support custom serialization
1 parent 72df019 commit 5fd9635

2 files changed

Lines changed: 39 additions & 1 deletion

File tree

taskbadger/celery.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
task_retry,
1212
task_success,
1313
)
14+
from kombu import serialization
1415

1516
from .internal.models import StatusEnum
1617
from .mug import Badger
@@ -197,7 +198,8 @@ def task_publish_handler(sender=None, headers=None, body=None, **kwargs):
197198
"celery_task_kwargs": body[1],
198199
}
199200
try:
200-
json.dumps(data)
201+
_, _, value = serialization.dumps(data, serializer="json")
202+
data = json.loads(value)
201203
except Exception:
202204
log.error("Error serializing task arguments for task '%s'", name)
203205
else:

tests/test_celery.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import celery
1515
import pytest
16+
from kombu.utils.json import register_type
1617

1718
from taskbadger import Action, EmailIntegration, StatusEnum
1819
from taskbadger.celery import Task
@@ -179,6 +180,41 @@ def add_with_task_args(self, a, b, c=0):
179180
)
180181

181182

183+
def test_celery_record_task_args_custom_serialization(celery_session_app, celery_session_worker, bind_settings):
184+
class A:
185+
def __init__(self, a, b):
186+
self.a = a
187+
self.b = b
188+
189+
register_type(A, "A", lambda o: [o.a, o.b], lambda o: A(*o))
190+
191+
@celery_session_app.task(bind=True, base=Task)
192+
def add_with_task_args(self, a):
193+
assert self.taskbadger_task is not None
194+
return a.a + a.b
195+
196+
celery_session_worker.reload()
197+
198+
with (
199+
mock.patch("taskbadger.celery.create_task_safe") as create,
200+
mock.patch("taskbadger.celery.update_task_safe"),
201+
mock.patch("taskbadger.celery.get_task"),
202+
):
203+
create.return_value = task_for_test()
204+
205+
result = add_with_task_args.delay(
206+
A(2, 2),
207+
taskbadger_record_task_args=True,
208+
)
209+
assert result.get(timeout=10, propagate=True) == 4
210+
211+
create.assert_called_once_with(
212+
"tests.test_celery.add_with_task_args",
213+
data={"celery_task_args": [{"__type__": "A", "__value__": [2, 2]}], "celery_task_kwargs": {}},
214+
status=StatusEnum.PENDING,
215+
)
216+
217+
182218
def test_celery_task_with_args_in_decorator(celery_session_app, celery_session_worker, bind_settings):
183219
@celery_session_app.task(
184220
bind=True,

0 commit comments

Comments
 (0)