算法 · 2026-05-16 · 14 分钟
算法知识库:并行算法实现
使用 JavaScript/TypeScript 实现并行算法,如并行排序、并行矩阵运算等。
并行算法实现
1. 并行归并排序 (Parallel Merge Sort)
ts
/**
 * Merge sort for arrays of numbers, with an optional Web Worker backed variant.
 *
 * - `sort` runs entirely on the calling thread. Its recursive halves are
 *   combined with `Promise.all`, but no other thread is involved, so it is an
 *   asynchronous (not a multi-core) merge sort.
 * - `sortWithWorkers` cuts the input into one contiguous chunk per worker,
 *   sorts the chunks concurrently in dedicated workers and merges the sorted
 *   runs on the calling thread.
 *
 * Both methods leave the input untouched and always return a new array.
 * Equal elements keep their relative order (the merge prefers the left run).
 */
class ParallelMergeSort {
  /** Inputs at or below this length are sorted directly, without splitting or workers. */
  private static readonly SERIAL_THRESHOLD = 1000;

  /** Script run inside every worker: sorts the posted array and posts the result back. */
  private static readonly WORKER_SOURCE = `
    self.onmessage = function (e) {
      self.postMessage(mergeSort(e.data));
    };
    function mergeSort(arr) {
      if (arr.length <= 1) return arr;
      const mid = Math.floor(arr.length / 2);
      return merge(mergeSort(arr.slice(0, mid)), mergeSort(arr.slice(mid)));
    }
    function merge(left, right) {
      const result = [];
      let i = 0, j = 0;
      while (i < left.length && j < right.length) {
        if (left[i] <= right[j]) {
          result.push(left[i++]);
        } else {
          result.push(right[j++]);
        }
      }
      while (i < left.length) result.push(left[i++]);
      while (j < right.length) result.push(right[j++]);
      return result;
    }
  `;

  private readonly maxWorkers: number;

  /**
   * @param maxWorkers Upper bound on the number of workers `sortWithWorkers`
   *   may start. Defaults to `navigator.hardwareConcurrency`, or 4 when that
   *   is unavailable. Values below 1 disable workers.
   */
  constructor(maxWorkers: number = ParallelMergeSort.defaultWorkerCount()) {
    this.maxWorkers = maxWorkers;
  }

  /** Reads the core count without throwing in environments that have no `navigator`. */
  private static defaultWorkerCount(): number {
    const reported = typeof navigator !== 'undefined' ? navigator.hardwareConcurrency : undefined;
    return reported || 4;
  }

  /**
   * Sorts `arr` in ascending order on the calling thread.
   *
   * @param arr Numbers to sort; not modified.
   * @returns A new sorted array.
   */
  async sort(arr: number[]): Promise<number[]> {
    // Copy so callers never receive their own array back.
    if (arr.length <= 1) return [...arr];
    if (arr.length <= ParallelMergeSort.SERIAL_THRESHOLD) {
      return this.mergeSort(arr);
    }
    const mid = Math.floor(arr.length / 2);
    const [sortedLeft, sortedRight] = await Promise.all([
      this.sort(arr.slice(0, mid)),
      this.sort(arr.slice(mid)),
    ]);
    return this.merge(sortedLeft, sortedRight);
  }

  /**
   * Sorts `arr` in ascending order using up to `maxWorkers` Web Workers.
   *
   * Falls back to the serial sort when the input is small, when `maxWorkers`
   * is below 1, or when the environment has no `Worker` constructor. Workers
   * and the script's object URL are created per call and always released, so
   * overlapping calls on the same instance do not interfere.
   *
   * @param arr Numbers to sort; not modified.
   * @returns A new sorted array. Rejects with the worker's error event if a
   *   worker fails.
   */
  async sortWithWorkers(arr: number[]): Promise<number[]> {
    if (arr.length <= 1) return [...arr];

    const threshold = ParallelMergeSort.SERIAL_THRESHOLD;
    const chunkCount = Math.min(Math.floor(this.maxWorkers), Math.ceil(arr.length / threshold));
    // `!(x >= 1)` also rejects NaN, which a plain `x < 1` would let through.
    if (arr.length <= threshold || !(chunkCount >= 1) || typeof Worker === 'undefined') {
      return this.mergeSort(arr);
    }

    const scriptUrl = URL.createObjectURL(
      new Blob([ParallelMergeSort.WORKER_SOURCE], { type: 'application/javascript' }),
    );
    const pool: Worker[] = [];
    try {
      const chunkSize = Math.ceil(arr.length / chunkCount);
      const pending: Promise<number[]>[] = [];
      for (let start = 0; start < arr.length; start += chunkSize) {
        const worker = new Worker(scriptUrl);
        pool.push(worker);
        pending.push(this.sortInWorker(worker, arr.slice(start, start + chunkSize)));
      }
      return this.mergeRuns(await Promise.all(pending));
    } finally {
      pool.forEach((worker) => worker.terminate());
      URL.revokeObjectURL(scriptUrl);
    }
  }

