Skip to content

Commit 5ff3611

Browse files
authored
Merge pull request #38065 from apanich/patch-6
fix MetadataPropagationTest.java
2 parents b9b7a67 + d0ec8d1 commit 5ff3611

1 file changed

Lines changed: 36 additions & 39 deletions

File tree

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java

Lines changed: 36 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -29,56 +29,53 @@
2929
import org.junit.runner.RunWith;
3030
import org.junit.runners.JUnit4;
3131

32+
@RunWith(JUnit4.class)
3233
public class MetadataPropagationTest {
3334

34-
@RunWith(JUnit4.class)
35-
public static class MiscTest {
35+
/** Tests for metadata propagation. */
36+
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
3637

37-
/** Tests for metadata propagation. */
38-
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
39-
40-
static class CausedByDrainSettingDoFn extends DoFn<Integer, String> {
41-
@ProcessElement
42-
public void process(OutputReceiver<String> r) {
43-
r.builder("value").setCausedByDrain(CausedByDrain.CAUSED_BY_DRAIN).output();
44-
}
38+
static class CausedByDrainSettingDoFn extends DoFn<Integer, String> {
39+
@ProcessElement
40+
public void process(OutputReceiver<String> r) {
41+
r.builder("value").setCausedByDrain(CausedByDrain.CAUSED_BY_DRAIN).output();
4542
}
43+
}
4644

47-
static class CausedByDrainExtractingDoFn extends DoFn<String, String> {
48-
@ProcessElement
49-
public void process(ProcessContext pc, OutputReceiver<String> r) {
50-
r.output(pc.causedByDrain().toString());
51-
}
45+
static class CausedByDrainExtractingDoFn extends DoFn<String, String> {
46+
@ProcessElement
47+
public void process(ProcessContext pc, OutputReceiver<String> r) {
48+
r.output(pc.causedByDrain().toString());
5249
}
50+
}
5351

54-
@Test
55-
@Category(NeedsRunner.class)
56-
public void testMetadataPropagationAcrossShuffleParameter() {
57-
WindowedValues.WindowedValueCoder.setMetadataSupported();
58-
PCollection<String> results =
59-
pipeline
60-
.apply(Create.of(1))
61-
.apply(ParDo.of(new CausedByDrainSettingDoFn()))
62-
.apply(Redistribute.arbitrarily())
63-
.apply(ParDo.of(new CausedByDrainExtractingDoFn()));
52+
@Test
53+
@Category(NeedsRunner.class)
54+
public void testMetadataPropagationAcrossShuffleParameter() {
55+
WindowedValues.WindowedValueCoder.setMetadataSupported();
56+
PCollection<String> results =
57+
pipeline
58+
.apply(Create.of(1))
59+
.apply(ParDo.of(new CausedByDrainSettingDoFn()))
60+
.apply(Redistribute.arbitrarily())
61+
.apply(ParDo.of(new CausedByDrainExtractingDoFn()));
6462

65-
PAssert.that(results).containsInAnyOrder("CAUSED_BY_DRAIN");
63+
PAssert.that(results).containsInAnyOrder("CAUSED_BY_DRAIN");
6664

67-
pipeline.run();
68-
}
65+
pipeline.run();
66+
}
6967

70-
@Test
71-
@Category(NeedsRunner.class)
72-
public void testMetadataPropagationParameter() {
73-
PCollection<String> results =
74-
pipeline
75-
.apply(Create.of(1))
76-
.apply(ParDo.of(new CausedByDrainSettingDoFn()))
77-
.apply(ParDo.of(new CausedByDrainExtractingDoFn()));
68+
@Test
69+
@Category(NeedsRunner.class)
70+
public void testMetadataPropagationParameter() {
71+
PCollection<String> results =
72+
pipeline
73+
.apply(Create.of(1))
74+
.apply(ParDo.of(new CausedByDrainSettingDoFn()))
75+
.apply(ParDo.of(new CausedByDrainExtractingDoFn()));
7876

79-
PAssert.that(results).containsInAnyOrder("CAUSED_BY_DRAIN");
77+
PAssert.that(results).containsInAnyOrder("CAUSED_BY_DRAIN");
8078

81-
pipeline.run();
82-
}
79+
pipeline.run();
8380
}
8481
}

0 commit comments

Comments
 (0)