Skip to content

Commit bcb3494

Browse files
committed
fixed issue#30
1 parent 2f5ea68 commit bcb3494

2 files changed

Lines changed: 133 additions & 2 deletions

File tree

src/main/java/com/github/dexecutor/core/DefaultExecutionEngine.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,15 @@ public ExecutionResult<T, R> call() throws Exception {
132132
r = task.execute();
133133
result = ExecutionResult.success(task.getId(), r);
134134
state.removeErrored(result);
135+
task.markEnd();
135136
executionListener.onSuccess(task);
136137
} catch (Exception e) {
137138
result = ExecutionResult.errored(task.getId(), r, e.getMessage());
138139
state.addErrored(result);
140+
task.markEnd();
139141
executionListener.onError(task, e);
140142
logger.error("Error Execution Task # {}", task.getId(), e);
141-
} finally {
142-
task.markEnd();
143+
} finally {
143144
result.setTimes(task.getStartTime(), task.getEndTime());
144145
}
145146
return result;
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.github.dexecutor.core;
19+
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
22+
import java.util.ArrayList;
23+
import java.util.Collection;
24+
import java.util.List;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.TimeUnit;
28+
29+
import org.junit.Test;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import com.github.dexecutor.core.graph.Node;
34+
import com.github.dexecutor.core.support.TestUtil;
35+
import com.github.dexecutor.core.support.ThreadPoolUtil;
36+
import com.github.dexecutor.core.task.ExecutionResults;
37+
import com.github.dexecutor.core.task.Task;
38+
import com.github.dexecutor.core.task.TaskProvider;
39+
40+
/**
41+
*
42+
* @author Nadeem Mohammad
43+
*
44+
*/
45+
public class DexecutorListenerTest {
46+
47+
private static final Logger logger = LoggerFactory.getLogger(DexecutorListenerTest.class);
48+
49+
@Test
50+
public void testDependentTaskExecution() {
51+
ExecutorService executorService = newExecutor();
52+
53+
try {
54+
DexecutorConfig<Integer, Integer> config = new DexecutorConfig<>(executorService, new SleepyTaskProvider());
55+
config.setExecutionListener(new ExecutionListener<Integer, Integer>() {
56+
57+
@Override
58+
public void onSuccess(Task<Integer, Integer> task) {
59+
logger.info("onSuccess : {}", task);
60+
}
61+
62+
@Override
63+
public void onError(Task<Integer, Integer> task, Exception exception) {
64+
logger.info("onError : {}", task);
65+
}
66+
});
67+
68+
DefaultDexecutor<Integer, Integer> executor = new DefaultDexecutor<Integer, Integer>(config);
69+
executor.addDependency(1, 2);
70+
executor.addDependency(1, 2);
71+
executor.addDependency(1, 3);
72+
executor.addDependency(3, 4);
73+
executor.addDependency(3, 5);
74+
executor.addDependency(3, 6);
75+
executor.addDependency(2, 7);
76+
executor.addDependency(2, 9);
77+
executor.addDependency(2, 8);
78+
executor.addDependency(9, 10);
79+
executor.addDependency(12, 13);
80+
executor.addDependency(13, 4);
81+
executor.addDependency(13, 14);
82+
executor.addIndependent(11);
83+
84+
ExecutionResults<Integer, Integer> result = executor.execute(ExecutionConfig.TERMINATING);
85+
System.out.println(result);
86+
87+
Collection<Node<Integer, Integer>> processedNodesOrder = TestUtil.processedNodesOrder(executor);
88+
assertThat(processedNodesOrder).containsAll(executionOrderExpectedResult());
89+
assertThat(processedNodesOrder).size().isGreaterThanOrEqualTo(4);
90+
91+
} finally {
92+
try {
93+
executorService.shutdownNow();
94+
executorService.awaitTermination(1, TimeUnit.SECONDS);
95+
} catch (InterruptedException e) {
96+
97+
}
98+
}
99+
}
100+
101+
private Collection<Node<Integer, Integer>> executionOrderExpectedResult() {
102+
List<Node<Integer, Integer>> result = new ArrayList<Node<Integer, Integer>>();
103+
result.add(new Node<Integer, Integer>(1));
104+
//result.add(new Node<Integer, Integer>(2));
105+
result.add(new Node<Integer, Integer>(11));
106+
result.add(new Node<Integer, Integer>(12));
107+
return result;
108+
}
109+
private ExecutorService newExecutor() {
110+
return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize());
111+
}
112+
113+
private static class SleepyTaskProvider implements TaskProvider<Integer, Integer> {
114+
115+
public Task<Integer, Integer> provideTask(final Integer id) {
116+
117+
return new Task<Integer, Integer>() {
118+
119+
private static final long serialVersionUID = 1L;
120+
121+
public Integer execute() {
122+
if (id == 2) {
123+
throw new IllegalArgumentException("Invalid task");
124+
}
125+
return id;
126+
}
127+
};
128+
}
129+
}
130+
}

0 commit comments

Comments
 (0)