Skip to content

Commit ae28af7

Browse files
authored
Merge pull request #4 from thinknathan/thread-fix
Thread fix
2 parents a3861c7 + bf90c99 commit ae28af7

File tree

9 files changed

+181
-66
lines changed

9 files changed

+181
-66
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "node-simple-pixelate",
3-
"version": "1.1.5",
3+
"version": "1.1.6",
44
"description": "Pixelizes images to create pixel art.",
55
"repository": {
66
"type": "git",

px.cjs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ const yargs = require('yargs');
55
const os = require('os');
66
const processImage_1 = require('./utils/processImage');
77
const processPath_1 = require('./utils/processPath');
8-
function main() {
8+
async function main() {
9+
// console.time('Done in');
910
// Parse command line arguments
10-
const options = yargs
11+
const options = await yargs(process.argv.slice(2))
1112
.option('f', {
1213
alias: 'filename',
1314
describe: 'Input image filename',
@@ -250,7 +251,8 @@ function main() {
250251
}
251252
return Math.round(value);
252253
},
253-
}).argv;
254+
})
255+
.parse();
254256
if (options.filename) {
255257
// Process a single image
256258
(0, processImage_1.processImage)(options);
@@ -263,12 +265,13 @@ function main() {
263265
console.error(err);
264266
}
265267
numCores = Math.max(numCores - 1, 1); // Min 1
266-
numCores = Math.min(numCores, 16); // Max 16
267-
(0, processPath_1.processPath)(options.folderPath, options, numCores);
268+
numCores = Math.min(numCores, 32); // Max 32
269+
await (0, processPath_1.processPath)(options.folderPath, options, numCores);
268270
} else {
269271
console.error(
270272
'Error: Requires either `filename` or `folderPath`. Run `px --help` for help.',
271273
);
272274
}
275+
// console.timeEnd('Done in');
273276
}
274277
main();

src/px.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ import * as os from 'os';
66
import { processImage } from './utils/processImage';
77
import { processPath } from './utils/processPath';
88

9-
function main() {
9+
async function main() {
10+
// console.time('Done in');
11+
1012
// Parse command line arguments
11-
const options = yargs
13+
const options = (await yargs(process.argv.slice(2))
1214
.option('f', {
1315
alias: 'filename',
1416
describe: 'Input image filename',
@@ -252,7 +254,8 @@ function main() {
252254
}
253255
return Math.round(value);
254256
},
255-
}).argv as unknown as Options;
257+
})
258+
.parse()) as unknown as Options;
256259

257260
if (options.filename) {
258261
// Process a single image
@@ -266,13 +269,14 @@ function main() {
266269
console.error(err);
267270
}
268271
numCores = Math.max(numCores - 1, 1); // Min 1
269-
numCores = Math.min(numCores, 16); // Max 16
270-
processPath(options.folderPath, options, numCores);
272+
numCores = Math.min(numCores, 32); // Max 32
273+
await processPath(options.folderPath, options, numCores);
271274
} else {
272275
console.error(
273276
'Error: Requires either `filename` or `folderPath`. Run `px --help` for help.',
274277
);
275278
}
279+
// console.timeEnd('Done in');
276280
}
277281

278282
main();

src/utils/processImage.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import * as Jimp from 'jimp';
22
import * as fs from 'fs';
33
import * as path from 'path';
4-
import { workerData, isMainThread } from 'worker_threads';
4+
import { parentPort, isMainThread } from 'worker_threads';
55

66
import { definedPalettes } from './definedPalettes';
77
import { applyAlphaThreshold } from './applyAlphaThreshold';
@@ -18,14 +18,18 @@ function errorCallback(err: unknown) {
1818
}
1919
}
2020

