Skip to content

Commit 240df7d

Browse files
committed
Add service-specific resource monitoring and heartbeats
1 parent 71103b1 commit 240df7d

7 files changed

Lines changed: 144 additions & 13 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,5 @@ spawnd/spawnd
44
spawnctl/spawnctl
55
.bw2bind.log
66
containers/spawnd/spawnd
7+
*.yml
8+
spawnd/.manifests

service/types.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ type Configuration struct {
55
BaseImage string `yaml:"image"`
66
Source string `yaml:"source"`
77
BW2Entity string `yaml:"bw2Entity"`
8-
CPUShares uint32 `yaml:"cpuShares"`
9-
Memory uint32 `yaml:"memory"`
8+
CPUShares uint64 `yaml:"cpuShares"`
9+
Memory uint64 `yaml:"memory"`
1010
Build []string `yaml:"build,omitempty"`
1111
Run []string `yaml:"run"`
1212
IncludedFiles []string `yaml:"includedFiles,omitempty"`

spawnd/backend/docker.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ import (
55
"bytes"
66
"context"
77
"encoding/base64"
8+
"encoding/json"
89
"fmt"
910
"io"
1011
"io/ioutil"
1112
"path/filepath"
1213
"strings"
14+
"time"
1315

1416
"github.com/SoftwareDefinedBuildings/spawnpoint/service"
1517
"github.com/docker/docker/api/types"
@@ -23,13 +25,35 @@ import (
2325

2426
// defaultSpawnpointImage names a Docker image; presumably the base image used
// when a service configuration does not specify one — TODO confirm against buildImage.
const defaultSpawnpointImage = "jhkolb/spawnable:amd64"
2527
// logMaxSize is presumably the log size cap handed to Docker for service
// containers — its use is not visible here; TODO confirm.
const logMaxSize = "50m"
28+
// cpuSharesPerCore is the number of CPU shares that corresponds to one full
// core. ProfileService multiplies its estimate of cores in use by this value
// to report CPU usage in shares.
const cpuSharesPerCore = 1024
2629

2730
// Docker is the receiver type for the Docker-based backend methods in this
// file (for example ListServices and ProfileService).
type Docker struct {
2831
// Alias is an exported name for this backend instance; presumably the alias
// passed to NewDocker — TODO confirm.
Alias string
2932
// bw2Router presumably holds the bw2Router argument given to NewDocker — TODO confirm.
bw2Router string
3033
// client is the Docker API client used to talk to the daemon; ProfileService
// calls client.ContainerStats on it.
client *docker.Client
3134
}
3235

36+
// dockerStatsResponse captures the subset of one JSON entry from Docker's
// container stats stream that ProfileService decodes and uses.
type dockerStatsResponse struct {
37+
// Read is the entry's "read" timestamp. ProfileService compares it against
// the time of the last emitted sample to enforce the reporting period.
Read time.Time `json:"read"`
38+
// PreRead is the "preread" timestamp; it is decoded but not otherwise used
// by the code visible here.
PreRead time.Time `json:"preread"`
39+
// CPUStats holds the CPU counters for this entry.
CPUStats struct {
40+
CPUUsage struct {
41+
// TotalUsage is the container's cumulative CPU usage counter.
TotalUsage uint64 `json:"total_usage"`
42+
} `json:"cpu_usage"`
43+
// SystemCPUUsage is the system-wide cumulative CPU usage counter.
SystemCPUUsage uint64 `json:"system_cpu_usage"`
44+
// OnlineCPUs is used by ProfileService as the core count when scaling the
// container/system usage ratio.
// NOTE(review): if the daemon omits "online_cpus" this decodes as 0 and
// the computed CPU usage will be 0 — confirm the minimum API version.
OnlineCPUs uint64 `json:"online_cpus"`
45+
} `json:"cpu_stats"`
46+
// PreCPUStats holds the same counters as CPUStats ("precpu_stats" in the
// JSON); ProfileService subtracts them from CPUStats to obtain deltas.
PreCPUStats struct {
47+
CPUUsage struct {
48+
TotalUsage uint64 `json:"total_usage"`
49+
} `json:"cpu_usage"`
50+
SystemCPUUsage uint64 `json:"system_cpu_usage"`
51+
} `json:"precpu_stats"`
52+
// MemoryStats holds the memory counters for this entry.
MemoryStats struct {
53+
// Usage is divided by 1024*1024 by ProfileService before being reported.
Usage uint64 `json:"usage"`
54+
} `json:"memory_stats"`
55+
}
56+
3357
func NewDocker(alias, bw2Router string) (*Docker, error) {
3458
client, err := docker.NewEnvClient()
3559
if err != nil {
@@ -208,6 +232,60 @@ func (dkr *Docker) ListServices(ctx context.Context) ([]string, error) {
208232
return IDs, nil
209233
}
210234

235+
func (dkr *Docker) ProfileService(ctx context.Context, id string, period time.Duration) (<-chan Stats, <-chan error) {
236+
statChan := make(chan Stats, 10)
237+
errChan := make(chan error, 1)
238+
response, err := dkr.client.ContainerStats(ctx, id, true)
239+
if err != nil {
240+
close(statChan)
241+
errChan <- errors.Wrap(err, "Failed to initialize container stat collection")
242+
return statChan, errChan
243+
}
244+
245+
go func() {
246+
defer response.Body.Close()
247+
decoder := json.NewDecoder(response.Body)
248+
lastEmitted := time.Now()
249+
lastCPUCores := 0.0
250+
for {
251+
select {
252+
case <-ctx.Done():
253+
close(statChan)
254+
return
255+
default:
256+
var statEntry dockerStatsResponse
257+
if err := decoder.Decode(&statEntry); err != nil {
258+
close(statChan)
259+
if err != io.EOF {
260+
errChan <- errors.Wrap(err, "Failed to read and decode container stats entry")
261+
}
262+
return
263+
}
264+
if statEntry.Read.Sub(lastEmitted) > period {
265+
lastEmitted = time.Now()
266+
267+
// Logic is based on Docker's `calculateCPUPercent` function
268+
containerCPUDelta := float64(statEntry.CPUStats.CPUUsage.TotalUsage -
269+
statEntry.PreCPUStats.CPUUsage.TotalUsage)
270+
systemCPUDelta := float64(statEntry.CPUStats.SystemCPUUsage -
271+
statEntry.PreCPUStats.SystemCPUUsage)
272+
if systemCPUDelta > 0.0 {
273+
numCores := float64(statEntry.CPUStats.OnlineCPUs)
274+
lastCPUCores = (containerCPUDelta / systemCPUDelta) * numCores
275+
}
276+
277+
statChan <- Stats{
278+
Memory: float64(statEntry.MemoryStats.Usage) / (1024.0 * 1024.0),
279+
CPUShares: lastCPUCores * cpuSharesPerCore,
280+
}
281+
}
282+
}
283+
}
284+
}()
285+
286+
return statChan, errChan
287+
}
288+
211289
func (dkr *Docker) buildImage(ctx context.Context, svcConfig *service.Configuration) (string, error) {
212290
buildCtxt, err := generateBuildContext(svcConfig)
213291
if err != nil {

spawnd/backend/types.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package backend
22

33
import (
44
"context"
5+
"time"
56

67
"github.com/SoftwareDefinedBuildings/spawnpoint/service"
78
)
@@ -14,10 +15,16 @@ type ServiceBackend interface {
1415
ListServices(ctx context.Context) ([]string, error)
1516
TailService(ctx context.Context, id string, log bool) (<-chan string, <-chan error)
1617
MonitorService(ctx context.Context, id string) (<-chan Event, <-chan error)
18+
ProfileService(ctx context.Context, id string, period time.Duration) (<-chan Stats, <-chan error)
1719
}
1820

1921
// Event identifies a kind of service event; MonitorService delivers values of
// this type on its event channel.
type Event int
2022

2123
// NOTE(review): Die is declared without an explicit type, so it is an untyped
// integer constant rather than an Event — confirm whether `Die Event = iota`
// was intended.
const (
2224
// Die is the only event kind defined here; presumably it signals that a
// service's container has exited — TODO confirm.
Die = iota
2325
)
26+
27+
// Stats is one resource usage sample for a service, as delivered on the
// channel returned by ServiceBackend.ProfileService.
type Stats struct {
28+
// Memory is the service's memory usage. The Docker backend fills it with
// the container's reported usage divided by 1024*1024.
Memory float64
29+
// CPUShares is the service's CPU usage expressed in shares. The Docker
// backend fills it with the estimated number of cores in use multiplied by
// cpuSharesPerCore.
CPUShares float64
30+
}

spawnd/daemon/core.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ type Config struct {
2222
BW2Entity string `yaml:"bw2Entity"`
2323
BW2Agent string `yaml:"bw2Agent"`
2424
Path string `yaml:"path"`
25-
CPUShares uint32 `yaml:"cpuShares"`
26-
Memory uint32 `yaml:"memory"`
25+
CPUShares uint64 `yaml:"cpuShares"`
26+
Memory uint64 `yaml:"memory"`
2727
Backend string `yaml:"backend"`
2828
}
2929

@@ -34,10 +34,10 @@ type SpawnpointDaemon struct {
3434
logger *logging.Logger
3535
path string
3636
alias string
37-
totalCPUShares uint32
38-
totalMemory uint32
39-
availableCPUShares uint32
40-
availableMemory uint32
37+
totalCPUShares uint64
38+
totalMemory uint64
39+
availableCPUShares uint64
40+
availableMemory uint64
4141
resourceLock sync.RWMutex
4242
serviceRegistry map[string]*serviceManifest
4343
registryLock sync.RWMutex

spawnd/daemon/heartbeat.go

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,21 @@ type Heartbeat struct {
1313
Alias string
1414
Version string
1515
Time int64
16-
TotalMemory uint32
17-
TotalCPU uint32
18-
AvailableMemory uint32
19-
AvailableCPU uint32
16+
TotalMemory uint64
17+
TotalCPU uint64
18+
AvailableMemory uint64
19+
AvailableCPU uint64
2020
Services []string
2121
}
2222

23+
// ServiceHeartbeat is the per-service status message that
// publishServiceHeartbeats serializes and publishes for a running service.
type ServiceHeartbeat struct {
24+
// Time is the publication time, set from time.Now().UnixNano().
Time int64
25+
// Memory is copied from the service manifest's Memory field.
Memory uint64
26+
// CPUShares is copied from the service manifest's CPUShares field.
CPUShares uint64
27+
// UsedMemory is the Memory value of the backend's latest Stats sample.
UsedMemory float64
28+
// UsedCPUShares is the CPUShares value of the backend's latest Stats sample.
UsedCPUShares float64
29+
}
30+
2331
func (daemon *SpawnpointDaemon) publishHearbeats(ctx context.Context, delay time.Duration) {
2432
bw2Iface := daemon.bw2Service.RegisterInterface("daemon", "i.spawnpoint")
2533
tick := time.Tick(delay)
@@ -67,6 +75,40 @@ func (daemon *SpawnpointDaemon) publishHearbeats(ctx context.Context, delay time
6775
}
6876
}
6977

78+
func (daemon *SpawnpointDaemon) publishServiceHeartbeats(ctx context.Context, svc *serviceManifest, period time.Duration) {
79+
statChan, errChan := daemon.backend.ProfileService(ctx, svc.ID, period)
80+
bw2Iface := daemon.bw2Service.RegisterInterface(svc.Name, "i.spawnable")
81+
for stats := range statChan {
82+
daemon.logger.Debugf("(%s) Publishing service heartbeat", svc.Name)
83+
daemon.logger.Debugf("(%s) CPU Shares: ~%.2f/%d, Memory: %.2f/%d", svc.Name,
84+
stats.CPUShares, svc.CPUShares, stats.Memory, svc.Memory)
85+
svcHb := ServiceHeartbeat{
86+
Time: time.Now().UnixNano(),
87+
Memory: svc.Memory,
88+
CPUShares: svc.CPUShares,
89+
UsedMemory: stats.Memory,
90+
UsedCPUShares: stats.CPUShares,
91+
}
92+
93+
po, err := bw2.CreateMsgPackPayloadObject(bw2.PONumSpawnpointSvcHb, svcHb)
94+
if err != nil {
95+
daemon.logger.Errorf("(%s) Failed to marshal service heartbeat: %s", svc.Name, err)
96+
continue
97+
}
98+
99+
if err := bw2Iface.PublishSignal("heartbeat", po); err != nil {
100+
daemon.logger.Errorf("(%s) Failed to publish service heartbeat: %s", svc.Name, err)
101+
}
102+
}
103+
daemon.logger.Debugf("(%s) Service heartbeat publication terminated", svc.Name)
104+
105+
select {
106+
case err := <-errChan:
107+
daemon.logger.Errorf("(%s) Error while profiling service: %s", svc.Name, err)
108+
default:
109+
}
110+
}
111+
70112
func (daemon *SpawnpointDaemon) Decommission() error {
71113
bw2Iface := daemon.bw2Service.RegisterInterface("daemon", "i.spawnpoint")
72114
daemon.logger.Debugf("Decomissioning spawnpoint %s", daemon.alias)

spawnd/daemon/management.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ func (daemon *SpawnpointDaemon) manageService(svc *serviceManifest, done chan<-
6969

7070
go daemon.tailLogs(ctx, svc, true)
7171
go daemon.monitorEvents(ctx, svc)
72+
go daemon.publishServiceHeartbeats(ctx, svc, heartbeatInterval)
7273

7374
case service.Adopt:
7475
daemon.logger.Debugf("(%s) State machine received service adopt event", svc.Name)
@@ -96,6 +97,7 @@ func (daemon *SpawnpointDaemon) manageService(svc *serviceManifest, done chan<-
9697

9798
go daemon.tailLogs(ctx, svc, false)
9899
go daemon.monitorEvents(ctx, svc)
100+
go daemon.publishServiceHeartbeats(ctx, svc, heartbeatInterval)
99101

100102
case service.Restart:
101103
daemon.logger.Debugf("(%s) State machine received service restart event", svc.Name)
@@ -187,7 +189,7 @@ func (daemon *SpawnpointDaemon) monitorEvents(ctx context.Context, svc *serviceM
187189
}
188190
select {
189191
case err := <-errChan:
190-
fmt.Printf("(%s) Error while monitoring docker events: %s", svc.Name, err)
192+
daemon.logger.Errorf("(%s) Error while monitoring docker events: %s", svc.Name, err)
191193
default:
192194
}
193195
}

0 commit comments

Comments
 (0)