  /** Sends one chunk to `worker` and resolves with the sorted chunk it posts back. */
  private sortInWorker(worker: Worker, chunk: number[]): Promise<number[]> {
    return new Promise<number[]>((resolve, reject) => {
      worker.onmessage = (event: MessageEvent<number[]>) => resolve(event.data);
      worker.onerror = (event) => reject(event);
      worker.postMessage(chunk);
    });
  }

  /** Merges adjacent sorted runs pairwise until a single run remains. */
  private mergeRuns(runs: number[][]): number[] {
    let level = runs;
    while (level.length > 1) {
      const next: number[][] = [];
      for (let i = 0; i < level.length; i += 2) {
        // An odd run at the end is carried over unchanged to the next round.
        next.push(i + 1 < level.length ? this.merge(level[i], level[i + 1]) : level[i]);
      }
      level = next;
    }
    return level.length === 1 ? level[0] : [];
  }

  /**
   * Serial top-down merge sort. Returns `arr` itself when it has at most one
   * element, so public entry points handle that case before calling this.
   */
  private mergeSort(arr: number[]): number[] {
    if (arr.length <= 1) return arr;
    const mid = Math.floor(arr.length / 2);
    return this.merge(this.mergeSort(arr.slice(0, mid)), this.mergeSort(arr.slice(mid)));
  }

  /** Merges two ascending arrays into a new ascending array; ties take the left element first. */
  private merge(left: number[], right: number[]): number[] {
    const result: number[] = [];
    let i = 0;
    let j = 0;
    while (i < left.length && j < right.length) {
      if (left[i] <= right[j]) {
        result.push(left[i++]);
      } else {
        result.push(right[j++]);
      }
    }
    // At most one of the two inputs still has elements left.
    while (i < left.length) result.push(left[i++]);
    while (j < right.length) result.push(right[j++]);
    return result;
  }
}
// 使用示例
/**
 * Demo: sorts 10,000 random integers with `ParallelMergeSort.sort`, then logs
 * the elapsed time and whether the output is in non-decreasing order.
 */
async function parallelMergeSortExample() {
const sorter = new ParallelMergeSort();
// Random integers in the range [0, 10000).
const arr = Array.from({ length: 10000 }, () => Math.floor(Math.random() * 10000));
console.log('Original array length:', arr.length);
const start = performance.now();
const sorted = await sorter.sort(arr);
const end = performance.now();
console.log('Sorted in', (end - start).toFixed(2), 'ms');
console.log(
'Is sorted:',
// Every element must be >= its predecessor; index 0 has no predecessor.
sorted.every((val, i) => i === 0 || val >= sorted[i - 1]),
);
}2. 并行矩阵乘法 (Parallel Matrix Multiplication)
ts
/**
 * Dense matrix multiplication and transposition for `number[][]` matrices.
 *
 * - `multiply` and `transpose` partition the work into blocks but execute
 *   every block on the calling thread; they are asynchronous, not multi-core.
 * - `multiplyWithWorkers` gives each Web Worker one contiguous block of rows
 *   of `A` (plus `B`) and concatenates the row blocks the workers return.
 *
 * Matrices are assumed to be rectangular (all rows the same length); only the
 * first row of each matrix is inspected to determine its column count.
 */
class ParallelMatrixMultiplication {
  /** Products with fewer output cells than this skip the block partitioning. */
  private static readonly SERIAL_CELL_THRESHOLD = 1000;

