Skip to content

Commit e2a2cf3

Browse files
feat: Add support for control-client stall support (#776)
* feat(control-client-stall): adding stall feature for control client * adding more test and removing redundant content * adding GetStorageLayout API support * fixing lint issue * fix in setup.py * fix code formatting * add Prince review comment * Changed control-client unit tests to table-based as per review comment given by Prince. Also added a new unit test to test a bit more complex scenario of first stalling 2 times, and then trying out without stall to see that it works. * split stall and no-stall table-driven test cases * removing unrelated changes for cleanup * fix validations and handling of hns buckets in grpc server * add support for control-client stall duration in milliseconds * address review comment --------- Co-authored-by: raj-prince <princer@google.com>
1 parent 6f9cf7a commit e2a2cf3

8 files changed

Lines changed: 2217 additions & 3 deletions

File tree

README.md

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ is expected to be used by Storage library maintainers.
3535
- [Delete a Retry Test resource](#delete-a-retry-test-resource)
3636
- [Causing a failure using x-retry-test-id header](#causing-a-failure-using-x-retry-test-id-header)
3737
- [Forced Failures Supported](#forced-failures-supported)
38+
- [Storage Control API Stall Support](#storage-control-api-stall-support)
3839
- [Developing for the testbench](#developing-for-the-testbench)
3940
- [Writing and running tests](#writing-and-running-tests)
4041
- [Releasing the testbench](#releasing-the-testbench)
@@ -278,11 +279,42 @@ curl -H "x-retry-test-id: 1d05c20627844214a9ff7cbcf696317d" "http://localhost:91
278279
| redirect-send-handle-and-token-T | [HTTP] Unsupported [GRPC] Testbench will fail the RPC with `ABORTED` and include appropriate redirection error details.
279280
| return-X-if-dp-enforced | [HTTP] Unsupported [GRPC] Testbench will fail with the equivalent gRPC error to the HTTP code provided for X, but only if DirectPath is enforced.
280281

282+
## Storage Control API Stall Support
283+
284+
The testbench supports stall functionality for the Storage Control API (gRPC only) to test client retry behavior. All folder operations and storage layout operations can be delayed using the `x-goog-emulator-instructions` metadata header.
285+
286+
**Supported operations:**
287+
- **Folder operations:** `CreateFolder`, `DeleteFolder`, `GetFolder`, `ListFolders`, `RenameFolder`
288+
- **Storage layout operations:** `GetStorageLayout`
289+
290+
> **Note:** The Storage Control API uses the **same gRPC server** as the Storage API. Both services are available on the same port (e.g., port 8888 if started with `curl "http://localhost:9000/start_grpc?port=8888"`).
291+
292+
**Supported stall instruction:**
293+
- `stall-for-Ns`: Stalls for N seconds (e.g., `stall-for-3s` stalls for 3 seconds)
294+
295+
**Example usage in Python:**
296+
```python
297+
import grpc
298+
from google.storage.control.v2 import storage_control_pb2, storage_control_pb2_grpc
299+
300+
# Connect to the same gRPC server port (8888) started earlier
301+
channel = grpc.insecure_channel('localhost:8888')
302+
stub = storage_control_pb2_grpc.StorageControlStub(channel)
303+
304+
# Create folder with 2-second stall
305+
metadata = [('x-goog-emulator-instructions', 'stall-for-2s')]
306+
request = storage_control_pb2.CreateFolderRequest(
307+
parent="projects/_/buckets/test-bucket",
308+
folder_id="test-folder"
309+
)
310+
response = stub.CreateFolder(request, metadata=metadata)
311+
```
312+
281313
## Developing for the testbench
282314

283315
### Writing and running tests
284316

285-
Tests are located in the `tests/` directory. To run the tests locally, use
317+
Tests are located in the `tests/` directory. To run the tests locally, use
286318

287319
```bash
288320
python -m unittest [test_module.py] # runs all the tests in test_module.py
@@ -300,4 +332,4 @@ Steps:
300332
1. Title "v0.x.x"
301333
1. Click Generate release notes
302334
1. Make sure "Set as the latest release" is checked
303-
1. Click "Publish Release" to release
335+
1. Click "Publish Release" to release

google/storage/control/v2/storage_control_pb2.py

Lines changed: 418 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

google/storage/control/v2/storage_control_pb2_grpc.py

Lines changed: 1275 additions & 0 deletions
Large diffs are not rendered by default.

setup.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
"Operating System :: OS Independent",
3636
],
3737
packages=[
38+
"google",
39+
"google/storage",
40+
"google/storage/control",
41+
"google/storage/control/v2",
3842
"google/storage/v2",
3943
"google/iam/v1",
4044
"testbench",

testbench/database.py

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ def __init__(
3939
retry_tests,
4040
supported_methods,
4141
soft_deleted_objects,
42+
folders=None,
4243
):
4344
self._resources_lock = threading.RLock()
4445
self._buckets = buckets
@@ -59,9 +60,12 @@ def __init__(
5960
self._projects_lock = threading.RLock()
6061
self._projects = {}
6162

63+
self._folders_lock = threading.RLock()
64+
self._folders = folders if folders is not None else {}
65+
6266
@classmethod
6367
def init(cls):
64-
return cls({}, {}, {}, {}, {}, {}, [], {})
68+
return cls({}, {}, {}, {}, {}, {}, [], {}, {})
6569

6670
def clear(self):
6771
"""Clear all data except for the supported method list."""
@@ -76,6 +80,8 @@ def clear(self):
7680
self._rewrites = {}
7781
with self._retry_tests_lock:
7882
self._retry_tests = {}
83+
with self._folders_lock:
84+
self._folders = {}
7985
# The list of supported methods for `retry_test` is defined via flask
8086
# decorators, it should remain unchanged after the test or application
8187
# is initialized. Arguably this means it should be in a global variable.
@@ -791,3 +797,58 @@ def delete_retry_test(self, retry_test_id):
791797
with self._retry_tests_lock:
792798
self.get_retry_test(retry_test_id)
793799
del self._retry_tests[retry_test_id]
800+
801+
# === FOLDER OPERATIONS === #
802+
803+
def insert_folder(self, folder_name, folder, context):
804+
"""Insert a folder into the database."""
805+
with self._folders_lock:
806+
if folder_name in self._folders:
807+
testbench.error.already_exists(
808+
"Folder %s already exists" % folder_name, context
809+
)
810+
self._folders[folder_name] = folder
811+
return folder
812+
813+
def get_folder(self, folder_name, context):
814+
"""Get a folder from the database."""
815+
with self._folders_lock:
816+
folder = self._folders.get(folder_name)
817+
if folder is None:
818+
testbench.error.notfound("Folder %s" % folder_name, context)
819+
return folder
820+
821+
def delete_folder(self, folder_name, context):
822+
"""Delete a folder from the database."""
823+
with self._folders_lock:
824+
if folder_name not in self._folders:
825+
testbench.error.notfound("Folder %s" % folder_name, context)
826+
del self._folders[folder_name]
827+
828+
def list_folders(self, bucket_name, prefix, context):
829+
"""List folders in a bucket with optional prefix filter."""
830+
with self._folders_lock:
831+
folders = []
832+
for folder_name, folder in self._folders.items():
833+
# Filter by bucket
834+
if not folder_name.startswith(bucket_name):
835+
continue
836+
# Filter by prefix if provided
837+
if prefix and not folder_name.startswith(f"{bucket_name}/{prefix}"):
838+
continue
839+
folders.append(folder)
840+
return folders
841+
842+
def rename_folder(self, src_folder_name, dst_folder_name, context):
843+
"""Rename a folder."""
844+
with self._folders_lock:
845+
if src_folder_name not in self._folders:
846+
testbench.error.notfound("Source folder %s" % src_folder_name, context)
847+
if dst_folder_name in self._folders:
848+
testbench.error.already_exists(
849+
"Destination folder %s already exists" % dst_folder_name, context
850+
)
851+
folder = self._folders[src_folder_name]
852+
del self._folders[src_folder_name]
853+
self._folders[dst_folder_name] = folder
854+
return folder

testbench/grpc_server.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import gcs
4040
import testbench
4141
from google.iam.v1 import iam_policy_pb2
42+
from google.storage.control.v2 import storage_control_pb2, storage_control_pb2_grpc
4243
from google.storage.v2 import storage_pb2, storage_pb2_grpc
4344

4445
_GRPC_SERVER_THREAD_COUNT = 2
@@ -1163,13 +1164,122 @@ def QueryWriteStatus(self, request, context):
11631164
return storage_pb2.QueryWriteStatusResponse(persisted_size=len(upload.media))
11641165

11651166

1167+
# === STORAGE CONTROL SERVICER === #
1168+
1169+
1170+
@decorate_all_rpc_methods
1171+
class StorageControlServicer(storage_control_pb2_grpc.StorageControlServicer):
1172+
"""Implements the google.storage.control.v2.StorageControl gRPC service."""
1173+
1174+
def __init__(self, db, echo_metadata=False):
1175+
self.db = db
1176+
self.db.insert_test_bucket()
1177+
self.echo_metadata = echo_metadata
1178+
1179+
def _apply_stall(self, context):
1180+
"""Check for stall instructions and apply delay if needed."""
1181+
import time
1182+
1183+
instruction = testbench.common.extract_instruction(None, context)
1184+
if instruction and "stall" in instruction:
1185+
# Parse stall instruction (e.g., "stall-for-1s" or "stall-for-500ms")
1186+
if instruction.startswith("stall-for-"):
1187+
# Check for milliseconds.
1188+
match_ms = re.match(r"stall-for-(\d+)ms", instruction)
1189+
if match_ms:
1190+
time.sleep(int(match_ms.group(1)) / 1000.0)
1191+
return
1192+
# Check for seconds.
1193+
match_s = re.match(r"stall-for-(\d+)s", instruction)
1194+
if match_s:
1195+
time.sleep(int(match_s.group(1)))
1196+
1197+
@retry_test(method="storage.folders.create")
1198+
def CreateFolder(self, request, context):
1199+
self._apply_stall(context)
1200+
# Create a simple folder metadata
1201+
folder = storage_control_pb2.Folder()
1202+
# The name should include the full path
1203+
folder.name = f"{request.parent}/folders/{request.folder_id}"
1204+
folder.metageneration = 1
1205+
folder.create_time.FromDatetime(datetime.datetime.now(datetime.timezone.utc))
1206+
folder.update_time.CopyFrom(folder.create_time)
1207+
1208+
# Store in database using full name as key
1209+
self.db.insert_folder(folder.name, folder, context)
1210+
return folder
1211+
1212+
@retry_test(method="storage.folders.delete")
1213+
def DeleteFolder(self, request, context):
1214+
self._apply_stall(context)
1215+
folder_key = request.name
1216+
self.db.delete_folder(folder_key, context)
1217+
return empty_pb2.Empty()
1218+
1219+
@retry_test(method="storage.folders.get")
1220+
def GetFolder(self, request, context):
1221+
self._apply_stall(context)
1222+
folder_key = request.name
1223+
return self.db.get_folder(folder_key, context)
1224+
1225+
@retry_test(method="storage.folders.list")
1226+
def ListFolders(self, request, context):
1227+
self._apply_stall(context)
1228+
# Extract bucket from parent (format: "projects/_/buckets/{bucket}")
1229+
bucket_name = request.parent
1230+
prefix = request.prefix if hasattr(request, "prefix") else ""
1231+
1232+
folders = self.db.list_folders(bucket_name, prefix, context)
1233+
return storage_control_pb2.ListFoldersResponse(folders=folders)
1234+
1235+
@retry_test(method="storage.folders.rename")
1236+
def RenameFolder(self, request, context):
1237+
self._apply_stall(context)
1238+
src_folder = request.name
1239+
dst_folder = request.destination_folder_id
1240+
1241+
folder = self.db.rename_folder(src_folder, dst_folder, context)
1242+
return folder
1243+
1244+
@retry_test(method="storage.storageLayout.get")
1245+
def GetStorageLayout(self, request, context):
1246+
self._apply_stall(context)
1247+
1248+
# Extract bucket path from request.name which is "projects/_/buckets/bucket_name/storageLayout"
1249+
bucket_path = request.name.replace("/storageLayout", "")
1250+
bucket = self.db.get_bucket(bucket_path, context)
1251+
1252+
if bucket is None:
1253+
return None
1254+
1255+
# Create a simple storage layout response
1256+
layout = storage_control_pb2.StorageLayout()
1257+
layout.name = request.name
1258+
1259+
# Set default location and location_type
1260+
layout.location = bucket.metadata.location if bucket.metadata.location else "US"
1261+
layout.location_type = "multi-region"
1262+
# Set hierarchical namespace enabled flag based on bucket metadata
1263+
if (
1264+
bucket.metadata.hierarchical_namespace
1265+
and bucket.metadata.hierarchical_namespace.enabled
1266+
):
1267+
layout.hierarchical_namespace.enabled = True
1268+
else:
1269+
layout.hierarchical_namespace.enabled = False
1270+
return layout
1271+
1272+
11661273
def run(port, database, echo_metadata=False):
11671274
server = grpc.server(
11681275
futures.ThreadPoolExecutor(max_workers=_GRPC_SERVER_THREAD_COUNT)
11691276
)
11701277
storage_pb2_grpc.add_StorageServicer_to_server(
11711278
StorageServicer(database, echo_metadata), server
11721279
)
1280+
storage_control_pb2_grpc.add_StorageControlServicer_to_server(
1281+
StorageControlServicer(database, echo_metadata), server
1282+
)
11731283
port = server.add_insecure_port("0.0.0.0:%d" % port)
11741284
server.start()
11751285
return port, server

0 commit comments

Comments
 (0)