-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathactivediscovery.go.bak
More file actions
145 lines (114 loc) · 4.42 KB
/
activediscovery.go.bak
File metadata and controls
145 lines (114 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package libp2p
import (
"context"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p-kad-dht/dual"
"go.uber.org/fx"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/node/helpers"
)
// GHY: 添加主动探索的间隔时间常量,优化网络节点发现频率
const activeDiscoveryInterval = time.Minute * 10
// GHY: 添加了SetupActiveDiscovery函数,实现主动从DHT中查找随机节点的功能
func SetupActiveDiscovery() interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, dht *dual.DHT, disc *discoveryHandler, cfg *config.Config) (bool, error) {
if dht == nil {
log.Info("DHT not available, skipping active peer discovery")
return false, nil
}
// GHY: 创建节点导出器实例,用于将发现的节点信息导出到数据库
exporter, err := NewNodeExporter()
if err != nil {
log.Errorf("Failed to create node exporter: %v", err)
return false, err
}
ctx := helpers.LifecycleCtx(mctx, lc)
// GHY: 启动后台协程,定期执行主动发现
go func() {
// 等待系统启动完成
time.Sleep(time.Minute)
ticker := time.NewTicker(activeDiscoveryInterval)
defer ticker.Stop()
defer exporter.Close()
log.Info("active peer discovery service started")
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
discoverPeers(ctx, host, disc, exporter)
}
}
}()
return true, nil
}
}
// GHY: 添加了discoverPeers函数,实现主动节点发现和信息导出
func discoverPeers(ctx context.Context, host host.Host, disc *discoveryHandler, exporter *NodeExporter) {
log.Info("starting active peer discovery")
// 获取当前连接的节点数量作为参考
currentPeers := len(host.Network().Peers())
log.Infof("currently connected to %d peers", currentPeers)
// GHY: 处理现有的节点,收集节点信息并导出
for _, p := range host.Network().Peers() {
// 获取节点信息
addrInfo := host.Peerstore().PeerInfo(p)
// GHY: 获取更多节点元数据,包括协议版本、代理版本和延迟
protocolVersion, _ := host.Peerstore().Get(p, "ProtocolVersion")
protoVer, _ := protocolVersion.(string)
agentVersion, _ := host.Peerstore().Get(p, "AgentVersion")
agentVer, _ := agentVersion.(string)
latency := host.Peerstore().LatencyEWMA(p)
// GHY: 导出节点信息到数据库,标记为主动发现
if err := exporter.ExportNode(ctx, addrInfo, "active", protoVer, agentVer, int64(latency)); err != nil {
log.Errorf("Failed to export node info: %v", err)
}
// 触发节点处理和导出
disc.HandlePeerFound(addrInfo)
// 只处理有限数量的节点
if len(addrInfo.Addrs) > 0 && currentPeers < 20 {
// 尝试连接到这个节点,可能会导致发现更多节点
discCtx, cancel := context.WithTimeout(ctx, time.Second*30)
if err := host.Connect(discCtx, addrInfo); err != nil {
log.Debugf("error reconnecting to peer %s: %s", p, err)
}
cancel()
}
}
// GHY: 从节点仓库获取所有已知节点,并尝试连接未连接的节点
allPeers := host.Peerstore().Peers()
log.Infof("peerstore contains %d peers", len(allPeers))
// 尝试连接到一些未连接的节点
connectedCount := 0
for _, pid := range allPeers {
if host.Network().Connectedness(pid) != 2 { // 2表示Connected
addrInfo := host.Peerstore().PeerInfo(pid)
if len(addrInfo.Addrs) > 0 {
discCtx, cancel := context.WithTimeout(ctx, time.Second*15)
if err := host.Connect(discCtx, addrInfo); err != nil {
log.Debugf("failed to connect to known peer %s: %s", pid, err)
} else {
connectedCount++
// GHY: 收集节点元数据并导出到数据库
protocolVersion, _ := host.Peerstore().Get(pid, "ProtocolVersion")
protoVer, _ := protocolVersion.(string)
agentVersion, _ := host.Peerstore().Get(pid, "AgentVersion")
agentVer, _ := agentVersion.(string)
latency := host.Peerstore().LatencyEWMA(pid)
// 导出节点信息
if err := exporter.ExportNode(ctx, addrInfo, "active", protoVer, agentVer, int64(latency)); err != nil {
log.Errorf("Failed to export node info: %v", err)
}
disc.HandlePeerFound(addrInfo)
}
cancel()
// 限制一次尝试连接的节点数量
if connectedCount >= 10 {
break
}
}
}
}
log.Infof("active peer discovery completed, now connected to %d peers", len(host.Network().Peers()))
}