算法2026-04-30·10 分钟
算法知识库:分布式算法实现
JavaScript/TypeScript 实现分布式算法,如一致性哈希、分布式锁、共识算法等。
分布式算法实现
1. 一致性哈希
ts
class ConsistentHashing {
private nodes: Map<number, string> = new Map();
private sortedKeys: number[] = [];
private replicas: number;
constructor(replicas: number = 3) {
this.replicas = replicas;
}
addNode(node: string): void {
for (let i = 0; i < this.replicas; i += 1) {
const hash = this.hash(`${node}:${i}`);
this.nodes.set(hash, node);
this.sortedKeys.push(hash);
}
this.sortedKeys.sort((a, b) => a - b);
}
removeNode(node: string): void {
for (let i = 0; i < this.replicas; i += 1) {
const hash = this.hash(`${node}:${i}`);
this.nodes.delete(hash);
const index = this.sortedKeys.indexOf(hash);
if (index > -1) {
this.sortedKeys.splice(index, 1);
}
}
}
getNode(key: string): string | null {
if (this.sortedKeys.length === 0) return null;
const hash = this.hash(key);
const index = this.findCeilingIndex(hash);
return this.nodes.get(this.sortedKeys[index]) || null;
}
private findCeilingIndex(hash: number): number {
let low = 0;
let high = this.sortedKeys.length - 1;
while (low <= high) {
const mid = Math.floor((low + high) / 2);
if (this.sortedKeys[mid] < hash) {
low = mid + 1;
} else {
high = mid - 1;
}
}
return low % this.sortedKeys.length;
}
private hash(key: string): number {
let hash = 0;
for (let i = 0; i < key.length; i += 1) {
const char = key.charCodeAt(i);
hash = (hash << 5) - hash + char;
hash = hash & hash; // 转换为 32 位整数
}
return Math.abs(hash);
}
getAllNodes(): string[] {
return Array.from(new Set(this.nodes.values()));
}
}2. 分布式锁 (基于 Redis 简化版)
ts
class DistributedLock {
private locks: Map<string, { token: string; expiresAt: number }> = new Map();
private defaultTTL = 30000; // 30 秒
async acquire(key: string, ttl: number = this.defaultTTL): Promise<string | null> {
const token = this.generateToken();
const expiresAt = Date.now() + ttl;
// 简化版:直接设置锁
if (!this.locks.has(key) || this.locks.get(key)!.expiresAt < Date.now()) {
this.locks.set(key, { token, expiresAt });
return token;
}
return null;
}
async release(key: string, token: string): Promise<boolean> {
const lock = this.locks.get(key);
if (lock && lock.token === token) {
this.locks.delete(key);
return true;
}
return false;
}
async extend(key: string, token: string, ttl: number = this.defaultTTL): Promise<boolean> {
const lock = this.locks.get(key);
if (lock && lock.token === token) {
lock.expiresAt = Date.now() + ttl;
return true;
}
return false;
}
private generateToken(): string {
return Math.random().toString(36).substring(2) + Date.now().toString(36);
}
// 清理过期锁
cleanup(): void {
const now = Date.now();
for (const [key, lock] of this.locks) {
if (lock.expiresAt < now) {
this.locks.delete(key);
}
}
}
}3. Raft 共识算法 (简化版)
ts
enum RaftState {
FOLLOWER = 'follower',
CANDIDATE = 'candidate',
LEADER = 'leader',
}
interface LogEntry {
term: number;
command: string;
index: number;
}
class RaftNode {
private state: RaftState = RaftState.FOLLOWER;
private currentTerm = 0;
private votedFor: string | null = null;
private log: LogEntry[] = [];
private commitIndex = 0;
private lastApplied = 0;
private nextIndex: Map<string, number> = new Map();
private matchIndex: Map<string, number> = new Map();
private peers: string[] = [];
private nodeId: string;
private electionTimeout: number;
private heartbeatInterval: number;
constructor(nodeId: string, peers: string[]) {
this.nodeId = nodeId;
this.peers = peers;
this.resetElectionTimeout();
this.startElectionTimer();
}
private resetElectionTimeout(): void {
// 随机选举超时时间 (150-300ms)
this.electionTimeout = Date.now() + 150 + Math.random() * 150;
}
private startElectionTimer(): void {
setInterval(() => {
if (Date.now() > this.electionTimeout) {
this.startElection();
}
}, 10);
}
private startElection(): void {
this.state = RaftState.CANDIDATE;
this.currentTerm += 1;
this.votedFor = this.nodeId;
this.resetElectionTimeout();
// 发送请求投票 RPC
const votes = 1; // 给自己投票
for (const peer of this.peers) {
// 模拟发送投票请求
if (this.requestVote(peer)) {
votes += 1;
}
}
if (votes > this.peers.length / 2) {
this.becomeLeader();
}
}
private requestVote(peer: string): boolean {
// 简化版:总是同意投票
return Math.random() > 0.3; // 70% 成功率
}
private becomeLeader(): void {
this.state = RaftState.LEADER;
// 初始化 nextIndex 和 matchIndex
for (const peer of this.peers) {
this.nextIndex.set(peer, this.log.length + 1);
this.matchIndex.set(peer, 0);
}
// 开始心跳
this.startHeartbeat();
}
private startHeartbeat(): void {
setInterval(() => {
if (this.state === RaftState.LEADER) {
this.sendHeartbeats();
}
}, 50);
}
private sendHeartbeats(): void {
for (const peer of this.peers) {
// 发送心跳和日志复制
this.appendEntries(peer);
}
}
private appendEntries(peer: string): void {
const prevLogIndex = this.nextIndex.get(peer)! - 1;
const prevLogTerm = prevLogIndex >= 0 ? this.log[prevLogIndex].term : 0;
const entries = this.log.slice(prevLogIndex + 1);
// 模拟日志复制
if (this.sendAppendEntriesRPC(peer, this.currentTerm, prevLogIndex, prevLogTerm, entries, this.commitIndex)) {
this.nextIndex.set(peer, prevLogIndex + entries.length + 1);
this.matchIndex.set(peer, prevLogIndex + entries.length);
// 更新 commitIndex
this.updateCommitIndex();
} else {
this.nextIndex.set(peer, Math.max(1, this.nextIndex.get(peer)! - 1));
}
}
private sendAppendEntriesRPC(peer: string, term: number, prevLogIndex: number, prevLogTerm: number, entries: LogEntry[], leaderCommit: number): boolean {
// 简化版:总是成功
return Math.random() > 0.2; // 80% 成功率
}
private updateCommitIndex(): void {
const majority = Math.floor((this.peers.length + 1) / 2) + 1;
let count = 1; // 自己
for (const peer of this.peers) {
if (this.matchIndex.get(peer)! >= this.commitIndex + 1) {
count += 1;
}
}
if (count >= majority) {
this.commitIndex += 1;
}
}
appendCommand(command: string): boolean {
if (this.state !== RaftState.LEADER) return false;
const entry: LogEntry = {
term: this.currentTerm,
command,
index: this.log.length,
};
this.log.push(entry);
return true;
}
getState(): RaftState {
return this.state;
}
getCurrentTerm(): number {
return this.currentTerm;
}
}4. 向量时钟
ts
class VectorClock {
private clock: Map<string, number> = new Map();
constructor(nodes: string[]) {
for (const node of nodes) {
this.clock.set(node, 0);
}
}
increment(node: string): void {
this.clock.set(node, (this.clock.get(node) || 0) + 1);
}
update(other: VectorClock): void {
for (const [node, time] of other.clock) {
this.clock.set(node, Math.max(this.clock.get(node) || 0, time));
}
}
compare(other: VectorClock): 'before' | 'after' | 'concurrent' {
let isBefore = false;
let isAfter = false;
for (const node of this.clock.keys()) {
const thisTime = this.clock.get(node) || 0;
const otherTime = other.clock.get(node) || 0;
if (thisTime < otherTime) {
isBefore = true;
} else if (thisTime > otherTime) {
isAfter = true;
}
}
if (isBefore && !isAfter) return 'before';
if (isAfter && !isBefore) return 'after';
return 'concurrent';
}
toString(): string {
const entries = Array.from(this.clock.entries()).sort();
return `{${entries.map(([node, time]) => `${node}:${time}`).join(', ')}}`;
}
}5. 布隆过滤器
ts
class BloomFilter {
private bitArray: boolean[];
private size: number;
private hashFunctions: ((key: string) => number)[];
constructor(size: number = 1000, numHashes: number = 3) {
this.size = size;
this.bitArray = new Array(size).fill(false);
this.hashFunctions = this.createHashFunctions(numHashes);
}
private createHashFunctions(numHashes: number): ((key: string) => number)[] {
const functions: ((key: string) => number)[] = [];
for (let i = 0; i < numHashes; i += 1) {
functions.push((key: string) => {
let hash = 0;
for (let j = 0; j < key.length; j += 1) {
const char = key.charCodeAt(j);
hash = (hash << 5) - hash + char + i;
hash = hash & hash; // 转换为 32 位整数
}
return Math.abs(hash) % this.size;
});
}
return functions;
}
add(key: string): void {
for (const hashFunc of this.hashFunctions) {
const index = hashFunc(key);
this.bitArray[index] = true;
}
}
contains(key: string): boolean {
for (const hashFunc of this.hashFunctions) {
const index = hashFunc(key);
if (!this.bitArray[index]) {
return false;
}
}
return true;
}
// 估算误报率
estimateFalsePositiveRate(numItems: number): number {
const m = this.size;
const k = this.hashFunctions.length;
const n = numItems;
return Math.pow(1 - Math.exp((-k * n) / m), k);
}
}6. 实现要点
- 一致性哈希实现负载均衡。
- 分布式锁防止竞态条件。
- Raft 提供共识保证。
- 向量时钟处理因果关系。
- 布隆过滤器提供空间高效的成员测试。
算法分布式JavaScript