  /** Script run inside every worker: multiplies the posted rows of A by B. */
  private static readonly WORKER_SOURCE = `
    self.onmessage = function (e) {
      const rows = e.data.rows;
      const B = e.data.B;
      const colsB = B.length > 0 ? B[0].length : 0;
      const out = rows.map(function (row) {
        const outRow = new Array(colsB).fill(0);
        for (let j = 0; j < colsB; j++) {
          let sum = 0;
          for (let k = 0; k < row.length; k++) {
            sum += row[k] * B[k][j];
          }
          outRow[j] = sum;
        }
        return outRow;
      });
      self.postMessage(out);
    };
  `;

  private readonly maxWorkers: number;

  /**
   * @param maxWorkers Upper bound on the number of workers / work blocks.
   *   Defaults to `navigator.hardwareConcurrency`, or 4 when unavailable.
   *   Values below 1 disable workers in `multiplyWithWorkers`.
   */
  constructor(maxWorkers: number = ParallelMatrixMultiplication.defaultWorkerCount()) {
    this.maxWorkers = maxWorkers;
  }

  /** Reads the core count without throwing in environments that have no `navigator`. */
  private static defaultWorkerCount(): number {
    const reported = typeof navigator !== 'undefined' ? navigator.hardwareConcurrency : undefined;
    return reported || 4;
  }

  /**
   * Computes `A × B` on the calling thread.
   *
   * @returns A new `rows(A) × cols(B)` matrix; `[]` when `A` has no rows.
   * @throws Error (as a rejection) when `cols(A) !== rows(B)`.
   */
  async multiply(A: number[][], B: number[][]): Promise<number[][]> {
    const { rowsA, colsB } = this.checkDimensions(A, B);
    if (rowsA * colsB < ParallelMatrixMultiplication.SERIAL_CELL_THRESHOLD) {
      return this.serialMultiply(A, B);
    }
    const result = this.zeros(rowsA, colsB);
    const blockSize = this.blockSizeFor(rowsA);
    const pending: Promise<void>[] = [];
    for (let startRow = 0; startRow < rowsA; startRow += blockSize) {
      const endRow = Math.min(startRow + blockSize, rowsA);
      pending.push(this.multiplyBlock(A, B, result, startRow, endRow));
    }
    await Promise.all(pending);
    return result;
  }

  /**
   * Computes `A × B`, distributing row blocks of `A` over Web Workers.
   *
   * Falls back to the serial product when `maxWorkers` is below 1 or the
   * environment has no `Worker` constructor. Workers and the script's object
   * URL are created per call and always released.
   *
   * @returns A new `rows(A) × cols(B)` matrix; `[]` when `A` has no rows.
   * @throws Error (as a rejection) when `cols(A) !== rows(B)`; also rejects
   *   with the worker's error event if a worker fails.
   */
  async multiplyWithWorkers(A: number[][], B: number[][]): Promise<number[][]> {
    const { rowsA } = this.checkDimensions(A, B);
    if (rowsA === 0) return [];

    const workerCount = Math.min(Math.floor(this.maxWorkers), rowsA);
    // `!(x >= 1)` also rejects NaN.
    if (!(workerCount >= 1) || typeof Worker === 'undefined') {
      return this.serialMultiply(A, B);
    }

    const scriptUrl = URL.createObjectURL(
      new Blob([ParallelMatrixMultiplication.WORKER_SOURCE], { type: 'application/javascript' }),
    );
    const pool: Worker[] = [];
    try {
      const blockSize = Math.ceil(rowsA / workerCount);
      const pending: Promise<number[][]>[] = [];
      for (let startRow = 0; startRow < rowsA; startRow += blockSize) {
        const worker = new Worker(scriptUrl);
        pool.push(worker);
        pending.push(this.multiplyRowsInWorker(worker, A.slice(startRow, startRow + blockSize), B));
      }
      // Blocks were dispatched in row order, so concatenating them rebuilds the product.
      const result: number[][] = [];
      for (const block of await Promise.all(pending)) {
        for (const row of block) result.push(row);
      }
      return result;
    } finally {
      pool.forEach((worker) => worker.terminate());
      URL.revokeObjectURL(scriptUrl);
    }
  }

