Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
416 changes: 413 additions & 3 deletions packages/node-core/src/indexer/dynamic-ds.service.spec.ts

Large diffs are not rendered by default.

100 changes: 95 additions & 5 deletions packages/node-core/src/indexer/dynamic-ds.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: GPL-3.0

import {Inject, Injectable} from '@nestjs/common';
import {BaseCustomDataSource, BaseDataSource, BaseTemplateDataSource} from '@subql/types-core';
import {BaseCustomDataSource, BaseDataSource, BaseTemplateDataSource, DynamicDatasourceInfo} from '@subql/types-core';
import {Transaction} from '@subql/x-sequelize';
import {cloneDeep} from 'lodash';
import {IBlockchainService} from '../blockchain.service';
Expand All @@ -19,12 +19,16 @@ export interface DatasourceParams {
templateName: string;
args?: Record<string, unknown>;
startBlock: number;
endBlock?: number;
}

export interface IDynamicDsService<DS> {
dynamicDatasources: DS[];
createDynamicDatasource(params: DatasourceParams): Promise<DS>;
destroyDynamicDatasource(templateName: string, currentBlockHeight: number, index: number): Promise<void>;
getDynamicDatasources(forceReload?: boolean): Promise<DS[]>;
getDynamicDatasourcesByTemplate(templateName: string): DynamicDatasourceInfo[];
getDatasourceParamByIndex(index: number): DatasourceParams | undefined;
}

@Injectable()
Expand Down Expand Up @@ -91,6 +95,92 @@ export class DynamicDsService<DS extends BaseDataSource = BaseDataSource, P exte
}
}

/**
* Get all active (non-destroyed) dynamic datasources for a specific template.
*
* @param templateName - The name of the template to filter by
* @returns Array of datasource info objects with global indices. The `index` field
* represents the global position in the internal datasource array and should
* be used when calling `destroyDynamicDatasource()`.
*/
getDynamicDatasourcesByTemplate(templateName: string): DynamicDatasourceInfo[] {
if (!this._datasourceParams) {
throw new Error('DynamicDsService has not been initialized');
}

const matchingDatasources = this._datasourceParams
.map((params, globalIndex) => ({params, globalIndex}))
.filter(({params}) => params.templateName === templateName && params.endBlock === undefined);

return matchingDatasources.map(({globalIndex, params}) => ({
index: globalIndex,
templateName: params.templateName,
startBlock: params.startBlock,
endBlock: params.endBlock,
args: params.args,
}));
}

/**
* Get datasource parameters by global index.
*
* @param index - Global index in the internal datasource parameters array
* @returns DatasourceParams if found, undefined otherwise
*/
getDatasourceParamByIndex(index: number): DatasourceParams | undefined {
return this._datasourceParams?.[index];
}

