Skip to content

Commit c5d4db0

Browse files
committed
More improvments
- Decr. sleep to 5ms within submit_task() calls - Removed lock base SDKClient booking from pool
1 parent 2f112d3 commit c5d4db0

4 files changed

Lines changed: 157 additions & 107 deletions

File tree

src/main/java/RestServer/TaskRequest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ private void reset_sdk_client_pool() {
373373
TaskRequest.SDKClientPool.shutdown();
374374
TaskRequest.SDKClientPool = new SDKClientPool();
375375
}
376-
376+
377377
private void reset_mongo_sdk_client_pool() {
378378
if (this.mongoClients != null) {
379379
for (MongoSDKClient client : this.mongoClients) {
@@ -502,7 +502,7 @@ public ResponseEntity<Map<String, Object>> submit_task() {
502502
Map<String, Object> body = new HashMap<>();
503503
try {
504504
TaskRequest.taskManager.submit(TaskRequest.loader_tasks.get(this.taskName));
505-
TimeUnit.MILLISECONDS.sleep(200);
505+
TimeUnit.MILLISECONDS.sleep(5);
506506
body.put("status", true);
507507
} catch (Exception e) {
508508
body.put("status", false);
@@ -515,7 +515,7 @@ public ResponseEntity<Map<String, Object>> submit_task_mongo() {
515515
Map<String, Object> body = new HashMap<>();
516516
try {
517517
TaskRequest.taskManager.submit(TaskRequest.mongo_loader_tasks.get(this.taskName));
518-
TimeUnit.MILLISECONDS.sleep(200);
518+
TimeUnit.MILLISECONDS.sleep(5);
519519
body.put("status", true);
520520
} catch (Exception e) {
521521
body.put("status", false);
@@ -702,7 +702,7 @@ public ResponseEntity<Map<String, Object>> doc_load() {
702702
}
703703

704704
// Calculate effective number of workers needed
705-
int effectiveWorkers = Math.min(ws.workers,
705+
int effectiveWorkers = Math.min(ws.workers,
706706
(int)((totalDocsToProcess + docsPerWorker - 1) / docsPerWorker)); // ceil division
707707

708708
System.out.println("Smart worker counting: Total docs=" + totalDocsToProcess +

src/main/java/couchbase/loadgen/WorkLoadGenerate.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -388,15 +388,15 @@ else if(ops < dg.ws.ops/dg.ws.workers && flag) {
388388
ops = 0;
389389
Instant end = Instant.now();
390390
timeElapsed = Duration.between(start, end);
391-
// THROTTLE LOGIC REMOVED FOR TESTING PURPOSES
392-
// Python frameworks runs continuous ops, so no 1-second pacing needed
393-
// if(!this.dg.ws.gtm && timeElapsed.toMillis() < 1000)
394-
// try {
395-
// long i = (long) ((1000-timeElapsed.toMillis()));
396-
// TimeUnit.MILLISECONDS.sleep(i);
397-
// } catch (InterruptedException e) {
398-
// e.printStackTrace();
399-
// }
391+
// Throttle to maintain 1-second pacing
392+
if(!this.dg.ws.gtm && timeElapsed.toMillis() < 1000) {
393+
try {
394+
long i = (long) ((1000-timeElapsed.toMillis()));
395+
TimeUnit.MILLISECONDS.sleep(i);
396+
} catch (InterruptedException e) {
397+
e.printStackTrace();
398+
}
399+
}
400400
}
401401
logger.info(this.taskName + " is completed!");
402402
if (retryTimes > 0 && failedMutations.size() > 0) {
Lines changed: 123 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,117 +1,167 @@
11
package couchbase.sdk;
22

33
import java.util.ArrayList;
4-
import java.util.HashMap;
5-
import java.util.Map;
4+
import java.util.ConcurrentModificationException;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.ConcurrentLinkedQueue;
7+
import java.util.concurrent.atomic.AtomicInteger;
68

79
import org.apache.log4j.LogManager;
810
import org.apache.log4j.Logger;
911

1012

1113
public class SDKClientPool {
1214
static Logger logger = LogManager.getLogger(SDKClientPool.class);
13-
public HashMap<String, HashMap> clients;
15+
16+
// Thread-safe client collection cache
17+
private ConcurrentHashMap<String, ClientInfo> clientCache = new ConcurrentHashMap<>();
18+
19+
// Thread-safe client pools by bucket
20+
private ConcurrentHashMap<String, ConcurrentLinkedQueue<SDKClient>> idleClients = new ConcurrentHashMap<>();
21+
private ConcurrentHashMap<String, ConcurrentLinkedQueue<SDKClient>> busyClients = new ConcurrentHashMap<>();
1422

1523
public SDKClientPool() {
1624
super();
17-
this.clients = new HashMap<String, HashMap>();
1825
}
1926

2027
public void shutdown() {
2128
logger.debug("Closing clients from SDKClientPool and shutting down shared Cluster instances");
22-
ArrayList<SDKClient> sdk_clients;
23-
for(Map.Entry<String, HashMap> m: this.clients.entrySet()){
24-
sdk_clients = (ArrayList)(m.getValue()).get("idle_clients");
25-
sdk_clients.addAll((ArrayList)m.getValue().get("busy_clients"));
26-
for(SDKClient sdk_client: sdk_clients)
27-
sdk_client.disconnectCluster();
29+
30+
// Process all buckets
31+
for (String bucketName : idleClients.keySet()) {
32+
ConcurrentLinkedQueue<SDKClient> idle = idleClients.get(bucketName);
33+
ConcurrentLinkedQueue<SDKClient> busy = busyClients.get(bucketName);
34+
35+
if (idle != null) {
36+
for (SDKClient client : idle) {
37+
client.disconnectCluster();
38+
}
39+
}
40+
if (busy != null) {
41+
for (SDKClient client : busy) {
42+
client.disconnectCluster();
43+
}
44+
}
2845
}
29-
// Reset the clients HM
30-
this.clients = new HashMap<String, HashMap>();
46+
47+
// Clear all data structures
48+
clientCache.clear();
49+
idleClients.clear();
50+
busyClients.clear();
3151

3252
// Shutdown shared Cluster manager
3353
SharedClusterManager.shutdownAll();
3454
}
3555

3656
public void force_close_clients_for_bucket(String bucket_name) {
37-
if (! this.clients.containsKey(bucket_name))
38-
return;
39-
40-
HashMap<String, Object> hm = this.clients.get(bucket_name);
41-
ArrayList<SDKClient> sdk_clients;
42-
sdk_clients = (ArrayList)(hm.get("idle_clients"));
43-
sdk_clients.addAll((ArrayList)hm.get("busy_clients"));
44-
for(SDKClient sdk_client: sdk_clients) {
45-
sdk_client.disconnectCluster();
57+
ConcurrentLinkedQueue<SDKClient> idle = idleClients.get(bucket_name);
58+
ConcurrentLinkedQueue<SDKClient> busy = busyClients.get(bucket_name);
59+
60+
if (idle != null) {
61+
for (SDKClient client : idle) {
62+
client.disconnectCluster();
63+
}
64+
idleClients.remove(bucket_name);
65+
}
66+
67+
if (busy != null) {
68+
for (SDKClient client : busy) {
69+
client.disconnectCluster();
70+
}
71+
busyClients.remove(bucket_name);
4672
}
47-
this.clients.remove(bucket_name);
4873
}
4974

5075
public void create_clients(String bucket_name, Server server, int req_clients) throws Exception {
51-
HashMap<String, Object> bucket_hm;
52-
if (this.clients.containsKey(bucket_name))
53-
bucket_hm = this.clients.get(bucket_name);
54-
else {
55-
bucket_hm = new HashMap<String, Object>();
56-
bucket_hm.put("lock", new Object());
57-
bucket_hm.put("idle_clients", new ArrayList<SDKClient>());
58-
bucket_hm.put("busy_clients", new ArrayList<SDKClient>());
59-
this.clients.put(bucket_name, bucket_hm);
60-
}
61-
62-
for(int i=0; i<req_clients; i++) {
63-
SDKClient tem_client = new SDKClient(server, bucket_name);
64-
tem_client.initialiseSDK();
65-
((ArrayList)bucket_hm.get("idle_clients")).add(tem_client);
76+
// Initialize thread-safe client pools for this bucket if not already present
77+
idleClients.computeIfAbsent(bucket_name, k -> new ConcurrentLinkedQueue<>());
78+
busyClients.computeIfAbsent(bucket_name, k -> new ConcurrentLinkedQueue<>());
79+
80+
ConcurrentLinkedQueue<SDKClient> idlePool = idleClients.get(bucket_name);
81+
82+
for (int i = 0; i < req_clients; i++) {
83+
SDKClient client = new SDKClient(server, bucket_name);
84+
client.initialiseSDK();
85+
idlePool.add(client);
6686
}
6787
}
6888

6989
public SDKClient get_client_for_bucket(String bucket_name, String scope, String collection) {
70-
if (! this.clients.containsKey(bucket_name))
71-
return null;
72-
73-
SDKClient client = null;
7490
String col_name = scope + collection;
75-
HashMap<String, Object> col_hm;
76-
HashMap<String, Object> hm = this.clients.get(bucket_name);
77-
while (client == null) {
78-
synchronized(hm.get("lock")) {
79-
if (hm.containsKey(col_name)) {
80-
col_hm = (HashMap)hm.get(col_name);
81-
// Increment tasks' reference counter using this client object
82-
client = (SDKClient)col_hm.get("client");
83-
col_hm.replace("counter", (int)col_hm.get("counter")+1);
84-
}
85-
else if (! ((ArrayList)hm.get("idle_clients")).isEmpty()) {
86-
ArrayList idle_clients = (ArrayList)hm.get("idle_clients");
87-
client = (SDKClient)idle_clients.remove(idle_clients.size()-1);
88-
client.selectCollection(scope, collection);
89-
((ArrayList)hm.get("busy_clients")).add(client);
90-
// Create scope/collection reference using the client object
91-
col_hm = new HashMap<String, Object>();
92-
hm.put(col_name, col_hm);
93-
col_hm.put("client", client);
94-
col_hm.put("counter", 1);
95-
}
96-
}
91+
92+
// Check if client is already cached for this collection
93+
ClientInfo existing = clientCache.get(col_name);
94+
if (existing != null) {
95+
existing.counter.incrementAndGet();
96+
return existing.client;
9797
}
98+
99+
// Get idle client pool for this bucket
100+
ConcurrentLinkedQueue<SDKClient> idlePool = idleClients.get(bucket_name);
101+
if (idlePool == null || idlePool.isEmpty()) {
102+
return null;
103+
}
104+
105+
// Get client from idle pool atomically
106+
SDKClient client = idlePool.poll();
107+
if (client == null) {
108+
return null;
109+
}
110+
111+
// Configure client for this collection
112+
client.selectCollection(scope, collection);
113+
114+
// Add to busy pool atomically
115+
busyClients.computeIfAbsent(bucket_name, k -> new ConcurrentLinkedQueue<>()).add(client);
116+
117+
// Cache client reference with thread-safe counter
118+
clientCache.put(col_name, new ClientInfo(client, new AtomicInteger(1)));
119+
98120
return client;
99121
}
100122

101123
public void release_client(SDKClient client) {
102-
if (! this.clients.containsKey(client.bucket))
124+
if (client == null || client.bucket == null) {
103125
return;
104-
105-
HashMap<String, Object> hm = this.clients.get(client.bucket);
126+
}
127+
128+
String bucket_key = client.bucket;
106129
String col_name = client.scope + client.collection;
107-
synchronized(hm.get("lock")) {
108-
if ((int)((HashMap)hm.get(col_name)).get("counter") == 1) {
109-
hm.remove(col_name);
110-
((ArrayList)hm.get("busy_clients")).remove(client);
111-
((ArrayList)hm.get("idle_clients")).add(client);
130+
131+
// Get cached client info
132+
ClientInfo info = clientCache.get(col_name);
133+
if (info == null) {
134+
return;
135+
}
136+
137+
// Decrement counter atomically
138+
int newCount = info.counter.decrementAndGet();
139+
140+
if (newCount == 0) {
141+
// Remove from cache atomically
142+
clientCache.remove(col_name);
143+
144+
// Remove from busy pool and add to idle pool atomically
145+
ConcurrentLinkedQueue<SDKClient> busyPool = busyClients.get(bucket_key);
146+
ConcurrentLinkedQueue<SDKClient> idlePool = idleClients.get(bucket_key);
147+
148+
if (busyPool != null) {
149+
busyPool.remove(client);
150+
}
151+
if (idlePool != null) {
152+
idlePool.add(client);
112153
}
113-
else
114-
((HashMap)hm.get(col_name)).replace("counter", (int)((HashMap)hm.get(col_name)).get("counter") - 1);
154+
}
155+
}
156+
157+
// Helper class for cached client info with thread-safe counter
158+
private static class ClientInfo {
159+
SDKClient client;
160+
AtomicInteger counter;
161+
162+
ClientInfo(SDKClient client, AtomicInteger counter) {
163+
this.client = client;
164+
this.counter = counter;
115165
}
116166
}
117167
}

0 commit comments

Comments
 (0)