
Commit b75378a

macdice and avamingli authored and committed
Parallel Hash Full Join.
Full and right outer joins were not supported in the initial implementation of Parallel Hash Join because of deadlock hazards (see discussion). Therefore FULL JOIN inhibited parallelism, as the other join strategies can't do that in parallel either.

Add a new PHJ phase PHJ_BATCH_SCAN that scans for unmatched tuples on the inner side of one batch's hash table. For now, sidestep the deadlock problem by terminating parallelism there. The last process to arrive at that phase emits the unmatched tuples, while others detach and are free to go and work on other batches, if there are any, but otherwise they finish the join early.

That unfairness is considered acceptable for now, because it's better than no parallelism at all. The build and probe phases are run in parallel, and the new scan-for-unmatched phase, while serial, is usually applied to the smaller of the two relations and is either limited by some multiple of work_mem, or it's too big and is partitioned into batches and then the situation is improved by batch-level parallelism.

Author: Melanie Plageman <melanieplageman@gmail.com>
Author: Thomas Munro <thomas.munro@gmail.com>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
1 parent 84a8458 commit b75378a

7 files changed: 339 additions & 57 deletions

File tree

src/backend/executor/nodeHash.c

Lines changed: 173 additions & 6 deletions
@@ -2388,6 +2388,69 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 	hjstate->hj_CurTuple = NULL;
 }
 
+/*
+ * Decide if this process is allowed to run the unmatched scan.  If so, the
+ * batch barrier is advanced to PHJ_BATCH_SCAN and true is returned.
+ * Otherwise the batch is detached and false is returned.
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+
+	Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);
+
+	/*
+	 * It would not be deadlock-free to wait on the batch barrier, because it
+	 * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
+	 * already emitted tuples.  Therefore, we'll hold a wait-free election:
+	 * only one process can continue to the next phase, and all others detach
+	 * from this batch.  They can still go do any work on other batches, if
+	 * there are any.
+	 */
+	if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
+	{
+		/* This process considers the batch to be done. */
+		hashtable->batches[hashtable->curbatch].done = true;
+
+		/* Make sure any temporary files are closed. */
+		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+		/*
+		 * Track largest batch we've seen, which would normally happen in
+		 * ExecHashTableDetachBatch().
+		 */
+		hashtable->spacePeak =
+			Max(hashtable->spacePeak,
+				batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+		hashtable->curbatch = -1;
+		return false;
+	}
+
+	/* Now we are alone with this batch. */
+	Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+	Assert(BarrierParticipants(&batch->batch_barrier) == 1);
+
+	/*
+	 * Has another process decided to give up early and command all processes
+	 * to skip the unmatched scan?
+	 */
+	if (batch->skip_unmatched)
+	{
+		hashtable->batches[hashtable->curbatch].done = true;
+		ExecHashTableDetachBatch(hashtable);
+		return false;
+	}
+
+	/* Now prepare the process local state, just as for non-parallel join. */
+	ExecPrepHashTableForUnmatched(hjstate);
+
+	return true;
+}
+
 /*
  * ExecScanHashTableForUnmatched
  *		scan the hash table for unmatched inner tuples
@@ -2462,6 +2525,72 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 	return false;
 }
 
+/*
+ * ExecParallelScanHashTableForUnmatched
+ *		scan the hash table for unmatched inner tuples, in parallel join
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+									  ExprContext *econtext)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+
+	for (;;)
+	{
+		/*
+		 * hj_CurTuple is the address of the tuple last returned from the
+		 * current bucket, or NULL if it's time to start scanning a new
+		 * bucket.
+		 */
+		if (hashTuple != NULL)
+			hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+			hashTuple = ExecParallelHashFirstTuple(hashtable,
+												   hjstate->hj_CurBucketNo++);
+		else
+			break;				/* finished all buckets */
+
+		while (hashTuple != NULL)
+		{
+			if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+			{
+				TupleTableSlot *inntuple;
+
+				/* insert hashtable's tuple into exec slot */
+				inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+												 hjstate->hj_HashTupleSlot,
+												 false);	/* do not pfree */
+				econtext->ecxt_innertuple = inntuple;
+
+				/*
+				 * Reset temp memory each time; although this function doesn't
+				 * do any qual eval, the caller will, so let's keep it
+				 * parallel to ExecScanHashBucket.
+				 */
+				ResetExprContext(econtext);
+
+				hjstate->hj_CurTuple = hashTuple;
+				return true;
+			}
+
+			hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+		}
+
+		/* allow this loop to be cancellable */
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	/*
+	 * no more unmatched tuples
+	 */
+	return false;
+}
+
 /*
  * ExecHashTableReset
  *
@@ -3793,6 +3922,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
 		accessor->shared = shared;
 		accessor->preallocated = 0;
 		accessor->done = false;
+		accessor->outer_eof = false;
 		accessor->inner_tuples =
 			sts_attach(ParallelHashJoinBatchInner(shared),
 					   hashtable->hjstate->worker_id,
@@ -3838,25 +3968,62 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 	{
 		int			curbatch = hashtable->curbatch;
 		ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+		bool		attached = true;
 
 		/* Make sure any temporary files are closed. */
 		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
 		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
 
