Skip to content

Commit 00be168

Browse files
committed
pause support
1 parent bcb3494 commit 00be168

3 files changed

Lines changed: 168 additions & 5 deletions

File tree

src/main/java/com/github/dexecutor/core/DefaultDexecutor.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void addAsDependencyToAllInitialNodes(final T nodeValue) {
106106
}
107107

108108
@Override
109-
public void recoverExecution(final ExecutionConfig config) {
109+
public ExecutionResults<T, R> recoverExecution(final ExecutionConfig config) {
110110
if (Phase.TERMINATED.equals(this.state.getCurrentPhase())) {
111111
throw new IllegalStateException("Can't recover terminated dexecutor");
112112
} else {
@@ -118,6 +118,7 @@ public void recoverExecution(final ExecutionConfig config) {
118118
this.state.onTerminate();
119119
logger.debug("Processed Nodes Ordering {}", this.state.getProcessedNodes());
120120
}
121+
return this.state.getExecutionResults();
121122
}
122123

123124
public ExecutionResults<T, R> execute(final ExecutionConfig config) {
@@ -333,10 +334,11 @@ private void updateNode(final ExecutionResult<T, R> executionResult, final Node<
333334

334335
private void forceStopIfRequired() {
335336
if (!shouldContinueProcessingNodes()) {
337+
logger.debug("Force Stopping dexecutor");
336338
this.state.forcedStop();
337-
this.immediatelyRetryExecutor.shutdownNow();
338-
this.scheduledRetryExecutor.shutdownNow();
339-
this.timeoutExecutor.shutdownNow();
339+
//this.immediatelyRetryExecutor.shutdownNow();
340+
//this.scheduledRetryExecutor.shutdownNow();
341+
//this.timeoutExecutor.shutdownNow();
340342
throw new IllegalStateException("Forced to Stop the instance of Dexecutor!");
341343
}
342344
}

src/main/java/com/github/dexecutor/core/Dexecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public interface Dexecutor<T, R> extends DependencyAware<T> {
4646
* After a dexecutor crash, create a new instance of dexecutor and call this method for recovery
4747
* @param config based on which execution would recover
4848
*/
49-
void recoverExecution(final ExecutionConfig config);
49+
ExecutionResults<T, R> recoverExecution(final ExecutionConfig config);
5050
/**
5151
* Prints the graph into the writer, using the traversar
5252
*
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.github.dexecutor.core;
19+
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
22+
import java.util.ArrayList;
23+
import java.util.Collection;
24+
import java.util.List;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.TimeUnit;
28+
29+
import org.junit.Test;
30+
31+
import com.github.dexecutor.core.graph.Node;
32+
import com.github.dexecutor.core.support.TestUtil;
33+
import com.github.dexecutor.core.support.ThreadPoolUtil;
34+
import com.github.dexecutor.core.task.ExecutionResults;
35+
import com.github.dexecutor.core.task.Task;
36+
import com.github.dexecutor.core.task.TaskProvider;
37+
38+
/**
39+
*
40+
* @author Nadeem Mohammad
41+
*
42+
*/
43+
public class DexecutorPauseAndRecover {
44+
45+
@Test
46+
public void testDependentTaskExecution() {
47+
ExecutorService executorService = newExecutor();
48+
49+
try {
50+
DexecutorConfig<Integer, Integer> config = new DexecutorConfig<>(executorService, new SleepyTaskProvider());
51+
52+
DefaultDexecutor<Integer, Integer> executor = new DefaultDexecutor<Integer, Integer>(config) {
53+
Integer counter = 0;
54+
55+
@Override
56+
protected boolean shouldContinueProcessingNodes() {
57+
counter++;
58+
return counter != 2;
59+
}
60+
};
61+
62+
executor.addDependency(1, 2);
63+
executor.addDependency(1, 2);
64+
executor.addDependency(1, 3);
65+
executor.addDependency(3, 4);
66+
executor.addDependency(3, 5);
67+
executor.addDependency(3, 6);
68+
executor.addDependency(2, 7);
69+
executor.addDependency(2, 9);
70+
executor.addDependency(2, 8);
71+
executor.addDependency(9, 10);
72+
executor.addDependency(12, 13);
73+
executor.addDependency(13, 4);
74+
executor.addDependency(13, 14);
75+
executor.addIndependent(11);
76+
77+
Thread thread = startRecoveryThread(executor);
78+
79+
executeDexecutor(executor);
80+
81+
waitForRecoveryThreadToFinish(thread);
82+
83+
84+
85+
Collection<Node<Integer, Integer>> processedNodesOrder = TestUtil.processedNodesOrder(executor);
86+
assertThat(processedNodesOrder).containsAll(executionOrderExpectedResult());
87+
assertThat(processedNodesOrder).size().isGreaterThanOrEqualTo(4);
88+
89+
} finally {
90+
try {
91+
executorService.shutdownNow();
92+
executorService.awaitTermination(1, TimeUnit.SECONDS);
93+
} catch (InterruptedException e) {
94+
95+
}
96+
}
97+
}
98+
99+
private ExecutionResults<Integer, Integer> executeDexecutor(DefaultDexecutor<Integer, Integer> executor) {
100+
ExecutionResults<Integer, Integer> result = null;
101+
try {
102+
result = executor.execute(ExecutionConfig.TERMINATING);
103+
} catch (IllegalStateException e) {
104+
e.printStackTrace();
105+
}
106+
return result;
107+
}
108+
109+
private void waitForRecoveryThreadToFinish(Thread thread) {
110+
try {
111+
thread.join();
112+
} catch (InterruptedException e) {
113+
e.printStackTrace();
114+
}
115+
}
116+
117+
private Thread startRecoveryThread(DefaultDexecutor<Integer, Integer> executor) {
118+
Runnable task = () -> {
119+
120+
try {
121+
TimeUnit.SECONDS.sleep(1);
122+
ExecutionResults<Integer, Integer> results = executor.recoverExecution(ExecutionConfig.TERMINATING);
123+
System.out.println(results);
124+
} catch (InterruptedException e) {
125+
e.printStackTrace();
126+
}
127+
128+
};
129+
130+
Thread thread = new Thread(task);
131+
thread.start();
132+
return thread;
133+
}
134+
135+
private Collection<Node<Integer, Integer>> executionOrderExpectedResult() {
136+
List<Node<Integer, Integer>> result = new ArrayList<Node<Integer, Integer>>();
137+
result.add(new Node<Integer, Integer>(1));
138+
result.add(new Node<Integer, Integer>(2));
139+
result.add(new Node<Integer, Integer>(11));
140+
result.add(new Node<Integer, Integer>(12));
141+
return result;
142+
}
143+
private ExecutorService newExecutor() {
144+
return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize());
145+
}
146+
147+
private static class SleepyTaskProvider implements TaskProvider<Integer, Integer> {
148+
149+
public Task<Integer, Integer> provideTask(final Integer id) {
150+
151+
return new Task<Integer, Integer>() {
152+
153+
private static final long serialVersionUID = 1L;
154+
155+
public Integer execute() {
156+
return id;
157+
}
158+
};
159+
}
160+
}
161+
}

0 commit comments

Comments
 (0)