Skip to content

Commit 4e6dfbe

Browse files
committed
Parallel Hash Full Join and Right Join
PostgreSQL originally excluded FULL and RIGHT outer joins from parallel hash join because of deadlock hazards in the per-batch barrier protocol. PG 14 resolved this by introducing a dedicated PHJ_BATCH_SCAN phase: one elected worker emits unmatched inner-side rows after probing, while the others detach and move on. In CBDB, distributed execution adds a second dimension: after a full outer join the unmatched NULL-filled rows may come from any segment, so the result carries a HashedOJ locus rather than a plain Hashed locus. This change teaches the parallel planner about that: - FULL JOIN and RIGHT JOIN are now valid parallel join types in the distributed planner. Previously they were unconditionally rejected, forcing serial execution across all segments. - The HashedOJ locus produced by a parallel full join now carries parallel_workers, so operators above the join (aggregates, further joins) can remain parallel. - A crash that could occur when a parallel LASJ_NOTIN (NOT IN) join encountered NULL inner keys is fixed. The worker would exit early, but the batch barrier — which was never attached to — would be touched on shutdown, causing an assertion failure. 
Example plans (3 segments, parallel_workers=2): -- FULL JOIN: result locus is HashedOJ with Parallel Workers: 2 EXPLAIN(costs off, locus) SELECT count(*) FROM t1 FULL JOIN t2 USING (id); Finalize Aggregate Locus: Entry -> Gather Motion 6:1 (slice1; segments: 6) -> Partial Aggregate Locus: HashedOJ Parallel Workers: 2 -> Parallel Hash Full Join Locus: HashedOJ Parallel Workers: 2 Hash Cond: (t1.id = t2.id) -> Parallel Seq Scan on t1 Locus: HashedWorkers -> Parallel Hash -> Parallel Seq Scan on t2 Locus: HashedWorkers -- RIGHT JOIN: when t1 is larger the planner hashes the smaller t2 -- and probes with t1; result locus HashedWorkers EXPLAIN(costs off, locus) SELECT count(*) FROM t1 RIGHT JOIN t2 USING (id); Finalize Aggregate Locus: Entry -> Gather Motion 6:1 (slice1; segments: 6) -> Partial Aggregate Locus: HashedWorkers Parallel Workers: 2 -> Parallel Hash Right Join Locus: HashedWorkers Parallel Workers: 2 Hash Cond: (t1.id = t2.id) -> Parallel Seq Scan on t1 Locus: HashedWorkers -> Parallel Hash -> Parallel Seq Scan on t2 Locus: HashedWorkers Performance (3 segments x 2 parallel workers, 6M rows each, 50% overlap): FULL JOIN parallel: 4040 ms serial: 6347 ms speedup: 1.57x RIGHT JOIN parallel: 3039 ms serial: 5568 ms speedup: 1.83x
1 parent ab1a78a commit 4e6dfbe

5 files changed

Lines changed: 39 additions & 23 deletions

File tree

src/backend/cdb/cdbpath.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3112,8 +3112,9 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
31123112
case JOIN_UNIQUE_INNER:
31133113
case JOIN_RIGHT:
31143114
case JOIN_FULL:
3115-
/* Join types are not supported in parallel yet. */
3116-
goto fail;
3115+
outer.ok_to_replicate = false;
3116+
inner.ok_to_replicate = false;
3117+
break;
31173118
case JOIN_DEDUP_SEMI:
31183119
if (!enable_parallel_dedup_semi_join)
31193120
goto fail;

src/backend/cdb/cdbpathlocus.c

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ cdbpathlocus_equal(CdbPathLocus a, CdbPathLocus b)
119119
list_length(a.distkey) != list_length(b.distkey))
120120
return false;
121121

