SorryToPerson logo
返回
算法2026-04-30·10 分钟

算法知识库:分布式算法实现

JavaScript/TypeScript 实现分布式算法,如一致性哈希、分布式锁、共识算法等。

分布式算法实现

1. 一致性哈希

ts
/**
 * Consistent-hash ring that maps string keys onto a set of named nodes.
 *
 * Every node is placed on the ring `replicas` times (virtual nodes) at the
 * positions `hash("<node>:<i>")`. A key is served by the first virtual node
 * whose position is greater than or equal to the key's hash, wrapping around
 * to the smallest position when the key hashes past the end of the ring.
 */
class ConsistentHashing {
  /** Ring position -> name of the node that owns that position. */
  private nodes: Map<number, string> = new Map();
  /** All occupied ring positions, ascending, each position stored once. */
  private sortedKeys: number[] = [];
  /** Number of virtual nodes created per physical node. */
  private replicas: number;

  /**
   * @param replicas Number of ring positions created for each added node.
   */
  constructor(replicas: number = 3) {
    this.replicas = replicas;
  }

  /**
   * Places `node` on the ring. Adding a node that is already present is a
   * no-op for the ring layout.
   *
   * @param node Name of the node to add.
   */
  addNode(node: string): void {
    for (let i = 0; i < this.replicas; i += 1) {
      const hash = this.hash(`${node}:${i}`);
      // Only record the position once: a duplicate entry in sortedKeys would
      // survive a later removeNode() and point at a deleted map entry.
      if (!this.nodes.has(hash)) {
        this.sortedKeys.push(hash);
      }
      this.nodes.set(hash, node);
    }
    this.sortedKeys.sort((a, b) => a - b);
  }

  /**
   * Removes every ring position currently owned by `node`.
   *
   * @param node Name of the node to remove.
   */
  removeNode(node: string): void {
    for (let i = 0; i < this.replicas; i += 1) {
      const hash = this.hash(`${node}:${i}`);
      // After a hash collision the position may belong to a different node;
      // leave it alone in that case.
      if (this.nodes.get(hash) !== node) continue;

      this.nodes.delete(hash);
      const index = this.lowerBound(hash);
      if (index < this.sortedKeys.length && this.sortedKeys[index] === hash) {
        this.sortedKeys.splice(index, 1);
      }
    }
  }

  /**
   * Looks up the node responsible for `key`.
   *
   * @param key Key to route.
   * @returns The owning node's name, or `null` when the ring is empty.
   */
  getNode(key: string): string | null {
    if (this.sortedKeys.length === 0) return null;

    const hash = this.hash(key);
    const index = this.findCeilingIndex(hash);

    // `??` rather than `||` so that an empty-string node name is returned as is.
    return this.nodes.get(this.sortedKeys[index]) ?? null;
  }

  /**
   * Index in `sortedKeys` of the first position >= `hash`, wrapping to 0 when
   * `hash` is larger than every position. Must not be called on an empty ring.
   */
  private findCeilingIndex(hash: number): number {
    return this.lowerBound(hash) % this.sortedKeys.length;
  }

  /**
   * Binary search for the first index whose position is >= `hash`.
   * Returns `sortedKeys.length` when no such position exists.
   */
  private lowerBound(hash: number): number {
    let low = 0;
    let high = this.sortedKeys.length - 1;

    while (low <= high) {
      const mid = Math.floor((low + high) / 2);
      if (this.sortedKeys[mid] < hash) {
        low = mid + 1;
      } else {
        high = mid - 1;
      }
    }

    return low;
  }

  /**
   * 31-multiplier string hash over UTF-16 code units, truncated to a signed
   * 32-bit integer on every step and returned as a non-negative number.
   */
  private hash(key: string): number {
    let hash = 0;
    for (let i = 0; i < key.length; i += 1) {
      const char = key.charCodeAt(i);
      // (hash << 5) - hash === hash * 31; `| 0` wraps to 32 bits.
      hash = ((hash << 5) - hash + char) | 0;
    }
    return Math.abs(hash);
  }

  /**
   * @returns The distinct node names that currently own at least one ring position.
   */
  getAllNodes(): string[] {
    return Array.from(new Set(this.nodes.values()));
  }
}

2. 分布式锁 (内存模拟的简化版,语义参考 Redis 锁)

ts
/**
 * In-memory lock table keyed by lock name. Each held lock stores the owner's
 * token and an absolute expiry timestamp in milliseconds.
 */
class DistributedLock {
  private locks: Map<string, { token: string; expiresAt: number }> = new Map();
  private defaultTTL = 30000; // 30 seconds

