SorryToPerson logo
返回
算法 · 2026-05-09 · 14 分钟

算法知识库:分布式算法实现

使用 JavaScript/TypeScript 实现分布式系统中的经典算法,包括一致性哈希、分布式锁、Raft 共识算法和向量时钟。

分布式算法实现

1. 一致性哈希 (Consistent Hashing)

ts
/**
 * Consistent-hash ring that maps string keys onto registered nodes.
 *
 * Each node contributes one or more "virtual node" positions on the ring
 * (the hash of `${nodeId}#${i}`). A key is served by the node owning the first
 * ring position whose hash is greater than or equal to the key's hash,
 * wrapping around to the smallest position when the key hashes past the end.
 *
 * When two nodes hash to the same ring position, the node that was registered
 * first (and is still present) owns it.
 */
class ConsistentHashing {
  /** Virtual-node hashes contributed by each node, keyed by node id. */
  private nodes: Map<string, number[]> = new Map();
  /** Every virtual-node hash on the ring in ascending order (duplicates kept). */
  private sortedHashes: number[] = [];
  /** Reverse index: ring hash -> ids of the nodes placed there, in registration order. */
  private hashOwners: Map<number, string[]> = new Map();
  private hashFunction: (key: string) => number;

  /**
   * @param hashFunction Optional replacement for the built-in string hash.
   *   It is called for both virtual-node ids and lookup keys.
   */
  constructor(hashFunction?: (key: string) => number) {
    this.hashFunction = hashFunction ?? ((key: string) => this.defaultHash(key));
  }

  /** 31-multiplier string hash folded to a signed 32-bit value, returned as its absolute value. */
  private defaultHash(key: string): number {
    let hash = 0;
    for (let i = 0; i < key.length; i++) {
      hash = (hash << 5) - hash + key.charCodeAt(i);
      hash |= 0; // truncate to a signed 32-bit integer
    }
    return Math.abs(hash);
  }

  /**
   * Registers a node on the ring. Does nothing when the id is already present.
   *
   * @param nodeId Identifier of the node.
   * @param virtualNodes Number of ring positions to create for the node.
   */
  addNode(nodeId: string, virtualNodes: number = 1): void {
    if (this.nodes.has(nodeId)) {
      return;
    }

    const hashes: number[] = [];
    for (let i = 0; i < virtualNodes; i++) {
      const hash = this.hashFunction(`${nodeId}#${i}`);
      hashes.push(hash);
      // Pushed one at a time: spreading a very large array into push() can
      // exceed the engine's argument limit.
      this.sortedHashes.push(hash);

      const owners = this.hashOwners.get(hash);
      if (owners === undefined) {
        this.hashOwners.set(hash, [nodeId]);
      } else if (owners[owners.length - 1] !== nodeId) {
        owners.push(nodeId);
      }
    }

    this.nodes.set(nodeId, hashes);
    this.sortedHashes.sort((a, b) => a - b);
  }

  /** Removes a node and all of its ring positions. Unknown ids are ignored. */
  removeNode(nodeId: string): void {
    const hashes = this.nodes.get(nodeId);
    if (hashes === undefined) {
      return;
    }
    this.nodes.delete(nodeId);

    // How many copies of each hash value must leave the ring.
    const pending = new Map<number, number>();
    for (const hash of hashes) {
      pending.set(hash, (pending.get(hash) ?? 0) + 1);

      const owners = this.hashOwners.get(hash);
      if (owners !== undefined) {
        const remaining = owners.filter((owner) => owner !== nodeId);
        if (remaining.length > 0) {
          this.hashOwners.set(hash, remaining);
        } else {
          this.hashOwners.delete(hash);
        }
      }
    }

    // Single pass over the ring instead of one indexOf/splice per virtual node.
    this.sortedHashes = this.sortedHashes.filter((hash) => {
      const count = pending.get(hash) ?? 0;
      if (count === 0) {
        return true;
      }
      pending.set(hash, count - 1);
      return false;
    });
  }

  /**
   * Finds the node responsible for a key.
   *
   * @returns The owning node id, or `null` when the ring has no positions.
   */
  getNode(key: string): string | null {
    if (this.sortedHashes.length === 0) {
      return null;
    }

    const keyHash = this.hashFunction(key);

    // First ring position >= keyHash; past the end means wrap to the start.
    let index = this.binarySearch(keyHash);
    if (index === this.sortedHashes.length) {
      index = 0;
    }

    const owners = this.hashOwners.get(this.sortedHashes[index]);
    return owners?.[0] ?? null;
  }

