4444import org .apache .inlong .manager .service .stream .InlongStreamService ;
4545
4646import com .google .common .base .Objects ;
47+ import com .google .common .util .concurrent .ThreadFactoryBuilder ;
4748import lombok .extern .slf4j .Slf4j ;
4849import org .apache .commons .collections .CollectionUtils ;
4950import org .apache .commons .lang3 .StringUtils ;
5051import org .springframework .beans .factory .annotation .Autowired ;
52+ import org .springframework .beans .factory .annotation .Value ;
5153import org .springframework .stereotype .Service ;
5254
55+ import javax .annotation .PostConstruct ;
56+ import javax .annotation .PreDestroy ;
57+
5358import java .util .ArrayList ;
5459import java .util .Collections ;
5560import java .util .List ;
56- import java .util .concurrent .Executors ;
57- import java .util .concurrent .ScheduledExecutorService ;
61+ import java .util .concurrent .ConcurrentLinkedQueue ;
62+ import java .util .concurrent .ExecutorService ;
63+ import java .util .concurrent .Future ;
64+ import java .util .concurrent .LinkedBlockingQueue ;
65+ import java .util .concurrent .RejectedExecutionException ;
66+ import java .util .concurrent .ThreadPoolExecutor ;
5867import java .util .concurrent .TimeUnit ;
5968
6069/**
@@ -80,7 +89,59 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
8089 @ Autowired
8190 private PulsarOperator pulsarOperator ;
8291
83- private ScheduledExecutorService executor = Executors .newScheduledThreadPool (10 );
92+ @ Value ("${pulsar.query.poolSize:10}" )
93+ private int poolSize ;
94+
95+ @ Value ("${pulsar.query.keepAliveSeconds:60}" )
96+ private long keepAliveSeconds ;
97+
98+ @ Value ("${pulsar.query.queueCapacity:100}" )
99+ private int queueCapacity ;
100+
101+ @ Value ("${pulsar.query.queryTimeoutSeconds:10}" )
102+ private int queryTimeoutSeconds ;
103+
104+ /**
105+ * Thread pool for querying messages from multiple Pulsar clusters concurrently.
106+ * Configuration is loaded from application properties with prefix 'pulsar.query'.
107+ */
108+ private ExecutorService messageQueryExecutor ;
109+
110+ /**
111+ * Initialize the executor service after bean creation.
112+ */
113+ @ PostConstruct
114+ public void init () {
115+ // Initialize the executor service with same core pool size and max core pool size
116+ this .messageQueryExecutor = new ThreadPoolExecutor (
117+ poolSize ,
118+ poolSize ,
119+ keepAliveSeconds ,
120+ TimeUnit .SECONDS ,
121+ new LinkedBlockingQueue <>(queueCapacity ),
122+ new ThreadFactoryBuilder ().setNameFormat ("pulsar-message-query-%d" ).build (),
123+ // Use AbortPolicy to throw exception when the queue is full
124+ new ThreadPoolExecutor .AbortPolicy ());
125+ log .info ("Init message query executor, poolSize={}, keepAliveSeconds={}, queueCapacity={}" ,
126+ poolSize , keepAliveSeconds , queueCapacity );
127+ }
128+
129+ /**
130+ * Shutdown the executor service when the bean is destroyed.
131+ */
132+ @ PreDestroy
133+ public void shutdown () {
134+ log .info ("Shutting down pulsar message query executor" );
135+ messageQueryExecutor .shutdown ();
136+ try {
137+ if (!messageQueryExecutor .awaitTermination (5 , TimeUnit .SECONDS )) {
138+ messageQueryExecutor .shutdownNow ();
139+ }
140+ } catch (InterruptedException e ) {
141+ messageQueryExecutor .shutdownNow ();
142+ Thread .currentThread ().interrupt ();
143+ }
144+ }
84145
85146 @ Override
86147 public boolean accept (String mqType ) {
@@ -307,34 +368,117 @@ private void deletePulsarTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pu
307368 * Query latest message from pulsar
308369 */
public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
        QueryMessageRequest request) {
    String groupId = streamInfo.getInlongGroupId();
    String clusterTag = groupInfo.getInlongClusterTag();
    List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
    if (CollectionUtils.isEmpty(clusterInfos)) {
        log.warn("No pulsar cluster found for clusterTag={} for groupId={}", clusterTag, groupId);
        return Collections.emptyList();
    }

    // Every cluster is asked for the full requested count; the latch is built from
    // the requested count and the number of clusters
    Integer requestCount = request.getMessageCount();
    int clusterSize = clusterInfos.size();
    QueryCountDownLatch queryLatch = new QueryCountDownLatch(requestCount, clusterSize);
    log.debug("Query pulsar message in {} clusters, each cluster query {} messages", clusterSize, requestCount);

    // Extract parameters shared by all clusters
    InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo);
    String groupTenant = inlongPulsarInfo.getPulsarTenant();
    String namespace = inlongPulsarInfo.getMqResource();
    String topicName = streamInfo.getMqResource();
    boolean serialQueue = InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());

    // Submit query tasks to thread pool, each task queries from one cluster.
    // submit() is used instead of execute() so the returned Future can be cancelled.
    List<Future<?>> submittedTasks = new ArrayList<>();
    // Thread-safe collection shared by all query tasks
    ConcurrentLinkedQueue<BriefMQMessage> messageResultQueue = new ConcurrentLinkedQueue<>();
    for (ClusterInfo clusterInfo : clusterInfos) {
        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
        // Resolve the tenant per cluster: when the group has no tenant, fall back to
        // this cluster's own tenant rather than reusing the one of a previous cluster
        String tenant = StringUtils.isBlank(groupTenant) ? pulsarCluster.getPulsarTenant() : groupTenant;
        String fullTopicName = tenant + "/" + namespace + "/" + topicName;
        // Create a copy of the request carrying the message count for this cluster
        QueryMessageRequest currentRequest = buildRequestForSingleCluster(request, requestCount);
        QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(pulsarOperator, streamInfo,
                pulsarCluster, serialQueue, fullTopicName, currentRequest, messageResultQueue, queryLatch);
        try {
            Future<?> future = this.messageQueryExecutor.submit(task);
            submittedTasks.add(future);
        } catch (RejectedExecutionException e) {
            // Cancel all previously submitted tasks before throwing exception
            log.error("Failed to submit query task for groupId={}, cancelling {} submitted tasks",
                    groupId, submittedTasks.size(), e);
            cancelSubmittedTasks(submittedTasks);
            throw new BusinessException("Query messages task rejected: too many concurrent requests");
        }
    }

    // Wait for tasks to complete with a configurable timeout
    String streamId = streamInfo.getInlongStreamId();
    try {
        boolean completed = queryLatch.await(queryTimeoutSeconds, TimeUnit.SECONDS);
        if (!completed) {
            log.warn("Query messages timeout for groupId={}, streamId={}, collected {} messages",
                    groupId, streamId, messageResultQueue.size());
        }
    } catch (InterruptedException e) {
        // The result is abandoned, so stop the outstanding tasks, then restore the
        // interrupt flag so callers further up the stack can observe the interruption
        cancelSubmittedTasks(submittedTasks);
        Thread.currentThread().interrupt();
        throw new BusinessException(String.format("Query messages task interrupted for groupId=%s, streamId=%s",
                groupId, streamId));
    }

    log.info("Success query pulsar message for groupId={}, streamId={}", groupId, streamId);
    List<BriefMQMessage> messageResultList = new ArrayList<>(messageResultQueue);

    // If the result size does not exceed the request count, return all; otherwise truncate
    if (messageResultList.isEmpty() || messageResultList.size() <= requestCount) {
        return messageResultList;
    }

    return new ArrayList<>(messageResultList.subList(0, requestCount));
}
444+
445+ /**
446+ * Build a new QueryMessageRequest with adjusted message count for a specific cluster.
447+ */
448+ private QueryMessageRequest buildRequestForSingleCluster (QueryMessageRequest original , int messageCount ) {
449+ return QueryMessageRequest .builder ()
450+ .groupId (original .getGroupId ())
451+ .streamId (original .getStreamId ())
452+ .messageCount (messageCount )
453+ .fieldName (original .getFieldName ())
454+ .operationType (original .getOperationType ())
455+ .targetValue (original .getTargetValue ())
456+ .build ();
457+ }
458+
459+ /**
460+ * Cancel all submitted tasks when an error occurs.
461+ * This method attempts to cancel tasks with interrupt flag set to true,
462+ * allowing running tasks to be interrupted if they check for interruption.
463+ *
464+ * @param submittedTasks list of Future objects representing submitted tasks
465+ */
466+ private void cancelSubmittedTasks (List <java .util .concurrent .Future <?>> submittedTasks ) {
467+ int cancelledCount = 0 ;
468+ for (java .util .concurrent .Future <?> future : submittedTasks ) {
469+ // mayInterruptIfRunning=true allows interrupting running tasks
470+ if (future .cancel (true )) {
471+ cancelledCount ++;
472+ }
330473 }
474+ log .info ("Cancelled {}/{} submitted tasks" , cancelledCount , submittedTasks .size ());
331475 }
332476
333477 /**
334478 * Reset cursor for consumer group
335479 */
336480 public void resetCursor (InlongGroupInfo groupInfo , InlongStreamEntity streamEntity , StreamSinkEntity sinkEntity ,
337- Long resetTime ) throws Exception {
481+ Long resetTime ) {
338482 log .info ("begin to reset cursor for sinkId={}" , sinkEntity .getId ());
339483 InlongPulsarInfo pulsarInfo = (InlongPulsarInfo ) groupInfo ;
340484 List <ClusterInfo > clusterInfos = clusterService .listByTagAndType (pulsarInfo .getInlongClusterTag (),
0 commit comments