feat(clp-package)!: Dispatch batches of search tasks in parallel to improve query scheduler throughput.#2184
gibber9809 wants to merge 16 commits into y-scope:main
Conversation
> Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
Walkthrough

Replaced synchronous in-process dispatch of SEARCH_OR_AGGREGATION jobs with a ProcessPoolExecutor-based dispatch path. Introduced DispatchExecutor to initialize per-process DB pools and create Celery task groups; scheduler functions now accept database configuration and a concurrency parameter.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Main as Main Process
    participant Pool as ProcessPoolExecutor
    participant Worker as Worker Process
    participant DB as Database
    participant Celery as Celery
    Main->>Main: create ProcessPoolExecutor (initializer=DispatchExecutor.initialize)
    Main->>Pool: submit DispatchExecutor.dispatch_job_and_update_db(job)
    Pool->>Worker: run dispatch_job_and_update_db()
    Worker->>DB: initialize per-process DB pool / insert query task rows
    Worker->>Celery: trigger search task group (task IDs)
    Celery-->>Worker: return group_result_id
    Worker-->>Main: return (job_id, archive_count, group_result_id)
    Main->>Main: restore GroupResult(app) and mark job RUNNING
```
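The per-worker initialization flow in the diagram can be sketched generically. This is a hypothetical, self-contained sketch: it uses a `ThreadPoolExecutor` with thread-local state for portability, whereas the actual scheduler uses a `ProcessPoolExecutor` whose initializer builds a per-process DB connection pool. The names `initialize`, `dispatch`, and `_worker_state` are illustrative only.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the per-worker resources (DB pool, config) that
# DispatchExecutor.initialize() would set up once per pool worker.
_worker_state = threading.local()


def initialize(results_cache_uri: str) -> None:
    # Runs once per worker before it executes any submitted task.
    _worker_state.results_cache_uri = results_cache_uri


def dispatch(job_id: str, archive_count: int) -> tuple[str, int, str]:
    # Each dispatch call reuses the state its worker initialized;
    # raises AttributeError if the initializer never ran.
    uri = _worker_state.results_cache_uri
    assert uri.startswith("mongodb://")
    group_result_id = f"group-{job_id}"  # placeholder for celery group_result.id
    return job_id, archive_count, group_result_id


with ThreadPoolExecutor(
    max_workers=4,
    initializer=initialize,
    initargs=("mongodb://results-cache:27017",),
) as pool:
    futures = [pool.submit(dispatch, f"job-{i}", i + 1) for i in range(3)]
    results = sorted(f.result() for f in futures)
```

The `initializer`/`initargs` pair is the same API used with `ProcessPoolExecutor`, where it runs once per spawned process instead of once per thread.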
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
```python
existing_datasets,
archive_retention_period,
# ...
with concurrent.futures.ProcessPoolExecutor(
    max_workers=16,
```
Do we want to make this configurable?
I think we should, in package config I assume.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py`:
- Around line 807-835: The code currently sets the job RUNNING and updates the
DB before confirming dispatch succeeded; change this so
dispatch_job_and_update_db_for_executor is awaited per-future and only upon
successful future.result() do you set job.state = InternalJobState.RUNNING and
call set_job_or_task_status/DB updates (QUERY_JOBS_TABLE_NAME) for
start_time/num_tasks. Implement a future→job_id mapping when submitting with
process_pool.submit(dispatch_job_and_update_db_for_executor, ...) so in the
as_completed loop you can catch exceptions for each future, handle failures
(log/mark job as FAILED or revert to PENDING) without leaving the job RUNNING
with no async handle, and only restore GroupResult and set
job.current_sub_job_async_task_result and job.state after future.result()
returns the group_result_id successfully; keep handle_jobs() robust to
per-future exceptions.
- Around line 614-625: The duplicated celery.group construction for search tasks
should be extracted into a single helper function (e.g., build_search_task_group
or _assemble_search_task_group) that accepts the shared inputs (job_id,
archives, task_ids, job_config, executor_clp_metadata_db_conn_params,
executor_results_cache_uri) and returns the celery.group of search.s(...) tasks;
replace the inline generator in this file and the identical block in
get_task_group_for_job() to call that helper so both dispatch paths use the same
assembly logic and avoid drift.
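The suggested extraction can be sketched as follows, with Celery omitted so the example is self-contained: the hypothetical `build_search_task_kwargs` pairs each archive with its pre-inserted task ID exactly as the duplicated blocks do, and the real helper would wrap each kwargs dict in `search.s(...)` inside a `celery.group`.

```python
def build_search_task_kwargs(
    job_id: str,
    archives: list[dict],
    task_ids: list[int],
    job_config: dict,
    clp_metadata_db_conn_params: dict,
    results_cache_uri: str,
) -> list[dict]:
    # Shared assembly logic for both dispatch paths: one kwargs dict per
    # archive, paired positionally with its pre-inserted task ID.
    return [
        dict(
            job_id=job_id,
            archive_id=archives[i]["archive_id"],
            task_id=task_ids[i],
            job_config=job_config,
            dataset=archives[i].get("dataset"),
            clp_metadata_db_conn_params=clp_metadata_db_conn_params,
            results_cache_uri=results_cache_uri,
        )
        for i in range(len(archives))
    ]


tasks = build_search_task_kwargs(
    job_id="42",
    archives=[{"archive_id": "a1", "dataset": "default"}, {"archive_id": "a2"}],
    task_ids=[100, 101],
    job_config={"query": "error"},
    clp_metadata_db_conn_params={"host": "db"},
    results_cache_uri="mongodb://cache:27017",
)
```

Centralizing the pairing logic means a future change (e.g., a new per-task parameter) lands in one place instead of drifting between the two dispatch paths.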
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 3b720689-a5e3-43df-9fa3-a889d04dfce8
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
```python
if job.start_time is None:
    job.start_time = datetime.datetime.now()
set_job_or_task_status(
    db_conn,
    QUERY_JOBS_TABLE_NAME,
    job_id,
    QueryJobStatus.RUNNING,
    QueryJobStatus.PENDING,
    start_time=job.start_time,
    num_tasks=job.num_archives_to_search,
)

# ...

futures.append(
    process_pool.submit(
        dispatch_job_and_update_db_for_executor,
        job.get_config().model_dump(),
        job.get_type(),
        job_id,
        archives_for_search,
    )
)

# ...

for future in concurrent.futures.as_completed(futures):
    job_id, num_archives_for_search, group_result_id = future.result()
    job = active_jobs[job_id]
    job.current_sub_job_async_task_result = celery.result.GroupResult.restore(
        group_result_id, app=app
    )
    job.state = InternalJobState.RUNNING
```
Do not mark the job RUNNING before the dispatch worker succeeds.
This flips DB state before task rows are inserted and before the async handle is available. If any submitted dispatch fails, the exception escapes this loop, handle_jobs() exits, and the job is left RUNNING with nothing to poll or revoke. Please keep a future→job mapping, handle per-future failures, and move the first RUNNING update behind successful dispatch.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py`
around lines 807 - 835, The code currently sets the job RUNNING and updates the
DB before confirming dispatch succeeded; change this so
dispatch_job_and_update_db_for_executor is awaited per-future and only upon
successful future.result() do you set job.state = InternalJobState.RUNNING and
call set_job_or_task_status/DB updates (QUERY_JOBS_TABLE_NAME) for
start_time/num_tasks. Implement a future→job_id mapping when submitting with
process_pool.submit(dispatch_job_and_update_db_for_executor, ...) so in the
as_completed loop you can catch exceptions for each future, handle failures
(log/mark job as FAILED or revert to PENDING) without leaving the job RUNNING
with no async handle, and only restore GroupResult and set
job.current_sub_job_async_task_result and job.state after future.result()
returns the group_result_id successfully; keep handle_jobs() robust to
per-future exceptions.
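The future-to-job mapping the comment asks for can be sketched as follows. This is a hypothetical, self-contained version: a `ThreadPoolExecutor` and a dummy `dispatch` stand in for `process_pool.submit(dispatch_job_and_update_db_for_executor, ...)`, and the `jobs` dict stands in for `active_jobs` plus the DB status updates.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# Hypothetical stand-ins for the scheduler's in-memory job state.
jobs = {"j1": {"state": "PENDING"}, "j2": {"state": "PENDING"}}


def dispatch(job_id: str) -> tuple[str, str]:
    # Simulates the pool worker: inserts task rows, triggers the Celery
    # group, and returns the async handle's ID.
    if job_id == "j2":
        raise RuntimeError("simulated dispatch failure")
    return job_id, f"group-{job_id}"


with ThreadPoolExecutor(max_workers=2) as pool:
    # Keep a future -> job_id mapping so failures can be attributed per job.
    future_map = {pool.submit(dispatch, jid): jid for jid in jobs}
    for future in as_completed(future_map):
        jid = future_map[future]
        try:
            # Only flip to RUNNING once the async handle actually exists.
            _, group_result_id = future.result()
        except Exception:
            jobs[jid]["state"] = "FAILED"  # or revert to PENDING and retry
        else:
            jobs[jid]["group_result_id"] = group_result_id
            jobs[jid]["state"] = "RUNNING"
```

Because each failure is caught inside the loop, one bad dispatch no longer unwinds `handle_jobs()` or strands sibling jobs in a half-dispatched state.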
LinZhihao-723
left a comment
The functionality changes lgtm. Leaving a code structure comment for better maintainability.
```python
# Globals for dispatch executor pool
executor_db_conn_pool: ConnectionPoolWrapper | None = None
executor_clp_metadata_db_conn_params: dict[str, any] | None = None
executor_results_cache_uri: str | None = None
```
- This file is already long and has many functions serving different purposes.
- The current naming of `_for_executor` is a bit confusing.

How about restructuring in the following way to improve the readability and maintainability of the per-executor-process data and methods:
```python
class DispatchExecutor:
    # Globals for dispatch executor pool
    _db_conn_pool: ConnectionPoolWrapper | None = None
    _clp_metadata_db_conn_params: dict[str, any] | None = None
    _results_cache_uri: str | None = None

    @classmethod
    def initialize(
        cls,
        database_config: Database,
        clp_metadata_db_conn_params: dict[str, any],
        results_cache_uri: str,
    ) -> None:
        cls._clp_metadata_db_conn_params = clp_metadata_db_conn_params
        sql_adapter = SqlAdapter(database_config)
        cls._db_conn_pool = sql_adapter.create_connection_pool(
            logger=logger, pool_size=1, disable_localhost_socket_connection=True
        )
        cls._results_cache_uri = results_cache_uri

    @staticmethod
    def dispatch_job_and_update_db(
        job_config: dict[str, any], job_type: QueryJobType, job_id: str, archives: list[dict]
    ) -> tuple[str, int, str]:
        if job_type != QueryJobType.SEARCH_OR_AGGREGATION:
            raise NotImplementedError(f"Unexpected job type: {job_type}")
        archive_ids = [a["archive_id"] for a in archives]
        with contextlib.closing(DispatchExecutor._db_conn_pool.connect()) as db_conn:
            task_ids = insert_query_tasks_into_db(db_conn, job_id, archive_ids)
        celery_task_group = celery.group(
            search.s(
                job_id=job_id,
                archive_id=archives[i]["archive_id"],
                task_id=task_ids[i],
                job_config=job_config,
                dataset=archives[i].get("dataset"),
                clp_metadata_db_conn_params=DispatchExecutor._clp_metadata_db_conn_params,
                results_cache_uri=DispatchExecutor._results_cache_uri,
            )
            for i in range(len(archives))
        )
        group_result = celery_task_group.apply_async()
        group_result.save()
        return job_id, len(archives), group_result.id
```

Pool initialization:

```python
with concurrent.futures.ProcessPoolExecutor(
    max_workers=16,
    initializer=DispatchExecutor.initialize,
    initargs=(database_config, clp_metadata_db_conn_params, results_cache_uri),
) as process_pool:
```

Pool submission:

```python
futures.append(
    process_pool.submit(
        DispatchExecutor.dispatch_job_and_update_db,
        job.get_config().model_dump(),
        job.get_type(),
        job_id,
        archives_for_search,
    )
)
```
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tools/deployment/package-helm/templates/configmap.yaml (1)
129-136: ⚠️ Potential issue | 🟠 Major

Render `concurrency`, not `parallelism`, in `query_scheduler`.

The generated `clp-config.yaml` now writes `query_scheduler.parallelism`, but `components/clp-py-utils/clp_py_utils/clp_config.py` only defines `QueryScheduler.concurrency` and does not expose an alias for `parallelism`. In Helm deployments this setting will be ignored, so changing the chart value will not actually change the process-pool size. Please rename the chart key/template/docs to `concurrency`, or add a Pydantic alias if `parallelism` must remain part of the chart API.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/deployment/package-helm/templates/configmap.yaml` around lines 129 - 136, The Helm template writes query_scheduler.parallelism which doesn't match the Python setting QueryScheduler.concurrency in components/clp-py-utils/clp_py_utils/clp_config.py; fix by renaming the template key (in the query_scheduler block) from parallelism to concurrency so the generated clp-config.yaml sets query_scheduler.concurrency, or alternatively add a Pydantic alias for "parallelism" on the QueryScheduler.concurrency field (using Field(..., alias="parallelism")) so both names are accepted—update either the template (key: concurrency) or the QueryScheduler definition (alias) and adjust any docs to match.
♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py (1)
785-830: ⚠️ Potential issue | 🔴 Critical

Do not flip jobs to `RUNNING` before the dispatch future succeeds.

Lines 802-821 update the DB to `RUNNING` and consume `remaining_archives_for_search` before the worker has inserted task rows and returned a `group_result_id`. If `DispatchExecutor.dispatch_job_and_update_db()` fails, `future.result()` will unwind this loop and the job is left `RUNNING` with no Celery handle to poll or revoke; the sliced archives are already gone from in-memory state too. Keep a future→job/archives mapping, move the first `RUNNING` transition behind a successful `future.result()`, and restore or fail the job explicitly on per-future errors.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py` around lines 785 - 830, The loop currently sets job.start_time and calls set_job_or_task_status(..., QueryJobStatus.RUNNING, ...) and mutates job.remaining_archives_for_search before awaiting DispatchExecutor.dispatch_job_and_update_db; instead, keep a mapping of each submitted future to its job and archives_for_search (e.g., future_map[future] = (job, archives_for_search)), do NOT call set_job_or_task_status or remove archives from the job until the future completes successfully and returns (via future.result()), then on success call set_job_or_task_status to RUNNING and update job.current_sub_job_async_task_result/state; on future.exception() restore job.remaining_archives_for_search by prepending the archives_for_search back, set the job state to a terminal or retryable state (or explicitly set back to PENDING), and ensure any start_time/status updates only happen after DispatchExecutor.dispatch_job_and_update_db returns a group_result_id; reference DispatchExecutor.dispatch_job_and_update_db, QueryJobStatus.RUNNING, job.remaining_archives_for_search, pending_search_jobs, active_jobs, and set_job_or_task_status to locate the changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@tools/deployment/package-helm/templates/configmap.yaml`:
- Around line 129-136: The Helm template writes query_scheduler.parallelism
which doesn't match the Python setting QueryScheduler.concurrency in
components/clp-py-utils/clp_py_utils/clp_config.py; fix by renaming the template
key (in the query_scheduler block) from parallelism to concurrency so the
generated clp-config.yaml sets query_scheduler.concurrency, or alternatively add
a Pydantic alias for "parallelism" on the QueryScheduler.concurrency field
(using Field(..., alias="parallelism")) so both names are accepted—update either
the template (key: concurrency) or the QueryScheduler definition (alias) and
adjust any docs to match.
---
Duplicate comments:
In
`@components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py`:
- Around line 785-830: The loop currently sets job.start_time and calls
set_job_or_task_status(..., QueryJobStatus.RUNNING, ...) and mutates
job.remaining_archives_for_search before awaiting
DispatchExecutor.dispatch_job_and_update_db; instead, keep a mapping of each
submitted future to its job and archives_for_search (e.g., future_map[future] =
(job, archives_for_search)), do NOT call set_job_or_task_status or remove
archives from the job until the future completes successfully and returns (via
future.result()), then on success call set_job_or_task_status to RUNNING and
update job.current_sub_job_async_task_result/state; on future.exception()
restore job.remaining_archives_for_search by prepending the archives_for_search
back, set the job state to a terminal or retryable state (or explicitly set back
to PENDING), and ensure any start_time/status updates only happen after
DispatchExecutor.dispatch_job_and_update_db returns a group_result_id; reference
DispatchExecutor.dispatch_job_and_update_db, QueryJobStatus.RUNNING,
job.remaining_archives_for_search, pending_search_jobs, active_jobs, and
set_job_or_task_status to locate the changes.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 1dcefdc8-5f7a-47c4-b67b-be3c33388557
📒 Files selected for processing (7)
- components/clp-py-utils/clp_py_utils/clp_config.py
- components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
- components/package-template/src/etc/clp-config.template.text.yaml
- docs/src/user-docs/guides-docker-compose-deployment.md
- docs/src/user-docs/guides-k8s-deployment.md
- tools/deployment/package-helm/templates/configmap.yaml
- tools/deployment/package-helm/values.yaml
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/package-template/src/etc/clp-config.template.json.yaml`:
- Line 67: Add a brief inline comment next to the newly added concurrency field
to explain its purpose and default (e.g., controls number of concurrent query
workers), referencing the corresponding runtime setting
QueryScheduler.concurrency so users can tune it; update the comment style to
match nearby examples (see fields around lines with similar inline comments) and
keep indentation/format consistent with the template.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: e900a1d4-8ac3-46e0-91d5-cd6fe9a3e7cd
📒 Files selected for processing (1)
components/package-template/src/etc/clp-config.template.json.yaml
```yaml
# max_datasets_per_query: 10  # Set to null to disable the limit
# num_archives_to_search_per_sub_job: 16
# logging_level: "INFO"
# concurrency: 16
```
🧹 Nitpick | 🔵 Trivial
LGTM! Configuration field correctly added.
The concurrency field is properly added with the correct default value of 16, matching the QueryScheduler.concurrency field definition in the codebase. The indentation and formatting are consistent with other template options.
Optional improvement: Consider adding an inline comment to help users understand this setting, e.g.:

```diff
-# concurrency: 16
+# concurrency: 16  # Number of parallel worker processes for dispatching search tasks
```

This would be consistent with the documentation style used for some other fields (e.g., lines 50, 51, 64) and could help users tune this parameter appropriately.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@components/package-template/src/etc/clp-config.template.json.yaml` at line
67, Add a brief inline comment next to the newly added concurrency field to
explain its purpose and default (e.g., controls number of concurrent query
workers), referencing the corresponding runtime setting
QueryScheduler.concurrency so users can tune it; update the comment style to
match nearby examples (see fields around lines with similar inline comments) and
keep indentation/format consistent with the template.
LinZhihao-723
left a comment
The code changes lgtm.
In the package deployment code: why do we use `parallelism` in the Helm chart, but `concurrency` in the package config? Should we make the names consistent, or is it a special keyword in Helm?
I think I just missed renaming in one place while I was deciding -- I'll update to make these consistent.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tools/deployment/package-helm/values.yaml`:
- Line 148: Add an inline comment next to the concurrency key to clarify that
the "concurrency" value configures the scheduler dispatch process concurrency
(the number of concurrent dispatch operations) and is distinct from the
workerConcurrency setting; update the line with an explanatory comment
mentioning both "concurrency" and "workerConcurrency" so operators won't confuse
the scheduler dispatch limit with the worker concurrency limit.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: ba8e4fa1-da8f-4975-ac24-9623d0ba3f4a
📒 Files selected for processing (4)
- docs/src/user-docs/guides-k8s-deployment.md
- tools/deployment/package-helm/Chart.yaml
- tools/deployment/package-helm/templates/configmap.yaml
- tools/deployment/package-helm/values.yaml
```yaml
jobs_poll_delay: 0.1  # seconds
num_archives_to_search_per_sub_job: 16
logging_level: "INFO"
concurrency: 16
```
🧹 Nitpick | 🔵 Trivial
Clarify this concurrency knob to avoid operator confusion.
Line 148 is correct, but an inline comment here would help distinguish scheduler dispatch process concurrency from workerConcurrency at Line 31.
💡 Suggested clarification

```diff
 query_scheduler:
   jobs_poll_delay: 0.1  # seconds
   num_archives_to_search_per_sub_job: 16
   logging_level: "INFO"
+  # Number of process-pool workers used for dispatch in query scheduler.
   concurrency: 16
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tools/deployment/package-helm/values.yaml` at line 148, Add an inline comment
next to the concurrency key to clarify that the "concurrency" value configures
the scheduler dispatch process concurrency (the number of concurrent dispatch
operations) and is distinct from the workerConcurrency setting; update the line
with an explanatory comment mentioning both "concurrency" and
"workerConcurrency" so operators won't confuse the scheduler dispatch limit with
the worker concurrency limit.
@junhaoliao do you think the way I've set up the config for query scheduler concurrency makes sense, or do you think it should work more similarly to the worker concurrency?
LinZhihao-723
left a comment
I don't have any more concerns. Will leave the rest to Junhao for the package config checking.
Btw, I think this seems to be a breaking change since we update the package config.
junhaoliao
left a comment
i agree the change is breaking unless we set the concurrency default count to 1 - now users upgrading without touching their config would suddenly get 16 extra processes they didn't have before, and they should be aware of such resource allocation considerations.
i proposed a change to the concurrency config field name to avoid confusions with the (celery) worker pod / node concurrency settings
```python
max_datasets_per_query: PositiveInt | None = 10
num_archives_to_search_per_sub_job: PositiveInt = 16
logging_level: LoggingLevel = "INFO"
concurrency: PositiveInt = 16
```
would renaming this as `dispatch_pool_size` help resolve any potential confusion with the Celery `--concurrency` parameter and the `workerConcurrency` key in values.yaml?
I wanted to call it something a bit more generic just because we might end up using the same pool of processes to parallelize some parts of retiring tasks as well, but yeah we could rename to something like scheduler_concurrency to further distinguish from the worker concurrency.
I'll rename it to distinguish it a bit more. Also agree that this could mess with current deployments, so I think that making the default lower makes sense -- do you think it's better to default to
Description
This PR parallelizes task dispatch across processes in a process pool executor to reduce the dispatch bottleneck when running concurrent jobs. On startup, each process in the process pool creates a database connection pool with one connection and stores some config needed for most jobs in the global scope for later use during dispatch. During dispatch, task groups are dispatched to Celery in the dispatch processes, then rehydrated in the main process so that results can be retrieved from the main process.
Overall, on an old Xeon server setup, this improves overall per-batch scaling (both dispatching and retiring) from ~47.3ms/batch to ~26.5ms/batch given sufficient concurrent jobs for batches of tasks to be dispatched in parallel.
This change is marked "breaking" since it updates the package config.
Checklist
breaking change.
Validation performed
Summary by CodeRabbit
New Features
Chores
Documentation