算法 · 2026-04-20 · 阅读约 8 分钟
算法知识库:并行和并发算法实现
使用 JavaScript/TypeScript 实现并行和并发算法,如并行排序、并发数据结构等。注意:JavaScript 在单个线程上运行事件循环,下文基于 Promise 的实现属于任务交错执行的并发,而非多核并行;CPU 密集型计算如需真正并行,应使用 Worker 线程。
并行和并发算法实现
1. 并行归并排序
ts
/**
 * Sorts numbers in ascending order with a recursive, promise-based merge sort.
 *
 * The two halves are sorted through `Promise.all`, which only awaits both
 * recursive calls; it does not start threads, so the halves are interleaved on
 * the calling thread rather than sorted on separate cores.
 *
 * @param arr - Numbers to sort. The array is never mutated.
 * @returns A promise for a new array holding the elements of `arr` in
 *   ascending order.
 */
async function parallelMergeSort(arr: number[]): Promise<number[]> {
  // Return a copy in the base case so the result never aliases the caller's
  // array; longer inputs already receive a fresh array from merge().
  if (arr.length <= 1) return arr.slice();

  const mid = Math.floor(arr.length / 2);
  const [left, right] = await Promise.all([
    parallelMergeSort(arr.slice(0, mid)),
    parallelMergeSort(arr.slice(mid)),
  ]);
  return merge(left, right);
}
/**
 * Merges two ascending arrays into one new ascending array.
 *
 * Neither input is modified. When both heads compare equal, the element from
 * `left` is emitted first.
 *
 * @param left - First ascending run.
 * @param right - Second ascending run.
 * @returns A new array containing every element of both runs in order.
 */
function merge(left: number[], right: number[]): number[] {
  const merged: number[] = [];
  let leftPos = 0;
  let rightPos = 0;

  for (; leftPos < left.length && rightPos < right.length; ) {
    const takeLeft = left[leftPos] <= right[rightPos];
    if (takeLeft) {
      merged.push(left[leftPos]);
      leftPos += 1;
    } else {
      merged.push(right[rightPos]);
      rightPos += 1;
    }
  }

  // The loop ends when one run is exhausted, so at most one tail is non-empty.
  const leftTail = left.slice(leftPos);
  const rightTail = right.slice(rightPos);
  return merged.concat(leftTail, rightTail);
}
// 2. 并发计数器 (Concurrent counter)
ts
/**
 * Counter whose increments are applied strictly one after another, in the
 * order `increment()` was called.
 *
 * Serialization is done by chaining every operation onto the promise of the
 * previous one, so each caller observes the value produced by its own
 * increment even when many calls are started at once.
 */
class ConcurrentCounter {
  private count: number = 0;
  /** Tail of the operation chain; resolves once every queued increment ran. */
  private mutex: Promise<void> = Promise.resolve();

  /**
   * Adds one to the counter.
   *
   * @returns The counter value immediately after this call's increment.
   */
  async increment(): Promise<number> {
    // Capture the value inside the chained step: reading `this.count` after a
    // later `await` would return a value already advanced by other callers.
    const ownValue = this.mutex.then(() => {
      this.count += 1;
      return this.count;
    });
    // The chain is extended synchronously, so call order defines run order.
    this.mutex = ownValue.then(() => undefined);
    return ownValue;
  }

  /**
   * Reads the counter after all increments queued before this call finished.
   *
   * @returns The current counter value.
   */
  async get(): Promise<number> {
    await this.mutex;
    return this.count;
  }
}
// 3. 生产者-消费者模式 (Producer-consumer pattern)
ts
/**
 * Bounded FIFO buffer shared by asynchronous producers and consumers.
 *
 * A producer waits while the buffer is full and a consumer waits while it is
 * empty. Waiting is implemented by re-checking the buffer on a timer.
 */
class ProducerConsumer<T> {
  /** Delay between buffer re-checks while a caller is waiting, in ms. */
  private static readonly POLL_INTERVAL_MS = 10;

  private readonly buffer: T[] = [];
  private readonly maxSize: number;

  /**
   * @param maxSize - Maximum number of buffered items; must be at least 1.
   * @throws RangeError if `maxSize` is below 1 or NaN, because `produce`
   *   could then never find free space and would wait forever.
   */
  constructor(maxSize: number) {
    if (!(maxSize >= 1)) {
      throw new RangeError(`maxSize must be at least 1, got ${maxSize}`);
    }
    this.maxSize = maxSize;
  }

  /**
   * Appends `item` to the buffer, waiting first while the buffer is full.
   *
   * @param item - Value to enqueue.
   */
  async produce(item: T): Promise<void> {
    while (this.buffer.length >= this.maxSize) {
      await ProducerConsumer.pause();
    }
    this.buffer.push(item);
  }

  /**
   * Removes and returns the oldest item, waiting first while the buffer is
   * empty.
   *
   * @returns The dequeued item. Falsy items such as `0`, `''` or `false` are
   *   returned unchanged; `null` is returned only if `null` itself was
   *   produced.
   */
  async consume(): Promise<T | null> {
    return this.take();
  }

  /**
   * Produces `items` in order, yielding once after each item is enqueued.
   *
   * @param items - Values to enqueue.
   */
  async *produceAsync(items: T[]): AsyncGenerator<void> {
    for (const item of items) {
      await this.produce(item);
      yield;
    }
  }

  /**
   * Dequeues `count` items and yields each one in FIFO order.
   *
   * @param count - Number of items to dequeue.
   */
  async *consumeAsync(count: number): AsyncGenerator<T> {
    for (let i = 0; i < count; i += 1) {
      yield await this.take();
    }
  }

