Skip to content

Commit 8ac28b6

Browse files
Use single db connection to listen for events (#17)
1 parent e7c6575 commit 8ac28b6

3 files changed

Lines changed: 213 additions & 65 deletions

File tree

RIN.InternalAPI/Program.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ public static void Main(string[] args)
2727

2828
builder.Services.AddSingleton<DB>();
2929
builder.Services.AddSingleton<SDB>();
30+
builder.Services.AddSingleton<DbEventBus>();
31+
builder.Services.AddHostedService(provider => provider.GetRequiredService<DbEventBus>());
3032

3133
var app = builder.Build();
3234
app.UseSerilogRequestLogging();
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
using System.Collections.Concurrent;
2+
using System.Text.Json;
3+
using System.Threading.Channels;
4+
using Npgsql;
5+
using RIN.Core.DB;
6+
using RIN.InternalAPI.Models;
7+
8+
namespace RIN.InternalAPI.Services
9+
{
10+
public class DbEventBus : BackgroundService
11+
{
12+
private readonly string ConnStr;
13+
private readonly DB Db;
14+
private readonly ILogger<DbEventBus> Logger;
15+
private readonly ConcurrentDictionary<Guid, Channel<Event>> Subscribers = new();
16+
private readonly Channel<Event> InternalChannel = Channel.CreateUnbounded<Event>();
17+
private static readonly ConcurrentDictionary<string, Type?> EventTypeCache = new();
18+
19+
public DbEventBus(DB db, ILogger<DbEventBus> logger)
20+
{
21+
Db = db;
22+
ConnStr = db.ConnStr;
23+
Logger = logger;
24+
}
25+
26+
public Guid Subscribe(Channel<Event> channel)
27+
{
28+
var id = Guid.NewGuid();
29+
Subscribers.TryAdd(id, channel);
30+
return id;
31+
}
32+
33+
public void Unsubscribe(Guid id) => Subscribers.TryRemove(id, out _);
34+
35+
protected override async Task ExecuteAsync(CancellationToken ct)
36+
{
37+
try
38+
{
39+
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
40+
var listenTask = ListenToPostgres(cts.Token);
41+
var processTask = ProcessEventsAsync(cts.Token);
42+
43+
await Task.WhenAny(listenTask, processTask);
44+
await cts.CancelAsync();
45+
await Task.WhenAll(listenTask, processTask);
46+
}
47+
catch (Exception ex) when (!ct.IsCancellationRequested)
48+
{
49+
Logger.LogError(ex, ex.Message);
50+
}
51+
}
52+
53+
private async Task ListenToPostgres(CancellationToken ct)
54+
{
55+
while (!ct.IsCancellationRequested)
56+
{
57+
try
58+
{
59+
await using var conn = new NpgsqlConnection(ConnStr);
60+
await conn.OpenAsync(ct);
61+
conn.Notification += (_, e) => ParseAndQueueEvent(e.Payload);
62+
63+
await using (var cmd = new NpgsqlCommand("LISTEN events", conn))
64+
{
65+
await cmd.ExecuteNonQueryAsync(ct);
66+
}
67+
68+
Logger.LogInformation("Listening for db events");
69+
70+
while (!ct.IsCancellationRequested)
71+
{
72+
await conn.WaitAsync(ct);
73+
}
74+
}
75+
catch (Exception ex) when (ex is not OperationCanceledException)
76+
{
77+
Logger.LogError(ex, "DbEventBus error ocurred, reconnecting in 10 seconds");
78+
await Task.Delay(10000, ct);
79+
}
80+
}
81+
}
82+
83+
private void ParseAndQueueEvent(string payloadJson)
84+
{
85+
try
86+
{
87+
var dbEvent = payloadJson.Split(["->"], StringSplitOptions.None);
88+
if (dbEvent.Length != 2) return;
89+
90+
var eventType = dbEvent[0];
91+
var payloadStr = dbEvent[1];
92+
93+
var type = EventTypeCache.GetOrAdd(eventType, type => Type.GetType($"RIN.InternalAPI.Models.{type}"));
94+
if (type == null)
95+
{
96+
Logger.LogWarning("Unknown event type: {eventType}", eventType);
97+
return;
98+
}
99+
100+
var payload = JsonSerializer.Deserialize(payloadStr, type);
101+
if (payload is Event evt)
102+
{
103+
InternalChannel.Writer.TryWrite(evt);
104+
}
105+
}
106+
catch (Exception ex)
107+
{
108+
Logger.LogError(ex, "Error parsing event payload: {payloadJson}", payloadJson);
109+
}
110+
}
111+
112+
private async Task ProcessEventsAsync(CancellationToken ct)
113+
{
114+
await foreach (var evt in InternalChannel.Reader.ReadAllAsync(ct))
115+
{
116+
try
117+
{
118+
if (evt is CharacterVisualsUpdated cvu)
119+
{
120+
_ = FetchCharacterVisuals(cvu);
121+
}
122+
else
123+
{
124+
DispatchToAll(evt);
125+
}
126+
}
127+
catch (Exception ex)
128+
{
129+
Logger.LogError(ex, "Error processing event");
130+
}
131+
}
132+
}
133+
134+
private async Task FetchCharacterVisuals(CharacterVisualsUpdated cvu)
135+
{
136+
try
137+
{
138+
var result = await Db.GetBasicCharacterAndVisualData((long)cvu.CharacterGuid);
139+
var bfVisuals = PlayerBattleframeVisuals.CreateDefault();
140+
141+
cvu.CharacterAndBattleframeVisuals = new CharacterAndBattleframeVisuals
142+
{
143+
CharacterInfo = result.info,
144+
CharacterVisuals = result.visuals,
145+
BattleframeVisuals = bfVisuals
146+
};
147+
148+
DispatchToAll(cvu);
149+
}
150+
catch (Exception ex)
151+
{
152+
Logger.LogError(ex, "Error while fetching character and battleframe visuals: {message}", ex.Message);
153+
}
154+
}
155+
156+
private void DispatchToAll(Event evt)
157+
{
158+
foreach (var sub in Subscribers)
159+
{
160+
sub.Value.Writer.TryWrite(evt);
161+
}
162+
}
163+
}
164+
}
Lines changed: 47 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
1-
using System.Text.Json;
1+
using System.Threading.Channels;
22
using Grpc.Core;
3-
using Npgsql;
43
using RIN.Core.DB;
54
using RIN.InternalAPI.Models;
65

76
namespace RIN.InternalAPI.Services
87
{
98
public class GameServerAPI : IGameServerAPI
109
{
11-
private readonly DB DB;
10+
private readonly DB Db;
1211
private readonly ILogger<GameServerAPI> Logger;
12+
private readonly DbEventBus EventBus;
13+
private readonly IHostApplicationLifetime Lifetime;
1314

14-
public GameServerAPI(DB db, ILogger<GameServerAPI> logger)
15+
public GameServerAPI(DB db, ILogger<GameServerAPI> logger, DbEventBus eventBus, IHostApplicationLifetime lifetime)
1516
{
16-
DB = db;
17+
Db = db;
1718
Logger = logger;
19+
EventBus = eventBus;
20+
Lifetime = lifetime;
1821
}
1922

2023
public async ValueTask<PingResp> Ping(PingReq req)
@@ -30,13 +33,13 @@ public async ValueTask<PingResp> Ping(PingReq req)
3033

3134
public async ValueTask<CharacterAndBattleframeVisuals> GetCharacterAndBattleframeVisuals(CharacterID req)
3235
{
33-
var result = await DB.GetBasicCharacterAndVisualData(req.ID);
36+
var result = await Db.GetBasicCharacterAndVisualData(req.ID);
3437
var bfVisuals = PlayerBattleframeVisuals.CreateDefault();
3538

3639
var resp = new CharacterAndBattleframeVisuals
3740
{
38-
CharacterInfo = result.Item1,
39-
CharacterVisuals = result.Item2,
41+
CharacterInfo = result.info,
42+
CharacterVisuals = result.visuals,
4043
BattleframeVisuals = bfVisuals
4144
};
4245

@@ -45,73 +48,52 @@ public async ValueTask<CharacterAndBattleframeVisuals> GetCharacterAndBattlefram
4548

4649
public async Task Stream(IAsyncStreamReader<Command> commands, IServerStreamWriter<Event> events, ServerCallContext context)
4750
{
48-
await using var connection = new NpgsqlConnection(DB.ConnStr);
49-
await connection.OpenAsync();
51+
var channel = Channel.CreateUnbounded<Event>();
52+
var subscriptionId = EventBus.Subscribe(channel);
5053

51-
await using var cmd = new NpgsqlCommand("LISTEN events", connection);
52-
await cmd.ExecuteNonQueryAsync();
53-
54-
connection.Notification += async (_, e) =>
54+
using var cts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken, Lifetime.ApplicationStopping);
55+
var token = cts.Token;
56+
try
5557
{
56-
var dbEvent = e.Payload.Split(["->"], StringSplitOptions.None);
57-
var eventType = dbEvent[0];
58-
var payloadJson = dbEvent[1];
59-
60-
var type = Type.GetType($"RIN.InternalAPI.Models.{eventType}");
61-
62-
if (type == null)
63-
{
64-
Logger.LogError("Unknown event type: {eventType}", eventType);
65-
return;
66-
}
67-
68-
var payload = JsonSerializer.Deserialize(payloadJson, type);
69-
70-
if (payload is not Event evt)
71-
{
72-
Logger.LogError("Failed to deserialize payload for event type: {eventType}", eventType);
73-
return;
74-
}
75-
76-
if (evt is CharacterVisualsUpdated cvu)
77-
{
78-
var updatedVisuals = await GetCharacterAndBattleframeVisuals(
79-
new CharacterID { ID = (long)cvu.CharacterGuid });
80-
81-
cvu.CharacterAndBattleframeVisuals = updatedVisuals;
82-
83-
await events.WriteAsync(cvu);
84-
}
85-
else
58+
var sendEventsTask = Task.Run(async () =>
8659
{
87-
await events.WriteAsync(evt);
88-
}
89-
};
60+
await foreach (var evt in channel.Reader.ReadAllAsync(token))
61+
{
62+
await events.WriteAsync(evt);
63+
}
64+
});
9065

91-
var commandsTask = Task.Run(async () =>
92-
{
93-
await foreach (var command in commands.ReadAllAsync())
66+
var commandsTask = Task.Run(async () =>
9467
{
95-
Logger.LogInformation("Received command: {command}", command);
96-
97-
switch (command)
68+
await foreach (var command in commands.ReadAllAsync(token))
9869
{
99-
case SaveGameSessionData data:
100-
await DB.UpdateCharacterAfterGameSession((long)data.CharacterId, (int)data.ZoneId, (int)data.OutpostId, (int)data.TimePlayed);
101-
break;
102-
case SaveLgvRaceFinish race:
103-
await DB.SaveLgvRaceFinish((long)race.CharacterGuid, (int)race.LeaderboardId, (long)race.TimeMs);
104-
break;
70+
Logger.LogInformation("Received command: {command}", command);
71+
72+
switch (command)
73+
{
74+
case SaveGameSessionData data:
75+
await Db.UpdateCharacterAfterGameSession((long)data.CharacterId, (int)data.ZoneId, (int)data.OutpostId, (int)data.TimePlayed);
76+
break;
77+
case SaveLgvRaceFinish race:
78+
await Db.SaveLgvRaceFinish((long)race.CharacterGuid, (int)race.LeaderboardId, (long)race.TimeMs);
79+
break;
80+
}
10581
}
106-
}
107-
});
82+
});
10883

109-
while (!context.CancellationToken.IsCancellationRequested)
84+
await Task.WhenAny(sendEventsTask, commandsTask);
85+
await cts.CancelAsync();
86+
await Task.WhenAll(sendEventsTask, commandsTask);
87+
}
88+
catch (Exception ex) when (ex is not OperationCanceledException)
11089
{
111-
await connection.WaitAsync(context.CancellationToken);
90+
Logger.LogError(ex, "GRPC Stream crashed");
91+
}
92+
finally
93+
{
94+
channel.Writer.TryComplete();
95+
EventBus.Unsubscribe(subscriptionId);
11296
}
113-
114-
await commandsTask;
11597
}
11698
}
11799
}

0 commit comments

Comments
 (0)