  /**
   * Returns the transpose of `matrix` as a new matrix; `[]` for an empty
   * input. Column ranges are processed block by block on the calling thread.
   */
  async transpose(matrix: number[][]): Promise<number[][]> {
    const rows = matrix.length;
    const cols = rows > 0 ? matrix[0].length : 0;
    const result = Array.from({ length: cols }, () => new Array<number>(rows));
    const chunkSize = this.blockSizeFor(cols);
    const pending: Promise<void>[] = [];
    for (let startCol = 0; startCol < cols; startCol += chunkSize) {
      const endCol = Math.min(startCol + chunkSize, cols);
      pending.push(this.transposeBlock(matrix, result, startCol, endCol));
    }
    await Promise.all(pending);
    return result;
  }

  /**
   * Validates that `A × B` is defined and returns the relevant dimensions.
   * An `A` without rows is accepted with any `B`, since its column count is unknown.
   */
  private checkDimensions(A: number[][], B: number[][]): { rowsA: number; colsA: number; colsB: number } {
    const rowsA = A.length;
    if (rowsA === 0) return { rowsA: 0, colsA: 0, colsB: 0 };
    const colsA = A[0].length;
    if (colsA !== B.length) {
      throw new Error("Matrix dimensions don't match for multiplication");
    }
    const colsB = B.length > 0 ? B[0].length : 0;
    return { rowsA, colsA, colsB };
  }

  /** Rows (or columns) per block so that roughly `maxWorkers` blocks are produced; never below 1. */
  private blockSizeFor(total: number): number {
    // `|| 1` turns the NaN / 0 produced by a degenerate `maxWorkers` into a usable size.
    return Math.max(1, Math.floor(total / this.maxWorkers) || 1);
  }

  /** Creates a `rows × cols` matrix filled with zeros. */
  private zeros(rows: number, cols: number): number[][] {
    return Array.from({ length: rows }, () => new Array<number>(cols).fill(0));
  }

  /** Plain triple-loop product; expects dimensions that `checkDimensions` accepted. */
  private serialMultiply(A: number[][], B: number[][]): number[][] {
    const rowsA = A.length;
    const colsB = B.length > 0 ? B[0].length : 0;
    const result = this.zeros(rowsA, colsB);
    this.fillRows(A, B, result, 0, rowsA);
    return result;
  }

  /** Asynchronous wrapper that fills rows `[startRow, endRow)` of `result`. */
  private async multiplyBlock(
    A: number[][],
    B: number[][],
    result: number[][],
    startRow: number,
    endRow: number,
  ): Promise<void> {
    this.fillRows(A, B, result, startRow, endRow);
  }

  /** Writes rows `[startRow, endRow)` of `A × B` into `result`. */
  private fillRows(A: number[][], B: number[][], result: number[][], startRow: number, endRow: number): void {
    const colsB = B.length > 0 ? B[0].length : 0;
    for (let i = startRow; i < endRow; i++) {
      const rowA = A[i];
      const rowOut = result[i];
      for (let j = 0; j < colsB; j++) {
        let sum = 0;
        for (let k = 0; k < rowA.length; k++) {
          sum += rowA[k] * B[k][j];
        }
        rowOut[j] = sum;
      }
    }
  }

  /** Sends a block of rows of A together with B to `worker`; resolves with the product rows. */
  private multiplyRowsInWorker(worker: Worker, rows: number[][], B: number[][]): Promise<number[][]> {
    return new Promise<number[][]>((resolve, reject) => {
      worker.onmessage = (event: MessageEvent<number[][]>) => resolve(event.data);
      worker.onerror = (event) => reject(event);
      worker.postMessage({ rows, B });
    });
  }

  /** Copies columns `[startCol, endCol)` of `matrix` into the matching rows of `result`. */
  private async transposeBlock(
    matrix: number[][],
    result: number[][],
    startCol: number,
    endCol: number,
  ): Promise<void> {
    const rows = matrix.length;
    for (let j = startCol; j < endCol; j++) {
      for (let i = 0; i < rows; i++) {
        result[j][i] = matrix[i][j];
      }
    }
  }
}
// 使用示例
/**
 * Demo: multiplies two 100x100 matrices of random values with
 * `ParallelMatrixMultiplication.multiply`, then logs the elapsed time and the
 * dimensions of the result.
 */