async destroyDynamicDatasource(
templateName: string,
currentBlockHeight: number,
index: number,
tx?: Transaction
): Promise<void> {
if (!this._datasources || !this._datasourceParams) {
throw new Error('DynamicDsService has not been initialized');
}

// Get the datasource at the global index
const dsParam = this._datasourceParams[index];

// Validate datasource exists
if (!dsParam) {
throw new Error(
`Index ${index} is out of bounds. There are ${this._datasourceParams.length} datasource(s) in total`
);
}

// Validate it matches the template name and is not already destroyed
if (dsParam.templateName !== templateName) {
throw new Error(
`Datasource at index ${index} has template name "${dsParam.templateName}", not "${templateName}"`
);
}

if (dsParam.endBlock !== undefined) {
throw new Error(`Dynamic datasource at index ${index} is already destroyed`);
}

// Update the datasource params
const updatedParams = {...dsParam, endBlock: currentBlockHeight};
this._datasourceParams[index] = updatedParams;

// Update the datasource object if it exists
// Note: _datasources and _datasourceParams arrays should always be in sync.
// If the index is valid for params, it must also be valid for datasources.
const datasource = this._datasources[index];
if (!datasource) {
throw new Error(`Datasources array out of sync with params at index ${index}`);
}
Comment thread
stwiname marked this conversation as resolved.
// Set endBlock on the datasource object
datasource.endBlock = currentBlockHeight;

await this.metadata.set(METADATA_KEY, this._datasourceParams, tx);
Comment on lines +165 to +179
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

In-memory state updated before persistence could cause inconsistency.

If metadata.set fails, the in-memory state (_datasourceParams[index] and _datasources[index].endBlock) is already modified, leaving the service in an inconsistent state. Compare with createDynamicDatasource which persists to metadata before updating in-memory state.

Consider reordering to persist first:

-   // Update the datasource params
-   const updatedParams = {...dsParam, endBlock: currentBlockHeight};
-   this._datasourceParams[index] = updatedParams;
-
-   // Update the datasource object if it exists
-   const datasource = this._datasources[index];
-   if (!datasource) {
-     throw new Error(`Datasources array out of sync with params at index ${index}`);
-   }
-   // Set endBlock on the datasource object
-   datasource.endBlock = currentBlockHeight;
-
-   await this.metadata.set(METADATA_KEY, this._datasourceParams, tx);
+   // Prepare updated params
+   const updatedParams = {...dsParam, endBlock: currentBlockHeight};
+
+   // Create a copy for persistence
+   const paramsForPersistence = [...this._datasourceParams];
+   paramsForPersistence[index] = updatedParams;
+
+   // Persist first, then update in-memory on success
+   await this.metadata.set(METADATA_KEY, paramsForPersistence, tx);
+
+   // Update in-memory state only after successful persistence
+   this._datasourceParams[index] = updatedParams;
+
+   const datasource = this._datasources[index];
+   if (!datasource) {
+     throw new Error(`Datasources array out of sync with params at index ${index}`);
+   }
+   datasource.endBlock = currentBlockHeight;

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
packages/node-core/src/indexer/dynamic-ds.service.ts around lines 165 to 179:
the code mutates in-memory state (_datasourceParams[index] and
_datasources[index].endBlock) before calling metadata.set, which can leave the
service inconsistent if persistence fails; instead, build the updated params
object/array but do NOT assign it into _datasourceParams or modify
datasource.endBlock until metadata.set(METADATA_KEY, updatedParamsArray, tx)
completes successfully; after metadata.set resolves, then assign
this._datasourceParams[index] = updatedParams and set
this._datasources[index].endBlock = currentBlockHeight (and keep throwing the
same error if datasource is missing before doing any in-memory mutation).


logger.info(`Destroyed dynamic datasource "${templateName}" at block ${currentBlockHeight}`);
}

