算法 · 2026-05-09 · 14 分钟
算法知识库:分布式算法实现
使用 JavaScript/TypeScript 实现分布式系统中的经典算法,如一致性哈希、分布式锁、共识算法、向量时钟等。
分布式算法实现
1. 一致性哈希 (Consistent Hashing)
ts
/**
 * Consistent-hash ring with optional virtual nodes.
 *
 * Every physical node contributes `virtualNodes` positions on the ring, each
 * computed as `hashFunction(`${nodeId}#${i}`)`. A key is served by the node that
 * owns the first ring position greater than or equal to the key's hash,
 * wrapping around to the smallest position when the key hashes past the end.
 *
 * When two nodes hash to the same ring position, the node that was added first
 * (and is still registered) owns that position.
 */
class ConsistentHashing {
  /** Ring positions contributed by each registered node, keyed by node id. */
  private nodes: Map<string, number[]> = new Map();
  /** Distinct occupied ring positions, ascending. */
  private sortedHashes: number[] = [];
  /**
   * Node ids claiming each ring position, in registration order. This index
   * lets getNode() resolve an owner directly instead of scanning every node's
   * position list.
   */
  private owners: Map<number, string[]> = new Map();
  private hashFunction: (key: string) => number;

  /**
   * @param hashFunction Optional replacement for the built-in string hash. It
   *   must be deterministic; it is applied both to keys and to virtual-node ids.
   */
  constructor(hashFunction?: (key: string) => number) {
    this.hashFunction = hashFunction || this.defaultHash;
  }

  /**
   * Built-in hash: the classic `hash * 31 + charCode` recurrence truncated to a
   * signed 32-bit integer on every step, returned as a non-negative number.
   */
  private defaultHash(key: string): number {
    let hash = 0;
    for (let i = 0; i < key.length; i++) {
      // (hash << 5) - hash === hash * 31; `| 0` truncates to a signed 32-bit integer.
      hash = ((hash << 5) - hash + key.charCodeAt(i)) | 0;
    }
    return Math.abs(hash);
  }

  /**
   * Registers a node on the ring. Adding an id that is already registered is a
   * no-op.
   *
   * @param nodeId Identifier of the physical node.
   * @param virtualNodes Number of ring positions to create for it (default 1).
   *   A value of 0 or less registers the node without any ring position, so it
   *   is listed by getAllNodes() but never returned by getNode().
   */
  addNode(nodeId: string, virtualNodes: number = 1): void {
    if (this.nodes.has(nodeId)) {
      return;
    }

    const hashes: number[] = [];
    for (let i = 0; i < virtualNodes; i++) {
      hashes.push(this.hashFunction(`${nodeId}#${i}`));
    }
    this.nodes.set(nodeId, hashes);

    // A node may collide with itself; record each distinct position once.
    for (const hash of new Set(hashes)) {
      const claimants = this.owners.get(hash);
      if (claimants === undefined) {
        this.owners.set(hash, [nodeId]);
        this.sortedHashes.push(hash);
      } else {
        claimants.push(nodeId);
      }
    }
    this.sortedHashes.sort((a, b) => a - b);
  }

  /**
   * Removes a node and every ring position it alone occupied. Positions shared
   * with another node pass to the next node in registration order. Removing an
   * unknown id is a no-op.
   */
  removeNode(nodeId: string): void {
    const hashes = this.nodes.get(nodeId);
    if (hashes === undefined) {
      return;
    }
    this.nodes.delete(nodeId);

    const vacated = new Set<number>();
    for (const hash of new Set(hashes)) {
      const remaining = (this.owners.get(hash) || []).filter((id) => id !== nodeId);
      if (remaining.length > 0) {
        this.owners.set(hash, remaining);
      } else {
        this.owners.delete(hash);
        vacated.add(hash);
      }
    }
    if (vacated.size > 0) {
      // Filtering keeps the array sorted, so no re-sort is needed.
      this.sortedHashes = this.sortedHashes.filter((hash) => !vacated.has(hash));
    }
  }

