Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions packages/persistance/src/PrismaRedisDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import {
log,
} from "@proto-kit/common";
import { PrismaClient } from "@prisma/client";
import { RedisClientType } from "redis";
import { inject } from "tsyringe";

import {
Expand All @@ -22,10 +21,8 @@ import {
PrismaDatabaseConnection,
} from "./PrismaDatabaseConnection";
import {
RedisConnection,
RedisConnectionConfig,
RedisConnectionModule,
RedisTransaction,
} from "./RedisConnection";

export interface PrismaRedisCombinedConfig {
Expand All @@ -39,7 +36,7 @@ export interface PrismaRedisCombinedConfig {
@dependencyFactory()
export class PrismaRedisDatabase
extends SequencerModule<PrismaRedisCombinedConfig>
implements PrismaConnection, RedisConnection, Database, Prunable
implements PrismaConnection, Database, Prunable
{
public prisma: PrismaDatabaseConnection;

Expand All @@ -55,14 +52,6 @@ export class PrismaRedisDatabase
return this.prisma.prismaClient;
}

public get redisClient(): RedisClientType {
return this.redis.redisClient;
}

public get currentMulti(): RedisTransaction {
return this.redis.currentMulti;
}

public create(childContainerProvider: ChildContainerProvider) {
super.create(childContainerProvider);
this.prisma.create(childContainerProvider);
Expand All @@ -73,6 +62,9 @@ export class PrismaRedisDatabase
return {
...PrismaDatabaseConnection.dependencies(),
...RedisConnectionModule.dependencies(),
TreeDatabase: {
useGenerated: (dbModule) => dbModule.redis,
},
};
}

Expand Down Expand Up @@ -103,7 +95,7 @@ export class PrismaRedisDatabase
// TODO Long-term we want to somehow make sure we can rollback one data source
// if commiting the other one's transaction fails
await this.prisma.executeInTransaction(async () => {
await this.redis.executeInTransaction(f);
await f();
});
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { noop, StoredLeaf } from "@proto-kit/common";
import { StoredLeaf } from "@proto-kit/common";
import { AsyncLinkedLeafStore, trace, Tracer } from "@proto-kit/sequencer";
import { injectable } from "tsyringe";
import { Prisma } from "@prisma/client";
Expand All @@ -23,12 +23,8 @@ export class PrismaLinkedLeafStore implements AsyncLinkedLeafStore {
}
}

public async openTransaction(): Promise<void> {
noop();
}

@trace("LinkedLeafStore.commit")
public async commit(): Promise<void> {
public async flush(): Promise<void> {
if (this.cache.length > 0) {
const data = this.cache.map((entry) => ({
path: entry.leaf.path.toString(),
Expand Down
1 change: 1 addition & 0 deletions packages/sequencer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ export * from "./storage/repositories/MessageStorage";
export * from "./storage/repositories/TransactionStorage";
export * from "./storage/inmemory/InMemoryDatabase";
export * from "./storage/inmemory/InMemoryAsyncMerkleTreeStore";
export * from "./storage/inmemory/InMemoryAsyncLinkedLeafStore";
export * from "./storage/inmemory/InMemoryBlockStorage";
export * from "./storage/inmemory/InMemoryBatchStorage";
export * from "./storage/inmemory/InMemorySettlementStorage";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ export class BatchProducerModule extends SequencerModule {
@inject("BatchStorage") private readonly batchStorage: BatchStorage,
@inject("Database")
private readonly database: Database,
@inject("TreeDatabase")
private readonly treeDatabase: Database,
private readonly batchFlow: BatchFlow,
private readonly blockProofSerializer: BlockProofSerializer,
private readonly batchTraceService: BatchTracingService
Expand Down Expand Up @@ -88,8 +90,11 @@ export class BatchProducerModule extends SequencerModule {
// Apply state changes to current MerkleTreeStore
await this.database.executeInTransaction(async () => {
await this.batchStorage.pushBatch(batchWithStateDiff.batch);
await batchWithStateDiff.changes.mergeIntoParent();
});
await batchWithStateDiff.changes.mergeIntoParent(
this.database,
this.treeDatabase
);

// TODO Add transition from unproven to proven state for stateservice
// This needs proper DB-level masking
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class BlockProducerModule extends SequencerModule<BlockConfig> {
private readonly methodIdResolver: MethodIdResolver,
@inject("Runtime") private readonly runtime: Runtime<RuntimeModulesRecord>,
@inject("Database") private readonly database: Database,
@inject("TreeDatabase") private readonly treeDatabase: Database,
@inject("Tracer") public readonly tracer: Tracer
) {
super();
Expand Down Expand Up @@ -122,14 +123,17 @@ export class BlockProducerModule extends SequencerModule<BlockConfig> {

await this.tracer.trace(
"block.result.commit",
async () =>
async () => {
await this.database.executeInTransaction(async () => {
await blockHashTreeStore.mergeIntoParent();
await treeStore.mergeIntoParent();
await stateService.mergeIntoParent();

await this.blockQueue.pushResult(result);
}),
await stateService.mergeIntoParent();
await treeStore.mergeLeavesIntoParent();
});
await this.treeDatabase.executeInTransaction(async () => {
await blockHashTreeStore.mergeIntoParent();
await treeStore.mergeTreeIntoParent();
});
},
traceMetadata
);

Expand Down Expand Up @@ -209,7 +213,6 @@ export class BlockProducerModule extends SequencerModule<BlockConfig> {
async () => {
// Push changes to the database atomically
await this.database.executeInTransaction(async () => {
await stateChanges.mergeIntoParent();
await this.blockQueue.pushBlock(block);

// Remove included or dropped txs, leave skipped ones alone
Expand All @@ -228,6 +231,8 @@ export class BlockProducerModule extends SequencerModule<BlockConfig> {
await this.transactionStorage.reportSkippedTransactions(
orderingMetadata.skippedPaths
);

await stateChanges.mergeIntoParent();
});
},
{
Expand Down
4 changes: 1 addition & 3 deletions packages/sequencer/src/state/async/AsyncLinkedLeafStore.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import { StoredLeaf } from "@proto-kit/common";

export interface AsyncLinkedLeafStore {
openTransaction: () => Promise<void>;

commit: () => Promise<void>;
flush: () => Promise<void>;

writeLeaves: (leaves: StoredLeaf[]) => void;

Expand Down
30 changes: 22 additions & 8 deletions packages/sequencer/src/state/lmt/CachedLinkedLeafStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import groupBy from "lodash/groupBy";
import { AsyncLinkedLeafStore } from "../async/AsyncLinkedLeafStore";
import { CachedMerkleTreeStore } from "../merkle/CachedMerkleTreeStore";
import { AsyncMerkleTreeStore } from "../async/AsyncMerkleTreeStore";
import { Database } from "../../storage/Database";

export class CachedLinkedLeafStore implements LinkedLeafStore {
private writeCache: {
Expand All @@ -26,7 +27,7 @@ export class CachedLinkedLeafStore implements LinkedLeafStore {

private constructor(
private readonly parent: AsyncLinkedLeafStore,
private readonly parentTreeStore: AsyncMerkleTreeStore
parentTreeStore: AsyncMerkleTreeStore
) {
this.treeCache = new CachedMerkleTreeStore(parentTreeStore);
}
Expand Down Expand Up @@ -215,24 +216,37 @@ export class CachedLinkedLeafStore implements LinkedLeafStore {
await this.preloadKeysInternal(paths);
}

// This merges the cache into the parent tree and resets the cache, but not the
// in-memory merkle tree.
public async mergeIntoParent(): Promise<void> {
public async mergeLeavesIntoParent() {
const leaves = this.getWrittenLeaves();
// In case no state got set we can skip this step
if (leaves.length === 0) {
return;
}

await this.parent.openTransaction();

this.parent.writeLeaves(Object.values(leaves));

await this.parent.commit();
await this.parent.flush();

this.resetWrittenLeaves();
}

public async mergeTreeIntoParent() {
await this.treeCache.mergeIntoParent();
}

this.resetWrittenLeaves();
// This merges the cache into the parent tree and resets the cache, but not the
// in-memory merkle tree.
public async mergeIntoParent(
stateDb: Database,
treeDb: Database
): Promise<void> {
await stateDb.executeInTransaction(async () => {
await this.mergeLeavesIntoParent();
});

await treeDb.executeInTransaction(async () => {
await this.mergeTreeIntoParent();
});
}

public getPreviousLeaf(path: bigint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@ import { AsyncLinkedLeafStore } from "../../state/async/AsyncLinkedLeafStore";
export class InMemoryAsyncLinkedLeafStore implements AsyncLinkedLeafStore {
private readonly leafStore = new InMemoryLinkedLeafStore();

public async openTransaction(): Promise<void> {
noop();
}

public async commit(): Promise<void> {
public async flush(): Promise<void> {
noop();
}

Expand Down
3 changes: 3 additions & 0 deletions packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ export class InMemoryDatabase extends SequencerModule implements Database {
asyncTreeStore: {
useClass: InMemoryAsyncMerkleTreeStore,
},
treeDatabase: {
useToken: "Database",
},
};
}

Expand Down
17 changes: 11 additions & 6 deletions packages/sequencer/test/merkle/CachedLinkedMerkleStore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ describe("cached linked merkle store", () => {
const tmpTree = new LinkedMerkleTree(cachedStore.treeStore, cachedStore);
tmpTree.setLeaf(5n, 10n);

await cachedStore.mergeIntoParent();
await cachedStore.mergeLeavesIntoParent();
await cachedStore.mergeTreeIntoParent();

cache1 = await CachedLinkedLeafStore.new(mainStore, mainTreeStore);
tree1 = new LinkedMerkleTree(cache1.treeStore, cache1);
Expand Down Expand Up @@ -84,7 +85,8 @@ describe("cached linked merkle store", () => {
expectDefined(cache1.treeStore.getNode(1n, 0));

tree1.setLeaf(10n, 10n);
await cache1.mergeIntoParent();
await cache1.mergeLeavesIntoParent();
await cache1.mergeTreeIntoParent();

const leaf5 = tree1.getLeaf(5n);
const leaf10 = tree1.getLeaf(10n);
Expand Down Expand Up @@ -119,7 +121,8 @@ describe("cached linked merkle store", () => {
tree1.setLeaf(11n, 11n);
tree1.setLeaf(12n, 12n);
tree1.setLeaf(13n, 13n);
await cache1.mergeIntoParent();
await cache1.mergeLeavesIntoParent();
await cache1.mergeTreeIntoParent();

const cache2 = new SyncCachedLinkedLeafStore(cache1);
await cache2.preloadKeys([14n]);
Expand Down Expand Up @@ -218,7 +221,7 @@ describe("cached linked merkle store", () => {
leaf2.hash().toBigInt()
);
expect(tree1.getRoot()).not.toEqual(tree2.getRoot());
await cache2.mergeIntoParent();
cache2.mergeIntoParent();
expect(tree1.getRoot()).toEqual(tree2.getRoot());
});

Expand All @@ -229,7 +232,8 @@ describe("cached linked merkle store", () => {
await cache1.preloadKeys([10n, 20n]);
treeCache1.setLeaf(10n, 10n);
treeCache1.setLeaf(20n, 20n);
await cache1.mergeIntoParent();
await cache1.mergeLeavesIntoParent();
await cache1.mergeTreeIntoParent();

const cache2 = new SyncCachedLinkedLeafStore(cache1);
const treeCache2 = new LinkedMerkleTree(cache2.treeStore, cache2);
Expand Down Expand Up @@ -385,7 +389,8 @@ describe("cached linked merkle store", () => {
);

// Now the mainstore has the new 15n root.
await cache1.mergeIntoParent();
await cache1.mergeLeavesIntoParent();
await cache1.mergeTreeIntoParent();

const cachedStore = await CachedLinkedLeafStore.new(
mainStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import {
ConsoleTracer,
CachedLinkedLeafStore,
InMemoryAsyncMerkleTreeStore,
InMemoryAsyncLinkedLeafStore,
} from "../../../src";
import { InMemoryAsyncLinkedLeafStore } from "../../../src/storage/inmemory/InMemoryAsyncLinkedLeafStore";

function createST(obj: {
path: string;
Expand Down
Loading