SorryToPerson logo
返回
算法 · 2026-04-20 · 8 分钟

算法知识库:并行和并发算法实现

使用 JavaScript/TypeScript 实现并行和并发算法,如并行排序、并发数据结构等。需要注意:JavaScript 在单线程事件循环上运行,下文中的"并行"指的是异步任务的并发调度;要利用多核实现真正的并行,需要借助 Web Worker 或 worker_threads。

并行和并发算法实现

1. 并行归并排序

ts
/**
 * Sorts numbers in ascending order with a recursive, promise-based merge sort.
 *
 * Both halves are started before either is awaited, and the sorted halves are
 * combined with `merge`. Inputs with fewer than two elements are returned as-is
 * (the same array instance); longer inputs produce a new array and leave the
 * argument untouched.
 *
 * @param arr - Numbers to sort.
 * @returns A promise for the sorted numbers.
 */
async function parallelMergeSort(arr: number[]): Promise<number[]> {
  const size = arr.length;
  if (size < 2) {
    return arr;
  }

  const splitAt = Math.floor(size / 2);

  // Kick off both recursive sorts first, then wait for the pair together.
  const lowerHalf = parallelMergeSort(arr.slice(0, splitAt));
  const upperHalf = parallelMergeSort(arr.slice(splitAt));
  const sortedHalves = await Promise.all([lowerHalf, upperHalf]);

  return merge(sortedHalves[0], sortedHalves[1]);
}

/**
 * Merges two ascending arrays into a single new ascending array.
 *
 * On ties the element from `left` is taken first, which keeps the merge stable.
 * Neither input is modified.
 *
 * @param left - First sorted array.
 * @param right - Second sorted array.
 * @returns A new array holding every element of both inputs in ascending order.
 */
function merge(left: number[], right: number[]): number[] {
  const merged: number[] = [];
  let li = 0;
  let ri = 0;

  // Repeatedly take the smaller head until one side runs out.
  while (li < left.length && ri < right.length) {
    if (left[li] <= right[ri]) {
      merged.push(left[li]);
      li += 1;
    } else {
      merged.push(right[ri]);
      ri += 1;
    }
  }

  // At most one of these loops does any work: append the leftover tail.
  for (; li < left.length; li += 1) {
    merged.push(left[li]);
  }
  for (; ri < right.length; ri += 1) {
    merged.push(right[ri]);
  }

  return merged;
}

2. 并发计数器

ts
/**
 * Counter whose operations are applied strictly one after another, in the
 * order they were requested.
 *
 * Every operation is appended to a single promise chain (`mutex`), so each
 * `increment()` resolves with the value produced by its own increment, even
 * when many calls are issued without awaiting in between.
 */
class ConcurrentCounter {
  private count = 0;
  // Tail of the chain of pending operations; new operations run after it settles.
  private mutex: Promise<void> = Promise.resolve();

  /**
   * Adds one to the counter.
   *
   * @returns The counter value immediately after this call's increment.
   */
  async increment(): Promise<number> {
    const updated = this.mutex.then(() => {
      this.count += 1;
      // Capture the value here, inside the critical section, so later
      // increments cannot change what this call reports.
      return this.count;
    });
    this.mutex = updated.then(() => undefined);
    return updated;
  }

  /**
   * Reads the counter after every operation queued before this call has run.
   *
   * @returns The current counter value.
   */
  async get(): Promise<number> {
    await this.mutex;
    return this.count;
  }
}

3. 生产者-消费者模式

ts
/**
 * Bounded FIFO buffer connecting asynchronous producers and consumers.
 *
 * `produce` waits while the buffer is full and `consume` waits while it is
 * empty. Waiting callers are parked in a queue and woken when the buffer
 * state changes, instead of polling on a timer.
 */
class ProducerConsumer<T> {
  private buffer: T[] = [];
  private maxSize: number;
  // Wake-up callbacks of producers parked because the buffer was full.
  private spaceWaiters: (() => void)[] = [];
  // Wake-up callbacks of consumers parked because the buffer was empty.
  private itemWaiters: (() => void)[] = [];

  /**
   * @param maxSize - Maximum number of buffered items; `produce` waits once
   *   the buffer holds this many.
   */
  constructor(maxSize: number) {
    this.maxSize = maxSize;
  }

  /**
   * Appends an item, waiting first if the buffer is full.
   *
   * @param item - Value to enqueue.
   */
  async produce(item: T): Promise<void> {
    // Re-check after every wake-up: another producer may have taken the slot.
    while (this.buffer.length >= this.maxSize) {
      await new Promise<void>((resolve) => {
        this.spaceWaiters.push(resolve);
      });
    }
    this.buffer.push(item);

    const wakeConsumer = this.itemWaiters.shift();
    if (wakeConsumer !== undefined) {
      wakeConsumer();
    }
  }

  /**
   * Removes and returns the oldest item, waiting first if the buffer is empty.
   *
   * @returns The oldest buffered item. Falsy items such as `0`, `''` and
   *   `false` are returned unchanged; only an `undefined` entry maps to `null`.
   */
  async consume(): Promise<T | null> {
    // Re-check after every wake-up: another consumer may have taken the item.
    while (this.buffer.length === 0) {
      await new Promise<void>((resolve) => {
        this.itemWaiters.push(resolve);
      });
    }
    const item = this.buffer.shift();

    const wakeProducer = this.spaceWaiters.shift();
    if (wakeProducer !== undefined) {
      wakeProducer();
    }

    // Compare against undefined explicitly so valid falsy items survive.
    return item === undefined ? null : item;
  }