async function parallelMatrixExample() {
const multiplier = new ParallelMatrixMultiplication();
// Build the test matrices.
const size = 100;
const A = Array.from({ length: size }, () => Array.from({ length: size }, () => Math.random()));
const B = Array.from({ length: size }, () => Array.from({ length: size }, () => Math.random()));
console.log('Matrix size:', size + 'x' + size);
const start = performance.now();
const result = await multiplier.multiply(A, B);
const end = performance.now();
console.log('Multiplication completed in', (end - start).toFixed(2), 'ms');
console.log('Result size:', result.length + 'x' + result[0].length);
}3. 并行前缀和 (Parallel Prefix Sum)
ts
/**
 * Inclusive prefix sums (running totals) of a number array, in three flavours:
 *
 * - `prefixSum`: Hillis-Steele scan, executed round by round on the calling thread.
 * - `efficientPrefixSum`: recursive divide-and-conquer on the calling thread.
 * - `prefixSumWithWorkers`: block scan — each Web Worker scans one contiguous
 *   chunk, then the calling thread adds the running total of all preceding
 *   chunks to each chunk.
 *
 * None of the methods mutates its input. The variants add the numbers in
 * different orders, so with non-integer values their results can differ from a
 * left-to-right sum by floating-point rounding.
 */
class ParallelPrefixSum {
  /** Script run inside every worker: returns the inclusive prefix sum of the posted chunk. */
  private static readonly WORKER_SOURCE = `
    self.onmessage = function (e) {
      const chunk = e.data;
      const out = new Array(chunk.length);
      let running = 0;
      for (let i = 0; i < chunk.length; i++) {
        running += chunk[i];
        out[i] = running;
      }
      self.postMessage(out);
    };
  `;

  private readonly maxWorkers: number;

  /**
   * @param maxWorkers Upper bound on the number of workers / work chunks.
   *   Defaults to `navigator.hardwareConcurrency`, or 4 when unavailable.
   *   Values below 1 disable workers in `prefixSumWithWorkers`.
   */
  constructor(maxWorkers: number = ParallelPrefixSum.defaultWorkerCount()) {
    this.maxWorkers = maxWorkers;
  }

  /** Reads the core count without throwing in environments that have no `navigator`. */
  private static defaultWorkerCount(): number {
    const reported = typeof navigator !== 'undefined' ? navigator.hardwareConcurrency : undefined;
    return reported || 4;
  }

  /**
   * Hillis-Steele inclusive scan.
   *
   * Runs one round per stride 1, 2, 4, ... below `arr.length`. In each round
   * every element at index `i >= stride` becomes `prev[i] + prev[i - stride]`.
   * Each round reads only the previous round's array and writes a fresh one;
   * updating in place would let already-updated elements leak into the same
   * round and corrupt the result.
   *
   * @returns A new array with `result[i] = arr[0] + ... + arr[i]`.
   */
  async prefixSum(arr: number[]): Promise<number[]> {
    const n = arr.length;
    let current = [...arr];
    // `|| 1` guards against the NaN / 0 a degenerate `maxWorkers` would produce.
    const chunkSize = Math.max(1, Math.floor(n / this.maxWorkers) || 1);
    for (let stride = 1; stride < n; stride *= 2) {
      const next = new Array<number>(n);
      const pending: Promise<void>[] = [];
      for (let start = 0; start < n; start += chunkSize) {
        const end = Math.min(start + chunkSize, n);
        pending.push(this.updatePrefixSumChunk(current, next, stride, start, end));
      }
      await Promise.all(pending);
      current = next;
    }
    return current;
  }

  /** Computes one round's values for indices `[start, end)`, reading `prev` and writing `next`. */
  private async updatePrefixSumChunk(
    prev: number[],
    next: number[],
    stride: number,
    start: number,
    end: number,
  ): Promise<void> {
    for (let i = start; i < end; i++) {
      next[i] = i >= stride ? prev[i] + prev[i - stride] : prev[i];
    }
  }

