Skip to content

Commit 2f112d3

Browse files
committed
Test new enhancements
1 parent bfb8d9c commit 2f112d3

8 files changed

Lines changed: 396 additions & 28 deletions

File tree

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package RestServer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;

/**
 * CollectionLoadBatcher implements Java-side batching for massive collection loads.
 * When Python calls doc_load() sequentially for many collections, this batches them
 * to prevent worker starvation and queue overhead.
 *
 * Thread-safety: batch state lives in a ConcurrentHashMap and every BatchState
 * mutator is synchronized, so submitToBatch() may be called from many request
 * threads concurrently.
 */
public class CollectionLoadBatcher {
    static Logger logger = LogManager.getLogger(CollectionLoadBatcher.class);

    /** Number of collection loads grouped into a single batch. */
    private static final int BATCH_SIZE = 50;
    /** Reserved for future asynchronous batch processing; currently only shut down. */
    private static final ExecutorService batchExecutor;
    /** Active batches keyed by batch id. */
    private static final Map<String, BatchState> batchStates = new ConcurrentHashMap<>();

    static {
        batchExecutor = Executors.newFixedThreadPool(5); // 5 concurrent batch processors
        logger.info("CollectionLoadBatcher initialized with batch size: " + BATCH_SIZE);
    }

    /**
     * Tracks the progress of one batch of collection loads.
     * All mutators/readers are synchronized so multiple request threads can
     * safely share a single instance.
     */
    public static class BatchState {
        String batchId;
        List<String> tasknames = new ArrayList<>();
        int totalCollections;
        int completedCollections;
        long startTime;

        public BatchState(String batchId, int totalCollections) {
            this.batchId = batchId;
            this.totalCollections = totalCollections;
            this.completedCollections = 0;
            this.startTime = System.currentTimeMillis();
        }

        /** Records one finished collection load and remembers its task name. */
        public synchronized void addTask(String taskname) {
            tasknames.add(taskname);
            completedCollections++;
        }

        /** @return true once every collection in this batch has completed. */
        public synchronized boolean isComplete() {
            return completedCollections >= totalCollections;
        }

        /** @return completion fraction in [0, 1]; 0.0 when the batch tracks no collections. */
        public synchronized double getProgress() {
            if (totalCollections == 0) {
                return 0.0; // guard against division by zero
            }
            return (double) completedCollections / totalCollections;
        }
    }