-		/* Detach from the batch we were last working on. */
+		/* After attaching we always get at least to PHJ_BATCH_PROBE. */
+		Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
+			   BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+
+		/*
+		 * If we're abandoning the PHJ_BATCH_PROBE phase early without having
+		 * reached the end of it, it means the plan doesn't want any more
+		 * tuples, and it is happy to abandon any tuples buffered in this
+		 * process's subplans.  For correctness, we can't allow any process to
+		 * execute the PHJ_BATCH_SCAN phase, because we will never have the
+		 * complete set of match bits.  Therefore we skip emitting unmatched
+		 * tuples in all backends (if this is a full/right join), as if those
+		 * tuples were all due to be emitted by this process and it has
+		 * abandoned them too.
+		 */
 		/*
 		 * CBDB_PARALLEL: Parallel Hash Left Anti Semi (Not-In) Join (parallel-aware).
 		 * If phs_lasj_has_null is true, we found a null while building the
 		 * hash table, and there were no batches to detach.
 		 */
-		if (!hashtable->parallel_state->phs_lasj_has_null && BarrierArriveAndDetach(&batch->batch_barrier))
+		if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
+			!hashtable->parallel_state->phs_lasj_has_null &&	/* CBDB_PARALLEL */
+			!hashtable->batches[curbatch].outer_eof)
+		{
+			/*
+			 * This flag may be written to by multiple backends during
+			 * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN
+			 * phase so requires no extra locking.
+			 */
+			batch->skip_unmatched = true;
+		}
+
+		/*
+		 * Even if we aren't doing a full/right outer join, we'll step through
+		 * the PHJ_BATCH_SCAN phase just to maintain the invariant that
+		 * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
+		 */
+		if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
+			!hashtable->parallel_state->phs_lasj_has_null /* CBDB_PARALLEL */)
+			attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+		if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
 		{
 			/*
-			 * Technically we shouldn't access the barrier because we're no
-			 * longer attached, but since there is no way it's moving after
-			 * this point it seems safe to make the following assertion.
+			 * We are no longer attached to the batch barrier, but we're the
+			 * process that was chosen to free resources and it's safe to
+			 * assert the current phase.  The ParallelHashJoinBatch can't go
+			 * away underneath us while we are attached to the build barrier,
+			 * making this access safe.
 			 */
-			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
+			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
 
 			/* Free shared chunks and buckets. */
 			while (DsaPointerIsValid(batch->chunks))

src/backend/executor/nodeHashjoin.c

Lines changed: 67 additions & 35 deletions
@@ -80,11 +80,12 @@
  * aren't enough to go around.  For each batch there is a separate barrier
  * with the following phases:
  *
- *  PHJ_BATCH_ELECTING       -- initial state
- *  PHJ_BATCH_ALLOCATING     -- one allocates buckets
- *  PHJ_BATCH_LOADING        -- all load the hash table from disk
- *  PHJ_BATCH_PROBING        -- all probe
- *  PHJ_BATCH_DONE           -- end
+ *  PHJ_BATCH_ELECT          -- initial state
+ *  PHJ_BATCH_ALLOCATE*      -- one allocates buckets
+ *  PHJ_BATCH_LOAD           -- all load the hash table from disk
+ *  PHJ_BATCH_PROBE          -- all probe
+ *  PHJ_BATCH_SCAN*          -- one does full/right unmatched scan
+ *  PHJ_BATCH_FREE*          -- one frees memory
 *
 * Batch 0 is a special case, because it starts out in phase
 * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
@@ -97,11 +98,17 @@
 *
 * To avoid deadlocks, we never wait for any barrier unless it is known that
 * all other backends attached to it are actively executing the node or have
-* already arrived.  Practically, that means that we never return a tuple
-* while attached to a barrier, unless the barrier has reached its final
-* state.  In the slightly special case of the per-batch barrier, we return
-* tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
-* BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
+* finished.  Practically, that means that we never emit a tuple while attached
+* to a barrier, unless the barrier has reached a phase that means that no
+* process will wait on it again.  We emit tuples while attached to the build
+* barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
+* PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
+* respectively without waiting, using BarrierArriveAndDetach() and
+* BarrierArriveAndDetachExceptLast() respectively.  The last to detach
+* receives a different return value so that it knows that it's safe to
+* clean up.  Any straggler process that attaches after that phase is reached
+* will see that it's too late to participate or access the relevant shared
+* memory objects.
 *
 *-------------------------------------------------------------------------
 */
@@ -493,8 +500,23 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					if (HJ_FILL_INNER(node))
 					{
 						/* set up to scan for unmatched inner tuples */
-						ExecPrepHashTableForUnmatched(node);
-						node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						if (parallel)
+						{
+							/*
+							 * Only one process is currently allowed to handle
+							 * each batch's unmatched tuples, in a parallel
+							 * join.
+							 */
+							if (ExecParallelPrepHashTableForUnmatched(node))
+								node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+							else
+								node->hj_JoinState = HJ_NEED_NEW_BATCH;
+						}
+						else
+						{
+							ExecPrepHashTableForUnmatched(node);
+							node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						}
 					}
 					else
 						node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -605,25 +627,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				{
 					node->hj_MatchedOuter = true;
 
-					if (parallel)
-					{
-						/*
-						 * Full/right outer joins are currently not supported
-						 * for parallel joins, so we don't need to set the
-						 * match bit.  Experiments show that it's worth
-						 * avoiding the shared memory traffic on large
-						 * systems.
-						 */
-						Assert(!HJ_FILL_INNER(node));
-					}
-					else
-					{
-						/*
-						 * This is really only needed if HJ_FILL_INNER(node),
-						 * but we'll avoid the branch and just set it always.
-						 */
+
+					/*
+					 * This is really only needed if HJ_FILL_INNER(node), but
+					 * we'll avoid the branch and just set it always.
+					 */
+					if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
 						HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
-					}
 
 					/* In an antijoin, we never return a matched tuple */
 					if (node->js.jointype == JOIN_ANTI ||
@@ -682,7 +692,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				 * so any unmatched inner tuples in the hashtable have to be
 				 * emitted before we continue to the next batch.
 				 */
-				if (!ExecScanHashTableForUnmatched(node, econtext))
+				if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+					  : ExecScanHashTableForUnmatched(node, econtext)))
 				{
 					/* no more unmatched tuples */
 					node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -1241,6 +1252,8 @@ ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
 	}
 
 	/* End of this batch */
+	hashtable->batches[curbatch].outer_eof = true;
+
 	return NULL;
 }
 

