Skip to content

Commit f1b36c6

Browse files
committed
reuse threads when on watch mode
1 parent a4a63e6 commit f1b36c6

3 files changed

Lines changed: 111 additions & 86 deletions

File tree

src/dictionary/build.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import { extractResultError, ResultError } from "../compound.ts";
44
import { PositionedError } from "../parser/parser_lib.ts";
5-
import { parseDictionary } from "./parallel_parser.ts";
5+
import { Parser } from "./parallel_parser.ts";
66
import { Dictionary } from "./type.ts";
77

88
const SOURCE = new URL("../../dictionary.txt", import.meta.url);
@@ -27,7 +27,7 @@ export const dictionary: Dictionary = new Map(Object.entries(json));
2727
`;
2828
await Deno.writeTextFile(DESTINATION, code);
2929
}
30-
export async function build(): Promise<boolean> {
30+
export async function build(parser: Parser): Promise<boolean> {
3131
// deno-lint-ignore no-console
3232
console.log(
3333
`Building dictionary with ${navigator.hardwareConcurrency} threads...`,
@@ -36,7 +36,7 @@ export async function build(): Promise<boolean> {
3636
const text = await Deno.readTextFile(SOURCE);
3737
let dictionary: Dictionary;
3838
try {
39-
dictionary = await parseDictionary(text);
39+
dictionary = await parser.parse(text);
4040
} catch (error) {
4141
displayError(text, extractResultError(error));
4242
return false;
@@ -117,5 +117,6 @@ function displayError(source: string, errors: ReadonlyArray<ResultError>) {
117117
}
118118
}
119119
if (import.meta.main) {
120-
Deno.exitCode = await build() ? 0 : 1;
120+
using parser = new Parser();
121+
Deno.exitCode = await build(parser) ? 0 : 1;
121122
}

src/dictionary/parallel_parser.ts

Lines changed: 103 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// This code is Deno only (it uses the new `using` keyword)
2+
13
import { extractResultError, ResultError } from "../compound.ts";
24
import { Position, PositionedError } from "../parser/parser_lib.ts";
35
import { HEADS } from "./parser.ts";
@@ -12,95 +14,115 @@ type WorkerError =
1214
}>
1315
| Readonly<{ type: "other"; error: unknown }>;
1416

15-
function buildOffloaded(source: string): Promise<Dictionary> {
16-
return new Promise((resolve, reject) => {
17-
const worker = new Worker(
18-
new URL("./worker.ts", import.meta.url),
19-
{ type: "module" },
20-
);
21-
worker.postMessage(source);
22-
worker.onmessage = (event) => {
23-
resolve(event.data as Dictionary);
24-
worker.terminate();
25-
};
26-
worker.onerror = (event) => {
27-
const error = event.error as WorkerError;
28-
switch (error.type) {
29-
case "result error":
30-
reject(
31-
new AggregateError(
32-
error.errors.map((error) =>
33-
new PositionedError(
34-
error.message,
35-
{ position: error.position ?? undefined },
36-
)
17+
class ParserWorker {
18+
#worker = new Worker(
19+
new URL("./worker.ts", import.meta.url),
20+
{ type: "module" },
21+
);
22+
[Symbol.dispose]() {
23+
this.#worker.terminate();
24+
}
25+
parse(source: string): Promise<Dictionary> {
26+
return new Promise((resolve, reject) => {
27+
const messageCallback = (event: MessageEvent) => {
28+
resolve(event.data as Dictionary);
29+
this.#worker.removeEventListener("message", messageCallback);
30+
};
31+
this.#worker.addEventListener("message", messageCallback);
32+
const errorCallback = (event: ErrorEvent) => {
33+
const error = event.error as WorkerError;
34+
switch (error.type) {
35+
case "result error":
36+
reject(
37+
new AggregateError(
38+
error.errors.map((error) =>
39+
new PositionedError(
40+
error.message,
41+
{ position: error.position ?? undefined },
42+
)
43+
),
3744
),
38-
),
39-
);
40-
break;
41-
case "other":
42-
reject(error.error);
43-
break;
44-
}
45-
};
46-
});
45+
);
46+
break;
47+
case "other":
48+
reject(error.error);
49+
break;
50+
}
51+
this.#worker.removeEventListener("error", errorCallback);
52+
};
53+
this.#worker.addEventListener("error", errorCallback);
54+
this.#worker.postMessage(source);
55+
});
56+
}
4757
}
48-
export async function parseDictionary(source: string): Promise<Dictionary> {
49-
const heads = [...source.matchAll(HEADS)].map((match) => match.index);
50-
const regionIndices = [...new Array(navigator.hardwareConcurrency).keys()]
51-
.map((index) => {
52-
const start = index * source.length / navigator.hardwareConcurrency;
53-
for (const head of heads) {
54-
if (start <= head) {
55-
return head;
58+
export class Parser {
59+
#workers = new Array(navigator.hardwareConcurrency)
60+
.fill(undefined)
61+
.map(() => new ParserWorker());
62+
[Symbol.dispose](): void {
63+
using stack = new DisposableStack();
64+
for (const worker of this.#workers) {
65+
stack.use(worker);
66+
}
67+
}
68+
async parse(source: string): Promise<Dictionary> {
69+
const heads = [...source.matchAll(HEADS)].map((match) => match.index);
70+
const regionIndices = [...new Array(this.#workers.length).keys()]
71+
.map((index) => {
72+
const start = index * source.length / this.#workers.length;
73+
for (const head of heads) {
74+
if (start <= head) {
75+
return head;
76+
}
77+
}
78+
return source.length;
79+
});
80+
const jobs = regionIndices.map((index, i) => ({
81+
index: index,
82+
job: this.#workers[i].parse(
83+
source.slice(index, regionIndices[i + 1] ?? source.length),
84+
),
85+
}));
86+
const dictionary: Dictionary = new Map();
87+
const errors: Array<ResultError> = [];
88+
for (const job of jobs) {
89+
let entries: Dictionary;
90+
try {
91+
// deno-lint-ignore no-await-in-loop
92+
entries = await job.job;
93+
} catch (error) {
94+
for (const resultError of extractResultError(error)) {
95+
if (
96+
resultError instanceof PositionedError &&
97+
resultError.position != null
98+
) {
99+
errors.push(
100+
new PositionedError(resultError.message, {
101+
position: {
102+
position: job.index + resultError.position.position,
103+
length: resultError.position.length,
104+
},
105+
cause: resultError,
106+
}),
107+
);
108+
} else {
109+
errors.push(resultError);
110+
}
56111
}
112+
continue;
57113
}
58-
return source.length;
59-
});
60-
const jobs = regionIndices.map((index, i) => ({
61-
index: index,
62-
job: buildOffloaded(
63-
source.slice(index, regionIndices[i + 1] ?? source.length),
64-
),
65-
}));
66-
const dictionary: Dictionary = new Map();
67-
const errors: Array<ResultError> = [];
68-
for (const job of jobs) {
69-
let entries: Dictionary;
70-
try {
71-
// deno-lint-ignore no-await-in-loop
72-
entries = await job.job;
73-
} catch (error) {
74-
for (const resultError of extractResultError(error)) {
75-
if (
76-
resultError instanceof PositionedError && resultError.position != null
77-
) {
78-
errors.push(
79-
new PositionedError(resultError.message, {
80-
position: {
81-
position: job.index + resultError.position.position,
82-
length: resultError.position.length,
83-
},
84-
cause: resultError,
85-
}),
86-
);
114+
for (const [word, definition] of entries.entries()) {
115+
if (dictionary.has(word)) {
116+
errors.push(new ResultError(`duplicate Toki Pona word "${word}"`));
87117
} else {
88-
errors.push(resultError);
118+
dictionary.set(word, definition);
89119
}
90120
}
91-
continue;
92121
}
93-
for (const [word, definition] of entries.entries()) {
94-
if (dictionary.has(word)) {
95-
errors.push(new ResultError(`duplicate Toki Pona word "${word}"`));
96-
} else {
97-
dictionary.set(word, definition);
98-
}
122+
if (errors.length === 0) {
123+
return dictionary;
124+
} else {
125+
throw new AggregateError(errors);
99126
}
100127
}
101-
if (errors.length === 0) {
102-
return dictionary;
103-
} else {
104-
throw new AggregateError(errors);
105-
}
106128
}

src/dictionary/watch.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,17 @@
33
import { unreachable } from "@std/assert/unreachable";
44
import { debounce } from "@std/async/debounce";
55
import { build } from "./build.ts";
6+
import { Parser } from "./parallel_parser.ts";
67

78
if (import.meta.main) {
89
await using stack = new AsyncDisposableStack();
910
using watcher = Deno.watchFs("./dictionary.txt");
11+
using parser = new Parser();
1012
let task = Promise.resolve();
1113
stack.defer(async () => await task);
1214
const buildDebounced = debounce(() => {
1315
task = task.then(async () => {
14-
await build();
16+
await build(parser);
1517
});
1618
}, 200);
1719
buildDebounced();

0 commit comments

Comments
 (0)