    /**
     * Submit a collection load request to the batch processor.
     *
     * The doc_load itself runs synchronously on the calling thread; this method
     * only tracks batch membership/progress around it.
     *
     * @param requestBody the raw request payload forwarded from the REST layer
     * @return the doc_load response, or a 500 response with "error"/"status"
     *         keys if anything throws
     */
    public static ResponseEntity<Map<String, Object>> submitToBatch(Map<String, Object> requestBody) {
        try {
            // NOTE(review): Map.toString() does not produce JSON ("{key=value}"
            // form, no quoting). This relies on TaskRequest.fromJson tolerating
            // that format — confirm against TaskRequest.
            TaskRequest taskRequest = TaskRequest.fromJson(requestBody.toString());

            // Get current batch or create a new one.
            String batchId = getCurrentBatchId();
            BatchState batchState = batchStates.computeIfAbsent(batchId,
                    k -> new BatchState(batchId, BATCH_SIZE));

            // Process the doc_load normally.
            ResponseEntity<Map<String, Object>> result = taskRequest.doc_load();

            // Record completion without risking an NPE when the response body
            // is null or has no "tasks" entry (the original chained
            // result.getBody().get("tasks").toString() unconditionally).
            Map<String, Object> resultBody = result.getBody();
            Object tasks = (resultBody == null) ? null : resultBody.get("tasks");
            if (tasks == null) {
                logger.warn("doc_load response contained no 'tasks' entry for batch " + batchId);
            }
            batchState.addTask(String.valueOf(tasks));

            // Check if the batch is complete and start the next one.
            if (batchState.isComplete()) {
                logger.info("Batch " + batchId + " complete (" + batchState.totalCollections + " collections)");
                batchStates.remove(batchId);

                // Start processing the next batch if there are pending loads.
                startNextBatch();
            }

            return result;

        } catch (Exception e) {
            // Keep the full stack trace in the server log; the client only
            // gets the message.
            logger.error("Batch processing failed", e);
            Map<String, Object> body = new HashMap<>();
            body.put("error", "Batch processing failed: " + e.getMessage());
            body.put("status", false);
            return new ResponseEntity<>(body, HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }

    /**
     * Returns the id of the first batch that still has capacity, or a fresh
     * timestamp-based id when all known batches are complete.
     */
    private static synchronized String getCurrentBatchId() {
        for (Map.Entry<String, BatchState> entry : batchStates.entrySet()) {
            if (!entry.getValue().isComplete()) {
                return entry.getKey();
            }
        }
        // No open batch: mint a new id.
        return "batch_" + System.currentTimeMillis();
    }

    /** Placeholder hook: proactive batch starting could be implemented here. */
    private static void startNextBatch() {
        logger.debug("Ready for next batch of collection loads");
    }

    /** Stops the batch executor; safe to call more than once. */
    public static void shutdown() {
        if (batchExecutor != null) {
            batchExecutor.shutdownNow();
            try {
                // Give any in-flight tasks a moment to stop before reporting done.
                batchExecutor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore interrupt status
            }
            logger.info("CollectionLoadBatcher shutdown complete");
        }
    }

    /**
     * @return a snapshot of batcher state: active batch count, configured
     *         capacity, and a human-readable progress line per batch
     */
    public static Map<String, Object> getStats() {
        Map<String, Object> stats = new HashMap<>();
        stats.put("active_batches", batchStates.size());
        stats.put("total_capacity", BATCH_SIZE);

        List<String> batchProgress = new ArrayList<>();
        for (Map.Entry<String, BatchState> entry : batchStates.entrySet()) {
            BatchState state = entry.getValue();
            batchProgress.add(String.format("%s: %.1f%% (%d/%d)",
                    entry.getKey(),
                    state.getProgress() * 100,
                    state.completedCollections,
                    state.totalCollections));
        }
        stats.put("batch_progress", batchProgress);

        return stats;
    }
}

src/main/java/RestServer/TaskRequest.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,40 @@ public ResponseEntity<Map<String, Object>> doc_load() {
678678
ArrayList<String> task_names = new ArrayList<String>();
679679
String task_name = "Task_" + TaskRequest.task_id.incrementAndGet();
680680
int retry = 0;
681-
for (int i = 0; i < ws.workers; i++) {
681+
682+
// Calculate total number of documents to process across all operation types
683+
long totalDocsToProcess = (this.createEndIndex - this.createStartIndex) +
684+
(this.updateEndIndex - this.updateStartIndex) +
685+
(this.readEndIndex - this.readStartIndex) +
686+
(this.deleteEndIndex - this.deleteStartIndex) +
687+
(this.expiryEndIndex - this.expiryStartIndex);
688+
689+
// Calculate how many documents each worker would process
690+
// Based on the batch calculation in WorkLoadGenerate: ops = batchSize * operation_percent/100
691+
int docsPerWorker = ws.batchSize * (
692+
(this.createPercent > 0 ? this.createPercent : 0) +
693+
(this.updatePercent > 0 ? this.updatePercent : 0) +
694+
(this.readPercent > 0 ? this.readPercent : 0) +
695+
(this.deletePercent > 0 ? this.deletePercent : 0) +
696+
(this.expiryPercent > 0 ? this.expiryPercent : 0)
697+
) / 100;
698+
699+
// Handle edge case where docsPerWorker might be 0
700+
if (docsPerWorker == 0) {
701+
docsPerWorker = 1; // Minimum of 1 doc per worker to avoid division by zero
702+
}
703+
704+
// Calculate effective number of workers needed
705+
int effectiveWorkers = Math.min(ws.workers,
706+
(int)((totalDocsToProcess + docsPerWorker - 1) / docsPerWorker)); // ceil division
707+
708+
System.out.println("Smart worker counting: Total docs=" + totalDocsToProcess +
709+
", Docs per worker=" + docsPerWorker +
710+
", Requested workers=" + ws.workers +
711+
", Effective workers=" + effectiveWorkers);
712+
713+
// Only spawn effective workers
714+
for (int i = 0; i < effectiveWorkers; i++) {
682715
String th_name = task_name + "_" + i;
683716
WorkLoadGenerate wlg = new WorkLoadGenerate(th_name, dg, TaskRequest.SDKClientPool, esClient,
684717
this.durabilityLevel,

src/main/java/couchbase/loadgen/WorkLoadGenerate.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -388,13 +388,15 @@ else if(ops < dg.ws.ops/dg.ws.workers && flag) {
388388
ops = 0;
389389
Instant end = Instant.now();
390390
timeElapsed = Duration.between(start, end);
391-
if(!this.dg.ws.gtm && timeElapsed.toMillis() < 1000)
392-
try {
393-
long i = (long) ((1000-timeElapsed.toMillis()));
394-
TimeUnit.MILLISECONDS.sleep(i);
395-
} catch (InterruptedException e) {
396-
e.printStackTrace();
397-
}
391+
// THROTTLE LOGIC REMOVED FOR TESTING PURPOSES
392+
// Python framework runs continuous ops, so no 1-second pacing is needed
393+
// if(!this.dg.ws.gtm && timeElapsed.toMillis() < 1000)
394+
// try {
395+
// long i = (long) ((1000-timeElapsed.toMillis()));
396+
// TimeUnit.MILLISECONDS.sleep(i);
397+
// } catch (InterruptedException e) {
398+
// e.printStackTrace();
399+
// }
398400
}
399401
logger.info(this.taskName + " is completed!");
400402
if (retryTimes > 0 && failedMutations.size() > 0) {

src/main/java/couchbase/sdk/DocOps.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public List<Result> bulkInsert(Collection collection, List<Tuple2<String, Object
3737

3838
// Emit error Results as part of the stream and collect at the end
3939
// This is thread-safe and avoids synchronization overhead
40+
// Dynamic concurrency: use minimum of batch size and reasonable parallelism limit
41+
int concurrency = Math.min(documents.size(), 20);
4042
return Flux.fromIterable(documents)
4143
.flatMap(documentToInsert -> {
4244
String k = documentToInsert.getT1();
@@ -45,7 +47,7 @@ public List<Result> bulkInsert(Collection collection, List<Tuple2<String, Object
4547
return reactiveCollection.insert(k, v, insertOptions)
4648
.then(Mono.<Result>empty())
4749
.onErrorResume(error -> Mono.just(new Result(k, v, error, false)));
48-
})
50+
}, concurrency)
4951
.collectList()
5052
.block();
5153
}
@@ -56,6 +58,8 @@ public List<Result> bulkUpsert(Collection collection, List<Tuple2<String, Object
5658

5759
// Emit error Results as part of the stream and collect at the end
5860
// This is thread-safe and avoids synchronization overhead
61+
// Dynamic concurrency: use minimum of batch size and reasonable parallelism limit
62+
int concurrency = Math.min(documents.size(), 20);
5963
return Flux.fromIterable(documents)
6064
.flatMap(documentToInsert -> {
6165
String k = documentToInsert.getT1();
@@ -64,13 +68,15 @@ public List<Result> bulkUpsert(Collection collection, List<Tuple2<String, Object
6468
return reactiveCollection.upsert(k, v, upsertOptions)
6569
.then(Mono.<Result>empty())
6670
.onErrorResume(error -> Mono.just(new Result(k, v, error, false)));
67-
})
71+
}, concurrency)
6872
.collectList()
6973
.block();
7074
}
7175

7276
public List<Tuple2<String, Object>> bulkGets(Collection collection, List<Tuple2<String, Object>> documents, GetOptions getOptions) {
7377
final ReactiveCollection reactiveCollection = collection.reactive();
78+
// Dynamic concurrency: use minimum of batch size and reasonable parallelism limit
79+
int concurrency = Math.min(documents.size(), 20);
7480
List<Tuple2<String, Object>> returnValue = Flux.fromIterable(documents)
7581
.flatMap(new Function<Tuple2<String, Object>, Publisher<Tuple2<String, Object>>>() {
7682
public Publisher<Tuple2<String, Object>> apply(Tuple2<String, Object> documentToInsert) {
@@ -86,7 +92,7 @@ public Mono<Tuple2<String, Object>> apply(Throwable error) {
8692
}
8793
});
8894
}
89-
}).collectList().block();
95+
}, concurrency).collectList().block();
9096
return returnValue;
9197
}
9298

@@ -95,19 +101,23 @@ public List<Result> bulkDelete(Collection collection, List<String> keys, RemoveO
95101

96102
// Emit error Results as part of the stream and collect at the end
97103
// This is thread-safe and avoids synchronization overhead
104+
// Dynamic concurrency: use minimum of batch size and reasonable parallelism limit
105+
int concurrency = Math.min(keys.size(), 20);
98106
return Flux.fromIterable(keys)
99107
.flatMap(key -> {
100108
return reactiveCollection.remove(key, removeOptions)
101109
.then(Mono.<Result>empty())
102110
.onErrorResume(error -> Mono.just(new Result(key, null, error, false)));
103-
})
111+
}, concurrency)
104112
.collectList()
105113
.block();
106114
}
107115

