Skip to content

Commit 6132914

Browse files
committed
chore: added cancellation to discovery background task
1 parent 6c93bfe commit 6132914

6 files changed

Lines changed: 49 additions & 34 deletions

File tree

src/nodes/NodeConnection.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -169,12 +169,9 @@ class NodeConnection {
169169
targetHostname,
170170
tlsConfig,
171171
connectionKeepAliveIntervalTime,
172-
connectionKeepAliveTimeoutTime = config.defaultsSystem
173-
.nodesConnectionIdleTimeoutTimeMin,
174-
connectionInitialMaxStreamsBidi = config.defaultsSystem
175-
.nodesConnectionInitialMaxStreamsBidi,
176-
connectionInitialMaxStreamsUni = config.defaultsSystem
177-
.nodesConnectionInitialMaxStreamsUni,
172+
connectionKeepAliveTimeoutTime,
173+
connectionInitialMaxStreamsBidi,
174+
connectionInitialMaxStreamsUni,
178175
quicSocket,
179176
manifest,
180177
logger,

src/nodes/NodeConnectionManager.ts

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -654,13 +654,10 @@ class NodeConnectionManager {
654654
const cancelAuthenticationPs: Array<PromiseCancellable<void>> = [];
655655
const cancelReason = new nodesErrors.ErrorNodeConnectionManagerStopping();
656656
for (const [nodeIdString] of this.connections) {
657-
const destroyP = this.authenticateCancel(nodeIdString, cancelReason).then(
658-
async () => {
659-
return await this.destroyConnection(
660-
IdInternal.fromString<NodeId>(nodeIdString),
661-
force,
662-
);
663-
},
657+
this.authenticateCancel(nodeIdString, cancelReason);
658+
const destroyP = this.destroyConnection(
659+
IdInternal.fromString<NodeId>(nodeIdString),
660+
force,
664661
);
665662
destroyConnectionPs.push(destroyP);
666663
}
@@ -1177,7 +1174,7 @@ class NodeConnectionManager {
11771174
const remainingKeys = Object.keys(connectionsEntry.connections);
11781175
if (remainingKeys.length === 0) {
11791176
// Clean up authentication
1180-
await this.authenticateCancel(
1177+
this.authenticateCancel(
11811178
targetNodeIdString,
11821179
new nodesErrors.ErrorNodeManagerAuthenticationFailed(
11831180
'Connection destroyed before authentication could complete',
@@ -1808,7 +1805,6 @@ class NodeConnectionManager {
18081805
nodeId: NodeId,
18091806
@decorators.context ctx: ContextTimed,
18101807
): Promise<void> {
1811-
ctx.signal.throwIfAborted();
18121808
const targetNodeIdString = nodeId.toString() as NodeIdString;
18131809
const connectionsEntry = this.connections.get(targetNodeIdString);
18141810
if (connectionsEntry == null) {
@@ -1818,11 +1814,16 @@ class NodeConnectionManager {
18181814
const abortHandler = () => {
18191815
rejectAbortP(ctx.signal.reason);
18201816
};
1821-
ctx.signal.addEventListener('abort', abortHandler, { once: true });
1817+
if (ctx.signal.aborted) {
1818+
abortHandler();
1819+
} else {
1820+
ctx.signal.addEventListener('abort', abortHandler, { once: true });
1821+
}
18221822
try {
18231823
return await Promise.race([connectionsEntry.authenticatedP, abortP]);
18241824
} catch (e) {
1825-
// Capture the stacktrace here since knowing where we're waiting for authentication is more useful
1825+
// Capture the stacktrace here since knowing where we're waiting for
1826+
// authentication is more useful.
18261827
Error.captureStackTrace(e);
18271828
throw e;
18281829
} finally {
@@ -1881,7 +1882,7 @@ class NodeConnectionManager {
18811882
}
18821883
}
18831884

1884-
protected async authenticateCancel(
1885+
protected authenticateCancel(
18851886
targetNodeIdString: NodeIdString,
18861887
reason: Error,
18871888
) {

src/nodes/NodeGraph.ts

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -291,18 +291,20 @@ class NodeGraph {
291291
@createDestroyStartStop.ready(new nodesErrors.ErrorNodeGraphNotRunning())
292292
public async *getNodeContacts(
293293
order: 'asc' | 'desc' = 'asc',
294-
tran?: DBTransaction,
294+
tran: DBTransaction | undefined,
295+
ctx: ContextTimed,
295296
): AsyncGenerator<[NodeId, NodeContact]> {
296297
if (tran == null) {
297298
// Lambda generators don't grab the `this` context, so we need to bind it
298-
const getNodeContacts = (tran) => this.getNodeContacts(order, tran);
299+
const getNodeContacts = (tran) => this.getNodeContacts(order, tran, ctx);
299300
return yield* this.db.withTransactionG(async function* (tran) {
300301
return yield* getNodeContacts(tran);
301302
});
302303
}
303304
return yield* nodesUtils.collectNodeContacts(
304305
[...this.nodeGraphBucketsDbPath],
305306
tran,
307+
ctx,
306308
{ reverse: order !== 'asc' },
307309
);
308310
}
@@ -659,6 +661,7 @@ class NodeGraph {
659661
for await (const result of nodesUtils.collectNodeContacts(
660662
[...this.nodeGraphBucketsDbPath, bucketKey],
661663
tran,
664+
ctx,
662665
{
663666
reverse: order !== 'asc',
664667
limit,
@@ -736,10 +739,18 @@ class NodeGraph {
736739
* Resets the bucket according to the new node ID.
737740
* Run this after new node ID is generated via renewal or reset.
738741
*/
742+
public async resetBuckets(
743+
tran?: DBTransaction,
744+
ctx?: ContextTimed,
745+
): Promise<void>;
739746
@createDestroyStartStop.ready(new nodesErrors.ErrorNodeGraphNotRunning())
740-
public async resetBuckets(tran?: DBTransaction): Promise<void> {
747+
@timedCancellable(true)
748+
public async resetBuckets(
749+
tran: DBTransaction | undefined,
750+
@context ctx: ContextTimed,
751+
): Promise<void> {
741752
if (tran == null) {
742-
return this.db.withTransactionF((tran) => this.resetBuckets(tran));
753+
return this.db.withTransactionF((tran) => this.resetBuckets(tran, ctx));
743754
}
744755
// Setup new space
745756
const spaceNew = this.space === '0' ? '1' : '0';
@@ -760,6 +771,7 @@ class NodeGraph {
760771
for await (const [nodeId, nodeContact] of nodesUtils.collectNodeContacts(
761772
[...this.nodeGraphBucketsDbPath],
762773
tran,
774+
ctx,
763775
)) {
764776
const nodeIdKey = nodesUtils.bucketDbKey(nodeId);
765777
const nodeIdOwn = this.keyRing.getNodeId();
@@ -964,12 +976,12 @@ class NodeGraph {
964976
for await (const nodeEntry of nodesUtils.collectNodeContacts(
965977
this.nodeGraphBucketsDbPath,
966978
tran,
979+
ctx,
967980
{
968981
lt: [bucketIdKey, ''],
969982
limit: remainingLimit,
970983
},
971984
)) {
972-
ctx.signal.throwIfAborted();
973985
nodes.push(nodeEntry);
974986
}
975987
}
@@ -981,12 +993,12 @@ class NodeGraph {
981993
for await (const nodeEntry of nodesUtils.collectNodeContacts(
982994
this.nodeGraphBucketsDbPath,
983995
tran,
996+
ctx,
984997
{
985998
gt: [bucketId, ''],
986999
limit: remainingLimit,
9871000
},
9881001
)) {
989-
ctx.signal.throwIfAborted();
9901002
nodes.push(nodeEntry);
9911003
}
9921004
}

src/nodes/NodeManager.ts

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,7 @@ class NodeManager {
132132
_taskInfo,
133133
bucketIndex: NodeBucketIndex,
134134
) => {
135-
// Don't use defaults like this
136-
// if a default is to be used
137-
// provide it directly
138-
139-
await this.refreshBucket(
140-
bucketIndex,
141-
this.connectionConnectTimeoutTime,
142-
ctx,
143-
);
135+
await this.refreshBucket(bucketIndex, undefined, ctx);
144136
// When completed reschedule the task
145137
// if refreshBucketDelay is 0 then it's considered disabled
146138
if (this.refreshBucketDelayTime > 0) {
@@ -718,6 +710,7 @@ class NodeManager {
718710
}
719711

720712
while (true) {
713+
ctx.signal.throwIfAborted();
721714
const isDone = await nodeConnectionsQueue.withNodeSignal(
722715
async (nodeIdTarget, nodeIdSignaller) => {
723716
let nodeConnection: NodeConnection | undefined;
@@ -859,6 +852,7 @@ class NodeManager {
859852
}
860853

861854
while (true) {
855+
ctx.signal.throwIfAborted();
862856
const isDone = await nodeConnectionsQueue.withNodeDirect(
863857
async (nodeIdTarget, nodeContact) => {
864858
if (!this.nodeConnectionManager.hasConnection(nodeIdTarget)) {
@@ -1131,6 +1125,7 @@ class NodeManager {
11311125
);
11321126
// Collecting results
11331127
for await (const result of resultStream) {
1128+
ctx.signal.throwIfAborted();
11341129
const nodeIdNew = nodesUtils.decodeNodeId(result.nodeId);
11351130
if (nodeIdNew == null) {
11361131
utils.never(`failed to decode NodeId "${result.nodeId}"`);
@@ -1147,6 +1142,7 @@ class NodeManager {
11471142
ctx,
11481143
);
11491144
for await (const { nodeIdEncoded, nodeContact } of resultStream) {
1145+
ctx.signal.throwIfAborted();
11501146
const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded);
11511147
if (nodeId == null) {
11521148
utils.never(`failed to decode NodeId "${nodeIdEncoded}"`);

src/nodes/utils.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import type {
1818
NodesAuthenticateConnectionMessageBasicPublic,
1919
NodesAuthenticateConnectionMessageNone,
2020
} from './agent/types.js';
21+
import type { ContextTimed } from '@matrixai/contexts';
2122
import dns from 'dns';
2223
import { utils as dbUtils } from '@matrixai/db';
2324
import { IdInternal } from '@matrixai/id';
@@ -753,6 +754,7 @@ const quicServerCrypto: QUICServerCrypto = {
753754
async function* collectNodeContacts(
754755
levelPath: LevelPath,
755756
tran: DBTransaction,
757+
ctx: ContextTimed,
756758
options: {
757759
reverse?: boolean;
758760
lt?: LevelPath;
@@ -773,6 +775,7 @@ async function* collectNodeContacts(
773775
gt: options.gt,
774776
valueAsBuffer: false,
775777
})) {
778+
ctx.signal.throwIfAborted();
776779
const { nodeId: nodeIdCurrent, nodeContactAddress } = parseBucketsDbKey([
777780
...(options.pathAdjust ?? []),
778781
...keyPath,

tests/nodes/NodeGraph.test.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { ContextTimed } from '@matrixai/contexts';
12
import type {
23
NodeContactAddress,
34
NodeContact,
@@ -308,7 +309,12 @@ describe(`${NodeGraph.name} test`, () => {
308309
await nodeGraph.setNodeContact(nodeId2, nodeContact2);
309310

310311
const results: Array<[NodeId, NodeContact]> = [];
311-
for await (const result of nodeGraph.getNodeContacts()) {
312+
const abortController = new AbortController();
313+
for await (const result of nodeGraph.getNodeContacts(
314+
undefined,
315+
undefined,
316+
{ signal: abortController.signal } as ContextTimed,
317+
)) {
312318
results.push(result);
313319
}
314320
expect(results.length).toBe(2);

0 commit comments

Comments
 (0)