Skip to content

Commit 38c4496

Browse files
committed
Merge PR #899 into 18.0
Signed-off-by guewen
2 parents 1ea2c45 + 7ae3a60 commit 38c4496

8 files changed

Lines changed: 88 additions & 23 deletions

File tree

queue_job/controllers/main.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
from werkzeug.exceptions import BadRequest, Forbidden
1414

1515
from odoo import SUPERUSER_ID, _, api, http
16-
from odoo.modules.registry import Registry
1716
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY
17+
from odoo.tools import config
1818

1919
from ..delay import chain, group
2020
from ..exception import FailedJobError, RetryableJobError
@@ -38,8 +38,10 @@ def _prevent_commit(cr):
3838
def forbidden_commit(*args, **kwargs):
3939
raise RuntimeError(
4040
"Commit is forbidden in queue jobs. "
41-
"If the current job is a cron running as queue job, "
42-
"modify it to run as a normal cron."
41+
'You may want to enable the "Allow Commit" option on the Job '
42+
"Function. Alternatively, if the current job is a cron running as "
43+
"queue job, you can modify it to run as a normal cron. More details on: "
44+
"https://github.com/OCA/queue/wiki/Upgrade-warning:-commits-inside-jobs"
4345
)
4446

4547
original_commit = cr.commit
@@ -103,7 +105,8 @@ def _try_perform_job(cls, env, job):
103105
job.set_done()
104106
job.store()
105107
env.flush_all()
106-
env.cr.commit()
108+
if not config["test_enable"]:
109+
env.cr.commit()
107110
_logger.debug("%s done", job)
108111