  /**
   * Returns the id of the node responsible for `key`, or `null` when the ring
   * has no positions.
   */
  getNode(key: string): string | null {
    if (this.sortedHashes.length === 0) {
      return null;
    }

    const position = this.binarySearch(this.hashFunction(key));
    // Past the largest position: wrap around to the start of the ring.
    const slot = position === this.sortedHashes.length ? 0 : position;
    const ringHash = this.sortedHashes[slot];
    const claimants = ringHash === undefined ? undefined : this.owners.get(ringHash);
    if (claimants === undefined || claimants.length === 0) {
      return null;
    }
    return claimants[0];
  }

  /**
   * Lower bound: index of the first ring position that is >= `target`, or
   * `sortedHashes.length` when every position is smaller.
   */
  private binarySearch(target: number): number {
    let low = 0;
    let high = this.sortedHashes.length;
    while (low < high) {
      const mid = Math.floor((low + high) / 2);
      if (this.sortedHashes[mid] < target) {
        low = mid + 1;
      } else {
        high = mid;
      }
    }
    return low;
  }

  /** Ids of all registered nodes, in registration order. */
  getAllNodes(): string[] {
    return Array.from(this.nodes.keys());
  }

  /**
   * Counts how many of `keys` map to each registered node. Every registered
   * node appears in the result, with 0 when it receives no key.
   */
  getDistribution(keys: string[]): Map<string, number> {
    const distribution = new Map<string, number>();
    for (const node of this.getAllNodes()) {
      distribution.set(node, 0);
    }
    for (const key of keys) {
      const node = this.getNode(key);
      if (node !== null) {
        distribution.set(node, (distribution.get(node) || 0) + 1);
      }
    }
    return distribution;
  }
}
// 使用示例
/**
 * Demo: builds a three-node ring (three virtual nodes each), logs how five
 * sample keys are distributed, then adds a fourth node and logs the
 * distribution again.
 */
function consistentHashingExample() {
  const ring = new ConsistentHashing();
  const sampleKeys = ['key1', 'key2', 'key3', 'key4', 'key5'];

  // Initial cluster.
  for (const nodeId of ['node1', 'node2', 'node3']) {
    ring.addNode(nodeId, 3);
  }
  console.log('键分布:', ring.getDistribution(sampleKeys));

  // Grow the cluster by one node and look at the same keys again.
  ring.addNode('node4', 3);
  console.log('添加节点后的分布:', ring.getDistribution(sampleKeys));
}
2. 分布式锁 (Distributed Lock)
ts
/** Record kept by DistributedLock for the current holder of one resource. */
interface LockInfo {
// Token generated when the lock is acquired; release() and extend() require it.
lockId: string;
// Caller-supplied identifier of the lock holder.
owner: string;
// Date.now() value (milliseconds) at the moment the lock was acquired.
acquiredAt: number;
// Date.now()-based timestamp (milliseconds) after which the lock counts as expired.
expiresAt: number;
}
/**
 * In-memory lock table keyed by resource name.
 *
 * Each resource has at most one holder. A lock expires once `Date.now()` is
 * greater than its `expiresAt`; an expired lock may be taken over by any caller
 * and can no longer be extended by its previous holder.
 *
 * `acquire()` reports only success or failure. The token needed by `release()`
 * and `extend()` is the `lockId` of the record returned by `getLockInfo()`.
 */
class DistributedLock {
  private locks: Map<string, LockInfo> = new Map();
  private defaultTTL: number = 30000; // 30 seconds

  /**
   * Tries to take the lock on `resource`.
   *
   * @param resource Name of the resource to lock.
   * @param owner Identifier of the caller.
   * @param ttl Lifetime in milliseconds; when omitted or 0 the 30-second default is used.
   * @returns `true` when the lock was free or expired and is now held by
   *   `owner`; `false` when another unexpired lock exists.
   */
  async acquire(resource: string, owner: string, ttl?: number): Promise<boolean> {
    const now = Date.now();
    const current = this.locks.get(resource);
    if (current !== undefined && !this.isExpired(current, now)) {
      return false;
    }

    // Free or expired: install a fresh record with a new token.
    this.locks.set(resource, {
      lockId: this.generateLockId(),
      owner,
      acquiredAt: now,
      expiresAt: now + (ttl || this.defaultTTL),
    });
    return true;
  }

