Skip to content

Commit 43799ba

Browse files
fix: telegram auth session recycle
1 parent ff5ed62 commit 43799ba

1 file changed

Lines changed: 182 additions & 31 deletions

File tree

api/app/clients/tools/structured/Telegram.js

Lines changed: 182 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ class TelegramSessionPool {
1212
static currentIndex = 0;
1313
static sessionKeys = [];
1414
static lastReuseLog = 0; // Track when we last logged reuse to reduce noise
15+
static idleTimeout = 5 * 60 * 1000; // 5 minutes idle timeout
16+
static cleanupInterval = null;
17+
static invalidatedSessions = new Set(); // Permanently invalid sessions (AUTH_KEY_DUPLICATED)
1518

1619
static initialize() {
1720
// Discover available session strings
@@ -43,24 +46,44 @@ class TelegramSessionPool {
4346

4447
// PHASE 1: Try to reuse any existing connected client first (most efficient)
4548
for (const [sessionKey, clientInfo] of this.clients.entries()) {
46-
if (clientInfo && clientInfo.client && clientInfo.client.connected) {
47-
clientInfo.lastUsed = Date.now();
48-
// Only log reuse occasionally to reduce noise
49-
const now = Date.now();
50-
if (!this.lastReuseLog || now - this.lastReuseLog > 30000) {
51-
console.log(`♻️ Reusing session: ${sessionKey}`);
52-
this.lastReuseLog = now;
49+
// Skip invalidated sessions
50+
if (this.invalidatedSessions.has(sessionKey)) {
51+
continue;
52+
}
53+
54+
if (clientInfo && clientInfo.client) {
55+
// Check if client is actually connected and healthy
56+
try {
57+
if (clientInfo.client.connected) {
58+
clientInfo.lastUsed = Date.now();
59+
// Only log reuse occasionally to reduce noise
60+
const now = Date.now();
61+
if (!this.lastReuseLog || now - this.lastReuseLog > 30000) {
62+
console.log(`♻️ Reusing healthy session: ${sessionKey}`);
63+
this.lastReuseLog = now;
64+
}
65+
return clientInfo.client;
66+
} else {
67+
// Client exists but not connected, clean it up
68+
console.log(`🔧 Cleaning up disconnected client: ${sessionKey}`);
69+
this.clients.delete(sessionKey);
70+
}
71+
} catch (error) {
72+
// Client is in bad state, clean it up
73+
console.log(`🔧 Cleaning up unhealthy client: ${sessionKey} - ${error.message}`);
74+
this.clients.delete(sessionKey);
5375
}
54-
return clientInfo.client;
5576
}
5677
}
5778

5879
// PHASE 2: Wait for any connecting sessions to complete (avoid duplicate connections)
5980
const connectingSessions = Array.from(this.clients.entries()).filter(
60-
([_, clientInfo]) => clientInfo && clientInfo.isConnecting,
81+
([sessionKey, clientInfo]) => clientInfo && clientInfo.isConnecting && !this.invalidatedSessions.has(sessionKey),
6182
);
6283

6384
if (connectingSessions.length > 0) {
85+
console.log(`⏳ Waiting for ${connectingSessions.length} session(s) to finish connecting...`);
86+
6487
// Wait up to 10 seconds for any connection to complete
6588
const maxWaitTime = 10000;
6689
const checkInterval = 100;
@@ -70,42 +93,96 @@ class TelegramSessionPool {
7093
await new Promise((resolve) => setTimeout(resolve, checkInterval));
7194
waitTime += checkInterval;
7295

73-
// Check if ANY session has connected successfully
96+
// Check if ANY valid session has connected successfully
7497
for (const [sessionKey, clientInfo] of this.clients.entries()) {
75-
if (clientInfo && clientInfo.client && clientInfo.client.connected) {
98+
if (!this.invalidatedSessions.has(sessionKey) && clientInfo && clientInfo.client && clientInfo.client.connected) {
7699
clientInfo.lastUsed = Date.now();
100+
console.log(`✅ Connected session became available: ${sessionKey}`);
77101
return clientInfo.client;
78102
}
79103
}
80104

81105
// Check if all connecting sessions have finished (success or failure)
82106
const stillConnecting = Array.from(this.clients.entries()).some(
83-
([_, clientInfo]) => clientInfo && clientInfo.isConnecting,
107+
([sessionKey, clientInfo]) => clientInfo && clientInfo.isConnecting && !this.invalidatedSessions.has(sessionKey),
84108
);
85109

86110
if (!stillConnecting) {
87111
break;
88112
}
89113
}
114+
115+
console.log(`⏱️ Finished waiting after ${waitTime}ms`);
90116
}
91117

92-
// PHASE 3: Create new connection with proper locking to prevent AUTH_KEY_DUPLICATED
118+
// PHASE 3: Create new connection with smart session selection
93119
const maxAttempts = this.sessionKeys.length * 2;
94120

95-
for (let attempt = 0; attempt < maxAttempts; attempt++) {
96-
const sessionKey = this.sessionKeys[this.currentIndex];
97-
this.currentIndex = (this.currentIndex + 1) % this.sessionKeys.length;
121+
// Check if we have any permanently invalidated sessions
122+
if (this.invalidatedSessions.size > 0) {
123+
console.log(`⚠️ WARNING: ${this.invalidatedSessions.size} session(s) permanently invalidated: ${Array.from(this.invalidatedSessions).join(', ')}`);
124+
console.log(`⚠️ These sessions need to be regenerated. Please update your .env file with new session strings.`);
125+
}
126+
127+
// Filter out permanently invalidated sessions
128+
const validSessionKeys = this.sessionKeys.filter(sessionKey => !this.invalidatedSessions.has(sessionKey));
129+
130+
if (validSessionKeys.length === 0) {
131+
throw new Error('All Telegram sessions have been invalidated (AUTH_KEY_DUPLICATED). Please regenerate your session strings.');
132+
}
133+
134+
// First, try to find unused sessions (prioritize fresh sessions)
135+
const unusedSessions = validSessionKeys.filter(sessionKey => !this.clients.has(sessionKey));
136+
const usedSessions = validSessionKeys.filter(sessionKey => this.clients.has(sessionKey));
137+
138+
// Create session priority list: unused first, then used (oldest failures first)
139+
const sessionPriorityList = [
140+
...unusedSessions,
141+
...usedSessions.sort((a, b) => {
142+
const clientA = this.clients.get(a);
143+
const clientB = this.clients.get(b);
144+
const failTimeA = clientA?.failedAt || 0;
145+
const failTimeB = clientB?.failedAt || 0;
146+
return failTimeA - failTimeB; // Older failures first
147+
})
148+
];
149+
150+
console.log(`🎯 Session priority order: [${sessionPriorityList.join(', ')}] (${unusedSessions.length} unused, ${usedSessions.length} used, ${this.invalidatedSessions.size} invalid)`);
151+
152+
for (let attempt = 0; attempt < maxAttempts && attempt < sessionPriorityList.length; attempt++) {
153+
const sessionKey = sessionPriorityList[attempt];
98154

99155
try {
100156
const clientInfo = this.clients.get(sessionKey);
101157

102-
// Skip if this session is already connecting or recently failed
103-
if (clientInfo && (clientInfo.isConnecting || clientInfo.failedAt)) {
104-
// Clear old failures after 30 seconds
105-
if (clientInfo.failedAt && Date.now() - clientInfo.failedAt > 30000) {
106-
this.clients.delete(sessionKey);
107-
} else {
158+
// Skip if this session is currently connecting
159+
if (clientInfo && clientInfo.isConnecting) {
160+
console.log(`⏭️ Skipping ${sessionKey}: currently connecting`);
161+
continue;
162+
}
163+
164+
// For AUTH_KEY_DUPLICATED errors, only skip if failed very recently (5 seconds)
165+
// For connection thread errors, retry immediately (likely a race condition)
166+
// For other errors, use the longer 30-second timeout
167+
const isAuthKeyDuplicated = clientInfo?.lastError?.includes('AUTH_KEY_DUPLICATED');
168+
const isThreadError = clientInfo?.lastError?.includes('already being connected');
169+
let failureTimeout = 30000; // Default timeout
170+
171+
if (isAuthKeyDuplicated) {
172+
failureTimeout = 5000; // Short timeout for auth key errors
173+
} else if (isThreadError) {
174+
failureTimeout = 500; // Very short timeout for thread conflicts
175+
}
176+
177+
if (clientInfo && clientInfo.failedAt) {
178+
if (Date.now() - clientInfo.failedAt < failureTimeout) {
179+
const timeSinceFail = Math.ceil((Date.now() - clientInfo.failedAt) / 1000);
180+
console.log(`⏭️ Skipping ${sessionKey}: failed ${timeSinceFail}s ago (${isAuthKeyDuplicated ? 'AUTH_KEY_DUPLICATED' : isThreadError ? 'thread conflict' : 'other error'})`);
108181
continue;
182+
} else {
183+
// Clear old failure
184+
console.log(`🔄 Clearing old failure for ${sessionKey}`);
185+
this.clients.delete(sessionKey);
109186
}
110187
}
111188

@@ -115,10 +192,11 @@ class TelegramSessionPool {
115192
isConnecting: true,
116193
lastUsed: Date.now(),
117194
failedAt: null,
195+
lastError: null,
118196
});
119197

120198
// Create new client
121-
console.log(`🔌 Connecting session: ${sessionKey}`);
199+
console.log(`🔌 Connecting session: ${sessionKey} (attempt ${attempt + 1}/${maxAttempts})`);
122200
const client = await this._createClient(sessionKey);
123201

124202
// Update with successful connection
@@ -127,6 +205,7 @@ class TelegramSessionPool {
127205
isConnecting: false,
128206
lastUsed: Date.now(),
129207
failedAt: null,
208+
lastError: null,
130209
});
131210

132211
// Small delay to ensure connection is fully established for reuse
@@ -135,27 +214,47 @@ class TelegramSessionPool {
135214
} catch (error) {
136215
console.log(`❌ Failed to create client for ${sessionKey}: ${error.message}`);
137216

138-
// Mark as failed to prevent immediate retry
217+
// Mark as failed with error details
139218
this.clients.set(sessionKey, {
140219
client: null,
141220
isConnecting: false,
142221
lastUsed: Date.now(),
143222
failedAt: Date.now(),
223+
lastError: error.message,
144224
});
145225

146-
// For AUTH_KEY_DUPLICATED errors, add extra delay before trying next session
226+
// For AUTH_KEY_DUPLICATED errors, mark session as permanently invalid
147227
if (error.message.includes('AUTH_KEY_DUPLICATED')) {
148-
console.log(`🔄 AUTH_KEY_DUPLICATED detected, waiting 2s before next attempt...`);
149-
await new Promise((resolve) => setTimeout(resolve, 2000));
150-
151-
// After delay, check if another request has already succeeded
228+
console.log(`💀 AUTH_KEY_DUPLICATED detected for ${sessionKey} - this session is now permanently invalid!`);
229+
230+
// Mark this session as permanently invalidated
231+
this.invalidatedSessions.add(sessionKey);
232+
233+
// Remove from clients map completely
234+
this.clients.delete(sessionKey);
235+
236+
console.log(`🚨 Session ${sessionKey} has been invalidated by Telegram and needs to be regenerated.`);
237+
console.log(`📊 Status: ${validSessionKeys.length - this.invalidatedSessions.size} valid sessions remaining.`);
238+
239+
// Check if another request has already succeeded while we were failing
152240
for (const [checkSessionKey, checkClientInfo] of this.clients.entries()) {
153-
if (checkClientInfo && checkClientInfo.client && checkClientInfo.client.connected) {
241+
if (checkClientInfo && checkClientInfo.client && checkClientInfo.client.connected && !this.invalidatedSessions.has(checkSessionKey)) {
154242
checkClientInfo.lastUsed = Date.now();
243+
console.log(`♻️ Found existing connected session: ${checkSessionKey}`);
155244
return checkClientInfo.client;
156245
}
157246
}
247+
// Continue immediately to next session
248+
continue;
158249
}
250+
251+
// For other errors, add a small delay before continuing
252+
await new Promise((resolve) => setTimeout(resolve, 500));
253+
}
254+
255+
// Add a small delay between attempts to prevent race conditions
256+
if (attempt < sessionPriorityList.length - 1) {
257+
await new Promise((resolve) => setTimeout(resolve, 100));
159258
}
160259
}
161260

@@ -164,6 +263,11 @@ class TelegramSessionPool {
164263

165264
static async _createClient(sessionKey) {
166265
try {
266+
// Double-check this session isn't invalidated
267+
if (this.invalidatedSessions.has(sessionKey)) {
268+
throw new Error(`Session ${sessionKey} is permanently invalidated (AUTH_KEY_DUPLICATED)`);
269+
}
270+
167271
const sessionString = getEnvironmentVariable(sessionKey);
168272
if (!sessionString) {
169273
throw new Error(`Session string not found: ${sessionKey}`);
@@ -182,7 +286,7 @@ class TelegramSessionPool {
182286
const client = new TelegramClient(session, parseInt(apiId), apiHash, {
183287
connectionRetries: 3,
184288
floodSleepThreshold: 60,
185-
autoReconnect: true,
289+
autoReconnect: false, // Disable auto-reconnect to prevent parallel connections
186290
maxConcurrentDownloads: 1,
187291
});
188292

@@ -209,6 +313,33 @@ class TelegramSessionPool {
209313
}
210314
}
211315

316+
static getSessionStatus() {
317+
const status = {
318+
total: this.sessionKeys.length,
319+
valid: this.sessionKeys.filter(key => !this.invalidatedSessions.has(key)).length,
320+
invalidated: Array.from(this.invalidatedSessions),
321+
connected: 0,
322+
connecting: 0,
323+
failed: 0,
324+
};
325+
326+
for (const [sessionKey, clientInfo] of this.clients.entries()) {
327+
if (this.invalidatedSessions.has(sessionKey)) continue;
328+
329+
if (clientInfo) {
330+
if (clientInfo.client && clientInfo.client.connected) {
331+
status.connected++;
332+
} else if (clientInfo.isConnecting) {
333+
status.connecting++;
334+
} else if (clientInfo.failedAt) {
335+
status.failed++;
336+
}
337+
}
338+
}
339+
340+
return status;
341+
}
342+
212343
static async cleanup() {
213344
console.log('🧹 Cleaning up Telegram session pool...');
214345
for (const [sessionKey, clientInfo] of this.clients.entries()) {
@@ -835,6 +966,24 @@ class TelegramChannelFetcher extends Tool {
835966
} catch (error) {
836967
console.error('❌ Telegram fetch error:', error.message);
837968

969+
// Check session pool status for AUTH_KEY_DUPLICATED issues
970+
const sessionStatus = TelegramSessionPool.getSessionStatus();
971+
if (sessionStatus.invalidated.length > 0) {
972+
console.error('🚨 Session pool status:', sessionStatus);
973+
974+
if (sessionStatus.valid === 0) {
975+
throw new Error(
976+
`All Telegram sessions have been invalidated. Please regenerate your session strings. ` +
977+
`Invalidated sessions: ${sessionStatus.invalidated.join(', ')}`
978+
);
979+
} else {
980+
console.error(
981+
`⚠️ ${sessionStatus.invalidated.length} session(s) invalidated: ${sessionStatus.invalidated.join(', ')}. ` +
982+
`${sessionStatus.valid} session(s) still valid.`
983+
);
984+
}
985+
}
986+
838987
// Don't expose internal errors to users, provide helpful guidance instead
839988
if (error.message.includes('timeout')) {
840989
throw new Error(
@@ -846,6 +995,8 @@ class TelegramChannelFetcher extends Tool {
846995
);
847996
} else if (error.message.includes('Rate limit')) {
848997
throw error; // Pass through rate limit errors as-is
998+
} else if (error.message.includes('All Telegram sessions')) {
999+
throw error; // Pass through session invalidation errors
8491000
} else {
8501001
throw new Error(
8511002
`Failed to fetch messages from ${channel_name}. Please try again or contact support if the issue persists.`,

0 commit comments

Comments
 (0)