Skip to content

Commit 5143034

Browse files
committed
Added a pipeline class structure
1 parent 4f98c17 commit 5143034

8 files changed

Lines changed: 235 additions & 7 deletions

File tree

src/main/java/nl/rdb/java_examples/batch/job/Job.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public class Job<T> {
1919
private final Reader<T> reader;
2020
private final int partitionSize;
2121
private final boolean parallel;
22+
private final JobContext jobContext;
2223

2324
public Job(Reader<T> reader, List<Step<T>> steps, int partitionSize) {
2425
this(reader, steps, partitionSize, false);
@@ -29,6 +30,7 @@ public Job(Reader<T> reader, List<Step<T>> steps, int partitionSize, boolean par
2930
this.reader = reader;
3031
this.partitionSize = partitionSize;
3132
this.parallel = parallel;
33+
this.jobContext = new JobContext();
3234
}
3335

3436
public void run() {

src/main/java/nl/rdb/java_examples/batch/job/JobBuilder.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,16 @@
88
public class JobBuilder<T> {
99

1010
private final LinkedList<Step<T>> steps = new LinkedList<>();
11-
private final int partitionSize;
11+
private final int chunkSize;
1212
private Reader<T> reader;
1313
private Boolean parallel;
1414

15-
public JobBuilder(int partitionSize) {
16-
this.partitionSize = partitionSize;
15+
public JobBuilder(int chunkSize) {
16+
this.chunkSize = chunkSize;
1717
}
1818

19-
public JobBuilder(int partitionSize, boolean parallel) {
20-
this.partitionSize = partitionSize;
19+
public JobBuilder(int chunkSize, boolean parallel) {
20+
this.chunkSize = chunkSize;
2121
this.parallel = parallel;
2222
}
2323

@@ -33,9 +33,9 @@ public JobBuilder<T> addStep(Step<T> step) {
3333

3434
public Job<T> build() {
3535
if (parallel == null) {
36-
return new Job<>(reader, steps, partitionSize);
36+
return new Job<>(reader, steps, chunkSize);
3737
} else {
38-
return new Job<>(reader, steps, partitionSize, parallel);
38+
return new Job<>(reader, steps, chunkSize, parallel);
3939
}
4040
}
4141
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package nl.rdb.java_examples.batch.job;
2+
3+
public class JobContext {
4+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package nl.rdb.java_examples.pipeline;
2+
3+
import java.util.LinkedList;
4+
5+
import lombok.Getter;
6+
import nl.rdb.java_examples.pipeline.job.Job;
7+
8+
@Getter
9+
public class Pipeline {
10+
11+
private LinkedList<Job> jobs = new LinkedList<>();
12+
13+
public static PipelineBuilder builder() {
14+
return new PipelineBuilder();
15+
}
16+
17+
public void run() {
18+
jobs.forEach(Job::execute);
19+
}
20+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package nl.rdb.java_examples.pipeline;
2+
3+
import nl.rdb.java_examples.pipeline.job.Job;
4+
5+
public class PipelineBuilder {
6+
7+
private final Pipeline pipeline = new Pipeline();
8+
9+
public PipelineBuilder addJob(Job job) {
10+
pipeline.getJobs().add(job);
11+
return this;
12+
}
13+
14+
public Pipeline build() {
15+
return this.pipeline;
16+
}
17+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package nl.rdb.java_examples.pipeline;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
import nl.rdb.java_examples.pipeline.job.Job;
5+
import nl.rdb.java_examples.pipeline.step.Step;
6+
import nl.rdb.java_examples.scanner.Example;
7+
8+
@Slf4j
9+
public class PipelineExample {
10+
11+
@Example
12+
void pipeline() {
13+
Pipeline pipeline = Pipeline.builder()
14+
.addJob(new Job("test")
15+
.addStep(new Steps.OwaspStep())
16+
.addStep(new Steps.UnitTestStep()))
17+
.addJob(new Job("report")
18+
.asyncExecution()
19+
.addStep(new Steps.CoverageStep())
20+
.addStep(new Steps.SonarStep()))
21+
.addJob(new Job("build")
22+
.addStep(new Steps.CreateJarStep()))
23+
.build();
24+
25+
pipeline.run();
26+
}
27+
}
28+
29+
class Steps {
30+
31+
private Steps() {
32+
throw new IllegalStateException("Utility");
33+
}
34+
35+
@Slf4j
36+
public static class OwaspStep implements Step {
37+
38+
@Override
39+
public String getName() {
40+
return "owasp";
41+
}
42+
43+
@Override
44+
public void execute() {
45+
try {
46+
Thread.sleep(2000);
47+
log.info("{}", getName());
48+
} catch (InterruptedException ex) {
49+
Thread.currentThread().interrupt();
50+
log.error("Error {}", getName());
51+
}
52+
}
53+
}
54+
55+
@Slf4j
56+
public static class UnitTestStep implements Step {
57+
58+
@Override
59+
public String getName() {
60+
return "unit-test";
61+
}
62+
63+
@Override
64+
public void execute() {
65+
try {
66+
Thread.sleep(2000);
67+
log.info("{}", getName());
68+
} catch (InterruptedException ex) {
69+
Thread.currentThread().interrupt();
70+
log.error("Error {}", getName());
71+
}
72+
}
73+
}
74+
75+
@Slf4j
76+
public static class CoverageStep implements Step {
77+
78+
@Override
79+
public String getName() {
80+
return "visualize-coverage";
81+
}
82+
83+
@Override
84+
public void execute() {
85+
try {
86+
Thread.sleep(2000);
87+
log.info("{}", getName());
88+
} catch (InterruptedException ex) {
89+
Thread.currentThread().interrupt();
90+
log.error("Error {}", getName());
91+
}
92+
}
93+
}
94+
95+
@Slf4j
96+
public static class SonarStep implements Step {
97+
98+
@Override
99+
public String getName() {
100+
return "sonar";
101+
}
102+
103+
@Override
104+
public void execute() {
105+
try {
106+
Thread.sleep(2000);
107+
log.info("{}", getName());
108+
} catch (InterruptedException ex) {
109+
Thread.currentThread().interrupt();
110+
log.error("Error {}", getName());
111+
}
112+
}
113+
}
114+
115+
@Slf4j
116+
public static class CreateJarStep implements Step {
117+
118+
@Override
119+
public String getName() {
120+
return "create-jar";
121+
}
122+
123+
@Override
124+
public void execute() {
125+
try {
126+
Thread.sleep(2000);
127+
log.info("{}", getName());
128+
} catch (InterruptedException ex) {
129+
Thread.currentThread().interrupt();
130+
log.error("Error {}", getName());
131+
}
132+
}
133+
}
134+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package nl.rdb.java_examples.pipeline.job;
2+
3+
import java.util.LinkedList;
4+
import java.util.stream.Stream;
5+
6+
import lombok.extern.slf4j.Slf4j;
7+
import nl.rdb.java_examples.pipeline.step.Step;
8+
9+
@Slf4j
10+
public class Job {
11+
12+
private final LinkedList<Step> steps = new LinkedList<>();
13+
private final String name;
14+
15+
private boolean async = false;
16+
17+
public Job(String name) {
18+
this.name = name;
19+
}
20+
21+
public Job addStep(Step step) {
22+
this.steps.add(step);
23+
return this;
24+
}
25+
26+
public Job asyncExecution() {
27+
this.async = true;
28+
return this;
29+
}
30+
31+
public void execute() {
32+
log.info("Running: {}", this.name);
33+
34+
Stream<Step> stream = async ? steps.parallelStream() : steps.stream();
35+
stream.forEach(step -> {
36+
try {
37+
step.execute();
38+
} catch (Exception ex) {
39+
log.info("Step '{}' error: {}", this.name, ex.getMessage(), ex);
40+
}
41+
});
42+
}
43+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package nl.rdb.java_examples.pipeline.step;
2+
3+
public interface Step {
4+
5+
String getName();
6+
7+
void execute();
8+
}

0 commit comments

Comments
 (0)