  /**
   * Releases the lock if `owner` and `lockId` both match the stored record.
   *
   * @returns `true` when the record was removed, `false` otherwise.
   */
  async release(resource: string, owner: string, lockId: string): Promise<boolean> {
    const current = this.locks.get(resource);
    if (current === undefined || current.owner !== owner || current.lockId !== lockId) {
      return false;
    }
    this.locks.delete(resource);
    return true;
  }

  /**
   * Pushes the expiry of a held lock to `Date.now() + ttl`.
   *
   * An already expired lock is not revived: acquire() treats such a lock as
   * free, so the previous holder must re-acquire instead.
   *
   * @param ttl Lifetime in milliseconds; when omitted or 0 the 30-second default is used.
   * @returns `true` when the expiry was updated, `false` when the lock is
   *   missing, expired, or held under a different owner/token.
   */
  async extend(resource: string, owner: string, lockId: string, ttl?: number): Promise<boolean> {
    const now = Date.now();
    const current = this.locks.get(resource);
    if (
      current === undefined ||
      current.owner !== owner ||
      current.lockId !== lockId ||
      this.isExpired(current, now)
    ) {
      return false;
    }
    current.expiresAt = now + (ttl || this.defaultTTL);
    return true;
  }

  /**
   * Returns the stored record for `resource`, or `null` when there is none.
   * The record is returned even if it has expired but has not been cleaned up.
   */
  getLockInfo(resource: string): LockInfo | null {
    return this.locks.get(resource) || null;
  }

  /** Removes every record whose expiry has passed. */
  cleanupExpiredLocks(): void {
    const now = Date.now();
    for (const [resource, lockInfo] of this.locks) {
      if (this.isExpired(lockInfo, now)) {
        this.locks.delete(resource);
      }
    }
  }

  /** Single definition of "expired", shared by acquire, extend and cleanup. */
  private isExpired(lock: LockInfo, now: number): boolean {
    return now > lock.expiresAt;
  }

  /** Builds a token from a random base-36 fragment plus the current time in base 36. */
  private generateLockId(): string {
    return Math.random().toString(36).substring(2) + Date.now().toString(36);
  }
}
// 基于 Redis 的分布式锁实现
/**
 * Lock with Redis-style string values, backed by an in-memory Map that stands
 * in for a Redis connection.
 *
 * Stored value layout: `<owner>:<acquiredAtMs>:<expiresAtMs>`. The token handed
 * back to the caller is the `<owner>:<acquiredAtMs>` part. Because `owner` may
 * itself contain ':' (for example "host:1234"), the stored value is always
 * split at its LAST ':' rather than by position.
 */
class RedisDistributedLock {
  private redis: Map<string, string> = new Map(); // stand-in for Redis
  private lockPrefix = 'lock:';

  /**
   * Tries to take the lock on `resource`.
   *
   * @param resource Name of the resource to lock.
   * @param owner Identifier of the caller; may contain ':'.
   * @param ttl Lifetime in milliseconds (default 30000).
   * @returns The token to pass to release(), or `null` when an unexpired lock exists.
   */
  async acquire(resource: string, owner: string, ttl: number = 30000): Promise<string | null> {
    const lockKey = this.lockPrefix + resource;
    const now = Date.now();

    // With a real Redis server this check-and-set would be one atomic `SET key value NX PX ttl`.
    const existingValue = this.redis.get(lockKey);
    if (existingValue !== undefined) {
      const expired = now > this.expiryOf(existingValue);
      if (!expired) {
        return null;
      }
    }

    const lockValue = owner + ':' + now;
    this.redis.set(lockKey, lockValue + ':' + (now + ttl));
    return lockValue;
  }

