Skip to content

Commit ead68b6

Browse files
authored
more mutex changes (#169)
* more mutex changes * more * more * more * more * more * more * more * more * more * more
1 parent b2ecd63 commit ead68b6

4 files changed

Lines changed: 139 additions & 127 deletions

File tree

internal/gameServer/server.go

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,10 @@ type Registration struct {
2626

2727
type GameServer struct {
2828
StartTime time.Time
29-
Players map[string]Client
30-
PlayersMutex sync.Mutex
29+
Players sync.Map
3130
tcpListener *net.TCPListener
3231
udpListener *net.UDPConn
33-
registrations map[byte]*Registration
34-
registrationsMutex sync.Mutex
32+
registrations sync.Map
3533
tcpMutex sync.RWMutex
3634
tcpFiles map[string][]byte
3735
customData map[byte][]byte
@@ -132,7 +130,7 @@ func (g *GameServer) ManageBuffer() {
132130
var bufferHealth float32 = -1.0
133131
var countLag float32 = 255
134132
var leadPlayer int
135-
g.gameDataMutex.Lock() // BufferHealth can be modified by processUDP in a different thread
133+
g.gameDataMutex.Lock()
136134
for i := range 4 {
137135
var errBufferHeatlh error
138136
var errCountLag error
@@ -170,36 +168,32 @@ func (g *GameServer) ManagePlayers() {
170168
playersActive := false // used to check if anyone is still around
171169
var i byte
172170

173-
// Lock registrations before gameData so TCP paths that take registrationsMutex then gameDataMutex cannot deadlock.
174-
g.registrationsMutex.Lock()
175171
g.gameDataMutex.Lock()
176172
for i = range 4 {
177-
_, ok := g.registrations[i]
173+
v, ok := g.registrations.Load(i)
178174
if ok {
179175
if g.gameData.playerAlive[i] {
180-
g.Logger.Info("player status", "player", i, "regID", g.registrations[i].regID, "bufferHealth", g.gameData.averageBufferHealth[i], "bufferSize", g.gameData.bufferSize, "countLag", g.gameData.averageCountLag[i], "address", g.gameData.playerAddresses[i])
176+
g.Logger.Info("player status", "player", i, "regID", v.(*Registration).regID, "bufferHealth", g.gameData.averageBufferHealth[i], "bufferSize", g.gameData.bufferSize, "countLag", g.gameData.averageCountLag[i], "address", g.gameData.playerAddresses[i])
181177
playersActive = true
182178
} else {
183-
g.Logger.Info("player disconnected UDP", "player", i, "regID", g.registrations[i].regID, "address", g.gameData.playerAddresses[i])
179+
g.Logger.Info("player disconnected UDP", "player", i, "regID", v.(*Registration).regID, "address", g.gameData.playerAddresses[i])
184180
g.gameData.status |= (0x1 << (i + 1))
185181

186-
delete(g.registrations, i)
182+
g.registrations.Delete(i)
187183

188-
for k, v := range g.Players {
189-
if v.Number == int(i) {
190-
g.PlayersMutex.Lock()
191-
delete(g.Players, k)
184+
g.Players.Range(func(k, v any) bool {
185+
if v.(Client).Number == int(i) {
186+
g.Players.Delete(k)
192187
g.NeedsUpdatePlayers = true
193-
g.PlayersMutex.Unlock()
194188
}
195-
}
189+
return true
190+
})
196191
g.gameData.bufferHealth[i] = g.gameData.bufferHealth[i][:0]
197192
}
198193
}
199194
g.gameData.playerAlive[i] = false
200195
}
201196
g.gameDataMutex.Unlock()
202-
g.registrationsMutex.Unlock()
203197

204198
if !playersActive {
205199
g.Logger.Info("no more players, closing room", "numPlayers", g.NumberOfPlayers, "clientSHA", g.ClientSha, "playTime", time.Since(g.StartTime).String())
@@ -210,3 +204,21 @@ func (g *GameServer) ManagePlayers() {
210204
time.Sleep(time.Second * DisconnectTimeoutS)
211205
}
212206
}
207+
208+
func (g *GameServer) GetPlayersLength() int {
209+
length := 0
210+
g.Players.Range(func(_, _ any) bool {
211+
length++
212+
return true
213+
})
214+
return length
215+
}
216+
217+
func (g *GameServer) GetRegistrationsLength() int {
218+
length := 0
219+
g.registrations.Range(func(_, _ any) bool {
220+
length++
221+
return true
222+
})
223+
return length
224+
}

internal/gameServer/tcp.go

Lines changed: 38 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func (g *GameServer) tcpSendCustom(conn *net.TCPConn, customID byte) {
117117

118118
func (g *GameServer) tcpSendReg(conn *net.TCPConn) {
119119
startTime := time.Now()
120-
for len(g.Players) != len(g.registrations) {
120+
for g.GetPlayersLength() != g.GetRegistrationsLength() {
121121
time.Sleep(time.Millisecond)
122122
if time.Since(startTime) > TCPTimeout {
123123
g.Logger.Info("TCP connection timed out in tcpSendReg")
@@ -128,13 +128,13 @@ func (g *GameServer) tcpSendReg(conn *net.TCPConn) {
128128
registrations := make([]byte, 24)
129129
current := 0
130130
for i = range 4 {
131-
_, ok := g.registrations[i]
131+
v, ok := g.registrations.Load(i)
132132
if ok {
133-
binary.BigEndian.PutUint32(registrations[current:], g.registrations[i].regID)
133+
binary.BigEndian.PutUint32(registrations[current:], v.(*Registration).regID)
134134
current += 4
135-
registrations[current] = g.registrations[i].plugin
135+
registrations[current] = v.(*Registration).plugin
136136
current++
137-
registrations[current] = g.registrations[i].raw
137+
registrations[current] = v.(*Registration).raw
138138
current++
139139
} else {
140140
current += 6
@@ -282,36 +282,35 @@ func (g *GameServer) processTCP(conn *net.TCPConn) {
282282
regID := binary.BigEndian.Uint32(regIDBytes)
283283

284284
response := make([]byte, 2)
285-
_, ok := g.registrations[playerNumber]
285+
v, ok := g.registrations.Load(playerNumber)
286286
if !ok {
287287
if playerNumber > 0 && plugin == 2 { // Only P1 can use mempak
288288
plugin = 1
289289
}
290290

291-
g.registrationsMutex.Lock() // any player can modify this, which would be in a different thread
292-
g.registrations[playerNumber] = &Registration{
291+
g.registrations.Store(playerNumber, &Registration{
293292
regID: regID,
294293
plugin: plugin,
295294
raw: raw,
296-
}
297-
g.registrationsMutex.Unlock()
295+
})
298296

299297
response[0] = 1
300-
g.Logger.Info("registered player", "registration", g.registrations[playerNumber], "number", playerNumber, "bufferLeft", tcpData.buffer.Len(), "address", conn.RemoteAddr().String())
298+
g.Logger.Info("registered player", "regID", regID, "number", playerNumber, "bufferLeft", tcpData.buffer.Len(), "address", conn.RemoteAddr().String())
301299

302-
g.gameDataMutex.Lock() // any player can modify this, which would be in a different thread
300+
g.gameDataMutex.Lock()
303301
g.gameData.pendingInput[playerNumber] = InputData{0, plugin}
304302
g.gameData.playerAlive[playerNumber] = true
305303
g.gameDataMutex.Unlock()
306304
} else {
307-
if g.registrations[playerNumber].regID == regID {
308-
g.Logger.Error(fmt.Errorf("re-registration"), "player already registered", "registration", g.registrations[playerNumber], "number", playerNumber, "bufferLeft", tcpData.buffer.Len(), "address", conn.RemoteAddr().String())
305+
if v.(*Registration).regID == regID {
306+
g.Logger.Error(fmt.Errorf("re-registration"), "player already registered", "registration", v.(*Registration), "number", playerNumber, "bufferLeft", tcpData.buffer.Len(), "address", conn.RemoteAddr().String())
309307
response[0] = 1
310308
} else {
311-
g.Logger.Error(fmt.Errorf("registration failure"), "could not register player", "registration", g.registrations[playerNumber], "number", playerNumber, "bufferLeft", tcpData.buffer.Len(), "address", conn.RemoteAddr().String())
309+
g.Logger.Error(fmt.Errorf("registration failure"), "could not register player", "registration", v.(*Registration), "number", playerNumber, "bufferLeft", tcpData.buffer.Len(), "address", conn.RemoteAddr().String())
312310
response[0] = 0
313311
}
314312
}
313+
315314
response[1] = uint8(g.BufferTarget)
316315
_, err = conn.Write(response)
317316
if err != nil {
@@ -334,30 +333,27 @@ func (g *GameServer) processTCP(conn *net.TCPConn) {
334333
regID := binary.BigEndian.Uint32(regIDBytes)
335334
var i byte
336335
for i = range 4 {
337-
v, ok := g.registrations[i]
338-
if ok {
339-
if v.regID == regID {
340-
g.Logger.Info("player disconnected TCP", "regID", regID, "player", i, "address", conn.RemoteAddr().String())
341-
342-
// Same lock order as ManagePlayers / RegisterPlayer: registrationsMutex, then gameDataMutex.
343-
g.registrationsMutex.Lock()
344-
g.gameDataMutex.Lock()
345-
g.gameData.playerAlive[i] = false
346-
g.gameData.status |= (0x1 << (i + 1))
347-
delete(g.registrations, i)
348-
349-
for k, v := range g.Players {
350-
if v.Number == int(i) {
351-
g.PlayersMutex.Lock()
352-
delete(g.Players, k)
353-
g.NeedsUpdatePlayers = true
354-
g.PlayersMutex.Unlock()
355-
}
336+
v, ok := g.registrations.Load(i)
337+
if ok && v.(*Registration).regID == regID {
338+
g.Logger.Info("player disconnected TCP", "regID", regID, "player", i, "address", conn.RemoteAddr().String())
339+
340+
g.gameDataMutex.Lock()
341+
g.gameData.playerAlive[i] = false
342+
g.gameData.status |= (0x1 << (i + 1))
343+
g.gameDataMutex.Unlock()
344+
g.registrations.Delete(i)
345+
346+
g.Players.Range(func(k, v any) bool {
347+
if v.(Client).Number == int(i) {
348+
g.Players.Delete(k)
349+
g.NeedsUpdatePlayers = true
356350
}
357-
g.gameData.bufferHealth[i] = g.gameData.bufferHealth[i][:0]
358-
g.gameDataMutex.Unlock()
359-
g.registrationsMutex.Unlock()
360-
}
351+
return true
352+
})
353+
354+
g.gameDataMutex.Lock()
355+
g.gameData.bufferHealth[i] = g.gameData.bufferHealth[i][:0]
356+
g.gameDataMutex.Unlock()
361357
}
362358
}
363359
tcpData.request = RequestNone
@@ -413,11 +409,12 @@ func (g *GameServer) watchTCP() {
413409
conn.Close() //nolint:errcheck
414410
continue
415411
}
416-
for _, v := range g.Players {
417-
if remoteAddr.IP.Equal(v.IP) {
412+
g.Players.Range(func(k, v any) bool {
413+
if remoteAddr.IP.Equal(v.(Client).IP) {
418414
validated = true
419415
}
420-
}
416+
return true
417+
})
421418
if !validated {
422419
g.Logger.Error(fmt.Errorf("invalid tcp connection"), "bad IP", "IP", conn.RemoteAddr().String())
423420
conn.Close() //nolint:errcheck
@@ -440,7 +437,6 @@ func (g *GameServer) createTCPServer(basePort int, maxGames int) int {
440437
g.tcpFiles = make(map[string][]byte)
441438
g.customData = make(map[byte][]byte)
442439
g.tcpSettings = make([]byte, SettingsSize)
443-
g.registrations = map[byte]*Registration{}
444440
go g.watchTCP()
445441
return g.Port
446442
}

internal/gameServer/udp.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ func uintLarger(v uint32, w uint32) bool {
5959
func (g *GameServer) getPlayerNumberByID(regID uint32) (byte, error) {
6060
var i byte
6161
for i = range 4 {
62-
v, ok := g.registrations[i]
62+
v, ok := g.registrations.Load(i)
6363
if ok {
64-
if v.regID == regID {
64+
if v.(*Registration).regID == regID {
6565
return i, nil
6666
}
6767
}
@@ -122,10 +122,11 @@ func (g *GameServer) sendUDPInput(count uint32, addr *net.UDPAddr, playerNumber
122122
}
123123

124124
func (g *GameServer) processUDP(addr *net.UDPAddr) {
125+
g.gameDataMutex.Lock()
126+
defer g.gameDataMutex.Unlock()
125127
playerNumber := g.gameData.recvBuffer[1]
126128
switch g.gameData.recvBuffer[0] {
127129
case KeyInfoClient:
128-
g.gameDataMutex.Lock()
129130
g.gameData.playerAddresses[playerNumber] = addr
130131
count := binary.BigEndian.Uint32(g.gameData.recvBuffer[2:])
131132

@@ -139,7 +140,6 @@ func (g *GameServer) processUDP(addr *net.UDPAddr) {
139140
g.sendUDPInput(count, g.gameData.playerAddresses[i], playerNumber, true, NoRegID)
140141
}
141142
}
142-
g.gameDataMutex.Unlock()
143143
case PlayerInputRequest:
144144
regID := binary.BigEndian.Uint32(g.gameData.recvBuffer[2:])
145145
count := binary.BigEndian.Uint32(g.gameData.recvBuffer[6:])
@@ -149,24 +149,20 @@ func (g *GameServer) processUDP(addr *net.UDPAddr) {
149149
g.Logger.Error(err, "could not process request", "regID", regID)
150150
return
151151
}
152-
g.gameDataMutex.Lock() // playerAlive, countLag, bufferHealth, leadCount; other threads touch these
153152
if uintLarger(count, g.gameData.leadCount) && spectator == 0 {
154153
g.gameData.leadCount = count
155154
}
156155
g.gameData.countLag[sendingPlayerNumber] = append(g.gameData.countLag[sendingPlayerNumber], g.sendUDPInput(count, addr, playerNumber, spectator != 0, sendingPlayerNumber))
157156
g.gameData.bufferHealth[sendingPlayerNumber] = append(g.gameData.bufferHealth[sendingPlayerNumber], g.gameData.recvBuffer[11])
158157
g.gameData.playerAlive[sendingPlayerNumber] = true
159-
g.gameDataMutex.Unlock()
160158
case CP0Info:
161159
if g.gameData.status&StatusDesync == 0 {
162160
viCount := binary.BigEndian.Uint32(g.gameData.recvBuffer[1:])
163161
syncValue, ok := g.gameData.syncValues.Get(viCount)
164162
if !ok {
165163
g.gameData.syncValues.Add(viCount, bytes.Clone(g.gameData.recvBuffer[5:133]))
166164
} else if !bytes.Equal(syncValue, g.gameData.recvBuffer[5:133]) {
167-
g.gameDataMutex.Lock() // Status can be modified by ManagePlayers in a different thread
168165
g.gameData.status |= StatusDesync
169-
g.gameDataMutex.Unlock()
170166

171167
g.Logger.Error(fmt.Errorf("desync"), "game has desynced", "numPlayers", g.NumberOfPlayers, "clientSHA", g.ClientSha, "playTime", time.Since(g.StartTime).String(), "features", g.Features)
172168
}
@@ -186,11 +182,12 @@ func (g *GameServer) watchUDP() {
186182

187183
if g.VerifyIP {
188184
validated := false
189-
for _, v := range g.Players {
190-
if addr.IP.Equal(v.IP) {
185+
g.Players.Range(func(k, v any) bool {
186+
if addr.IP.Equal(v.(Client).IP) {
191187
validated = true
192188
}
193-
}
189+
return true
190+
})
194191
if !validated {
195192
g.Logger.Error(fmt.Errorf("invalid udp connection"), "bad IP", "IP", addr.IP)
196193
continue

0 commit comments

Comments
 (0)