Skip to content

Commit 06dd9b0

Browse files
authored
Fix PulsarIO (#36141)
* Fix PulsarIO The main issue for current PulsarIO.read is * It is based on Pulsar Reader instead of PulsarConsumer, which then do not support acknowledgement * The while() block in reader DoFn would never return until topic termination, this basically means pipeline stuck * The restriction is on publishTime, and tryClaim assumes its ordering. This is not true. reader returning message is ordered on messageId. This is a wrong choice. Currently unresolved * PulsarMessage's coder implementation dropped message. This causes Data loss if the PulsarIO.read do not follow an immediate mapping * Tests are defunct and errors are suppressed, making them succeed spuriously Current PulsarIO.write is even more primitive. Pipeline expansion actually fails. It is not idempotent. Major fixes include * Allow Pulsar reader to have a timeout * Fix PulsarMessage and coder to include serializable fields from message * Fix mock client/reader and add a full read pipeline in test * Fix issues prevent PulsarIO.write from expanding. now it works minimally, that is publish every message received (at least once). * Working integration tests for read and write This has made PulsarIO.read minimally functionable. Although it won't split and can only run single thread. Going forward, we should re-implement reader DoFn based on Pulsar consumer. Thoughs rename the current DoFn to "NaiveReadFromPulsarDoFn" * Update CHANGES.md * Fix CHANGES.md lint support multi-line item
1 parent 8a5f57a commit 06dd9b0

18 files changed

Lines changed: 798 additions & 547 deletions

.github/workflows/beam_PreCommit_Java_Pulsar_IO_Direct.yml

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,31 +21,13 @@ on:
2121
branches: ['master', 'release-*']
2222
paths:
2323
- "sdks/java/io/pulsar/**"
24-
- "sdks/java/io/common/**"
25-
- "sdks/java/core/src/main/**"
26-
- "build.gradle"
27-
- "buildSrc/**"
28-
- "gradle/**"
29-
- "gradle.properties"
30-
- "gradlew"
31-
- "gradle.bat"
32-
- "settings.gradle.kts"
3324
- ".github/workflows/beam_PreCommit_Java_Pulsar_IO_Direct.yml"
3425
pull_request_target:
3526
branches: ['master', 'release-*']
3627
paths:
3728
- "sdks/java/io/pulsar/**"
38-
- "sdks/java/io/common/**"
39-
- "sdks/java/core/src/main/**"
29+
- ".github/workflows/beam_PreCommit_Java_Pulsar_IO_Direct.yml"
4030
- 'release/trigger_all_tests.json'
41-
- '.github/trigger_files/beam_PreCommit_Java_Pulsar_IO_Direct.json'
42-
- "build.gradle"
43-
- "buildSrc/**"
44-
- "gradle/**"
45-
- "gradle.properties"
46-
- "gradlew"
47-
- "gradle.bat"
48-
- "settings.gradle.kts"
4931
issue_comment:
5032
types: [created]
5133
schedule:
@@ -110,6 +92,13 @@ jobs:
11092
arguments: |
11193
-PdisableSpotlessCheck=true \
11294
-PdisableCheckStyle=true \
95+
- name: run Pulsar IO IT script
96+
uses: ./.github/actions/gradle-command-self-hosted-action
97+
with:
98+
gradle-command: :sdks:java:io:pulsar:integrationTest
99+
arguments: |
100+
-PdisableSpotlessCheck=true \
101+
-PdisableCheckStyle=true \
113102
- name: Archive JUnit Test Results
114103
uses: actions/upload-artifact@v4
115104
if: ${{ !success() }}
@@ -135,4 +124,4 @@ jobs:
135124
if: always()
136125
with:
137126
name: Publish SpotBugs
138-
path: '**/build/reports/spotbugs/*.html'
127+
path: '**/build/reports/spotbugs/*.html'

CHANGES.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@
8888
## Bugfixes
8989

9090
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
91+
* PulsarIO has now changed support status from incomplete to experimental. Both read and writes should now minimally
92+
function (un-partitioned topics, without schema support, timestamp ordered messages for read) (Java)
93+
([#36141](https://github.com/apache/beam/issues/36141)).
9194

9295
## Known Issues
9396

@@ -133,7 +136,7 @@
133136
significant digits related to casting.
134137
* (Python) The deterministic fallback coder for complex types like NamedTuple, Enum, and dataclasses now uses cloudpickle instead of dill. If your pipeline is affected, you may see a warning like: "Using fallback deterministic coder for type X...". You can revert to the previous behavior by using the pipeline option `--update_compatibility_version=2.67.0` ([35725](https://github.com/apache/beam/pull/35725)). Report any pickling related issues to [#34903](https://github.com/apache/beam/issues/34903)
135138
* (Python) Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.
136-
* Dropped Java 8 support for [IO expansion-service](https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-io-expansion-service). Cross-language pipelines using this expansion service will need a Java11+ runtime ([#35981](https://github.com/apache/beam/pull/35981).
139+
* Dropped Java 8 support for [IO expansion-service](https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-io-expansion-service). Cross-language pipelines using this expansion service will need a Java11+ runtime ([#35981](https://github.com/apache/beam/pull/35981)).
137140

138141
## Deprecations
139142

build.gradle.kts

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import java.util.TreeMap
2+
13
/*
24
* Licensed to the Apache Software Foundation (ASF) under one
35
* or more contributor license agreements. See the NOTICE file
@@ -355,6 +357,7 @@ tasks.register("javaioPreCommit") {
355357
dependsOn(":sdks:java:io:mqtt:build")
356358
dependsOn(":sdks:java:io:neo4j:build")
357359
dependsOn(":sdks:java:io:parquet:build")
360+
dependsOn(":sdks:java:io:pulsar:build")
358361
dependsOn(":sdks:java:io:rabbitmq:build")
359362
dependsOn(":sdks:java:io:redis:build")
360363
dependsOn(":sdks:java:io:rrio:build")
@@ -691,12 +694,31 @@ tasks.register("validateChanges") {
691694

692695
// Check entries in the unreleased section
693696
var i = unreleasedSectionStart + 1
694-
println("Starting validation from line ${i+1}")
695-
697+
val items = TreeMap<Int, String>()
698+
var lastline = 0
699+
var item = ""
696700
while (i < lines.size && !lines[i].startsWith("# [")) {
697701
val line = lines[i].trim()
702+
if (line.isEmpty()) {
703+
// skip
704+
} else if (line.startsWith("* ")) {
705+
items.put(lastline, item)
706+
lastline = i
707+
item = line
708+
} else if (line.startsWith("##")) {
709+
items.put(lastline, item)
710+
lastline = i
711+
item = ""
712+
} else {
713+
item += line
714+
}
715+
i++
716+
}
717+
items.put(lastline, item)
718+
println("Starting validation from line ${i+1}")
698719

699-
if (line.startsWith("* ") && line.isNotEmpty()) {
720+
items.forEach { (i, line) ->
721+
if (line.startsWith("* ")) {
700722
println("Checking line ${i+1}: $line")
701723

702724
// Skip comment lines
@@ -747,8 +769,6 @@ tasks.register("validateChanges") {
747769
}
748770
}
749771
}
750-
751-
i++
752772
}
753773

754774
println("Found ${errors.size} errors")

sdks/java/io/pulsar/build.gradle

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,31 +18,32 @@
1818

1919
plugins { id 'org.apache.beam.module' }
2020
applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.io.pulsar')
21+
enableJavaPerformanceTesting()
2122

2223
description = "Apache Beam :: SDKs :: Java :: IO :: Pulsar"
2324
ext.summary = "IO to read and write to Pulsar"
2425

25-
def pulsar_version = '2.8.2'
26+
def pulsar_version = '2.11.4'
2627

2728

2829
dependencies {
2930
implementation library.java.vendored_guava_32_1_2_jre
3031
implementation library.java.slf4j_api
3132
implementation library.java.joda_time
3233

33-
implementation "org.apache.pulsar:pulsar-client:$pulsar_version"
34-
implementation "org.apache.pulsar:pulsar-client-admin:$pulsar_version"
35-
permitUnusedDeclared "org.apache.pulsar:pulsar-client:$pulsar_version"
36-
permitUnusedDeclared "org.apache.pulsar:pulsar-client-admin:$pulsar_version"
37-
permitUsedUndeclared "org.apache.pulsar:pulsar-client-api:$pulsar_version"
38-
permitUsedUndeclared "org.apache.pulsar:pulsar-client-admin-api:$pulsar_version"
34+
implementation "org.apache.pulsar:pulsar-client-api:$pulsar_version"
35+
implementation "org.apache.pulsar:pulsar-client-admin-api:$pulsar_version"
36+
runtimeOnly "org.apache.pulsar:pulsar-client:$pulsar_version"
37+
runtimeOnly("org.apache.pulsar:pulsar-client-admin:$pulsar_version") {
38+
// To prevent a StackOverflow within Pulsar admin client because JUL -> SLF4J -> JUL
39+
exclude group: "org.slf4j", module: "jul-to-slf4j"
40+
}
3941

4042
implementation project(path: ":sdks:java:core", configuration: "shadow")
4143

42-
testImplementation library.java.jupiter_api
43-
testRuntimeOnly library.java.jupiter_engine
44+
testImplementation library.java.junit
45+
testRuntimeOnly library.java.slf4j_jdk14
4446
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
4547
testImplementation "org.testcontainers:pulsar:1.15.3"
4648
testImplementation "org.assertj:assertj-core:2.9.1"
47-
4849
}

0 commit comments

Comments
 (0)