Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 3a4e749

Browse files
Enrico Olivelligaoran10
authored andcommitted
[transactions] Implement KIP-664 listTransactions (#76)
(cherry picked from commit 5ef4a85)
1 parent 930e502 commit 3a4e749

9 files changed

Lines changed: 279 additions & 30 deletions

File tree

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
314314
case LIST_GROUPS:
315315
handleListGroupsRequest(kafkaHeaderAndRequest, responseFuture);
316316
break;
317+
case LIST_TRANSACTIONS:
318+
handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
319+
break;
317320
case DELETE_GROUPS:
318321
handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture);
319322
break;
@@ -572,6 +575,9 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
572575
protected abstract void
573576
handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);
574577

578+
protected abstract void
579+
handleListTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);
580+
575581
protected abstract void
576582
handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture<AbstractResponse> response);
577583

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException;
3232
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
3333
import io.streamnative.pulsar.handlers.kop.offset.OffsetMetadata;
34+
import io.streamnative.pulsar.handlers.kop.scala.Either;
3435
import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator;
3536
import io.streamnative.pulsar.handlers.kop.security.Session;
3637
import io.streamnative.pulsar.handlers.kop.security.auth.Authorizer;
@@ -125,6 +126,7 @@
125126
import org.apache.kafka.common.message.LeaveGroupRequestData;
126127
import org.apache.kafka.common.message.ListOffsetsRequestData;
127128
import org.apache.kafka.common.message.ListOffsetsResponseData;
129+
import org.apache.kafka.common.message.ListTransactionsResponseData;
128130
import org.apache.kafka.common.message.OffsetCommitRequestData;
129131
import org.apache.kafka.common.message.ProduceRequestData;
130132
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
@@ -173,6 +175,8 @@
173175
import org.apache.kafka.common.requests.ListOffsetRequestV0;
174176
import org.apache.kafka.common.requests.ListOffsetsRequest;
175177
import org.apache.kafka.common.requests.ListOffsetsResponse;
178+
import org.apache.kafka.common.requests.ListTransactionsRequest;
179+
import org.apache.kafka.common.requests.ListTransactionsResponse;
176180
import org.apache.kafka.common.requests.MetadataRequest;
177181
import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
178182
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
@@ -2038,8 +2042,26 @@ protected void handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup,
20382042
protected void handleListGroupsRequest(KafkaHeaderAndRequest listGroups,
20392043
CompletableFuture<AbstractResponse> resultFuture) {
20402044
checkArgument(listGroups.getRequest() instanceof ListGroupsRequest);
2041-
KeyValue<Errors, List<GroupOverview>> listResult = getGroupCoordinator().handleListGroups();
2042-
resultFuture.complete(KafkaResponseUtils.newListGroups(listResult.getKey(), listResult.getValue()));
2045+
Either<Errors, List<GroupOverview>> listResult = getGroupCoordinator().handleListGroups();
2046+
resultFuture.complete(KafkaResponseUtils.newListGroups(listResult));
2047+
}
2048+
2049+
@Override
2050+
protected void handleListTransactionsRequest(KafkaHeaderAndRequest listTransactions,
2051+
CompletableFuture<AbstractResponse> resultFuture) {
2052+
checkArgument(listTransactions.getRequest() instanceof ListTransactionsRequest);
2053+
ListTransactionsRequest request = (ListTransactionsRequest) listTransactions.getRequest();
2054+
List<String> stateFilters = request.data().stateFilters();
2055+
if (stateFilters == null) {
2056+
stateFilters = Collections.emptyList();
2057+
}
2058+
List<Long> producerIdFilters = request.data().producerIdFilters();
2059+
if (producerIdFilters == null) {
2060+
producerIdFilters = Collections.emptyList();
2061+
}
2062+
ListTransactionsResponseData listResult = getTransactionCoordinator()
2063+
.handleListTransactions(stateFilters, producerIdFilters);
2064+
resultFuture.complete(new ListTransactionsResponse(listResult));
20432065
}
20442066

20452067
@Override

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupOverview;
3030
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary;
3131
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
32+
import io.streamnative.pulsar.handlers.kop.scala.Either;
3233
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
3334
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.GroupKey;
3435
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.MemberKey;
@@ -832,22 +833,16 @@ public KeyValue<Errors, Map<TopicPartition, PartitionData>> handleFetchOffsets(
832833
);
833834
}
834835