  /** Returns the index of the first ring hash >= target, or the ring length when there is none. */
  private binarySearch(target: number): number {
    let low = 0;
    let high = this.sortedHashes.length;

    while (low < high) {
      const mid = (low + high) >>> 1;
      if (this.sortedHashes[mid] < target) {
        low = mid + 1;
      } else {
        high = mid;
      }
    }

    return low;
  }

  /** Ids of all registered nodes, in registration order. */
  getAllNodes(): string[] {
    return Array.from(this.nodes.keys());
  }

  /**
   * Counts how many of the given keys land on each node.
   * Every registered node appears in the result, even with a count of 0.
   */
  getDistribution(keys: string[]): Map<string, number> {
    const distribution = new Map<string, number>();

    for (const node of this.getAllNodes()) {
      distribution.set(node, 0);
    }

    for (const key of keys) {
      const node = this.getNode(key);
      if (node !== null) {
        distribution.set(node, (distribution.get(node) ?? 0) + 1);
      }
    }

    return distribution;
  }
}

// Usage example: print how a fixed set of keys is spread across the ring
// before and after a fourth node joins.
function consistentHashingExample() {
  const ring = new ConsistentHashing();

  // Register three nodes with three virtual nodes each.
  for (const name of ['node1', 'node2', 'node3']) {
    ring.addNode(name, 3);
  }

  // Keys whose placement is inspected.
  const sampleKeys = ['key1', 'key2', 'key3', 'key4', 'key5'];

  const before = ring.getDistribution(sampleKeys);
  console.log('键分布:', before);

  // Grow the cluster and look at the placement again.
  ring.addNode('node4', 3);
  const after = ring.getDistribution(sampleKeys);
  console.log('添加节点后的分布:', after);
}

2. 分布式锁 (Distributed Lock)

ts
/** Record describing the current holder of a resource lock. */
interface LockInfo {
  /** Token generated when the lock was acquired; required to release or extend it. */
  lockId: string;
  /** Identifier supplied by the caller that acquired the lock. */
  owner: string;
  /** `Date.now()` value at acquisition time. */
  acquiredAt: number;
  /** `Date.now()` value after which the lock counts as expired. */
  expiresAt: number;
}

/**
 * In-memory lock table keyed by resource name, with per-lock expiry.
 *
 * `acquire` only reports success or failure; the generated `lockId` that
 * `release` and `extend` require can be read back through `getLockInfo`.
 */
class DistributedLock {
  private locks: Map<string, LockInfo> = new Map();
  private defaultTTL: number = 30000; // 30 seconds

  /**
   * Tries to take the lock for a resource.
   *
   * @param ttl Lifetime in milliseconds; a falsy value selects the default TTL.
   * @returns `true` when the lock was free or expired and is now held by `owner`.
   */
  async acquire(resource: string, owner: string, ttl?: number): Promise<boolean> {
    const lifetime = ttl || this.defaultTTL;
    const timestamp = Date.now();
    const lockId = this.generateLockId();

    const holder = this.locks.get(resource);

    // A lock that exists and has not passed its expiry blocks the request.
    if (holder && !(timestamp > holder.expiresAt)) {
      return false;
    }

    const granted: LockInfo = {
      lockId,
      owner,
      acquiredAt: timestamp,
      expiresAt: timestamp + lifetime,
    };
    this.locks.set(resource, granted);
    return true;
  }

  /**
   * Releases a lock; both `owner` and `lockId` must match the stored record.
   *
   * @returns `true` when the lock was removed.
   */
  async release(resource: string, owner: string, lockId: string): Promise<boolean> {
    const holder = this.locks.get(resource);

    if (!holder || holder.owner !== owner || holder.lockId !== lockId) {
      return false;
    }

    this.locks.delete(resource);
    return true;
  }

  /**
   * Pushes the expiry of a held lock to now + ttl.
   *
   * @param ttl Lifetime in milliseconds; a falsy value selects the default TTL.
   * @returns `true` when `owner` and `lockId` matched and the expiry was updated.
   */
  async extend(resource: string, owner: string, lockId: string, ttl?: number): Promise<boolean> {
    const lifetime = ttl || this.defaultTTL;
    const holder = this.locks.get(resource);

    if (!holder || holder.owner !== owner || holder.lockId !== lockId) {
      return false;
    }

    holder.expiresAt = Date.now() + lifetime;
    return true;
  }