  /**
   * Produces the given items one by one, yielding after each is enqueued.
   *
   * @param items - Values to enqueue in order.
   */
  async *produceAsync(items: T[]): AsyncGenerator<void> {
    for (const item of items) {
      await this.produce(item);
      yield;
    }
  }

  /**
   * Consumes up to `count` items, yielding each non-null one.
   *
   * @param count - Number of `consume` calls to make.
   */
  async *consumeAsync(count: number): AsyncGenerator<T> {
    for (let i = 0; i < count; i += 1) {
      const item = await this.consume();
      if (item !== null) {
        yield item;
      }
    }
  }
}

4. 并行映射

ts
/**
 * Applies an asynchronous `mapper` to every item while keeping at most
 * `concurrency` mapper calls in flight at any time.
 *
 * Results are stored by input index, so the output order matches `items`
 * regardless of completion order. If any mapper call rejects, the returned
 * promise rejects with that error (via `Promise.all`); the permit is always
 * released, so remaining calls are not blocked.
 *
 * @param items - Values to transform.
 * @param mapper - Async transformation applied to each item.
 * @param concurrency - Maximum number of simultaneous mapper calls; must be
 *   greater than 0. Defaults to 4.
 * @returns A promise for the mapped values, in input order.
 * @throws RangeError (as a rejection) when `concurrency` is not greater than 0,
 *   because a semaphore created with no permits would never grant one and the
 *   call would wait forever.
 */
async function parallelMap<T, U>(items: T[], mapper: (item: T) => Promise<U>, concurrency: number = 4): Promise<U[]> {
  // Written as a negated comparison so NaN is rejected as well.
  if (!(concurrency > 0)) {
    throw new RangeError(`concurrency must be greater than 0, got ${concurrency}`);
  }

  const results = new Array<U>(items.length);
  const semaphore = new Semaphore(concurrency);

  await Promise.all(
    items.map(async (item, index) => {
      await semaphore.acquire();
      try {
        results[index] = await mapper(item);
      } finally {
        // Release even on failure so waiting tasks can proceed.
        semaphore.release();
      }
    }),
  );

  return results;
}

/**
 * Counting semaphore for promise-based code.
 *
 * `acquire` takes a permit when one is available and otherwise waits in FIFO
 * order; `release` returns a permit and hands it to the longest-waiting
 * caller, if there is one.
 */
class Semaphore {
  // Resolvers of callers waiting for a permit, oldest first.
  private waitQueue: (() => void)[] = [];

  /**
   * @param permits - Number of permits initially available.
   */
  constructor(private permits: number) {}

  /**
   * Obtains one permit, waiting until one is released if none is available.
   */
  async acquire(): Promise<void> {
    const available = this.permits > 0;
    if (!available) {
      // Park the caller; release() resolves this promise later.
      return new Promise((wake) => {
        this.waitQueue.push(wake);
      });
    }
    this.permits -= 1;
  }

  /**
   * Returns one permit and wakes the oldest waiter, which then owns it.
   */
  release(): void {
    this.permits += 1;

    const wakeNext = this.waitQueue.shift();
    if (wakeNext === undefined) {
      return;
    }

    // The permit goes straight to the waiter, so take it back off the count.
    this.permits -= 1;
    wakeNext();
  }
}

5. 并发哈希表

ts
/**
 * Map wrapper whose operations on the same key run one at a time, in the
 * order they were requested.
 *
 * Each key has its own promise chain in `locks`. An operation is appended to
 * the chain for its key and the chain entry is removed again once the last
 * queued operation for that key has finished, so `locks` does not grow with
 * the number of keys ever used.
 */
class ConcurrentHashMap<K, V> {
  private map: Map<K, V> = new Map();
  // Tail of the pending-operation chain for each key that currently has work queued.
  private locks: Map<K, Promise<void>> = new Map();

  /**
   * Reads the value for `key` after earlier operations on that key have run.
   *
   * @param key - Key to look up.
   * @returns The stored value, or `undefined` when the key is absent.
   */
  async get(key: K): Promise<V | undefined> {
    return this.runExclusive(key, () => this.map.get(key));
  }

  /**
   * Stores `value` under `key` after earlier operations on that key have run.
   *
   * @param key - Key to write.
   * @param value - Value to store.
   */
  async set(key: K, value: V): Promise<void> {
    return this.runExclusive(key, () => {
      this.map.set(key, value);
    });
  }

  /**
   * Removes `key` after earlier operations on that key have run.
   *
   * @param key - Key to remove.
   * @returns `true` if an entry existed and was removed, `false` if the key
   *   was not present (same contract as `Map.prototype.delete`).
   */
  async delete(key: K): Promise<boolean> {
    return this.runExclusive(key, () => this.map.delete(key));
  }

  /** Queues `task` behind any pending work for `key` and returns its result. */
  private runExclusive<R>(key: K, task: () => R): Promise<R> {
    const previous = this.locks.get(key);
    const outcome = (previous === undefined ? Promise.resolve() : previous).then(task);

    // The chain must keep going even if a task throws, so swallow the result here;
    // the caller still sees the failure through `outcome`.
    const tail: Promise<void> = outcome.then(
      () => undefined,
      () => undefined,
    );
    this.locks.set(key, tail);

    // Drop the entry once this was the last queued operation for the key.
    void tail.then(() => {
      if (this.locks.get(key) === tail) {
        this.locks.delete(key);
      }
    });

    return outcome;
  }
}

6. 实现要点

  • 使用 Promise 处理异步操作。
  • 信号量控制并发数量。
  • 锁机制保证数据一致性。
  • 生产者-消费者模式通过缓冲区解耦数据的生产方与消费方。
算法 · 并行 · 并发 · JavaScript