122+
/*
123+
* CBDB_PARALLEL: What if both a and b are HashedOJ with parallel_workers > 0?
124+
* Are they equal in practice?
*/
126+
122127
if ((CdbPathLocus_IsHashed(a) || CdbPathLocus_IsHashedOJ(a)) &&
123128
(CdbPathLocus_IsHashed(b) || CdbPathLocus_IsHashedOJ(b)))
124129
return cdbpath_distkey_equal(a.distkey, b.distkey);
@@ -544,7 +549,7 @@ cdbpathlocus_from_subquery(struct PlannerInfo *root,
544549
else
545550
{
546551
Assert(CdbPathLocus_IsHashedOJ(subpath->locus));
547-
CdbPathLocus_MakeHashedOJ(&locus, distkeys, numsegments);
552+
CdbPathLocus_MakeHashedOJ(&locus, distkeys, numsegments, subpath->locus.parallel_workers);
548553
}
549554
}
550555
else
@@ -711,7 +716,7 @@ cdbpathlocus_pull_above_projection(struct PlannerInfo *root,
711716
CdbPathLocus_MakeHashedWorkers(&newlocus, newdistkeys, numsegments, locus.parallel_workers);
712717
}
713718
else
714-
CdbPathLocus_MakeHashedOJ(&newlocus, newdistkeys, numsegments);
719+
CdbPathLocus_MakeHashedOJ(&newlocus, newdistkeys, numsegments, locus.parallel_workers);
715720
return newlocus;
716721
}
717722
else
@@ -880,7 +885,7 @@ cdbpathlocus_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b)
880885

881886
newdistkeys = lappend(newdistkeys, newdistkey);
882887
}
883-
CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments);
888+
CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments, 0 /* both inputs are non-parallel here */);
884889
}
885890
Assert(cdbpathlocus_is_valid(resultlocus));
886891
return resultlocus;
@@ -1236,8 +1241,14 @@ cdbpathlocus_parallel_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b, bo
12361241
Assert(cdbpathlocus_is_valid(a));
12371242
Assert(cdbpathlocus_is_valid(b));
12381243

1239-
/* Do both input rels have same locus? */
1240-
if (cdbpathlocus_equal(a, b))
1244+
/*
1245+
* Do both input rels have the same locus?
1246+
* CBDB_PARALLEL: for a FULL JOIN the result locus can differ even when
1247+
* both inputs have the same locus, because the NULL-filled rows can end
1248+
* up on any segment after the join.
1249+
*/
1250+
1251+
if (jointype != JOIN_FULL && cdbpathlocus_equal(a, b))
12411252
return a;
12421253

12431254
/*
@@ -1412,8 +1423,9 @@ cdbpathlocus_parallel_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b, bo
14121423
* If inner is hashed workers, and outer is hashed. Join locus will be hashed.
14131424
* If outer is hashed workers, and inner is hashed. Join locus will be hashed workers.
14141425
* Seems we should just return outer locus anyway.
1426+
* Things have changed now that we support parallel full join.
14151427
*/
1416-
if (parallel_aware)
1428+
if (parallel_aware && jointype != JOIN_FULL)
14171429
return a;
14181430

14191431
numsegments = CdbPathLocus_NumSegments(a);
@@ -1469,7 +1481,9 @@ cdbpathlocus_parallel_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b, bo
14691481
newdistkeys = lappend(newdistkeys, newdistkey);
14701482
}
14711483

1472-
CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments);
1484+
Assert(CdbPathLocus_NumParallelWorkers(a) == CdbPathLocus_NumParallelWorkers(b));
1485+
1486+
CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments, CdbPathLocus_NumParallelWorkers(a));
14731487
}
14741488
Assert(cdbpathlocus_is_valid(resultlocus));
14751489
return resultlocus;

src/backend/executor/nodeHash.c

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2401,11 +2401,11 @@ ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
24012401
int curbatch = hashtable->curbatch;
24022402
ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
24032403

2404-
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);
2404+
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING);
24052405

24062406
/*
24072407
* It would not be deadlock-free to wait on the batch barrier, because it
2408-
* is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
2408+
* is in PHJ_BATCH_PROBING phase, and thus processes attached to it have
24092409
* already emitted tuples. Therefore, we'll hold a wait-free election:
24102410
* only one process can continue to the next phase, and all others detach
24112411
* from this batch. They can still do any work on other batches, if there
@@ -3975,12 +3975,12 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
39753975
sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
39763976
sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
39773977

3978-
/* After attaching we always get at least to PHJ_BATCH_PROBE. */
3979-
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
3978+
/* After attaching we always get at least to PHJ_BATCH_PROBING. */
3979+
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING ||
39803980
BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
39813981