  /**
   * Tries to take the lock named `key`.
   *
   * @param key Lock name.
   * @param ttl Lifetime of the lock in milliseconds.
   * @returns A fresh owner token on success, or `null` if the lock is held
   *          and has not yet expired.
   */
  async acquire(key: string, ttl: number = this.defaultTTL): Promise<string | null> {
    const token = this.generateToken();
    const expiresAt = Date.now() + ttl;

    const current = this.locks.get(key);
    const stillHeld = current !== undefined && !(current.expiresAt < Date.now());
    if (stillHeld) {
      return null;
    }

    // Simplified: the lock is free or expired, so just overwrite the entry.
    this.locks.set(key, { token, expiresAt });
    return token;
  }

  /**
   * Drops the lock if `token` matches the stored owner token.
   *
   * @returns `true` when the lock was removed, `false` otherwise.
   */
  async release(key: string, token: string): Promise<boolean> {
    const held = this.locks.get(key);
    if (!held || held.token !== token) {
      return false;
    }
    this.locks.delete(key);
    return true;
  }

  /**
   * Pushes the expiry of a lock to `now + ttl` if `token` matches the stored
   * owner token.
   *
   * @returns `true` when the expiry was updated, `false` otherwise.
   */
  async extend(key: string, token: string, ttl: number = this.defaultTTL): Promise<boolean> {
    const held = this.locks.get(key);
    if (!held || held.token !== token) {
      return false;
    }
    held.expiresAt = Date.now() + ttl;
    return true;
  }

  /** Builds a token from a random base-36 fragment followed by the current time in base 36. */
  private generateToken(): string {
    const randomPart = Math.random().toString(36).substring(2);
    const timePart = Date.now().toString(36);
    return randomPart + timePart;
  }

  /** Removes every lock whose expiry timestamp lies before the current time. */
  cleanup(): void {
    const now = Date.now();
    this.locks.forEach((entry, key) => {
      if (entry.expiresAt < now) {
        this.locks.delete(key);
      }
    });
  }
}

3. Raft 共识算法 (简化版)

ts
/** Role a Raft node can hold; each member's value is the lowercase role name. */
enum RaftState {
  FOLLOWER = 'follower',
  CANDIDATE = 'candidate',
  LEADER = 'leader',
}

/** One record in a Raft node's replicated log. */
interface LogEntry {
  /** Term number associated with the entry. */
  term: number;
  /** Command payload carried by the entry. */
  command: string;
  /** Position of the entry within the log. */
  index: number;
}

/**
 * Simplified, single-process simulation of a Raft node.
 *
 * No real network is involved: vote requests and AppendEntries calls are
 * simulated by methods that succeed at random. The class still follows
 * Raft's bookkeeping. `nextIndex`, `matchIndex` and `commitIndex` are 1-based
 * log indices where 0 means "nothing yet"; the entry with Raft index `n` is
 * stored at `log[n - 1]`.
 *
 * The constructor starts a polling timer; call `stop()` to release all timers.
 */
class RaftNode {
  private state: RaftState = RaftState.FOLLOWER;
  private currentTerm = 0;
  private votedFor: string | null = null;
  private log: LogEntry[] = [];
  /** Highest 1-based log index known to be replicated on a majority. */
  private commitIndex = 0;
  private lastApplied = 0;
  /** Per peer: 1-based index of the next entry to send. */
  private nextIndex: Map<string, number> = new Map();
  /** Per peer: highest 1-based index known to be replicated there. */
  private matchIndex: Map<string, number> = new Map();

  private peers: string[] = [];
  private nodeId: string;
  /** Absolute timestamp (ms) after which a non-leader starts an election. */
  private electionTimeout = 0;
  /** Period between heartbeat rounds, in milliseconds. */
  private heartbeatInterval = 50;
  private electionTimer: ReturnType<typeof setInterval> | null = null;
  private heartbeatTimer: ReturnType<typeof setInterval> | null = null;

  /**
   * @param nodeId Identifier of this node.
   * @param peers  Identifiers of the other nodes in the cluster (without `nodeId`).
   */
  constructor(nodeId: string, peers: string[]) {
    this.nodeId = nodeId;
    this.peers = peers;
    this.resetElectionTimeout();
    this.startElectionTimer();
  }

  /** Picks a new randomized election deadline 150-300 ms from now. */
  private resetElectionTimeout(): void {
    this.electionTimeout = Date.now() + 150 + Math.random() * 150;
  }

  /** Polls every 10 ms and starts an election once the deadline has passed. */
  private startElectionTimer(): void {
    if (this.electionTimer !== null) return;

    this.electionTimer = setInterval(() => {
      // A leader never times out on itself; only followers/candidates campaign.
      if (this.state !== RaftState.LEADER && Date.now() > this.electionTimeout) {
        this.startElection();
      }
    }, 10);
  }

  /** Number of nodes that form a strict majority of the whole cluster (peers + self). */
  private majority(): number {
    return Math.floor((this.peers.length + 1) / 2) + 1;
  }

