Skip to content

Commit d2754b4

Browse files
committed
Enable timeouts for client operations
1 parent 2c25278 commit d2754b4

3 files changed

Lines changed: 88 additions & 9 deletions

File tree

spawnclient/client.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func (sc *SpawnClient) Tail(ctx context.Context, svcName string, uri string) (<-
159159
errChan := make(chan error, 1)
160160
logChan := make(chan service.LogMessage, 20)
161161

162-
if err := iFaceClient.SubscribeSignal("log", func(msg *bw2.SimpleMessage) {
162+
handle, err := iFaceClient.SubscribeSignalH("log", func(msg *bw2.SimpleMessage) {
163163
if len(msg.POs) > 0 {
164164
messagePo, ok := msg.POs[0].(bw2.MsgPackPayloadObject)
165165
if !ok {
@@ -171,20 +171,21 @@ func (sc *SpawnClient) Tail(ctx context.Context, svcName string, uri string) (<-
171171
}
172172
logChan <- logMessage
173173
}
174-
}); err != nil {
174+
})
175+
if err != nil {
175176
close(logChan)
176177
errChan <- errors.Wrap(err, "Failed to subscribe to service log")
177178
return logChan, errChan
178179
}
179180

180-
tick := time.Tick(30 * time.Second)
181181
// Publish keep-alive log messages
182182
go func() {
183183
if err := iFaceClient.PublishSlot("keepLogAlive"); err != nil {
184184
close(logChan)
185185
errChan <- errors.Wrap(err, "Failed to publish log keep-alive message")
186186
return
187187
}
188+
tick := time.Tick(30 * time.Second)
188189
for {
189190
select {
190191
case <-tick:
@@ -193,7 +194,12 @@ func (sc *SpawnClient) Tail(ctx context.Context, svcName string, uri string) (<-
193194
errChan <- errors.Wrap(err, "Failed to publish log keep-alive message")
194195
return
195196
}
197+
196198
case <-ctx.Done():
199+
if err := sc.bwClient.Unsubscribe(handle); err != nil {
200+
errChan <- errors.Wrap(err, "Failed to unsubscribe from log channel")
201+
}
202+
close(logChan)
197203
return
198204
}
199205
}

spawnctl/main.go

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ func main() {
5959
Usage: "Name of the service",
6060
Value: "",
6161
},
62+
cli.StringFlag{
63+
Name: "timeout, t",
64+
Usage: "Timeout duration (optional)",
65+
Value: "",
66+
},
6267
},
6368
},
6469
{
@@ -77,6 +82,11 @@ func main() {
7782
Usage: "Name of the service",
7883
Value: "",
7984
},
85+
cli.StringFlag{
86+
Name: "timeout, t",
87+
Usage: "Timeout duration (optional)",
88+
Value: "",
89+
},
8090
},
8191
},
8292
{
@@ -95,6 +105,11 @@ func main() {
95105
Usage: "Name of the service",
96106
Value: "",
97107
},
108+
cli.StringFlag{
109+
Name: "timeout, t",
110+
Usage: "Timeout duration (optional)",
111+
Value: "",
112+
},
98113
},
99114
},
100115
{
@@ -113,6 +128,11 @@ func main() {
113128
Usage: "Name of the service",
114129
Value: "",
115130
},
131+
cli.StringFlag{
132+
Name: "timeout, t",
133+
Usage: "Timeout duration (optional)",
134+
Value: "",
135+
},
116136
},
117137
},
118138
{
@@ -165,12 +185,34 @@ func actionDeploy(c *cli.Context) error {
165185
}
166186
}
167187

188+
var timeout time.Duration
189+
timeoutStr := c.String("timeout")
190+
if len(timeoutStr) > 0 {
191+
timeout, err = time.ParseDuration(timeoutStr)
192+
if err != nil {
193+
fmt.Println("Illegal timeout parameter")
194+
os.Exit(1)
195+
} else if timeout < 0 {
196+
fmt.Println("Timeout duration must be positive")
197+
os.Exit(1)
198+
}
199+
}
200+
168201
spawnClient, err := spawnclient.New(c.GlobalString("router"), entity)
169202
if err != nil {
170203
fmt.Printf("Could not create spawnpoint client: %s\n", err)
171204
}
172205