  /** Returns the stored record for a resource, or `null` when there is none. */
  getLockInfo(resource: string): LockInfo | null {
    const holder = this.locks.get(resource);
    return holder ? holder : null;
  }

  /** Drops every lock whose expiry time has passed. */
  cleanupExpiredLocks(): void {
    const timestamp = Date.now();
    this.locks.forEach((info, resource) => {
      if (timestamp > info.expiresAt) {
        this.locks.delete(resource);
      }
    });
  }

  /** Builds a lock token from a random base-36 fragment followed by the current time in base 36. */
  private generateLockId(): string {
    const randomPart = Math.random().toString(36).substring(2);
    const timePart = Date.now().toString(36);
    return randomPart + timePart;
  }
}

/**
 * Redis-style distributed lock. The backing store here is an in-memory Map
 * standing in for Redis.
 *
 * Each key stores `<token>:<expiresAt>`, where the token handed back to the
 * caller is `<owner>:<acquire timestamp>`. The expiry is always the text after
 * the LAST colon, so owners that themselves contain ':' are parsed correctly.
 */
class RedisDistributedLock {
  private redis: Map<string, string> = new Map(); // stand-in for Redis
  private lockPrefix = 'lock:';

  /**
   * Tries to take the lock for a resource.
   *
   * @param ttl Lifetime in milliseconds.
   * @returns The lock token to pass to `release`, or `null` when another
   *   unexpired lock is in place.
   */
  async acquire(resource: string, owner: string, ttl: number = 30000): Promise<string | null> {
    const lockKey = this.lockPrefix + resource;
    const now = Date.now();
    const lockValue = `${owner}:${now}`;

    // Mirrors "SET key value NX PX ttl": write only when absent or expired.
    const stored = this.redis.get(lockKey);
    if (stored !== undefined && !this.isExpired(stored, now)) {
      return null;
    }

    this.redis.set(lockKey, `${lockValue}:${now + ttl}`);
    return lockValue;
  }

  /**
   * Releases the lock only when `lockValue` is exactly the token returned by
   * the `acquire` call that currently holds it.
   *
   * @returns `true` when the lock was removed.
   */
  async release(resource: string, lockValue: string): Promise<boolean> {
    const lockKey = this.lockPrefix + resource;
    const stored = this.redis.get(lockKey);
    if (stored === undefined) {
      return false;
    }

    // Exact token comparison: a prefix match would let an empty or partial
    // token release a lock held by someone else.
    if (this.splitEntry(stored).token !== lockValue) {
      return false;
    }

    this.redis.delete(lockKey);
    return true;
  }

  /** Splits a stored `<token>:<expiresAt>` string at its last colon. */
  private splitEntry(stored: string): { token: string; expiresAt: number } {
    const cut = stored.lastIndexOf(':');
    return {
      token: stored.slice(0, cut),
      expiresAt: Number(stored.slice(cut + 1)),
    };
  }

  /** An entry is expired once `now` is past its expiry; an unreadable expiry counts as expired. */
  private isExpired(stored: string, now: number): boolean {
    const { expiresAt } = this.splitEntry(stored);
    return Number.isNaN(expiresAt) || now > expiresAt;
  }
}

3. Raft 共识算法简化实现

ts
/** Role a Raft node can be in. */
enum RaftState {
  FOLLOWER = 'follower',
  CANDIDATE = 'candidate',
  LEADER = 'leader',
}

/** One entry of the replicated log. */
interface LogEntry {
  term: number;
  command: string;
  index: number;
}

/** AppendEntries RPC arguments (also used as the heartbeat when `entries` is empty). */
interface AppendEntriesRequest {
  term: number;
  leaderId: string;
  /** 0-based index of the entry preceding `entries`, or -1 when there is none. */
  prevLogIndex: number;
  prevLogTerm: number;
  entries: LogEntry[];
  leaderCommit: number;
}

/** AppendEntries RPC result. */
interface AppendEntriesResponse {
  term: number;
  success: boolean;
}

/** RequestVote RPC arguments. */
interface RequestVoteRequest {
  term: number;
  candidateId: string;
  /** 0-based index of the candidate's last log entry, or -1 for an empty log. */
  lastLogIndex: number;
  lastLogTerm: number;
}