39823982
/*
3983-
* If we're abandoning the PHJ_BATCH_PROBE phase early without having
3983+
* If we're abandoning the PHJ_BATCH_PROBING phase early without having
39843984
* reached the end of it, it means the plan doesn't want any more
39853985
* tuples, and it is happy to abandon any tuples buffered in this
39863986
* process's subplans. For correctness, we can't allow any process to
@@ -3995,13 +3995,13 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
39953995
* If phs_lasj_has_null is true, that means we have found null when building hash table,
39963996
* there were no batches to detach.
39973997
*/
3998-
if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
3998+
if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING &&
39993999
!hashtable->parallel_state->phs_lasj_has_null && /* CBDB_PARALLEL */
40004000
!hashtable->batches[curbatch].outer_eof)
40014001
{
40024002
/*
40034003
* This flag may be written to by multiple backends during
4004-
* PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN
4004+
* PHJ_BATCH_PROBING phase, but will only be read in PHJ_BATCH_SCAN
40054005
* phase so requires no extra locking.
40064006
*/
40074007
batch->skip_unmatched = true;
@@ -4012,10 +4012,11 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
40124012
* the PHJ_BATCH_SCAN phase just to maintain the invariant that
40134013
* freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
40144014
*/
4015-
if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
4015+
if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING &&
40164016
!hashtable->parallel_state->phs_lasj_has_null /* CBDB_PARALLEL */)
40174017
attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
4018-
if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
4018+
if (attached && !hashtable->parallel_state->phs_lasj_has_null /* CBDB_PARALLEL */ &&
4019+
BarrierArriveAndDetach(&batch->batch_barrier))
40194020
{
40204021
/*
40214022
* We are no longer attached to the batch barrier, but we're the

src/backend/executor/nodeHashjoin.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@
8383
* PHJ_BATCH_ELECT -- initial state
8484
* PHJ_BATCH_ALLOCATE* -- one allocates buckets
8585
* PHJ_BATCH_LOAD -- all load the hash table from disk
86-
* PHJ_BATCH_PROBE -- all probe
86+
* PHJ_BATCH_PROBING -- all probe
8787
* PHJ_BATCH_SCAN* -- one does full/right unmatched scan
8888
* PHJ_BATCH_FREE* -- one frees memory
8989
*
@@ -102,7 +102,7 @@
102102
* to a barrier, unless the barrier has reached a phase that means that no
103103
* process will wait on it again. We emit tuples while attached to the build
104104
* barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
105-
* PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
105+
* PHJ_BATCH_PROBING. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
106106
* respectively without waiting, using BarrierArriveAndDetach() and
107107
* BarrierArriveAndDetachExceptLast() respectively. The last to detach
108108
* receives a different return value so that it knows that it's safe to
@@ -1549,7 +1549,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
15491549
* since that phase is already underway (the thing we
15501550
* can't do under current deadlock-avoidance rules is wait
15511551
* for others to arrive at PHJ_BATCH_SCAN, because
1552-
* PHJ_BATCH_PROBE emits tuples, but in this case we just
1552+
* PHJ_BATCH_PROBING emits tuples, but in this case we just
15531553
* got here without waiting). That is not yet done. For
15541554
* now, we just detach and go around again. We have to
15551555
* use ExecHashTableDetachBatch() because there's a small

src/include/cdb/cdbpathlocus.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,13 +292,13 @@ typedef struct CdbPathLocus
292292
_locus->parallel_workers = (parallel_workers_); \
293293
Assert(cdbpathlocus_is_valid(*_locus)); \
294294
} while (0)
295-
#define CdbPathLocus_MakeHashedOJ(plocus, distkey_, numsegments_) \
295+
#define CdbPathLocus_MakeHashedOJ(plocus, distkey_, numsegments_, parallel_workers_) \
296296
do { \
297297
CdbPathLocus *_locus = (plocus); \
298298
_locus->locustype = CdbLocusType_HashedOJ; \
299299
_locus->numsegments = (numsegments_); \
300300
_locus->distkey = (distkey_); \
301-
_locus->parallel_workers = 0; \
301+
_locus->parallel_workers = (parallel_workers_); \
302302
Assert(cdbpathlocus_is_valid(*_locus)); \
303303
} while (0)
304304
#define CdbPathLocus_MakeHashedWorkers(plocus, distkey_, numsegments_, parallel_workers_) \

0 commit comments

Comments
 (0)