@@ -1521,15 +1534,34 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 				 * hash table stays alive until everyone's finished
 				 * probing it, but no participant is allowed to wait at
 				 * this barrier again (or else a deadlock could occur).
-				 * All attached participants must eventually call
-				 * BarrierArriveAndDetach() so that the final phase
-				 * PHJ_BATCH_DONE can be reached.
+				 * All attached participants must eventually detach from
+				 * the barrier and one worker must advance the phase so
+				 * that the final phase is reached.
 				 */
 				ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
 				sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
+
 				return true;
+			case PHJ_BATCH_SCAN:
+
+				/*
+				 * In principle, we could help scan for unmatched tuples,
+				 * since that phase is already underway (the thing we
+				 * can't do under current deadlock-avoidance rules is wait
+				 * for others to arrive at PHJ_BATCH_SCAN, because
+				 * PHJ_BATCH_PROBE emits tuples, but in this case we just
+				 * got here without waiting).  That is not yet done.  For
+				 * now, we just detach and go around again.  We have to
+				 * use ExecHashTableDetachBatch() because there's a small
+				 * chance we'll be the last to detach, and then we're
+				 * responsible for freeing memory.
+				 */
+				ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+				hashtable->batches[batchno].done = true;
+				ExecHashTableDetachBatch(hashtable);
+				break;
 
-			case PHJ_BATCH_DONE:
+			case PHJ_BATCH_FREE:
 
 				/*
 				 * Already done.  Detach and go around again (if any