/** RequestVote RPC result. */
interface RequestVoteResponse {
  term: number;
  voteGranted: boolean;
}

/**
 * Simplified Raft node: election timer, vote handling and log replication on
 * the receiving side. Outgoing messages are only appended to an in-memory
 * queue; nothing in this class processes vote or AppendEntries responses, so
 * `becomeLeader` is not reached from within the class itself.
 *
 * Log positions are 0-based array indices throughout, with -1 meaning
 * "no entry".
 */
class RaftNode {
  private id: string;
  private state: RaftState = RaftState.FOLLOWER;
  private currentTerm: number = 0;
  private votedFor: string | null = null;
  private log: LogEntry[] = [];
  // -1 = nothing committed/applied yet (index 0 is a real entry).
  private commitIndex: number = -1;
  private lastApplied: number = -1;

  // Leader-only bookkeeping, rebuilt on every election win.
  private nextIndex: Map<string, number> = new Map();
  private matchIndex: Map<string, number> = new Map();

  // Deadlines expressed as Date.now() timestamps.
  private electionTimeout: number = 0;
  private heartbeatTimeout: number = 0;

  private peers: string[] = [];
  private messageQueue: Array<{
    peer: string;
    type: string;
    message: RequestVoteRequest | AppendEntriesRequest;
  }> = [];

  /** Handle of the interval started by `run`, or null while stopped. */
  private timer: ReturnType<typeof setInterval> | null = null;

  /**
   * @param id Identifier of this node.
   * @param peers Identifiers of the other nodes in the cluster.
   */
  constructor(id: string, peers: string[]) {
    this.id = id;
    this.peers = peers;
    this.resetElectionTimeout();
  }

  /** Schedules the next election deadline 150-300 ms from now (randomised). */
  private resetElectionTimeout(): void {
    this.electionTimeout = Date.now() + 150 + Math.random() * 150;
  }

  /** Schedules the next heartbeat deadline 50 ms from now. */
  private resetHeartbeatTimeout(): void {
    this.heartbeatTimeout = Date.now() + 50;
  }

  /** Starts a new election: bump the term, vote for self and ask peers for votes. */
  private becomeCandidate(): void {
    this.state = RaftState.CANDIDATE;
    this.currentTerm++;
    this.votedFor = this.id;
    this.resetElectionTimeout();

    this.requestVotes();
  }

  /** Takes leadership: reset per-peer replication state and send heartbeats right away. */
  private becomeLeader(): void {
    this.state = RaftState.LEADER;
    this.nextIndex.clear();
    this.matchIndex.clear();

    for (const peer of this.peers) {
      this.nextIndex.set(peer, this.log.length);
      this.matchIndex.set(peer, -1); // nothing known to be replicated yet
    }

    this.sendHeartbeats();
    // Arm the periodic heartbeat; without this the deadline is never set and
    // the main loop would not send another heartbeat.
    this.resetHeartbeatTimeout();
  }

  /** Steps down into the given (newer) term and forgets the previous vote. */
  private becomeFollower(term: number): void {
    this.state = RaftState.FOLLOWER;
    this.currentTerm = term;
    this.votedFor = null;
    this.resetElectionTimeout();
  }

  /** Queues a RequestVote message for every peer. */
  private requestVotes(): void {
    const lastLogIndex = this.log.length - 1;
    const lastLogTerm = lastLogIndex >= 0 ? this.log[lastLogIndex].term : 0;

    const request: RequestVoteRequest = {
      term: this.currentTerm,
      candidateId: this.id,
      lastLogIndex,
      lastLogTerm,
    };

    for (const peer of this.peers) {
      this.sendMessage(peer, 'requestVote', request);
    }
  }

  /** Queues an AppendEntries message for every peer. */
  private sendHeartbeats(): void {
    for (const peer of this.peers) {
      this.sendAppendEntries(peer);
    }
  }

  /** Queues an AppendEntries message carrying every entry from the peer's nextIndex onward. */
  private sendAppendEntries(peer: string): void {
    const nextIdx = this.nextIndex.get(peer) ?? this.log.length;
    const prevLogIndex = nextIdx - 1;
    const prevLogTerm = prevLogIndex >= 0 ? this.log[prevLogIndex].term : 0;

    const request: AppendEntriesRequest = {
      term: this.currentTerm,
      leaderId: this.id,
      prevLogIndex,
      prevLogTerm,
      entries: this.log.slice(nextIdx),
      leaderCommit: this.commitIndex,
    };

    this.sendMessage(peer, 'appendEntries', request);
  }

