diff --git a/.github/workflows/deploy_gae.yml b/.github/workflows/deploy_gae.yml new file mode 100644 index 0000000000..a88b5403b9 --- /dev/null +++ b/.github/workflows/deploy_gae.yml @@ -0,0 +1,79 @@ +name: Build + +on: [push, pull_request] + +jobs: + build-and-deploy: + permissions: + contents: "read" + id-token: "write" + deployments: "write" + strategy: + matrix: + os: + - "ubuntu-latest" + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v4 + with: + # Need full history to determine version number. + fetch-depth: 0 + - uses: actions/setup-node@v4 + with: + node-version: 22.x + cache: "npm" + cache-dependency-path: | + package-lock.json + examples/**/package-lock.json + - run: npm install + - name: Typecheck with TypeScript + run: npm run typecheck + - name: Get branch name (merge) + if: github.event_name != 'pull_request' + shell: bash + run: echo "BRANCH_NAME=$(echo ${GITHUB_REF#refs/heads/} | tr / -)" >> $GITHUB_ENV + - name: Get branch name (pull request) + if: github.event_name == 'pull_request' + shell: bash + run: echo "BRANCH_NAME=$(echo ${GITHUB_HEAD_REF} | tr / -)" >> $GITHUB_ENV + - run: echo "BRANCH_NAME_URL=$(echo ${{ env.BRANCH_NAME }} | tr / - | tr _ -)" >> $GITHUB_ENV + - name: Get build info + run: echo "BUILD_INFO={\"tag\":\"$(git describe --always --tags)\", \"url\":\"https://github.com/${{github.repository}}/commit/$(git rev-parse HEAD)\", \"timestamp\":\"$(date)\", \"branch\":\"${{github.repository}}/${{env.BRANCH_NAME}}\"}" >> $GITHUB_ENV + shell: bash + # - name: Check for dirty working directory + # run: git diff --exit-code + - name: Build client bundles + run: npm run build -- --no-typecheck --no-lint --define STATE_SERVERS=$(cat config/state_servers.json | tr -d " \t\n\r") --define NEUROGLANCER_BUILD_INFO='${{ env.BUILD_INFO }}' --define NEUROGLANCER_CUSTOM_INPUT_BINDINGS=$(cat config/custom-keybinds.json | tr -d " \t\n\r") --define NEUROGLANCER_SEGMENT_LIST_COLOR_WIDGET=true + - name: Write build info + run: echo $BUILD_INFO > ./dist/client/version.json + shell: bash + - run: cp -r ./dist/client appengine/frontend/static/ + - name: start deployment + uses: bobheadxi/deployments@v1 + id: deployment + with: + step: start + token: ${{ secrets.GITHUB_TOKEN }} + env: ${{ env.BRANCH_NAME }} + desc: Setting up staging deployment for ${{ env.BRANCH_NAME }} + - id: "auth" + uses: "google-github-actions/auth@v1" + with: + workload_identity_provider: "projects/483670036293/locations/global/workloadIdentityPools/neuroglancer-github/providers/github" + service_account: "chris-apps-deploy@seung-lab.iam.gserviceaccount.com" + - id: deploy + uses: google-github-actions/deploy-appengine@main + with: + version: ${{ env.BRANCH_NAME_URL }} + deliverables: appengine/frontend/app.yaml + promote: false + - name: update deployment status + uses: bobheadxi/deployments@v1 + if: always() + with: + step: finish + token: ${{ secrets.GITHUB_TOKEN }} + env: ${{ steps.deployment.outputs.env }} + env_url: ${{ steps.deploy.outputs.url }} + status: ${{ job.status }} + deployment_id: ${{ steps.deployment.outputs.deployment_id }} diff --git a/appengine/frontend/app.yaml b/appengine/frontend/app.yaml new file mode 100644 index 0000000000..7d375d9d1b --- /dev/null +++ b/appengine/frontend/app.yaml @@ -0,0 +1,19 @@ +runtime: python312 + +service: neuroglancer + +handlers: + # Handle the main page by serving the index page. + # Note the $ to specify the end of the path, since app.yaml does prefix matching. + - url: /$ + static_files: static/index.html + upload: static/index.html + login: optional + secure: always + redirect_http_response_code: 301 + + - url: / + static_dir: static + login: optional + secure: always + redirect_http_response_code: 301 diff --git a/config/custom-keybinds.json b/config/custom-keybinds.json new file mode 100644 index 0000000000..acf8855508 --- /dev/null +++ b/config/custom-keybinds.json @@ -0,0 +1,39 @@ +{ + "keym": { + "action": { + "layerType": "segmentation", + "tool": "grapheneMergeSegments", + "provider": "graphene" + } + }, + "keyc": { + "action": { + "layerType": "segmentation", + "tool": "grapheneMulticutSegments", + "provider": "graphene" + } + }, + "keyf": { + "action": { + "layerType": "segmentation", + "tool": "grapheneFindPath", + "provider": "graphene" + } + }, + "keyx": { + "action": false + }, + "control+shift+keyx": { + "action": "clear-segments" + }, + "bracketleft": [ + { "action": false, "context": "sliceView" }, + { "action": false, "context": "perspectiveView" }, + { "action": "select-previous" } + ], + "bracketright": [ + { "action": false, "context": "sliceView" }, + { "action": false, "context": "perspectiveView" }, + { "action": "select-next" } + ] +} diff --git a/config/state_servers.json b/config/state_servers.json new file mode 100644 index 0000000000..279a63557d --- /dev/null +++ b/config/state_servers.json @@ -0,0 +1,6 @@ +{ + "cave": { + "url": "middleauth+https://global.daf-apis.com/nglstate/api/v1/post", + "default": true + } +} diff --git a/package.json b/package.json index 6be1444942..7f0b2d0225 100644 --- a/package.json +++ b/package.json @@ -428,6 +428,12 @@ "neuroglancer/kvstore/icechunk:disabled": "./src/util/false.ts", "default": "./src/kvstore/icechunk/register_backend.ts" }, + "#kvstore/kvstack/register": { + "neuroglancer/kvstore/kvstack:enabled": "./src/kvstore/kvstack/register.ts", + "neuroglancer/kvstore:none_by_default": "./src/util/false.ts", + "neuroglancer/kvstore/kvstack:disabled": "./src/util/false.ts", + "default": "./src/kvstore/kvstack/register.ts" + }, "#kvstore/middleauth/register_frontend": { "neuroglancer/kvstore/middleauth:enabled": "./src/kvstore/middleauth/register_frontend.ts", "neuroglancer/kvstore:none_by_default": "./src/util/false.ts", diff --git a/src/chunk_manager/generic_file_source.ts b/src/chunk_manager/generic_file_source.ts index 45937f4a27..e25a443a8e 100644 --- a/src/chunk_manager/generic_file_source.ts +++ b/src/chunk_manager/generic_file_source.ts @@ -69,6 +69,22 @@ export class SimpleAsyncCache extends ChunkSourceBase { progressOptions: ProgressOptions, ) => Promise<{ size: number; data: Value }>; + invalidate(key: Key) { + const encodedKey = this.encodeKeyFunction(key); + const chunk = this.chunks.get(encodedKey); + if (chunk !== undefined) { + chunk.freeSystemMemory(); + this.chunkManager.queueManager.updateChunkState(chunk, ChunkState.QUEUED); + } + } + + invalidateAll() { + for (const chunk of this.chunks.values()) { + chunk.freeSystemMemory(); + this.chunkManager.queueManager.updateChunkState(chunk, ChunkState.QUEUED); + } + } + get(key: Key, options: Partial): Promise { const encodedKey = this.encodeKeyFunction(key); let chunk = this.chunks.get(encodedKey); diff --git a/src/datasource/graphene/backend.ts b/src/datasource/graphene/backend.ts index 57985b1090..7727f4ebb2 100644 --- a/src/datasource/graphene/backend.ts +++ b/src/datasource/graphene/backend.ts @@ -28,6 +28,7 @@ import type { } from "#src/datasource/graphene/base.js"; import { getGrapheneFragmentKey, + GRAPHENE_INVALIDATE_OCDBT_RPC_ID, GRAPHENE_MESH_NEW_SEGMENT_RPC_ID, ChunkedGraphSourceParameters, MeshSourceParameters, @@ -42,6 +43,7 @@ import { decodeManifestChunk } from "#src/datasource/precomputed/backend.js"; import { WithSharedKvStoreContextCounterpart } from "#src/kvstore/backend.js"; import type { KvStoreWithPath, ReadResponse } from "#src/kvstore/index.js"; import { readKvStore } from "#src/kvstore/index.js"; +import { invalidateOcdbtCaches } from "#src/kvstore/ocdbt/metadata_cache.js"; import type { FragmentChunk, ManifestChunk } from "#src/mesh/backend.js"; import { assignMeshFragmentData, MeshSource } from "#src/mesh/backend.js"; import { decodeDraco } from "#src/mesh/draco/index.js"; @@ -642,3 +644,8 @@ registerRPC(GRAPHENE_MESH_NEW_SEGMENT_RPC_ID, function (x) { const obj = this.get(x.rpcId); obj.addNewSegment(x.segment); }); + +registerRPC(GRAPHENE_INVALIDATE_OCDBT_RPC_ID, function (x) { + const source = this.get(x.layerId) as GrapheneChunkedGraphChunkSource; + invalidateOcdbtCaches(source.sharedKvStoreContext); +}); diff --git a/src/datasource/graphene/base.ts b/src/datasource/graphene/base.ts index 844cddf352..7759660c29 100644 --- a/src/datasource/graphene/base.ts +++ b/src/datasource/graphene/base.ts @@ -32,6 +32,7 @@ import type { FetchOk, HttpError } from "#src/util/http_request.js"; export const PYCG_APP_VERSION = 1; export const GRAPHENE_MESH_NEW_SEGMENT_RPC_ID = "GrapheneMeshSource:NewSegment"; +export const GRAPHENE_INVALIDATE_OCDBT_RPC_ID = "Graphene:InvalidateOcdbt"; export enum VolumeChunkEncoding { RAW = 0, diff --git a/src/datasource/graphene/frontend.ts b/src/datasource/graphene/frontend.ts index bd3b583e8a..aac851ed31 100644 --- a/src/datasource/graphene/frontend.ts +++ b/src/datasource/graphene/frontend.ts @@ -50,6 +50,7 @@ import { CHUNKED_GRAPH_RENDER_LAYER_UPDATE_SOURCES_RPC_ID, ChunkedGraphSourceParameters, getGrapheneFragmentKey, + GRAPHENE_INVALIDATE_OCDBT_RPC_ID, GRAPHENE_MESH_NEW_SEGMENT_RPC_ID, isBaseSegmentId, makeChunkedGraphChunkSpecification, @@ -74,6 +75,7 @@ import { getSegmentPropertyMap, parseMultiscaleVolumeInfo, PrecomputedMultiscaleVolumeChunkSource, + PrecomputedVolumeChunkSource, } from "#src/datasource/precomputed/frontend.js"; import { WithSharedKvStoreContext } from "#src/kvstore/chunk_source_frontend.js"; import type { SharedKvStoreContext } from "#src/kvstore/frontend.js"; @@ -122,6 +124,7 @@ import { SegmentationGraphSourceConnection, } from "#src/segmentation_graph/source.js"; import { SharedWatchableValue } from "#src/shared_watchable_value.js"; +import { readSingleChannelValueUint64 } from "#src/sliceview/compressed_segmentation/decode_common.js"; import type { FrontendTransformedSource, SliceViewSingleResolutionSource, @@ -133,6 +136,9 @@ import { } from "#src/sliceview/frontend.js"; import type { SliceViewRenderLayer } from "#src/sliceview/renderlayer.js"; import { SliceViewPanelRenderLayer } from "#src/sliceview/renderlayer.js"; +import type { VolumeSourceOptions } from "#src/sliceview/volume/base.js"; +import { makeDefaultVolumeChunkSpecifications } from "#src/sliceview/volume/base.js"; +import type { VolumeChunkSource } from "#src/sliceview/volume/frontend.js"; import { StatusMessage } from "#src/status.js"; import { TrackableBoolean, @@ -165,6 +171,7 @@ import { registerTool, } from "#src/ui/tool.js"; import { Uint64Set } from "#src/uint64_set.js"; +import { transposeNestedArrays } from "#src/util/array.js"; import { packColor } from "#src/util/color.js"; import type { Owned } from "#src/util/disposable.js"; import { RefCounted } from "#src/util/disposable.js"; @@ -276,6 +283,7 @@ const N_BITS_FOR_LAYER_ID_DEFAULT = 8; class GraphInfo { chunkSize: vec3; nBitsForLayerId: number; + ocdbtKvstoreSpec: { driver?: string; base?: unknown } | undefined; constructor(obj: any) { verifyObject(obj); this.chunkSize = verifyObjectProperty(obj, "chunk_size", (x) => @@ -287,11 +295,22 @@ class GraphInfo { verifyPositiveInt, N_BITS_FOR_LAYER_ID_DEFAULT, ); + this.ocdbtKvstoreSpec = verifyOptionalObjectProperty( + obj, + "ocdbt_kvstore_spec", + (x) => { + verifyObject(x); + return x as { driver?: string; base?: unknown }; + }, + undefined, + ); } } interface GrapheneMultiscaleVolumeInfo extends MultiscaleVolumeInfo { dataUrl: string; + ocdbtDataUrl: string | undefined; + ocdbtScales: Set; app: AppInfo; graph: GraphInfo; } @@ -304,15 +323,31 @@ function parseGrapheneMultiscaleVolumeInfo( const dataUrl = verifyObjectProperty(obj, "data_dir", verifyString); const app = verifyObjectProperty(obj, "app", (x) => new AppInfo(url, x)); const graph = verifyObjectProperty(obj, "graph", (x) => new GraphInfo(x)); + let ocdbtDataUrl: string | undefined; + if (graph.ocdbtKvstoreSpec) { + const spec = graph.ocdbtKvstoreSpec; + if (spec.driver !== "ocdbt" || !spec.base) { + throw new Error( + "graph.ocdbt_kvstore_spec must have driver=ocdbt and a base", + ); + } + const kvstackUrl = `kvstack:${encodeURIComponent(JSON.stringify(spec.base))}`; + ocdbtDataUrl = `${kvstackUrl}|ocdbt:`; + } return { ...volumeInfo, app, graph, dataUrl, + ocdbtDataUrl, + ocdbtScales: new Set(), }; } class GrapheneMultiscaleVolumeChunkSource extends PrecomputedMultiscaleVolumeChunkSource { + private volumeChunkSources: VolumeChunkSource[] = []; + private chunkedGraphChunkSource: GrapheneChunkedGraphChunkSource | undefined; + constructor( sharedKvStoreContext: SharedKvStoreContext, public info: GrapheneMultiscaleVolumeInfo, @@ -320,6 +355,189 @@ class GrapheneMultiscaleVolumeChunkSource extends PrecomputedMultiscaleVolumeChu super(sharedKvStoreContext, info.dataUrl, info); } + resolveScaleUrl(scaleKey: string): string { + const { ocdbtDataUrl, ocdbtScales } = this.info; + const baseUrl = + ocdbtDataUrl && ocdbtScales.has(scaleKey) ? ocdbtDataUrl : this.url; + return kvstoreEnsureDirectoryPipelineUrl( + this.sharedKvStoreContext.kvStoreContext.resolveRelativePath( + baseUrl, + scaleKey, + ), + ); + } + + getSources(volumeSourceOptions: VolumeSourceOptions) { + const modelResolution = this.info.scales[0].resolution; + const { rank } = this; + const sources = transposeNestedArrays( + this.info.scales + .filter((x) => !x.hidden) + .filter((x) => x.key !== "placeholder") + .filter( + (x) => + this.info.graph.ocdbtKvstoreSpec === undefined || + this.info.ocdbtScales.has(x.key), + ) + .map((scaleInfo) => { + const { resolution } = scaleInfo; + const stride = rank + 1; + const chunkToMultiscaleTransform = new Float32Array(stride * stride); + chunkToMultiscaleTransform[chunkToMultiscaleTransform.length - 1] = 1; + const { lowerBounds: baseLowerBound, upperBounds: baseUpperBound } = + this.info.modelSpace.boundingBoxes[0].box; + const lowerClipBound = new Float32Array(rank); + const upperClipBound = new Float32Array(rank); + for (let i = 0; i < 3; ++i) { + const relativeScale = resolution[i] / modelResolution[i]; + chunkToMultiscaleTransform[stride * i + i] = relativeScale; + const voxelOffsetValue = scaleInfo.voxelOffset[i]; + chunkToMultiscaleTransform[stride * rank + i] = + voxelOffsetValue * relativeScale; + lowerClipBound[i] = + baseLowerBound[i] / relativeScale - voxelOffsetValue; + upperClipBound[i] = + baseUpperBound[i] / relativeScale - voxelOffsetValue; + } + if (rank === 4) { + chunkToMultiscaleTransform[stride * 3 + 3] = 1; + lowerClipBound[3] = baseLowerBound[3]; + upperClipBound[3] = baseUpperBound[3]; + } + return makeDefaultVolumeChunkSpecifications({ + rank, + dataType: this.dataType, + chunkToMultiscaleTransform, + upperVoxelBound: scaleInfo.size, + volumeType: this.volumeType, + chunkDataSizes: scaleInfo.chunkSizes, + baseVoxelOffset: scaleInfo.voxelOffset, + compressedSegmentationBlockSize: + scaleInfo.compressedSegmentationBlockSize, + volumeSourceOptions, + }).map( + (spec): SliceViewSingleResolutionSource => ({ + chunkSource: this.chunkManager.getChunkSource( + PrecomputedVolumeChunkSource, + { + sharedKvStoreContext: this.sharedKvStoreContext, + spec, + parameters: { + url: this.resolveScaleUrl(scaleInfo.key), + encoding: scaleInfo.encoding, + sharding: scaleInfo.sharding, + }, + }, + ), + chunkToMultiscaleTransform, + lowerClipBound, + upperClipBound, + }), + ); + }), + ); + this.volumeChunkSources = sources.flat().map((s) => s.chunkSource); + return sources; + } + + // Invalidate just the OCDBT metadata caches (manifest / btree / version) + // on the backend without re-queueing volume chunks. Used before a + // targeted base-chunk read to ensure that read goes to the network + // instead of trusting potentially-stale cached metadata. + invalidateOcdbtMetadata() { + if (this.info.graph.ocdbtKvstoreSpec && this.chunkedGraphChunkSource?.rpc) { + this.chunkedGraphChunkSource.rpc.invoke( + GRAPHENE_INVALIDATE_OCDBT_RPC_ID, + { layerId: this.chunkedGraphChunkSource.rpcId }, + ); + } + } + + invalidateVolumeSources() { + // Invalidate OCDBT metadata caches first so that when volume chunks + // are re-queued and start downloading, they read fresh metadata. + this.invalidateOcdbtMetadata(); + for (const source of this.volumeChunkSources) { + source.invalidateCache(); + } + } + + // Read uint64 supervoxel IDs at the given layer-voxel positions directly + // from the base-scale OCDBT data. Positions sharing a base chunk are + // fetched in one request. Out-of-bounds positions yield `undefined`. + // Caller must invalidate OCDBT metadata first if it needs guaranteed- + // fresh reads (see `invalidateOcdbtMetadata`). + async readBaseSupervoxelsAt( + positions: Float32Array[], + signal?: AbortSignal, + ): Promise<(bigint | undefined)[]> { + const baseScale = this.info.scales[0]; + const blockSize = baseScale.compressedSegmentationBlockSize; + if (blockSize === undefined) { + throw new Error( + "readBaseSupervoxelsAt: base scale is not compressed_segmentation", + ); + } + const chunkDataSize = baseScale.chunkSizes[0]; + const { voxelOffset, size: scaleSize } = baseScale; + const scaleUrl = this.resolveScaleUrl(baseScale.key); + const kvStoreContext = this.sharedKvStoreContext.kvStoreContext; + + type Member = { posIdx: number; within: [number, number, number] }; + const groups = new Map(); + const result = new Array(positions.length).fill( + undefined, + ); + for (let i = 0; i < positions.length; ++i) { + const pos = positions[i]; + const within: [number, number, number] = [0, 0, 0]; + const origin: [number, number, number] = [0, 0, 0]; + let oob = false; + for (let d = 0; d < 3; ++d) { + const v = Math.floor(pos[d] - voxelOffset[d]); + if (v < 0 || v >= scaleSize[d]) { + oob = true; + break; + } + const cIdx = Math.floor(v / chunkDataSize[d]); + origin[d] = voxelOffset[d] + cIdx * chunkDataSize[d]; + within[d] = v - cIdx * chunkDataSize[d]; + } + if (oob) continue; + const url = + `${scaleUrl}${origin[0]}-${origin[0] + chunkDataSize[0]}_` + + `${origin[1]}-${origin[1] + chunkDataSize[1]}_` + + `${origin[2]}-${origin[2] + chunkDataSize[2]}`; + let members = groups.get(url); + if (members === undefined) { + members = []; + groups.set(url, members); + } + members.push({ posIdx: i, within }); + } + + await Promise.all( + Array.from(groups.entries()).map(async ([url, members]) => { + const response = await kvStoreContext.read(url, { + throwIfMissing: true, + signal, + }); + if (response === undefined) return; + const data = new Uint32Array(await response.response.arrayBuffer()); + for (const { posIdx, within } of members) { + result[posIdx] = readSingleChannelValueUint64( + data, + 0, + chunkDataSize, + blockSize, + within, + ); + } + }), + ); + return result; + } + getChunkedGraphSource() { const { rank } = this; const scaleInfo = this.info.scales[0]; @@ -347,15 +565,17 @@ class GrapheneMultiscaleVolumeChunkSource extends PrecomputedMultiscaleVolumeChu lowerClipBound[i] = baseLowerBound[i]; upperClipBound[i] = baseUpperBound[i]; } + const chunkSource = this.chunkManager.getChunkSource( + GrapheneChunkedGraphChunkSource, + { + spec, + sharedKvStoreContext: this.sharedKvStoreContext, + parameters: { url: `${this.info.app!.segmentationUrl}/node` }, + }, + ); + this.chunkedGraphChunkSource = chunkSource; return { - chunkSource: this.chunkManager.getChunkSource( - GrapheneChunkedGraphChunkSource, - { - spec, - sharedKvStoreContext: this.sharedKvStoreContext, - parameters: { url: `${this.info.app!.segmentationUrl}/node` }, - }, - ), + chunkSource, chunkToMultiscaleTransform, lowerClipBound, upperClipBound, @@ -608,6 +828,23 @@ async function getVolumeDataSource( stateJson: any, ): Promise { const info = parseGrapheneMultiscaleVolumeInfo(metadata, url); + if (info.ocdbtDataUrl) { + try { + const listResult = await sharedKvStoreContext.kvStoreContext.list( + info.ocdbtDataUrl, + { responseKeys: "suffix", ...options }, + ); + const knownScaleKeys = new Set(info.scales.map((s) => s.key)); + for (const dir of listResult.directories) { + if (knownScaleKeys.has(dir)) { + info.ocdbtScales.add(dir); + } + } + } catch (e) { + console.error(`Failed to list OCDBT scales at ${info.ocdbtDataUrl}`, e); + throw e; + } + } const volume = new GrapheneMultiscaleVolumeChunkSource( sharedKvStoreContext, info, @@ -1640,6 +1877,40 @@ class GraphConnection extends SegmentationGraphSourceConnection { ); return false; } else { + // For OCDBT-backed layers, refresh 2D-picked supervoxel IDs from the + // base scale just before submitting. The user may have picked from a + // coarser stale rendering (eventually-consistent downsamples); the + // server cares about the supervoxel at the position now. 3D-picked + // selections (rooted IDs, isBaseSegmentId === false) are resolved + // server-side from position and pass through unchanged. + if (this.graph.info.graph.ocdbtKvstoreSpec) { + const { nBitsForLayerId } = this.graph.info.graph; + const all = [...sinks, ...sources]; + const indices: number[] = []; + for (let i = 0; i < all.length; ++i) { + if (isBaseSegmentId(all[i].segmentId, nBitsForLayerId)) { + indices.push(i); + } + } + if (indices.length > 0) { + this.chunkSource.invalidateOcdbtMetadata(); + try { + const fresh = await this.chunkSource.readBaseSupervoxelsAt( + indices.map((i) => all[i].position), + ); + for (let j = 0; j < indices.length; ++j) { + const v = fresh[j]; + if (v !== undefined) all[indices[j]].segmentId = v; + } + } catch (e) { + console.error( + "Failed to refresh base supervoxel IDs for multicut; " + + "submitting with originally-picked IDs", + e, + ); + } + } + } const splitRoots = await this.graph.graphServer.splitSegments( [...sinks].map((x) => selectionInNanometers(x, annotationToNanometers)), [...sources].map((x) => @@ -1665,6 +1936,9 @@ class GraphConnection extends SegmentationGraphSourceConnection { const newValues = new Uint64Set(); newValues.add(splitRoots); this.state.replaceSegments(oldValues, newValues); + if (this.graph.info.graph.ocdbtKvstoreSpec) { + this.chunkSource.invalidateVolumeSources(); + } return true; } } @@ -2664,7 +2938,13 @@ class MulticutSegmentsTool extends LayerTool { return; } const isRoot = rootId === segmentId; - if (!isRoot) { + // Supervoxel splits require selecting the same supervoxel on both + // sides of the cut (the split happens within one supervoxel), so + // skip the duplicate-selection guard when an OCDBT kvstore is active. + if ( + !isRoot && + graphConnection.graph.info.graph.ocdbtKvstoreSpec === undefined + ) { for (const segment of segments) { if (segment === segmentId) { StatusMessage.showTemporaryMessage( diff --git a/src/kvstore/enabled_backend_modules.ts b/src/kvstore/enabled_backend_modules.ts index 335031dc7d..48dc14ec8e 100644 --- a/src/kvstore/enabled_backend_modules.ts +++ b/src/kvstore/enabled_backend_modules.ts @@ -4,6 +4,7 @@ import "#kvstore/gcs/register"; import "#kvstore/gzip/register"; import "#kvstore/http/register_backend"; import "#kvstore/icechunk/register_backend"; +import "#kvstore/kvstack/register"; import "#kvstore/middleauth/register_backend"; import "#kvstore/ngauth/register"; import "#kvstore/ocdbt/register_backend"; diff --git a/src/kvstore/enabled_frontend_modules.ts b/src/kvstore/enabled_frontend_modules.ts index 476e2f6d1d..9611bb294e 100644 --- a/src/kvstore/enabled_frontend_modules.ts +++ b/src/kvstore/enabled_frontend_modules.ts @@ -4,6 +4,7 @@ import "#kvstore/gcs/register"; import "#kvstore/gzip/register"; import "#kvstore/http/register_frontend"; import "#kvstore/icechunk/register_frontend"; +import "#kvstore/kvstack/register"; import "#kvstore/middleauth/register_frontend"; import "#kvstore/middleauth/register_credentials_provider"; import "#kvstore/ngauth/register"; diff --git a/src/kvstore/kvstack/common.ts b/src/kvstore/kvstack/common.ts new file mode 100644 index 0000000000..003e0f1b64 --- /dev/null +++ b/src/kvstore/kvstack/common.ts @@ -0,0 +1,219 @@ +/** + * @license + * Copyright 2026 Google Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import type { + BaseKvStoreProvider, + KvStoreContext, +} from "#src/kvstore/context.js"; +import type { + DriverReadOptions, + KvStore, + KvStoreWithPath, + ReadResponse, + StatOptions, + StatResponse, +} from "#src/kvstore/index.js"; +import type { KvStackLayer, KvStackSpec } from "#src/kvstore/kvstack/url.js"; +import { formatKvStackUrl, parseKvStackUrl } from "#src/kvstore/kvstack/url.js"; +import type { + KvStoreProviderRegistry, + SharedKvStoreContextBase, +} from "#src/kvstore/register.js"; +import { HttpError, pickDelay } from "#src/util/http_request.js"; + +interface ResolvedLayer { + matcher: KvStackLayer; + resolved: KvStoreWithPath; +} + +// fetchOk already retries 429/503/504; only retry the transient error +// classes it surfaces unwrapped (network errors → status 0, plus 502). +const RETRY_STATUSES = new Set([0, 502]); +const RETRY_MAX_ATTEMPTS = 4; + +function isRetryable(e: unknown): boolean { + return e instanceof HttpError && RETRY_STATUSES.has(e.status); +} + +function describeMatcher(matcher: KvStackLayer): string { + if (matcher.exact !== undefined) + return `exact ${JSON.stringify(matcher.exact)}`; + if (matcher.prefix !== undefined) + return `prefix ${JSON.stringify(matcher.prefix)}`; + return "base"; +} + +async function delayWithAbort(ms: number, signal: AbortSignal | undefined) { + return new Promise((resolve, reject) => { + const timer = setTimeout(resolve, ms); + signal?.addEventListener( + "abort", + () => { + clearTimeout(timer); + reject(signal.reason); + }, + { once: true }, + ); + }); +} + +// Key range-routed kvstore stack. Composes multiple backing kvstores into one +// logical store, matching the semantics of tensorstore's kvstack driver. +// +// Each layer in the spec has a matcher and a backing kvstore URL: +// * `{base: URL}` - catch-all; matches any key +// * `{exact: KEY, base: URL}` - matches only when the input key == KEY +// * `{prefix: KEY, base: URL}` - matches when the input key starts with KEY +// +// Resolution: +// 1. Each layer's backing URL is resolved lazily (on first read) via +// `kvStoreContext.getKvStore(...)`. Layers may nest any registered +// driver (http/gcs/s3/ocdbt/...); resolution is a plain recursive call +// into the same context that dispatched to kvstack. +// 2. For a given input key, layers are scanned in REVERSE order so later +// entries override earlier ones (last-match-wins, per tensorstore). +// 3. When a layer matches, the matched portion of the key is stripped +// before delegating to the layer's backing store: +// - `base`: delegate read(inputKey) - pass key through +// - `exact`: delegate read("") - base URL is the target +// - `prefix`: delegate read(inputKey[plen:]) - strip the prefix +// This makes the layer's backing URL concatenate naturally with the +// remainder to yield the correct full URL. +// 4. No fallthrough: if no layer matches, `undefined` is returned (same as +// any kvstore returning "not found" for an unknown key). +// +// The driver is registered on the isomorphic registry; the same code runs on +// frontend and backend since kvstack only composes other kvstores and does no +// I/O itself. +export class KvStackKvStore implements KvStore { + private resolvedLayers: ResolvedLayer[] | undefined; + + constructor( + public kvStoreContext: KvStoreContext, + public spec: KvStackSpec, + ) {} + + private layers(): ResolvedLayer[] { + if (this.resolvedLayers === undefined) { + this.resolvedLayers = this.spec.layers.map((matcher) => ({ + matcher, + resolved: this.kvStoreContext.getKvStore(matcher.base), + })); + } + return this.resolvedLayers; + } + + private findLayer( + key: string, + ): { layer: ResolvedLayer; subKey: string } | undefined { + const layers = this.layers(); + for (let i = layers.length - 1; i >= 0; --i) { + const layer = layers[i]; + const { matcher } = layer; + if (matcher.exact !== undefined) { + if (key === matcher.exact) return { layer, subKey: "" }; + } else if (matcher.prefix !== undefined) { + if (key.startsWith(matcher.prefix)) { + return { layer, subKey: key.substring(matcher.prefix.length) }; + } + } else { + return { layer, subKey: key }; + } + } + return undefined; + } + + stat(key: string, options: StatOptions): Promise { + const match = this.findLayer(key); + if (match === undefined) return Promise.resolve(undefined); + const { layer, subKey } = match; + const fullPath = layer.resolved.path + subKey; + return this.runWithRetry(layer, key, options.signal, () => + layer.resolved.store.stat(fullPath, options), + ); + } + + read( + key: string, + options: DriverReadOptions, + ): Promise { + const match = this.findLayer(key); + if (match === undefined) return Promise.resolve(undefined); + const { layer, subKey } = match; + const fullPath = layer.resolved.path + subKey; + return this.runWithRetry(layer, key, options.signal, () => + layer.resolved.store.read(fullPath, options), + ); + } + + private async runWithRetry( + layer: ResolvedLayer, + key: string, + signal: AbortSignal | undefined, + op: () => Promise, + ): Promise { + let lastError: unknown; + for (let attempt = 0; attempt < RETRY_MAX_ATTEMPTS; ++attempt) { + signal?.throwIfAborted(); + try { + return await op(); + } catch (e) { + lastError = e; + if (!isRetryable(e) || attempt + 1 === RETRY_MAX_ATTEMPTS) break; + await delayWithAbort(pickDelay(attempt), signal); + } + } + console.error( + `kvstack read failed for key ${JSON.stringify(key)} ` + + `(layer ${describeMatcher(layer.matcher)}, backing ${layer.matcher.base})`, + lastError, + ); + throw lastError; + } + + getUrl(key: string): string { + return formatKvStackUrl(this.spec, key); + } + + get supportsOffsetReads(): boolean { + return true; + } + get supportsSuffixReads(): boolean { + return true; + } +} + +function kvstackProvider( + sharedKvStoreContext: SharedKvStoreContextBase, +): BaseKvStoreProvider { + return { + scheme: "kvstack", + description: "Key range-routed kvstore stack", + getKvStore(parsedUrl) { + const { spec, path } = parseKvStackUrl(parsedUrl); + return { + store: new KvStackKvStore(sharedKvStoreContext.kvStoreContext, spec), + path, + }; + }, + }; +} + +export function registerProviders< + SharedKvStoreContext extends SharedKvStoreContextBase, +>(registry: KvStoreProviderRegistry) { + registry.registerBaseKvStoreProvider((context) => kvstackProvider(context)); +} diff --git a/src/kvstore/kvstack/register.ts b/src/kvstore/kvstack/register.ts new file mode 100644 index 0000000000..505557e907 --- /dev/null +++ b/src/kvstore/kvstack/register.ts @@ -0,0 +1,20 @@ +/** + * @license + * Copyright 2026 Google Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { registerProviders } from "#src/kvstore/kvstack/common.js"; +import { frontendBackendIsomorphicKvStoreProviderRegistry } from "#src/kvstore/register.js"; + +registerProviders(frontendBackendIsomorphicKvStoreProviderRegistry); diff --git a/src/kvstore/kvstack/url.ts b/src/kvstore/kvstack/url.ts new file mode 100644 index 0000000000..080ea8c5fb --- /dev/null +++ b/src/kvstore/kvstack/url.ts @@ -0,0 +1,91 @@ +/** + * @license + * Copyright 2026 Google Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import type { UrlWithParsedScheme } from "#src/kvstore/url.js"; +import { ensureNoQueryOrFragmentParameters } from "#src/kvstore/url.js"; + +export interface KvStackLayer { + base: string; + exact?: string; + prefix?: string; +} + +export interface KvStackSpec { + layers: KvStackLayer[]; +} + +// URL form: `kvstack:[/]`. +// +// The JSON is percent-encoded (encodeURIComponent) so it never contains a bare +// `/`; the first `/` in the suffix therefore always delimits the optional +// within-kvstack path. +export function parseKvStackUrl(parsedUrl: UrlWithParsedScheme): { + spec: KvStackSpec; + path: string; +} { + ensureNoQueryOrFragmentParameters(parsedUrl); + const suffix = parsedUrl.suffix ?? ""; + const slashIdx = suffix.indexOf("/"); + const jsonPart = slashIdx === -1 ? suffix : suffix.substring(0, slashIdx); + const pathPart = slashIdx === -1 ? "" : suffix.substring(slashIdx + 1); + let spec: unknown; + try { + spec = JSON.parse(decodeURIComponent(jsonPart)); + } catch (e) { + throw new Error(`Invalid kvstack URL: ${parsedUrl.url}`, { cause: e }); + } + validateKvStackSpec(spec); + return { spec, path: decodeURIComponent(pathPart) }; +} + +export function formatKvStackUrl(spec: KvStackSpec, key: string = ""): string { + const json = encodeURIComponent(JSON.stringify(spec)); + return key === "" + ? `kvstack:${json}` + : `kvstack:${json}/${encodeURIComponent(key)}`; +} + +function validateKvStackSpec(spec: unknown): asserts spec is KvStackSpec { + if ( + typeof spec !== "object" || + spec === null || + !Array.isArray((spec as { layers?: unknown }).layers) + ) { + throw new Error("kvstack spec must have a 'layers' array"); + } + const { layers } = spec as KvStackSpec; + if (layers.length === 0) { + throw new Error("kvstack spec must have at least one layer"); + } + for (const layer of layers) { + if (typeof layer !== "object" || layer === null) { + throw new Error("kvstack layer must be an object"); + } + if (typeof layer.base !== "string" || layer.base === "") { + throw new Error("kvstack layer must have a non-empty 'base' string"); + } + const hasExact = typeof layer.exact === "string"; + const hasPrefix = typeof layer.prefix === "string"; + if (hasExact && hasPrefix) { + throw new Error("kvstack layer cannot have both 'exact' and 'prefix'"); + } + if (hasPrefix && layer.prefix === "") { + throw new Error( + "kvstack layer 'prefix' must be non-empty; use a 'base' layer for catch-all routing", + ); + } + } +} diff --git a/src/kvstore/ocdbt/backend.ts b/src/kvstore/ocdbt/backend.ts index 24486f57f8..371a244707 100644 --- a/src/kvstore/ocdbt/backend.ts +++ b/src/kvstore/ocdbt/backend.ts @@ -34,7 +34,6 @@ import { import { getRoot } from "#src/kvstore/ocdbt/read_version.js"; import { getOcdbtUrl } from "#src/kvstore/ocdbt/url.js"; import { type VersionSpecifier } from "#src/kvstore/ocdbt/version_specifier.js"; -import type { BtreeGenerationReference } from "#src/kvstore/ocdbt/version_tree.js"; import type { ProgressOptions } from "#src/util/progress_listener.js"; export class OcdbtKvStore implements KvStore { @@ -44,19 +43,13 @@ export class OcdbtKvStore implements KvStore { public version: VersionSpecifier | undefined, ) {} - private root: BtreeGenerationReference | undefined; - - private async getRoot(options: Partial) { - let { root } = this; - if (root === undefined) { - root = this.root = await getRoot( - this.sharedKvStoreContext, - this.baseUrl, - this.version, - options, - ); - } - return root; + private resolveRoot(options: Partial) { + return getRoot( + this.sharedKvStoreContext, + this.baseUrl, + this.version, + options, + ); } getUrl(key: string) { @@ -67,7 +60,7 @@ export class OcdbtKvStore implements KvStore { key: string, options: StatOptions, ): Promise { - const root = await this.getRoot(options); + const root = await this.resolveRoot(options); const encodedKey = new TextEncoder().encode(key) as Key; const entry = await findEntryInRoot( this.sharedKvStoreContext, @@ -85,7 +78,7 @@ export class OcdbtKvStore implements KvStore { key: string, options: DriverReadOptions, ): Promise { - const root = await this.getRoot(options); + const root = await this.resolveRoot(options); const encodedKey = new TextEncoder().encode(key) as Key; const entry = await findEntryInRoot( this.sharedKvStoreContext, @@ -105,7 +98,7 @@ export class OcdbtKvStore implements KvStore { prefix: string, options: DriverListOptions, ): Promise { - const root = await this.getRoot(options); + const root = await this.resolveRoot(options); const encodedPrefix = new TextEncoder().encode(prefix) as Key; return await listRoot( this.sharedKvStoreContext, diff --git a/src/kvstore/ocdbt/metadata_cache.ts b/src/kvstore/ocdbt/metadata_cache.ts index 361c048630..277635648f 100644 --- a/src/kvstore/ocdbt/metadata_cache.ts +++ b/src/kvstore/ocdbt/metadata_cache.ts @@ -27,7 +27,12 @@ import type { ManifestWithVersionTree, } from "#src/kvstore/ocdbt/manifest.js"; import { decodeManifest } from "#src/kvstore/ocdbt/manifest.js"; -import type { VersionTreeNode } from "#src/kvstore/ocdbt/version_tree.js"; +import type { VersionSpecifier } from "#src/kvstore/ocdbt/version_specifier.js"; +import { formatVersion } from "#src/kvstore/ocdbt/version_specifier.js"; +import type { + BtreeGenerationReference, + VersionTreeNode, +} from "#src/kvstore/ocdbt/version_tree.js"; import { decodeVersionTreeNode } from "#src/kvstore/ocdbt/version_tree.js"; import { pipelineUrlJoin } from "#src/kvstore/url.js"; import type { ProgressOptions } from "#src/util/progress_listener.js"; @@ -84,6 +89,67 @@ export function getManifest( return cache.get(dataFile, options); } +// Clears every OCDBT metadata cache so the next read resolves a fresh root +// from the updated manifest. Stub factories below intentionally throw: the +// real factories (in `getManifest` / `getBtreeNode` / `getRoot`) are +// already registered by the time invalidation runs, so `memoize.get` returns +// the existing cache instance without ever calling these stubs. +// +// Scope is the whole shared context: if multiple OCDBT databases are open +// they are all flushed. Metadata is small and fast to re-fetch so this is +// acceptable. +export function invalidateOcdbtCaches( + sharedKvStoreContext: SharedKvStoreContextCounterpart, +) { + const manifestCache = sharedKvStoreContext.chunkManager.memoize.get( + "ocdbt:manifest", + () => { + const cache = new SimpleAsyncCache( + sharedKvStoreContext.chunkManager.addRef(), + { + get: async () => { + throw new Error("unreachable"); + }, + }, + ); + cache.registerDisposer(sharedKvStoreContext.addRef()); + return cache; + }, + ); + manifestCache.invalidateAll(); + const btreeCache = sharedKvStoreContext.chunkManager.memoize.get( + "ocdbt:btree", + () => + makeIndirectDataReferenceCache( + sharedKvStoreContext, + "b+tree node", + decodeBtreeNode, + ), + ); + btreeCache.invalidateAll(); + const versionCache = sharedKvStoreContext.chunkManager.memoize.get( + "ocdbt:version", + () => { + const cache = new SimpleAsyncCache< + { url: string; version: VersionSpecifier | undefined }, + BtreeGenerationReference + >(sharedKvStoreContext.chunkManager.addRef(), { + get: async () => { + throw new Error("unreachable"); + }, + encodeKey: ({ url, version }) => + JSON.stringify([ + url, + version !== undefined ? formatVersion(version) : undefined, + ]), + }); + cache.registerDisposer(sharedKvStoreContext.addRef()); + return cache; + }, + ); + versionCache.invalidateAll(); +} + export async function getResolvedManifest( sharedKvStoreContext: SharedKvStoreContextCounterpart, url: string,