  /**
   * Divide-and-conquer inclusive scan: scans both halves recursively, then
   * shifts the right half by the total of the left half.
   *
   * @returns A new array with `result[i] = arr[0] + ... + arr[i]`.
   */
  async efficientPrefixSum(arr: number[]): Promise<number[]> {
    const n = arr.length;
    if (n <= 1) return [...arr];
    const mid = Math.floor(n / 2);
    const [leftPrefix, rightPrefix] = await Promise.all([
      this.efficientPrefixSum(arr.slice(0, mid)),
      this.efficientPrefixSum(arr.slice(mid)),
    ]);
    // n >= 2 guarantees mid >= 1, so the left half is never empty.
    const rightOffset = leftPrefix[leftPrefix.length - 1];
    for (let i = 0; i < rightPrefix.length; i++) {
      rightPrefix[i] += rightOffset;
    }
    return [...leftPrefix, ...rightPrefix];
  }

  /**
   * Inclusive scan using up to `maxWorkers` Web Workers.
   *
   * Each worker scans one contiguous chunk independently; the calling thread
   * then walks the chunks in order, adding the accumulated total of all
   * earlier chunks. Falls back to a serial scan when `maxWorkers` is below 1
   * or the environment has no `Worker` constructor. Workers and the script's
   * object URL are created per call and always released.
   *
   * @returns A new array with `result[i] = arr[0] + ... + arr[i]`. Rejects
   *   with the worker's error event if a worker fails.
   */
  async prefixSumWithWorkers(arr: number[]): Promise<number[]> {
    const n = arr.length;
    if (n <= 1) return [...arr];

    const chunkCount = Math.min(Math.floor(this.maxWorkers), n);
    // `!(x >= 1)` also rejects NaN.
    if (!(chunkCount >= 1) || typeof Worker === 'undefined') {
      return this.serialPrefixSum(arr);
    }

    const scriptUrl = URL.createObjectURL(
      new Blob([ParallelPrefixSum.WORKER_SOURCE], { type: 'application/javascript' }),
    );
    const pool: Worker[] = [];
    try {
      const chunkSize = Math.ceil(n / chunkCount);
      const pending: Promise<number[]>[] = [];
      for (let start = 0; start < n; start += chunkSize) {
        const worker = new Worker(scriptUrl);
        pool.push(worker);
        pending.push(this.scanInWorker(worker, arr.slice(start, start + chunkSize)));
      }
      const localScans = await Promise.all(pending);

      const result: number[] = [];
      let carry = 0; // Sum of every element in the chunks already emitted.
      for (const scan of localScans) {
        for (const value of scan) result.push(value + carry);
        if (scan.length > 0) carry += scan[scan.length - 1];
      }
      return result;
    } finally {
      pool.forEach((worker) => worker.terminate());
      URL.revokeObjectURL(scriptUrl);
    }
  }

  /** Sends one chunk to `worker` and resolves with the chunk's local prefix sums. */
  private scanInWorker(worker: Worker, chunk: number[]): Promise<number[]> {
    return new Promise<number[]>((resolve, reject) => {
      worker.onmessage = (event: MessageEvent<number[]>) => resolve(event.data);
      worker.onerror = (event) => reject(event);
      worker.postMessage(chunk);
    });
  }

  /** Left-to-right running total; used when workers are unavailable. */
  private serialPrefixSum(arr: number[]): number[] {
    const result = new Array<number>(arr.length);
    let running = 0;
    for (let i = 0; i < arr.length; i++) {
      running += arr[i];
      result[i] = running;
    }
    return result;
  }

  /**
   * Checks `prefix` against a left-to-right running total of `original`.
   * Uses exact equality, so it is only reliable for values whose sums are
   * exact (for example integers).
   */
  validatePrefixSum(original: number[], prefix: number[]): boolean {
    if (prefix.length !== original.length) return false;
    let sum = 0;
    for (let i = 0; i < original.length; i++) {
      sum += original[i];
      if (prefix[i] !== sum) return false;
    }
    return true;
  }
}
// 使用示例
/**
 * Demo: computes the prefix sum of 1,000 random integers with
 * `ParallelPrefixSum.prefixSum`, then logs the elapsed time, the outcome of
 * `validatePrefixSum`, and the first ten results.
 */
