Skip to content

Commit b1ba1c6

Browse files
committed
Merge remote-tracking branch 'origin/morpheus'
Change-Id: I6571f4a0c51b7121c0452c2e421d9c9824ac50ca
2 parents 2c178f1 + da3029c commit b1ba1c6

5 files changed

Lines changed: 110 additions & 18 deletions

File tree

algebra/agg_registry.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ const (
4545
AGGREGATE_FROMFIRST
4646
AGGREGATE_FROMLAST
4747
AGGREGATE_REWRITE_INDEX_AGGS
48+
AGGREGATE_HAS_SUBQ
4849
)
4950

5051
/*

algebra/aggregate.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,13 @@ func (this *AggregateBase) SetAggregateModifiers(flags uint32, filter expression
182182
if !this.Distinct() && AggregateHasProperty(name, AGGREGATE_ALLOWS_INCREMENTAL) {
183183
this.AddFlags(AGGREGATE_INCREMENTAL)
184184
}
185+
186+
// check whether the aggregate has references to subqueries
187+
subqs, err := expression.ListSubqueries(this.Children(), false)
188+
// in case or error, assume it has subqueries to be on the safe side
189+
if err != nil || len(subqs) > 0 {
190+
this.AddFlags(AGGREGATE_HAS_SUBQ)
191+
}
185192
}
186193

187194
/*

execution/groupby_final.go

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ import (
2121

2222
type FinalGroup struct {
2323
base
24-
plan *plan.FinalGroup
25-
groups map[string]value.AnnotatedValue
24+
plan *plan.FinalGroup
25+
aggNames []string
26+
groups map[string]value.AnnotatedValue
2627
}
2728

2829
func NewFinalGroup(plan *plan.FinalGroup, context *Context) *FinalGroup {
@@ -57,6 +58,13 @@ func (this *FinalGroup) RunOnce(context *Context, parent value.Value) {
5758
this.runConsumer(this, context, parent, this.Release)
5859
}
5960

61+
func (this *FinalGroup) beforeItems(context *Context, parent value.Value) bool {
62+
if len(this.plan.Aggregates()) > 0 {
63+
this.aggNames = make([]string, len(this.plan.Aggregates()))
64+
}
65+
return true
66+
}
67+
6068
func (this *FinalGroup) processItem(item value.AnnotatedValue, context *Context) bool {
6169
// Generate the group key
6270
var gk string
@@ -85,9 +93,18 @@ func (this *FinalGroup) processItem(item value.AnnotatedValue, context *Context)
8593
aggregates := gv.GetAttachment(value.ATT_AGGREGATES)
8694
switch aggregates := aggregates.(type) {
8795
case map[string]value.Value:
88-
for _, agg := range this.plan.Aggregates() {
96+
for i, agg := range this.plan.Aggregates() {
8997
// we can cache agg.String() and reuse it here as the aggregate expression isn't re-evaluated during this processing
90-
a := agg.String()
98+
var a string
99+
if i < len(this.aggNames) {
100+
a = this.aggNames[i]
101+
}
102+
if a == "" {
103+
a = agg.String()
104+
if i < len(this.aggNames) {
105+
this.aggNames[i] = a
106+
}
107+
}
91108
pv := aggregates[a]
92109
if pv == nil {
93110
// Log an error and explicitly panic
@@ -156,6 +173,9 @@ func (this *FinalGroup) reopen(context *Context) bool {
156173
rv := this.baseReopen(context)
157174
if rv {
158175
this.groups = make(map[string]value.AnnotatedValue)
176+
for i := range this.aggNames {
177+
this.aggNames[i] = ""
178+
}
159179
}
160180
return rv
161181
}
@@ -167,4 +187,10 @@ func (this *FinalGroup) Release() {
167187
}
168188
this.groups = nil
169189
}
190+
if this.aggNames != nil {
191+
for i := range this.aggNames {
192+
this.aggNames[i] = ""
193+
}
194+
this.aggNames = nil
195+
}
170196
}

execution/groupby_initial.go

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ import (
2222
// Grouping of input data.
2323
type InitialGroup struct {
2424
base
25-
plan *plan.InitialGroup
26-
groups map[string]value.AnnotatedValue
25+
plan *plan.InitialGroup
26+
aggNames []string
27+
groups map[string]value.AnnotatedValue
2728
}
2829

2930
func NewInitialGroup(plan *plan.InitialGroup, context *Context) *InitialGroup {
@@ -58,6 +59,13 @@ func (this *InitialGroup) RunOnce(context *Context, parent value.Value) {
5859
this.runConsumer(this, context, parent, this.Release)
5960
}
6061

62+
func (this *InitialGroup) beforeItems(context *Context, parent value.Value) bool {
63+
if len(this.plan.Aggregates()) > 0 {
64+
this.aggNames = make([]string, len(this.plan.Aggregates()))
65+
}
66+
return true
67+
}
68+
6169
func (this *InitialGroup) processItem(item value.AnnotatedValue, context *Context) bool {
6270
// Generate the group key
6371
var gk string
@@ -99,9 +107,20 @@ func (this *InitialGroup) processItem(item value.AnnotatedValue, context *Contex
99107
return false
100108
}
101109

102-
for _, agg := range this.plan.Aggregates() {
103-
// WARNING: do not cache agg.String() - it may change during CumulateInitial
104-
a := agg.String()
110+
for i, agg := range this.plan.Aggregates() {
111+
var a string
112+
if i < len(this.aggNames) {
113+
a = this.aggNames[i]
114+
}
115+
if a == "" {
116+
a = agg.String()
117+
// do not cache aggregate string if aggregate has subquery references, since
118+
// the string may change (e.g. COVER transformation), and we don't know when
119+
// the subquery may be executed for the first time (e.g. under a CASE)
120+
if !agg.HasFlags(algebra.AGGREGATE_HAS_SUBQ) && i < len(this.aggNames) {
121+
this.aggNames[i] = a
122+
}
123+
}
105124
pv := aggregates[a]
106125
if pv == nil {
107126
// Log an error and explicitly panic
@@ -133,12 +152,16 @@ func (this *InitialGroup) processItem(item value.AnnotatedValue, context *Contex
133152
pv.Recycle()
134153
}
135154
}
136-
b := agg.String()
137-
aggregates[b] = v
138-
// delete the previous key if agg.String() has changed
139-
if a != b {
140-
delete(aggregates, a)
155+
b := a
156+
if agg.HasFlags(algebra.AGGREGATE_HAS_SUBQ) {
157+
// do not use cached agg string if agg has subquery references
158+
b = agg.String()
159+
// delete the previous key if agg.String() has changed
160+
if a != b {
161+
delete(aggregates, a)
162+
}
141163
}
164+
aggregates[b] = v
142165
}
143166

144167
// Update the Group Key's entry in the Map with the Group As field in the item
@@ -213,6 +236,9 @@ func (this *InitialGroup) reopen(context *Context) bool {
213236
rv := this.baseReopen(context)
214237
if rv {
215238
this.groups = make(map[string]value.AnnotatedValue)
239+
for i := range this.aggNames {
240+
this.aggNames[i] = ""
241+
}
216242
}
217243
return rv
218244
}
@@ -224,4 +250,10 @@ func (this *InitialGroup) Release() {
224250
}
225251
this.groups = nil
226252
}
253+
if this.aggNames != nil {
254+
for i := range this.aggNames {
255+
this.aggNames[i] = ""
256+
}
257+
this.aggNames = nil
258+
}
227259
}

execution/groupby_intermediate.go

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ import (
2222
// Grouping of groups. Recursable.
2323
type IntermediateGroup struct {
2424
base
25-
plan *plan.IntermediateGroup
26-
groups map[string]value.AnnotatedValue
25+
plan *plan.IntermediateGroup
26+
aggNames []string
27+
groups map[string]value.AnnotatedValue
2728
}
2829

2930
func NewIntermediateGroup(plan *plan.IntermediateGroup, context *Context) *IntermediateGroup {
@@ -58,6 +59,13 @@ func (this *IntermediateGroup) RunOnce(context *Context, parent value.Value) {
5859
this.runConsumer(this, context, parent, this.Release)
5960
}
6061

62+
func (this *IntermediateGroup) beforeItems(context *Context, parent value.Value) bool {
63+
if len(this.plan.Aggregates()) > 0 {
64+
this.aggNames = make([]string, len(this.plan.Aggregates()))
65+
}
66+
return true
67+
}
68+
6169
func (this *IntermediateGroup) processItem(item value.AnnotatedValue, context *Context) bool {
6270
// Generate the group key
6371
var gk string
@@ -99,9 +107,18 @@ func (this *IntermediateGroup) processItem(item value.AnnotatedValue, context *C
99107
return false
100108
}
101109

102-
for _, agg := range this.plan.Aggregates() {
110+
for i, agg := range this.plan.Aggregates() {
103111
// we can cache agg.String() and reuse it here as the aggregate expression isn't re-evaluated during this processing
104-
a := agg.String()
112+
var a string
113+
if i < len(this.aggNames) {
114+
a = this.aggNames[i]
115+
}
116+
if a == "" {
117+
a = agg.String()
118+
if i < len(this.aggNames) {
119+
this.aggNames[i] = a
120+
}
121+
}
105122
pv := cumulative[a]
106123
if pv == nil {
107124
// Log an error and explicitly panic
@@ -164,6 +181,9 @@ func (this *IntermediateGroup) reopen(context *Context) bool {
164181
rv := this.baseReopen(context)
165182
if rv {
166183
this.groups = make(map[string]value.AnnotatedValue)
184+
for i := range this.aggNames {
185+
this.aggNames[i] = ""
186+
}
167187
}
168188
return rv
169189
}
@@ -175,4 +195,10 @@ func (this *IntermediateGroup) Release() {
175195
}
176196
this.groups = nil
177197
}
198+
if this.aggNames != nil {
199+
for i := range this.aggNames {
200+
this.aggNames[i] = ""
201+
}
202+
this.aggNames = nil
203+
}
178204
}

0 commit comments

Comments
 (0)