Skip to content

Commit b34668e

Browse files
feat: added Async Operations: Concurrent data fetching with semaphore control (#294)
1 parent 19b4ced commit b34668e

3 files changed

Lines changed: 427 additions & 0 deletions

File tree

async/Concurrent_Data_Fetching.ts

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/**
2+
* Professional Example: Concurrent Data Fetching in TypeScript
3+
*
4+
* This module demonstrates best practices for fetching multiple independent
5+
* resources concurrently using `Promise.all` and `Promise.allSettled`.
6+
* Concurrent fetching significantly improves performance compared to
7+
* sequential fetching when requests do not depend on each other.
8+
*/
9+
10+
import { Semaphore } from './Semaphore'
11+
12+
// --- Type Definitions ---
13+
14+
export interface User {
15+
id: number
16+
name: string
17+
email: string
18+
}
19+
20+
export interface Post {
21+
id: number
22+
userId: number
23+
title: string
24+
body: string
25+
}
26+
27+
export interface Comment {
28+
id: number
29+
postId: number
30+
name: string
31+
body: string
32+
}
33+
34+
// --- Mock API Service ---
35+
// In a real application, these would be actual fetch calls (e.g., using native fetch or axios).
36+
37+
const api = {
38+
async fetchUsers(): Promise<User[]> {
39+
// Simulating network latency
40+
await new Promise((resolve) => setTimeout(resolve, 500))
41+
return [
42+
{ id: 1, name: 'Alice Smith', email: 'alice@example.com' },
43+
{ id: 2, name: 'Bob Jones', email: 'bob@example.com' }
44+
]
45+
},
46+
47+
async fetchPosts(): Promise<Post[]> {
48+
await new Promise((resolve) => setTimeout(resolve, 800))
49+
return [
50+
{
51+
id: 101,
52+
userId: 1,
53+
title: 'TypeScript Tips',
54+
body: 'Use strict mode.'
55+
},
56+
{
57+
id: 102,
58+
userId: 2,
59+
title: 'Async/Await',
60+
body: 'Makes promises easier to read.'
61+
}
62+
]
63+
},
64+
65+
async fetchComments(): Promise<Comment[]> {
66+
await new Promise((resolve) => setTimeout(resolve, 300))
67+
// Simulating a potential network error for demonstration purposes
68+
if (Math.random() < 0.1) {
69+
throw new Error('Network Error')
70+
}
71+
return [{ id: 1001, postId: 101, name: 'Charlie', body: 'Great tips!' }]
72+
}
73+
}
74+
75+
// --- Concurrent Fetching Implementations ---
76+
77+
/**
78+
* Example 1: Using Promise.all
79+
*
80+
* Best when you need ALL requests to succeed. If any single promise rejects,
81+
* the entire Promise.all rejects immediately (fail-fast behavior).
82+
*/
83+
export async function fetchAllDataStrict(): Promise<{
84+
users: User[]
85+
posts: Post[]
86+
comments: Comment[]
87+
}> {
88+
try {
89+
console.log('Starting concurrent fetch (Strict)...')
90+
const startTime = Date.now()
91+
92+
// The requests are initiated concurrently.
93+
// We await the resolution of all promises together.
94+
const [users, posts, comments] = await Promise.all([
95+
api.fetchUsers(),
96+
api.fetchPosts(),
97+
api.fetchComments()
98+
])
99+
100+
const duration = Date.now() - startTime
101+
console.log(`Successfully fetched all data in ${duration}ms`)
102+
103+
return { users, posts, comments }
104+
} catch (error) {
105+
// If ANY of the fetches fail, we catch the error here.
106+
console.error('Critical failure during concurrent data fetch:', error)
107+
throw new Error('Failed to load application data. Please try again later.')
108+
}
109+
}
110+
111+
/**
112+
* Example 2: Using Promise.allSettled
113+
*
114+
* Best when requests are independent and you want to handle successes and
115+
* failures individually without failing the entire operation if one request fails.
116+
*/
117+
export async function fetchDataResiliently(): Promise<void> {
118+
console.log('\nStarting concurrent fetch (Resilient)...')
119+
const startTime = Date.now()
120+
121+
const results = await Promise.allSettled([
122+
api.fetchUsers(),
123+
api.fetchPosts(),
124+
api.fetchComments()
125+
])
126+
127+
const duration = Date.now() - startTime
128+
console.log(`Finished resilient fetch in ${duration}ms`)
129+
130+
// Process results safely using type narrowing
131+
const [usersResult, postsResult, commentsResult] = results
132+
133+
if (usersResult.status === 'fulfilled') {
134+
console.log(`✅ Loaded ${usersResult.value.length} users.`)
135+
} else {
136+
console.error(`❌ Failed to load users:`, usersResult.reason)
137+
}
138+
139+
if (postsResult.status === 'fulfilled') {
140+
console.log(`✅ Loaded ${postsResult.value.length} posts.`)
141+
} else {
142+
console.error(`❌ Failed to load posts:`, postsResult.reason)
143+
}
144+
145+
if (commentsResult.status === 'fulfilled') {
146+
console.log(`✅ Loaded ${commentsResult.value.length} comments.`)
147+
} else {
148+
console.warn(
149+
`⚠️ Comments could not be loaded, continuing without them:`,
150+
commentsResult.reason
151+
)
152+
}
153+
}
154+
155+
/**
156+
* Example 3: Rate-Limited Concurrent Fetching (Using Semaphore)
157+
*
158+
* Executes multiple async tasks with a limit on concurrency.
159+
* This is CRITICAL for bulk fetching to avoid rate-limiting, network congestion,
160+
* or overwhelming the server/database.
161+
*
162+
* @param tasks Array of functions that return Promises.
163+
* @param limit The maximum number of concurrent executions.
164+
*/
165+
export async function concurrentFetch<T>(
166+
tasks: (() => Promise<T>)[],
167+
limit: number
168+
): Promise<T[]> {
169+
const semaphore = new Semaphore(limit)
170+
return Promise.all(tasks.map((task) => semaphore.run(task)))
171+
}
172+
173+
/**
174+
* Demonstrates bulk fetching with concurrency limits.
175+
*/
176+
export async function fetchBulkDataWithLimits(): Promise<void> {
177+
console.log('\nStarting rate-limited bulk fetch (Max 3 concurrent)...')
178+
const startTime = Date.now()
179+
180+
// Create 10 dummy tasks that take 200ms each
181+
const tasks = Array.from({ length: 10 }, (_, i) => async () => {
182+
await new Promise((resolve) => setTimeout(resolve, 200))
183+
console.log(`Task ${i + 1} completed.`)
184+
return i + 1
185+
})
186+
187+
// Limit to 3 concurrent requests at any given time
188+
await concurrentFetch(tasks, 3)
189+
190+
const duration = Date.now() - startTime
191+
console.log(`Finished rate-limited fetch in ${duration}ms.`)
192+
}
193+
194+
// --- Execution ---
195+
// If you want to test this file, you can uncomment the lines below:
196+
// async function runExamples() {
197+
// try {
198+
// await fetchAllDataStrict();
199+
// await fetchDataResiliently();
200+
// await fetchBulkDataWithLimits();
201+
// } catch (err) {
202+
// console.error("Application encountered a top-level error.", err);
203+
// }
204+
// }
205+
// runExamples();

async/Semaphore.ts

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/**
2+
* @function Semaphore
3+
* @description A Semaphore is a synchronization primitive that limits the number
4+
* of concurrent asynchronous operations. It maintains a set of permits.
5+
* Each acquire() blocks if necessary until a permit is available, and then takes it.
6+
* Each release() adds a permit, potentially releasing a blocking acquirer.
7+
*
8+
* @see https://en.wikipedia.org/wiki/Semaphore_(programming)
9+
*/
10+
export class Semaphore {
11+
private queue: Array<{
12+
resolve: () => void
13+
reject: (reason?: any) => void
14+
timeoutId?: NodeJS.Timeout
15+
}> = []
16+
private activeCount: number = 0
17+
18+
/**
19+
* @param maxConcurrency The maximum number of concurrent operations allowed.
20+
*/
21+
constructor(private readonly maxConcurrency: number) {
22+
if (maxConcurrency <= 0) {
23+
throw new Error('Max concurrency must be at least 1.')
24+
}
25+
}
26+
27+
/**
28+
* Acquires a permit from the semaphore.
29+
* If no permits are available, it returns a promise that resolves
30+
* when a permit is released by another task.
31+
*
32+
* @param timeoutMs Optional. The maximum amount of time (in ms) to wait in the queue.
33+
* @returns {Promise<void>} A promise that resolves when a permit is acquired.
34+
*/
35+
public async acquire(timeoutMs?: number): Promise<void> {
36+
if (this.activeCount < this.maxConcurrency) {
37+
this.activeCount++
38+
return Promise.resolve()
39+
}
40+
41+
return new Promise<void>((resolve, reject) => {
42+
const queueItem: {
43+
resolve: () => void
44+
reject: (reason?: any) => void
45+
timeoutId?: NodeJS.Timeout
46+
} = { resolve, reject }
47+
48+
if (timeoutMs !== undefined) {
49+
queueItem.timeoutId = setTimeout(() => {
50+
// Remove from queue
51+
const index = this.queue.indexOf(queueItem)
52+
if (index !== -1) {
53+
this.queue.splice(index, 1)
54+
}
55+
reject(
56+
new Error(
57+
`Timeout of ${timeoutMs}ms exceeded while waiting for Semaphore permit.`
58+
)
59+
)
60+
}, timeoutMs)
61+
}
62+
63+
this.queue.push(queueItem)
64+
})
65+
}
66+
67+
/**
68+
* Releases a permit back to the semaphore.
69+
* If there are tasks waiting in the queue, the first one is notified
70+
* and allowed to proceed.
71+
*/
72+
public release(): void {
73+
const nextTask = this.queue.shift()
74+
if (nextTask) {
75+
// Clear the timeout if the task had one
76+
if (nextTask.timeoutId) {
77+
clearTimeout(nextTask.timeoutId)
78+
}
79+
// Pass the permit directly to the next waiting task
80+
nextTask.resolve()
81+
} else {
82+
// No one is waiting, so just decrement the active count
83+
this.activeCount--
84+
}
85+
}
86+
87+
/**
88+
* A helper method that wraps an asynchronous task.
89+
* It handles the acquisition and release of the permit automatically,
90+
* even if the task fails.
91+
*
92+
* @param task A function that returns a Promise.
93+
* @param queueTimeoutMs Optional. Throw an error if the task waits in the queue longer than this.
94+
* @returns {Promise<T>} The result of the task.
95+
*/
96+
public async run<T>(
97+
task: () => Promise<T>,
98+
queueTimeoutMs?: number
99+
): Promise<T> {
100+
await this.acquire(queueTimeoutMs)
101+
try {
102+
return await task()
103+
} finally {
104+
this.release()
105+
}
106+
}
107+
108+
/**
109+
* Returns the current number of active permits.
110+
*/
111+
public getActiveCount(): number {
112+
return this.activeCount
113+
}
114+
115+
/**
116+
* Returns the number of tasks currently waiting for a permit.
117+
*/
118+
public getQueueLength(): number {
119+
return this.queue.length
120+
}
121+
}

0 commit comments

Comments
 (0)