async function parallelPrefixSumExample() {
const prefixSum = new ParallelPrefixSum();
// Random integers in the range [0, 10).
const arr = Array.from({ length: 1000 }, () => Math.floor(Math.random() * 10));
console.log('Original array length:', arr.length);
const start = performance.now();
const result = await prefixSum.prefixSum(arr);
const end = performance.now();
console.log('Prefix sum computed in', (end - start).toFixed(2), 'ms');
console.log('Is valid:', prefixSum.validatePrefixSum(arr, result));
console.log('First 10 elements:', result.slice(0, 10));
}4. 并行快速排序 (Parallel Quick Sort)
ts
/**
 * Quick sort for arrays of numbers, with an optional Web Worker backed variant.
 *
 * All variants use a three-way partition around the middle element (less /
 * equal / greater), so elements equal to the pivot are kept rather than
 * collapsed into a single value.
 *
 * - `sort` runs entirely on the calling thread; its recursion is combined
 *   with `Promise.all` but involves no other thread.
 * - `sortWithWorkers` partitions on the calling thread until the worker budget
 *   is used up and sorts each remaining partition in its own Web Worker.
 *
 * Inputs are never mutated and a new array is always returned. `NaN` values
 * are not supported: they compare neither below nor above any pivot.
 */
class ParallelQuickSort {
  /** Inputs at or below this length are sorted directly, without workers. */
  private static readonly SERIAL_THRESHOLD = 100;

  /** Script run inside every worker: sorts the posted array and posts the result back. */
  private static readonly WORKER_SOURCE = `
    self.onmessage = function (e) {
      self.postMessage(quickSort(e.data));
    };
    function quickSort(arr) {
      if (arr.length <= 1) return arr;
      const pivot = arr[Math.floor(arr.length / 2)];
      const less = [];
      const equal = [];
      const greater = [];
      for (const num of arr) {
        if (num < pivot) less.push(num);
        else if (num > pivot) greater.push(num);
        else equal.push(num);
      }
      return quickSort(less).concat(equal, quickSort(greater));
    }
  `;

  private readonly maxWorkers: number;

  /**
   * @param maxWorkers Upper bound on the number of workers `sortWithWorkers`
   *   may start. Defaults to `navigator.hardwareConcurrency`, or 4 when that
   *   is unavailable. Values below 1 disable workers.
   */
  constructor(maxWorkers: number = ParallelQuickSort.defaultWorkerCount()) {
    this.maxWorkers = maxWorkers;
  }

  /** Reads the core count without throwing in environments that have no `navigator`. */
  private static defaultWorkerCount(): number {
    const reported = typeof navigator !== 'undefined' ? navigator.hardwareConcurrency : undefined;
    return reported || 4;
  }

  /**
   * Sorts `arr` in ascending order on the calling thread.
   *
   * @param arr Numbers to sort; not modified.
   * @returns A new sorted array containing every input element.
   */
  async sort(arr: number[]): Promise<number[]> {
    // Copy so callers never receive their own array back.
    if (arr.length <= 1) return [...arr];
    if (arr.length < ParallelQuickSort.SERIAL_THRESHOLD) {
      return this.quickSort(arr);
    }
    const { less, equal, greater } = this.partition(arr);
    const [sortedLess, sortedGreater] = await Promise.all([this.sort(less), this.sort(greater)]);
    return [...sortedLess, ...equal, ...sortedGreater];
  }

