Skip to content

Commit 52e1408

Browse files
feat(agentflow): implement deduplication for API client (#6004)
* feat(agentflow): implement deduplication for API client - Updated bindApiClient to return a deduplicated client, enhancing request efficiency. - Introduced deduplicatedClient with caching and deduplication logic for GET and POST requests. - Added tests for deduplicatedClient to ensure correct behavior for various request scenarios. - Added request interceptor example usage in BasicExample to manage cookie handling based on token presence. * fix(agentflow): address PR review feedback for deduplicatedClient Remove inFlight.clear() from clearCache() to prevent race condition where concurrent in-flight requests could be duplicated. Simplify nullish check to idiomatic != null. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 07e7604 commit 52e1408

4 files changed

Lines changed: 380 additions & 3 deletions

File tree

packages/agentflow/examples/src/demos/BasicExample.tsx

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { useCallback, useRef, useState } from 'react'
99

1010
import type { AgentFlowInstance, FlowData, ValidationResult } from '@flowiseai/agentflow'
1111
import { Agentflow } from '@flowiseai/agentflow'
12+
import { InternalAxiosRequestConfig } from 'axios'
1213

1314
import { apiBaseUrl, token } from '../config'
1415
import { FlowStatePanel } from '../FlowStatePanel'
@@ -137,6 +138,13 @@ export function BasicExample() {
137138
onFlowChange={handleFlowChange}
138139
onSave={handleSave}
139140
showDefaultHeader={true}
141+
requestInterceptor={(config: InternalAxiosRequestConfig) => {
142+
// pass cookies if no token is provided
143+
if (!token) {
144+
config.withCredentials = true
145+
}
146+
return config
147+
}}
140148
/>
141149
</div>
142150
<FlowStatePanel currentFlow={currentFlow} savedFlow={savedFlow} changeCount={changeCount} />
@@ -151,5 +159,6 @@ export const BasicExampleProps = {
151159
initialFlow: 'FlowData',
152160
onFlowChange: '(flow: FlowData) => void',
153161
onSave: '(flow: FlowData) => void',
154-
showDefaultHeader: true
162+
showDefaultHeader: true,
163+
requestInterceptor: '(config: InternalAxiosRequestConfig) => InternalAxiosRequestConfig'
155164
}

packages/agentflow/src/infrastructure/api/client.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ import axios, { AxiosInstance } from 'axios'
22

33
import type { RequestInterceptor } from '@/core/types'
44

5+
import { type DeduplicatedClient, withDeduplication } from './deduplicatedClient'
6+
57
/**
68
* Creates a configured axios client for API calls
79
* @param apiBaseUrl - Base URL of the Flowise server
810
* @param token - Authentication token (optional)
911
* @param requestInterceptor - Optional callback to customize outgoing requests
1012
*/
11-
export function bindApiClient(apiBaseUrl: string, token?: string, requestInterceptor?: RequestInterceptor): AxiosInstance {
13+
export function bindApiClient(apiBaseUrl: string, token?: string, requestInterceptor?: RequestInterceptor): DeduplicatedClient {
1214
const headers: Record<string, string> = {
1315
'Content-Type': 'application/json'
1416
}
@@ -52,7 +54,7 @@ export function bindApiClient(apiBaseUrl: string, token?: string, requestInterce
5254
}
5355
)
5456

55-
return client
57+
return withDeduplication(client)
5658
}
5759

5860
export type { AxiosInstance }
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
import type { AxiosInstance, AxiosResponse } from 'axios'
2+
3+
import { type DeduplicatedClient, withDeduplication } from './deduplicatedClient'
4+
5+
/** Create a deferred promise that can be resolved/rejected externally */
6+
function deferred<T>() {
7+
let resolve!: (value: T) => void
8+
let reject!: (reason?: unknown) => void
9+
const promise = new Promise<T>((res, rej) => {
10+
resolve = res
11+
reject = rej
12+
})
13+
return { promise, resolve, reject }
14+
}
15+
16+
/** Build a minimal mock axios instance */
17+
function createMockClient(): AxiosInstance {
18+
return {
19+
get: jest.fn(),
20+
post: jest.fn(),
21+
put: jest.fn(),
22+
delete: jest.fn()
23+
} as unknown as AxiosInstance
24+
}
25+
26+
function fakeResponse(data: unknown): AxiosResponse {
27+
return { data, status: 200, statusText: 'OK', headers: {}, config: {} } as AxiosResponse
28+
}
29+
30+
describe('withDeduplication', () => {
31+
let mockClient: AxiosInstance
32+
let client: DeduplicatedClient
33+
34+
beforeEach(() => {
35+
jest.useFakeTimers()
36+
mockClient = createMockClient()
37+
client = withDeduplication(mockClient)
38+
})
39+
40+
afterEach(() => {
41+
jest.useRealTimers()
42+
})
43+
44+
describe('GET requests', () => {
45+
it('should deduplicate concurrent identical GET requests', async () => {
46+
const d = deferred<AxiosResponse>()
47+
;(mockClient.get as jest.Mock).mockReturnValue(d.promise)
48+
49+
const p1 = client.get('/nodes')
50+
const p2 = client.get('/nodes')
51+
52+
expect(mockClient.get).toHaveBeenCalledTimes(1)
53+
54+
d.resolve(fakeResponse([{ name: 'node1' }]))
55+
const [r1, r2] = await Promise.all([p1, p2])
56+
57+
expect(r1).toBe(r2)
58+
expect(r1.data).toEqual([{ name: 'node1' }])
59+
})
60+
61+
it('should return cached response for sequential GET requests within TTL', async () => {
62+
;(mockClient.get as jest.Mock).mockResolvedValue(fakeResponse('first'))
63+
64+
const r1 = await client.get('/nodes')
65+
66+
;(mockClient.get as jest.Mock).mockResolvedValue(fakeResponse('second'))
67+
const r2 = await client.get('/nodes')
68+
69+
expect(mockClient.get).toHaveBeenCalledTimes(1)
70+
expect(r1.data).toBe('first')
71+
expect(r2.data).toBe('first')
72+
})
73+
74+
it('should re-fetch after TTL expires', async () => {
75+
const ttl = 5000
76+
mockClient = createMockClient()
77+
client = withDeduplication(mockClient, ttl)
78+
;(mockClient.get as jest.Mock).mockResolvedValue(fakeResponse('first'))
79+
await client.get('/nodes')
80+
81+
// Advance past TTL
82+
jest.advanceTimersByTime(ttl + 1)
83+
;(mockClient.get as jest.Mock).mockResolvedValue(fakeResponse('second'))
84+
const r2 = await client.get('/nodes')
85+
86+
expect(mockClient.get).toHaveBeenCalledTimes(2)
87+
expect(r2.data).toBe('second')
88+
})
89+
90+
it('should not deduplicate GET requests with different URLs', async () => {
91+
const d1 = deferred<AxiosResponse>()
92+
const d2 = deferred<AxiosResponse>()
93+
;(mockClient.get as jest.Mock).mockReturnValueOnce(d1.promise).mockReturnValueOnce(d2.promise)
94+
95+
client.get('/nodes')
96+
client.get('/chatflows')
97+
98+
expect(mockClient.get).toHaveBeenCalledTimes(2)
99+
100+
d1.resolve(fakeResponse('nodes'))
101+
d2.resolve(fakeResponse('chatflows'))
102+
})
103+
104+
it('should not deduplicate GET requests with different params', async () => {
105+
const d1 = deferred<AxiosResponse>()
106+
const d2 = deferred<AxiosResponse>()
107+
;(mockClient.get as jest.Mock).mockReturnValueOnce(d1.promise).mockReturnValueOnce(d2.promise)
108+
109+
client.get('/credentials', { params: { credentialName: 'openAIApi' } })
110+
client.get('/credentials', { params: { credentialName: 'googleAI' } })
111+
112+
expect(mockClient.get).toHaveBeenCalledTimes(2)
113+
114+
d1.resolve(fakeResponse([]))
115+
d2.resolve(fakeResponse([]))
116+
})
117+
118+
it('should deduplicate GET requests with identical params', async () => {
119+
const d = deferred<AxiosResponse>()
120+
;(mockClient.get as jest.Mock).mockReturnValue(d.promise)
121+
122+
const p1 = client.get('/credentials', { params: { credentialName: 'openAIApi' } })
123+
const p2 = client.get('/credentials', { params: { credentialName: 'openAIApi' } })
124+
125+
expect(mockClient.get).toHaveBeenCalledTimes(1)
126+
127+
d.resolve(fakeResponse([{ id: '1' }]))
128+
const [r1, r2] = await Promise.all([p1, p2])
129+
expect(r1).toBe(r2)
130+
})
131+
})
132+
133+
describe('POST /node-load-method/* requests', () => {
134+
it('should deduplicate concurrent identical load-method POST requests', async () => {
135+
const d = deferred<AxiosResponse>()
136+
;(mockClient.post as jest.Mock).mockReturnValue(d.promise)
137+
138+
const body = { loadMethod: 'listModels' }
139+
const p1 = client.post('/node-load-method/agentAgentflow', body)
140+
const p2 = client.post('/node-load-method/agentAgentflow', body)
141+
142+
expect(mockClient.post).toHaveBeenCalledTimes(1)
143+
144+
d.resolve(fakeResponse([{ name: 'gpt-4' }]))
145+
const [r1, r2] = await Promise.all([p1, p2])
146+
expect(r1).toBe(r2)
147+
})
148+
149+
it('should return cached response for sequential load-method POST within TTL', async () => {
150+
;(mockClient.post as jest.Mock).mockResolvedValue(fakeResponse([{ name: 'gpt-4' }]))
151+
152+
const body = { loadMethod: 'listModels' }
153+
await client.post('/node-load-method/agentAgentflow', body)
154+
;(mockClient.post as jest.Mock).mockResolvedValue(fakeResponse([{ name: 'gpt-5' }]))
155+
const r2 = await client.post('/node-load-method/agentAgentflow', body)
156+
157+
expect(mockClient.post).toHaveBeenCalledTimes(1)
158+
expect(r2.data).toEqual([{ name: 'gpt-4' }])
159+
})
160+
161+
it('should not deduplicate load-method POST requests with different bodies', async () => {
162+
const d1 = deferred<AxiosResponse>()
163+
const d2 = deferred<AxiosResponse>()
164+
;(mockClient.post as jest.Mock).mockReturnValueOnce(d1.promise).mockReturnValueOnce(d2.promise)
165+
166+
client.post('/node-load-method/agentAgentflow', { loadMethod: 'listModels' })
167+
client.post('/node-load-method/agentAgentflow', { loadMethod: 'listTools' })
168+
169+
expect(mockClient.post).toHaveBeenCalledTimes(2)
170+
171+
d1.resolve(fakeResponse([]))
172+
d2.resolve(fakeResponse([]))
173+
})
174+
})
175+
176+
describe('non-cacheable requests', () => {
177+
it('should never deduplicate POST to non-load-method endpoints', async () => {
178+
const d1 = deferred<AxiosResponse>()
179+
const d2 = deferred<AxiosResponse>()
180+
;(mockClient.post as jest.Mock).mockReturnValueOnce(d1.promise).mockReturnValueOnce(d2.promise)
181+
182+
const body = { name: 'test', flowData: '{}' }
183+
client.post('/chatflows', body)
184+
client.post('/chatflows', body)
185+
186+
expect(mockClient.post).toHaveBeenCalledTimes(2)
187+
188+
d1.resolve(fakeResponse({ id: '1' }))
189+
d2.resolve(fakeResponse({ id: '2' }))
190+
})
191+
192+
it('should pass through PUT and DELETE to the original client', () => {
193+
;(mockClient.put as jest.Mock).mockResolvedValue(fakeResponse('updated'))
194+
;(mockClient.delete as jest.Mock).mockResolvedValue(fakeResponse('deleted'))
195+
196+
client.put('/chatflows/123', { name: 'updated' })
197+
client.delete('/chatflows/123')
198+
199+
expect(mockClient.put).toHaveBeenCalledTimes(1)
200+
expect(mockClient.delete).toHaveBeenCalledTimes(1)
201+
})
202+
})
203+
204+
describe('clearCache', () => {
205+
it('should force re-fetch after clearCache is called', async () => {
206+
;(mockClient.get as jest.Mock).mockResolvedValue(fakeResponse('first'))
207+
await client.get('/nodes')
208+
209+
client.clearCache()
210+
;(mockClient.get as jest.Mock).mockResolvedValue(fakeResponse('second'))
211+
const r2 = await client.get('/nodes')
212+
213+
expect(mockClient.get).toHaveBeenCalledTimes(2)
214+
expect(r2.data).toBe('second')
215+
})
216+
})
217+
218+
describe('error handling', () => {
219+
it('should not cache error responses', async () => {
220+
;(mockClient.get as jest.Mock).mockRejectedValueOnce(new Error('Network error'))
221+
222+
await expect(client.get('/nodes')).rejects.toThrow('Network error')
223+
;(mockClient.get as jest.Mock).mockResolvedValueOnce(fakeResponse('recovered'))
224+
const r2 = await client.get('/nodes')
225+
expect(r2.data).toBe('recovered')
226+
expect(mockClient.get).toHaveBeenCalledTimes(2)
227+
})
228+
229+
it('should propagate errors to all concurrent callers', async () => {
230+
const d = deferred<AxiosResponse>()
231+
;(mockClient.get as jest.Mock).mockReturnValue(d.promise)
232+
233+
const p1 = client.get('/nodes')
234+
const p2 = client.get('/nodes')
235+
236+
d.reject(new Error('Server error'))
237+
238+
await expect(p1).rejects.toThrow('Server error')
239+
await expect(p2).rejects.toThrow('Server error')
240+
})
241+
})
242+
})

0 commit comments

Comments
 (0)