  /**
   * Handles an incoming RequestVote.
   *
   * The vote is granted when the request is for the current term, this node
   * has not voted for a different candidate in that term, and the candidate's
   * log is at least as up to date as the local one.
   */
  handleRequestVote(request: RequestVoteRequest): RequestVoteResponse {
    let voteGranted = false;

    if (request.term > this.currentTerm) {
      this.becomeFollower(request.term);
    }

    const canVote = this.votedFor === null || this.votedFor === request.candidateId;
    if (request.term === this.currentTerm && canVote) {
      const lastLogIndex = this.log.length - 1;
      const lastLogTerm = lastLogIndex >= 0 ? this.log[lastLogIndex].term : 0;

      const candidateUpToDate =
        request.lastLogTerm > lastLogTerm ||
        (request.lastLogTerm === lastLogTerm && request.lastLogIndex >= lastLogIndex);

      if (candidateUpToDate) {
        voteGranted = true;
        this.votedFor = request.candidateId;
        this.resetElectionTimeout();
      }
    }

    return {
      term: this.currentTerm,
      voteGranted,
    };
  }

  /**
   * Handles an incoming AppendEntries (or heartbeat).
   *
   * @returns `success: false` when the request is from an older term, carries
   *   an invalid `prevLogIndex`, or the local log has no matching entry at
   *   `prevLogIndex`.
   */
  handleAppendEntries(request: AppendEntriesRequest): AppendEntriesResponse {
    if (request.term < this.currentTerm) {
      return { term: this.currentTerm, success: false };
    }

    if (request.term > this.currentTerm) {
      this.becomeFollower(request.term);
    } else if (this.state === RaftState.CANDIDATE) {
      // A leader already exists for this term: abandon the election but keep
      // the vote already cast in this term.
      this.state = RaftState.FOLLOWER;
    }

    this.resetElectionTimeout();

    // -1 is the only valid "no previous entry" marker.
    if (request.prevLogIndex < -1) {
      return { term: this.currentTerm, success: false };
    }

    // Log consistency check at prevLogIndex.
    if (request.prevLogIndex >= 0) {
      const missing = request.prevLogIndex >= this.log.length;
      if (missing || this.log[request.prevLogIndex].term !== request.prevLogTerm) {
        return { term: this.currentTerm, success: false };
      }
    }

    // Append new entries, truncating the local log at the first conflict.
    request.entries.forEach((entry, offset) => {
      const index = request.prevLogIndex + 1 + offset;

      if (index < this.log.length && this.log[index].term !== entry.term) {
        this.log = this.log.slice(0, index);
      }

      if (index >= this.log.length) {
        this.log.push(entry);
      }
    });

    // Advance the commit index up to the last entry this request vouched for;
    // never move it backwards.
    if (request.leaderCommit > this.commitIndex) {
      const lastNewIndex = request.prevLogIndex + request.entries.length;
      this.commitIndex = Math.max(
        this.commitIndex,
        Math.min(request.leaderCommit, lastNewIndex),
      );
    }

    return { term: this.currentTerm, success: true };
  }

  /** Simulated transport: records the outgoing message in an in-memory queue. */
  private sendMessage(
    peer: string,
    type: string,
    message: RequestVoteRequest | AppendEntriesRequest,
  ): void {
    // A real implementation would send this over the network.
    this.messageQueue.push({ peer, type, message });
  }

  /** One pass of the main loop: fire the election or heartbeat deadline if it has passed. */
  private tick(): void {
    const now = Date.now();

    if (this.state === RaftState.FOLLOWER || this.state === RaftState.CANDIDATE) {
      if (now > this.electionTimeout) {
        this.becomeCandidate();
      }
    }

    if (this.state === RaftState.LEADER) {
      if (now > this.heartbeatTimeout) {
        this.sendHeartbeats();
        this.resetHeartbeatTimeout();
      }
    }
  }

  /** Starts the main loop (checked every 10 ms). Calling it while already running does nothing. */
  run(): void {
    if (this.timer !== null) {
      return;
    }
    this.timer = setInterval(() => this.tick(), 10);
  }

  /** Stops the main loop started by `run`. Safe to call when not running. */
  stop(): void {
    if (this.timer !== null) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }
}

4. 向量时钟 (Vector Clock)