173-
logChan, errChan := spawnClient.Tail(context.Background(), svcName, spawnpointURI)
206+
var ctx context.Context
207+
var cancel context.CancelFunc
208+
if timeout == 0 {
209+
ctx = context.Background()
210+
} else {
211+
ctx, cancel = context.WithTimeout(context.Background(), timeout)
212+
defer cancel()
213+
}
214+
logChan, errChan := spawnClient.Tail(ctx, svcName, spawnpointURI)
215+
174216
// Check if an error has already occurred
175217
select {
176218
case err = <-errChan:
@@ -184,7 +226,11 @@ func actionDeploy(c *cli.Context) error {
184226
os.Exit(1)
185227
}
186228

187-
fmt.Println("Tailing service logs. Press CTRL-c to exit...")
229+
if timeout == 0 {
230+
fmt.Println("Tailing service logs. Press CTRL-c to exit...")
231+
} else {
232+
fmt.Printf("Tailing service logs for %s. Press CTRL-c to exit early...\n", timeout.String())
233+
}
188234
for msg := range logChan {
189235
fmt.Println(strings.TrimSpace(msg.Contents))
190236
}
@@ -228,12 +274,35 @@ func manipulateService(c *cli.Context, command string) error {
228274
os.Exit(1)
229275
}
230276

277+
var timeout time.Duration
278+
var err error
279+
timeoutStr := c.String("timeout")
280+
if len(timeoutStr) > 0 {
281+
timeout, err = time.ParseDuration(timeoutStr)
282+
if err != nil {
283+
fmt.Println("Illegal timeout parameter")
284+
os.Exit(1)
285+
} else if timeout < 0 {
286+
fmt.Println("Timeout duration must be positive")
287+
os.Exit(1)
288+
}
289+
}
290+
231291
spawnClient, err := spawnclient.New(c.GlobalString("router"), entity)
232292
if err != nil {
233293
fmt.Printf("Could not create spawnpoint client: %s\n", err)
234294
}
235295

236-
logChan, errChan := spawnClient.Tail(context.Background(), svcName, spawnpointURI)
296+
var ctx context.Context
297+
var cancel context.CancelFunc
298+
if timeout == 0 {
299+
ctx = context.Background()
300+
} else {
301+
ctx, cancel = context.WithTimeout(context.Background(), timeout)
302+
defer cancel()
303+
}
304+
logChan, errChan := spawnClient.Tail(ctx, svcName, spawnpointURI)
305+
237306
// Check if an error has already occurred
238307
select {
239308
case err = <-errChan:
@@ -260,7 +329,11 @@ func manipulateService(c *cli.Context, command string) error {
260329
os.Exit(1)
261330
}
262331

263-
fmt.Println("Tailing service logs. Press CTRL-c to exit...")
332+
if timeout == 0 {
333+
fmt.Println("Tailing service logs. Press CTRL-c to exit...")
334+
} else {
335+
fmt.Printf("Tailing service logs for %s. Press CTRL-c to exit early...\n", timeout.String())
336+
}
264337
for msg := range logChan {
265338
fmt.Println(strings.TrimSpace(msg.Contents))
266339
}
@@ -301,7 +374,7 @@ func actionScan(c *cli.Context) error {
301374

302375
if len(heartbeats) == 1 {
303376
// Guaranteed to iterate once
304-
for uri, _ := range heartbeats {
377+
for uri := range heartbeats {
305378
daemonHb, svcHbs, err := spawnClient.Inspect(uri)
306379
if err != nil {
307380
fmt.Printf("Inspect failed: %s\n", err)

spawnd/daemon/heartbeat.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (daemon *SpawnpointDaemon) publishServiceHeartbeats(ctx context.Context, sv
7878
bw2Iface := daemon.bw2Service.RegisterInterface(svc.Name, "i.spawnable")
7979
for stats := range statChan {
8080
daemon.logger.Debugf("(%s) Publishing service heartbeat", svc.Name)
81-
daemon.logger.Debugf("(%s) CPU Shares: ~%.2f/%d, Memory: %.2f/%d", svc.Name,
81+
daemon.logger.Debugf("(%s) CPU Shares: ~%.2f/%d, Memory: %.2f/%d MiB", svc.Name,
8282
stats.CPUShares, svc.CPUShares, stats.Memory, svc.Memory)
8383
svcHb := ServiceHeartbeat{
8484
Time: time.Now().UnixNano(),

0 commit comments

Comments
 (0)