SorryToPerson logo
返回
算法2026-05-16·14 分钟

算法知识库:并行算法实现

JavaScript/TypeScript 实现并行算法,如并行排序、并行矩阵运算等。

并行算法实现

1. 并行归并排序 (Parallel Merge Sort)

ts
class ParallelMergeSort {
  private workers: Worker[] = [];
  private maxWorkers: number;

  constructor(maxWorkers: number = navigator.hardwareConcurrency || 4) {
    this.maxWorkers = maxWorkers;
  }

  // 并行归并排序
  async sort(arr: number[]): Promise<number[]> {
    if (arr.length <= 1) return arr;

    // 如果数组很小,使用串行排序
    if (arr.length < 1000) {
      return this.mergeSort(arr);
    }

    // 分割数组
    const mid = Math.floor(arr.length / 2);
    const left = arr.slice(0, mid);
    const right = arr.slice(mid);

    // 并行排序两半
    const [sortedLeft, sortedRight] = await Promise.all([this.sort(left), this.sort(right)]);

    // 归并结果
    return this.merge(sortedLeft, sortedRight);
  }

  // 串行归并排序 (用于小数组)
  private mergeSort(arr: number[]): number[] {
    if (arr.length <= 1) return arr;

    const mid = Math.floor(arr.length / 2);
    const left = this.mergeSort(arr.slice(0, mid));
    const right = this.mergeSort(arr.slice(mid));

    return this.merge(left, right);
  }

  // 归并两个有序数组
  private merge(left: number[], right: number[]): number[] {
    const result: number[] = [];
    let i = 0,
      j = 0;

    while (i < left.length && j < right.length) {
      if (left[i] <= right[j]) {
        result.push(left[i++]);
      } else {
        result.push(right[j++]);
      }
    }

    // 添加剩余元素
    while (i < left.length) result.push(left[i++]);
    while (j < right.length) result.push(right[j++]);

    return result;
  }

  // 使用 Web Workers 的并行归并排序
  async sortWithWorkers(arr: number[]): Promise<number[]> {
    if (arr.length <= 1) return arr;

    // 创建 worker 池
    this.initializeWorkers();

    try {
      return await this.parallelSort(arr);
    } finally {
      this.terminateWorkers();
    }
  }

  private initializeWorkers(): void {
    const workerCode = `
      self.onmessage = function(e) {
        const { type, data } = e.data;

        if (type === 'sort') {
          const sorted = mergeSort(data);
          self.postMessage({ type: 'result', data: sorted });
        } else if (type === 'merge') {
          const { left, right } = data;
          const merged = merge(left, right);
          self.postMessage({ type: 'result', data: merged });
        }
      };

      function mergeSort(arr) {
        if (arr.length <= 1) return arr;
        const mid = Math.floor(arr.length / 2);
        const left = mergeSort(arr.slice(0, mid));
        const right = mergeSort(arr.slice(mid));
        return merge(left, right);
      }

      function merge(left, right) {
        const result = [];
        let i = 0, j = 0;
        while (i < left.length && j < right.length) {
          if (left[i] <= right[j]) {
            result.push(left[i++]);
          } else {
            result.push(right[j++]);
          }
        }
        while (i < left.length) result.push(left[i++]);
        while (j < right.length) result.push(right[j++]);
        return result;
      }
    `;

    for (let i = 0; i < this.maxWorkers; i++) {
      const blob = new Blob([workerCode], { type: 'application/javascript' });
      const worker = new Worker(URL.createObjectURL(blob));
      this.workers.push(worker);
    }
  }

  private terminateWorkers(): void {
    this.workers.forEach((worker) => worker.terminate());
    this.workers = [];
  }

  private async parallelSort(arr: number[]): Promise<number[]> {
    if (arr.length <= 1000) {
      return this.mergeSort(arr);
    }

    const mid = Math.floor(arr.length / 2);
    const left = arr.slice(0, mid);
    const right = arr.slice(mid);

    // 使用 Promise.race 来处理 worker 分配
    const promises: Promise<number[]>[] = [];

    const sortLeft = this.assignWorker('sort', left);
    const sortRight = this.assignWorker('sort', right);

    promises.push(sortLeft, sortRight);

    const [sortedLeft, sortedRight] = await Promise.all(promises);

    // 归并结果
    return this.assignWorker('merge', { left: sortedLeft, right: sortedRight });
  }

