Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,16 @@
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.AdjustIsrRequest;
import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
import org.apache.fluss.rpc.messages.InitWriterRequest;
import org.apache.fluss.rpc.messages.InitWriterResponse;
import org.apache.fluss.rpc.messages.MetadataRequest;
import org.apache.fluss.rpc.messages.NotifyLeaderAndIsrRequest;
import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest;
import org.apache.fluss.rpc.messages.StopReplicaRequest;
import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
import org.apache.fluss.security.acl.AccessControlEntry;
import org.apache.fluss.security.acl.AccessControlEntryFilter;
Expand Down Expand Up @@ -1394,6 +1398,115 @@ private static Configuration initConfig() {
return conf;
}

@Test
void testInternalReplicationControlAuthorization() throws Exception {
// These RPCs are internal-only and should reject all external sessions,
// regardless of permissions
try (RpcClient rpcClient =
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) {

TabletServerGateway guestTabletGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0),
rpcClient,
TabletServerGateway.class);

CoordinatorGateway guestCoordinatorGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
rpcClient,
CoordinatorGateway.class);

// Test 1: notifyLeaderAndIsr should reject external sessions
NotifyLeaderAndIsrRequest notifyRequest = new NotifyLeaderAndIsrRequest();
notifyRequest.setCoordinatorEpoch(1);
assertThatThrownBy(() -> guestTabletGateway.notifyLeaderAndIsr(notifyRequest).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
"NotifyLeaderAndIsr is an internal RPC and cannot be called by external clients");

// Test 2: updateMetadata should reject external sessions
UpdateMetadataRequest updateRequest = new UpdateMetadataRequest();
updateRequest.setCoordinatorEpoch(1);
assertThatThrownBy(() -> guestTabletGateway.updateMetadata(updateRequest).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
"UpdateMetadata is an internal RPC and cannot be called by external clients");

// Test 3: stopReplica should reject external sessions
StopReplicaRequest stopRequest = new StopReplicaRequest();
stopRequest.setCoordinatorEpoch(1);
assertThatThrownBy(() -> guestTabletGateway.stopReplica(stopRequest).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
"StopReplica is an internal RPC and cannot be called by external clients");

// Test 4: adjustIsr should reject external sessions
AdjustIsrRequest adjustRequest = new AdjustIsrRequest();
adjustRequest.setServerId(0);
assertThatThrownBy(() -> guestCoordinatorGateway.adjustIsr(adjustRequest).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
"AdjustIsr is an internal RPC and cannot be called by external clients");
}

// Test 5: Even root user (super user) cannot call these RPCs from external sessions
Configuration rootClientConf =
new Configuration(FLUSS_CLUSTER_EXTENSION.getClientConfig("CLIENT"));
rootClientConf.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "sasl");
rootClientConf.set(ConfigOptions.CLIENT_SASL_MECHANISM, "plain");
rootClientConf.setString("client.security.sasl.username", "root");
rootClientConf.setString("client.security.sasl.password", "password");

try (RpcClient rootRpcClient =
RpcClient.create(rootClientConf, TestingClientMetricGroup.newInstance())) {

TabletServerGateway rootTabletGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0),
rootRpcClient,
TabletServerGateway.class);

NotifyLeaderAndIsrRequest notifyRequest = new NotifyLeaderAndIsrRequest();
notifyRequest.setCoordinatorEpoch(1);
assertThatThrownBy(() -> rootTabletGateway.notifyLeaderAndIsr(notifyRequest).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
"NotifyLeaderAndIsr is an internal RPC and cannot be called by external clients");
}

// Test 6: Verify internal sessions can call these RPCs
TabletServerGateway internalTabletGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("FLUSS").get(0),
FLUSS_CLUSTER_EXTENSION.getRpcClient(),
TabletServerGateway.class);

// Internal connection should NOT throw "internal RPC" AuthorizationException
// (may fail for other reasons like invalid data, but not because it's external)
NotifyLeaderAndIsrRequest internalNotifyRequest = new NotifyLeaderAndIsrRequest();
internalNotifyRequest.setCoordinatorEpoch(1);