  /**
   * Releases the lock when `lockValue` is exactly the token stored for
   * `resource`. An empty string or a mere prefix of the token does not match.
   *
   * @returns `true` when the lock was removed, `false` otherwise.
   */
  async release(resource: string, lockValue: string): Promise<boolean> {
    const lockKey = this.lockPrefix + resource;
    const existingValue = this.redis.get(lockKey);
    if (existingValue === undefined || this.tokenOf(existingValue) !== lockValue) {
      return false;
    }
    this.redis.delete(lockKey);
    return true;
  }

  /** Token part of a stored value: everything before the last ':'. */
  private tokenOf(storedValue: string): string {
    return storedValue.slice(0, storedValue.lastIndexOf(':'));
  }

  /** Expiry part of a stored value: the integer after the last ':'. */
  private expiryOf(storedValue: string): number {
    return parseInt(storedValue.slice(storedValue.lastIndexOf(':') + 1), 10);
  }
}
3. Raft 共识算法简化实现
ts
/** Role a RaftNode can be in; RaftNode initialises its state to FOLLOWER. */
enum RaftState {
FOLLOWER = 'follower',
CANDIDATE = 'candidate',
LEADER = 'leader',
}
/** One entry of a RaftNode's replicated log. */
interface LogEntry {
// Term recorded with the entry; RaftNode compares it during log-consistency checks.
term: number;
// Payload of the entry; RaftNode stores it without interpreting it.
command: string;
// NOTE(review): not read by RaftNode, which addresses entries by array position —
// presumably the entry's position in the log; confirm with producers of entries.
index: number;
}
/** AppendEntries message, as built by RaftNode.sendAppendEntries (also used for heartbeats). */
interface AppendEntriesRequest {
// Sender's current term.
term: number;
// Id of the sending node.
leaderId: string;
// Array position of the entry immediately before `entries` (-1 when there is none).
prevLogIndex: number;
// Term of the entry at prevLogIndex, or 0 when prevLogIndex is negative.
prevLogTerm: number;
// Entries to append after prevLogIndex; may be empty.
entries: LogEntry[];
// Sender's commitIndex.
leaderCommit: number;
}
/** Reply produced by RaftNode.handleAppendEntries. */
interface AppendEntriesResponse {
// Responder's current term after processing the request.
term: number;
// false when the request's term is stale or the log-consistency check fails; true otherwise.
success: boolean;
}
/** RequestVote message, as built by RaftNode.requestVotes. */
interface RequestVoteRequest {
// Candidate's current term.
term: number;
// Id of the node asking for the vote.
candidateId: string;
// Array position of the candidate's last log entry (-1 when its log is empty).
lastLogIndex: number;
// Term of the candidate's last log entry, or 0 when its log is empty.
lastLogTerm: number;
}
/** Reply produced by RaftNode.handleRequestVote. */
interface RequestVoteResponse {
// Responder's current term after processing the request.
term: number;
// true when the responder recorded its vote for the requesting candidate.
voteGranted: boolean;
}
/**
 * Simplified Raft node.
 *
 * Holds the term / vote / log state, builds RequestVote and AppendEntries
 * requests, and answers incoming ones through handleRequestVote() and
 * handleAppendEntries(). Outgoing messages are only appended to an in-memory
 * queue; nothing in this class delivers them or processes the replies.
 *
 * NOTE(review): becomeLeader() is never called from within this class, so no
 * visible code path promotes a candidate to leader — vote counting appears to
 * be left out of this simplified version.
 */