108116
public List<ConcurrentHashMap<String, Object>> bulkReplace(Collection collection, List<Tuple2<String, Object>> documents,
109117
ReplaceOptions replaceOptions) {
110118
final ReactiveCollection reactiveCollection = collection.reactive();
119+
// Dynamic concurrency: use minimum of batch size and reasonable parallelism limit
120+
int concurrency = Math.min(documents.size(), 20);
111121
List<ConcurrentHashMap<String, Object>> returnValue = Flux.fromIterable(documents)
112122
.flatMap(new Function<Tuple2<String, Object>, Publisher<ConcurrentHashMap<String, Object>>>() {
113123
public Publisher<ConcurrentHashMap<String, Object>> apply(Tuple2<String, Object> documentToInsert) {
@@ -134,13 +144,15 @@ public Mono<ConcurrentHashMap<String, Object>> apply(Throwable error) {
134144
}
135145
});
136146
}
137-
}).collectList().block();
147+
}, concurrency).collectList().block();
138148
return returnValue;
139149
}
140150

141151
public List<ConcurrentHashMap<String, Object>> bulkTouch(Collection collection, List<String> keys, final int exp,
142152
TouchOptions touchOptions, Duration exp_duration) {
143153
final ReactiveCollection reactiveCollection = collection.reactive();
154+
// Dynamic concurrency: use minimum of batch size and reasonable parallelism limit
155+
int concurrency = Math.min(keys.size(), 20);
144156
List<ConcurrentHashMap<String, Object>> returnValue = Flux.fromIterable(keys)
145157
.flatMap(new Function<String, Publisher<ConcurrentHashMap<String, Object>>>() {
146158
public Publisher<ConcurrentHashMap<String, Object>> apply(String key){
@@ -163,7 +175,7 @@ public Mono<ConcurrentHashMap<String, Object>> apply(Throwable error) {
163175
}
164176
}).defaultIfEmpty(returnValue);
165177
}
166-
}).collectList().block();
178+
}, concurrency).collectList().block();
167179
return returnValue;
168180
}
169181

