Skip to content
This repository was archived by the owner on Jul 23, 2024. It is now read-only.

Commit 0eaf850

Browse files
author
Kuien Liu
committed
HAWQ-1660. refactor according to reviews
1 parent 2320cf0 commit 0eaf850

2 files changed

Lines changed: 119 additions & 140 deletions

File tree

src/backend/cdb/cdbparquetrowgroup.c

Lines changed: 111 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -215,26 +215,33 @@ ParquetRowGroupReader_ScanNextTuple(
215215
int natts = slot->tts_tupleDescriptor->natts;
216216
Assert(natts <= tupDesc->natts);
217217

218-
Datum *values = slot_get_values(slot);
219-
bool *nulls = slot_get_isnull(slot);
220-
221218
bool useBloomFilter = false;
222-
int joinKeyCount = 0;
223-
int *joinKeySet = NULL;
219+
List *joinKeyAtts = NIL;
220+
List *nonJoinKeyAtts = NIL;
221+
List *allAtts = NIL;
222+
223+
/* prepare data structures to separate join keys from other attributes */
224224
if (rfState != NULL && rfState->hasRuntimeFilter && !rfState->stopRuntimeFilter)
225225
{
226226
useBloomFilter = true;
227227

228-
joinKeyCount = list_length(rfState->joinkeys);
229-
Assert(joinKeyCount <= natts);
230-
joinKeySet = palloc(sizeof(int) * joinKeyCount);
231-
228+
/* find out attributes in hash join key */
232229
ListCell *hk;
233-
int i = 0;
234230
foreach(hk, rfState->joinkeys)
235231
{
236232
AttrNumber attrno = (AttrNumber) lfirst(hk);
237-
joinKeySet[i++] = attrno -1;
233+
joinKeyAtts = lappend_int(joinKeyAtts, attrno - 1);
234+
}
235+
}
236+
237+
/* find out attributes not in hash join keys */
238+
for (int i = 0; i < natts; i++)
239+
{
240+
allAtts = lappend_int(allAtts, i);
241+
242+
if(joinKeyAtts != NIL && list_find_int(joinKeyAtts, i) < 0)
243+
{
244+
nonJoinKeyAtts = lappend_int(nonJoinKeyAtts, i);
238245
}
239246
}
240247

@@ -247,103 +254,47 @@ ParquetRowGroupReader_ScanNextTuple(
247254
rowGroupReader->rowRead++;
248255

249256
/*
250-
* Step 1: fetch those columns as hash join keys
251-
*/
252-
int colReaderIndex = 0;
253-
for (int i = 0; i < natts; i++)
254-
{
255-
if (projs[i] == false)
256-
{
257-
nulls[i] = true;
258-
continue;
259-
}
260-
261-
bool isJoinKeyColumn = false;
262-
for (int j = 0; j < joinKeyCount; j++)
263-
{
264-
if (joinKeySet[j] == i)
265-
{
266-
isJoinKeyColumn = true;
267-
break;
268-
}
269-
}
270-
271-
if (isJoinKeyColumn)
272-
{
273-
ParquetColumnReader *nextReader =
274-
&rowGroupReader->columnReaders[colReaderIndex];
275-
int hawqTypeID = tupDesc->attrs[i]->atttypid;
276-
277-
ParquetRowGroupReader_ScanOneAttribute(
278-
rowGroupReader, hawqAttrToParquetColNum[i],
279-
nextReader, &values[i], &nulls[i], hawqTypeID);
280-
}
281-
282-
colReaderIndex += hawqAttrToParquetColNum[i];
283-
}
284-
285-
/*
286-
* Step 2: skip following columns decoding if bloomfilter is mismatched
257+
* In case using BloomFilter, we first fetch those columns in hash join keys,
258+
* then check whether their hash values contained by bloomfilter. If negative,
259+
* we skip following columns reading and decoding to speed up.
287260
*/
288261
if (useBloomFilter)
289262
{
263+
ParquetRowGroupReader_ScanNextTupleColumns(tupDesc, rowGroupReader,
264+
hawqAttrToParquetColNum, projs, slot, joinKeyAtts);
265+
266+
Datum *values = slot_get_values(slot);
290267
uint32_t hashkey = 0;
291-
for (int i = 0; i < joinKeyCount; i++)
268+
269+
ListCell *hk;
270+
int i = 0;
271+
foreach(hk, joinKeyAtts)
292272
{
293273
Datum keyval;
294274
uint32 hkey;
295275

296276
/* rotate hashkey left 1 bit at each step */
297277
hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
298-
keyval = values[joinKeySet[i]];
278+
keyval = values[lfirst_int(hk)];
299279

300280
/* Evaluate expression */
301281
hkey = DatumGetUInt32(
302-
FunctionCall1(&rfState->hashfunctions[i], keyval));
282+
FunctionCall1(&rfState->hashfunctions[i++], keyval));
303283
hashkey ^= hkey;
304284
}
305285

306286
if (!FindBloomFilter(rfState->bloomfilter, hashkey))
307287
{
308288
continue;
309289
}
310-
}
311290

312-
/*
313-
* Step 3: fetch those columns not in hash join keys
314-
*/
315-
colReaderIndex = 0;
316-
for (int i = 0; i < natts; i++)
291+
ParquetRowGroupReader_ScanNextTupleColumns(tupDesc, rowGroupReader,
292+
hawqAttrToParquetColNum, projs, slot, nonJoinKeyAtts);
293+
}
294+
else
317295
{
318-
// it is not expensive to do twice
319-
if (projs[i] == false)
320-
{
321-
nulls[i] = true;
322-
continue;
323-
}
324-
325-
bool isJoinKeyColumn = false;
326-
for (int j = 0; j < joinKeyCount; j++)
327-
{
328-
if (joinKeySet[j] == i)
329-
{
330-
isJoinKeyColumn = true;
331-
break;
332-
}
333-
}
334-
335-
if (!isJoinKeyColumn)
336-
{
337-
ParquetColumnReader *nextReader =
338-
&rowGroupReader->columnReaders[colReaderIndex];
339-
int hawqTypeID = tupDesc->attrs[i]->atttypid;
340-
341-
ParquetRowGroupReader_ScanOneAttribute(
342-
rowGroupReader, hawqAttrToParquetColNum[i],
343-
nextReader, &values[i], &nulls[i], hawqTypeID);
344-
}
345-
346-
colReaderIndex += hawqAttrToParquetColNum[i];
296+
ParquetRowGroupReader_ScanNextTupleColumns(tupDesc, rowGroupReader,
297+
hawqAttrToParquetColNum, projs, slot, allAtts);
347298
}
348299