class RaftNode {
private id: string;
private state: RaftState = RaftState.FOLLOWER;
private currentTerm: number = 0;
private votedFor: string | null = null;
private log: LogEntry[] = [];
// NOTE(review): log positions are 0-based array indices here (prevLogIndex is
// -1 when there is no previous entry), yet commitIndex starts at 0 and can be
// set to -1 in handleAppendEntries — confirm the intended base.
private commitIndex: number = 0;
// NOTE(review): lastApplied is never read or updated in this class.
private lastApplied: number = 0;
// Leader-only bookkeeping, rebuilt in becomeLeader().
private nextIndex: Map<string, number> = new Map();
private matchIndex: Map<string, number> = new Map();
// Deadlines, stored as Date.now()-based timestamps in milliseconds.
// NOTE(review): heartbeatTimeout is assigned only inside resetHeartbeatTimeout(),
// which run() calls only after `now > this.heartbeatTimeout` is already true.
// While it is still undefined that comparison is false, so periodic heartbeats
// would never start unless something else sets it.
private electionTimeout: number;
private heartbeatTimeout: number;
private peers: string[] = [];
private messageQueue: any[] = [];
/**
 * @param id Identifier of this node.
 * @param peers Identifiers of the nodes that requests are addressed to.
 */
constructor(id: string, peers: string[]) {
this.id = id;
this.peers = peers;
this.resetElectionTimeout();
}
// Sets the election deadline to 150–300 ms from now (randomised).
private resetElectionTimeout(): void {
this.electionTimeout = Date.now() + 150 + Math.random() * 150;
}
// Sets the next heartbeat deadline to 50 ms from now.
private resetHeartbeatTimeout(): void {
this.heartbeatTimeout = Date.now() + 50;
}
// Starts an election: switch to candidate, bump the term, vote for self,
// restart the election timer and ask every peer for its vote.
private becomeCandidate(): void {
this.state = RaftState.CANDIDATE;
this.currentTerm++;
this.votedFor = this.id;
this.resetElectionTimeout();
// Ask the peers for their votes.
this.requestVotes();
}
// Switches to leader, resets the per-peer replication indices and sends an
// initial round of AppendEntries to every peer.
private becomeLeader(): void {
this.state = RaftState.LEADER;
this.nextIndex.clear();
this.matchIndex.clear();
for (const peer of this.peers) {
this.nextIndex.set(peer, this.log.length);
this.matchIndex.set(peer, 0);
}
this.sendHeartbeats();
}
// Steps down to follower in the given term, clears the recorded vote and
// restarts the election timer.
private becomeFollower(term: number): void {
this.state = RaftState.FOLLOWER;
this.currentTerm = term;
this.votedFor = null;
this.resetElectionTimeout();
}
// Builds a RequestVote request describing this node's last log entry and
// queues it for every peer.
private requestVotes(): void {
const lastLogIndex = this.log.length - 1;
const lastLogTerm = lastLogIndex >= 0 ? this.log[lastLogIndex].term : 0;
const request: RequestVoteRequest = {
term: this.currentTerm,
candidateId: this.id,
lastLogIndex,
lastLogTerm,
};
// Queue the request for every peer.
for (const peer of this.peers) {
this.sendMessage(peer, 'requestVote', request);
}
}
// Queues an AppendEntries request for every peer.
private sendHeartbeats(): void {
for (const peer of this.peers) {
this.sendAppendEntries(peer);
}
}
// Queues an AppendEntries request for one peer, carrying every log entry from
// that peer's nextIndex onward (possibly none).
private sendAppendEntries(peer: string): void {
const nextIdx = this.nextIndex.get(peer)!;
const prevLogIndex = nextIdx - 1;
const prevLogTerm = prevLogIndex >= 0 ? this.log[prevLogIndex].term : 0;
const entries = this.log.slice(nextIdx);
const request: AppendEntriesRequest = {
term: this.currentTerm,
leaderId: this.id,
prevLogIndex,
prevLogTerm,
entries,
leaderCommit: this.commitIndex,
};
this.sendMessage(peer, 'appendEntries', request);
}
/**
 * Handles an incoming RequestVote.
 *
 * A higher term in the request first turns this node into a follower of that
 * term. The vote is granted only when the request's term equals the current
 * term, this node has not voted for a different candidate in it, and the
 * candidate's last entry has a higher term, or the same term and an index at
 * least as large as this node's. Granting a vote restarts the election timer.
 *
 * @returns This node's current term and whether the vote was granted.
 */
handleRequestVote(request: RequestVoteRequest): RequestVoteResponse {
let voteGranted = false;
if (request.term > this.currentTerm) {
this.becomeFollower(request.term);
}
if (request.term === this.currentTerm && (this.votedFor === null || this.votedFor === request.candidateId)) {
const lastLogIndex = this.log.length - 1;
const lastLogTerm = lastLogIndex >= 0 ? this.log[lastLogIndex].term : 0;
if (request.lastLogTerm > lastLogTerm || (request.lastLogTerm === lastLogTerm && request.lastLogIndex >= lastLogIndex)) {
voteGranted = true;
this.votedFor = request.candidateId;
this.resetElectionTimeout();
}
}
return {
term: this.currentTerm,
voteGranted,
};
}
/**
 * Handles an incoming AppendEntries (heartbeat or replication).
 *
 * Requests from an older term are rejected. Otherwise the election timer is
 * restarted, the entry at prevLogIndex is checked against prevLogTerm, the
 * new entries are appended (truncating the local log at the first entry whose
 * term conflicts), and commitIndex is advanced towards leaderCommit.
 *
 * NOTE(review): a request carrying the same term as currentTerm does not
 * change `state`, so a candidate stays a candidate here — confirm whether it
 * should step down.
 *
 * @returns This node's current term and whether the request was accepted.
 */
handleAppendEntries(request: AppendEntriesRequest): AppendEntriesResponse {
if (request.term > this.currentTerm) {
this.becomeFollower(request.term);
}
if (request.term < this.currentTerm) {
return { term: this.currentTerm, success: false };
}
this.resetElectionTimeout();
// Log-consistency check: the entry at prevLogIndex must exist and carry prevLogTerm.
if (request.prevLogIndex >= 0) {
if (request.prevLogIndex >= this.log.length || this.log[request.prevLogIndex].term !== request.prevLogTerm) {
return { term: this.currentTerm, success: false };
}
}
// Append the new entries; on a term mismatch drop the local entry and everything after it.
for (let i = 0; i < request.entries.length; i++) {
const entry = request.entries[i];
const index = request.prevLogIndex + 1 + i;
if (index < this.log.length) {
if (this.log[index].term !== entry.term) {
this.log = this.log.slice(0, index);
}
}
if (index >= this.log.length) {
this.log.push(entry);
}
}
// Advance commitIndex, capped at the position of the last local entry.
if (request.leaderCommit > this.commitIndex) {
this.commitIndex = Math.min(request.leaderCommit, this.log.length - 1);
}
return { term: this.currentTerm, success: true };
}
// Simulated transport: records the message in messageQueue.
private sendMessage(peer: string, type: string, message: any): void {
// A real implementation would send the message over the network here.
this.messageQueue.push({ peer, type, message });
}
/**
 * Starts the timer loop. Every 10 ms a follower or candidate whose election
 * deadline has passed starts a new election, and a leader whose heartbeat
 * deadline has passed sends heartbeats and re-arms that deadline.
 *
 * NOTE(review): the setInterval handle is discarded, so this method offers no
 * way to stop the loop.
 */
run(): void {
setInterval(() => {
const now = Date.now();
if (this.state === RaftState.FOLLOWER || this.state === RaftState.CANDIDATE) {
if (now > this.electionTimeout) {
this.becomeCandidate();
}
}
if (this.state === RaftState.LEADER) {
if (now > this.heartbeatTimeout) {
this.sendHeartbeats();
this.resetHeartbeatTimeout();
}
}
}, 10);
}
}4. 向量时钟 (Vector Clock)
ts
/**
 * Vector clock: one monotonically increasing counter per node id.
 *
 * A node id that is absent from a clock is treated as having counter 0
 * everywhere (increment, update, compare, equals).
 */