21+
/**
22+
* Called on a worker thread to signal current work is complete
23+
*/
24+
const workerIsDone = () => parentPort?.postMessage('complete');
25+
2126
/**
2227
* Processes the given image with various image manipulation options.
2328
*
2429
* @param options - Image processing options.
2530
* @param skipExtCheck - (Optional) Skips extension check if set to true.
2631
*/
2732
export function processImage(options: Options, skipExtCheck?: boolean): void {
28-
console.time('Done in');
2933
const { filename } = options;
3034
Jimp.read(filename!)
3135
.then((image) => {
@@ -204,14 +208,26 @@ function continueProcessing(image: Jimp, options: Options): void {
204208
}
205209
outputFilename = `${outputFilename}.png`;
206210

207-
image.write(outputFilename, errorCallback);
208-
console.log(`Image saved: ${outputFilename}`);
209-
console.timeEnd('Done in');
211+
image
212+
.writeAsync(outputFilename)
213+
.then(() => {
214+
console.log(`Image saved: ${outputFilename}`);
215+
216+
if (!isMainThread) {
217+
workerIsDone();
218+
}
219+
})
220+
.catch(errorCallback);
210221
}
211222

212223
// If used as a worker thread, get file name from message
213224
if (!isMainThread) {
214-
const { filePath, options } = workerData;
215-
options.filename = filePath;
216-
processImage(options, true);
225+
parentPort?.on(
226+
'message',
227+
async (message: { filePath: string; options: Options }) => {
228+
const { filePath, options } = message;
229+
options.filename = filePath;
230+
processImage(options, true);
231+
},
232+
);
217233
}

src/utils/processPath.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ export async function processPath(
99
directoryPath: string,
1010
options: Options,
1111
maxWorkers: number,
12-
): Promise<void> {
12+
): Promise<boolean> {
1313
const workerPool = new WorkerPool(maxWorkers);
1414

1515
try {
@@ -24,10 +24,13 @@ export async function processPath(
2424
workerPool.addTask(filePath, options);
2525
}
2626
}
27-
28-
// Wait for all tasks to complete before exiting
29-
workerPool.waitForCompletion();
3027
} catch (err) {
3128
console.error(`Error reading directory: ${directoryPath}`, err);
3229
}
30+
31+
await workerPool.allComplete();
32+
33+
workerPool.exitAll();
34+
35+
return true;
3336
}

src/utils/workerPool.ts

Lines changed: 61 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,48 @@
11
import { Worker } from 'worker_threads';
22
import * as path from 'path';
33

4+
type TWorker = Worker & { isIdle: boolean };
5+
46
/**
57
* Manages a pool of worker threads for parallel processing of image files.
68
*/
79
export class WorkerPool {
8-
private workers: Worker[] = [];
10+
private workers: TWorker[] = [];
911
private taskQueue: { filePath: string; options: Options }[] = [];
1012
private maxWorkers: number;
13+
private completePromise?: Promise<void>;
14+
private completeResolve?: () => void;
15+
private isComplete(): boolean {
16+
return (
17+
this.taskQueue.length === 0 &&
18+
this.workers.every((worker) => worker.isIdle)
19+
);
20+
}
21+
22+
/**
23+
* Terminate all workers in the pool.
24+
*/
25+
public exitAll(): void {
26+
this.workers.forEach((worker) => worker.terminate());
27+
this.workers = [];
28+
}
29+
30+
/**
31+
* Returns a promise that resolves when all work is done.
32+
*/
33+
public async allComplete(): Promise<void> {
34+
if (this.isComplete()) {
35+
return Promise.resolve();
36+
}
37+
38+
if (!this.completePromise) {
39+
this.completePromise = new Promise<void>((resolve) => {
40+
this.completeResolve = resolve;
41+
});
42+
}
43+
44+
return this.completePromise;
45+
}
1146

1247
/**
1348
* Creates a new WorkerPool instance.
@@ -25,18 +60,22 @@ export class WorkerPool {
2560
* @param options - Image processing options for the file.
2661
*/
2762
private createWorker(filePath: string, options: Options): void {
28-
const worker = new Worker(path.join(__dirname, 'processImage.js'), {
29-
workerData: { filePath, options },
30-
});
63+
const worker = new Worker(
64+
path.join(__dirname, 'processImage.js'),
65+
) as TWorker;
66+
67+
worker.isIdle = false;
68+
worker.postMessage({ filePath, options });
3169

3270
// Listen for messages and errors from the worker
33-
worker.on('message', (message) => {
34-
console.log(message);
71+
worker.on('message', () => {
72+
worker.isIdle = true;
3573
this.processNextTask();
3674
});
3775

3876
worker.on('error', (err) => {
3977
console.error(`Error in worker for file ${filePath}:`, err);
78+
worker.isIdle = true;
4079
this.processNextTask();
4180
});
4281

@@ -49,7 +88,22 @@ export class WorkerPool {
4988
private processNextTask(): void {
5089
const nextTask = this.taskQueue.shift();
5190
if (nextTask) {
52-
this.createWorker(nextTask.filePath, nextTask.options);
91+
if (this.workers.length < this.maxWorkers) {
92+
this.createWorker(nextTask.filePath, nextTask.options);
93+
} else {
94+
const worker = this.workers.find((w) => w.isIdle);
95+
if (worker) {
96+
worker.isIdle = false;
97+
worker.postMessage(nextTask);
98+
} else {
99+
// Something went wrong, there are no idle workers somehow
100+
throw Error('Could not find an idle worker.');
101+
}
102+
}
103+
} else if (this.isComplete() && this.completeResolve) {
104+
this.completeResolve();
105+
this.completePromise = undefined;
106+
this.completeResolve = undefined;
53107
}
54108
}
55109

@@ -66,15 +120,4 @@ export class WorkerPool {
66120
this.taskQueue.push({ filePath, options });
67121
}
68122
}
69-
70-
/**
71-
* Waits for all tasks to complete before exiting.
72-
*/
73-
public waitForCompletion(): void {
74-
this.workers.forEach((worker) => {
75-
worker.on('exit', () => {
76-
this.processNextTask();
77-
});
78-
});
79-
}
80123
}

utils/processImage.js

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@ function errorCallback(err) {
1818
console.error(err);
1919
}
2020
}
21+
/**
22+
* Called on a worker thread to signal current work is complete
23+
*/
24+
const workerIsDone = () => worker_threads_1.parentPort?.postMessage('complete');
2125
/**
2226
* Processes the given image with various image manipulation options.
2327
*
2428
* @param options - Image processing options.
2529
* @param skipExtCheck - (Optional) Skips extension check if set to true.
2630
*/
2731
function processImage(options, skipExtCheck) {
28-
console.time('Done in');
2932
const { filename } = options;
3033
Jimp.read(filename)
3134
.then((image) => {
@@ -169,13 +172,21 @@ function continueProcessing(image, options) {
169172
outputFilename = `${outputFilename}-z_${pixelSize}`;
170173
}
171174
outputFilename = `${outputFilename}.png`;
172-
image.write(outputFilename, errorCallback);
173-
console.log(`Image saved: ${outputFilename}`);
174-
console.timeEnd('Done in');
175+
image
176+
.writeAsync(outputFilename)
177+
.then(() => {
178+
console.log(`Image saved: ${outputFilename}`);
179+
if (!worker_threads_1.isMainThread) {
180+
workerIsDone();
181+
}
182+
})
183+
.catch(errorCallback);
175184
}
176185
// If used as a worker thread, get file name from message
177186
if (!worker_threads_1.isMainThread) {
178-
const { filePath, options } = worker_threads_1.workerData;
179-
options.filename = filePath;
180-
processImage(options, true);
187+
worker_threads_1.parentPort?.on('message', async (message) => {
188+
const { filePath, options } = message;
189+
options.filename = filePath;
190+
processImage(options, true);
191+
});
181192
}

utils/processPath.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@ async function processPath(directoryPath, options, maxWorkers) {
1919
workerPool.addTask(filePath, options);
2020
}
2121
}
22-
// Wait for all tasks to complete before exiting
23-
workerPool.waitForCompletion();
2422
}
2523
catch (err) {
2624
console.error(`Error reading directory: ${directoryPath}`, err);
2725
}
26+
await workerPool.allComplete();
27+
workerPool.exitAll();
28+
return true;
2829
}
2930
exports.processPath = processPath;

0 commit comments

Comments
 (0)