@@ -215,26 +215,33 @@ ParquetRowGroupReader_ScanNextTuple(
215215 int natts = slot -> tts_tupleDescriptor -> natts ;
216216 Assert (natts <= tupDesc -> natts );
217217
218- Datum * values = slot_get_values (slot );
219- bool * nulls = slot_get_isnull (slot );
220-
221218 bool useBloomFilter = false;
222- int joinKeyCount = 0 ;
223- int * joinKeySet = NULL ;
219+ List * joinKeyAtts = NIL ;
220+ List * nonJoinKeyAtts = NIL ;
221+ List * allAtts = NIL ;
222+
223+ /* prepare data structures to separate join keys from other attributes */
224224 if (rfState != NULL && rfState -> hasRuntimeFilter && !rfState -> stopRuntimeFilter )
225225 {
226226 useBloomFilter = true;
227227
228- joinKeyCount = list_length (rfState -> joinkeys );
229- Assert (joinKeyCount <= natts );
230- joinKeySet = palloc (sizeof (int ) * joinKeyCount );
231-
228+ /* find out attributes in hash join key */
232229 ListCell * hk ;
233- int i = 0 ;
234230 foreach (hk , rfState -> joinkeys )
235231 {
236232 AttrNumber attrno = (AttrNumber ) lfirst (hk );
237- joinKeySet [i ++ ] = attrno - 1 ;
233+ lappend_int (joinKeyAtts , attrno - 1 );
234+ }
235+ }
236+
237+ /* find out attributes not in hash join keys */
238+ for (int i = 0 ; i < natts ; i ++ )
239+ {
240+ lappend_int (allAtts , i );
241+
242+ if (joinKeyAtts != NIL && list_find_int (joinKeyAtts , i ) < 0 )
243+ {
244+ lappend_int (nonJoinKeyAtts , i );
238245 }
239246 }
240247
@@ -247,103 +254,47 @@ ParquetRowGroupReader_ScanNextTuple(
247254 rowGroupReader -> rowRead ++ ;
248255
249256 /*
250- * Step 1: fetch those columns as hash join keys
251- */
252- int colReaderIndex = 0 ;
253- for (int i = 0 ; i < natts ; i ++ )
254- {
255- if (projs [i ] == false)
256- {
257- nulls [i ] = true;
258- continue ;
259- }
260-
261- bool isJoinKeyColumn = false;
262- for (int j = 0 ; j < joinKeyCount ; j ++ )
263- {
264- if (joinKeySet [j ] == i )
265- {
266- isJoinKeyColumn = true;
267- break ;
268- }
269- }
270-
271- if (isJoinKeyColumn )
272- {
273- ParquetColumnReader * nextReader =
274- & rowGroupReader -> columnReaders [colReaderIndex ];
275- int hawqTypeID = tupDesc -> attrs [i ]-> atttypid ;
276-
277- ParquetRowGroupReader_ScanOneAttribute (
278- rowGroupReader , hawqAttrToParquetColNum [i ],
279- nextReader , & values [i ], & nulls [i ], hawqTypeID );
280- }
281-
282- colReaderIndex += hawqAttrToParquetColNum [i ];
283- }
284-
285- /*
286- * Step 2: skip following columns decoding if bloomfilter is mismatched
257+ * When using a bloom filter, first fetch the hash join key columns, then
258+ * check whether their combined hash value is contained in the bloom filter.
259+ * If it is not, skip reading and decoding the remaining columns to speed up.
287260 */
288261 if (useBloomFilter )
289262 {
263+ ParquetRowGroupReader_ScanNextTupleColumns (tupDesc , rowGroupReader ,
264+ hawqAttrToParquetColNum , projs , slot , joinKeyAtts );
265+
266+ Datum * values = slot_get_values (slot );
290267 uint32_t hashkey = 0 ;
291- for (int i = 0 ; i < joinKeyCount ; i ++ )
268+
269+ ListCell * hk ;
270+ int i = 0 ;
271+ foreach (hk , joinKeyAtts )
292272 {
293273 Datum keyval ;
294274 uint32 hkey ;
295275
296276 /* rotate hashkey left 1 bit at each step */
297277 hashkey = (hashkey << 1 ) | ((hashkey & 0x80000000 ) ? 1 : 0 );
298- keyval = values [joinKeySet [ i ] ];
278+ keyval = values [lfirst_int ( hk ) ];
299279
300280 /* Evaluate expression */
301281 hkey = DatumGetUInt32 (
302- FunctionCall1 (& rfState -> hashfunctions [i ], keyval ));
282+ FunctionCall1 (& rfState -> hashfunctions [i ++ ], keyval ));
303283 hashkey ^= hkey ;
304284 }
305285
306286 if (!FindBloomFilter (rfState -> bloomfilter , hashkey ))
307287 {
308288 continue ;
309289 }
310- }
311290
312- /*
313- * Step 3: fetch those columns not in hash join keys
314- */
315- colReaderIndex = 0 ;
316- for (int i = 0 ; i < natts ; i ++ )
291+ ParquetRowGroupReader_ScanNextTupleColumns (tupDesc , rowGroupReader ,
292+ hawqAttrToParquetColNum , projs , slot , nonJoinKeyAtts );
293+ }
294+ else
317295 {
318- // it is not expensive to do twice
319- if (projs [i ] == false)
320- {
321- nulls [i ] = true;
322- continue ;
323- }
324-
325- bool isJoinKeyColumn = false;
326- for (int j = 0 ; j < joinKeyCount ; j ++ )
327- {
328- if (joinKeySet [j ] == i )
329- {
330- isJoinKeyColumn = true;
331- break ;
332- }
333- }
334-
335- if (!isJoinKeyColumn )
336- {
337- ParquetColumnReader * nextReader =
338- & rowGroupReader -> columnReaders [colReaderIndex ];
339- int hawqTypeID = tupDesc -> attrs [i ]-> atttypid ;
340-
341- ParquetRowGroupReader_ScanOneAttribute (
342- rowGroupReader , hawqAttrToParquetColNum [i ],
343- nextReader , & values [i ], & nulls [i ], hawqTypeID );
344- }
345-
346- colReaderIndex += hawqAttrToParquetColNum [i ];
296+ ParquetRowGroupReader_ScanNextTupleColumns (tupDesc , rowGroupReader ,
297+ hawqAttrToParquetColNum , projs , slot , allAtts );
347298 }
348299
349300 /*construct tuple, and return back*/
@@ -356,59 +307,88 @@ ParquetRowGroupReader_ScanNextTuple(
356307}
357308
358309/*
359- * Get one attribute of a tuple from current row group into slot.
360- *
361- * Similar to ParquetColumnReader_readValue() but consider more hawq types.
310+ * Get specified attributes of a tuple from current row group into slot.
362311 */
363312void
364- ParquetRowGroupReader_ScanOneAttribute (
365- ParquetRowGroupReader * rowGroupReader ,
366- int colChildNum , // hawqAttrToParquetColNum
367- ParquetColumnReader * columnReader ,
368- Datum * value ,
369- bool * null ,
370- int hawqTypeID )
313+ ParquetRowGroupReader_ScanNextTupleColumns (
314+ TupleDesc tupDesc ,
315+ ParquetRowGroupReader * rowGroupReader ,
316+ int * hawqAttrToParquetColNum ,
317+ bool * projs ,
318+ TupleTableSlot * slot ,
319+ List * attsList )
371320{
372- if (colChildNum == 1 )
373- {
374- ParquetColumnReader_readValue (columnReader , value , null , hawqTypeID );
375- }
376- else
321+ int natts = slot -> tts_tupleDescriptor -> natts ;
322+ Assert (natts <= tupDesc -> natts );
323+
324+ Datum * values = slot_get_values (slot );
325+ bool * nulls = slot_get_isnull (slot );
326+
327+ int colReaderIndex = 0 ;
328+ for (int i = 0 ; i < natts ; i ++ )
377329 {
378- /*
379- * Because there are some memory reused inside the whole column reader, so need
380- * to switch the context from PerTupleContext to rowgroup->context
381- */
382- MemoryContext oldContext = MemoryContextSwitchTo (
383- rowGroupReader -> memoryContext );
330+ /* it is not expensive to do twice in case of bloomfilter */
331+ if ( projs [ i ] == false)
332+ {
333+ nulls [ i ] = true;
334+ continue ;
335+ }
384336
385- switch (hawqTypeID ) {
386- case HAWQ_TYPE_POINT :
387- ParquetColumnReader_readPoint (columnReader , value , null );
388- break ;
389- case HAWQ_TYPE_PATH :
390- ParquetColumnReader_readPATH (columnReader , value , null );
391- break ;
392- case HAWQ_TYPE_LSEG :
393- ParquetColumnReader_readLSEG (columnReader , value , null );
394- break ;
395- case HAWQ_TYPE_BOX :
396- ParquetColumnReader_readBOX (columnReader , value , null );
397- break ;
398- case HAWQ_TYPE_CIRCLE :
399- ParquetColumnReader_readCIRCLE (columnReader , value , null );
400- break ;
401- case HAWQ_TYPE_POLYGON :
402- ParquetColumnReader_readPOLYGON (columnReader , value , null );
403- break ;
404- default :
405- /* TODO array type */
406- /* TODO UDT */
407- Insist (false);
408- break ;
337+ /* skip those attributes not in given list */
338+ if (attsList != NIL && list_find_int (attsList , i ) >= 0 )
339+ {
340+ colReaderIndex += hawqAttrToParquetColNum [i ];
341+ continue ;
342+ }
343+
344+ ParquetColumnReader * nextReader =
345+ & rowGroupReader -> columnReaders [colReaderIndex ];
346+ int hawqTypeID = tupDesc -> attrs [i ]-> atttypid ;
347+
348+
349+ if (hawqAttrToParquetColNum [i ] == 1 )
350+ {
351+ ParquetColumnReader_readValue (nextReader , & values [i ], & nulls [i ], hawqTypeID );
352+ }
353+ else
354+ {
355+ /*
356+ * Because there are some memory reused inside the whole column reader, so need
357+ * to switch the context from PerTupleContext to rowgroup->context
358+ */
359+ MemoryContext oldContext = MemoryContextSwitchTo (
360+ rowGroupReader -> memoryContext );
361+
362+ switch (hawqTypeID ) {
363+ case HAWQ_TYPE_POINT :
364+ ParquetColumnReader_readPoint (nextReader , & values [i ], & nulls [i ]);
365+ break ;
366+ case HAWQ_TYPE_PATH :
367+ ParquetColumnReader_readPATH (nextReader , & values [i ], & nulls [i ]);
368+ break ;
369+ case HAWQ_TYPE_LSEG :
370+ ParquetColumnReader_readLSEG (nextReader , & values [i ], & nulls [i ]);
371+ break ;
372+ case HAWQ_TYPE_BOX :
373+ ParquetColumnReader_readBOX (nextReader , & values [i ], & nulls [i ]);
374+ break ;
375+ case HAWQ_TYPE_CIRCLE :
376+ ParquetColumnReader_readCIRCLE (nextReader , & values [i ], & nulls [i ]);
377+ break ;
378+ case HAWQ_TYPE_POLYGON :
379+ ParquetColumnReader_readPOLYGON (nextReader , & values [i ], & nulls [i ]);
380+ break ;
381+ default :
382+ /* TODO array type */
383+ /* TODO UDT */
384+ Insist (false);
385+ break ;
386+ }
387+
388+ MemoryContextSwitchTo (oldContext );
409389 }
410390
411- MemoryContextSwitchTo ( oldContext ) ;
391+ colReaderIndex += hawqAttrToParquetColNum [ i ] ;
412392 }
413393}
414394
0 commit comments