class VectorClock {
  private clock: Map<string, number> = new Map();

  /** @param nodes Node ids to initialise with counter 0. */
  constructor(nodes: string[]) {
    for (const node of nodes) {
      this.clock.set(node, 0);
    }
  }

  /**
   * Advances the counter of `nodeId` by one. An id not yet present in the
   * clock starts from 0, so its counter becomes 1.
   */
  increment(nodeId: string): void {
    this.clock.set(nodeId, (this.clock.get(nodeId) || 0) + 1);
  }

  /**
   * Folds a received clock into this one by taking the per-node maximum.
   * Node ids present only in `otherClock` are added.
   */
  update(otherClock: Map<string, number>): void {
    for (const [node, time] of otherClock) {
      const currentTime = this.clock.get(node) || 0;
      this.clock.set(node, Math.max(currentTime, time));
    }
  }

  /** Same as update(), taking another VectorClock instead of a raw map. */
  merge(otherClock: VectorClock): void {
    this.update(otherClock.clock);
  }

  /**
   * Orders this clock relative to `otherClock`.
   *
   * @returns `'before'` when every counter here is <= the other's and at least
   *   one is smaller; `'after'` for the mirror case; `'concurrent'` otherwise.
   *   Two identical clocks are also reported as `'concurrent'`; use equals() to
   *   tell that case apart.
   */
  compare(otherClock: VectorClock): 'before' | 'after' | 'concurrent' {
    let isBefore = true;
    let isAfter = true;
    const allNodes = new Set([...this.clock.keys(), ...otherClock.clock.keys()]);
    for (const node of allNodes) {
      const thisTime = this.clock.get(node) || 0;
      const otherTime = otherClock.clock.get(node) || 0;
      if (thisTime < otherTime) {
        isAfter = false;
      } else if (thisTime > otherTime) {
        isBefore = false;
      }
    }
    if (isBefore && !isAfter) return 'before';
    if (!isBefore && isAfter) return 'after';
    return 'concurrent';
  }