Throwable thrown =
catchThrowable(
() ->
internalTabletGateway
.notifyLeaderAndIsr(internalNotifyRequest)
.get());
if (thrown != null) {
// Should not be the "internal RPC" error message
assertThat(thrown)
.rootCause()
.message()
.doesNotContain("internal RPC and cannot be called by external clients");
}
}

private void assertNoTableDescribeAuth(ThrowableAssert.ThrowingCallable callable) {
assertThatThrownBy(callable)
.cause()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.AlterConfigOpType;
import org.apache.fluss.exception.ApiException;
import org.apache.fluss.exception.AuthorizationException;
import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.InvalidConfigException;
import org.apache.fluss.exception.InvalidCoordinatorException;
Expand Down Expand Up @@ -760,6 +761,14 @@ public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
}

public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest request) {
// This is an internal-only RPC, reject all external sessions
if (!currentSession().isInternal()) {
CompletableFuture<AdjustIsrResponse> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(
new AuthorizationException(
"AdjustIsr is an internal RPC and cannot be called by external clients"));
return failedFuture;
}
CompletableFuture<AdjustIsrResponse> response = new CompletableFuture<>();
eventManagerSupplier
.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,14 @@ public CompletableFuture<GetTableStatsResponse> getTableStats(GetTableStatsReque
@Override
public CompletableFuture<NotifyLeaderAndIsrResponse> notifyLeaderAndIsr(
NotifyLeaderAndIsrRequest notifyLeaderAndIsrRequest) {
// This is an internal-only RPC, reject all external sessions
if (!currentSession().isInternal()) {
CompletableFuture<NotifyLeaderAndIsrResponse> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(
new AuthorizationException(
"NotifyLeaderAndIsr is an internal RPC and cannot be called by external clients"));
return failedFuture;
}
CompletableFuture<NotifyLeaderAndIsrResponse> response = new CompletableFuture<>();
List<NotifyLeaderAndIsrData> notifyLeaderAndIsrRequestData =
getNotifyLeaderAndIsrRequestData(notifyLeaderAndIsrRequest);
Expand All @@ -373,6 +381,14 @@ public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {

@Override
public CompletableFuture<UpdateMetadataResponse> updateMetadata(UpdateMetadataRequest request) {
// This is an internal-only RPC, reject all external sessions
if (!currentSession().isInternal()) {
CompletableFuture<UpdateMetadataResponse> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(
new AuthorizationException(
"UpdateMetadata is an internal RPC and cannot be called by external clients"));
return failedFuture;
}
int coordinatorEpoch =
request.hasCoordinatorEpoch()
? request.getCoordinatorEpoch()
Expand All @@ -385,6 +401,14 @@ public CompletableFuture<UpdateMetadataResponse> updateMetadata(UpdateMetadataRe
@Override
public CompletableFuture<StopReplicaResponse> stopReplica(
StopReplicaRequest stopReplicaRequest) {
// This is an internal-only RPC, reject all external sessions
if (!currentSession().isInternal()) {
CompletableFuture<StopReplicaResponse> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(
new AuthorizationException(
"StopReplica is an internal RPC and cannot be called by external clients"));
return failedFuture;
}
CompletableFuture<StopReplicaResponse> response = new CompletableFuture<>();
replicaManager.stopReplicas(
stopReplicaRequest.getCoordinatorEpoch(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.InvalidCoordinatorException;
import org.apache.fluss.exception.AuthorizationException;
import org.apache.fluss.exception.NotCoordinatorLeaderException;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.rpc.GatewayClientProxy;
Expand Down Expand Up @@ -292,7 +292,8 @@ void testTabletServerRejectsStaleCoordinatorEpochAfterLeaderSwitch() throws Exce

TabletServerGateway tsGateway = createGatewayForTabletServer(tabletServer);

// Send request with old coordinator epoch — tablet server should reject it
// UpdateMetadata is an internal-only RPC - external clients should be rejected
// regardless of coordinator epoch
assertThatThrownBy(
() ->
tsGateway
Expand All @@ -303,7 +304,8 @@ void testTabletServerRejectsStaleCoordinatorEpochAfterLeaderSwitch() throws Exce
.satisfies(
t ->
assertThat(getRootCause(t))
.isInstanceOf(InvalidCoordinatorException.class));
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining("UpdateMetadata is an internal RPC"));
}

@Test
Expand Down
Loading