  private assignWorker(type: string, data: any): Promise<number[]> {
    return new Promise((resolve, reject) => {
      // 找到空闲的 worker
      const worker = this.workers.find((w) => !w.onmessage);

      if (!worker) {
        // 如果没有空闲 worker,使用串行处理
        if (type === 'sort') {
          resolve(this.mergeSort(data));
        } else if (type === 'merge') {
          resolve(this.merge(data.left, data.right));
        }
        return;
      }

      worker.onmessage = (e) => {
        const { type: responseType, data: result } = e.data;
        if (responseType === 'result') {
          worker.onmessage = null; // 释放 worker
          resolve(result);
        }
      };

      worker.onerror = (error) => {
        worker.onmessage = null;
        reject(error);
      };

      worker.postMessage({ type, data });
    });
  }
}

// 使用示例
async function parallelMergeSortExample() {
  const sorter = new ParallelMergeSort();

  const arr = Array.from({ length: 10000 }, () => Math.floor(Math.random() * 10000));
  console.log('Original array length:', arr.length);

  const start = performance.now();
  const sorted = await sorter.sort(arr);
  const end = performance.now();

  console.log('Sorted in', (end - start).toFixed(2), 'ms');
  console.log(
    'Is sorted:',
    sorted.every((val, i) => i === 0 || val >= sorted[i - 1]),
  );
}

2. 并行矩阵乘法 (Parallel Matrix Multiplication)

ts
class ParallelMatrixMultiplication {
  private workers: Worker[] = [];
  private maxWorkers: number;

  constructor(maxWorkers: number = navigator.hardwareConcurrency || 4) {
    this.maxWorkers = maxWorkers;
  }

  // 并行矩阵乘法
  async multiply(A: number[][], B: number[][]): Promise<number[][]> {
    const rowsA = A.length;
    const colsA = A[0].length;
    const rowsB = B.length;
    const colsB = B[0].length;

    if (colsA !== rowsB) {
      throw new Error("Matrix dimensions don't match for multiplication");
    }

    const result = Array.from({ length: rowsA }, () => new Array(colsB).fill(0));

    // 如果矩阵很小,使用串行乘法
    if (rowsA * colsB < 1000) {
      return this.serialMultiply(A, B);
    }

    // 分块并行计算
    const blockSize = Math.max(1, Math.floor(rowsA / this.maxWorkers));
    const promises: Promise<void>[] = [];

    for (let i = 0; i < rowsA; i += blockSize) {
      const endRow = Math.min(i + blockSize, rowsA);
      promises.push(this.multiplyBlock(A, B, result, i, endRow));
    }

    await Promise.all(promises);
    return result;
  }

  // 串行矩阵乘法
  private serialMultiply(A: number[][], B: number[][]): number[][] {
    const rowsA = A.length;
    const colsA = A[0].length;
    const colsB = B[0].length;
    const result = Array.from({ length: rowsA }, () => new Array(colsB).fill(0));

    for (let i = 0; i < rowsA; i++) {
      for (let j = 0; j < colsB; j++) {
        for (let k = 0; k < colsA; k++) {
          result[i][j] += A[i][k] * B[k][j];
        }
      }
    }

    return result;
  }

  // 并行计算矩阵块
  private async multiplyBlock(A: number[][], B: number[][], result: number[][], startRow: number, endRow: number): Promise<void> {
    const colsA = A[0].length;
    const colsB = B[0].length;

    for (let i = startRow; i < endRow; i++) {
      for (let j = 0; j < colsB; j++) {
        for (let k = 0; k < colsA; k++) {
          result[i][j] += A[i][k] * B[k][j];
        }
      }
    }
  }

  // 使用 Web Workers 的并行矩阵乘法
  async multiplyWithWorkers(A: number[][], B: number[][]): Promise<number[][]> {
    const rowsA = A.length;
    const colsA = A[0].length;
    const rowsB = B.length;
    const colsB = B[0].length;

    if (colsA !== rowsB) {
      throw new Error("Matrix dimensions don't match for multiplication");
    }

    // 初始化 workers
    this.initializeMatrixWorkers();

    try {
      const result = Array.from({ length: rowsA }, () => new Array(colsB).fill(0));

      // 分发计算任务
      const tasks = this.createTasks(A, B, rowsA, colsB);
      const results = await Promise.all(tasks.map((task) => this.assignMatrixWorker(task)));

      // 合并结果
      results.forEach(({ row, col, value }) => {
        result[row][col] = value;
      });

      return result;
    } finally {
      this.terminateMatrixWorkers();
    }
  }

