Skip to content

Commit c179bae

Browse files
jnioche and claude authored
Add bolt-level timeout for fetcher threads (#1861)
* Add bolt-level timeout for fetcher threads to prevent stuck fetches

  Wraps protocol.getProtocolOutput() in a Future with a configurable hard timeout (fetcher.thread.timeout). When a fetch exceeds the timeout, the future is cancelled, the URL is marked as FETCH_ERROR, and the thread moves on. Disabled by default (-1).

  Fixes #996

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Formatting fix again

  Signed-off-by: Julien Nioche <julien@digitalpebble.com>

---------

Signed-off-by: Julien Nioche <julien@digitalpebble.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent df7dfe3 commit c179bae

4 files changed

Lines changed: 166 additions & 2 deletions

File tree

core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,14 @@
3434
import java.util.Map;
3535
import java.util.Map.Entry;
3636
import java.util.concurrent.BlockingDeque;
37+
import java.util.concurrent.CancellationException;
38+
import java.util.concurrent.ExecutionException;
39+
import java.util.concurrent.ExecutorService;
40+
import java.util.concurrent.Executors;
41+
import java.util.concurrent.Future;
3742
import java.util.concurrent.LinkedBlockingDeque;
43+
import java.util.concurrent.TimeUnit;
44+
import java.util.concurrent.TimeoutException;
3845
import java.util.concurrent.atomic.AtomicInteger;
3946
import java.util.concurrent.atomic.AtomicLong;
4047
import java.util.regex.Pattern;
@@ -80,6 +87,14 @@ public class FetcherBolt extends StatusEmitterBolt {
8087
*/
8188
public static final String QUEUED_TIMEOUT_PARAM_KEY = "fetcher.timeout.queue";
8289

90+
/**
91+
* Hard timeout in seconds for a single call to {@link Protocol#getProtocolOutput}. If a fetch
92+
* exceeds it, the call is cancelled (its worker thread is interrupted), the URL is marked as
93+
* FETCH_ERROR, and the fetcher moves on to the next item. Any value {@code <= 0}, including the
94+
* default {@code -1}, disables this timeout, leaving only the protocol-level socket timeouts.
95+
*/
96+
public static final String FETCH_TIMEOUT_PARAM_KEY = "fetcher.thread.timeout";
97+
8398
/** Key name of the custom crawl delay for a queue that may be present in the metadata. */
8499
private static final String CRAWL_DELAY_KEY_NAME = "crawl.delay";
85100

@@ -463,6 +478,15 @@ private class FetcherThread extends Thread {
463478

464479
private long timeoutInQueues = -1;
465480

481+
/** Hard timeout in seconds for a single protocol fetch; disabled unless positive (default -1). */
482+
private long fetchTimeout = -1;
483+
484+
/**
485+
* Single-thread executor used to run the protocol call so that it can be interrupted via
486+
* {@link Future#cancel(boolean)} when the bolt-level timeout fires; {@code null} if disabled.
487+
*/
488+
private final ExecutorService fetchExecutor;
489+
466490
// by default remains as is-pre 1.17
467491
private String protocolMetadataPrefix = "";
468492

@@ -476,11 +500,24 @@ public FetcherThread(Config conf, int num) {
476500
this.crawlDelayForce = ConfUtils.getBoolean(conf, "fetcher.server.delay.force", false);
477501
this.threadNum = num;
478502
timeoutInQueues = ConfUtils.getLong(conf, QUEUED_TIMEOUT_PARAM_KEY, timeoutInQueues);
503+
fetchTimeout = ConfUtils.getLong(conf, FETCH_TIMEOUT_PARAM_KEY, fetchTimeout);
479504
protocolMetadataPrefix =
480505
ConfUtils.getString(
481506
conf,
482507
ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM,
483508
protocolMetadataPrefix);
509+
510+
if (fetchTimeout > 0) {
511+
fetchExecutor =
512+
Executors.newSingleThreadExecutor(
513+
r -> {
514+
Thread t = new Thread(r, "FetcherTimeout #" + num);
515+
t.setDaemon(true);
516+
return t;
517+
});
518+
} else {
519+
fetchExecutor = null;
520+
}
484521
}
485522

486523
@Override
@@ -665,7 +702,35 @@ public void run() {
665702
continue;
666703
}
667704

668-
ProtocolResponse response = protocol.getProtocolOutput(fit.url, metadata);
705+
final Metadata fetchMetadata = metadata;
706+
ProtocolResponse response;
707+
if (fetchExecutor != null) {
708+
Future<ProtocolResponse> future =
709+
fetchExecutor.submit(
710+
() -> protocol.getProtocolOutput(fit.url, fetchMetadata));
711+
try {
712+
response = future.get(fetchTimeout, TimeUnit.SECONDS);
713+
} catch (TimeoutException e) {
714+
future.cancel(true);
715+
throw new Exception(
716+
"Fetch timed out after "
717+
+ fetchTimeout
718+
+ "s fetching "
719+
+ fit.url,
720+
e);
721+
} catch (CancellationException e) {
722+
throw new Exception("Fetch cancelled for " + fit.url);
723+
} catch (ExecutionException e) {
724+
// unwrap the real cause so existing catch logic handles it
725+
Throwable cause = e.getCause();
726+
if (cause instanceof Exception) {
727+
throw (Exception) cause;
728+
}
729+
throw new Exception(cause);
730+
}
731+
} else {
732+
response = protocol.getProtocolOutput(fit.url, metadata);
733+
}
669734

670735
long timeFetching = System.currentTimeMillis() - start;
671736

core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,13 @@
2828
import java.text.SimpleDateFormat;
2929
import java.util.Locale;
3030
import java.util.Map;
31+
import java.util.concurrent.CancellationException;
32+
import java.util.concurrent.ExecutionException;
33+
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.Executors;
35+
import java.util.concurrent.Future;
3136
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.TimeoutException;
3238
import java.util.concurrent.atomic.AtomicInteger;
3339
import org.apache.commons.lang3.StringUtils;
3440
import org.apache.http.HttpHeaders;
@@ -122,6 +128,11 @@ public class SimpleFetcherBolt extends StatusEmitterBolt {
122128
// by default remains as is-pre 1.17
123129
private String protocolMetadataPrefix = "";
124130

131+
/** Hard timeout in seconds for a single protocol fetch; disabled unless positive (default -1). */
132+
private long fetchTimeout = -1;
133+
134+
private ExecutorService fetchExecutor;
135+
125136
private void checkConfiguration() {
126137

127138
// ensure that a value has been set for the agent name and that that
@@ -226,6 +237,18 @@ public Object getValueAndReset() {
226237
this.protocolMetadataPrefix =
227238
ConfUtils.getString(
228239
conf, ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM, protocolMetadataPrefix);
240+
241+
this.fetchTimeout =
242+
ConfUtils.getLong(conf, FetcherBolt.FETCH_TIMEOUT_PARAM_KEY, fetchTimeout);
243+
if (fetchTimeout > 0) {
244+
fetchExecutor =
245+
Executors.newSingleThreadExecutor(
246+
r -> {
247+
Thread t = new Thread(r, "SimpleFetcherTimeout #" + taskId);
248+
t.setDaemon(true);
249+
return t;
250+
});
251+
}
229252
}
230253

231254
@Override
@@ -238,6 +261,9 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
238261
/**
 * Releases the resources held by this bolt: cleans up the protocol factory and, when a fetch
 * executor was created, stops it with {@code shutdownNow()}.
 */
@Override
239262
public void cleanup() {
240263
protocolFactory.cleanup();
264+
// fetchExecutor is only set when fetcher.thread.timeout is positive
if (fetchExecutor != null) {
265+
fetchExecutor.shutdownNow();
266+
}
241267
}
242268

243269
@Override
@@ -425,7 +451,31 @@ public void execute(Tuple input) {
425451
activeThreads.incrementAndGet();
426452

427453
long start = System.currentTimeMillis();
428-
ProtocolResponse response = protocol.getProtocolOutput(urlString, metadata);
454+
final String fetchUrl = urlString;
455+
final Metadata fetchMetadata = metadata;
456+
ProtocolResponse response;
457+
if (fetchExecutor != null) {
458+
Future<ProtocolResponse> future =
459+
fetchExecutor.submit(
460+
() -> protocol.getProtocolOutput(fetchUrl, fetchMetadata));
461+
try {
462+
response = future.get(fetchTimeout, TimeUnit.SECONDS);
463+
} catch (TimeoutException e) {
464+
future.cancel(true);
465+
throw new Exception(
466+
"Fetch timed out after " + fetchTimeout + "s fetching " + urlString, e);
467+
} catch (CancellationException e) {
468+
throw new Exception("Fetch cancelled for " + urlString);
469+
} catch (ExecutionException e) {
470+
Throwable cause = e.getCause();
471+
if (cause instanceof Exception) {
472+
throw (Exception) cause;
473+
}
474+
throw new Exception(cause);
475+
}
476+
} else {
477+
response = protocol.getProtocolOutput(urlString, metadata);
478+
}
429479
long timeFetching = System.currentTimeMillis() - start;
430480

431481
final int byteLength = response.getContent().length;

core/src/main/resources/crawler-default.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ config:
2929
fetcher.max.urls.in.queues: -1
3030
fetcher.max.queue.size: -1
3131
fetcher.timeout.queue: -1
32+
# hard timeout in seconds for a single protocol fetch at the bolt level;
33+
# any value <= 0 disables it (the default is -1), leaving only the protocol-level socket timeouts
34+
fetcher.thread.timeout: -1
3235
# max. crawl-delay accepted in robots.txt (in seconds)
3336
fetcher.max.crawl.delay: 30
3437
# behavior of fetcher when the crawl-delay in the robots.txt

core/src/test/java/org/apache/stormcrawler/bolt/AbstractFetcherBoltTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,10 @@
3737
import org.apache.storm.tuple.Tuple;
3838
import org.apache.storm.utils.Utils;
3939
import org.apache.stormcrawler.Constants;
40+
import org.apache.stormcrawler.Metadata;
4041
import org.apache.stormcrawler.TestOutputCollector;
4142
import org.apache.stormcrawler.TestUtil;
43+
import org.apache.stormcrawler.persistence.Status;
4244
import org.junit.jupiter.api.AfterEach;
4345
import org.junit.jupiter.api.Assertions;
4446
import org.junit.jupiter.api.Test;
@@ -104,4 +106,48 @@ void test304(WireMockRuntimeInfo wmRuntimeInfo) {
104106
// index
105107
Assertions.assertEquals(0, output.getEmitted(Utils.DEFAULT_STREAM_ID).size());
106108
}
109+
110+
/**
 * Checks that a fetch slower than {@code fetcher.thread.timeout} is abandoned by the bolt: the
 * tuple is acked, exactly one FETCH_ERROR status is emitted, and nothing reaches the default
 * stream.
 */
@Test
111+
void testThreadTimeout(WireMockRuntimeInfo wmRuntimeInfo) {
112+
// the stubbed server delays every response by 10 seconds — longer than the 2s bolt timeout
113+
stubFor(
114+
get(urlMatching(".+"))
115+
.willReturn(aResponse().withStatus(200).withFixedDelay(10_000)));
116+
117+
TestOutputCollector output = new TestOutputCollector();
118+
Map<String, Object> config = new HashMap<>();
119+
config.put("http.agent.name", "this_is_only_a_test");
120+
// bolt-level timeout: 2 seconds
121+
config.put("fetcher.thread.timeout", 2L);
122+
// raise the protocol-level socket timeout (presumably milliseconds — confirm) so that the
// bolt-level timeout fires first
123+
config.put("http.timeout", 30_000);
124+
bolt.prepare(config, TestUtil.getMockedTopologyContext(), new OutputCollector(output));
125+
126+
Tuple tuple = mock(Tuple.class);
127+
when(tuple.getSourceComponent()).thenReturn("source");
128+
when(tuple.getStringByField("url"))
129+
.thenReturn("http://localhost:" + wmRuntimeInfo.getHttpPort() + "/slow");
130+
when(tuple.getValueByField("metadata")).thenReturn(null);
131+
bolt.execute(tuple);
132+
133+
// wait at most 8s for the ack: the bolt should give up shortly after its 2s timeout rather
// than wait for the full 10s server delay
134+
await().atMost(8, TimeUnit.SECONDS).until(() -> output.getAckedTuples().size() > 0);
135+
136+
Assertions.assertTrue(output.getAckedTuples().contains(tuple));
137+
138+
// exactly one tuple on the status stream, whose third field is the FETCH_ERROR status
139+
List<List<Object>> statusTuples = output.getEmitted(Constants.StatusStreamName);
140+
Assertions.assertEquals(1, statusTuples.size());
141+
Status status = (Status) statusTuples.get(0).get(2);
142+
Assertions.assertEquals(Status.FETCH_ERROR, status);
143+
144+
// verify the metadata (second field of the status tuple) records the timeout reason.
// NOTE(review): the expected text differs from the "Fetch timed out after ..." message raised
// on timeout; presumably the bolt's exception handling maps timeouts to this short message —
// confirm against the catch block, which is not visible here.
145+
Metadata metadata = (Metadata) statusTuples.get(0).get(1);
146+
String exception = metadata.getFirstValue("fetch.exception");
147+
Assertions.assertNotNull(exception);
148+
Assertions.assertEquals("Socket timeout fetching", exception);
149+
150+
// nothing on the default stream — no content was fetched
151+
Assertions.assertEquals(0, output.getEmitted(Utils.DEFAULT_STREAM_ID).size());
152+
}
107153
}

0 commit comments

Comments
 (0)