Skip to content

Commit 961c160

Browse files
committed
feat: add generic metric, extractor, weigher/filter
1 parent 99dc2f8 commit 961c160

13 files changed

Lines changed: 673 additions & 6 deletions

File tree

internal/knowledge/datasources/plugins/prometheus/supported_syncers.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,5 @@ var supportedMetricSyncers = map[string]func(
2626
"netapp_node_metric": newTypedSyncer[NetAppNodeMetric],
2727
"netapp_volume_aggregate_labels_metric": newTypedSyncer[NetAppVolumeAggrLabelsMetric],
2828
"kvm_libvirt_domain_metric": newTypedSyncer[KVMDomainMetric],
29+
"generic": newTypedSyncer[GenericMetric],
2930
}

internal/knowledge/datasources/plugins/prometheus/types.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,3 +292,37 @@ func (m NetAppVolumeAggrLabelsMetric) With(n string, t time.Time, v float64) Pro
292292
m.Value = v
293293
return m
294294
}
295+
296+
type GenericMetric struct {
297+
Name string `json:"name" db:"name"`
298+
Host string `json:"host" db:"host"`
299+
Value float64 `json:"value" db:"value"`
300+
Timestamp time.Time `json:"timestamp" db:"timestamp"`
301+
}
302+
303+
func (m GenericMetric) GetName() string {
304+
return m.Name
305+
}
306+
307+
func (m GenericMetric) GetValue() float64 {
308+
return m.Value
309+
}
310+
311+
func (m GenericMetric) GetTimestamp() time.Time {
312+
return m.Timestamp
313+
}
314+
315+
func (m GenericMetric) TableName() string {
316+
return "generic"
317+
}
318+
319+
func (m GenericMetric) Indexes() map[string][]string {
320+
return nil
321+
}
322+
323+
func (m GenericMetric) With(alias string, t time.Time, v float64) PrometheusMetric {
324+
m.Name = alias
325+
m.Timestamp = t
326+
m.Value = v
327+
return m
328+
}

internal/knowledge/extractor/plugins/base.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@ func (e *BaseExtractor[Opts, Feature]) Init(
4242
}
4343