  /** Becomes a candidate for a new term and asks every peer for its vote. */
  private startElection(): void {
    this.state = RaftState.CANDIDATE;
    this.currentTerm += 1;
    this.votedFor = this.nodeId;
    this.resetElectionTimeout();

    let votes = 1; // vote for self
    for (const peer of this.peers) {
      if (this.requestVote(peer)) {
        votes += 1;
      }
    }

    if (votes >= this.majority()) {
      this.becomeLeader();
    }
  }

  /** Simulated RequestVote RPC: the vote is granted with roughly 70% probability. */
  private requestVote(peer: string): boolean {
    return Math.random() > 0.3;
  }

  /** Switches to leader, resets per-peer replication state and starts heartbeats. */
  private becomeLeader(): void {
    this.state = RaftState.LEADER;
    for (const peer of this.peers) {
      this.nextIndex.set(peer, this.log.length + 1);
      this.matchIndex.set(peer, 0);
    }

    this.startHeartbeat();
  }

  /** (Re)starts the periodic heartbeat round; any previous heartbeat timer is replaced. */
  private startHeartbeat(): void {
    if (this.heartbeatTimer !== null) {
      clearInterval(this.heartbeatTimer);
    }

    this.heartbeatTimer = setInterval(() => {
      if (this.state === RaftState.LEADER) {
        this.sendHeartbeats();
      }
    }, this.heartbeatInterval);
  }

  /** Sends one AppendEntries call (heartbeat plus pending entries) to every peer. */
  private sendHeartbeats(): void {
    for (const peer of this.peers) {
      this.appendEntries(peer);
    }
  }

  /**
   * Replicates all entries from the peer's `nextIndex` onwards. On success the
   * peer's `nextIndex`/`matchIndex` move forward and the commit index is
   * re-evaluated; on failure `nextIndex` backs off by one (never below 1).
   */
  private appendEntries(peer: string): void {
    const next = this.nextIndex.get(peer) ?? this.log.length + 1;
    const prevLogIndex = next - 1;
    // prevLogIndex is 1-based, so the preceding entry lives at log[prevLogIndex - 1].
    const prevLogTerm = prevLogIndex >= 1 ? this.log[prevLogIndex - 1].term : 0;
    // Entries with Raft index >= next, i.e. array positions >= prevLogIndex.
    const entries = this.log.slice(prevLogIndex);

    if (this.sendAppendEntriesRPC(peer, this.currentTerm, prevLogIndex, prevLogTerm, entries, this.commitIndex)) {
      this.nextIndex.set(peer, prevLogIndex + entries.length + 1);
      this.matchIndex.set(peer, prevLogIndex + entries.length);

      this.updateCommitIndex();
    } else {
      this.nextIndex.set(peer, Math.max(1, next - 1));
    }
  }

  /** Simulated AppendEntries RPC: succeeds with roughly 80% probability. */
  private sendAppendEntriesRPC(peer: string, term: number, prevLogIndex: number, prevLogTerm: number, entries: LogEntry[], leaderCommit: number): boolean {
    return Math.random() > 0.2;
  }

  /**
   * Advances `commitIndex` as far as a majority of the cluster has replicated,
   * never beyond the end of this node's own log.
   */
  private updateCommitIndex(): void {
    const majority = this.majority();

    while (this.commitIndex < this.log.length) {
      const candidate = this.commitIndex + 1;
      let count = 1; // this node holds the entry itself

      for (const peer of this.peers) {
        if ((this.matchIndex.get(peer) ?? 0) >= candidate) {
          count += 1;
        }
      }

      if (count < majority) break;
      this.commitIndex = candidate;
    }
  }

  /**
   * Appends a client command to the local log.
   *
   * @param command Command payload to store.
   * @returns `false` when this node is not the leader, `true` once the entry
   *          has been appended locally (replication happens on later heartbeats).
   */
  appendCommand(command: string): boolean {
    if (this.state !== RaftState.LEADER) return false;

    const entry: LogEntry = {
      term: this.currentTerm,
      command,
      // Array position of the entry (0-based); its Raft index is this value + 1.
      index: this.log.length,
    };

    this.log.push(entry);
    return true;
  }

  /** @returns The node's current role. */
  getState(): RaftState {
    return this.state;
  }

  /** @returns The node's current term. */
  getCurrentTerm(): number {
    return this.currentTerm;
  }

  /** Cancels the election and heartbeat timers so the node stops all background work. */
  stop(): void {
    if (this.electionTimer !== null) {
      clearInterval(this.electionTimer);
      this.electionTimer = null;
    }
    if (this.heartbeatTimer !== null) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }
}

4. 向量时钟

ts
/**
 * Vector clock: one logical counter per node name, used to order events
 * across nodes. A node that is missing from a clock counts as 0.
 */
class VectorClock {
  private clock: Map<string, number> = new Map();