349300
/*construct tuple, and return back*/
@@ -356,59 +307,87 @@ ParquetRowGroupReader_ScanNextTuple(
356307
}
357308

358309
/*
359-
* Get one attribute of a tuple from current row group into slot.
360-
*
361-
* Similar to ParquetColumnReader_readValue() but consider more hawq types.
310+
* Get specified attributes of a tuple from current row group into slot.
362311
*/
363312
void
364-
ParquetRowGroupReader_ScanOneAttribute(
365-
ParquetRowGroupReader *rowGroupReader,
366-
int colChildNum, // hawqAttrToParquetColNum
367-
ParquetColumnReader *columnReader,
368-
Datum *value,
369-
bool *null,
370-
int hawqTypeID)
313+
ParquetRowGroupReader_ScanNextTupleColumns(
314+
TupleDesc tupDesc,
315+
ParquetRowGroupReader *rowGroupReader,
316+
int *hawqAttrToParquetColNum,
317+
bool *projs,
318+
TupleTableSlot *slot,
319+
List *attsList)
371320
{
372-
if (colChildNum == 1)
373-
{
374-
ParquetColumnReader_readValue(columnReader, value, null, hawqTypeID);
375-
}
376-
else
321+
int natts = slot->tts_tupleDescriptor->natts;
322+
Assert(natts <= tupDesc->natts);
323+
324+
Datum *values = slot_get_values(slot);
325+
bool *nulls = slot_get_isnull(slot);
326+
327+
int colReaderIndex = 0;
328+
for(int i = 0; i < natts; i++)
377329
{
378-
/*
379-
* Because there are some memory reused inside the whole column reader, so need
380-
* to switch the context from PerTupleContext to rowgroup->context
381-
*/
382-
MemoryContext oldContext = MemoryContextSwitchTo(
383-
rowGroupReader->memoryContext);
330+
/* it is not expensive to do twice in case of bloomfilter */
331+
if(projs[i] == false)
332+
{
333+
nulls[i] = true;
334+
continue;
335+
}
384336

385-
switch (hawqTypeID) {
386-
case HAWQ_TYPE_POINT:
387-
ParquetColumnReader_readPoint(columnReader, value, null);
388-
break;
389-
case HAWQ_TYPE_PATH:
390-
ParquetColumnReader_readPATH(columnReader, value, null);
391-
break;
392-
case HAWQ_TYPE_LSEG:
393-
ParquetColumnReader_readLSEG(columnReader, value, null);
394-
break;
395-
case HAWQ_TYPE_BOX:
396-
ParquetColumnReader_readBOX(columnReader, value, null);
397-
break;
398-
case HAWQ_TYPE_CIRCLE:
399-
ParquetColumnReader_readCIRCLE(columnReader, value, null);
400-
break;
401-
case HAWQ_TYPE_POLYGON:
402-
ParquetColumnReader_readPOLYGON(columnReader, value, null);
403-
break;
404-
default:
405-
/* TODO array type */
406-
/* TODO UDT */
407-
Insist(false);
408-
break;
337+
/* skip those attributes not in given list */
338+
if (attsList != NIL && list_find_int(attsList, i) < 0)
339+
{
340+
colReaderIndex += hawqAttrToParquetColNum[i];
341+
continue;
342+
}
343+
344+
ParquetColumnReader *nextReader =
345+
&rowGroupReader->columnReaders[colReaderIndex];
346+
int hawqTypeID = tupDesc->attrs[i]->atttypid;
347+
348+
if (hawqAttrToParquetColNum[i] == 1)
349+
{
350+
ParquetColumnReader_readValue(nextReader, &values[i], &nulls[i], hawqTypeID);
351+
}
352+
else
353+
{
354+
/*
355+
* Because there are some memory reused inside the whole column reader, so need
356+
* to switch the context from PerTupleContext to rowgroup->context
357+
*/
358+
MemoryContext oldContext = MemoryContextSwitchTo(
359+
rowGroupReader->memoryContext);
360+
361+
switch (hawqTypeID) {
362+
case HAWQ_TYPE_POINT:
363+
ParquetColumnReader_readPoint(nextReader, &values[i], &nulls[i]);
364+
break;
365+
case HAWQ_TYPE_PATH:
366+
ParquetColumnReader_readPATH(nextReader, &values[i], &nulls[i]);
367+
break;
368+
case HAWQ_TYPE_LSEG:
369+
ParquetColumnReader_readLSEG(nextReader, &values[i], &nulls[i]);
370+
break;
371+
case HAWQ_TYPE_BOX:
372+
ParquetColumnReader_readBOX(nextReader, &values[i], &nulls[i]);
373+
break;
374+
case HAWQ_TYPE_CIRCLE:
375+
ParquetColumnReader_readCIRCLE(nextReader, &values[i], &nulls[i]);
376+
break;
377+
case HAWQ_TYPE_POLYGON:
378+
ParquetColumnReader_readPOLYGON(nextReader, &values[i], &nulls[i]);
379+
break;
380+
default:
381+
/* TODO array type */
382+
/* TODO UDT */
383+
Insist(false);
384+
break;
385+
}
386+
387+
MemoryContextSwitchTo(oldContext);
409388
}
410389

411-
MemoryContextSwitchTo(oldContext);
390+
colReaderIndex += hawqAttrToParquetColNum[i];
412391
}
413392
}
414393

src/include/cdb/cdbparquetrowgroup.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,15 @@ ParquetRowGroupReader_ScanNextTuple(
7575
RuntimeFilterState *rfState,
7676
TupleTableSlot *slot);
7777

78-
/* Get one attribute of a tuple from current row group*/
78+
/* Get specified attributes of a tuple into slot*/
7979
void
80-
ParquetRowGroupReader_ScanOneAttribute(
81-
ParquetRowGroupReader *rowGroupReader,
82-
int colChildNum,
83-
ParquetColumnReader *columnReader,
84-
Datum *value,
85-
bool *null,
86-
int hawqTypeID);
80+
ParquetRowGroupReader_ScanNextTupleColumns(
81+
TupleDesc pqs_tupDesc,
82+
ParquetRowGroupReader *rowGroupReader,
83+
int *hawqAttrToParquetColNum,
84+
bool *projs,
85+
TupleTableSlot *slot,
86+
List *attsList);
8787

8888
/* Finish scanning current row group*/
8989
void

0 commit comments

Comments
 (0)