diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index 0ac000359d..4ee3b82617 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -51,11 +51,15 @@ import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; import org.apache.fluss.rpc.gateway.CoordinatorGateway; import org.apache.fluss.rpc.gateway.TabletServerGateway; +import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest; +import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest; import org.apache.fluss.rpc.messages.ControlledShutdownRequest; import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest; import org.apache.fluss.rpc.messages.InitWriterRequest; import org.apache.fluss.rpc.messages.InitWriterResponse; import org.apache.fluss.rpc.messages.MetadataRequest; +import org.apache.fluss.rpc.messages.NotifyKvSnapshotOffsetRequest; +import org.apache.fluss.rpc.messages.NotifyLakeTableOffsetRequest; import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest; import org.apache.fluss.rpc.metrics.TestingClientMetricGroup; import org.apache.fluss.security.acl.AccessControlEntry; @@ -1370,6 +1374,226 @@ void testTableExistsAuthorization() throws Exception { rootAdmin.dropTable(testTablePath, true).get(); } + @Test + void testSnapshotManagementAuthorization() throws Exception { + // These RPCs are internal-only, so we test via direct gateway access + try (RpcClient rpcClient = + RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) { + + TabletServerGateway guestTabletGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0), + rpcClient, + TabletServerGateway.class); + + CoordinatorGateway guestCoordinatorGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"), + rpcClient, + CoordinatorGateway.class); + + // Test 1: notifyKvSnapshotOffset without WRITE permission + NotifyKvSnapshotOffsetRequest notifyKvRequest = new NotifyKvSnapshotOffsetRequest(); + notifyKvRequest.setTableId(1L); + notifyKvRequest.setBucketId(0); + notifyKvRequest.setCoordinatorEpoch(1); + notifyKvRequest.setMinRetainOffset(0L); + assertThatThrownBy( + () -> guestTabletGateway.notifyKvSnapshotOffset(notifyKvRequest).get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // Test 2: notifyLakeTableOffset without WRITE permission + NotifyLakeTableOffsetRequest notifyLakeRequest = new NotifyLakeTableOffsetRequest(); + notifyLakeRequest.setCoordinatorEpoch(1); + assertThatThrownBy( + () -> guestTabletGateway.notifyLakeTableOffset(notifyLakeRequest).get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // Test 3: commitKvSnapshot without WRITE permission + CommitKvSnapshotRequest commitKvRequest = new CommitKvSnapshotRequest(); + commitKvRequest.setCompletedSnapshot(new byte[0]); + commitKvRequest.setCoordinatorEpoch(1); + commitKvRequest.setBucketLeaderEpoch(1); + assertThatThrownBy( + () -> guestCoordinatorGateway.commitKvSnapshot(commitKvRequest).get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // Test 4: commitLakeTableSnapshot without WRITE permission + CommitLakeTableSnapshotRequest commitLakeRequest = new CommitLakeTableSnapshotRequest(); + assertThatThrownBy( + () -> + guestCoordinatorGateway + .commitLakeTableSnapshot(commitLakeRequest) + .get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + } + + // Test 5: Grant CLUSTER/WRITE permission and verify operations succeed + List aclBindings = + Collections.singletonList( + new AclBinding( + Resource.cluster(), + new AccessControlEntry( + guestPrincipal, + "*", + OperationType.WRITE, + PermissionType.ALLOW))); + rootAdmin.createAcls(aclBindings).all().get(); + FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true); + + try (RpcClient authorizedRpcClient = + RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) { + + TabletServerGateway authorizedTabletGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0), + authorizedRpcClient, + TabletServerGateway.class); + + CoordinatorGateway authorizedCoordinatorGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"), + authorizedRpcClient, + CoordinatorGateway.class); + + // Test notifyKvSnapshotOffset with permission + NotifyKvSnapshotOffsetRequest notifyKvRequest = new NotifyKvSnapshotOffsetRequest(); + notifyKvRequest.setTableId(1L); + notifyKvRequest.setBucketId(0); + notifyKvRequest.setCoordinatorEpoch(1); + notifyKvRequest.setMinRetainOffset(0L); + Throwable thrown1 = + catchThrowable( + () -> + authorizedTabletGateway + .notifyKvSnapshotOffset(notifyKvRequest) + .get()); + if (thrown1 != null) { + assertThat(thrown1).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + // Test notifyLakeTableOffset with permission + NotifyLakeTableOffsetRequest notifyLakeRequest = new NotifyLakeTableOffsetRequest(); + notifyLakeRequest.setCoordinatorEpoch(1); + Throwable thrown2 = + catchThrowable( + () -> + authorizedTabletGateway + .notifyLakeTableOffset(notifyLakeRequest) + .get()); + if (thrown2 != null) { + assertThat(thrown2).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + // Test commitKvSnapshot with permission + CommitKvSnapshotRequest commitKvRequest = new CommitKvSnapshotRequest(); + commitKvRequest.setCompletedSnapshot(new byte[0]); + commitKvRequest.setCoordinatorEpoch(1); + commitKvRequest.setBucketLeaderEpoch(1); + Throwable thrown3 = + catchThrowable( + () -> + authorizedCoordinatorGateway + .commitKvSnapshot(commitKvRequest) + .get()); + if (thrown3 != null) { + assertThat(thrown3).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + // Test commitLakeTableSnapshot with permission + CommitLakeTableSnapshotRequest commitLakeRequest = new CommitLakeTableSnapshotRequest(); + Throwable thrown4 = + catchThrowable( + () -> + authorizedCoordinatorGateway + .commitLakeTableSnapshot(commitLakeRequest) + .get()); + if (thrown4 != null) { + assertThat(thrown4).rootCause().isNotInstanceOf(AuthorizationException.class); + } + } + + // Test 6: Verify internal sessions bypass authorization + TabletServerGateway internalTabletGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("FLUSS").get(0), + FLUSS_CLUSTER_EXTENSION.getRpcClient(), + TabletServerGateway.class); + + CoordinatorGateway internalCoordinatorGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("FLUSS"), + FLUSS_CLUSTER_EXTENSION.getRpcClient(), + CoordinatorGateway.class); + + // Internal connections should NOT throw AuthorizationException + NotifyKvSnapshotOffsetRequest notifyKvRequest = new NotifyKvSnapshotOffsetRequest(); + notifyKvRequest.setTableId(1L); + notifyKvRequest.setBucketId(0); + notifyKvRequest.setCoordinatorEpoch(1); + notifyKvRequest.setMinRetainOffset(0L); + Throwable thrown5 = + catchThrowable( + () -> internalTabletGateway.notifyKvSnapshotOffset(notifyKvRequest).get()); + if (thrown5 != null) { + assertThat(thrown5).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + NotifyLakeTableOffsetRequest notifyLakeRequest = new NotifyLakeTableOffsetRequest(); + notifyLakeRequest.setCoordinatorEpoch(1); + Throwable thrown6 = + catchThrowable( + () -> internalTabletGateway.notifyLakeTableOffset(notifyLakeRequest).get()); + if (thrown6 != null) { + assertThat(thrown6).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + CommitKvSnapshotRequest commitKvRequest = new CommitKvSnapshotRequest(); + commitKvRequest.setCompletedSnapshot(new byte[0]); + commitKvRequest.setCoordinatorEpoch(1); + commitKvRequest.setBucketLeaderEpoch(1); + Throwable thrown7 = + catchThrowable( + () -> internalCoordinatorGateway.commitKvSnapshot(commitKvRequest).get()); + if (thrown7 != null) { + assertThat(thrown7).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + CommitLakeTableSnapshotRequest commitLakeRequest = new CommitLakeTableSnapshotRequest(); + Throwable thrown8 = + catchThrowable( + () -> + internalCoordinatorGateway + .commitLakeTableSnapshot(commitLakeRequest) + .get()); + if (thrown8 != null) { + assertThat(thrown8).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + // Cleanup + rootAdmin.dropAcls(Collections.singletonList(AclBindingFilter.ANY)).all().get(); + } + private static Configuration initConfig() { Configuration conf = new Configuration(); conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index bdc97434ec..4a1dbb1ca2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -198,6 +198,7 @@ import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings; +import static org.apache.fluss.security.acl.OperationType.WRITE; import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.getGoalByType; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.addTableOffsetsToResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.fromTablePath; @@ -770,6 +771,9 @@ public CompletableFuture adjustIsr(AdjustIsrRequest request) @Override public CompletableFuture commitKvSnapshot( CommitKvSnapshotRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } CompletableFuture response = new CompletableFuture<>(); // parse completed snapshot from request byte[] completedSnapshotBytes = request.getCompletedSnapshot(); @@ -870,6 +874,9 @@ public CompletableFuture prepareLakeTableSnaps @Override public CompletableFuture commitLakeTableSnapshot( CommitLakeTableSnapshotRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } CompletableFuture response = new CompletableFuture<>(); eventManagerSupplier .get() diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index 88731daaba..dc83172491 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -432,6 +432,9 @@ public CompletableFuture notifyRemoteLogOffsets( @Override public CompletableFuture notifyKvSnapshotOffset( NotifyKvSnapshotOffsetRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } CompletableFuture response = new CompletableFuture<>(); replicaManager.notifyKvSnapshotOffset( getNotifySnapshotOffsetData(request), response::complete); @@ -441,6 +444,9 @@ public CompletableFuture notifyKvSnapshotOffset( @Override public CompletableFuture notifyLakeTableOffset( NotifyLakeTableOffsetRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } CompletableFuture response = new CompletableFuture<>(); replicaManager.notifyLakeTableOffset(getNotifyLakeTableOffset(request), response::complete); return response;