  /**
   * Sorts `arr` in ascending order using up to `maxWorkers` Web Workers.
   *
   * Falls back to the serial sort when the input is small, when `maxWorkers`
   * is below 1, or when the environment has no `Worker` constructor. Workers
   * and the script's object URL are created per call and always released, so
   * overlapping calls on the same instance do not interfere.
   *
   * @param arr Numbers to sort; not modified.
   * @returns A new sorted array containing every input element. Rejects with
   *   the worker's error event if a worker fails.
   */
  async sortWithWorkers(arr: number[]): Promise<number[]> {
    if (arr.length <= 1) return [...arr];

    const budget = Math.floor(this.maxWorkers);
    // `!(x >= 1)` also rejects NaN.
    if (arr.length <= ParallelQuickSort.SERIAL_THRESHOLD || !(budget >= 1) || typeof Worker === 'undefined') {
      return this.quickSort(arr);
    }

    const scriptUrl = URL.createObjectURL(
      new Blob([ParallelQuickSort.WORKER_SOURCE], { type: 'application/javascript' }),
    );
    const pool: Worker[] = [];
    const sortInNewWorker = (chunk: number[]): Promise<number[]> => {
      const worker = new Worker(scriptUrl);
      pool.push(worker);
      return new Promise<number[]>((resolve, reject) => {
        worker.onmessage = (event: MessageEvent<number[]>) => resolve(event.data);
        worker.onerror = (event) => reject(event);
        worker.postMessage(chunk);
      });
    };
    try {
      return await this.sortWithBudget(arr, budget, sortInNewWorker);
    } finally {
      pool.forEach((worker) => worker.terminate());
      URL.revokeObjectURL(scriptUrl);
    }
  }

  /**
   * Recursively partitions `arr`, splitting the worker budget between the two
   * sides. A partition is handed to a worker once its budget is down to one;
   * small partitions are sorted right away on the calling thread.
   */
  private async sortWithBudget(
    arr: number[],
    budget: number,
    sortInNewWorker: (chunk: number[]) => Promise<number[]>,
  ): Promise<number[]> {
    if (arr.length <= ParallelQuickSort.SERIAL_THRESHOLD) {
      return this.quickSort(arr);
    }
    if (budget <= 1) {
      return sortInNewWorker(arr);
    }
    const { less, equal, greater } = this.partition(arr);
    const lessBudget = Math.ceil(budget / 2);
    const [sortedLess, sortedGreater] = await Promise.all([
      this.sortWithBudget(less, lessBudget, sortInNewWorker),
      this.sortWithBudget(greater, budget - lessBudget, sortInNewWorker),
    ]);
    return [...sortedLess, ...equal, ...sortedGreater];
  }

  /**
   * Splits `arr` around its middle element into values below, equal to, and
   * above the pivot. Anything that is neither below nor above (the pivot and
   * all of its duplicates) lands in `equal`.
   */
  private partition(arr: number[]): { less: number[]; equal: number[]; greater: number[] } {
    const pivot = arr[Math.floor(arr.length / 2)];
    const less: number[] = [];
    const equal: number[] = [];
    const greater: number[] = [];
    for (const num of arr) {
      if (num < pivot) less.push(num);
      else if (num > pivot) greater.push(num);
      else equal.push(num);
    }
    return { less, equal, greater };
  }

  /**
   * Serial quick sort. Returns `arr` itself when it has at most one element,
   * so public entry points handle that case before calling this.
   */
  private quickSort(arr: number[]): number[] {
    if (arr.length <= 1) return arr;
    const { less, equal, greater } = this.partition(arr);
    return [...this.quickSort(less), ...equal, ...this.quickSort(greater)];
  }

  /** Returns true when `arr` is in non-decreasing order. */
  isSorted(arr: number[]): boolean {
    for (let i = 1; i < arr.length; i++) {
      if (arr[i] < arr[i - 1]) return false;
    }
    return true;
  }
}
// 使用示例
/**
 * Demo: sorts 10,000 random integers with `ParallelQuickSort.sort`, then logs
 * the elapsed time and the result of `isSorted` on the output.
 */
async function parallelQuickSortExample() {
const sorter = new ParallelQuickSort();
// Random integers in the range [0, 10000).
const arr = Array.from({ length: 10000 }, () => Math.floor(Math.random() * 10000));
console.log('Original array length:', arr.length);
const start = performance.now();
const sorted = await sorter.sort(arr);
const end = performance.now();
console.log('Sorted in', (end - start).toFixed(2), 'ms');
console.log('Is sorted:', sorter.isSorted(sorted));
}5. 实现要点
- 并行归并排序使用分治策略,递归分割数组。
- 矩阵乘法通过分块计算提高并行度。
- 前缀和使用 Hillis-Steele 算法实现并行计算。
- 快速排序通过并行处理左右子数组加速。
- 所有算法都提供基于 Web Workers 的版本(各类中名称以 WithWorkers 结尾的方法)以利用多核 CPU;其余异步方法仍在调用线程上执行。
算法 · 并行算法 · JavaScript