835-
public KeyValue<Errors, List<GroupOverview>> handleListGroups() {
836+
public Either<Errors, List<GroupOverview>> handleListGroups() {
836837
if (!isActive.get()) {
837-
return new KeyValue<>(Errors.COORDINATOR_NOT_AVAILABLE, new ArrayList<>());
838+
return Either.left(Errors.COORDINATOR_NOT_AVAILABLE);
838839
} else {
839-
Errors errors;
840840
if (groupManager.isLoading()) {
841-
errors = Errors.COORDINATOR_LOAD_IN_PROGRESS;
842-
} else {
843-
errors = Errors.NONE;
841+
return Either.left(Errors.COORDINATOR_LOAD_IN_PROGRESS);
844842
}
845843
List<GroupOverview> overviews = new ArrayList<>();
846844
groupManager.currentGroups().forEach(group -> overviews.add(group.overview()));
847-
return new KeyValue<>(
848-
errors,
849-
overviews
850-
);
845+
return Either.right(overviews);
851846
}
852847
}
853848

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.streamnative.pulsar.handlers.kop.storage.PulsarTopicProducerStateManagerSnapshotBuffer;
3333
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
3434
import io.streamnative.pulsar.handlers.kop.utils.ProducerIdAndEpoch;
35+
import java.util.List;
3536
import java.util.Optional;
3637
import java.util.Set;
3738
import java.util.concurrent.CompletableFuture;
@@ -51,6 +52,7 @@
5152
import org.apache.commons.lang3.StringUtils;
5253
import org.apache.kafka.common.TopicPartition;
5354
import org.apache.kafka.common.internals.Topic;
55+
import org.apache.kafka.common.message.ListTransactionsResponseData;
5456
import org.apache.kafka.common.protocol.Errors;
5557
import org.apache.kafka.common.record.RecordBatch;
5658
import org.apache.kafka.common.requests.TransactionResult;
@@ -79,6 +81,8 @@ public class TransactionCoordinator {
7981

8082
private final Time time;
8183

84+
private final AtomicBoolean isActive = new AtomicBoolean(false);
85+
8286
private static final BiConsumer<TransactionStateManager.TransactionalIdAndProducerIdEpoch, Errors>
8387
onEndTransactionComplete =
8488
(txnIdAndPidEpoch, errors) -> {
@@ -215,6 +219,17 @@ public static String getTopicPartitionName(String topicPartitionName, int partit
215219
return topicPartitionName + PARTITIONED_TOPIC_SUFFIX + partitionId;
216220
}
217221

222+
public ListTransactionsResponseData handleListTransactions(List<String> filteredStates,
223+
List<Long> filteredProducerIds) {
224+
// https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/
225+
// src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L259
226+
if (!isActive.get()) {
227+
log.warn("The transaction coordinator is not active, so it will reject list transaction request");
228+
return new ListTransactionsResponseData().setErrorCode(Errors.NOT_COORDINATOR.code());
229+
}
230+
return this.txnManager.listTransactionStates(filteredProducerIds, filteredStates);
231+
}
232+
218233
@Data
219234
@EqualsAndHashCode
220235
@AllArgsConstructor
@@ -925,7 +940,8 @@ public CompletableFuture<Void> startup(boolean enableTransactionalIdExpiration)
925940
txnManager.startup(enableTransactionalIdExpiration);
926941

927942
return this.producerIdManager.initialize().thenCompose(ignored -> {
928-
log.info("Startup transaction coordinator complete.");
943+
log.info("{} Startup transaction coordinator complete.", namespacePrefixForMetadata);
944+
isActive.set(true);
929945
return CompletableFuture.completedFuture(null);
930946
});
931947
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,26 @@ public boolean isExpirationAllowed() {
117117
return false;
118118
}
119119
}
120+
121+
public org.apache.kafka.clients.admin.TransactionState toAdminState() {
122+
switch (this) {
123+
case EMPTY:
124+
return org.apache.kafka.clients.admin.TransactionState.EMPTY;
125+
case ONGOING:
126+
return org.apache.kafka.clients.admin.TransactionState.ONGOING;
127+
case PREPARE_COMMIT:
128+
return org.apache.kafka.clients.admin.TransactionState.PREPARE_COMMIT;
129+
case PREPARE_ABORT:
130+
return org.apache.kafka.clients.admin.TransactionState.PREPARE_ABORT;
131+
case COMPLETE_COMMIT:
132+
return org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT;
133+
case COMPLETE_ABORT:
134+
return org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT;
135+
case PREPARE_EPOCH_FENCE:
136+
return org.apache.kafka.clients.admin.TransactionState.PREPARE_EPOCH_FENCE;
137+
case DEAD:
138+
default:
139+
return org.apache.kafka.clients.admin.TransactionState.UNKNOWN;
140+
}
141+
}
120142
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.nio.ByteBuffer;
2626
import java.util.ArrayList;
2727
import java.util.HashMap;
28+
import java.util.HashSet;
2829
import java.util.List;
2930
import java.util.Map;
3031
import java.util.Optional;
@@ -40,6 +41,7 @@
4041
import lombok.extern.slf4j.Slf4j;
4142
import org.apache.bookkeeper.common.concurrent.FutureUtils;
4243
import org.apache.kafka.common.TopicPartition;
44+
import org.apache.kafka.common.message.ListTransactionsResponseData;
4345
import org.apache.kafka.common.protocol.Errors;
4446
import org.apache.kafka.common.protocol.types.SchemaException;
4547
import org.apache.kafka.common.requests.ProduceResponse;
@@ -244,6 +246,71 @@ private boolean shouldExpire(TransactionMetadata txnMetadata, Long currentTimeMs
244246
<= (currentTimeMs - transactionConfig.getTransactionalIdExpirationMs());
245247
}
246248

249+
private static boolean shouldInclude(TransactionMetadata txnMetadata,
250+
List<Long> filterProducerIds, Set<String> filterStateNames) {
251+
if (txnMetadata.getState() == TransactionState.DEAD) {
252+
// We filter the `Dead` state since it is a transient state which
253+
// indicates that the transactionalId and its metadata are in the
254+
// process of expiration and removal.
255+
return false;
256+
} else if (!filterProducerIds.isEmpty() && !filterProducerIds.contains(txnMetadata.getProducerId())) {
257+
return false;
258+
} else if (!filterStateNames.isEmpty() && !filterStateNames.contains(
259+
txnMetadata.getState().toAdminState().toString())) {
260+
return false;
261+
} else {
262+
return true;
263+
}
264+
}
265+
266+
public ListTransactionsResponseData listTransactionStates(List<Long> filteredProducerIds,
267+
List<String> filteredStates) {
268+
return CoreUtils.inReadLock(stateLock, () -> {
269+
ListTransactionsResponseData response = new ListTransactionsResponseData();
270+
if (!loadingPartitions.isEmpty()) {
271+
response.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
272+
} else {
273+
Set<String> filterStates = new HashSet<>();
274+
for (TransactionState stateName : TransactionState.values()) {
275+
String nameForTheClient = stateName.toAdminState().toString();
276+
if (filteredStates.contains(nameForTheClient)) {
277+
filterStates.add(nameForTheClient);
278+
} else {
279+
response.unknownStateFilters().add(nameForTheClient);
280+
}
281+
}
282+
List<ListTransactionsResponseData.TransactionState> states = new ArrayList<>();
283+
transactionMetadataCache.forEach((__, cache) -> {
284+
cache.values().forEach(txnMetadata -> {
285+
txnMetadata.inLock(() -> {
286+
// use toString() to get the name of the state according to the protocol
287+
ListTransactionsResponseData.TransactionState transactionState =
288+
new ListTransactionsResponseData.TransactionState()
289+
.setTransactionalId(txnMetadata.getTransactionalId())
290+
.setProducerId(txnMetadata.getProducerId())
291+
.setTransactionState(txnMetadata.getState().toAdminState().toString());
292+
293+
if (shouldInclude(txnMetadata, filteredProducerIds, filterStates)) {
294+
if (log.isDebugEnabled()) {
295+
log.debug("add transaction state: {}", transactionState);
296+
}
297+
states.add(transactionState);
298+
} else {
299+
if (log.isDebugEnabled()) {
300+
log.debug("Skip transaction state: {}", transactionState);
301+
}
302+
}
303+
return null;
304+
});
305+
});
306+
});
307+
response.setErrorCode(Errors.NONE.code())
308+
.setTransactionStates(states);
309+
}
310+
return response;
311+
});
312+
}
313+
247314
@Data
248315
@AllArgsConstructor
249316
private static class TransactionalIdCoordinatorEpochAndMetadata {

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import io.streamnative.pulsar.handlers.kop.ApiVersion;
1717
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata;
18+
import io.streamnative.pulsar.handlers.kop.scala.Either;
1819
import java.util.ArrayList;
1920
import java.util.Collections;
2021
import java.util.List;
@@ -278,14 +279,18 @@ public static LeaveGroupResponse newLeaveGroup(Errors errors) {
278279
return new LeaveGroupResponse(data);
279280
}
280281

281-
public static ListGroupsResponse newListGroups(Errors errors,
282-
List<GroupMetadata.GroupOverview> groups) {
282+
public static ListGroupsResponse newListGroups(Either<Errors, List<GroupMetadata.GroupOverview>> results) {
283283
ListGroupsResponseData data = new ListGroupsResponseData();
284-
data.setErrorCode(errors.code());
285-
data.setGroups(groups.stream().map(overView -> new ListGroupsResponseData.ListedGroup()
286-
.setGroupId(overView.groupId())
287-
.setProtocolType(overView.protocolType()))
288-
.collect(Collectors.toList()));
284+
data.setErrorCode(results.isLeft() ? results.getLeft().code() : Errors.NONE.code());
285+
if (!results.isLeft()) {
286+
data.setGroups(results.getRight().stream().map(overView -> new ListGroupsResponseData.ListedGroup()
287+
.setGroupId(overView.groupId())
288+
.setProtocolType(overView.protocolType()))
289+
.collect(Collectors.toList()));
290+
291+
} else {
292+
data.setGroups(Collections.emptyList());
293+
}
289294
return new ListGroupsResponse(data);
290295
}
291296

tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import static org.mockito.Mockito.spy;
1717
import static org.testng.Assert.assertEquals;
18+
import static org.testng.Assert.assertFalse;
1819
import static org.testng.Assert.assertNotEquals;
1920
import static org.testng.Assert.assertTrue;
2021
import static org.testng.Assert.fail;
@@ -28,6 +29,7 @@
2829
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary;
2930
import io.streamnative.pulsar.handlers.kop.coordinator.group.MemberMetadata.MemberSummary;
3031
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
32+
import io.streamnative.pulsar.handlers.kop.scala.Either;
3133
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
3234
import io.streamnative.pulsar.handlers.kop.utils.timer.MockTimer;
3335
import java.util.ArrayList;
@@ -218,8 +220,8 @@ public void testRequestHandlingWhileLoadingInProgress() throws Exception {
218220
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, describeGroupResult.getKey());
219221

220222
// ListGroups
221-
KeyValue<Errors, List<GroupOverview>> listGroupsResult = groupCoordinator.handleListGroups();
222-
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsResult.getKey());
223+
Either<Errors, List<GroupOverview>> listGroupsResult = groupCoordinator.handleListGroups();
224+
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsResult.getLeft());
223225

224226
// DeleteGroups
225227
Map<String, Errors> deleteGroupsErrors = groupCoordinator.handleDeleteGroups(
@@ -1695,12 +1697,12 @@ groupId, memberId, protocolType, newProtocols()
16951697
).get();
16961698
assertEquals(Errors.NONE, syncGroupResult.getKey());
16971699

1698-
KeyValue<Errors, List<GroupOverview>> groups = groupCoordinator.handleListGroups();
1699-
assertEquals(Errors.NONE, groups.getKey());
1700-
assertEquals(1, groups.getValue().size());
1700+
Either<Errors, List<GroupOverview>> groups = groupCoordinator.handleListGroups();
1701+
assertFalse(groups.isLeft());
1702+
assertEquals(1, groups.getRight().size());
17011703
assertEquals(
17021704
new GroupOverview("groupId", "consumer"),
1703-
groups.getValue().get(0)
1705+
groups.getRight().get(0)
17041706
);
17051707
}
17061708

@@ -1712,12 +1714,12 @@ groupId, memberId, protocolType, newProtocols()
17121714
);
17131715
assertEquals(Errors.NONE, joinGroupResult.getError());
17141716

1715-
KeyValue<Errors, List<GroupOverview>> groups = groupCoordinator.handleListGroups();
1716-
assertEquals(Errors.NONE, groups.getKey());
1717-
assertEquals(1, groups.getValue().size());
1717+
Either<Errors, List<GroupOverview>> groups = groupCoordinator.handleListGroups();
1718+
assertFalse(groups.isLeft());
1719+
assertEquals(1, groups.getRight().size());
17181720
assertEquals(
17191721
new GroupOverview("groupId", "consumer"),
1720-
groups.getValue().get(0)
1722+
groups.getRight().get(0)
17211723
);
17221724
}
17231725

0 commit comments

Comments
 (0)