package node

import (
    "context"
    "fmt"
    "log/slog"
    "math/big"
    "math/rand"
    "net/http"
    "os"
    "path/filepath"
    "strings"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/accounts/abi/bind"
    "github.com/ethereum/go-ethereum/common"
    "golang.org/x/sync/errgroup"

    "github.com/rocket-pool/node-manager-core/beacon"
    "github.com/rocket-pool/node-manager-core/eth"
    "github.com/rocket-pool/node-manager-core/log"
    "github.com/rocket-pool/node-manager-core/utils"
    "github.com/rocket-pool/rocketpool-go/v2/dao/oracle"
    "github.com/rocket-pool/rocketpool-go/v2/rocketpool"
    "github.com/rocket-pool/smartnode/v2/rocketpool-daemon/common/alerting"
    "github.com/rocket-pool/smartnode/v2/rocketpool-daemon/common/services"
    "github.com/rocket-pool/smartnode/v2/rocketpool-daemon/common/state"
    "github.com/rocket-pool/smartnode/v2/rocketpool-daemon/node/collectors"
    "github.com/rocket-pool/smartnode/v2/rocketpool-daemon/watchtower"
    wc "github.com/rocket-pool/smartnode/v2/rocketpool-daemon/watchtower/collectors"
    "github.com/rocket-pool/smartnode/v2/shared/config"
)

// Config
const (
    minTasksInterval time.Duration = time.Minute * 4
    maxTasksInterval time.Duration = time.Minute * 6
    taskCooldown time.Duration = time.Second * 5
    totalEffectiveStakeCooldown time.Duration = time.Hour * 1
    metricsShutdownTimeout time.Duration = time.Second * 5
)

type waitUntilReadyResult int

const (
    waitUntilReadyExit waitUntilReadyResult = iota
    waitUntilReadyContinue
    waitUntilReadySuccess
)

type TaskLoop struct {
    // Services
    logger *log.Logger
    ctx context.Context
    sp *services.ServiceProvider
    cfg *config.SmartNodeConfig
    rp *rocketpool.RocketPool
    ec eth.IExecutionClient
    bc beacon.IBeaconClient
    metricsServer *http.Server
    stateLocker *collectors.StateLocker
    stateMgr *state.NetworkStateManager
    watchtowerTaskMgr *watchtower.TaskManager

    // Tasks
    manageFeeRecipient *ManageFeeRecipient
    distributeMinipools *DistributeMinipools
    stakePrelaunchMinipools *StakePrelaunchMinipools
    promoteMinipools *PromoteMinipools
    downloadRewardsTrees *DownloadRewardsTrees
    reduceBonds *ReduceBonds
    defendPdaoProps *DefendPdaoProps
    verifyPdaoProps *VerifyPdaoProps

    // Watchtower metrics
    scrubCollector *wc.ScrubCollector
    bondReductionCollector *wc.BondReductionCollector
    soloMigrationCollector *wc.SoloMigrationCollector

    // Internal
    wasExecutionClientSynced bool
    wasBeaconClientSynced bool
    lastTotalEffectiveStakeTime time.Time
    secondsDelta float64
}

func NewTaskLoop(sp *services.ServiceProvider, wg *sync.WaitGroup) *TaskLoop {
    logger := sp.GetTasksLogger()
    ctx := logger.CreateContextWithLogger(sp.GetBaseContext())
    t := &TaskLoop{
        sp: sp,
        logger: logger,
        ctx: ctx,
        cfg: sp.GetConfig(),
        rp: sp.GetRocketPool(),
        ec: sp.GetEthClient(),
        bc: sp.GetBeaconClient(),
        stateLocker: collectors.NewStateLocker(),
        lastTotalEffectiveStakeTime: time.Unix(0, 0),
        manageFeeRecipient: NewManageFeeRecipient(ctx, sp, logger),
        distributeMinipools: NewDistributeMinipools(sp, logger),
        stakePrelaunchMinipools: NewStakePrelaunchMinipools(sp, logger),
        promoteMinipools: NewPromoteMinipools(sp, logger),
        downloadRewardsTrees: NewDownloadRewardsTrees(sp, logger),
        reduceBonds: NewReduceBonds(sp, logger),
        defendPdaoProps: NewDefendPdaoProps(ctx, sp, logger),
        scrubCollector: wc.NewScrubCollector(),
        bondReductionCollector: wc.NewBondReductionCollector(),
        soloMigrationCollector: wc.NewSoloMigrationCollector(),

        // We assume clients are synced on startup so that we don't send unnecessary alerts
        wasExecutionClientSynced: true,
        wasBeaconClientSynced: true,

        // Delta between min and max to wait between loops
        secondsDelta: (maxTasksInterval - minTasksInterval).Seconds(),
    }

    // Create the prop verifier if the user enabled it
    if t.cfg.VerifyProposals.Value {
        t.verifyPdaoProps = NewVerifyPdaoProps(t.ctx, t.sp, t.logger)
    }
    return t
}

// Run the daemon task loop
func (t *TaskLoop) Run() error {
    // Print the current mode
    if t.cfg.IsNativeMode {
        fmt.Println("Starting node daemon in Native Mode.")
    } else {
        fmt.Println("Starting node daemon in Docker Mode.")
    }

    // Handle the initial fee recipient file deployment
    err := deployDefaultFeeRecipientFile(t.cfg)
    if err != nil {
        return err
    }

    // Run task loop
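    // The tasks run in a background goroutine so this function can go on to start the metrics server and return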
    go func() {
        for {
            // Make sure all of the resources are ready for task processing
            readyResult := t.waitUntilReady()
            switch readyResult {
            case waitUntilReadyExit:
                return
            case waitUntilReadyContinue:
                continue
            }

            // === Task execution ===
            if t.runTasks() {
                return
            }
        }
    }()

    // Run metrics loop
    t.metricsServer = runMetricsServer(t.ctx, t.sp, t.logger, t.stateLocker, t.scrubCollector, t.bondReductionCollector, t.soloMigrationCollector)
    return nil
}

// Stop the daemon
func (t *TaskLoop) Stop() {
    if t.metricsServer != nil {
        // Shut down the metrics server
        ctx, cancel := context.WithTimeout(context.Background(), metricsShutdownTimeout)
        defer cancel()
        t.metricsServer.Shutdown(ctx)
    }
}

// Wait until the chains and other resources are ready to be queried.
// Returns waitUntilReadyExit if the owning loop needs to exit, waitUntilReadyContinue if it should
// retry after a cooldown, or waitUntilReadySuccess once everything is ready.
func (t *TaskLoop) waitUntilReady() waitUntilReadyResult {
    // Check the EC status
    err := t.sp.WaitEthClientSynced(t.ctx, false) // Force refresh the primary / fallback EC status
    if err != nil {
        errMsg := err.Error()
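        // A "context canceled" error means the daemon is shutting down, so exit instead of retrying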
        if strings.Contains(errMsg, "context canceled") {
            return waitUntilReadyExit
        }
        t.wasExecutionClientSynced = false
        t.logger.Error("Execution Client not synced. Waiting for sync...", slog.String(log.ErrorKey, errMsg))
        return t.sleepAndReturnReadyResult()
    }
    if !t.wasExecutionClientSynced {
        t.logger.Info("Execution Client is now synced.")
        t.wasExecutionClientSynced = true
        alerting.AlertExecutionClientSyncComplete(t.cfg)
    }

    // Check the BC status
    err = t.sp.WaitBeaconClientSynced(t.ctx, false) // Force refresh the primary / fallback BC status
    if err != nil {
        errMsg := err.Error()
        if strings.Contains(errMsg, "context canceled") {
            return waitUntilReadyExit
        }
        // NOTE: if not synced, it returns an error - so there isn't necessarily an underlying issue
        t.wasBeaconClientSynced = false
        t.logger.Error("Beacon Node not synced. Waiting for sync...", slog.String(log.ErrorKey, errMsg))
        return t.sleepAndReturnReadyResult()
    }
    if !t.wasBeaconClientSynced {
        t.logger.Info("Beacon Node is now synced.")
        t.wasBeaconClientSynced = true
        alerting.AlertBeaconClientSyncComplete(t.cfg)
    }

    // Load contracts
    err = t.sp.RefreshRocketPoolContracts()
    if err != nil {
        t.logger.Error("Error loading contract bindings", log.Err(err))
        return t.sleepAndReturnReadyResult()
    }

    // Wait until node is registered
    err = t.sp.WaitNodeRegistered(t.ctx, true)
    if err != nil {
        errMsg := err.Error()
        if strings.Contains(errMsg, "context canceled") {
            return waitUntilReadyExit
        }
        t.logger.Error("Error waiting for node registration", slog.String(log.ErrorKey, errMsg))
        return t.sleepAndReturnReadyResult()
    }

    // Create the network state manager
    if t.stateMgr == nil {
        t.stateMgr, err = state.NewNetworkStateManager(t.ctx, t.rp, t.cfg, t.ec, t.bc, t.logger.Logger)
        if err != nil {
            t.logger.Error("Error creating network state manager", log.Err(err))
            return t.sleepAndReturnReadyResult()
        }
    }

    // Create the watchtower task manager
    if t.watchtowerTaskMgr == nil {
        t.watchtowerTaskMgr = watchtower.NewTaskManager(t.sp, t.stateMgr, t.scrubCollector, t.bondReductionCollector, t.soloMigrationCollector)
        err = t.watchtowerTaskMgr.Initialize(t.stateMgr)
        if err != nil {
            t.logger.Error("Error creating watchtower task manager", log.Err(err))
            return t.sleepAndReturnReadyResult()
        }
    }
    return waitUntilReadySuccess
}

// Sleep on the context for the task cooldown time, and return either exit or continue
// based on whether the context was cancelled.
func (t *TaskLoop) sleepAndReturnReadyResult() waitUntilReadyResult {
    if utils.SleepWithCancel(t.ctx, taskCooldown) {
        return waitUntilReadyExit
    } else {
        return waitUntilReadyContinue
    }
}

// Runs an iteration of the node tasks.
// Returns true if the task loop should exit, false if it should continue.
func (t *TaskLoop) runTasks() bool {
    nodeAddress, hasAddress := t.sp.GetWallet().GetAddress()
    if !hasAddress {
        t.logger.Error("Node address not set")
        return utils.SleepWithCancel(t.ctx, taskCooldown)
    }

    // Get the Beacon block
    latestBlock, err := t.stateMgr.GetLatestBeaconBlock(t.ctx)
    if err != nil {
        t.logger.Error("error getting latest Beacon block", log.Err(err))
        return utils.SleepWithCancel(t.ctx, taskCooldown)
    }

    // Check if on the Oracle DAO
    isOnOdao, err := isOnOracleDAO(t.rp, nodeAddress, latestBlock)
    if err != nil {
        t.logger.Error(err.Error())
        return utils.SleepWithCancel(t.ctx, taskCooldown)
    }

    // Get the latest appropriate state
    var state *state.NetworkState
    var totalEffectiveStake *big.Int
    if isOnOdao {
        // Get the state of the entire network
        state, err = t.stateMgr.GetStateForSlot(t.ctx, latestBlock.Header.Slot)
        totalEffectiveStake = calculateTotalEffectiveStakeForNetwork(state)
        t.lastTotalEffectiveStakeTime = time.Now()
    } else {
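        // Non-oDAO nodes only need this node's own state; the network-wide total effective stake is
        // refreshed at most once per totalEffectiveStakeCooldown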
        updateTotalEffectiveStake := false
        if time.Since(t.lastTotalEffectiveStakeTime) > totalEffectiveStakeCooldown {
            updateTotalEffectiveStake = true
            t.lastTotalEffectiveStakeTime = time.Now()
        }
        state, totalEffectiveStake, err = t.stateMgr.GetNodeStateForSlot(t.ctx, nodeAddress, latestBlock.Header.Slot, updateTotalEffectiveStake)
    }
    if err != nil {
        t.logger.Error(err.Error())
        return utils.SleepWithCancel(t.ctx, taskCooldown)
    }
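    // Publish the latest state (and total effective stake) so the metrics collectors can read it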
    t.stateLocker.UpdateState(state, totalEffectiveStake)

    // Run watchtower duties in parallel
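    // (the deferred Wait below ensures they finish before this iteration returns)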
    var watchtowerWg errgroup.Group
    defer watchtowerWg.Wait()
    watchtowerWg.Go(func() error {
        return t.watchtowerTaskMgr.Run(isOnOdao, state)
    })

    // Manage the fee recipient for the node
    if err := t.manageFeeRecipient.Run(state); err != nil {
        t.logger.Error(err.Error())
    }
    if utils.SleepWithCancel(t.ctx, taskCooldown) {
        return true
    }

    // Run the rewards download check
    if err := t.downloadRewardsTrees.Run(state); err != nil {
        t.logger.Error(err.Error())
    }
    if utils.SleepWithCancel(t.ctx, taskCooldown) {
        return true
    }

    // Run the pDAO proposal defender
    if err := t.defendPdaoProps.Run(state); err != nil {
        t.logger.Error(err.Error())
    }
    if utils.SleepWithCancel(t.ctx, taskCooldown) {
        return true
    }

    // Run the pDAO proposal verifier
    if t.verifyPdaoProps != nil {
        if err := t.verifyPdaoProps.Run(state); err != nil {
            t.logger.Error(err.Error())
        }
        if utils.SleepWithCancel(t.ctx, taskCooldown) {
            return true
        }
    }

    // Run the minipool stake check
    if err := t.stakePrelaunchMinipools.Run(state); err != nil {
        t.logger.Error(err.Error())
    }
    if utils.SleepWithCancel(t.ctx, taskCooldown) {
        return true
    }

    // Run the balance distribution check
    if err := t.distributeMinipools.Run(state); err != nil {
        t.logger.Error(err.Error())
    }
    if utils.SleepWithCancel(t.ctx, taskCooldown) {
        return true
    }

    // Run the reduce bond check
    if err := t.reduceBonds.Run(state); err != nil {
        t.logger.Error(err.Error())
    }
    if utils.SleepWithCancel(t.ctx, taskCooldown) {
        return true
    }

    // Run the minipool promotion check
    if err := t.promoteMinipools.Run(state); err != nil {
        t.logger.Error(err.Error())
    }

    // Wait for a random amount of time between the min and max durations
    randomSeconds := rand.Intn(int(t.secondsDelta))
    interval := minTasksInterval + time.Duration(randomSeconds)*time.Second
    return utils.SleepWithCancel(t.ctx, interval)
}

// Deploy the default fee recipient file to the proper location if it doesn't already exist
func deployDefaultFeeRecipientFile(cfg *config.SmartNodeConfig) error {
    feeRecipientPath := cfg.GetFeeRecipientFilePath()
    _, err := os.Stat(feeRecipientPath)
    if os.IsNotExist(err) {
        // Make sure the validators dir is created
        validatorsFolder := filepath.Dir(feeRecipientPath)
        err = os.MkdirAll(validatorsFolder, 0755)
        if err != nil {
            return fmt.Errorf("could not create validators directory: %w", err)
        }

        // Create the file
        rs := cfg.GetRocketPoolResources()
        var defaultFeeRecipientFileContents string
        if cfg.IsNativeMode {
            // Native mode needs an environment variable definition
            defaultFeeRecipientFileContents = fmt.Sprintf("FEE_RECIPIENT=%s", rs.RethAddress.Hex())
        } else {
            // Docker and Hybrid just need the address itself
            defaultFeeRecipientFileContents = rs.RethAddress.Hex()
        }
        err := os.WriteFile(feeRecipientPath, []byte(defaultFeeRecipientFileContents), 0664)
        if err != nil {
            return fmt.Errorf("could not write default fee recipient file to %s: %w", feeRecipientPath, err)
        }
    } else if err != nil {
        return fmt.Errorf("error checking fee recipient file status: %w", err)
    }
    return nil
}

// Check if this node is on the Oracle DAO
func isOnOracleDAO(rp *rocketpool.RocketPool, nodeAddress common.Address, block beacon.BeaconBlock) (bool, error) {
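    // Query at the EL block that corresponds to the given Beacon block, so the membership check
    // matches the state snapshot taken for that slot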
    opts := &bind.CallOpts{
        BlockNumber: big.NewInt(0).SetUint64(block.ExecutionBlockNumber),
    }
    member, err := oracle.NewOracleDaoMember(rp, nodeAddress)
    if err != nil {
        return false, fmt.Errorf("error creating oDAO member binding: %w", err)
    }
    err = rp.Query(nil, opts, member.Exists)
    if err != nil {
        return false, fmt.Errorf("error checking if node is in the Oracle DAO for Beacon block %d, EL block %d: %w", block.Header.Slot, block.ExecutionBlockNumber, err)
    }
    return member.Exists.Get(), nil
}

// Calculates the total effective stake for the network from a state that captured the whole network,
// since the information normally required by GetTotalEffectiveRplStake() is already in the state.
func calculateTotalEffectiveStakeForNetwork(state *state.NetworkState) *big.Int {
    total := big.NewInt(0)
    for _, node := range state.NodeDetails {
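        // Only count nodes whose effective stake is above their minimum required stake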
        if node.EffectiveRPLStake.Cmp(node.MinimumRPLStake) > 0 {
            total.Add(total, node.EffectiveRPLStake)
        }
    }
    return total
}