4444
// Extract the features directly from an sql query.
45-
func (e *BaseExtractor[Opts, F]) ExtractSQL(query string) ([]Feature, error) {
45+
func (e *BaseExtractor[Opts, F]) ExtractSQL(query string, args ...interface{}) ([]Feature, error) {
4646
// This can happen when no datasource is provided that connects to a database.
4747
if e.DB == nil {
4848
return nil, errors.New("database connection is not initialized")
4949
}
5050
var features []F
51-
if _, err := e.DB.Select(&features, query); err != nil {
51+
if _, err := e.DB.Select(&features, query, args...); err != nil {
5252
return nil, err
5353
}
5454
return e.Extracted(features)

internal/knowledge/extractor/plugins/compute/flavor_groups.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"errors"
1010
"sort"
1111

12+
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
1213
"github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins"
1314
ctrl "sigs.k8s.io/controller-runtime"
1415
)
@@ -76,7 +77,7 @@ var flavorGroupsQuery string
7677
var flavorGroupLog = ctrl.Log.WithName("flavor_group_extractor")
7778

7879
// Extract flavor groups from the database.
79-
func (e *FlavorGroupExtractor) Extract() ([]plugins.Feature, error) {
80+
func (e *FlavorGroupExtractor) Extract(_ []*v1alpha1.Datasource, _ []*v1alpha1.Knowledge) ([]plugins.Feature, error) {
8081
if e.DB == nil {
8182
return nil, errors.New("database connection is not initialized")
8283
}

internal/knowledge/extractor/plugins/compute/flavor_groups_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func TestFlavorGroupExtractor_Extract(t *testing.T) {
125125
t.Fatal(err)
126126
}
127127

128-
features, err := extractor.Extract()
128+
features, err := extractor.Extract([]*v1alpha1.Datasource{}, []*v1alpha1.Knowledge{})
129129
if err != nil {
130130
t.Fatal(err)
131131
}
@@ -360,7 +360,7 @@ func TestFlavorGroupExtractor_RamCoreRatio_FixedRatio(t *testing.T) {
360360
t.Fatal(err)
361361
}
362362

363-
features, err := extractor.Extract()
363+
features, err := extractor.Extract([]*v1alpha1.Datasource{}, []*v1alpha1.Knowledge{})
364364
if err != nil {
365365
t.Fatal(err)
366366
}

internal/knowledge/extractor/plugins/compute/vrops_hostsystem_contention_short_term.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type VROpsHostsystemContentionShortTerm struct {
2626
type VROpsHostsystemContentionShortTermExtractor struct {
2727
// Common base for all extractors that provides standard functionality.
2828
plugins.BaseExtractor[
29-
struct{}, // No options passed through yaml config
29+
struct{}, // No options passed through yaml config
3030
VROpsHostsystemContentionShortTerm, // Feature model
3131
]
3232
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright SAP SE
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package plugins
5+
6+
import (
7+
_ "embed"
8+
"errors"
9+
10+
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
11+
)
12+
13+
type Generic struct {
14+
Host string `db:"host"`
15+
Value float64 `db:"value"`
16+
}
17+
18+
type GenericExtractor struct {
19+
BaseExtractor[
20+
struct{},
21+
Generic,
22+
]
23+
}
24+
25+
func (e *GenericExtractor) Extract(d []*v1alpha1.Datasource, _ []*v1alpha1.Knowledge) ([]Feature, error) {
26+
if len(d) != 1 {
27+
return nil, errors.New("generic Knowledge requires exactly one datasource")
28+
}
29+
dsSpec := &d[0].Spec
30+
if dsSpec.Type != v1alpha1.DatasourceTypePrometheus {
31+
return nil, errors.New("unsupported datasource type: expected prometheus")
32+
}
33+
name := dsSpec.Prometheus.Alias
34+
if name == "" {
35+
return nil, errors.New("prometheus datasource alias cannot be empty")
36+
}
37+
query := "SELECT host, value FROM generic WHERE name = ?"
38+
return e.ExtractSQL(query, name)
39+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Copyright SAP SE
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package plugins
5+
6+
import (
7+
"slices"
8+
"testing"
9+
10+
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
11+
"github.com/cobaltcore-dev/cortex/internal/knowledge/datasources/plugins/prometheus"
12+
"github.com/cobaltcore-dev/cortex/internal/knowledge/db"
13+
testlibDB "github.com/cobaltcore-dev/cortex/internal/knowledge/db/testing"
14+
)
15+
16+
func TestGenericExtractor_Extract(t *testing.T) {
17+
dbEnv := testlibDB.SetupDBEnv(t)
18+
testDB := db.DB{DbMap: dbEnv.DbMap}
19+
defer dbEnv.Close()
20+
// Create dependency table
21+
if err := testDB.CreateTable(
22+
testDB.AddTable(prometheus.GenericMetric{}),
23+
); err != nil {
24+
t.Fatalf("expected no error, got %v", err)
25+
}
26+
27+
metrics := []any{
28+
&prometheus.GenericMetric{Name: "node_cpu_seconds_total", Host: "node-01", Value: 0.81},
29+
&prometheus.GenericMetric{Name: "node_cpu_seconds_total", Host: "node-02", Value: 0.37},
30+
}
31+
if err := testDB.Insert(metrics...); err != nil {
32+
t.Fatalf("failed to insert manila storage pools: %v", err)
33+
}
34+
35+
extractor := &GenericExtractor{}
36+
config := v1alpha1.KnowledgeSpec{}
37+
if err := extractor.Init(&testDB, nil, config); err != nil {
38+
t.Fatalf("expected no error, got %v", err)
39+
}
40+
datasources := []*v1alpha1.Datasource{
41+
{
42+
Spec: v1alpha1.DatasourceSpec{
43+
Type: v1alpha1.DatasourceTypePrometheus,
44+
Prometheus: v1alpha1.PrometheusDatasource{
45+
Alias: "node_cpu_seconds_total",
46+
},
47+
},
48+
},
49+
}
50+
51+
features, err := extractor.Extract(datasources, []*v1alpha1.Knowledge{})
52+
if err != nil {
53+
t.Fatalf("expected no error, got %v", err)
54+
}
55+
56+
var actual []Generic
57+
for _, f := range features {
58+
actual = append(actual, f.(Generic))
59+
}
60+
61+
expected := []Generic{
62+
{Host: "node-01", Value: 0.81},
63+
{Host: "node-02", Value: 0.37},
64+
}
65+
66+
if len(actual) != len(expected) {
67+
t.Errorf("expected %d rows, got %d", len(expected), len(actual))
68+
}
69+
70+
for _, exp := range expected {
71+
if !slices.ContainsFunc(actual, func(m Generic) bool {
72+
return m.Host == exp.Host && m.Value == exp.Value
73+
}) {
74+
t.Errorf("expected to find %+v in actual results %+v", exp, actual)
75+
}
76+
}
77+
}

internal/knowledge/extractor/supported_extractors.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,6 @@ var supportedExtractors = map[string]plugins.FeatureExtractor{
2626
"flavor_groups": &compute.FlavorGroupExtractor{},
2727

2828
"netapp_storage_pool_cpu_usage_extractor": &storage.StoragePoolCPUUsageExtractor{},
29+
30+
"generic": &plugins.GenericExtractor{},
2931
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright SAP SE
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package filters
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"log/slog"
10+
11+
"github.com/cobaltcore-dev/cortex/api/external/pods"
12+
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
13+
"github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins"
14+
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
15+
"k8s.io/apimachinery/pkg/types"
16+
"sigs.k8s.io/controller-runtime/pkg/client"
17+
)
18+
19+
type GenericFilterStepOpts struct {
20+
Knowledge string `json:"knowledge"`
21+
}
22+
23+
func (o GenericFilterStepOpts) Validate() error {
24+
if o.Knowledge == "" {
25+
return fmt.Errorf("knowledge name must be provided")
26+
}
27+
return nil
28+
}
29+
30+
type GenericFilterStep struct {
31+
lib.BaseFilter[pods.PodPipelineRequest, GenericFilterStepOpts]
32+
knowledgeRef types.NamespacedName
33+
}
34+
35+
func (f *GenericFilterStep) Init(ctx context.Context, client client.Client, spec v1alpha1.FilterSpec) error {
36+
if err := f.BaseFilter.Init(ctx, client, spec); err != nil {
37+
return err
38+
}
39+
knowledgeRef := types.NamespacedName{Name: f.Options.Knowledge}
40+
if err := f.CheckKnowledges(ctx, knowledgeRef); err != nil {
41+
return err
42+
}
43+
f.knowledgeRef = knowledgeRef
44+
45+
return nil
46+
}
47+
48+
func (f *GenericFilterStep) Run(ctx context.Context, _ *slog.Logger, req pods.PodPipelineRequest) (*lib.FilterWeigherPipelineStepResult, error) {
49+
knowledge := v1alpha1.Knowledge{}
50+
if err := f.Client.Get(ctx, f.knowledgeRef, &knowledge); err != nil {
51+
return nil, err
52+
}
53+
54+
nodeFeatures, err := v1alpha1.
55+
UnboxFeatureList[plugins.Generic](knowledge.Status.Raw)
56+
if err != nil {
57+
return nil, err
58+
}
59+
60+
result := f.IncludeAllHostsFromRequest(req)
61+
for _, nodeFeature := range nodeFeatures {
62+
host := nodeFeature.Host
63+
if _, exists := result.Activations[host]; !exists {
64+
continue
65+
}
66+
if nodeFeature.Value == 1.0 {
67+
delete(result.Activations, host)
68+
}
69+
}
70+
71+
return result, nil
72+
}
73+
74+
func init() {
75+
Index["generic"] = func() PodFilter { return &GenericFilterStep{} }
76+
}

0 commit comments

Comments
 (0)