109112
@classmethod
@@ -146,8 +149,7 @@ def _enqueue_dependent_jobs(cls, env, job):
146149
def _runjob(cls, env: api.Environment, job: Job) -> None:
147150
def retry_postpone(job, message, seconds=None):
148151
job.env.clear()
149-
with Registry(job.env.cr.dbname).cursor() as new_cr:
150-
job.env = api.Environment(new_cr, SUPERUSER_ID, {})
152+
with job.in_temporary_env():
151153
job.postpone(result=message, seconds=seconds)
152154
job.set_pending(reset_retry=False)
153155
job.store()
@@ -180,8 +182,7 @@ def retry_postpone(job, message, seconds=None):
180182
traceback_txt = buff.getvalue()
181183
_logger.error(traceback_txt)
182184
job.env.clear()
183-
with Registry(job.env.cr.dbname).cursor() as new_cr:
184-
job.env = job.env(cr=new_cr)
185+
with job.in_temporary_env():
185186
vals = cls._get_failure_values(job, traceback_txt, orig_exception)
186187
job.set_failed(**vals)
187188
job.store()
@@ -233,6 +234,7 @@ def create_test_job(
233234
failure_rate=0,
234235
job_duration=0,
235236
commit_within_job=False,
237+
failure_retry_seconds=0,
236238
):
237239
if not http.request.env.user.has_group("base.group_erp_manager"):
238240
raise Forbidden(_("Access Denied"))
@@ -270,6 +272,12 @@ def create_test_job(
270272
except ValueError:
271273
max_retries = None
272274

275+
if failure_retry_seconds is not None:
276+
try:
277+
failure_retry_seconds = int(failure_retry_seconds)
278+
except ValueError:
279+
failure_retry_seconds = 0
280+
273281
if size == 1:
274282
return self._create_single_test_job(
275283
priority=priority,
@@ -279,6 +287,7 @@ def create_test_job(
279287
failure_rate=failure_rate,
280288
job_duration=job_duration,
281289
commit_within_job=commit_within_job,
290+
failure_retry_seconds=failure_retry_seconds,
282291
)
283292

284293
if size > 1:
@@ -291,6 +300,7 @@ def create_test_job(
291300
failure_rate=failure_rate,
292301
job_duration=job_duration,
293302
commit_within_job=commit_within_job,
303+
failure_retry_seconds=failure_retry_seconds,
294304
)
295305
return ""
296306

@@ -304,6 +314,7 @@ def _create_single_test_job(
304314
failure_rate=0,
305315
job_duration=0,
306316
commit_within_job=False,
317+
failure_retry_seconds=0,
307318
):
308319
delayed = (
309320
http.request.env["queue.job"]
@@ -317,6 +328,7 @@ def _create_single_test_job(
317328
failure_rate=failure_rate,
318329
job_duration=job_duration,
319330
commit_within_job=commit_within_job,
331+
failure_retry_seconds=failure_retry_seconds,
320332
)
321333
)
322334
return f"job uuid: {delayed.db_record().uuid}"
@@ -333,6 +345,7 @@ def _create_graph_test_jobs(
333345
failure_rate=0,
334346
job_duration=0,
335347
commit_within_job=False,
348+
failure_retry_seconds=0,
336349
):
337350
model = http.request.env["queue.job"]
338351
current_count = 0
@@ -359,6 +372,7 @@ def _create_graph_test_jobs(
359372
failure_rate=failure_rate,
360373
job_duration=job_duration,
361374
commit_within_job=commit_within_job,
375+
failure_retry_seconds=failure_retry_seconds,
362376
)
363377
)
364378

queue_job/job.py

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import sys
99
import uuid
1010
import weakref
11+
from contextlib import contextmanager, nullcontext
1112
from datetime import datetime, timedelta
1213
from random import randint
1314

@@ -402,14 +403,9 @@ def __init__(
402403
raise TypeError("Job accepts only methods of Models")
403404

404405
recordset = func.__self__
405-
env = recordset.env
406406
self.method_name = func.__name__
407407
self.recordset = recordset
408408

409-
self.env = env
410-
self.job_model = self.env["queue.job"]
411-
self.job_model_name = "queue.job"
412-
413409
self.job_config = (
414410
self.env["queue.job.function"].sudo().job_config(self.job_function_name)
415411
)
@@ -459,10 +455,10 @@ def __init__(
459455
self.exc_message = None
460456
self.exc_info = None
461457

462-
if "company_id" in env.context:
463-
company_id = env.context["company_id"]
458+
if "company_id" in self.env.context:
459+
company_id = self.env.context["company_id"]
464460
else:
465-
company_id = env.company.id
461+
company_id = self.env.company.id
466462
self.company_id = company_id
467463
self._eta = None
468464
self.eta = eta
@@ -487,7 +483,12 @@ def perform(self):
487483
"""
488484
self.retry += 1
489485
try:
490-
self.result = self.func(*tuple(self.args), **self.kwargs)
486+
if self.job_config.allow_commit:
487+
env_context_manager = self.in_temporary_env()
488+
else:
489+
env_context_manager = nullcontext()
490+
with env_context_manager:
491+
self.result = self.func(*tuple(self.args), **self.kwargs)
491492
except RetryableJobError as err:
492493
if err.ignore_retry:
493494
self.retry -= 1
@@ -507,6 +508,16 @@ def perform(self):
507508

508509
return self.result
509510

511+
@contextmanager
512+
def in_temporary_env(self):
513+
with self.env.registry.cursor() as new_cr:
514+
env = self.env
515+
self._env = env(cr=new_cr)
516+
try:
517+
yield
518+
finally:
519+
self._env = env
520+
510521
def _get_common_dependent_jobs_query(self):
511522
return """
512523
UPDATE queue_job
@@ -668,6 +679,14 @@ def __hash__(self):
668679
def db_record(self):
669680
return self.db_records_from_uuids(self.env, [self.uuid])
670681

682+
@property
683+
def env(self):
684+
return self.recordset.env
685+
686+
@env.setter
687+
def _env(self, env):
688+
self.recordset = self.recordset.with_env(env)
689+
671690
@property
672691
def func(self):
673692
recordset = self.recordset.with_context(job_uuid=self.uuid)
@@ -732,7 +751,7 @@ def model_name(self):
732751

733752
@property
734753
def user_id(self):
735-
return self.recordset.env.uid
754+
return self.env.uid
736755

737756
@property
738757
def eta(self):

queue_job/models/queue_job.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from odoo.addons.base_sparse_field.models.fields import Serialized
1313

1414
from ..delay import Graph
15-
from ..exception import JobError
15+
from ..exception import JobError, RetryableJobError
1616
from ..fields import JobSerialized
1717
from ..job import (
1818
CANCELLED,
@@ -453,10 +453,23 @@ def related_action_open_record(self):
453453
)
454454
return action
455455

456-
def _test_job(self, failure_rate=0, job_duration=0, commit_within_job=False):
456+
def _test_job(
457+
self,
458+
failure_rate=0,
459+
job_duration=0,
460+
commit_within_job=False,
461+
failure_retry_seconds=0,
462+
):
457463
_logger.info("Running test job.")
458464
if random.random() <= failure_rate:
459-
raise JobError("Job failed")
465+
if failure_retry_seconds:
466+
raise RetryableJobError(
467+
f"Retryable job failed, will be retried in "
468+
f"{failure_retry_seconds} seconds",
469+
seconds=failure_retry_seconds,
470+
)
471+
else:
472+
raise JobError("Job failed")
460473
if job_duration:
461474
time.sleep(job_duration)
462475
if commit_within_job:

queue_job/models/queue_job_function.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ class QueueJobFunction(models.Model):
2828
"related_action_enable "
2929
"related_action_func_name "
3030
"related_action_kwargs "
31-
"job_function_id ",
31+
"job_function_id "
32+
"allow_commit",
3233
)
3334

3435
def _default_channel(self):
@@ -79,6 +80,12 @@ def _default_channel(self):
7980
"enable, func_name, kwargs.\n"
8081
"See the module description for details.",
8182
)
83+
allow_commit = fields.Boolean(
84+
help="Allows the job to commit transactions during execution. "
85+
"Under the hood, this executes the job in a new database cursor, "
86+
"which incurs an overhead as it requires an extra connection to "
87+
"the database. "
88+
)
8289

8390
@api.depends("model_id.model", "method")
8491
def _compute_name(self):
@@ -149,6 +156,7 @@ def job_default_config(self):
149156
related_action_func_name=None,
150157
related_action_kwargs={},
151158
job_function_id=None,
159+
allow_commit=False,
152160
)
153161

