-
Notifications
You must be signed in to change notification settings - Fork 395
feat: implemented dynamic datasource removing themseleves and adding them at end blocks #2904
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
78f8551
323a622
4971c3d
af7339f
cd60d99
54eb623
1382702
87dca65
5cf5af1
efb5b82
a386dc4
de059a3
78b5091
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,15 +15,16 @@ class TestDynamicDsService extends DynamicDsService<BaseDataSource, ISubqueryPro | |
| } | ||
|
|
||
| // Make it public | ||
| getTemplate(templateName: string, startBlock?: number | undefined): BaseDataSource { | ||
| return super.getTemplate(templateName, startBlock); | ||
| getTemplate(templateName: string, startBlock?: number | undefined, endBlock?: number | undefined): BaseDataSource { | ||
| return super.getTemplate(templateName, startBlock, endBlock); | ||
| } | ||
| } | ||
|
|
||
| const testParam1 = {templateName: 'Test', startBlock: 1}; | ||
| const testParam2 = {templateName: 'Test', startBlock: 2}; | ||
| const testParam3 = {templateName: 'Test', startBlock: 3}; | ||
| const testParam4 = {templateName: 'Test', startBlock: 4}; | ||
| const testParamOther = {templateName: 'Other', startBlock: 5}; | ||
|
|
||
| const mockMetadata = (initData: DatasourceParams[] = []) => { | ||
| let datasourceParams: DatasourceParams[] = initData; | ||
|
|
@@ -40,7 +41,7 @@ const mockMetadata = (initData: DatasourceParams[] = []) => { | |
| describe('DynamicDsService', () => { | ||
| let service: TestDynamicDsService; | ||
| const project = { | ||
| templates: [{name: 'Test'}], | ||
| templates: [{name: 'Test'}, {name: 'Other'}], | ||
| } as any as ISubqueryProject; | ||
|
|
||
| beforeEach(() => { | ||
|
|
@@ -70,6 +71,67 @@ describe('DynamicDsService', () => { | |
| ]); | ||
| }); | ||
|
|
||
| it('can destroy a dynamic datasource', async () => { | ||
| const meta = mockMetadata([testParam1, testParam2]); | ||
| await service.init(meta); | ||
|
|
||
| await service.destroyDynamicDatasource('Test', 50); | ||
|
|
||
| const updatedParams = (service as any)._datasourceParams; | ||
| expect(updatedParams[0]).toEqual({...testParam1, endBlock: 50}); | ||
| expect(updatedParams[1]).toEqual(testParam2); | ||
|
|
||
| const datasources = (service as any)._datasources; | ||
| expect(datasources[0].endBlock).toBe(50); | ||
| }); | ||
|
|
||
| it('throws error when destroying non-existent datasource', async () => { | ||
| const meta = mockMetadata([testParam1]); | ||
| await service.init(meta); | ||
|
|
||
| await expect(service.destroyDynamicDatasource('NonExistent', 50)).rejects.toThrow( | ||
| 'Dynamic datasource with template name "NonExistent" not found' | ||
| ); | ||
| }); | ||
|
|
||
| it('throws error when destroying already destroyed datasource', async () => { | ||
| const destroyedParam = {...testParam1, endBlock: 30}; | ||
| const meta = mockMetadata([destroyedParam]); | ||
| await service.init(meta); | ||
|
|
||
| await expect(service.destroyDynamicDatasource('Test', 50)).rejects.toThrow( | ||
| 'Dynamic datasource "Test" is already destroyed' | ||
| ); | ||
| }); | ||
|
|
||
| it('allows creating new datasource after destroying existing one', async () => { | ||
| const meta = mockMetadata([testParam1]); | ||
| await service.init(meta); | ||
|
|
||
| expect((service as any)._datasourceParams).toEqual([testParam1]); | ||
|
|
||
| await service.destroyDynamicDatasource('Test', 50); | ||
|
|
||
| const paramsAfterDestroy = (service as any)._datasourceParams; | ||
| expect(paramsAfterDestroy[0]).toEqual({...testParam1, endBlock: 50}); | ||
|
|
||
| const newParam = {templateName: 'Test', startBlock: 60}; | ||
| await service.createDynamicDatasource(newParam); | ||
|
|
||
| const finalParams = (service as any)._datasourceParams; | ||
| const destroyedCount = finalParams.filter((p) => p.endBlock !== undefined).length; | ||
| const activeCount = finalParams.filter((p) => p.endBlock === undefined).length; | ||
|
|
||
| expect(destroyedCount).toBeGreaterThanOrEqual(1); | ||
| expect(activeCount).toBeGreaterThanOrEqual(1); | ||
|
|
||
| const destroyedParam = finalParams.find((p) => p.startBlock === 1 && p.endBlock === 50); | ||
| expect(destroyedParam).toBeDefined(); | ||
|
|
||
| const newParamFound = finalParams.find((p) => p.startBlock === 60 && !p.endBlock); | ||
| expect(newParamFound).toBeDefined(); | ||
| }); | ||
|
|
||
| it('resets dynamic datasources', async () => { | ||
| const meta = mockMetadata([testParam1, testParam2, testParam3, testParam4]); | ||
| await service.init(meta); | ||
|
|
@@ -83,6 +145,25 @@ describe('DynamicDsService', () => { | |
| ]); | ||
| }); | ||
|
|
||
| it('handles reset after datasource destruction correctly', async () => { | ||
| const params = [testParam1, testParam2, testParam3, testParam4]; | ||
| const meta = mockMetadata(params); | ||
| await service.init(meta); | ||
|
|
||
| await service.destroyDynamicDatasource('Test', 25); // Destroys testParam1 | ||
|
|
||
| const paramsAfterDestroy = (service as any)._datasourceParams; | ||
| expect(paramsAfterDestroy[0]).toEqual({...testParam1, endBlock: 25}); | ||
|
|
||
| // Reset to block 2 (should keep testParam1 and testParam2) | ||
| await service.resetDynamicDatasource(2, null as any); | ||
|
|
||
| const paramsAfterReset = (service as any)._datasourceParams; | ||
| expect(paramsAfterReset).toHaveLength(2); | ||
| expect(paramsAfterReset[0]).toEqual({...testParam1, endBlock: 25}); | ||
| expect(paramsAfterReset[1]).toEqual(testParam2); | ||
| }); | ||
|
|
||
| it('getDynamicDatasources with force reloads from metadata', async () => { | ||
| const meta = mockMetadata([testParam1, testParam2]); | ||
| await service.init(meta); | ||
|
|
@@ -107,6 +188,29 @@ describe('DynamicDsService', () => { | |
| ]); | ||
| }); | ||
|
|
||
| it('loads destroyed datasources with endBlock correctly', async () => { | ||
| const destroyedParam = {...testParam1, endBlock: 100}; | ||
| const meta = mockMetadata([destroyedParam, testParam2]); | ||
| await service.init(meta); | ||
|
|
||
| const datasources = await service.getDynamicDatasources(); | ||
| expect(datasources).toHaveLength(2); | ||
| expect((datasources[0] as any).endBlock).toBe(100); | ||
| expect((datasources[1] as any).endBlock).toBeUndefined(); | ||
| }); | ||
|
|
||
| it('updates metadata correctly when destroying datasource', async () => { | ||
| const meta = mockMetadata([testParam1, testParam2]); | ||
| await service.init(meta); | ||
|
|
||
| await service.destroyDynamicDatasource('Test', 75); | ||
|
|
||
| const metadataParams = await meta.find('dynamicDatasources'); | ||
| expect(metadataParams).toBeDefined(); | ||
| expect(metadataParams![0]).toEqual({...testParam1, endBlock: 75}); | ||
| expect(metadataParams![1]).toEqual(testParam2); | ||
|
Comment on lines
+205
to
+215
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This metadata assertion is currently non-diagnostic. These checks depend on 🧪 One way to make the fixture independentconst cloneParam = (param: DatasourceParams): DatasourceParams => ({
...param,
args: param.args ? {...param.args} : undefined,
});
const mockMetadata = (initData: DatasourceParams[] = []) => {
let datasourceParams: DatasourceParams[] = initData.map(cloneParam);
return {
set: (_key: string, value: DatasourceParams[]) => {
datasourceParams = value.map(cloneParam);
},
find: (_key: string) => Promise.resolve(datasourceParams.map(cloneParam)),
setNewDynamicDatasource: (params: DatasourceParams) => datasourceParams.push(cloneParam(params)),
} as unknown as CacheMetadataModel;
};🤖 Prompt for AI Agents |
||
| }); | ||
|
|
||
| it('can find a template and cannot mutate the template', () => { | ||
| const template1 = service.getTemplate('Test', 1); | ||
| const template2 = service.getTemplate('Test', 2); | ||
|
|
@@ -120,4 +224,35 @@ describe('DynamicDsService', () => { | |
| // eslint-disable-next-line @typescript-eslint/no-non-null-assertion | ||
| expect(project.templates![0]).toEqual({name: 'Test'}); | ||
| }); | ||
|
|
||
| it('can create template with endBlock', () => { | ||
| const template = service.getTemplate('Test', 1, 100); | ||
|
|
||
| expect(template.startBlock).toBe(1); | ||
| expect((template as any).endBlock).toBe(100); | ||
| expect((template as any).name).toBeUndefined(); | ||
| }); | ||
|
|
||
| it('handles multiple templates with same name during destruction', async () => { | ||
| const param1 = {templateName: 'Test', startBlock: 1}; | ||
| const param2 = {templateName: 'Test', startBlock: 5}; | ||
| const param3 = {templateName: 'Other', startBlock: 3}; | ||
|
|
||
| const meta = mockMetadata([param1, param2, param3]); | ||
| await service.init(meta); | ||
|
|
||
| // Should destroy the first matching one | ||
| await service.destroyDynamicDatasource('Test', 10); | ||
|
|
||
| const updatedParams = (service as any)._datasourceParams; | ||
| expect(updatedParams[0]).toEqual({...param1, endBlock: 10}); | ||
| expect(updatedParams[1]).toEqual(param2); // Not destroyed | ||
| expect(updatedParams[2]).toEqual(param3); // Not destroyed | ||
| }); | ||
|
|
||
| it('throws error when service not initialized for destruction', async () => { | ||
| await expect(service.destroyDynamicDatasource('Test', 50)).rejects.toThrow( | ||
| 'DynamicDsService has not been initialized' | ||
| ); | ||
| }); | ||
| }); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,11 +19,13 @@ export interface DatasourceParams { | |
| templateName: string; | ||
| args?: Record<string, unknown>; | ||
| startBlock: number; | ||
| endBlock?: number; | ||
| } | ||
|
|
||
| export interface IDynamicDsService<DS> { | ||
| dynamicDatasources: DS[]; | ||
| createDynamicDatasource(params: DatasourceParams): Promise<DS>; | ||
| destroyDynamicDatasource(templateName: string, currentBlockHeight: number): Promise<void>; | ||
| getDynamicDatasources(forceReload?: boolean): Promise<DS[]>; | ||
| } | ||
|
|
||
|
|
@@ -91,6 +93,34 @@ export class DynamicDsService<DS extends BaseDataSource = BaseDataSource, P exte | |
| } | ||
| } | ||
|
|
||
| async destroyDynamicDatasource(templateName: string, currentBlockHeight: number, tx?: Transaction): Promise<void> { | ||
| if (!this._datasources || !this._datasourceParams) { | ||
| throw new Error('DynamicDsService has not been initialized'); | ||
| } | ||
|
|
||
| const dsIndex = this._datasourceParams.findIndex((params) => params.templateName === templateName); | ||
| if (dsIndex === -1) { | ||
| throw new Error(`Dynamic datasource with template name "${templateName}" not found`); | ||
| } | ||
|
|
||
| const dsParam = this._datasourceParams[dsIndex]; | ||
|
|
||
| if (dsParam.endBlock !== undefined) { | ||
| throw new Error(`Dynamic datasource "${templateName}" is already destroyed`); | ||
| } | ||
|
|
||
| const updatedParams = {...dsParam, endBlock: currentBlockHeight}; | ||
| this._datasourceParams[dsIndex] = updatedParams; | ||
|
|
||
| if (this._datasources[dsIndex]) { | ||
| (this._datasources[dsIndex] as any).endBlock = currentBlockHeight; | ||
| } | ||
|
stwiname marked this conversation as resolved.
|
||
|
|
||
| await this.metadata.set(METADATA_KEY, this._datasourceParams, tx); | ||
|
Comment on lines
+165
to
+179
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In-memory state updated before persistence could cause inconsistency. If Consider reordering to persist first: - // Update the datasource params
- const updatedParams = {...dsParam, endBlock: currentBlockHeight};
- this._datasourceParams[index] = updatedParams;
-
- // Update the datasource object if it exists
- const datasource = this._datasources[index];
- if (!datasource) {
- throw new Error(`Datasources array out of sync with params at index ${index}`);
- }
- // Set endBlock on the datasource object
- datasource.endBlock = currentBlockHeight;
-
- await this.metadata.set(METADATA_KEY, this._datasourceParams, tx);
+ // Prepare updated params
+ const updatedParams = {...dsParam, endBlock: currentBlockHeight};
+
+ // Create a copy for persistence
+ const paramsForPersistence = [...this._datasourceParams];
+ paramsForPersistence[index] = updatedParams;
+
+ // Persist first, then update in-memory on success
+ await this.metadata.set(METADATA_KEY, paramsForPersistence, tx);
+
+ // Update in-memory state only after successful persistence
+ this._datasourceParams[index] = updatedParams;
+
+ const datasource = this._datasources[index];
+ if (!datasource) {
+ throw new Error(`Datasources array out of sync with params at index ${index}`);
+ }
+ datasource.endBlock = currentBlockHeight;
🤖 Prompt for AI Agents |
||
|
|
||
| logger.info(`Destroyed dynamic datasource "${templateName}" at block ${currentBlockHeight}`); | ||
| } | ||
|
|
||
| // Not force only seems to be used for project changes | ||
| async getDynamicDatasources(forceReload?: boolean): Promise<DS[]> { | ||
| // Workers should not cache this result in order to keep in sync | ||
|
|
@@ -117,19 +147,19 @@ export class DynamicDsService<DS extends BaseDataSource = BaseDataSource, P exte | |
| * | ||
| * This will throw if the template cannot be found by name. | ||
| * | ||
| * Inserts the startBlock into the template. | ||
| * Inserts the startBlock and optionally endBlock into the template. | ||
| * */ | ||
| protected getTemplate(templateName: string, startBlock?: number): DS { | ||
| protected getTemplate(templateName: string, startBlock?: number, endBlock?: number): DS { | ||
| const t = (this.project.templates ?? []).find((t) => t.name === templateName); | ||
| if (!t) { | ||
| throw new Error(`Unable to find matching template in project for name: "${templateName}"`); | ||
| } | ||
| const {name, ...template} = cloneDeep(t); | ||
| return {...template, startBlock} as DS; | ||
| return {...template, startBlock, endBlock} as DS; | ||
| } | ||
|
|
||
| private async getDatasource(params: DatasourceParams): Promise<DS> { | ||
| const dsObj = this.getTemplate(params.templateName, params.startBlock); | ||
| const dsObj = this.getTemplate(params.templateName, params.startBlock, params.endBlock); | ||
|
|
||
| try { | ||
| await this.blockchainService.updateDynamicDs(params, dsObj); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -116,6 +116,29 @@ export abstract class BaseIndexerManager< | |
| dynamicDsCreated = true; | ||
| }, 'createDynamicDatasource'); | ||
|
|
||
| // Inject function to destroy ds into vm | ||
| vm.freeze(async (templateName?: string) => { | ||
| if (!templateName) { | ||
| throw new Error('Cannot destroy datasource: template name must be provided'); | ||
| } | ||
|
|
||
| await this.dynamicDsService.destroyDynamicDatasource(templateName, blockHeight); | ||
|
|
||
| // Mark datasources with this template for removal from current processing | ||
| filteredDataSources.forEach((fds) => { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The destroyed dynamic datasources should be removed from |
||
| const dsParams = this.dynamicDsService.dynamicDatasources.find((dynamicDs) => { | ||
| // Find the corresponding params for this datasource | ||
| const params = (this.dynamicDsService as any)._datasourceParams?.find( | ||
| (p: any) => p.templateName === templateName && p.startBlock === (fds as any).startBlock | ||
| ); | ||
| return params !== undefined; | ||
| }); | ||
| if (dsParams) { | ||
| (fds as any).endBlock = blockHeight; | ||
| } | ||
| }); | ||
| }, 'destroyDynamicDatasource'); | ||
|
|
||
|
Comment on lines
+124
to
+136
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chainIn-block destroy isn’t effective: reassign doesn’t affect current iteration; filter logic won’t exclude endBlock === current block.
Fix by removing the destroyed DS from filteredDataSources in place using object identity. Apply this diff: vm.freeze(async (templateName: string, index: number) => {
await this.dynamicDsService.destroyDynamicDatasource(templateName, blockHeight, index);
- // Re-filter datasources to exclude the destroyed one
- // The destroyed datasource now has endBlock set, so filterDataSources will exclude it
- // Note: Reassigning filteredDataSources is intentional - subsequent handlers
- // within the same block will see the updated filtered list
- filteredDataSources = this.filterDataSources(blockHeight, filteredDataSources);
+ // Remove the destroyed DS from the current list IN PLACE so the rest of this block
+ // no longer schedules it. EndBlock handles exclusion from subsequent blocks.
+ const allDynamic = await this.dynamicDsService.getDynamicDatasources();
+ const destroyed = allDynamic[index] as DS | undefined;
+ if (destroyed) {
+ const pos = filteredDataSources.indexOf(destroyed);
+ if (pos !== -1) filteredDataSources.splice(pos, 1);
+ }
}, 'destroyDynamicDatasource');Run this script to confirm indexBlockData iterates the original array (so reassigning the variable is ineffective): 🏁 Script executed: #!/bin/bash
# Look for indexBlockData implementations and how 'dataSources' is iterated
rg -nP -C3 '(?:async\s+)?indexBlockData\s*\(.*dataSources\s*:\s*[^\)]*\)' --type ts
rg -nP -C3 '\bfor\s*\(\s*const\s+\w+\s+of\s+dataSources\b|\bdataSources\.forEach\(' --type tsLength of output: 2595 I need to gather more information about the indexing flow. Let me search for the complete context. I need to examine the indexBlockData implementation and the filterDataSources logic to verify the claims. Let me search directly in the codebase for the relevant implementations. Length of output: 2063 🏁 Script executed: #!/bin/bash
# Find filterDataSources implementation with endBlock logic
rg -nA15 'filterDataSources' packages/node-core/src/indexer/indexer.manager.ts --type tsLength of output: 2146 🏁 Script executed: #!/bin/bash
# Get the full context around lines 100-160 in indexer.manager.ts
sed -n '100,160p' packages/node-core/src/indexer/indexer.manager.tsLength of output: 2380 Let me check for all implementations of 🏁 Script executed: #!/bin/bash
# Find all indexBlockData implementations
rg -nB2 -A10 'indexBlockData\s*\(' packages/ --type ts -g '!*.spec.ts' | grep -A10 'for\|forEach'Length of output: 0 🏁 Script executed: #!/bin/bash
# Verify when vm.freeze callbacks are executed - search for where handlers use injected functions
rg -nC5 'destroyDynamicDatasource|createDynamicDatasource' packages/ --type ts -g '!*.spec.ts' | head -60Length of output: 4941 The review comment is verified as correct and identifies a real bug. Based on my analysis of the codebase:
🤖 Prompt for AI Agents✅ Addressed in commit 1382702
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Gmin2 it would be good to address this and to have a test that confirms the correct behaviour There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Gmin2 this is still an issue, heres a simplified example. |
||
| return vm; | ||
| }); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Strengthen count assertions for precision.
Lines 127-128 use
toBeGreaterThanOrEqual(1), but after destroying one datasource and creating one new datasource, the counts should be exact: 1 destroyed and 1 active. Weak assertions may miss bugs where extra datasources are inadvertently created or destroyed.Apply this diff:
🤖 Prompt for AI Agents