Skip to content

Commit af4c532

Browse files
committed
fix: fixing vaults handlers
[ci skip]
1 parent 1ed063c commit af4c532

11 files changed

Lines changed: 39 additions & 46 deletions

File tree

src/agent/handlers/clientManifest.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import type {
77
NodeAddressMessage,
88
NodeIdMessage,
99
SignedNotificationEncoded,
10-
VaultInfo,
11-
VaultsGitInfoGetMessage,
1210
VaultsGitPackGetMessage,
1311
VaultsScanMessage,
1412
} from './types';

src/agent/handlers/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,6 @@ export type GitPackMessage = {
5555

5656
export type VaultsGitPackGetMessage = {
5757
body: string;
58+
nameOrId: VaultIdEncoded | VaultName;
59+
vaultAction: VaultAction;
5860
};

src/agent/handlers/vaultsGitInfoGet.ts

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,16 @@
1-
import type { GitPackMessage, VaultInfo } from './types';
2-
import type { AgentRPCRequestParams, AgentRPCResponseResult } from '../types';
31
import type { DB } from '@matrixai/db';
42
import type { VaultManager } from '../../vaults';
53
import type { ACL } from '../../acl';
64
import type Logger from '@matrixai/logger';
7-
import type { VaultsGitInfoGetMessage } from './types';
8-
import type { VaultAction } from '../../vaults/types';
9-
import type { JSONRPCRequest } from '@/rpc/types';
5+
import type { JSONRPCRequest } from '../../rpc/types';
106
import type { ContextTimed } from '@matrixai/contexts';
11-
import type { JSONValue } from '@/types';
7+
import type { JSONValue } from '../../types';
128
import { ReadableStream } from 'stream/web';
139
import * as agentErrors from '../errors';
1410
import * as vaultsUtils from '../../vaults/utils';
1511
import * as vaultsErrors from '../../vaults/errors';
1612
import { RawHandler } from '../../rpc/handlers';
17-
import { validateSync } from '../../validation';
18-
import { matchSync, never } from '../../utils';
13+
import { never } from '../../utils';
1914
import * as validationUtils from '../../validation/utils';
2015
import * as nodesUtils from '../../nodes/utils';
2116
import * as agentUtils from '../utils';
@@ -29,12 +24,18 @@ class VaultsGitInfoGetHandler extends RawHandler<{
2924
}> {
3025
public async handle(
3126
input: [JSONRPCRequest, ReadableStream<Uint8Array>],
32-
cancel: (reason?: any) => void,
27+
_cancel,
3328
meta: Record<string, JSONValue> | undefined,
34-
ctx: ContextTimed,
29+
_ctx: ContextTimed, // TODO: use
3530
): Promise<[JSONValue, ReadableStream<Uint8Array>]> {
3631
const { db, vaultManager, acl } = this.container;
3732
const [headerMessage, inputStream] = input;
33+
const readableProm = (async () => {
34+
for await (const _ of inputStream) {
35+
// Input stream is not used here, wait for finish.
36+
// It should be closed by the caller immediately
37+
}
38+
})();
3839
const params = headerMessage.params;
3940
if (params == null || !utils.isObject(params)) never();
4041
if (
@@ -103,7 +104,7 @@ class VaultsGitInfoGetHandler extends RawHandler<{
103104
controller.close();
104105
},
105106
});
106-
107+
await readableProm;
107108
return [
108109
{
109110
vaultName: data.vaultName,

src/agent/handlers/vaultsGitPackGet.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { matchSync } from '../../utils';
1414
import * as validationUtils from '../../validation/utils';
1515
import { ServerHandler } from '../../rpc/handlers';
1616

17+
// TODO: This needs to be a raw handler
1718
class VaultsGitPackGetHandler extends ServerHandler<
1819
{
1920
vaultManager: VaultManager;
@@ -34,7 +35,7 @@ class VaultsGitPackGetHandler extends ServerHandler<
3435
throw new agentErrors.ErrorAgentNodeIdMissing();
3536
}
3637
const nodeIdEncoded = nodesUtils.encodeNodeId(requestingNodeId);
37-
const nameOrId = meta.get('vaultNameOrId').pop()!.toString();
38+
const nameOrId = input.nameOrId;
3839
yield* db.withTransactionG(async function* (
3940
tran,
4041
): AsyncGenerator<AgentRPCResponseResult<GitPackMessage>> {
@@ -58,7 +59,7 @@ class VaultsGitPackGetHandler extends ServerHandler<
5859
);
5960
},
6061
{
61-
actionType: meta.get('vaultAction').pop()!.toString(),
62+
actionType: input.vaultAction,
6263
},
6364
);
6465
// Checking permissions

src/nodes/NodeConnection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class NodeConnection<M extends ClientManifest> extends EventTarget {
6666
targetHostname,
6767
tlsConfig,
6868
connectionKeepAliveIntervalTime,
69-
connectionMaxIdleTimeout,
69+
connectionMaxIdleTimeout = 60_000,
7070
quicSocket,
7171
manifest,
7272
logger,
@@ -98,7 +98,7 @@ class NodeConnection<M extends ClientManifest> extends EventTarget {
9898
tlsConfig,
9999
manifest,
100100
connectionKeepAliveIntervalTime,
101-
connectionMaxIdleTimeout,
101+
connectionMaxIdleTimeout = 60_000,
102102
quicSocket,
103103
logger = new Logger(this.name),
104104
}: {

src/nodes/types.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ type NodeData = {
2626

2727
type SeedNodes = Record<NodeIdEncoded, NodeAddress>;
2828

29-
3029
export type {
3130
NodeId,
3231
NodeIdString,

src/rpc/RPCServer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ class RPCServer extends EventTarget {
240240
handler: DuplexHandlerImplementation<I, O>,
241241
timeout: number | undefined,
242242
): void {
243-
const rawSteamHandler: RawHandlerImplementation = (
243+
const rawSteamHandler: RawHandlerImplementation = async (
244244
[header, input],
245245
cancel,
246246
meta,

src/rpc/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ type HandlerImplementation<I, O> = (
164164

165165
type RawHandlerImplementation = HandlerImplementation<
166166
[JSONRPCRequest, ReadableStream<Uint8Array>],
167-
[JSONValue | undefined, ReadableStream<Uint8Array>]
167+
Promise<[JSONValue | undefined, ReadableStream<Uint8Array>]>
168168
>;
169169

170170
type DuplexHandlerImplementation<

src/vaults/VaultInternal.ts

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import type { NodeId, NodeIdEncoded } from '../ids/types';
1717
import type NodeConnectionManager from '../nodes/NodeConnectionManager';
1818
import type RPCClient from '../rpc/RPCClient';
1919
import type { clientManifest as agentClientManifest } from '../agent/handlers/clientManifest';
20+
import type { POJO } from '../types';
2021
import path from 'path';
2122
import git from 'isomorphic-git';
2223
import Logger from '@matrixai/logger';
@@ -26,11 +27,11 @@ import {
2627
} from '@matrixai/async-init/dist/CreateDestroyStartStop';
2728
import { withF, withG } from '@matrixai/resources';
2829
import { RWLockWriter } from '@matrixai/async-locks';
29-
import * as utils from '@/utils';
30-
import * as validationUtils from '@/validation/utils';
3130
import * as vaultsErrors from './errors';
3231
import * as vaultsUtils from './utils';
3332
import { tagLast } from './types';
33+
import * as validationUtils from '../validation/utils';
34+
import * as utils from '../utils';
3435
import * as nodesUtils from '../nodes/utils';
3536
import { never } from '../utils/utils';
3637

@@ -139,7 +140,7 @@ class VaultInternal {
139140

140141
const vaultIdEncoded = vaultsUtils.encodeVaultId(vaultId);
141142
logger.info(`Cloning ${this.name} - ${vaultIdEncoded}`);
142-
const vault = new VaultInternal({
143+
const vault = new this({
143144
vaultId,
144145
db,
145146
vaultsDbPath,
@@ -766,13 +767,11 @@ class VaultInternal {
766767
typeof vaultNameOrId === 'string'
767768
? vaultNameOrId
768769
: vaultsUtils.encodeVaultId(vaultNameOrId);
769-
console.log('a');
770770
const response = await client.methods.vaultsGitInfoGet({
771771
vaultNameOrId: vaultNameOrId_,
772772
action: vaultAction,
773773
});
774-
console.log('a');
775-
console.log(response.meta);
774+
await response.writable.close();
776775

777776
const result = response.meta?.result;
778777
if (result == null || !utils.isObject(result)) never();
@@ -793,7 +792,6 @@ class VaultInternal {
793792
for await (const chunk of response.readable) {
794793
infoResponse.push(chunk);
795794
}
796-
// TODO: complete
797795
return [
798796
async function ({
799797
url,
@@ -818,17 +816,13 @@ class VaultInternal {
818816
};
819817
} else if (method === 'POST') {
820818
const responseBuffers: Array<Uint8Array> = [];
821-
const stream = client.methods.vaultsGitPackGet(metadata);
822-
const chunk = new vaultsPB.PackChunk();
823-
// Body is usually an async generator but in the cases we are using,
824-
// only the first value is used
825-
chunk.setChunk(body[0]);
826-
// Tell the server what commit we need
827-
await stream.write(chunk);
828-
let packResponse = (await stream.read()).value;
829-
while (packResponse != null) {
830-
responseBuffers.push(packResponse.getChunk_asU8());
831-
packResponse = (await stream.read()).value;
819+
const stream = await client.methods.vaultsGitPackGet({
820+
body: body[0].toString('binary'),
821+
nameOrId: result.vaultIdEncoded as string,
822+
vaultAction,
823+
});
824+
for await (const value of stream) {
825+
responseBuffers.push(Buffer.from(value.chunk, 'binary'));
832826
}
833827
return {
834828
url: url,

src/vaults/VaultManager.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -796,11 +796,11 @@ class VaultManager {
796796
tran?: DBTransaction,
797797
): AsyncGenerator<Buffer> {
798798
if (tran == null) {
799-
return this.db.withTransactionF(async (tran) =>
800-
this.handleInfoRequest(vaultId, tran),
801-
);
799+
const handleInfoRequest = (tran) => this.handleInfoRequest(vaultId, tran);
800+
return yield* this.db.withTransactionG(async function* (tran) {
801+
return yield* handleInfoRequest(tran);
802+
});
802803
}
803-
804804
const efs = this.efs;
805805
const vault = await this.getVault(vaultId, tran);
806806
return yield* withG(
@@ -986,7 +986,6 @@ class VaultManager {
986986
if (tran == null) {
987987
return this.db.withTransactionF((tran) => this.getVault(vaultId, tran));
988988
}
989-
990989
const vaultIdString = vaultId.toString() as VaultIdString;
991990
// 1. get the vault, if it exists then return that
992991
const vault = this.vaultMap.get(vaultIdString);
@@ -1035,7 +1034,6 @@ class VaultManager {
10351034
return [vaultId.toString(), RWLockWriter, 'read'];
10361035
},
10371036
);
1038-
10391037
// Running the function with locking
10401038
return await this.vaultLocks.withF(...vaultLocks, async () => {
10411039
// Getting the vaults while locked
@@ -1044,7 +1042,7 @@ class VaultManager {
10441042
return await this.getVault(vaultId, tran);
10451043
}),
10461044
);
1047-
return f(...vaults);
1045+
return await f(...vaults);
10481046
});
10491047
}
10501048

0 commit comments

Comments
 (0)