154162
def _parse_retry_pattern(self):
@@ -184,6 +192,7 @@ def job_config(self, name):
184192
related_action_func_name=config.related_action.get("func_name"),
185193
related_action_kwargs=config.related_action.get("kwargs", {}),
186194
job_function_id=config.id,
195+
allow_commit=config.allow_commit,
187196
)
188197

189198
def _retry_pattern_format_error_message(self):

queue_job/tests/common.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ def _add_job(self, *args, **kwargs):
275275

276276
def _prepare_context(self, job):
277277
# pylint: disable=context-overridden
278-
job_model = job.job_model.with_context({})
278+
job_model = job.env["queue.job"].with_context({})
279279
field_records = job_model._fields["records"]
280280
# Filter the context to simulate store/load of the job
281281
job.recordset = field_records.convert_to_write(job.recordset, job_model)

queue_job/tests/test_model_job_function.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def test_function_job_config(self):
4242
' "func_name": "related_action_foo",'
4343
' "kwargs": {"b": 1}}'
4444
),
45+
"allow_commit": True,
4546
}
4647
)
4748
self.assertEqual(
@@ -53,5 +54,6 @@ def test_function_job_config(self):
5354
related_action_func_name="related_action_foo",
5455
related_action_kwargs={"b": 1},
5556
job_function_id=job_function.id,
57+
allow_commit=True,
5658
),
5759
)

queue_job/tests/test_run_rob_controller.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,9 @@ def test_get_failure_values(self):
1515
self.assertEqual(
1616
rslt, {"exc_info": "info", "exc_name": "Exception", "exc_message": "zero"}
1717
)
18+
19+
def test_runjob_success(self):
20+
job = self.env["queue.job"].with_delay()._test_job()
21+
RunJobController._runjob(self.env, job)
22+
self.assertEqual(job.state, "done")
23+
self.assertEqual(job.db_record().state, "done")

queue_job/views/queue_job_function_views.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
<field name="model_id" required="1" />
1111
<field name="method" required="1" />
1212
<field name="channel_id" />
13+
<field name="allow_commit" />
1314
<field name="edit_retry_pattern" widget="ace" />
1415
<field name="edit_related_action" widget="ace" />
1516
</group>
@@ -24,6 +25,7 @@
2425
<list>
2526
<field name="name" />
2627
<field name="channel_id" />
28+
<field name="allow_commit" />
2729
</list>
2830
</field>
2931
</record>

0 commit comments

Comments
 (0)