  private initializeMatrixWorkers(): void {
    const workerCode = `
      self.onmessage = function(e) {
        const { A, B, row, col } = e.data;

        const colsA = A[0].length;
        let sum = 0;
        for (let k = 0; k < colsA; k++) {
          sum += A[row][k] * B[k][col];
        }

        self.postMessage({ row, col, value: sum });
      };
    `;

    for (let i = 0; i < this.maxWorkers; i++) {
      const blob = new Blob([workerCode], { type: 'application/javascript' });
      const worker = new Worker(URL.createObjectURL(blob));
      this.workers.push(worker);
    }
  }

  private terminateMatrixWorkers(): void {
    this.workers.forEach((worker) => worker.terminate());
    this.workers = [];
  }

  private createTasks(A: number[][], B: number[][], rowsA: number, colsB: number) {
    const tasks: { A: number[][]; B: number[][]; row: number; col: number }[] = [];

    for (let i = 0; i < rowsA; i++) {
      for (let j = 0; j < colsB; j++) {
        tasks.push({ A, B, row: i, col: j });
      }
    }

    return tasks;
  }

  private assignMatrixWorker(task: any): Promise<{ row: number; col: number; value: number }> {
    return new Promise((resolve, reject) => {
      const worker = this.workers.find((w) => !w.onmessage);

      if (!worker) {
        // 串行计算
        const { A, B, row, col } = task;
        const colsA = A[0].length;
        let sum = 0;
        for (let k = 0; k < colsA; k++) {
          sum += A[row][k] * B[k][col];
        }
        resolve({ row, col, value: sum });
        return;
      }

      worker.onmessage = (e) => {
        worker.onmessage = null;
        resolve(e.data);
      };

      worker.onerror = (error) => {
        worker.onmessage = null;
        reject(error);
      };

      worker.postMessage(task);
    });
  }

  // 矩阵转置 (并行优化)
  async transpose(matrix: number[][]): Promise<number[][]> {
    const rows = matrix.length;
    const cols = matrix[0].length;
    const result = Array.from({ length: cols }, () => new Array(rows));

    // 并行转置
    const promises: Promise<void>[] = [];
    const chunkSize = Math.max(1, Math.floor(cols / this.maxWorkers));

    for (let startCol = 0; startCol < cols; startCol += chunkSize) {
      const endCol = Math.min(startCol + chunkSize, cols);
      promises.push(this.transposeBlock(matrix, result, startCol, endCol));
    }

    await Promise.all(promises);
    return result;
  }

  private async transposeBlock(matrix: number[][], result: number[][], startCol: number, endCol: number): Promise<void> {
    const rows = matrix.length;
    for (let j = startCol; j < endCol; j++) {
      for (let i = 0; i < rows; i++) {
        result[j][i] = matrix[i][j];
      }
    }
  }
}

// 使用示例
async function parallelMatrixExample() {
  const multiplier = new ParallelMatrixMultiplication();

  // 创建测试矩阵
  const size = 100;
  const A = Array.from({ length: size }, () => Array.from({ length: size }, () => Math.random()));
  const B = Array.from({ length: size }, () => Array.from({ length: size }, () => Math.random()));

  console.log('Matrix size:', size + 'x' + size);

  const start = performance.now();
  const result = await multiplier.multiply(A, B);
  const end = performance.now();

  console.log('Multiplication completed in', (end - start).toFixed(2), 'ms');
  console.log('Result size:', result.length + 'x' + result[0].length);
}

3. 并行前缀和 (Parallel Prefix Sum)

ts
class ParallelPrefixSum {
  private maxWorkers: number;

  constructor(maxWorkers: number = navigator.hardwareConcurrency || 4) {
    this.maxWorkers = maxWorkers;
  }

  // Hillis-Steele 并行前缀和算法
  async prefixSum(arr: number[]): Promise<number[]> {
    const n = arr.length;
    let result = [...arr];

    // 使用对数轮次计算前缀和
    const rounds = Math.ceil(Math.log2(n));

    for (let d = 0; d < rounds; d++) {
      const stride = 1 << d; // 2^d

      // 并行更新每一轮
      const promises: Promise<void>[] = [];
      const chunkSize = Math.max(1, Math.floor(n / this.maxWorkers));

      for (let start = 0; start < n; start += chunkSize) {
        const end = Math.min(start + chunkSize, n);
        promises.push(this.updatePrefixSumChunk(result, stride, start, end));
      }

      await Promise.all(promises);
    }

    return result;
  }

  private async updatePrefixSumChunk(arr: number[], stride: number, start: number, end: number): Promise<void> {
    for (let i = start; i < end; i++) {
      if (i >= stride) {
        arr[i] += arr[i - stride];
      }
    }
  }