  /** True when both clocks hold the same counter for every node id. */
  equals(otherClock: VectorClock): boolean {
    const allNodes = new Set([...this.clock.keys(), ...otherClock.clock.keys()]);
    for (const node of allNodes) {
      if ((this.clock.get(node) || 0) !== (otherClock.clock.get(node) || 0)) {
        return false;
      }
    }
    return true;
  }

  /** Returns an independent copy of this clock. */
  clone(): VectorClock {
    const copy = new VectorClock([]);
    for (const [node, time] of this.clock) {
      copy.clock.set(node, time);
    }
    return copy;
  }

  /** Returns a copy of the underlying counters; mutating it does not affect this clock. */
  getClock(): Map<string, number> {
    return new Map(this.clock);
  }

  /** Renders the clock as `{A:1, B:0}`, ordered by node id. */
  toString(): string {
    // Order by node id only, so the counter value never influences the ordering.
    const entries = Array.from(this.clock.entries()).sort(([a], [b]) =>
      a < b ? -1 : a > b ? 1 : 0,
    );
    return `{${entries.map(([node, time]) => `${node}:${time}`).join(', ')}}`;
  }
}
// 使用示例
/**
 * Demo: two clocks over nodes A, B and C advance independently, are printed
 * and compared, and then the first clock absorbs the second.
 */
function vectorClockExample() {
  const nodeIds = ['A', 'B', 'C'];
  const first = new VectorClock(nodeIds);
  const second = new VectorClock(nodeIds);

  // Two local events on A for the first clock, one on B for the second.
  for (let tick = 0; tick < 2; tick++) {
    first.increment('A');
  }
  second.increment('B');

  console.log('VC1:', first.toString());
  console.log('VC2:', second.toString());
  console.log('VC1 vs VC2:', first.compare(second));

  // Receiving the second clock's state: per-node maximum.
  const received = second.getClock();
  first.update(received);
  console.log('VC1 after update:', first.toString());
}
5. 实现要点
- 一致性哈希通过哈希环支持节点的动态增删(每次变更只需迁移少量键),并借助虚拟节点改善负载均衡。
- 分布式锁解决并发访问控制,支持锁续期和过期清理。
- Raft 算法通过选举和日志复制保证数据一致性。
- 向量时钟用于检测分布式系统中的事件因果关系。
算法 · 分布式 · JavaScript