// Not force only seems to be used for project changes
async getDynamicDatasources(forceReload?: boolean): Promise<DS[]> {
// Workers should not cache this result in order to keep in sync
Expand All @@ -117,19 +207,19 @@ export class DynamicDsService<DS extends BaseDataSource = BaseDataSource, P exte
*
* This will throw if the template cannot be found by name.
*
* Inserts the startBlock into the template.
* Inserts the startBlock and optionally endBlock into the template.
* */
protected getTemplate(templateName: string, startBlock?: number): DS {
protected getTemplate(templateName: string, startBlock?: number, endBlock?: number): DS {
const t = (this.project.templates ?? []).find((t) => t.name === templateName);
if (!t) {
throw new Error(`Unable to find matching template in project for name: "${templateName}"`);
}
const {name, ...template} = cloneDeep(t);
return {...template, startBlock} as DS;
return {...template, startBlock, endBlock} as DS;
}

private async getDatasource(params: DatasourceParams): Promise<DS> {
const dsObj = this.getTemplate(params.templateName, params.startBlock);
const dsObj = this.getTemplate(params.templateName, params.startBlock, params.endBlock);

try {
await this.blockchainService.updateDynamicDs(params, dsObj);
Expand Down
195 changes: 195 additions & 0 deletions packages/node-core/src/indexer/indexer.manager.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright 2020-2025 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {BaseCustomDataSource, BaseDataSource} from '@subql/types-core';
import {IApi} from '../api.service';
import {IBlockchainService} from '../blockchain.service';
import {NodeConfig} from '../configure';
import {ProcessBlockResponse} from './blockDispatcher';
import {DsProcessorService} from './ds-processor.service';
import {DatasourceParams, DynamicDsService} from './dynamic-ds.service';
import {BaseIndexerManager, FilterTypeMap, HandlerInputTypeMap, ProcessorTypeMap} from './indexer.manager';
import {IndexerSandbox} from './sandbox';
import {CacheMetadataModel} from './storeModelProvider';
import {IBlock, ISubqueryProject} from './types';
import {IUnfinalizedBlocksService} from './unfinalizedBlocks.service';

type FM = FilterTypeMap<BaseDataSource>;
type PM = ProcessorTypeMap<BaseDataSource, FM>;
type HIM = HandlerInputTypeMap<BaseDataSource, FM>;

// Minimal sandbox mock that lets us grab the frozen destroy callback
class MockSandbox {
// eslint-disable-next-line @typescript-eslint/ban-types
frozenFns: Record<string, Function> = {};

freeze(value: any, name: string): void {
if (typeof value === 'function') {
this.frozenFns[name] = value;
}
}
Comment on lines +22 to +30
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
fd -a 'package.json' | xargs -I{} sh -c 'ver=$(jq -r '\''.devDependencies["@typescript-eslint/eslint-plugin"] // .dependencies["@typescript-eslint/eslint-plugin"] // empty'\'' "{}"); if [ -n "$ver" ]; then printf "%s: %s\n" "{}" "$ver"; fi'
rg -n --hidden '@typescript-eslint/(ban-types|no-unsafe-function-type)' .

Repository: subquery/subql

Length of output: 327


🏁 Script executed:

cat -n packages/node-core/src/indexer/indexer.manager.spec.ts | head -50

Repository: subquery/subql

Length of output: 2205


🏁 Script executed:

# Check for any ESLint config in the workspace
fd -a 'eslint.*\.js|eslint.*\.json' . --maxdepth 3

Repository: subquery/subql

Length of output: 93


🏁 Script executed:

# Check the actual rules available in typescript-eslint v8.45.0 and test against current code
python3 << 'EOF'
import json
import subprocess

# List of rules to check
rules = ['@typescript-eslint/ban-types', '@typescript-eslint/no-unsafe-function-type']

# Try to find ESLint and run it on the spec file
result = subprocess.run(
    ['npx', 'eslint', '--print-config', 'packages/node-core/src/indexer/indexer.manager.spec.ts'],
    capture_output=True,
    text=True,
    cwd='/home/jailuser/git'
)

if result.returncode == 0:
    try:
        config = json.loads(result.stdout)
        print("ESLint config loaded:")
        rules_obj = config.get('rules', {})
        for rule in rules:
            if rule in rules_obj:
                print(f"  {rule}: {rules_obj[rule]}")
            else:
                print(f"  {rule}: NOT FOUND in config")
    except json.JSONDecodeError:
        print("Could not parse ESLint config as JSON")
        print(result.stdout[:500])
else:
    print("ESLint error:", result.stderr[:500])
EOF

Repository: subquery/subql

Length of output: 181


🏁 Script executed:

# Get more context on the test coverage to verify the self-destruction claim
sed -n '40,150p' packages/node-core/src/indexer/indexer.manager.spec.ts

Repository: subquery/subql

Length of output: 4069


🏁 Script executed:

# Verify that the no-unsafe-function-type rule would actually trigger on line 24
cat > /tmp/test-function-type.ts << 'EOF'
// This should trigger no-unsafe-function-type
const x: Record<string, Function> = {};
EOF

npx eslint /tmp/test-function-type.ts --parser-options=ecmaVersion:2020 --no-eslintrc --rule '@typescript-eslint/no-unsafe-function-type: error' 2>&1 || true

Repository: subquery/subql

Length of output: 276


🏁 Script executed:

# Also verify if typing it as suggested would pass
cat > /tmp/test-fixed-type.ts << 'EOF'
type FrozenFn = (...args: unknown[]) => unknown | Promise<unknown>;
const x: Record<string, FrozenFn> = {};
EOF

npx eslint /tmp/test-fixed-type.ts --parser-options=ecmaVersion:2020 --no-eslintrc --rule '@typescript-eslint/no-unsafe-function-type: error' 2>&1 || true

Repository: subquery/subql

Length of output: 276


🏁 Script executed:

# Check for test coverage of self-destruction scenario
sed -n '120,250p' packages/node-core/src/indexer/indexer.manager.spec.ts

Repository: subquery/subql

Length of output: 3074


🏁 Script executed:

# Verify the actual linting error by running eslint on the file
cd /home/jailuser/git && npx eslint packages/node-core/src/indexer/indexer.manager.spec.ts 2>&1 | head -40

Repository: subquery/subql

Length of output: 3359


Replace Function with a properly-typed callback alias and remove the invalid ESLint suppression.

Lines 23–24 fail linting in CI: the @typescript-eslint/ban-types rule is not defined in the workspace config, and Record<string, Function> triggers @typescript-eslint/no-unsafe-function-type. Replace both with a concrete callback type:

Proposed fix
+type FrozenFn = (...args: unknown[]) => unknown | Promise<unknown>;
+
 class MockSandbox {
-  // eslint-disable-next-line `@typescript-eslint/ban-types`
-  frozenFns: Record<string, Function> = {};
+  frozenFns: Record<string, FrozenFn> = {};
 
   freeze(value: any, name: string): void {
     if (typeof value === 'function') {
-      this.frozenFns[name] = value;
+      this.frozenFns[name] = value as FrozenFn;
     }
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
class MockSandbox {
// eslint-disable-next-line @typescript-eslint/ban-types
frozenFns: Record<string, Function> = {};
freeze(value: any, name: string): void {
if (typeof value === 'function') {
this.frozenFns[name] = value;
}
}
type FrozenFn = (...args: unknown[]) => unknown | Promise<unknown>;
class MockSandbox {
frozenFns: Record<string, FrozenFn> = {};
freeze(value: any, name: string): void {
if (typeof value === 'function') {
this.frozenFns[name] = value as FrozenFn;
}
}
🧰 Tools
🪛 ESLint

[error] 23-23: Definition for rule '@typescript-eslint/ban-types' was not found.

(@typescript-eslint/ban-types)


[error] 24-24: The Function type accepts any function-like value.
Prefer explicitly defining any function parameters and return type.

(@typescript-eslint/no-unsafe-function-type)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/node-core/src/indexer/indexer.manager.spec.ts` around lines 22 - 30,
The MockSandbox class uses Record<string, Function> and an ESLint suppression;
replace that with a concrete callback alias: add a local type alias like
Callback = (...args: unknown[]) => unknown, change frozenFns to Record<string,
Callback>, remove the // eslint-disable-next-line comment, and update
freeze(value: any, name: string) to accept value: unknown and cast the function
to Callback when assigning (e.g., this.frozenFns[name] = value as Callback)
while keeping the typeof value === 'function' guard; update any references to
frozenFns/freeze accordingly.


// eslint-disable-next-line @typescript-eslint/no-empty-function
async securedExec(): Promise<void> {}
}

class TestIndexerManager extends BaseIndexerManager<
any,
any,
any,
IApi,
BaseDataSource,
BaseDataSource & BaseCustomDataSource,
FM,
PM,
HIM
> {
processedStartBlocks: number[] = [];
destroyConfig?: {triggerStartBlock: number; targetTemplate: string; targetIndex: number};

async indexBlock(block: IBlock<any>, datasources: BaseDataSource[]): Promise<ProcessBlockResponse> {
return this.internalIndexBlock(block, datasources, () => Promise.resolve({} as any));
}

// Simulates what chain-specific indexBlockData does: iterate dataSources, call getVM, run handlers
protected async indexBlockData(
_block: any,
dataSources: BaseDataSource[],
getVM: (d: BaseDataSource) => Promise<IndexerSandbox>
): Promise<void> {
for (let i = 0; i < dataSources.length; i++) {
const ds = dataSources[i];
this.processedStartBlocks.push(ds.startBlock!);

const vm = (await getVM(ds)) as unknown as MockSandbox;

// Trigger destroy if this ds matches the config
if (this.destroyConfig && ds.startBlock === this.destroyConfig.triggerStartBlock) {
const destroyFn = vm.frozenFns.destroyDynamicDatasource;
if (destroyFn) {
await destroyFn(this.destroyConfig.targetTemplate, this.destroyConfig.targetIndex);
}
}
}
}

// eslint-disable-next-line @typescript-eslint/require-await
protected async prepareFilteredData<T>(_kind: any, data: T): Promise<T> {
return data;
}
}

class TestDynamicDsService extends DynamicDsService<BaseDataSource, ISubqueryProject> {
constructor(project: ISubqueryProject) {
super(project, {
updateDynamicDs: () => Promise.resolve(undefined),
} as unknown as IBlockchainService);
}
}

const mockMetadata = (initData: DatasourceParams[] = []) => {
let datasourceParams: DatasourceParams[] = initData;

return {
set: (_key: string, value: any) => {
datasourceParams = value;
},
find: (_key: string) => Promise.resolve([...datasourceParams]),
setNewDynamicDatasource: (params: DatasourceParams) => datasourceParams.push(params),
} as unknown as CacheMetadataModel;
};

const mockBlock = (height: number): IBlock<any> => ({
getHeader: () => ({blockHeight: height, blockHash: `hash-${height}`, parentHash: undefined, timestamp: new Date()}),
block: {},
});

describe('BaseIndexerManager', () => {
let dynamicDsService: TestDynamicDsService;
let manager: TestIndexerManager;

const project = {
templates: [{name: 'Test'}, {name: 'Other'}],
} as any as ISubqueryProject;

beforeEach(() => {
dynamicDsService = new TestDynamicDsService(project);

manager = new TestIndexerManager(
{unsafeApi: {}} as unknown as IApi,
{unfinalizedBlocks: false, profiler: false} as unknown as NodeConfig,
{getDsProcessor: () => new MockSandbox()} as any,
{} as DsProcessorService<BaseDataSource, BaseDataSource & BaseCustomDataSource>,
dynamicDsService as any,
{processUnfinalizedBlocks: () => Promise.resolve(undefined)} as unknown as IUnfinalizedBlocksService<any>,
{} as FM,
{} as PM,
{isRuntimeDs: () => true, isCustomDs: () => false} as unknown as IBlockchainService
);
});

describe('destroy dynamic datasource mid-block', () => {
it('removes destroyed ds from the iteration within the same block', async () => {
const meta = mockMetadata([
{templateName: 'Test', startBlock: 1},
{templateName: 'Test', startBlock: 5},
{templateName: 'Test', startBlock: 10},
]);
await dynamicDsService.init(meta);
const datasources = await dynamicDsService.getDynamicDatasources();

// When processing startBlock=5, destroy ds at index 2 (startBlock=10)
manager.destroyConfig = {triggerStartBlock: 5, targetTemplate: 'Test', targetIndex: 2};

await manager.indexBlock(mockBlock(100), datasources);

// startBlock=10 should never be reached
expect(manager.processedStartBlocks).toEqual([1, 5]);
});
Comment on lines +131 to +148
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
rg -nP --type=ts -C4 'indexBlockData\s*\([^)]*dataSources:\s*[^)]*\)' packages
rg -nP --type=ts -C4 'for\s*\(\s*let\s+\w+\s*=\s*0\s*;\s*\w+\s*<\s*dataSources\.length\s*;\s*\w+\+\+\s*\)' packages

Repository: subquery/subql

Length of output: 1782


🏁 Script executed:

cat -n packages/node-core/src/indexer/indexer.manager.spec.ts | head -200

Repository: subquery/subql

Length of output: 8563


🏁 Script executed:

rg -n "destroyDynamicDatasource\|self.destroy\|destroy.*itself" packages/node-core/src/indexer/ --type ts

Repository: subquery/subql

Length of output: 40


🏁 Script executed:

find packages/node-core/src -name "*dynamic-ds*" -type f

Repository: subquery/subql

Length of output: 235


🏁 Script executed:

rg -n "destroyDynamicDatasource\|destroy" packages/node-core/src/indexer/dynamic-ds.service.ts -C 5 | head -150

Repository: subquery/subql

Length of output: 40


🏁 Script executed:

cat packages/node-core/src/indexer/dynamic-ds.service.ts | head -200

Repository: subquery/subql

Length of output: 7309


🏁 Script executed:

rg -n "refilter\|splice\|filter.*dataSources" packages/node/src/indexer/indexer.manager.ts -C 3

Repository: subquery/subql

Length of output: 40


🏁 Script executed:

cat -n packages/node/src/indexer/indexer.manager.ts | sed -n '180,220p'

Repository: subquery/subql

Length of output: 1567


🏁 Script executed:

rg -n "internalIndexBlock\|filterDatasources" packages/node-core/src/indexer/indexer.manager.ts -C 5 | head -100

Repository: subquery/subql

Length of output: 40


🏁 Script executed:

cat -n packages/node-core/src/indexer/indexer.manager.ts | head -300

Repository: subquery/subql

Length of output: 13439


🏁 Script executed:

cat -n packages/node-core/src/indexer/indexer.manager.spec.ts | sed -n '36,75p'

Repository: subquery/subql

Length of output: 1657


🏁 Script executed:

python3 << 'EOF'
# Trace through the suggested test scenario

# Initial state
datasources = [
    {'startBlock': 1, 'endBlock': None},  # index 0
    {'startBlock': 5, 'endBlock': None},  # index 1 - will self-destroy
    {'startBlock': 10, 'endBlock': None}  # index 2
]

destroyConfig = {'triggerStartBlock': 5, 'targetTemplate': 'Test', 'targetIndex': 1}
processedStartBlocks = []
blockHeight = 100

print("=== Simulating indexBlockData loop ===")
print(f"Initial dataSources length: {len(datasources)}")
print()

# Simulate the for loop in indexBlockData
for i in range(len(datasources)):
    if i >= len(datasources):  # Safety check
        print(f"i={i}: Loop condition would fail: {i} >= {len(datasources)}")
        break
    
    ds = datasources[i]
    processedStartBlocks.append(ds['startBlock'])
    print(f"i={i}: Process ds[{i}] (startBlock={ds['startBlock']})")
    
    # Check if destroy should trigger
    if ds['startBlock'] == destroyConfig['triggerStartBlock']:
        print(f"  → Destroy triggered for index {destroyConfig['targetIndex']}")
        
        # Simulate destroyDynamicDatasource
        if destroyConfig['targetIndex'] < len(datasources):
            datasources[destroyConfig['targetIndex']]['endBlock'] = blockHeight
            print(f"  → Set endBlock={blockHeight} on ds[{destroyConfig['targetIndex']}]")
        
        # Simulate re-filtering (removing datasources with endBlock <= blockHeight)
        original_len = len(datasources)
        datasources = [ds for ds in datasources 
                       if ds['endBlock'] is None or ds['endBlock'] > blockHeight]
        print(f"  → Re-filtered: array shrunk from {original_len} to {len(datasources)}")
        print(f"  → Remaining datasources: {[ds['startBlock'] for ds in datasources]}")
    
    print()

print(f"=== Result ===")
print(f"processedStartBlocks: {processedStartBlocks}")
print(f"Suggested test expects: [1, 5, 10]")
print(f"Match: {processedStartBlocks == [1, 5, 10]}")
EOF

Repository: subquery/subql

Length of output: 469


Add self-destruction regression test to this suite, with corrected expectations.

The suite currently tests destroying a later datasource (index 2 while processing index 1), but misses the critical case from issue #2099: a datasource destroying itself mid-block. With the in-place refiltering and index-based loop, self-destruction shrinks the array during iteration and can skip subsequent datasources.

A self-destruction test is needed. The suggested test below identifies the right scenario but has incorrect expectations—when datasource at index 1 destroys itself during processing at index 1, the array shrinks from 3 to 2, causing the loop to exit before reaching the datasource originally at index 2 (startBlock=10):

💡 Corrected test
+    it('skips subsequent datasources when current one destroys itself', async () => {
+      const meta = mockMetadata([
+        {templateName: 'Test', startBlock: 1},
+        {templateName: 'Test', startBlock: 5},
+        {templateName: 'Test', startBlock: 10},
+      ]);
+      await dynamicDsService.init(meta);
+      const datasources = await dynamicDsService.getDynamicDatasources();
+
+      manager.destroyConfig = {triggerStartBlock: 5, targetTemplate: 'Test', targetIndex: 1};
+
+      await manager.indexBlock(mockBlock(100), datasources);
+
+      expect(manager.processedStartBlocks).toEqual([1, 5]);
+      expect(dynamicDsService.getDatasourceParamByIndex(1)?.endBlock).toBe(100);
+    });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
describe('destroy dynamic datasource mid-block', () => {
it('removes destroyed ds from the iteration within the same block', async () => {
const meta = mockMetadata([
{templateName: 'Test', startBlock: 1},
{templateName: 'Test', startBlock: 5},
{templateName: 'Test', startBlock: 10},
]);
await dynamicDsService.init(meta);
const datasources = await dynamicDsService.getDynamicDatasources();
// When processing startBlock=5, destroy ds at index 2 (startBlock=10)
manager.destroyConfig = {triggerStartBlock: 5, targetTemplate: 'Test', targetIndex: 2};
await manager.indexBlock(mockBlock(100), datasources);
// startBlock=10 should never be reached
expect(manager.processedStartBlocks).toEqual([1, 5]);
});
describe('destroy dynamic datasource mid-block', () => {
it('removes destroyed ds from the iteration within the same block', async () => {
const meta = mockMetadata([
{templateName: 'Test', startBlock: 1},
{templateName: 'Test', startBlock: 5},
{templateName: 'Test', startBlock: 10},
]);
await dynamicDsService.init(meta);
const datasources = await dynamicDsService.getDynamicDatasources();
// When processing startBlock=5, destroy ds at index 2 (startBlock=10)
manager.destroyConfig = {triggerStartBlock: 5, targetTemplate: 'Test', targetIndex: 2};
await manager.indexBlock(mockBlock(100), datasources);
// startBlock=10 should never be reached
expect(manager.processedStartBlocks).toEqual([1, 5]);
});
it('skips subsequent datasources when current one destroys itself', async () => {
const meta = mockMetadata([
{templateName: 'Test', startBlock: 1},
{templateName: 'Test', startBlock: 5},
{templateName: 'Test', startBlock: 10},
]);
await dynamicDsService.init(meta);
const datasources = await dynamicDsService.getDynamicDatasources();
manager.destroyConfig = {triggerStartBlock: 5, targetTemplate: 'Test', targetIndex: 1};
await manager.indexBlock(mockBlock(100), datasources);
expect(manager.processedStartBlocks).toEqual([1, 5]);
expect(dynamicDsService.getDatasourceParamByIndex(1)?.endBlock).toBe(100);
});
});
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/node-core/src/indexer/indexer.manager.spec.ts` around lines 131 -
148, Add a regression test that covers a datasource destroying itself mid-block:
use mockMetadata([...]) and dynamicDsService.init(...) to create three
datasources (startBlock 1,5,10), set manager.destroyConfig = {triggerStartBlock:
5, targetTemplate: 'Test', targetIndex: 1} so the datasource being processed at
startBlock 5 self-destructs, call manager.indexBlock(mockBlock(100),
datasources), and assert manager.processedStartBlocks equals [1, 5] (not
including startBlock 10) to verify the in-place refiltering bug is caught.


it('sets endBlock on the destroyed datasource', async () => {
const meta = mockMetadata([
{templateName: 'Test', startBlock: 1},
{templateName: 'Test', startBlock: 5},
]);
await dynamicDsService.init(meta);
const datasources = await dynamicDsService.getDynamicDatasources();

manager.destroyConfig = {triggerStartBlock: 1, targetTemplate: 'Test', targetIndex: 1};

await manager.indexBlock(mockBlock(50), datasources);

const param = dynamicDsService.getDatasourceParamByIndex(1);
expect(param?.endBlock).toBe(50);
});

it('processes all datasources when nothing is destroyed', async () => {
const meta = mockMetadata([
{templateName: 'Test', startBlock: 1},
{templateName: 'Test', startBlock: 5},
{templateName: 'Test', startBlock: 10},
]);
await dynamicDsService.init(meta);
const datasources = await dynamicDsService.getDynamicDatasources();

await manager.indexBlock(mockBlock(100), datasources);

expect(manager.processedStartBlocks).toEqual([1, 5, 10]);
});

it('filters out already-destroyed datasources before processing starts', async () => {
const meta = mockMetadata([
{templateName: 'Test', startBlock: 1, endBlock: 50},
{templateName: 'Test', startBlock: 5},
{templateName: 'Test', startBlock: 10},
]);
await dynamicDsService.init(meta);
const datasources = await dynamicDsService.getDynamicDatasources();

await manager.indexBlock(mockBlock(100), datasources);

// startBlock=1 was destroyed at block 50, should be excluded
expect(manager.processedStartBlocks).toEqual([5, 10]);
});
});
});
24 changes: 23 additions & 1 deletion packages/node-core/src/indexer/indexer.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,24 @@ export abstract class BaseIndexerManager<
dynamicDsCreated = true;
}, 'createDynamicDatasource');

// Inject function to get dynamic datasources by template into vm
vm.freeze((templateName: string) => {
return this.dynamicDsService.getDynamicDatasourcesByTemplate(templateName);
}, 'getDynamicDatasources');

// Inject function to destroy ds into vm
vm.freeze(async (templateName: string, index: number) => {
await this.dynamicDsService.destroyDynamicDatasource(templateName, blockHeight, index);

// Re-filter and mutate the array in-place so that indexBlockData's reference
// sees the change. A simple reassignment (filteredDataSources = ...) would only
// update this closure's variable, leaving the dataSources parameter in
// indexBlockData pointing at the stale array.
const refiltered = this.filterDataSources(blockHeight, filteredDataSources);
filteredDataSources.length = 0;
filteredDataSources.push(...refiltered);
}, 'destroyDynamicDatasource');

return vm;
});
}
Expand All @@ -135,11 +153,15 @@ export abstract class BaseIndexerManager<
private filterDataSources(nextProcessingHeight: number, dataSources: DS[]): DS[] {
let filteredDs: DS[];

// Strict `>` (not `>=`) is intentional: destroyDynamicDatasource sets
// endBlock = currentBlockHeight, and the destroyed DS must be excluded
// when re-filtering within the same block. With `>=` the DS would pass
// the filter and remain active for the rest of the block.
filteredDs = dataSources.filter(
(ds) =>
ds.startBlock !== undefined &&
ds.startBlock <= nextProcessingHeight &&
(ds.endBlock ?? Number.MAX_SAFE_INTEGER) >= nextProcessingHeight
(ds.endBlock ?? Number.MAX_SAFE_INTEGER) > nextProcessingHeight
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im not sure why this line has changed.

);

// perform filter for custom ds
Expand Down
Loading