  // 更高效的并行前缀和 (使用分治)
  async efficientPrefixSum(arr: number[]): Promise<number[]> {
    const n = arr.length;
    if (n <= 1) return [...arr];

    // 递归分割
    const mid = Math.floor(n / 2);
    const left = arr.slice(0, mid);
    const right = arr.slice(mid);

    // 并行计算左右两半的前缀和
    const [leftPrefix, rightPrefix] = await Promise.all([this.efficientPrefixSum(left), this.efficientPrefixSum(right)]);

    // 右半部分加上左半部分的最后一个元素
    const rightOffset = leftPrefix[leftPrefix.length - 1];
    for (let i = 0; i < rightPrefix.length; i++) {
      rightPrefix[i] += rightOffset;
    }

    return [...leftPrefix, ...rightPrefix];
  }

  // 使用 Web Workers 的并行前缀和
  async prefixSumWithWorkers(arr: number[]): Promise<number[]> {
    const n = arr.length;
    if (n <= 1) return [...arr];

    // 初始化 workers
    const workers: Worker[] = [];
    const workerCode = `
      self.onmessage = function(e) {
        const { arr, stride } = e.data;
        const result = [...arr];

        for (let i = stride; i < arr.length; i++) {
          result[i] += result[i - stride];
        }

        self.postMessage(result);
      };
    `;

    // 创建 workers
    for (let i = 0; i < this.maxWorkers; i++) {
      const blob = new Blob([workerCode], { type: 'application/javascript' });
      workers.push(new Worker(URL.createObjectURL(blob)));
    }

    try {
      let current = [...arr];
      const rounds = Math.ceil(Math.log2(n));

      for (let d = 0; d < rounds; d++) {
        const stride = 1 << d;

        // 将数组分块分配给 workers
        const chunkSize = Math.max(1, Math.floor(n / this.maxWorkers));
        const promises: Promise<number[]>[] = [];

        for (let start = 0; start < n; start += chunkSize) {
          const end = Math.min(start + chunkSize, n);
          const chunk = current.slice(start, end);

          promises.push(this.processChunkWithWorker(workers, chunk, stride, start));
        }

        const results = await Promise.all(promises);

        // 合并结果
        for (let i = 0; i < results.length; i++) {
          const start = i * chunkSize;
          const chunkResult = results[i];
          for (let j = 0; j < chunkResult.length; j++) {
            current[start + j] = chunkResult[j];
          }
        }
      }

      return current;
    } finally {
      workers.forEach((worker) => worker.terminate());
    }
  }

  private processChunkWithWorker(workers: Worker[], chunk: number[], stride: number, offset: number): Promise<number[]> {
    return new Promise((resolve, reject) => {
      const worker = workers.find((w) => !w.onmessage);

      if (!worker) {
        // 串行处理
        const result = [...chunk];
        for (let i = stride; i < chunk.length; i++) {
          result[i] += result[i - stride];
        }
        resolve(result);
        return;
      }

      worker.onmessage = (e) => {
        worker.onmessage = null;
        resolve(e.data);
      };

      worker.onerror = (error) => {
        worker.onmessage = null;
        reject(error);
      };

      worker.postMessage({ arr: chunk, stride });
    });
  }

  // 验证前缀和结果
  validatePrefixSum(original: number[], prefix: number[]): boolean {
    if (prefix.length !== original.length) return false;

    let sum = 0;
    for (let i = 0; i < original.length; i++) {
      sum += original[i];
      if (prefix[i] !== sum) return false;
    }

    return true;
  }
}

// 使用示例
async function parallelPrefixSumExample() {
  const prefixSum = new ParallelPrefixSum();

  const arr = Array.from({ length: 1000 }, () => Math.floor(Math.random() * 10));
  console.log('Original array length:', arr.length);

  const start = performance.now();
  const result = await prefixSum.prefixSum(arr);
  const end = performance.now();

  console.log('Prefix sum computed in', (end - start).toFixed(2), 'ms');
  console.log('Is valid:', prefixSum.validatePrefixSum(arr, result));
  console.log('First 10 elements:', result.slice(0, 10));
}

4. 并行快速排序 (Parallel Quick Sort)

ts
class ParallelQuickSort {
  private maxWorkers: number;
  private workers: Worker[] = [];

  constructor(maxWorkers: number = navigator.hardwareConcurrency || 4) {
    this.maxWorkers = maxWorkers;
  }