ts
/**
 * Vector clock: one logical counter per node, used to order events.
 */
class VectorClock {
  private clock: Map<string, number> = new Map();

  /** @param nodes Node ids whose counters start at 0. */
  constructor(nodes: string[]) {
    for (const node of nodes) {
      this.clock.set(node, 0);
    }
  }

  /**
   * Advances the counter of `nodeId` by one.
   * A node that is not in the clock yet is treated as being at 0.
   */
  increment(nodeId: string): void {
    this.clock.set(nodeId, (this.clock.get(nodeId) ?? 0) + 1);
  }

  /** Folds a received clock into this one by taking the per-node maximum. */
  update(otherClock: Map<string, number>): void {
    for (const [node, time] of otherClock) {
      const currentTime = this.clock.get(node) ?? 0;
      this.clock.set(node, Math.max(currentTime, time));
    }
  }

  /** Same as `update`, taking another VectorClock. */
  merge(otherClock: VectorClock): void {
    this.update(otherClock.clock);
  }

  /**
   * Orders this clock relative to another; counters missing on either side
   * count as 0.
   *
   * @returns `'before'` when every counter is <= the other's and at least one
   *   is smaller, `'after'` for the mirror case, `'concurrent'` otherwise.
   *   Identical clocks also yield `'concurrent'`; use `equals` to tell them apart.
   */
  compare(otherClock: VectorClock): 'before' | 'after' | 'concurrent' {
    let isBefore = true;
    let isAfter = true;

    const allNodes = new Set([...this.clock.keys(), ...otherClock.clock.keys()]);

    for (const node of allNodes) {
      const thisTime = this.clock.get(node) ?? 0;
      const otherTime = otherClock.clock.get(node) ?? 0;

      if (thisTime < otherTime) {
        isAfter = false;
      } else if (thisTime > otherTime) {
        isBefore = false;
      }
    }

    if (isBefore && !isAfter) return 'before';
    if (!isBefore && isAfter) return 'after';
    return 'concurrent';
  }

  /** True when both clocks hold the same value for every node (missing counters count as 0). */
  equals(otherClock: VectorClock): boolean {
    const allNodes = new Set([...this.clock.keys(), ...otherClock.clock.keys()]);
    for (const node of allNodes) {
      if ((this.clock.get(node) ?? 0) !== (otherClock.clock.get(node) ?? 0)) {
        return false;
      }
    }
    return true;
  }

  /** Returns an independent copy of this clock. */
  clone(): VectorClock {
    const copy = new VectorClock([]);
    for (const [node, time] of this.clock) {
      copy.clock.set(node, time);
    }
    return copy;
  }

  /** Returns a copy of the underlying node -> counter map. */
  getClock(): Map<string, number> {
    return new Map(this.clock);
  }

  /** Renders the clock as `{node:time, ...}` with entries sorted by node id. */
  toString(): string {
    // Compare node ids explicitly: the default sort() would stringify each
    // [node, time] pair and let the joining comma influence the order.
    const entries = Array.from(this.clock.entries()).sort(([a], [b]) =>
      a < b ? -1 : a > b ? 1 : 0,
    );
    return `{${entries.map(([node, time]) => `${node}:${time}`).join(', ')}}`;
  }
}

// Usage example: two clocks advance independently, are compared, and then
// the first one absorbs the second.
function vectorClockExample() {
  const members = ['A', 'B', 'C'];
  const vc1 = new VectorClock(members);
  const vc2 = new VectorClock(members);

  // Node A records two local events, node B records one.
  for (let step = 0; step < 2; step++) {
    vc1.increment('A');
  }
  vc2.increment('B');

  console.log('VC1:', vc1.toString());
  console.log('VC2:', vc2.toString());

  const relation = vc1.compare(vc2);
  console.log('VC1 vs VC2:', relation);

  // Simulate VC1 receiving a message stamped with VC2.
  const received = vc2.getClock();
  vc1.update(received);
  console.log('VC1 after update:', vc1.toString());
}

5. 实现要点

  • 一致性哈希通过哈希环使节点动态增删时只需迁移少量键,并借助虚拟节点改善负载均衡。
  • 分布式锁解决并发访问控制,支持锁续期和过期清理。
  • Raft 算法通过选举和日志复制保证数据一致性。
  • 向量时钟用于检测分布式系统中的事件因果关系。
算法 · 分布式 · JavaScript