  /**
   * @param nodes Node names whose counters are initialised to 0.
   */
  constructor(nodes: string[]) {
    for (const node of nodes) {
      this.clock.set(node, 0);
    }
  }

  /**
   * Advances the counter of `node` by one (creating it at 1 if unknown).
   */
  increment(node: string): void {
    this.clock.set(node, (this.clock.get(node) ?? 0) + 1);
  }

  /**
   * Merges `other` into this clock by taking the component-wise maximum.
   */
  update(other: VectorClock): void {
    for (const [node, time] of other.clock) {
      this.clock.set(node, Math.max(this.clock.get(node) ?? 0, time));
    }
  }

  /**
   * Compares this clock with `other`.
   *
   * @returns `'before'` if every component of this clock is <= the matching
   *          component of `other` and at least one is smaller; `'after'` for
   *          the mirrored case; `'concurrent'` otherwise, which includes two
   *          identical clocks.
   */
  compare(other: VectorClock): 'before' | 'after' | 'concurrent' {
    let isBefore = false;
    let isAfter = false;

    // Look at the union of node names: a component known only to `other`
    // still has to take part in the comparison.
    const allNodes = new Set<string>([...this.clock.keys(), ...other.clock.keys()]);

    for (const node of allNodes) {
      const thisTime = this.clock.get(node) ?? 0;
      const otherTime = other.clock.get(node) ?? 0;

      if (thisTime < otherTime) {
        isBefore = true;
      } else if (thisTime > otherTime) {
        isAfter = true;
      }
    }

    if (isBefore && !isAfter) return 'before';
    if (isAfter && !isBefore) return 'after';
    return 'concurrent';
  }

  /**
   * @returns The clock as `{node:time, ...}` with entries ordered by node name.
   */
  toString(): string {
    const entries = Array.from(this.clock.entries()).sort(([a], [b]) => {
      if (a < b) return -1;
      if (a > b) return 1;
      return 0;
    });
    return `{${entries.map(([node, time]) => `${node}:${time}`).join(', ')}}`;
  }
}

5. 布隆过滤器

ts
/**
 * Bloom filter over string keys, backed by an array of booleans and a family
 * of seeded string hashes. `contains` may report false positives but never
 * reports a key that was added as absent.
 */
class BloomFilter {
  private bitArray: boolean[];
  private size: number;
  private hashFunctions: ((key: string) => number)[];

  /**
   * @param size      Number of slots in the filter.
   * @param numHashes Number of hash functions applied to every key.
   */
  constructor(size: number = 1000, numHashes: number = 3) {
    this.size = size;
    this.bitArray = new Array(size).fill(false);
    this.hashFunctions = this.createHashFunctions(numHashes);
  }

  /** Builds one hash function per seed 0, 1, 2, ... while the seed is below `numHashes`. */
  private createHashFunctions(numHashes: number): ((key: string) => number)[] {
    const family: ((key: string) => number)[] = [];

    let seed = 0;
    while (seed < numHashes) {
      const fixedSeed = seed;
      family.push((key: string) => this.seededHash(key, fixedSeed));
      seed += 1;
    }

    return family;
  }

  /**
   * 31-multiplier hash over the key's UTF-16 code units with `seed` added at
   * every step, wrapped to 32 bits and reduced to a slot index.
   */
  private seededHash(key: string, seed: number): number {
    let acc = 0;
    for (let pos = 0, len = key.length; pos < len; pos += 1) {
      const unit = key.charCodeAt(pos);
      acc = ((acc << 5) - acc + unit + seed) | 0; // wrap to a 32-bit integer
    }
    return Math.abs(acc) % this.size;
  }

  /** Marks every slot selected by the hash functions for `key`. */
  add(key: string): void {
    this.hashFunctions.forEach((hashFunc) => {
      this.bitArray[hashFunc(key)] = true;
    });
  }

  /**
   * @returns `false` if `key` was definitely never added, `true` if every
   *          slot selected for `key` is marked.
   */
  contains(key: string): boolean {
    return this.hashFunctions.every((hashFunc) => this.bitArray[hashFunc(key)]);
  }

  /**
   * Estimated false-positive rate after inserting `numItems` keys, computed as
   * (1 - e^(-k*n/m))^k with m slots and k hash functions.
   */
  estimateFalsePositiveRate(numItems: number): number {
    const slots = this.size;
    const hashCount = this.hashFunctions.length;
    const exponent = (-hashCount * numItems) / slots;

    return Math.pow(1 - Math.exp(exponent), hashCount);
  }
}

6. 实现要点

  • 一致性哈希实现负载均衡。
  • 分布式锁防止竞态条件。
  • Raft 提供共识保证。
  • 向量时钟处理因果关系。
  • 布隆过滤器提供空间高效的成员测试。
标签:算法、分布式、JavaScript