Skip to content

Commit 41fe8f7

Browse files
authored
Merge pull request #2457 from threefoldtech/main_multipletimeservers
support multiple timeservers for the ntpcheck
2 parents f99d64a + 1fca8f5 commit 41fe8f7

11 files changed

Lines changed: 169 additions & 27 deletions

File tree

cmds/modules/noded/main.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,11 @@ func action(cli *cli.Context) error {
167167
if err != nil {
168168
return errors.Wrap(err, "failed to create a new perfMon")
169169
}
170-
170+
zcl, err := zbus.NewRedisClient(msgBrokerCon)
171+
if err != nil {
172+
return errors.Wrap(err, "failed to create a zbus client to the msgBroker")
173+
}
174+
ctx = perf.WithZbusClient(ctx, zcl)
171175
healthcheck.RunNTPCheck(ctx)
172176
perfMon.AddTask(iperf.NewTask())
173177
perfMon.AddTask(cpubench.NewTask())

pkg/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ all: getdeps test
77

88
getdeps:
99
@echo "Installing golangci-lint" && go get github.com/golangci/golangci-lint/cmd/golangci-lint && go install github.com/golangci/golangci-lint/cmd/golangci-lint
10+
@echo "Installing zbusc" && go install github.com/threefoldtech/zbus/zbusc
1011
go mod tidy
1112

1213
lint:

pkg/api_gateway.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package pkg
22

33
import (
4+
"time"
5+
46
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
57
substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
68
)
@@ -27,6 +29,7 @@ type SubstrateGateway interface {
2729
SetNodePowerState(up bool) (hash types.Hash, err error)
2830
UpdateNode(node substrate.Node) (uint32, error)
2931
UpdateNodeUptimeV2(uptime uint64, timestampHint uint64) (hash types.Hash, err error)
32+
GetTime() (time.Time, error)
3033
}
3134

3235
type SubstrateError struct {

pkg/perf/cpubench/cpubench_task.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (c *CPUBenchmarkTask) Run(ctx context.Context) (interface{}, error) {
5959
if err != nil {
6060
return nil, fmt.Errorf("failed to parse cpubench output: %w", err)
6161
}
62-
client := perf.GetZbusClient(ctx)
62+
client := perf.MustGetZbusClient(ctx)
6363
statistics := stubs.NewStatisticsStub(client)
6464

6565
workloads, err := statistics.Workloads(ctx)

pkg/perf/healthcheck/healthcheck.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (h *healthcheckTask) Run(ctx context.Context) (interface{}, error) {
6363
log.Debug().Msg("starting health check task")
6464
errs := make(map[string][]string)
6565

66-
cl := perf.GetZbusClient(ctx)
66+
cl := perf.MustGetZbusClient(ctx)
6767
zui := stubs.NewZUIStub(cl)
6868

6969
var wg sync.WaitGroup

pkg/perf/healthcheck/ntp.go

Lines changed: 101 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,34 @@ package healthcheck
33
import (
44
"context"
55
"encoding/json"
6+
"fmt"
67
"math"
78
"net/http"
89
"time"
910

1011
"github.com/cenkalti/backoff/v3"
1112
"github.com/pkg/errors"
1213
"github.com/rs/zerolog/log"
14+
"github.com/threefoldtech/zbus"
15+
"github.com/threefoldtech/zos/pkg/perf"
16+
"github.com/threefoldtech/zos/pkg/stubs"
1317
"github.com/threefoldtech/zos/pkg/zinit"
1418
)
1519

1620
const acceptableSkew = 10 * time.Minute
1721

1822
func RunNTPCheck(ctx context.Context) {
23+
operation := func() error {
24+
return ntpCheck(ctx)
25+
}
1926
go func() {
2027
for {
2128
exp := backoff.NewExponentialBackOff()
2229
retryNotify := func(err error, d time.Duration) {
2330
log.Error().Err(err).Msg("failed to run ntp check")
2431
}
2532

26-
if err := backoff.RetryNotify(ntpCheck, backoff.WithContext(exp, ctx), retryNotify); err != nil {
33+
if err := backoff.RetryNotify(operation, backoff.WithContext(exp, ctx), retryNotify); err != nil {
2734
log.Error().Err(err).Send()
2835
continue
2936
}
@@ -40,10 +47,13 @@ func RunNTPCheck(ctx context.Context) {
4047
}()
4148
}
4249

43-
func ntpCheck() error {
50+
func ntpCheck(ctx context.Context) error {
4451
z := zinit.Default()
45-
46-
utcTime, err := getCurrentUTCTime()
52+
zcl, err := perf.TryGetZbusClient(ctx)
53+
if err != nil {
54+
return fmt.Errorf("ntpCheck expects zbus client in the context and found none %w", err)
55+
}
56+
utcTime, err := getCurrentUTCTime(zcl)
4757
if err != nil {
4858
return err
4959
}
@@ -57,20 +67,101 @@ func ntpCheck() error {
5767
return nil
5868
}
5969

60-
func getCurrentUTCTime() (time.Time, error) {
70+
func getCurrentUTCTime(zcl zbus.Client) (time.Time, error) {
71+
72+
// TimeServer represents a time server with its name and fetching function
73+
type TimeServer struct {
74+
Name string
75+
Func func() (time.Time, error)
76+
}
77+
78+
// List of time servers, and here not in the global vars, so we can inject zcl to pass to getTimeChainWithZCL
79+
var timeServers = []TimeServer{
80+
{
81+
Name: "tfchain",
82+
Func: func() (time.Time, error) {
83+
return getTimeChainWithZCL(zcl)
84+
},
85+
},
86+
{
87+
Name: "worldtimeapi",
88+
Func: getWorldTimeAPI,
89+
},
90+
{
91+
Name: "worldclockapi",
92+
Func: getWorldClockAPI,
93+
},
94+
{
95+
Name: "timeapi.io",
96+
Func: getTimeAPI,
97+
},
98+
}
99+
for _, server := range timeServers {
100+
log.Info().Msg(fmt.Sprint("running NTP check against ", server.Name))
101+
utcTime, err := server.Func()
102+
if err == nil {
103+
log.Info().Msg(fmt.Sprint("utc time from ", server.Name, ": ", utcTime))
104+
return utcTime, nil
105+
}
106+
log.Error().Err(err).Str("server", server.Name).Msg("failed to get time from server")
107+
}
108+
return time.Time{}, errors.New("failed to get time from all servers")
109+
}
110+
111+
func getWorldTimeAPI() (time.Time, error) {
61112
timeRes, err := http.Get("https://worldtimeapi.org/api/timezone/UTC")
62113
if err != nil {
63-
return time.Time{}, errors.Wrapf(err, "failed to get date")
114+
return time.Time{}, errors.Wrapf(err, "failed to get date from worldtimeapi")
64115
}
116+
defer timeRes.Body.Close()
65117

66118
var utcTime struct {
67119
DateTime time.Time `json:"datetime"`
68120
}
69-
err = json.NewDecoder(timeRes.Body).Decode(&utcTime)
70-
timeRes.Body.Close()
71-
if err != nil {
72-
return time.Time{}, errors.Wrapf(err, "failed to decode date response")
121+
if err := json.NewDecoder(timeRes.Body).Decode(&utcTime); err != nil {
122+
return time.Time{}, errors.Wrapf(err, "failed to decode date response from worldtimeapi")
73123
}
74124

75125
return utcTime.DateTime, nil
76126
}
127+
128+
func getWorldClockAPI() (time.Time, error) {
129+
timeRes, err := http.Get("http://worldclockapi.com/api/json/utc/now")
130+
if err != nil {
131+
return time.Time{}, errors.Wrapf(err, "failed to get date from worldclockapi")
132+
}
133+
defer timeRes.Body.Close()
134+
135+
var utcTime struct {
136+
CurrentDateTime string `json:"currentDateTime"` // Changed to string, needs manual parsing
137+
}
138+
if err := json.NewDecoder(timeRes.Body).Decode(&utcTime); err != nil {
139+
return time.Time{}, errors.Wrapf(err, "failed to decode date response from worldclockapi")
140+
}
141+
142+
// Parse the time manually, handling the "Z"
143+
return time.Parse("2006-01-02T15:04Z", utcTime.CurrentDateTime)
144+
}
145+
146+
func getTimeAPI() (time.Time, error) {
147+
timeRes, err := http.Get("https://timeapi.io/api/Time/current/zone?timeZone=UTC")
148+
if err != nil {
149+
return time.Time{}, errors.Wrapf(err, "failed to get date from timeapi.io")
150+
}
151+
defer timeRes.Body.Close()
152+
153+
var utcTime struct {
154+
DateTime string `json:"dateTime"` // Changed to string, needs manual parsing
155+
}
156+
if err := json.NewDecoder(timeRes.Body).Decode(&utcTime); err != nil {
157+
return time.Time{}, errors.Wrapf(err, "failed to decode date response from timeapi.io")
158+
}
159+
160+
// Parse the time manually, handling the fractional seconds
161+
return time.Parse("2006-01-02T15:04:05.999999", utcTime.DateTime)
162+
}
163+
164+
func getTimeChainWithZCL(zcl zbus.Client) (time.Time, error) {
165+
gw := stubs.NewSubstrateGatewayStub(zcl)
166+
return gw.GetTime(context.Background())
167+
}

pkg/perf/monitor.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (pm *PerformanceMonitor) runTask(ctx context.Context, task Task) error {
7878

7979
// Run adds the tasks to the cron queue and start the scheduler
8080
func (pm *PerformanceMonitor) Run(ctx context.Context) error {
81-
ctx = withZbusClient(ctx, pm.zbusClient)
81+
ctx = WithZbusClient(ctx, pm.zbusClient)
8282
for _, task := range pm.tasks {
8383
task := task
8484
if _, err := pm.scheduler.CronWithSeconds(task.Cron()).Do(func() error {
@@ -105,14 +105,3 @@ func (pm *PerformanceMonitor) Run(ctx context.Context) error {
105105
pm.scheduler.StartAsync()
106106
return nil
107107
}
108-
109-
type zbusClient struct{}
110-
111-
func withZbusClient(ctx context.Context, client zbus.Client) context.Context {
112-
return context.WithValue(ctx, zbusClient{}, client)
113-
}
114-
115-
// GetZbusClient gets zbus client from the given context
116-
func GetZbusClient(ctx context.Context) zbus.Client {
117-
return ctx.Value(zbusClient{}).(zbus.Client)
118-
}

pkg/perf/publicip/publicip_task.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (p *publicIPValidationTask) Run(ctx context.Context) (interface{}, error) {
7878
if err != nil {
7979
return nil, fmt.Errorf("failed to get namespace %s: %w", testNamespace, err)
8080
}
81-
cl := perf.GetZbusClient(ctx)
81+
cl := perf.MustGetZbusClient(ctx)
8282
substrateGateway := stubs.NewSubstrateGatewayStub(cl)
8383
farmID := environment.MustGet().FarmID
8484

@@ -191,7 +191,7 @@ func isLeastValidNode(ctx context.Context, farmID uint32, substrateGateway *stub
191191
if err != nil {
192192
return false, fmt.Errorf("failed to get farm %d nodes: %w", farmID, err)
193193
}
194-
cl := perf.GetZbusClient(ctx)
194+
cl := perf.MustGetZbusClient(ctx)
195195
registrar := stubs.NewRegistrarStub(cl)
196196
var nodeID uint32
197197
err = backoff.Retry(func() error {

pkg/perf/zbusctx.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package perf
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/threefoldtech/zbus"
8+
)
9+
10+
// WithZbusClient adds a zbus.Client to the provided context, returning a new context.
11+
// This allows for the retrieval of the client from the context at a later time.
12+
type zbusClientKey struct{}
13+
14+
func WithZbusClient(ctx context.Context, client zbus.Client) context.Context {
15+
return context.WithValue(ctx, zbusClientKey{}, client)
16+
}
17+
18+
// MustGetZbusClient gets zbus client from the given context
19+
func MustGetZbusClient(ctx context.Context) zbus.Client {
20+
return ctx.Value(zbusClientKey{}).(zbus.Client)
21+
}
22+
23+
// TryGetZbusClient tries to get zbus client from the given context
24+
func TryGetZbusClient(ctx context.Context) (zbus.Client, error) {
25+
zcl, ok := ctx.Value(zbusClientKey{}).(zbus.Client)
26+
if !ok {
27+
return zcl, fmt.Errorf("context does not have zbus client")
28+
}
29+
return zcl, nil
30+
}

pkg/stubs/api_gateway_stub.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
tfchainclientgo "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
1111
zbus "github.com/threefoldtech/zbus"
1212
pkg "github.com/threefoldtech/zos/pkg"
13+
"time"
1314
)
1415

1516
type SubstrateGatewayStub struct {
@@ -233,6 +234,23 @@ func (s *SubstrateGatewayStub) GetPowerTarget(ctx context.Context, arg0 uint32)
233234
return
234235
}
235236

237+
func (s *SubstrateGatewayStub) GetTime(ctx context.Context) (ret0 time.Time, ret1 error) {
238+
args := []interface{}{}
239+
result, err := s.client.RequestContext(ctx, s.module, s.object, "GetTime", args...)
240+
if err != nil {
241+
panic(err)
242+
}
243+
result.PanicOnError()
244+
ret1 = result.CallError()
245+
loader := zbus.Loader{
246+
&ret0,
247+
}
248+
if err := result.Unmarshal(&loader); err != nil {
249+
panic(err)
250+
}
251+
return
252+
}
253+
236254
func (s *SubstrateGatewayStub) GetTwin(ctx context.Context, arg0 uint32) (ret0 tfchainclientgo.Twin, ret1 error) {
237255
args := []interface{}{arg0}
238256
result, err := s.client.RequestContext(ctx, s.module, s.object, "GetTwin", args...)

0 commit comments

Comments
 (0)