src/main/java/couchbase/sdk/SDKClient.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,27 +72,24 @@ public void initialiseSDK() throws Exception {
7272

7373
public void connectCluster(){
7474
try{
75-
ClusterOptions cluster_options;
76-
if(this.master.memcached_port.equals("11207"))
77-
cluster_options = ClusterOptions.clusterOptions(master.rest_username, master.rest_password).environment(env1);
78-
else
79-
cluster_options = ClusterOptions.clusterOptions(master.rest_username, master.rest_password).environment(env2);
80-
this.cluster = Cluster.connect(master.ip, cluster_options);
81-
logger.info("Cluster connection is successful");
75+
// Use shared Cluster instance instead of creating new one
76+
this.cluster = SharedClusterManager.getCluster(this.master);
77+
logger.info("Cluster connection is successful (using shared instance)");
8278
}
8379
catch (AuthenticationFailureException e) {
8480
logger.info(String.format("cannot login from user: %s/%s",master.rest_username, master.rest_password));
8581
}
8682
}
8783

8884
public void disconnectCluster(){
89-
// Disconnect and close all buckets
90-
this.cluster.disconnect();
85+
// Release reference to shared Cluster instead of disconnecting
86+
SharedClusterManager.releaseCluster(this.master);
87+
logger.info("Released shared Cluster instance reference");
9188
}
9289

9390
public void shutdownEnv() {
94-
// Just close an environment
95-
this.cluster.environment().shutdown();
91+
// No-op - Shared Cluster environment is managed by SharedClusterManager
92+
logger.debug("shutdownEnv called on shared Cluster - no-op");
9693
}
9794

9895
private void connectBucket(String bucket){

src/main/java/couchbase/sdk/SDKClientPool.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public SDKClientPool() {
1818
}
1919

2020
public void shutdown() {
21-
logger.debug("Closing clients from SDKClientPool");
21+
logger.debug("Closing clients from SDKClientPool and shutting down shared Cluster instances");
2222
ArrayList<SDKClient> sdk_clients;
2323
for(Map.Entry<String, HashMap> m: this.clients.entrySet()){
2424
sdk_clients = (ArrayList)(m.getValue()).get("idle_clients");
@@ -28,6 +28,9 @@ public void shutdown() {
2828
}
2929
// Reset the clients HM
3030
this.clients = new HashMap<String, HashMap>();
31+
32+
// Shutdown shared Cluster manager
33+
SharedClusterManager.shutdownAll();
3134
}
3235

3336
public void force_close_clients_for_bucket(String bucket_name) {

0 commit comments

Comments
 (0)