  /** Waits until the buffer is non-empty, then dequeues its oldest item. */
  private async take(): Promise<T> {
    while (this.buffer.length === 0) {
      await ProducerConsumer.pause();
    }
    // The loop guarantees an element exists, so shift() returns a real item
    // here; coercing with `||` would wrongly turn falsy items into null.
    return this.buffer.shift() as T;
  }

  /** Resolves after one polling interval. */
  private static pause(): Promise<void> {
    return new Promise<void>((resolve) => {
      setTimeout(resolve, ProducerConsumer.POLL_INTERVAL_MS);
    });
  }
}
// 4. 并行映射 (Parallel map)
ts
/**
 * Maps `items` through an asynchronous `mapper` while keeping at most
 * `concurrency` mapper calls in flight at any time.
 *
 * Results are stored by input index, so the output order matches `items`
 * regardless of completion order. If a mapper call rejects, the returned
 * promise rejects with that error; the permit held by the failed call is
 * still released.
 *
 * @param items - Input values.
 * @param mapper - Asynchronous transform applied to each item.
 * @param concurrency - Maximum number of simultaneous mapper calls (default 4).
 * @returns A promise for the mapped values, in input order.
 * @throws RangeError (as a rejection) if `concurrency` is below 1 or NaN,
 *   because no permit could ever be granted and the call would never settle.
 */
async function parallelMap<T, U>(items: T[], mapper: (item: T) => Promise<U>, concurrency: number = 4): Promise<U[]> {
  if (!(concurrency >= 1)) {
    throw new RangeError(`concurrency must be at least 1, got ${concurrency}`);
  }

  const results: U[] = [];
  const semaphore = new Semaphore(concurrency);

  const tasks = items.map(async (item, index) => {
    await semaphore.acquire();
    try {
      results[index] = await mapper(item);
    } finally {
      // Always hand the permit back so a failing mapper cannot starve others.
      semaphore.release();
    }
  });

  await Promise.all(tasks);
  return results;
}
/**
 * Counting semaphore for asynchronous code.
 *
 * `acquire()` takes a permit when one is free and otherwise waits in a
 * first-in, first-out queue; `release()` wakes the longest-waiting caller or,
 * when nobody waits, returns the permit to the pool.
 */
class Semaphore {
  /** Number of permits that can be taken without waiting. */
  private available: number;
  /** Resolvers of callers waiting for a permit, oldest first. */
  private waiters: (() => void)[] = [];

  /**
   * @param permits - Number of permits initially available.
   */
  constructor(permits: number) {
    this.available = permits;
  }

  /**
   * Takes one permit, waiting until one is released if none is free.
   */
  async acquire(): Promise<void> {
    if (!(this.available > 0)) {
      // No free permit: park the caller until release() invokes its resolver.
      return new Promise((wake) => {
        this.waiters.push(wake);
      });
    }
    this.available -= 1;
  }

  /**
   * Gives one permit back.
   */
  release(): void {
    const next = this.waiters.shift();
    if (next === undefined) {
      this.available += 1;
      return;
    }
    // Pass the permit directly to the waiter; the free count stays unchanged.
    next();
  }
}
// 5. 并发哈希表 (Concurrent hash map)
ts
/**
 * Map wrapper that applies writes to the same key strictly in call order.
 *
 * Each key has its own promise chain: a write is appended to the chain of its
 * key, and a read waits for the writes that were queued before it. The chain
 * entry is removed again once it has drained, so keys that are no longer
 * being written leave no bookkeeping behind.
 */
class ConcurrentHashMap<K, V> {
  private map: Map<K, V> = new Map();
  /** Tail of the pending-write chain for every key that has queued writes. */
  private locks: Map<K, Promise<void>> = new Map();

  /**
   * Reads the value stored under `key` after all writes queued before this
   * call have been applied.
   *
   * @param key - Key to look up.
   * @returns The stored value, or `undefined` if the key is absent.
   */
  async get(key: K): Promise<V | undefined> {
    const pending = this.locks.get(key);
    if (pending) await pending;
    return this.map.get(key);
  }

  /**
   * Stores `value` under `key` once earlier writes to that key are done.
   *
   * @param key - Key to write.
   * @param value - Value to store.
   */
  async set(key: K, value: V): Promise<void> {
    await this.enqueue(key, () => {
      this.map.set(key, value);
    });
  }

  /**
   * Removes `key` once earlier writes to that key are done.
   *
   * @param key - Key to remove.
   * @returns `true` if the key existed and was removed, `false` if it was
   *   absent.
   */
  async delete(key: K): Promise<boolean> {
    return this.enqueue(key, () => this.map.delete(key));
  }

  /**
   * Appends `operation` to the write chain of `key` and returns its result.
   */
  private enqueue<R>(key: K, operation: () => R): Promise<R> {
    const previous = this.locks.get(key) ?? Promise.resolve();
    const outcome = previous.then(operation);
    const ignore = (): void => {};
    // The tail settles even if the operation throws, and removes its own map
    // entry only when no later write has replaced it in the meantime.
    const tail: Promise<void> = outcome.then(ignore, ignore).then(() => {
      if (this.locks.get(key) === tail) {
        this.locks.delete(key);
      }
    });
    this.locks.set(key, tail);
    return outcome;
  }
}
// 6. 实现要点 (Implementation notes)
- 使用 Promise 处理异步操作。
- 信号量控制并发数量。
- 锁机制保证数据一致性。
- 生产者-消费者模式通过有界缓冲区解耦生产方与消费方。
标签:算法、并行、并发、JavaScript