From 2ef86442aa94ad4fdd9080ce47f91ad8e950b55d Mon Sep 17 00:00:00 2001 From: Raphael Panic Date: Wed, 25 Mar 2026 18:26:52 +0100 Subject: [PATCH 1/2] Implemented atomicity separation --- .../persistance/src/PrismaRedisDatabase.ts | 18 +++++------------ .../services/prisma/PrismaLinkedLeafStore.ts | 8 ++------ packages/sequencer/src/index.ts | 1 + .../production/BatchProducerModule.ts | 7 ++++++- .../sequencing/BlockProducerModule.ts | 18 ++++++++++------- .../src/state/async/AsyncLinkedLeafStore.ts | 4 +--- .../src/state/lmt/CachedLinkedLeafStore.ts | 20 ++++++++++++------- .../inmemory/InMemoryAsyncLinkedLeafStore.ts | 6 +----- .../src/storage/inmemory/InMemoryDatabase.ts | 3 +++ .../StateTransitionTracingService.test.ts | 2 +- 10 files changed, 44 insertions(+), 43 deletions(-) diff --git a/packages/persistance/src/PrismaRedisDatabase.ts b/packages/persistance/src/PrismaRedisDatabase.ts index aab091ef2..fe39fa433 100644 --- a/packages/persistance/src/PrismaRedisDatabase.ts +++ b/packages/persistance/src/PrismaRedisDatabase.ts @@ -8,7 +8,6 @@ import { } from "@proto-kit/sequencer"; import { ChildContainerProvider, dependencyFactory } from "@proto-kit/common"; import { PrismaClient } from "@prisma/client"; -import { RedisClientType } from "redis"; import { inject } from "tsyringe"; import { @@ -17,10 +16,8 @@ import { PrismaDatabaseConnection, } from "./PrismaDatabaseConnection"; import { - RedisConnection, RedisConnectionConfig, RedisConnectionModule, - RedisTransaction, } from "./RedisConnection"; export interface PrismaRedisCombinedConfig { @@ -33,7 +30,7 @@ export interface PrismaRedisCombinedConfig { @dependencyFactory() export class PrismaRedisDatabase extends SequencerModule - implements PrismaConnection, RedisConnection, Database + implements PrismaConnection, Database { public prisma: PrismaDatabaseConnection; @@ -49,14 +46,6 @@ export class PrismaRedisDatabase return this.prisma.prismaClient; } - public get redisClient(): RedisClientType { - return this.redis.redisClient; - } - - public get currentMulti(): RedisTransaction { - return this.redis.currentMulti; - } - public create(childContainerProvider: ChildContainerProvider) { super.create(childContainerProvider); this.prisma.create(childContainerProvider); @@ -67,6 +56,9 @@ export class PrismaRedisDatabase return { ...PrismaDatabaseConnection.dependencies(), ...RedisConnectionModule.dependencies(), + TreeDatabase: { + useGenerated: (dbModule) => dbModule.redis, + }, }; } @@ -92,7 +84,7 @@ export class PrismaRedisDatabase // TODO Long-term we want to somehow make sure we can rollback one data source // if commiting the other one's transaction fails await this.prisma.executeInTransaction(async () => { - await this.redis.executeInTransaction(f); + await f(); }); } } diff --git a/packages/persistance/src/services/prisma/PrismaLinkedLeafStore.ts b/packages/persistance/src/services/prisma/PrismaLinkedLeafStore.ts index bd2c7cba3..a0e37af6a 100644 --- a/packages/persistance/src/services/prisma/PrismaLinkedLeafStore.ts +++ b/packages/persistance/src/services/prisma/PrismaLinkedLeafStore.ts @@ -1,4 +1,4 @@ -import { noop, StoredLeaf } from "@proto-kit/common"; +import { StoredLeaf } from "@proto-kit/common"; import { AsyncLinkedLeafStore, trace, Tracer } from "@proto-kit/sequencer"; import { injectable } from "tsyringe"; import { Prisma } from "@prisma/client"; @@ -23,12 +23,8 @@ export class PrismaLinkedLeafStore implements AsyncLinkedLeafStore { } } - public async openTransaction(): Promise { - noop(); - } - @trace("LinkedLeafStore.commit") - public async commit(): Promise { + public async flush(): Promise { if (this.cache.length > 0) { const data = this.cache.map((entry) => ({ path: entry.leaf.path.toString(), diff --git a/packages/sequencer/src/index.ts b/packages/sequencer/src/index.ts index 35ccd9f20..2b79ee088 100644 --- a/packages/sequencer/src/index.ts +++ b/packages/sequencer/src/index.ts @@ -85,6 +85,7 @@ export * from "./storage/repositories/MessageStorage"; export * from "./storage/repositories/TransactionStorage"; export * from "./storage/inmemory/InMemoryDatabase"; export * from "./storage/inmemory/InMemoryAsyncMerkleTreeStore"; +export * from "./storage/inmemory/InMemoryAsyncLinkedLeafStore"; export * from "./storage/inmemory/InMemoryBlockStorage"; export * from "./storage/inmemory/InMemoryBatchStorage"; export * from "./storage/inmemory/InMemorySettlementStorage"; diff --git a/packages/sequencer/src/protocol/production/BatchProducerModule.ts b/packages/sequencer/src/protocol/production/BatchProducerModule.ts index 1dad75eaa..a1e4eec98 100644 --- a/packages/sequencer/src/protocol/production/BatchProducerModule.ts +++ b/packages/sequencer/src/protocol/production/BatchProducerModule.ts @@ -54,6 +54,8 @@ export class BatchProducerModule extends SequencerModule { @inject("BatchStorage") private readonly batchStorage: BatchStorage, @inject("Database") private readonly database: Database, + @inject("TreeDatabase") + private readonly treeDatabase: Database, private readonly batchFlow: BatchFlow, private readonly blockProofSerializer: BlockProofSerializer, private readonly batchTraceService: BatchTracingService @@ -88,8 +90,11 @@ export class BatchProducerModule extends SequencerModule { // Apply state changes to current MerkleTreeStore await this.database.executeInTransaction(async () => { await this.batchStorage.pushBatch(batchWithStateDiff.batch); - await batchWithStateDiff.changes.mergeIntoParent(); }); + await batchWithStateDiff.changes.mergeIntoParent( + this.database, + this.treeDatabase + ); // TODO Add transition from unproven to proven state for stateservice // This needs proper DB-level masking diff --git a/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts b/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts index e6dd9f294..ed3922daf 100644 --- a/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts +++ b/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts @@ -58,6 +58,7 @@ export class BlockProducerModule extends SequencerModule { private readonly methodIdResolver: MethodIdResolver, @inject("Runtime") private readonly runtime: Runtime, @inject("Database") private readonly database: Database, + @inject("TreeDatabase") private readonly treeDatabase: Database, @inject("Tracer") public readonly tracer: Tracer ) { super(); @@ -127,14 +128,16 @@ export class BlockProducerModule extends SequencerModule { await this.tracer.trace( "block.result.commit", - async () => + async () => { await this.database.executeInTransaction(async () => { - await blockHashTreeStore.mergeIntoParent(); - await treeStore.mergeIntoParent(); - await stateService.mergeIntoParent(); - await this.blockQueue.pushResult(result); - }), + await stateService.mergeIntoParent(); + }); + await this.treeDatabase.executeInTransaction(async () => { + await blockHashTreeStore.mergeIntoParent(); + }); + await treeStore.mergeIntoParent(this.database, this.treeDatabase); + }, traceMetadata ); @@ -215,7 +218,6 @@ export class BlockProducerModule extends SequencerModule { async () => { // Push changes to the database atomically await this.database.executeInTransaction(async () => { - await stateChanges.mergeIntoParent(); await this.blockQueue.pushBlock(block); // Remove included or dropped txs, leave skipped ones alone @@ -234,6 +236,8 @@ export class BlockProducerModule extends SequencerModule { await this.transactionStorage.reportSkippedTransactions( orderingMetadata.skippedPaths ); + + await stateChanges.mergeIntoParent(); }); }, { diff --git a/packages/sequencer/src/state/async/AsyncLinkedLeafStore.ts b/packages/sequencer/src/state/async/AsyncLinkedLeafStore.ts index b3ac3c6f4..3725a9986 100644 --- a/packages/sequencer/src/state/async/AsyncLinkedLeafStore.ts +++ b/packages/sequencer/src/state/async/AsyncLinkedLeafStore.ts @@ -1,9 +1,7 @@ import { StoredLeaf } from "@proto-kit/common"; export interface AsyncLinkedLeafStore { - openTransaction: () => Promise; - - commit: () => Promise; + flush: () => Promise; writeLeaves: (leaves: StoredLeaf[]) => void; diff --git a/packages/sequencer/src/state/lmt/CachedLinkedLeafStore.ts b/packages/sequencer/src/state/lmt/CachedLinkedLeafStore.ts index 0ee781cc9..f330b98c1 100644 --- a/packages/sequencer/src/state/lmt/CachedLinkedLeafStore.ts +++ b/packages/sequencer/src/state/lmt/CachedLinkedLeafStore.ts @@ -14,6 +14,7 @@ import groupBy from "lodash/groupBy"; import { AsyncLinkedLeafStore } from "../async/AsyncLinkedLeafStore"; import { CachedMerkleTreeStore } from "../merkle/CachedMerkleTreeStore"; import { AsyncMerkleTreeStore } from "../async/AsyncMerkleTreeStore"; +import { Database } from "../../storage/Database"; export class CachedLinkedLeafStore implements LinkedLeafStore { private writeCache: { @@ -26,7 +27,7 @@ export class CachedLinkedLeafStore implements LinkedLeafStore { private constructor( private readonly parent: AsyncLinkedLeafStore, - private readonly parentTreeStore: AsyncMerkleTreeStore + parentTreeStore: AsyncMerkleTreeStore ) { this.treeCache = new CachedMerkleTreeStore(parentTreeStore); } @@ -217,20 +218,25 @@ export class CachedLinkedLeafStore implements LinkedLeafStore { // This merges the cache into the parent tree and resets the cache, but not the // in-memory merkle tree. - public async mergeIntoParent(): Promise { + public async mergeIntoParent( + stateDb: Database, + treeDb: Database + ): Promise { const leaves = this.getWrittenLeaves(); // In case no state got set we can skip this step if (leaves.length === 0) { return; } - await this.parent.openTransaction(); - - this.parent.writeLeaves(Object.values(leaves)); + await stateDb.executeInTransaction(async () => { + this.parent.writeLeaves(Object.values(leaves)); - await this.parent.commit(); + await this.parent.flush(); + }); - await this.treeCache.mergeIntoParent(); + await treeDb.executeInTransaction(async () => { + await this.treeCache.mergeIntoParent(); + }); this.resetWrittenLeaves(); } diff --git a/packages/sequencer/src/storage/inmemory/InMemoryAsyncLinkedLeafStore.ts b/packages/sequencer/src/storage/inmemory/InMemoryAsyncLinkedLeafStore.ts index c74f4ffdb..964d38a05 100644 --- a/packages/sequencer/src/storage/inmemory/InMemoryAsyncLinkedLeafStore.ts +++ b/packages/sequencer/src/storage/inmemory/InMemoryAsyncLinkedLeafStore.ts @@ -5,11 +5,7 @@ import { AsyncLinkedLeafStore } from "../../state/async/AsyncLinkedLeafStore"; export class InMemoryAsyncLinkedLeafStore implements AsyncLinkedLeafStore { private readonly leafStore = new InMemoryLinkedLeafStore(); - public async openTransaction(): Promise { - noop(); - } - - public async commit(): Promise { + public async flush(): Promise { noop(); } diff --git a/packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts b/packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts index c29f32f8b..e1082b955 100644 --- a/packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts +++ b/packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts @@ -62,6 +62,9 @@ export class InMemoryDatabase extends SequencerModule implements Database { asyncTreeStore: { useClass: InMemoryAsyncMerkleTreeStore, }, + treeDatabase: { + useToken: "Database", + }, }; } diff --git a/packages/sequencer/test/production/tracing/StateTransitionTracingService.test.ts b/packages/sequencer/test/production/tracing/StateTransitionTracingService.test.ts index f93224b74..d87974d51 100644 --- a/packages/sequencer/test/production/tracing/StateTransitionTracingService.test.ts +++ b/packages/sequencer/test/production/tracing/StateTransitionTracingService.test.ts @@ -20,8 +20,8 @@ import { ConsoleTracer, CachedLinkedLeafStore, InMemoryAsyncMerkleTreeStore, + InMemoryAsyncLinkedLeafStore, } from "../../../src"; -import { InMemoryAsyncLinkedLeafStore } from "../../../src/storage/inmemory/InMemoryAsyncLinkedLeafStore"; function createST(obj: { path: string; From 01ede5f1aac951f06a7f5fd82e4f94aed1c8a25b Mon Sep 17 00:00:00 2001 From: Raphael Panic Date: Thu, 26 Mar 2026 14:50:56 +0100 Subject: [PATCH 2/2] Changed interface of mergeIntoParent a bit --- .../sequencing/BlockProducerModule.ts | 3 +- .../src/state/lmt/CachedLinkedLeafStore.ts | 32 ++++++++++++------- .../merkle/CachedLinkedMerkleStore.test.ts | 17 ++++++---- 3 files changed, 33 insertions(+), 19 deletions(-) diff --git a/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts b/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts index ed3922daf..9617048a2 100644 --- a/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts +++ b/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts @@ -132,11 +132,12 @@ export class BlockProducerModule extends SequencerModule { await this.database.executeInTransaction(async () => { await this.blockQueue.pushResult(result); await stateService.mergeIntoParent(); + await treeStore.mergeLeavesIntoParent(); }); await this.treeDatabase.executeInTransaction(async () => { await blockHashTreeStore.mergeIntoParent(); + await treeStore.mergeTreeIntoParent(); }); - await treeStore.mergeIntoParent(this.database, this.treeDatabase); }, traceMetadata ); diff --git a/packages/sequencer/src/state/lmt/CachedLinkedLeafStore.ts b/packages/sequencer/src/state/lmt/CachedLinkedLeafStore.ts index f330b98c1..53c83203e 100644 --- a/packages/sequencer/src/state/lmt/CachedLinkedLeafStore.ts +++ b/packages/sequencer/src/state/lmt/CachedLinkedLeafStore.ts @@ -216,29 +216,37 @@ export class CachedLinkedLeafStore implements LinkedLeafStore { await this.preloadKeysInternal(paths); } - // This merges the cache into the parent tree and resets the cache, but not the - // in-memory merkle tree. - public async mergeIntoParent( - stateDb: Database, - treeDb: Database - ): Promise { + public async mergeLeavesIntoParent() { const leaves = this.getWrittenLeaves(); // In case no state got set we can skip this step if (leaves.length === 0) { return; } - await stateDb.executeInTransaction(async () => { - this.parent.writeLeaves(Object.values(leaves)); + this.parent.writeLeaves(Object.values(leaves)); + + await this.parent.flush(); + + this.resetWrittenLeaves(); + } - await this.parent.flush(); + public async mergeTreeIntoParent() { + await this.treeCache.mergeIntoParent(); + } + + // This merges the cache into the parent tree and resets the cache, but not the + // in-memory merkle tree. + public async mergeIntoParent( + stateDb: Database, + treeDb: Database + ): Promise { + await stateDb.executeInTransaction(async () => { + await this.mergeLeavesIntoParent(); }); await treeDb.executeInTransaction(async () => { - await this.treeCache.mergeIntoParent(); + await this.mergeTreeIntoParent(); }); - - this.resetWrittenLeaves(); } public getPreviousLeaf(path: bigint) { diff --git a/packages/sequencer/test/merkle/CachedLinkedMerkleStore.test.ts b/packages/sequencer/test/merkle/CachedLinkedMerkleStore.test.ts index 0df9e4d38..c95eaeab3 100644 --- a/packages/sequencer/test/merkle/CachedLinkedMerkleStore.test.ts +++ b/packages/sequencer/test/merkle/CachedLinkedMerkleStore.test.ts @@ -30,7 +30,8 @@ describe("cached linked merkle store", () => { const tmpTree = new LinkedMerkleTree(cachedStore.treeStore, cachedStore); tmpTree.setLeaf(5n, 10n); - await cachedStore.mergeIntoParent(); + await cachedStore.mergeLeavesIntoParent(); + await cachedStore.mergeTreeIntoParent(); cache1 = await CachedLinkedLeafStore.new(mainStore, mainTreeStore); tree1 = new LinkedMerkleTree(cache1.treeStore, cache1); @@ -84,7 +85,8 @@ describe("cached linked merkle store", () => { expectDefined(cache1.treeStore.getNode(1n, 0)); tree1.setLeaf(10n, 10n); - await cache1.mergeIntoParent(); + await cache1.mergeLeavesIntoParent(); + await cache1.mergeTreeIntoParent(); const leaf5 = tree1.getLeaf(5n); const leaf10 = tree1.getLeaf(10n); @@ -119,7 +121,8 @@ describe("cached linked merkle store", () => { tree1.setLeaf(11n, 11n); tree1.setLeaf(12n, 12n); tree1.setLeaf(13n, 13n); - await cache1.mergeIntoParent(); + await cache1.mergeLeavesIntoParent(); + await cache1.mergeTreeIntoParent(); const cache2 = new SyncCachedLinkedLeafStore(cache1); await cache2.preloadKeys([14n]); @@ -218,7 +221,7 @@ describe("cached linked merkle store", () => { leaf2.hash().toBigInt() ); expect(tree1.getRoot()).not.toEqual(tree2.getRoot()); - await cache2.mergeIntoParent(); + cache2.mergeIntoParent(); expect(tree1.getRoot()).toEqual(tree2.getRoot()); }); @@ -229,7 +232,8 @@ describe("cached linked merkle store", () => { await cache1.preloadKeys([10n, 20n]); treeCache1.setLeaf(10n, 10n); treeCache1.setLeaf(20n, 20n); - await cache1.mergeIntoParent(); + await cache1.mergeLeavesIntoParent(); + await cache1.mergeTreeIntoParent(); const cache2 = new SyncCachedLinkedLeafStore(cache1); const treeCache2 = new LinkedMerkleTree(cache2.treeStore, cache2); @@ -385,7 +389,8 @@ describe("cached linked merkle store", () => { ); // Now the mainstore has the new 15n root. - await cache1.mergeIntoParent(); + await cache1.mergeLeavesIntoParent(); + await cache1.mergeTreeIntoParent(); const cachedStore = await CachedLinkedLeafStore.new( mainStore,