Skip to content

Commit 2c25278

Browse files
committed
Add spawnpoint inspect capability
1 parent 240df7d commit 2c25278

4 files changed

Lines changed: 86 additions & 7 deletions

File tree

spawnclient/client.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/base64"
77
"io/ioutil"
88
"path/filepath"
9+
"strings"
910
"time"
1011

1112
"github.com/SoftwareDefinedBuildings/spawnpoint/service"
@@ -60,6 +61,48 @@ func (sc *SpawnClient) Scan(baseURI string) (map[string]daemon.Heartbeat, error)
6061
return spawnpoints, nil
6162
}
6263

64+
func (sc *SpawnClient) Inspect(uri string) (*daemon.Heartbeat, map[string]daemon.ServiceHeartbeat, error) {
65+
daemonHbs, err := sc.Scan(uri)
66+
if err != nil {
67+
return nil, nil, errors.Wrap(err, "Initial spawnpoint scan failed")
68+
} else if len(daemonHbs) == 0 {
69+
return nil, nil, errors.New("No spawnpoints found at URI")
70+
} else if len(daemonHbs) > 1 {
71+
return nil, nil, errors.New("Multiple spawnpoints found at URI")
72+
}
73+
// This loop is guaranteed to iterate just once
74+
var daemonHb daemon.Heartbeat
75+
for _, hb := range daemonHbs {
76+
daemonHb = hb
77+
}
78+
79+
svcClient := sc.bwClient.RegisterService(uri, "s.spawnpoint")
80+
iFaceClient := svcClient.RegisterInterface("+", "i.spawnable")
81+
svcHeartbeatMsgs, err := sc.bwClient.Query(&bw2.QueryParams{
82+
URI: iFaceClient.SignalURI("heartbeat"),
83+
})
84+
if err != nil {
85+
return nil, nil, errors.Wrap(err, "Bosswave query failed")
86+
}
87+
svcHeartbeats := make(map[string]daemon.ServiceHeartbeat)
88+
for svcHbMsg := range svcHeartbeatMsgs {
89+
for _, po := range svcHbMsg.POs {
90+
if po.IsTypeDF(bw2.PODFSpawnpointSvcHb) {
91+
var svcHb daemon.ServiceHeartbeat
92+
if err := po.(bw2.MsgPackPayloadObject).ValueInto(&svcHb); err != nil {
93+
// Ignore this query result
94+
continue
95+
}
96+
tokens := strings.Split(svcHbMsg.URI, "/")
97+
svcName := tokens[len(tokens)-4]
98+
svcHeartbeats[svcName] = svcHb
99+
}
100+
}
101+
}
102+
103+
return &daemonHb, svcHeartbeats, nil
104+
}
105+
63106
func (sc *SpawnClient) Deploy(config *service.Configuration, uri string) error {
64107
if err := validateConfig(config); err != nil {
65108
return errors.Wrap(err, "Invalid service configuration")

spawnctl/main.go

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -298,9 +298,22 @@ func actionScan(c *cli.Context) error {
298298
fmt.Printf("Scan failed: %s\n", err)
299299
os.Exit(1)
300300
}
301-
fmt.Printf("Discovered %v Spawnpoint(s)\n", len(heartbeats))
302-
for uri, hb := range heartbeats {
303-
printSpawnpointStatus(uri, &hb)
301+
302+
if len(heartbeats) == 1 {
303+
// Guaranteed to iterate once
304+
for uri, _ := range heartbeats {
305+
daemonHb, svcHbs, err := spawnClient.Inspect(uri)
306+
if err != nil {
307+
fmt.Printf("Inspect failed: %s\n", err)
308+
os.Exit(1)
309+
}
310+
printSpawnpointDetails(uri, daemonHb, svcHbs)
311+
}
312+
} else {
313+
fmt.Printf("Discovered %v Spawnpoint(s)\n", len(heartbeats))
314+
for uri, hb := range heartbeats {
315+
printSpawnpointStatus(uri, &hb)
316+
}
304317
}
305318

306319
return nil
@@ -339,10 +352,12 @@ func parseSvcConfig(configFile string) (*service.Configuration, error) {
339352
}
340353

341354
func printSpawnpointStatus(uri string, hb *daemon.Heartbeat) {
355+
tokens := strings.Split(uri, "/")
356+
alias := tokens[len(tokens)-1]
342357
lastSeen := time.Unix(0, hb.Time)
343358
duration := time.Now().Sub(lastSeen) / (10 * time.Millisecond) * (10 * time.Millisecond)
344359

345-
fmt.Printf("[%s] seen %s (%s) ago at %s\n", hb.Alias, lastSeen.Format(time.RFC822), duration.String(), uri)
360+
fmt.Printf("[%s] seen %s (%s) ago at %s\n", alias, lastSeen.Format(time.RFC822), duration.String(), uri)
346361
fmt.Printf("Available CPU Shares: %v/%v\n", hb.AvailableCPU, hb.TotalCPU)
347362
fmt.Printf("Available Memory: %v/%v\n", hb.AvailableMemory, hb.TotalMemory)
348363

@@ -351,3 +366,23 @@ func printSpawnpointStatus(uri string, hb *daemon.Heartbeat) {
351366
fmt.Printf(" • %s\n", service)
352367
}
353368
}
369+
370+
func printSpawnpointDetails(uri string, daemonHb *daemon.Heartbeat, svcHbs map[string]daemon.ServiceHeartbeat) {
371+
tokens := strings.Split(uri, "/")
372+
alias := tokens[len(tokens)-1]
373+
lastSeen := time.Unix(0, daemonHb.Time)
374+
duration := time.Now().Sub(lastSeen) / (10 * time.Millisecond) * (10 * time.Millisecond)
375+
376+
fmt.Printf("[%s] seen %s (%s) ago at %s\n", alias, lastSeen.Format(time.RFC822), duration.String(), uri)
377+
fmt.Printf("Available CPU Shares: %v/%v\n", daemonHb.AvailableCPU, daemonHb.TotalCPU)
378+
fmt.Printf("Available Memory: %v/%v\n", daemonHb.AvailableMemory, daemonHb.TotalMemory)
379+
380+
fmt.Printf("%v Running Service(s)\n", len(daemonHb.Services))
381+
for name, svcHb := range svcHbs {
382+
lastSeen := time.Unix(0, svcHb.Time)
383+
duration := time.Now().Sub(lastSeen) / (10 * time.Millisecond) * (10 * time.Millisecond)
384+
fmt.Printf("• [%s] seen %s (%s) ago.\n", name, lastSeen.Format(time.RFC822), duration.String())
385+
fmt.Printf(" CPU: ~%.2f/%d Shares. Memory: %.2f/%d MiB\n", svcHb.UsedCPUShares, svcHb.CPUShares,
386+
svcHb.UsedMemory, svcHb.Memory)
387+
}
388+
}

spawnd/daemon/heartbeat.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
)
1111

1212
type Heartbeat struct {
13-
Alias string
1413
Version string
1514
Time int64
1615
TotalMemory uint64
@@ -56,7 +55,6 @@ func (daemon *SpawnpointDaemon) publishHearbeats(ctx context.Context, delay time
5655
daemon.registryLock.RUnlock()
5756

5857
hb := Heartbeat{
59-
Alias: daemon.alias,
6058
Version: util.VersionNum,
6159
Time: time.Now().UnixNano(),
6260
TotalCPU: daemon.totalCPUShares,
@@ -111,7 +109,7 @@ func (daemon *SpawnpointDaemon) publishServiceHeartbeats(ctx context.Context, sv
111109

112110
func (daemon *SpawnpointDaemon) Decommission() error {
113111
bw2Iface := daemon.bw2Service.RegisterInterface("daemon", "i.spawnpoint")
114-
daemon.logger.Debugf("Decomissioning spawnpoint %s", daemon.alias)
112+
daemon.logger.Debugf("Decomissioning spawnpoint %s", daemon.path)
115113
// A message without any POs is effectively a metadata de-persist
116114
if err := bw2Iface.PublishSignal("heartbeat"); err != nil {
117115
daemon.logger.Errorf("Failed to publish de-persist message: %s", err)

spawnd/daemon/management.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ func (daemon *SpawnpointDaemon) manageService(svc *serviceManifest, done chan<-
4444
}
4545

4646
daemon.logger.Debugf("(%s) Attempting to start new service", svc.Name)
47+
if err := daemon.publishLogMessage(svc.Name, "[INFO] Launching service..."); err != nil {
48+
daemon.logger.Errorf("(%s) Failed to publish log message: %s", svc.Name, err)
49+
}
4750
svcID, err := daemon.backend.StartService(ctx, svc.Configuration)
4851
if err != nil {
4952
daemon.logger.Errorf("(%s) Failed to start service: %s", svc.Name, err)

0 commit comments

Comments
 (0)