Skip to content

Commit 2fa446c

Browse files
authored
Handle errors in LMDB and Pino (#527)
1 parent c21726f commit 2fa446c

3 files changed

Lines changed: 49 additions & 24 deletions

File tree

src/datastore/LMDB.ts

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ export class LMDBStoreFactory implements DataStoreFactory {
8080
noSubdir: config.noSubdir,
8181
overlappingSync: config.overlappingSync,
8282
},
83-
`Initialized LMDB ${Version} with stores: ${toString(storeNames)} and ${formatNumber(this.totalBytes() / (1024 * 1024), 4)} MB`,
83+
`Initialized LMDB ${Version} with stores: ${toString(storeNames)} and ${formatNumber(stats(this.env).totalSize / (1024 * 1024), 4)} MB`,
8484
);
8585
}
8686

@@ -106,31 +106,48 @@ export class LMDBStoreFactory implements DataStoreFactory {
106106
if (this.closed) return;
107107
const msg = extractErrorMessage(error);
108108

109-
if (msg.includes('MDB_BAD_RSLOT') || msg.includes("doesn't match env pid")) {
110-
this.recoverFromFork();
111-
} else {
112-
this.recoverFromError();
109+
try {
110+
if (msg.includes('MDB_BAD_RSLOT') || msg.includes("doesn't match env pid")) {
111+
this.recoverFromFork();
112+
} else {
113+
this.recoverFromError();
114+
}
115+
} catch (recoveryError) {
116+
this.log.error(recoveryError, 'LMDB recovery failed, disabling database');
117+
this.telemetry.count('recovery.failed', 1);
113118
}
114119
}
115120

116121
private ensureValidEnv(): void {
117122
if (process.pid !== this.openPid) {
118123
this.telemetry.count('process.forked', 1);
119124
this.log.warn({ oldPid: this.openPid, newPid: process.pid }, 'Process fork detected, reopening LMDB');
120-
this.reopenEnv();
121125

122-
// Update all stores with new handles
123-
for (const store of this.storeNames) {
124-
this.stores.get(store)?.updateStore(createDB(this.env, store));
126+
try {
127+
this.reopenEnv();
128+
129+
// Update all stores with new handles
130+
for (const store of this.storeNames) {
131+
this.stores.get(store)?.updateStore(createDB(this.env, store));
132+
}
133+
} catch (e) {
134+
this.log.error(e, 'Failed to reopen LMDB after fork');
135+
this.deleteAndRecreate();
125136
}
126137
}
127138
}
128139

129140
private recoverFromFork(): void {
130141
this.telemetry.count('forked.recover', 1);
131142
this.log.warn({ oldPid: this.openPid, newPid: process.pid }, 'Process fork detected, reopening LMDB');
132-
this.reopenEnv();
133-
this.recreateStores();
143+
144+
try {
145+
this.reopenEnv();
146+
this.recreateStores();
147+
} catch {
148+
this.log.warn('Fork recovery failed, deleting and recreating');
149+
this.deleteAndRecreate();
150+
}
134151
}
135152

136153
private recoverFromError(): void {
@@ -143,9 +160,18 @@ export class LMDBStoreFactory implements DataStoreFactory {
143160
this.log.info('Successfully recovered by reopening LMDB');
144161
} catch {
145162
this.log.warn('Reopen failed, deleting database');
163+
this.deleteAndRecreate();
164+
}
165+
}
166+
167+
private deleteAndRecreate(): void {
168+
try {
146169
this.deleteVersionDir();
147170
this.reopenEnv();
148171
this.recreateStores();
172+
} catch (e) {
173+
this.log.error(e, 'Failed to recreate LMDB after deletion');
174+
this.telemetry.count('recovery.failed', 1);
149175
}
150176
}
151177

@@ -209,8 +235,6 @@ export class LMDBStoreFactory implements DataStoreFactory {
209235
if (this.closed) return;
210236

211237
try {
212-
const totalBytes = this.totalBytes();
213-
214238
const envStat = stats(this.env);
215239
this.telemetry.histogram('version', VersionNumber);
216240
this.telemetry.histogram('env.size.bytes', envStat.totalSize, { unit: 'By' });
@@ -219,10 +243,12 @@ export class LMDBStoreFactory implements DataStoreFactory {
219243
});
220244
this.telemetry.histogram('env.entries', envStat.entries);
221245

246+
let totalBytes = envStat.totalSize;
222247
for (const [name, store] of this.stores.entries()) {
223248
const stat = store.stats();
224249
this.telemetry.histogram(`store.${name}.size.bytes`, stat.totalSize, { unit: 'By' });
225250
this.telemetry.histogram(`store.${name}.entries`, stat.entries);
251+
totalBytes += stat.totalSize;
226252
}
227253

228254
this.telemetry.histogram('total.usage', 100 * (totalBytes / TotalMaxDbSize), { unit: '%' });
@@ -231,17 +257,6 @@ export class LMDBStoreFactory implements DataStoreFactory {
231257
this.handleError(e);
232258
}
233259
}
234-
235-
private totalBytes() {
236-
let totalBytes = 0;
237-
totalBytes += stats(this.env).totalSize;
238-
239-
for (const store of this.stores.values()) {
240-
totalBytes += store.stats().totalSize;
241-
}
242-
243-
return totalBytes;
244-
}
245260
}
246261

247262
const VersionNumber = 5;

src/telemetry/LoggerFactory.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,14 @@ export class LoggerFactory implements Closeable {
155155
clearInterval(this.interval);
156156
}
157157

158+
/**
159+
* Returns true if the error is a known pino stream race condition that
160+
* occurs when the ThreadStream worker exits while async log calls are in flight.
161+
*/
162+
static isPinoStreamError(error: unknown): boolean {
163+
return error instanceof TypeError && error.message.includes('pino.msgPrefix');
164+
}
165+
158166
static getLogger(clazz: string | Function): Logger {
159167
if (LoggerFactory._instance === undefined) {
160168
throw new Error('Logger not configured');

src/telemetry/TelemetryService.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,13 @@ export class TelemetryService implements Closeable {
152152

153153
private registerErrorHandlers(telemetry: ScopedTelemetry): void {
154154
process.on('unhandledRejection', (reason, _promise) => {
155+
if (LoggerFactory.isPinoStreamError(reason)) return;
155156
telemetry.error('process.promise.unhandled', reason, undefined, { captureErrorAttributes: true });
156157
void this.metricsReader?.forceFlush();
157158
});
158159

159160
process.on('uncaughtException', (error, origin) => {
161+
if (LoggerFactory.isPinoStreamError(error)) return;
160162
telemetry.error('process.exception.uncaught', error, origin, { captureErrorAttributes: true });
161163
void this.metricsReader?.forceFlush();
162164
});

0 commit comments

Comments
 (0)