02-最终一致性方案详解

张开发
2026/4/15 23:44:16 15 分钟阅读

分享文章

02-最终一致性方案详解
最终一致性方案详解本章导读最终一致性是分布式系统中平衡性能与一致性的重要策略,广泛应用于互联网大规模系统。本章将深入探讨事件驱动架构、CQRS、Saga模式等最终一致性实现方案,帮助你设计高可用、高性能的分布式系统。学习目标:目标1:理解最终一致性的核心概念和适用场景目标2:掌握事件溯源、CQRS模式的设计与实现目标3:能够运用Saga模式处理分布式事务前置知识:熟悉强一致性方案(2PC、Paxos、Raft),了解消息队列基础阅读时长:约 50 分钟一、知识概述最终一致性(Eventual Consistency)是分布式系统中一种重要的一致性模型,它允许系统在一段时间内处于不一致状态,但保证在没有新更新的情况下,最终所有副本都会达到一致状态。本文将深入探讨最终一致性的实现方案,包括消息队列、事件溯源、CQRS 等模式。最终一致性的定义最终一致性保证:收敛性:所有副本最终会收敛到相同的状态可用性:系统始终可读写分区容错:网络分区时系统仍可工作BASE 理论BASE 是对 CAP 定理中 AP 方案的延伸:Basically Available(基本可用):系统出现故障时,允许损失部分可用性Soft State(软状态):允许系统存在中间状态Eventually Consistent(最终一致性):系统最终达到一致二、核心实现方案2.1 消息队列实现最终一致性原理说明通过消息队列实现异步数据同步:主系统完成操作后发送消息从系统异步消费消息并同步数据通过重试机制保证消息最终被处理Java 实现importjava.util.*;importjava.util.concurrent.*;importjava.util.concurrent.atomic.*;/** * 最终一致性消息队列 */publicclassEventualConsistencyMessageQueue{privatefinalStringname;privatefinalBlockingQueueMessagequeue;privatefinalExecutorServiceexecutor;privatefinalMessageStoremessageStore;privatefinalListMessageConsumerconsumers;privatefinalScheduledExecutorServiceretryScheduler;// 消息状态publicenumMessageStatus{PENDING,PROCESSING,COMPLETED,FAILED,RETRYING}publicEventualConsistencyMessageQueue(Stringname){this.name=name;this.queue=newLinkedBlockingQueue(10000);this.executor=Executors.newFixedThreadPool(4);this.messageStore=newMessageStore();this.consumers=newCopyOnWriteArrayList();this.retryScheduler=Executors.newScheduledThreadPool(1);startConsumers();startRetryTask();}/** * 发送消息 */publicvoidsend(Stringtopic,Objectpayload){Messagemessage=newMessage(UUID.randomUUID().toString(),topic,payload,System.currentTimeMillis());// 持久化消息messageStore.save(message);// 入队try{queue.put(message);System.out.println("[Queue-"+name+"] Sent message: "+message.id);}catch(InterruptedExceptione){Thread.currentThread().interrupt();}}/** * 注册消费者 */publicvoidsubscribe(MessageConsumerconsumer){consumers.add(consumer);}/** * 启动消费者 */privatevoidstartConsumers(){for(inti=0;i4;i++){executor.submit(()-{while(!Thread.currentThread().isInterrupted()){try{Messagemessage=queue.poll(1,TimeUnit.SECONDS);if(message!=null){processMessage(message);}}catch(InterruptedExceptione){Thread.currentThread().interrupt();break;}}});}}/** * 处理消息 */privatevoidprocessMessage(Messagemessage){messageStore.updateStatus(message.id,MessageStatus.PROCESSING);booleansuccess=false;ExceptionlastException=null;for(MessageConsumerconsumer:consumers){if(consumer.getTopic().equals(message.topic)){try{consumer.consume(message);success=true;}catch(Exceptione){lastException=e;success=false;break;}}}if(success){messageStore.updateStatus(message.id,MessageStatus.COMPLETED);System.out.println("[Queue-"+name+"] Message completed: "+message.id);}else{handleFailedMessage(message,lastException);}}/** * 处理失败消息 */privatevoidhandleFailedMessage(Messagemessage,Exceptione){message.retryCount++;if(message.retryCount=message.maxRetries){// 超过最大重试次数,标记为失败messageStore.updateStatus(message.id,MessageStatus.FAILED);System.err.println("[Queue-"+name+"] Message failed after "+message.retryCount+" retries: "+message.id);}else{// 加入重试队列messageStore.updateStatus(message.id,MessageStatus.RETRYING);System.out.println("[Queue-"+name+"] Message will retry: "+message.id+" (attempt "+message.retryCount+")");}}/** * 启动重试任务 */privatevoidstartRetryTask(){retryScheduler.scheduleAtFixedRate(()-{ListMessageretryMessages=messageStore.findRetryMessages();for(Messagemessage:retryMessages){try{queue.put(message);}catch(InterruptedExceptione){Thread.currentThread().interrupt();}}},5,5,TimeUnit.SECONDS);}/** * 关闭队列 */publicvoidshutdown(){executor.shutdown();retryScheduler.shutdown();}// 内部类publicstaticclassMessage{finalStringid;finalStringtopic;finalObjectpayload;finallongtimestamp;volatileintretryCount;finalintmaxRetries;volatileMessageStatusstatus;Message(Stringid,Stringtopic,Objectpayload,longtimestamp){this.id=id;this.topic=topic;this.payload=payload;this.timestamp=timestamp;this.retryCount=0;this.maxRetries=3;this.status=MessageStatus.PENDING;}}publicinterfaceMessageConsumer{StringgetTopic();voidconsume(Messagemessage)throwsException;}/** * 消息存储 */staticclassMessageStore{privatefinalMapString,Messagemessages=newConcurrentHashMap();voidsave(Messagemessage){messages.put(message.id,message);}voidupdateStatus(Stringid,MessageStatusstatus){Messagemessage=messages.get(id);if(message!=null){message.status=status;}}ListMessagefindRetryMessages(){returnmessages.values().stream().filter(m-m.status==MessageStatus.RETRYING).toList();}}}/** * 基于消息队列的最终一致性数据同步 */publicclassEventualConsistencySync{privatefinalEventualConsistencyMessageQueuemessageQueue;publicEventualConsistencySync(){this.messageQueue=newEventualConsistencyMessageQueue("data-sync");}/** * 同步数据变更 */publicvoidsyncDataChange(Stringtable,Stringoperation,MapString,Objectdata){DataChangeEventevent=newDataChangeEvent(table,operation,data);messageQueue.send("data-change",event);}/** * 注册数据同步处理器 */publicvoidregisterSyncHandler(DataSyncHandlerhandler){messageQueue.subscribe(newEventualConsistencyMessageQueue.MessageConsumer(){@OverridepublicStringgetTopic(){return"data-change";}@Overridepublicvoidconsume(EventualConsistencyMessageQueue.Messagemessage)throwsException{DataChangeEventevent=(DataChangeEvent)message.payload;handler.handleSync(event);}});}staticclassDataChangeEvent{finalStringtable;finalStringoperation;finalMapString,Objectdata;finallongtimestamp;DataChangeEvent(Stringtable,Stringoperation,MapString,Objectdata){this.table=table;this.operation=operation;this.data=data;this.timestamp=System.currentTimeMillis();}}interfaceDataSyncHandler{voidhandleSync(DataChangeEventevent)throwsException;}}// 使用示例publicclassEventualConsistencyDemo{publicstaticvoidmain(String[]args)throwsInterruptedException{EventualConsistencySyncsync=newEventualConsistencySync();// 注册同步处理器sync.registerSyncHandler(event-{System.out.println("Syncing "+event.operation+" on "+event.table+": "+event.data);// 模拟同步到其他数据库});// 发送数据变更MapString,ObjectuserData=newHashMap();userData.put("id","user-001");userData.put("name","张三");userData.put("email","zhangsan@example.com");sync.syncDataChange("users","INSERT",userData);// 更新操作userData.put("name","李四");sync.syncDataChange("users","UPDATE",userData);Thread.sleep(2000);}}2.2 事件溯源(Event Sourcing)原理说明事件溯源通过存储状态变更事件来记录数据:不存储当前状态,而是存储所有变更事件当前状态通过重放事件计算得出保证事件的不可变性和顺序性Java 实现importjava.util.*;importjava.util.concurrent.*;importjava.util.stream.*;/** * 事件存储 */publicclassEventStore{privatefinalMapString,ListEventstreams;privatefinalListEventallEvents;privatefinalListEventSubscribersubscribers;publicEventStore(){this.streams=newConcurrentHashMap();this.allEvents=newCopyOnWriteArrayList();this.subscribers=newCopyOnWriteArrayList();}/** * 追加事件到流 */publicvoidappend(StringstreamId,Eventevent){// 获取当前版本ListEventstream=streams.computeIfAbsent(streamId,k-newCopyOnWriteArrayList());longexpectedVersion=stream.size();// 乐观锁检查if(event.expectedVersion!=expectedVersion){thrownewConcurrentModificationException("Expected version "+expectedVersion+" but got "+event.expectedVersion);

更多文章