  // 并行快速排序
  async sort(arr: number[]): Promise<number[]> {
    if (arr.length <= 1) return arr;

    // 小数组使用串行排序
    if (arr.length < 100) {
      return this.quickSort([...arr]);
    }

    const pivot = arr[Math.floor(arr.length / 2)];
    const left: number[] = [];
    const right: number[] = [];

    // 分区
    for (const num of arr) {
      if (num < pivot) left.push(num);
      else if (num > pivot) right.push(num);
    }

    // 并行排序左右两部分
    const [sortedLeft, sortedRight] = await Promise.all([this.sort(left), this.sort(right)]);

    return [...sortedLeft, pivot, ...sortedRight];
  }

  // 串行快速排序
  private quickSort(arr: number[]): number[] {
    if (arr.length <= 1) return arr;

    const pivot = arr[Math.floor(arr.length / 2)];
    const left: number[] = [];
    const right: number[] = [];

    for (const num of arr) {
      if (num < pivot) left.push(num);
      else if (num > pivot) right.push(num);
    }

    return [...this.quickSort(left), pivot, ...this.quickSort(right)];
  }

  // 使用 Web Workers 的并行快速排序
  async sortWithWorkers(arr: number[]): Promise<number[]> {
    // 初始化 workers
    this.initializeQuickSortWorkers();

    try {
      return await this.parallelQuickSort(arr);
    } finally {
      this.terminateQuickSortWorkers();
    }
  }

  private initializeQuickSortWorkers(): void {
    const workerCode = `
      self.onmessage = function(e) {
        const arr = e.data;
        const sorted = quickSort(arr);
        self.postMessage(sorted);
      };

      function quickSort(arr) {
        if (arr.length <= 1) return arr;

        const pivot = arr[Math.floor(arr.length / 2)];
        const left = [];
        const right = [];

        for (const num of arr) {
          if (num < pivot) left.push(num);
          else if (num > pivot) right.push(num);
        }

        return [...quickSort(left), pivot, ...quickSort(right)];
      }
    `;

    for (let i = 0; i < this.maxWorkers; i++) {
      const blob = new Blob([workerCode], { type: 'application/javascript' });
      const worker = new Worker(URL.createObjectURL(blob));
      this.workers.push(worker);
    }
  }

  private terminateQuickSortWorkers(): void {
    this.workers.forEach((worker) => worker.terminate());
    this.workers = [];
  }

  private async parallelQuickSort(arr: number[]): Promise<number[]> {
    if (arr.length <= 100) {
      return this.quickSort(arr);
    }

    const pivot = arr[Math.floor(arr.length / 2)];
    const left: number[] = [];
    const right: number[] = [];

    for (const num of arr) {
      if (num < pivot) left.push(num);
      else if (num > pivot) right.push(num);
    }

    // 使用 workers 并行排序
    const promises: Promise<number[]>[] = [];

    if (left.length > 0) {
      promises.push(this.assignWorker(left));
    } else {
      promises.push(Promise.resolve([]));
    }

    if (right.length > 0) {
      promises.push(this.assignWorker(right));
    } else {
      promises.push(Promise.resolve([]));
    }

    const [sortedLeft, sortedRight] = await Promise.all(promises);

    return [...sortedLeft, pivot, ...sortedRight];
  }

  private assignWorker(arr: number[]): Promise<number[]> {
    return new Promise((resolve, reject) => {
      const worker = this.workers.find((w) => !w.onmessage);

      if (!worker) {
        // 没有空闲 worker,使用串行排序
        resolve(this.quickSort(arr));
        return;
      }

      worker.onmessage = (e) => {
        worker.onmessage = null;
        resolve(e.data);
      };

      worker.onerror = (error) => {
        worker.onmessage = null;
        reject(error);
      };

      worker.postMessage(arr);
    });
  }

  // 检查数组是否已排序
  isSorted(arr: number[]): boolean {
    for (let i = 1; i < arr.length; i++) {
      if (arr[i] < arr[i - 1]) return false;
    }
    return true;
  }
}

// 使用示例
async function parallelQuickSortExample() {
  const sorter = new ParallelQuickSort();

  const arr = Array.from({ length: 10000 }, () => Math.floor(Math.random() * 10000));
  console.log('Original array length:', arr.length);

  const start = performance.now();
  const sorted = await sorter.sort(arr);
  const end = performance.now();

  console.log('Sorted in', (end - start).toFixed(2), 'ms');
  console.log('Is sorted:', sorter.isSorted(sorted));
}

5. 实现要点

  • 并行归并排序使用分治策略,递归分割数组。
  • 矩阵乘法通过分块计算提高并行度。
  • 前缀和使用 Hillis-Steele 算法实现并行计算。
  • 快速排序通过并行处理左右子数组加速。
  • 所有算法都支持 Web Workers 以利用多核 CPU。
算法并行算法JavaScript