本文所涉及的注释源码:bigcoder84/dledger
Raft 协议主要包含两个部分:Leader选举和日志复制。
前面我们在 Raft协议深度解析:RocketMQ中的自动Leader选举与故障转移 一文中已经详细介绍了DLedger如何实现Leader选举的,而本文主要聚焦于Leader选举完成后的日志复制的过程。
一. RocketMQ DLedger 存储实现 说起日志的复制,就必须要从日志存储实现说起,它约束着Raft每一个结点如何存储数据。下面先介绍一次Raft存储的核心实现类:
1.1 存储实现核心类
DLedgerStore:存储抽象类,该类有如下核心抽象方法:
getMemberState: 获取节点状态机
appendAsLeader:向主节点追加日志(数据)
appendAsFollower:向从节点广播日志(数据)
get:根据日志下标查找日志
getLedgerEndTerm:获取Leader节点当前最大的投票轮次
getLedgerEndIndex:获取Leader节点下一条日志写入的日志序号
truncate:删除日志
getFirstLogOfTargetTerm:从endIndex开始,向前追溯targetTerm任期的第一个日志
updateLedgerEndIndexAndTerm:更新 Leader 节点维护的ledgerEndIndex和ledgerEndTerm
startup:启动存储管理器
shutdown:关闭存储管理器
DLedgerMemoryStore:DLedger基于内存实现的日志存储实现类。
DLedgerMmapFileStore:基于文件内存映射机制的存储实现,核心属性如下:
ledgerBeforeBeginIndex:日志的起始序号
ledgerBeforeBeginTerm:日志起始的投票轮次
ledgerEndIndex:下一条日志下标(序号)
ledgerEndTerm:当前最大的投票轮次
DLedgerConfig:DLedger的配置信息
RocketMQ DLedger的上述核心类与RocketMQ存储模块的对应关系
RocketMQ存储模块
DLedger存储模块
描述
MappedFile
DefaultMmapFile
表示一个物理文件
MappedFileQueue
MmapFileList
表示逻辑上连续多个物理文件
DefaultMessageStore
DLedgerMmapFileStore
存储实现类
CommitLog#FlushCommitLogService
DLedgerMmapFileStore#FlushDataService
实现文件刷盘机制
DefaultMessageStore#CleanCommitLogService
DLedgerMmapFileStore#CleanSpaceService
清理过期文件
1.2 数据存储协议 RocketMQ DLedger数据存储协议如下图:
magic:魔数,4字节。
size:条目总长度,包含header(协议头)+body(消息体),占4字节。
index:当前条目的日志序号,占8字节。
term:条目所属的投票轮次,占8字节。
pos:条目的物理偏移量,类似CommitLog文件的物理偏移量,占8字节。
channel:保留字段,当前版本未使用,占4字节。
chain crc:当前版本未使用,占4字节。
body crc:消息体的CRC校验和,用来区分数据是否损坏,占4字节。
body size:用来存储消息体的长度,占4个字节。
body:消息体的内容。
RocketMQ DLedger 中日志实例用 DLedgerEntry
表示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class DLedgerEntry { public final static int POS_OFFSET = 4 + 4 + 8 + 8 ; public final static int HEADER_SIZE = POS_OFFSET + 8 + 4 + 4 + 4 ; public final static int BODY_OFFSET = HEADER_SIZE + 4 ; private int magic = DLedgerEntryType.NORMAL.getMagic(); private int size; private long index; private long term; private long pos; private int channel; private int chainCrc; private int bodyCrc; private byte [] body; }
解码流程参考:io.openmessaging.storage.dledger.entry.DLedgerEntryCoder#decode(java.nio.ByteBuffer, boolean):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static DLedgerEntry decode (ByteBuffer byteBuffer, boolean readBody) { DLedgerEntry entry = new DLedgerEntry (); entry.setMagic(byteBuffer.getInt()); entry.setSize(byteBuffer.getInt()); entry.setIndex(byteBuffer.getLong()); entry.setTerm(byteBuffer.getLong()); entry.setPos(byteBuffer.getLong()); entry.setChannel(byteBuffer.getInt()); entry.setChainCrc(byteBuffer.getInt()); entry.setBodyCrc(byteBuffer.getInt()); int bodySize = byteBuffer.getInt(); if (readBody && bodySize < entry.getSize()) { byte [] body = new byte [bodySize]; byteBuffer.get(body); entry.setBody(body); } return entry; }
1.3 索引存储协议 RocketMQ DLedger索引的存储协议如下图:
存储协议中各个字段的含义如下。
magic:魔数。
pos:条目的物理偏移量,类似CommitLog文件的物理偏移量,占8字节。
size:条目长度。
index:当前条目的日志序号,占8字节。
term:条目所属的投票轮次,占8字节。
索引条目采用定长的方式进行的存储,目的是为了加速日志条目的查找的速度 。
我们假设一种场景需要查询 index 下标对应的日志数据,由于日志条目时变长的,如果没有索引文件,我们需要在索引文件上一个一个条目的去遍历查找,这样的效率很低。
有了索引文件后,我们可以通过 index * 32
找到Index所对应的索引存储的物理偏移量,这样我们可以轻松获取日志索引中存储的索引所处理的物理偏移量pos,然后通过日志的物理偏移量就可以直接获取到日志记录了。
RocketMQ DLedger 中索引实例用 DLedgerIndexEntry
表示:
1 2 3 4 5 6 7 8 9 10 11 12 public class DLedgerIndexEntry { private int magic; private long position; private int size; private long index; private long term; }
解码流程参考:io.openmessaging.storage.dledger.entry.DLedgerEntryCoder#decodeIndex:
1 2 3 4 5 6 7 8 9 public static DLedgerIndexEntry decodeIndex (ByteBuffer byteBuffer) { DLedgerIndexEntry indexEntry = new DLedgerIndexEntry (); indexEntry.setMagic(byteBuffer.getInt()); indexEntry.setPosition(byteBuffer.getLong()); indexEntry.setSize(byteBuffer.getInt()); indexEntry.setIndex(byteBuffer.getLong()); indexEntry.setTerm(byteBuffer.getLong()); return indexEntry; }
二. RocketMQ DLedger主从切换之日志追加 Raft协议负责组主要包含两个步骤:Leader选举和日志复制。使用Raft协议的集群在向外提供服务之前需要先在集群中进行Leader选举,推举一个主节点接受客户端的读写请求。Raft协议负责组的其他节点只需要复制数据,不对外提供服务。当Leader节点接受客户端的写请求后,先将数据存储在Leader节点上,然后将日志数据广播给它的从节点,只有超过半数的节点都成功存储了该日志,Leader节点才会向客户端返回写入成功。
2.1 日志追加流程概述 Leader节点处理日志写入请求的入口为DLedgerServer的handleAppend()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 @Override public CompletableFuture<AppendEntryResponse> handleAppend (AppendEntryRequest request) throws IOException { try { PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s" , request.getRemoteId(), memberState.getSelfId()); PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s" , request.getGroup(), memberState.getGroup()); PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER); PreConditions.check(memberState.getTransferee() == null , DLedgerResponseCode.LEADER_TRANSFERRING); long currTerm = memberState.currTerm(); if (dLedgerEntryPusher.isPendingFull(currTerm)) { AppendEntryResponse appendEntryResponse = new AppendEntryResponse (); appendEntryResponse.setGroup(memberState.getGroup()); appendEntryResponse.setCode(DLedgerResponseCode.LEADER_PENDING_FULL.getCode()); appendEntryResponse.setTerm(currTerm); appendEntryResponse.setLeaderId(memberState.getSelfId()); return AppendFuture.newCompletedFuture(-1 , appendEntryResponse); } AppendFuture<AppendEntryResponse> future; if (request instanceof BatchAppendEntryRequest) { BatchAppendEntryRequest batchRequest = (BatchAppendEntryRequest) request; if (batchRequest.getBatchMsgs() == null || batchRequest.getBatchMsgs().isEmpty()) { throw new DLedgerException (DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" + " with empty bodys" ); } future = appendAsLeader(batchRequest.getBatchMsgs()); } else { future = appendAsLeader(request.getBody()); } return future; } catch (DLedgerException e) { LOGGER.error("[{}][HandleAppend] failed" , memberState.getSelfId(), e); AppendEntryResponse response = new AppendEntryResponse (); response.copyBaseInfo(request); response.setCode(e.getCode().getCode()); response.setLeaderId(memberState.getLeaderId()); return AppendFuture.newCompletedFuture(-1 , response); } }
第一步:验证请求的合理性。
如果请求目的节点不是当前节点,返回错误。
如果请求的集群不是当前节点所在的集群,则返回错误。
如果当前节点不是leader节点,则抛出异常。
第二步:消息的追加是一个异步过程,会将内容暂存到内存队列中。首先检查内存队列是否已满,如果已满则向客户端返回错误码,表示本次消息发送失败。如果队列未满,则先将数据追加到Leader节点的PageCache中,然后转发给Leader的所有从节点,最后Leader节点等待从节点日志复制的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public AppendFuture<AppendEntryResponse> appendAsLeader (List<byte []> bodies) throws DLedgerException { PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER); if (bodies.size() == 0 ) { return AppendFuture.newCompletedFuture(-1 , null ); } AppendFuture<AppendEntryResponse> future; StopWatch watch = StopWatch.createStarted(); DLedgerEntry entry = new DLedgerEntry (); long totalBytes = 0 ; if (bodies.size() > 1 ) { long [] positions = new long [bodies.size()]; for (int i = 0 ; i < bodies.size(); i++) { totalBytes += bodies.get(i).length; DLedgerEntry dLedgerEntry = new DLedgerEntry (); dLedgerEntry.setBody(bodies.get(i)); entry = dLedgerStore.appendAsLeader(dLedgerEntry); positions[i] = entry.getPos(); } future = new BatchAppendFuture <>(positions); } else { DLedgerEntry dLedgerEntry = new DLedgerEntry (); totalBytes += bodies.get(0 ).length; dLedgerEntry.setBody(bodies.get(0 )); entry = dLedgerStore.appendAsLeader(dLedgerEntry); future = new AppendFuture <>(); } final DLedgerEntry finalResEntry = entry; final AppendFuture<AppendEntryResponse> finalFuture = future; final long totalBytesFinal = totalBytes; finalFuture.handle((r, e) -> { if (e == null && r.getCode() == DLedgerResponseCode.SUCCESS.getCode()) { Attributes attributes = DLedgerMetricsManager.newAttributesBuilder().build(); DLedgerMetricsManager.appendEntryLatency.record(watch.getTime(TimeUnit.MICROSECONDS), attributes); DLedgerMetricsManager.appendEntryBatchCount.record(bodies.size(), attributes); DLedgerMetricsManager.appendEntryBatchBytes.record(totalBytesFinal, attributes); } return r; }); Closure closure = new Closure () { @Override public void done (Status status) { AppendEntryResponse response = new AppendEntryResponse (); response.setGroup(DLedgerServer.this .memberState.getGroup()); response.setTerm(DLedgerServer.this .memberState.currTerm()); response.setIndex(finalResEntry.getIndex()); response.setLeaderId(DLedgerServer.this .memberState.getLeaderId()); response.setPos(finalResEntry.getPos()); response.setCode(status.code.getCode()); finalFuture.complete(response); } }; dLedgerEntryPusher.appendClosure(closure, finalResEntry.getTerm(), finalResEntry.getIndex()); return finalFuture; }
日志追加时会有两种模式:单条追加和批量追加。appendAsLeader
方法主要将两种模式的追加进行统一封装,最后调用 DLedgerStore#appendAsLeader
将日志存储到指定位置。
2.2 Leader节点日志存储 Leader节点的数据存储主要由DLedgerStore的appendAsLeader() 方法实现。DLedger提供了基于内存和基于文件两种持久化实现,本节重点关注基于文件的存储实现方法,其实现类为 DLedgerMmapFileStore。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Override public DLedgerEntry appendAsLeader (DLedgerEntry entry) { PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER); PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL); ByteBuffer dataBuffer = localEntryBuffer.get(); ByteBuffer indexBuffer = localIndexBuffer.get(); DLedgerEntryCoder.encode(entry, dataBuffer); int entrySize = dataBuffer.remaining(); synchronized (memberState) { PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, null ); PreConditions.check(memberState.getTransferee() == null , DLedgerResponseCode.LEADER_TRANSFERRING, null ); long nextIndex = ledgerEndIndex + 1 ; entry.setIndex(nextIndex); entry.setTerm(memberState.currTerm()); DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex, memberState.currTerm(), entry.getMagic()); long prePos = dataFileList.preAppend(dataBuffer.remaining()); entry.setPos(prePos); PreConditions.check(prePos != -1 , DLedgerResponseCode.DISK_ERROR, null ); DLedgerEntryCoder.setPos(dataBuffer, prePos); for (AppendHook writeHook : appendHooks) { writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET); } long dataPos = dataFileList.append(dataBuffer.array(), 0 , dataBuffer.remaining()); PreConditions.check(dataPos != -1 , DLedgerResponseCode.DISK_ERROR, null ); PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null ); DLedgerEntryCoder.encodeIndex(dataPos, entrySize, DLedgerEntryType.NORMAL.getMagic(), nextIndex, memberState.currTerm(), indexBuffer); long indexPos = indexFileList.append(indexBuffer.array(), 0 , indexBuffer.remaining(), false ); PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null ); if (LOGGER.isDebugEnabled()) { LOGGER.info("[{}] Append as Leader {} {}" , memberState.getSelfId(), entry.getIndex(), entry.getBody().length); } ledgerEndIndex++; ledgerEndTerm = memberState.currTerm(); updateLedgerEndIndexAndTerm(); return entry; } }
在该方法中,主要执行以下逻辑:
检查Leader状态 :首先,方法检查当前节点是否是集群中的Leader节点,如果不是则抛出错误。
检查磁盘空间 :接着,检查磁盘是否已满,如果已满则抛出错误。
获取缓冲区 :从本地线程变量中获取用于存储数据和索引的ByteBuffer,数据缓冲区大小为4MB,索引缓冲区大小为64B。
编码日志条目 :将传入的日志条目进行编码,并写入数据ByteBuffer中。
设置日志条目信息 :在同步块中,再次检查Leader状态,确保没有发生领导者转移。然后为日志条目设置索引、投票轮次等信息。
计算物理偏移量 :计算日志条目的起始物理偏移量,并设置到日志条目中。
执行写入钩子 :如果有注册的写入钩子(AppendHook),则执行这些钩子。
追加数据到PageCache :将编码后的数据追加到PageCache中,需要注意此时数据尚未写入硬盘。
编码索引信息 :将索引信息编码,包括数据位置、日志大小、日志类型、索引和投票轮次。
写入索引到PageCache :将索引信息追加到索引文件列表的PageCache中。
日志和索引位置检查 :检查索引写入的位置是否正确。
更新日志存储状态 :更新日志的结束索引和投票轮次,并将这些信息持久化。
返回日志条目 :最后,方法返回追加的日志条目。
日志追加到Leader节点的PageCache后,将异步转发给它所有的从节点,然后等待各从节点的反馈,并对这些反馈结果进行仲裁,只有集群内超过半数的节点存储了该条日志,Leader节点才可以向客户端返回日志写入成功,日志的复制将在后面详细介绍,在介绍Leader节点如何等待从节点复制、响应ACK之前,我们再介绍一下与存储相关的两个核心方法:DataFileList的preAppend()与append()方法。
2.2.1 DataFileList#preAppend DataFileList的preAppend()方法为预写入,主要是根据当前日志的长度计算该条日志的物理偏移量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public long preAppend (int len, boolean useBlank) { MmapFile mappedFile = getLastMappedFile(); if (null == mappedFile || mappedFile.isFull()) { mappedFile = getLastMappedFile(0 ); } if (null == mappedFile) { LOGGER.error("Create mapped file for {}" , storePath); return -1 ; } int blank = useBlank ? MIN_BLANK_LEN : 0 ; if (len + blank > mappedFile.getFileSize() - mappedFile.getWrotePosition()) { if (blank < MIN_BLANK_LEN) { LOGGER.error("Blank {} should ge {}" , blank, MIN_BLANK_LEN); return -1 ; } else { ByteBuffer byteBuffer = ByteBuffer.allocate(mappedFile.getFileSize() - mappedFile.getWrotePosition()); byteBuffer.putInt(BLANK_MAGIC_CODE); byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition()); if (mappedFile.appendMessage(byteBuffer.array())) { mappedFile.setWrotePosition(mappedFile.getFileSize()); } else { LOGGER.error("Append blank error for {}" , storePath); return -1 ; } mappedFile = getLastMappedFile(0 ); if (null == mappedFile) { LOGGER.error("Create mapped file for {}" , storePath); return -1 ; } } } return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(); }
如果当前文件剩余的空间少于MIN_BLANK_LEN,将返回-1,表示存储错误,需要人工干预,正常情况下是不会出现这种情况的,因为写入一条消息之前会确保能容纳待写入的消息,并且还需要空余 MIN_BLANK_LEN 个字节,因为一个独立的物理文件,默认会填充文件结尾魔数(BLANK_MAGIC_CODE)。
如果空余空间大于MIN_BLANK_LEN,会首先写入文件结尾魔数(4字节),然后将该文件剩余的字节数写入接下来的4个字节,表示该文件全部用完。然后创建一个新的文件,并返回新文件的起始位置,表示这条日志写入新文件起始位置。
2.2.2 DataFileList#append 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public long append (byte [] data, int pos, int len, boolean useBlank) { if (preAppend(len, useBlank) == -1 ) { return -1 ; } MmapFile mappedFile = getLastMappedFile(); long currPosition = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(); if (!mappedFile.appendMessage(data, pos, len)) { LOGGER.error("Append error for {}" , storePath); return -1 ; } return currPosition; } @Override public boolean appendMessage (final byte [] data, final int offset, final int length) { int currentPos = this .wrotePosition; if ((currentPos + length) <= this .fileSize) { ByteBuffer byteBuffer = this .mappedByteBuffer.slice(); byteBuffer.position(currentPos); byteBuffer.put(data, offset, length); WROTE_POSITION_UPDATER.addAndGet(this , length); return true ; } return false ; }
三. RocketMQ DLedger 主从切换之日志复制 Leader节点首先将客户端发送过来的日志按照指定格式存储在Leader节点上,但此时并不会向客户端返回写入成功,而是需要将日志转发给它的所有从节点,只有超过半数的节点都存储了该条日志,Leader节点才会向客户端返回日志写入成功。
日志的复制主要包括如下3个步骤:
Leader节点将日志推送到从节点。
从节点收到Leader节点推送的日志并存储,然后向Leader节点汇报日志复制结果。
Leader节点对日志复制进行仲裁,如果成功存储该条日志的节点超过半数,则向客户端返回写入成功。
3.1 日志复制设计理念 3.1.1 日志编号 为了方便对日志进行管理与辨别,Raft协议对每条日志进行编号,每一条消息到达主节点时会生成一个全局唯一的递增号,这样可以根据日志序号来快速判断日志中的数据在主从复制过程中是否保持一致,在 DLedger 的实现中对应 DLedgerMemoryStore 中的 ledgerBeforeBeginIndex、ledgerEndIndex,分别表示当前节点最小的日志序号与最大的日志序号,下一条日志的序号为ledgerEndIndex+1。
3.1.2 日志追加与提交机制 Leader节点收到客户端的数据写入请求后,先通过解析请求提取数据,构建日志对象,并生成日志序号,用seq表示。然后将日志存储到Leader节点内,将日志广播(推送)给其所有从节点。这个过程存在网络延时,如果客户端向主节点查询日志序号为seq的日志,日志已经存储在Leader节点中了,直接返回给客户端显然是有问题的,这是因为网络等原因导致从节点未能正常存储该日志,导致数据不一致,该如何避免出现这个问题呢?
为了解决上述问题,DLedger引入了已提交指针(committedIndex)。当主节点收到客户端的请求时,先将数据进行存储,此时数据是未提交的,这一过程被称为日志追加(已在第四节中介绍了),此时该条日志对客户端不可见,只有当集群内超过半数的节点都将日志追加完成后,才会更新committedIndex指针,该条日志才会向客户端返回写入成功。一条日志被提交成功的充分必要条件是已超过集群内半数节点成功追加日志。
3.1.3 保证日志一致性 一个拥有3个节点的Raft集群,只需要主节点和其中一个从节点成功追加日志,就可以认为是成功提交了日志,客户端即可通过主节点访问该日志。因为部分数据存在延迟,所以在DLedger的实现中,读写请求都将由Leader节点负责。那么落后的从节点如何再次跟上集群的进度呢?
DLedger的实现思路是按照日志序号向从节点源源不断地转发日志,从节点接收日志后,将这些待追加的数据放入一个待写队列。从节点并不是从挂起队列中处理一个个追加请求的,而是先查找从节点当前已追加的最大日志序号,用ledgerEndIndex表示,然后尝试追加ledgerEndIndex+1的日志,根据日志序号从待写队列中查找日志,如果该队列不为空,并且待写日志不在待写队列中,说明从节点未接收到这条日志,发生了数据缺失。从节点在响应主节点的append请求时会告知数据不一致,然后主节点的日志转发线程状态变更为COMPARE,向该从节点发送COMPARE命令,用来比较主从节点的数据差异。根据比较出的差异重新从主节点同步数据或删除从节点上多余的数据,最终达到一致。同时,主节点也会对推送超时的消息发起重推,尽最大可能帮助从节点及时更新到主节点的数据。
3.2 日志复制类设计体系
DledgerEntryPusher 是 DLedger 日志转发与处理核心类,该类构建如下3个对象,每一个对象对应一个线程,复制处理不同的事情:
EntryDispatcher:日志转发线程,当前节点为主节点时追加。
QuorumAckChecker:日志追加ACK投票仲裁线程,当前节点为主节点时激活。
EntryHandler:日志接收处理线程,当节点为从节点时激活。
DLedger的日志复制使用推送模式,其核心入口为DLedgerEntryPusher,下面逐一介绍上述核心类及核心属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class DLedgerEntryPusher { private static final Logger LOGGER = LoggerFactory.getLogger(DLedgerEntryPusher.class); private final DLedgerConfig dLedgerConfig; private final DLedgerStore dLedgerStore; private final MemberState memberState; private final DLedgerRpcService dLedgerRpcService; private final Map<Long, ConcurrentMap<String, Long>> peerWaterMarksByTerm = new ConcurrentHashMap <>(); private final Map<Long, ConcurrentMap<Long, Closure>> pendingClosure = new ConcurrentHashMap <>(); private final EntryHandler entryHandler; private final QuorumAckChecker quorumAckChecker; private final Map<String, EntryDispatcher> dispatcherMap = new HashMap <>(); private final String selfId; private StateMachineCaller fsmCaller; }
通常了解一个类需要从其构造函数开始,我们先看一下DLedgerEntryPusher的构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public DLedgerEntryPusher (DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore, DLedgerRpcService dLedgerRpcService) { this .dLedgerConfig = dLedgerConfig; this .selfId = this .dLedgerConfig.getSelfId(); this .memberState = memberState; this .dLedgerStore = dLedgerStore; this .dLedgerRpcService = dLedgerRpcService; for (String peer : memberState.getPeerMap().keySet()) { if (!peer.equals(memberState.getSelfId())) { dispatcherMap.put(peer, new EntryDispatcher (peer, LOGGER)); } } this .entryHandler = new EntryHandler (LOGGER); this .quorumAckChecker = new QuorumAckChecker (LOGGER); }
这里主要是根据集群的配置,为每一个从节点创建一个 EntryDispatcher 转发线程,即每一个从节点的日志转发相互不干扰。
接下来我们看一下 startup
方法:
1 2 3 4 5 6 7 8 9 10 11 public void startup () { entryHandler.start(); quorumAckChecker.start(); for (EntryDispatcher dispatcher : dispatcherMap.values()) { dispatcher.start(); } }
在 EntryDispatcher 启动时会启动三类线程:
EntryDispatcher:日志请求转发器,负责向从节点转发日志,主节点为每一个从节点构建一个 EntryDispatcher 线程,每个从节点独立发送互不干扰;
QuorumAckChecker:日志追加ACK投票仲裁线程,用于判断日志是否可提交,当前节点为主节点时激活;
EntryHandler:从节点上开启的线程,用于接收主节点的 push 请求(append、commit);
需要注意的是由于节点身份的不同所生效的线程类型也并不相同,你如如果是Follower节点,那就只有 EntryHandler 现成生效,没有生效的线程会间隔1ms进行空转,这样做的目的是当节点身份发生变化时能及时反应。
3.3 日志转发(Leader向Follower发送日志) 3.3.1 EntryDispatcher核心属性 日志转发由 EntryDispatcher 实现,EntryDispatcher 有如下核心属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 private class EntryDispatcher extends ShutdownAbleThread { private final AtomicReference<EntryDispatcherState> type = new AtomicReference <>(EntryDispatcherState.COMPARE); private long lastPushCommitTimeMs = -1 ; private final String peerId; private long writeIndex = DLedgerEntryPusher.this .dLedgerStore.getLedgerEndIndex() + 1 ; private long matchIndex = -1 ; private final int maxPendingSize = 1000 ; private long term = -1 ; private String leaderId = null ; private long lastCheckLeakTimeMs = System.currentTimeMillis(); private final ConcurrentMap<Long, Pair<Long, Integer>> pendingMap = new ConcurrentHashMap <>(); private final PushEntryRequest batchAppendEntryRequest = new PushEntryRequest (); private long lastAppendEntryRequestSendTimeMs = -1 ; private final Quota quota = new Quota (dLedgerConfig.getPeerPushQuota()); ......... }
3.3.2 推送请求类型 在详细介绍日志转发流程之前,先介绍一下主节点向从节点发送推送请求的类型,在 PushEntryRequest.Type 中定义,可选值如下:
1 2 3 4 5 6 7 8 public enum Type { APPEND, COMMIT, COMPARE, TRUNCATE, INSTALL_SNAPSHOT }
APPEND:将日志条目追加到从节点。
COMMIT:通常Leader节点会将提交的索引附加到append请求, 如果append请求很少且分散,Leader节点将发送一个单独的请求来通 知从节点提交索引。
COMPARE:如果Leader节点发生变化,新的Leader节点需要与它的从节点日志条目进行比较,以便截断从节点多余的数据。
TRUNCATE:如果Leader节点通过索引完成日志对比后,发现从节点存在多余的数据(未提交的数据),则 Leader 节点将发送 TRUNCATE给它的从节点,删除多余的数据,实现主从节点数据一致性。
INSTALL_SNAPSHOT:将从节点数据存入快照。
3.3.3 Leader节点日志转发入口 EntryDispatcher 是一个线程类,继承自 ShutdownAbleThread,其 run() 方法会循环执行 doWork() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Override public void doWork () { try { if (checkNotLeaderAndFreshState()) { waitForRunning(1 ); return ; } switch (type.get()) { case COMPARE: doCompare(); break ; case TRUNCATE: doTruncate(); break ; case APPEND: doAppend(); break ; case INSTALL_SNAPSHOT: doInstallSnapshot(); break ; case COMMIT: doCommit(); break ; } waitForRunning(1 ); } catch (Throwable t) { DLedgerEntryPusher.LOGGER.error("[Push-{}]Error in {} writeIndex={} matchIndex={}" , peerId, getName(), writeIndex, matchIndex, t); changeState(EntryDispatcherState.COMPARE); DLedgerUtils.sleep(500 ); } }
该方法主要完成如下两件事。
检查当前节点的状态,确定当前节点状态是否可以发送 append、compare、truncate 请求。
根据当前转发器的状态向从节点发送 append、compare、truncate 请求。
checkAndFreshState()方法不只是简单地检测一下状态,而是会根据运行状态改变日志转发器的状态,从而驱动转发器是发送 append 请求还是发送compare请求,下面详细看一下该方法的实现细节:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private boolean checkNotLeaderAndFreshState () { if (!memberState.isLeader()) { return true ; } if (term != memberState.currTerm() || leaderId == null || !leaderId.equals(memberState.getLeaderId())) { synchronized (memberState) { if (!memberState.isLeader()) { return true ; } PreConditions.check(memberState.getSelfId().equals(memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN); logger.info("[Push-{}->{}]Update term: {} and leaderId: {} to new term: {}, new leaderId: {}" , selfId, peerId, term, leaderId, memberState.currTerm(), memberState.getLeaderId()); term = memberState.currTerm(); leaderId = memberState.getSelfId(); changeState(EntryDispatcherState.COMPARE); } } return false ; }
如果当前节点的状态不是Leader则直接返回;如果日志转发器(EntryDispatcher)的投票轮次为空或与状态机的投票轮次不相等,这种情况通常是由于集群触发了重新选举,当前节点刚被选举成 Leader节点,此时需要将日志转发器的term、leaderId与状态机同步,然后将同步模式改为Compare,目的是让新上任的Leader节点寻找自己与Follower节点的共识点在哪,说白了就是找到其他 Follower 节点多余未提交的的日志Index,为后续 truncate 请求做铺垫。
changeState改变日志转发器的状态,该方法非常重要,我们来看一下状态转换过程中需要处理的核心逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private synchronized void changeState (EntryDispatcherState target) { logger.info("[Push-{}]Change state from {} to {}, matchIndex: {}, writeIndex: {}" , peerId, type.get(), target, matchIndex, writeIndex); switch (target) { case APPEND: resetBatchAppendEntryRequest(); break ; case COMPARE: if (this .type.compareAndSet(EntryDispatcherState.APPEND, EntryDispatcherState.COMPARE)) { writeIndex = dLedgerStore.getLedgerEndIndex() + 1 ; pendingMap.clear(); } break ; default : break ; } type.set(target); }
3.3.4 Leader节点发送Compare请求(doCompare) 日志转发器EntryDispatcher的初始状态为 COMPARE,当一个节点被选举为Leader后,日志转发器的状态同样会先设置为COMPARE,Leader节点先向从节点发送该请求的目的是比较主、从节点之间数据的差异,以此确保发送主从切换时不会丢失数据,并且重新确定待转发的日志序号。
通过EntryDispatcher的doWork()方法可知,如果节点状态为COMPARE,会调用 doCompare() 方法。doCompare()方法内部代码都是while(true)包裹,在查看其代码时注意其退出条件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 private void doCompare () throws Exception { while (true ) { if (checkNotLeaderAndFreshState()) { break ; } if (this .type.get() != EntryDispatcherState.COMPARE) { break ; } if (dLedgerStore.getLedgerEndIndex() == -1 ) { break ; } PushEntryRequest request; long compareIndex = writeIndex - 1 ; long compareTerm = -1 ; if (compareIndex < dLedgerStore.getLedgerBeforeBeginIndex()) { changeState(EntryDispatcherState.INSTALL_SNAPSHOT); return ; } else if (compareIndex == dLedgerStore.getLedgerBeforeBeginIndex()) { compareTerm = dLedgerStore.getLedgerBeforeBeginTerm(); request = buildCompareOrTruncatePushRequest(compareTerm, compareIndex, PushEntryRequest.Type.COMPARE); } else { DLedgerEntry entry = dLedgerStore.get(compareIndex); PreConditions.check(entry != null , DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d" , compareIndex); compareTerm = entry.getTerm(); request = buildCompareOrTruncatePushRequest(compareTerm, entry.getIndex(), PushEntryRequest.Type.COMPARE); } CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request); PushEntryResponse response = responseFuture.get(3 , TimeUnit.SECONDS); PreConditions.check(response != null , DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d" , compareIndex); PreConditions.check(response.getCode() == DLedgerResponseCode.INCONSISTENT_STATE.getCode() || response.getCode() == DLedgerResponseCode.SUCCESS.getCode() , DLedgerResponseCode.valueOf(response.getCode()), "compareIndex=%d" , compareIndex); if (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) { matchIndex = compareIndex; updatePeerWaterMark(compareTerm, peerId, matchIndex); changeState(EntryDispatcherState.TRUNCATE); return ; } if (response.getXTerm() != -1 ) { writeIndex = response.getXIndex(); } else { writeIndex = response.getEndIndex() + 1 ; } } }
在该方法中首先是对基本状态做了一些校验:
如果当前节点的状态不是 Leader 则退出循环;
判断请求类型是否为 Compare,如果不是则退出循环;
ledgerEndIndex== -1 表示Leader中没有存储数据,是一个新的集群,所以无需比较主从是否一致;
然后构建当前正在比对的compareIndex所对应的日志的所处轮次信息,将compareIndex对应的存储轮次发送给Follower节点后,Follower节点会对比自己与Leader在相同的Index上的存储轮次信息是否相同:
如果相同则证明此条日志与Leader节点保持一致,返回SUCCESS,此时则证明找到了共识点,将状态改成truncate模式以删除从节点多余日志;
如果不同会有两种情况:
主节点发送的index在从节点上还不存在,这样从节点会将自己的末尾指针返回给Leader,Leader会从Follower节点的末尾指针重新开始对比;
主节点发送的index在从节点上存在,但是所处的轮次并不一致,证明从节点这条日志是需要被删除,Follower节点会找到Leader对比轮次所在的最后一个日志索引并返回给Leader,Leader会从这个索引位置继续开始对比,直到找对最终的共识点。
3.3.5 Leader节点发送truncate请求(doTruncate) Leader节点在发送compare请求后,得知与从节点的数据存在差异,将向从节点发送truncate请求,指示从节点应该将truncateIndex 及以后的日志删除:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private void doTruncate () throws Exception { PreConditions.check(type.get() == EntryDispatcherState.TRUNCATE, DLedgerResponseCode.UNKNOWN); long truncateIndex = matchIndex + 1 ; logger.info("[Push-{}]Will push data to truncate truncateIndex={}" , peerId, truncateIndex); PushEntryRequest truncateRequest = buildCompareOrTruncatePushRequest(-1 , truncateIndex, PushEntryRequest.Type.TRUNCATE); PushEntryResponse truncateResponse = dLedgerRpcService.push(truncateRequest).get(3 , TimeUnit.SECONDS); PreConditions.check(truncateResponse != null , DLedgerResponseCode.UNKNOWN, "truncateIndex=%d" , truncateIndex); PreConditions.check(truncateResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(truncateResponse.getCode()), "truncateIndex=%d" , truncateIndex); lastPushCommitTimeMs = System.currentTimeMillis(); changeState(EntryDispatcherState.APPEND); }
该方法的实现比较简单,主节点构建truncate请求包并通过网络向从节点发送请求,从节点在收到请求后会清理多余的数据,使主从节点数据保持一致。日志转发器在处理完truncate请求后,状态将变更为APPEND,开始向从节点转发日志。
3.3.6 Leader节点向Follower节点推送日志(doAppend) Leader节点在确认主从数据一致后,开始将新的消息转发到从节点。doAppend()方法内部的逻辑被包裹在while(true)中,故在查看其代码时应注意退出条件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 private void doAppend () throws Exception { while (true ) { if (checkNotLeaderAndFreshState()) { break ; } if (type.get() != EntryDispatcherState.APPEND) { break ; } doCheckAppendResponse(); if (writeIndex > dLedgerStore.getLedgerEndIndex()) { if (this .batchAppendEntryRequest.getCount() > 0 ) { sendBatchAppendEntryRequest(); } else { doCommit(); } break ; } if (writeIndex <= dLedgerStore.getLedgerBeforeBeginIndex()) { logger.info("[Push-{}]The ledgerBeginBeginIndex={} is less than or equal to writeIndex={}" , peerId, dLedgerStore.getLedgerBeforeBeginIndex(), writeIndex); changeState(EntryDispatcherState.INSTALL_SNAPSHOT); break ; } if (pendingMap.size() >= maxPendingSize || DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000 ) { long peerWaterMark = getPeerWaterMark(term, peerId); for (Map.Entry<Long, Pair<Long, Integer>> entry : pendingMap.entrySet()) { if (entry.getKey() + entry.getValue().getValue() - 1 <= peerWaterMark) { pendingMap.remove(entry.getKey()); } } lastCheckLeakTimeMs = System.currentTimeMillis(); } if (pendingMap.size() >= maxPendingSize) { doCheckAppendResponse(); break ; } long lastIndexToBeSend = doAppendInner(writeIndex); if (lastIndexToBeSend == -1 ) { break ; } writeIndex = lastIndexToBeSend + 1 ; } }
第一步:再次判断节点状态,确保当前节点是Leader节点并且日志转发器内部的状态为APPEND。
第二步:检查从节点未接收的第一个append请求是否超时,如果超时,则重推。
第三步:writeIndex表示当前已追加到从节点的日志序号。通常情况下,主节点向从节点发送append请求时会带上主节点已提交的指针,但如果append请求发送不频繁,pending请求超过其队列长度(默认为1万字节)时,会阻止数据的追加,此时有可能会出现writeIndex 大于leaderEndIndex的情况,需要单独发送commit请求,并检查 append 请求响应。
第四步:检测pendingMap(挂起的请求数量)是否发生泄露,正常来说发送给从节点的请求如果成功响应就会从pendingMap中移除,这里是一种兜底操作。获取当前节点的水位线(已成功append请求的日志序号),如果挂起请求的日志序号小于水位线,则丢弃,并记录最后一次检查的时间戳。
第五步:如果挂起的请求数仍然大于阈值,则再次检查这些请求是否超时(默认超时时间为1s),如果超时则会触发重发机制
第六步:循环同步数据至从节点,方法内部会优化,会按照配置收集一批需要发送的日志,等到到达发送阈值则一起发送,而不是一条条发送。
第七步:移动写指针。
3.3.6.1 日志推送 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private long doAppendInner (long index) throws Exception { DLedgerEntry entry = getDLedgerEntryForAppend(index); if (null == entry) { logger.error("[Push-{}]Get null entry from index={}" , peerId, index); changeState(EntryDispatcherState.INSTALL_SNAPSHOT); return -1 ; } checkQuotaAndWait(entry); batchAppendEntryRequest.addEntry(entry); if (!dLedgerConfig.isEnableBatchAppend() || batchAppendEntryRequest.getTotalSize() >= dLedgerConfig.getMaxBatchAppendSize() || DLedgerUtils.elapsed(this .lastAppendEntryRequestSendTimeMs) >= dLedgerConfig.getMaxBatchAppendIntervalMs()) { sendBatchAppendEntryRequest(); } return entry.getIndex(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 private void sendBatchAppendEntryRequest () throws Exception { batchAppendEntryRequest.setCommitIndex(memberState.getCommittedIndex()); final long firstIndex = batchAppendEntryRequest.getFirstEntryIndex(); final long lastIndex = batchAppendEntryRequest.getLastEntryIndex(); final long lastTerm = batchAppendEntryRequest.getLastEntryTerm(); final long entriesCount = batchAppendEntryRequest.getCount(); final long entriesSize = batchAppendEntryRequest.getTotalSize(); StopWatch watch = StopWatch.createStarted(); CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(batchAppendEntryRequest); pendingMap.put(firstIndex, new Pair <>(System.currentTimeMillis(), batchAppendEntryRequest.getCount())); responseFuture.whenComplete((x, ex) -> { try { PreConditions.check(ex == null , DLedgerResponseCode.UNKNOWN); DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode()); switch (responseCode) { case SUCCESS: Attributes attributes = DLedgerMetricsManager.newAttributesBuilder().put(LABEL_REMOTE_ID, this .peerId).build(); DLedgerMetricsManager.replicateEntryLatency.record(watch.getTime(TimeUnit.MICROSECONDS), attributes); DLedgerMetricsManager.replicateEntryBatchCount.record(entriesCount, attributes); DLedgerMetricsManager.replicateEntryBatchBytes.record(entriesSize, attributes); pendingMap.remove(firstIndex); if (lastIndex > matchIndex) { matchIndex = lastIndex; updatePeerWaterMark(lastTerm, peerId, matchIndex); } break ; case INCONSISTENT_STATE: logger.info("[Push-{}]Get INCONSISTENT_STATE when append entries from {} to {} when term is {}" , peerId, firstIndex, lastIndex, term); changeState(EntryDispatcherState.COMPARE); break ; default : logger.warn("[Push-{}]Get error response code {} {}" , peerId, responseCode, x.baseInfo()); break ; } } catch (Throwable t) { logger.error("Failed to deal with the callback when append request return" , t); } }); lastPushCommitTimeMs = System.currentTimeMillis(); batchAppendEntryRequest.clear(); }
3.3.6.2 日志推送的流控机制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private void checkQuotaAndWait (DLedgerEntry entry) { if (dLedgerStore.getLedgerEndIndex() - entry.getIndex() <= maxPendingSize) { return ; } quota.sample(entry.getSize()); if (quota.validateNow()) { long leftNow = quota.leftNow(); logger.warn("[Push-{}]Quota exhaust, will sleep {}ms" , peerId, leftNow); DLedgerUtils.sleep(leftNow); } }
在前面介绍日志推送流程中,会在通过网络发送之前,调用 checkQuotaAndWait
进行一定的流控操作,流控是使用的滑动窗口实现的,在该方法中首先会记录当前时间窗口的日志条目数量。如果当前时间窗口是新的(即timeVec中的记录不是当前秒),则重置该窗口的计数;如果是同一时间窗口,则累加日志条目数量。然后计算当前时间窗口剩余的时间(毫秒),如果已达到最大限制,则可能需要等待直到下一个时间窗口开始。
其中 Quota
中核心属性如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class Quota { private final int max; private final int [] samples; private final long [] timeVec; private final int window; }
3.3.6.3 日志推送的重试机制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private void doCheckAppendResponse () throws Exception { long peerWaterMark = getPeerWaterMark(term, peerId); Pair<Long, Integer> pair = pendingMap.get(peerWaterMark + 1 ); if (pair == null ) return ; long sendTimeMs = pair.getKey(); if (DLedgerUtils.elapsed(sendTimeMs) > dLedgerConfig.getMaxPushTimeOutMs()) { batchAppendEntryRequest.clear(); writeIndex = peerWaterMark + 1 ; logger.warn("[Push-{}]Reset write index to {} for resending the entries which are timeout" , peerId, peerWaterMark + 1 ); } }
如果因网络等原因,主节点在向从节点追加日志时失败,该如何保证从节点与主节点一致呢?从上文我们可以得知,Leader节点在向 从节点转发日志后,会存储该日志的推送时间戳到pendingMap,当pendingMap的积压超过1000ms时会触发重推机制,将writeIndex指针重置为超时的Index上。
3.3.7 日志转发整体流程
3.4 日志复制(Follower节点接收日志并存储) Leader节点实时向从节点转发消息,从节点接收到日志后进行存储,然后向Leader节点反馈复制进度,从节点的日志接收主要由EntryHandler实现。
3.4.1 EntryHandler核心属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private class EntryHandler extends ShutdownAbleThread { private long lastCheckFastForwardTimeMs = System.currentTimeMillis(); ConcurrentMap<Long, Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> writeRequestMap = new ConcurrentHashMap <>(); BlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> compareOrTruncateRequests = new ArrayBlockingQueue <>(1024 ); }
3.4.2 Follower日志复制入口 从节点收到Leader节点的推送请求后(无论是APPEND、COMMIT、COMPARE、TRUNCATE),由EntryHandler的handlePush()方法执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public CompletableFuture<PushEntryResponse> handlePush (PushEntryRequest request) throws Exception { CompletableFuture<PushEntryResponse> future = new TimeoutFuture <>(1000 ); switch (request.getType()) { case APPEND: PreConditions.check(request.getCount() > 0 , DLedgerResponseCode.UNEXPECTED_ARGUMENT); long index = request.getFirstEntryIndex(); Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> old = writeRequestMap.putIfAbsent(index, new Pair <>(request, future)); if (old != null ) { logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}" , index, old.getKey().baseInfo(), request.baseInfo()); future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode())); } break ; case COMMIT: synchronized (this ) { if (!compareOrTruncateRequests.offer(new Pair <>(request, future))) { logger.warn("compareOrTruncateRequests blockingQueue is full when put commit request" ); future.complete(buildResponse(request, DLedgerResponseCode.PUSH_REQUEST_IS_FULL.getCode())); } } break ; case COMPARE: case TRUNCATE: writeRequestMap.clear(); synchronized (this ) { if (!compareOrTruncateRequests.offer(new Pair <>(request, future))) { logger.warn("compareOrTruncateRequests blockingQueue is full when put compare or truncate request" ); future.complete(buildResponse(request, DLedgerResponseCode.PUSH_REQUEST_IS_FULL.getCode())); } } break ; default : logger.error("[BUG]Unknown type {} from {}" , request.getType(), request.baseInfo()); future.complete(buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode())); break ; } wakeup(); return future; }
handlePush()方法的主要职责是将处理请求放入队列,由doWork()方法从处理队列中拉取任务进行处理。
如果是append请求,将请求放入writeRequestMap集合,如果已存在该条日志的推送请求,表示Leader重复推送,则返回状态码REPEATED_PUSH。
如果是commit请求,将请求存入compareOrTruncateRequests 请求处理队列。
如果是compare或truncate请求,将待追加队列writeRequestMap清空,并将请求放入compareOrTruncateRequests请求队列,由doWork()方法进行异步处理。
3.4.3 EntryHandler任务分发机制 EntryHandler的handlePush()方法主要是接收请求并将其放入队列的处理队列,而doWork()方法是从指定队列中获取待执行任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 @Override public void doWork () { try { if (!memberState.isFollower()) { clearCompareOrTruncateRequestsIfNeed(); waitForRunning(1 ); return ; } Pair<InstallSnapshotRequest, CompletableFuture<InstallSnapshotResponse>> installSnapshotPair = null ; this .inflightInstallSnapshotRequestLock.lock(); try { if (inflightInstallSnapshotRequest != null && inflightInstallSnapshotRequest.getKey() != null && inflightInstallSnapshotRequest.getValue() != null ) { installSnapshotPair = inflightInstallSnapshotRequest; inflightInstallSnapshotRequest = new Pair <>(null , null ); } } finally { this .inflightInstallSnapshotRequestLock.unlock(); } if (installSnapshotPair != null ) { handleDoInstallSnapshot(installSnapshotPair.getKey(), installSnapshotPair.getValue()); } if (compareOrTruncateRequests.peek() != null ) { Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = compareOrTruncateRequests.poll(); PreConditions.check(pair != null , DLedgerResponseCode.UNKNOWN); switch (pair.getKey().getType()) { case TRUNCATE: handleDoTruncate(pair.getKey().getPreLogIndex(), pair.getKey(), pair.getValue()); break ; case COMPARE: handleDoCompare(pair.getKey(), pair.getValue()); break ; case COMMIT: handleDoCommit(pair.getKey().getCommitIndex(), pair.getKey(), pair.getValue()); break ; default : break ; } return ; } long nextIndex = dLedgerStore.getLedgerEndIndex() + 1 ; Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.remove(nextIndex); if (pair == null ) { checkAbnormalFuture(dLedgerStore.getLedgerEndIndex()); waitForRunning(1 ); return ; } PushEntryRequest request = pair.getKey(); handleDoAppend(nextIndex, request, pair.getValue()); } catch (Throwable t) { DLedgerEntryPusher.LOGGER.error("Error in {}" , getName(), t); DLedgerUtils.sleep(100 ); } } }
第一步:如果当前节点的状态不是从节点,则跳出
第二步:如果compareOrTruncateRequests队列不为空,优先处理COMMIT、COMPARE、TRUNCATE等请求。值得注意的是,这里使用的是peek、poll等非阻塞方法,所以队列为空不会阻塞线程使得append请求能够正常处理。
第三步:处理日志追加append请求,根据当前节点 已存储的最大日志序号计算下一条待写日志的日志序号,从待写队列 中获取日志的处理请求。如果能查找到对应日志的追加请求,则执行doAppend()方法追加日志;如果从待写队列中没有找到对应的追加请 求,则调用checkAbnormalFuture检查追加请求是否丢失。
3.4.4 compare请求响应 从上文得知,Leader节点首先会向从节点发送compare请求,以此比较两者的数据是否存在差异,这一步由EntryHandler的handleDoCompare()方法实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 private CompletableFuture<PushEntryResponse> handleDoCompare (PushEntryRequest request, CompletableFuture<PushEntryResponse> future) { try { PreConditions.check(request.getType() == PushEntryRequest.Type.COMPARE, DLedgerResponseCode.UNKNOWN); long preLogIndex = request.getPreLogIndex(); long preLogTerm = request.getPreLogTerm(); if (preLogTerm == -1 && preLogIndex == -1 ) { future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode())); return future; } if (dLedgerStore.getLedgerEndIndex() >= preLogIndex) { long compareTerm = 0 ; if (dLedgerStore.getLedgerBeforeBeginIndex() == preLogIndex) { compareTerm = dLedgerStore.getLedgerBeforeBeginTerm(); } else { DLedgerEntry local = dLedgerStore.get(preLogIndex); compareTerm = local.getTerm(); } if (compareTerm == preLogTerm) { future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode())); return future; } DLedgerEntry firstEntryWithTargetTerm = dLedgerStore.getFirstLogOfTargetTerm(compareTerm, preLogIndex); PreConditions.check(firstEntryWithTargetTerm != null , DLedgerResponseCode.INCONSISTENT_STATE); PushEntryResponse response = buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()); response.setXTerm(compareTerm); response.setXIndex(firstEntryWithTargetTerm.getIndex()); future.complete(response); return future; } PushEntryResponse response = buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()); response.setEndIndex(dLedgerStore.getLedgerEndIndex()); future.complete(response); } catch (Throwable t) { logger.error("[HandleDoCompare] preLogIndex={}, preLogTerm={}" , request.getPreLogIndex(), request.getPreLogTerm(), t); future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode())); } return future; }
该方法最终目的是为了找到Leader节点和当前Follower节点的共识点,在该方法中会对比Leader端发来的Index对应选举任期和当前Follower节点这个Index对应的选举任期是否相同,会有如下情况:
Leader想要对比的Index在Follower节点不存在:则Follower节点返回当前节点 ledgerEndIndex 给Leader,意思是让Leader节点从我自己的最末尾Index进行对比。
Leader想要对比Index在Follower节点中存在:
Index对应的任期相同:意味着找到了共识点,返回SUCCESS。这样Leader节点就会从这个共识点删除从节点多余的日志,然后重新追加日志。
Index对应的任期不同:从preLogIndex开始,向前追溯从节点Index所处任期的第一个日志。这样Leader节点就会从这个点重新开始对比,这也可以看到日志对比并不是一个日志一个日志依次对比,这样做效率会很低,当遇到任期不一致的情况时,Follower节点就会跳过当前任期,对比前一个任期日志是否一致。
3.4.5 truncate请求响应 Leader节点与从节点进行数据对比后,如果发现数据有差异,将计算出需要截断的日志序号,发送truncate请求给从节点,从节点对多余的日志进行截断:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private CompletableFuture<PushEntryResponse> handleDoTruncate (long truncateIndex, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) { try { logger.info("[HandleDoTruncate] truncateIndex={}" , truncateIndex); PreConditions.check(request.getType() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN); long index = dLedgerStore.truncate(truncateIndex); PreConditions.check(index == truncateIndex - 1 , DLedgerResponseCode.INCONSISTENT_STATE); future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode())); long committedIndex = request.getCommitIndex() <= dLedgerStore.getLedgerEndIndex() ? request.getCommitIndex() : dLedgerStore.getLedgerEndIndex(); if (DLedgerEntryPusher.this .memberState.followerUpdateCommittedIndex(committedIndex)) { DLedgerEntryPusher.this .fsmCaller.onCommitted(committedIndex); } } catch (Throwable t) { logger.error("[HandleDoTruncate] truncateIndex={}" , truncateIndex, t); future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode())); } return future; }
Follower节点在收到Leader节点发来的truncate请求后,会将truncateIndex及以后得所有日志全部删除,并更新本地已提交日志的索引指针。而日志的删除操作是由 DLedgerMmapFileStore#truncate
实现的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Override public long truncate (long truncateIndex) { if (truncateIndex > this .ledgerEndIndex) { return this .ledgerEndIndex; } DLedgerEntry firstTruncateEntry = this .get(truncateIndex); long truncateStartPos = firstTruncateEntry.getPos(); synchronized (this .memberState) { if (truncateIndex > this .ledgerEndIndex) { return this .ledgerEndIndex; } dataFileList.truncateOffset(truncateStartPos); if (dataFileList.getMaxWrotePosition() != truncateStartPos) { LOGGER.warn("[TRUNCATE] truncate for data file error, try to truncate pos: {}, but after truncate, max wrote pos: {}, now try to rebuild" , truncateStartPos, dataFileList.getMaxWrotePosition()); PreConditions.check(dataFileList.rebuildWithPos(truncateStartPos), DLedgerResponseCode.DISK_ERROR, "rebuild data file truncatePos=%d" , truncateStartPos); } reviseDataFileListFlushedWhere(truncateStartPos); long truncateIndexFilePos = truncateIndex * INDEX_UNIT_SIZE; indexFileList.truncateOffset(truncateIndexFilePos); if (indexFileList.getMaxWrotePosition() != truncateIndexFilePos) { LOGGER.warn("[TRUNCATE] truncate for index file error, try to truncate pos: {}, but after truncate, max wrote pos: {}, now try to rebuild" , truncateIndexFilePos, indexFileList.getMaxWrotePosition()); PreConditions.check(dataFileList.rebuildWithPos(truncateStartPos), DLedgerResponseCode.DISK_ERROR, "rebuild index file truncatePos=%d" , truncateIndexFilePos); } reviseIndexFileListFlushedWhere(truncateIndexFilePos); if (truncateIndex == 0 ) { ledgerEndTerm = -1 ; ledgerEndIndex = -1 ; } else { SelectMmapBufferResult endIndexBuf = indexFileList.getData((truncateIndex - 1 ) * INDEX_UNIT_SIZE, INDEX_UNIT_SIZE); ByteBuffer buffer = endIndexBuf.getByteBuffer(); DLedgerIndexEntry indexEntry = DLedgerEntryCoder.decodeIndex(buffer); ledgerEndTerm = indexEntry.getTerm(); ledgerEndIndex = indexEntry.getIndex(); } } LOGGER.info("[TRUNCATE] truncateIndex: {}, after truncate, ledgerEndIndex: {} ledgerEndTerm: {}" , truncateIndex, ledgerEndIndex, ledgerEndTerm); return ledgerEndIndex; }
3.4.6 append请求响应 Leader节点与从节点进行差异对比,截断从节点多余的数据文件后,会实时转发日志到从节点,具体由EntryHandler的handleDoAppend()方法实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private void handleDoAppend (long writeIndex, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) { try { PreConditions.check(writeIndex == request.getFirstEntryIndex(), DLedgerResponseCode.INCONSISTENT_STATE); for (DLedgerEntry entry : request.getEntries()) { dLedgerStore.appendAsFollower(entry, request.getTerm(), request.getLeaderId()); } future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode())); long committedIndex = Math.min(dLedgerStore.getLedgerEndIndex(), request.getCommitIndex()); if (DLedgerEntryPusher.this .memberState.followerUpdateCommittedIndex(committedIndex)) { DLedgerEntryPusher.this .fsmCaller.onCommitted(committedIndex); } } catch (Throwable t) { logger.error("[HandleDoAppend] writeIndex={}" , writeIndex, t); future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode())); } }
将从Leader节点的日志追加到从节点,具体调用DLedgerStore的appendAsFollower()方法实现,其实现细节与服务端追加日志的流程基本类似,只是少了日志转发这个流程。然后使用Leader节点的已提交指针更新从节点的已提交指针,即append请求会附带有commit请求的效果。
3.4.7 从节点日志复制异常检测机制 收到Leader节点的append请求后,从节点首先会将这些写入请求 存储在writeRequestMap处理队列中,从节点并不是直接从该队列中获取一个待写入处理请求进行数据追加,而是查找当前节点已存储的最大日志序号leaderEndIndex,然后加1得出下一条待追加的日志序号nextIndex。如果该日志序号在writeRequestMap中不存在日志推送请求,则有可能是因为发生了推送请求丢失,在这种情况下,需要进行异常检测,以便尽快恢复异常,使主节点与从节点最终保持一致性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 private void checkAbnormalFuture (long endIndex) { if (DLedgerUtils.elapsed(lastCheckFastForwardTimeMs) < 1000 ) { return ; } lastCheckFastForwardTimeMs = System.currentTimeMillis(); if (writeRequestMap.isEmpty()) { return ; } checkAppendFuture(endIndex); } private void checkAppendFuture (long endIndex) { long minFastForwardIndex = Long.MAX_VALUE; for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair : writeRequestMap.values()) { long firstEntryIndex = pair.getKey().getFirstEntryIndex(); long lastEntryIndex = pair.getKey().getLastEntryIndex(); if (lastEntryIndex <= endIndex) { try { for (DLedgerEntry dLedgerEntry : pair.getKey().getEntries()) { PreConditions.check(dLedgerEntry.equals(dLedgerStore.get(dLedgerEntry.getIndex())), DLedgerResponseCode.INCONSISTENT_STATE); } pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode())); logger.warn("[PushFallBehind]The leader pushed an batch append entry last index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed" , lastEntryIndex, endIndex); } catch (Throwable t) { logger.error("[PushFallBehind]The leader pushed an batch append entry last index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed" , lastEntryIndex, endIndex, t); pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode())); } writeRequestMap.remove(pair.getKey().getFirstEntryIndex()); continue ; } if (firstEntryIndex == endIndex + 1 ) { return ; } TimeoutFuture<PushEntryResponse> future = (TimeoutFuture<PushEntryResponse>) pair.getValue(); if (!future.isTimeOut()) { continue ; } if (firstEntryIndex < minFastForwardIndex) { minFastForwardIndex = firstEntryIndex; } } if (minFastForwardIndex == Long.MAX_VALUE) { return ; } Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.remove(minFastForwardIndex); if (pair == null ) { return ; } logger.warn("[PushFastForward] ledgerEndIndex={} entryIndex={}" , endIndex, minFastForwardIndex); pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode())); }
3.4.8 日志复制整体流程
3.5 日志复制仲裁 Raft协议判断一条日志写入成功的标准是集群中超过半数的节点存储了该日志,Leader节点首先存储数据,然后异步向它所有的从节点推送日志。不需要所有的从节点都返回日志追加成功才认为是成功写入,故Leader节点需要对返回结果进行仲裁,这部分功能主要由 QuorumAckChecker 实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 @Override public void doWork () { try { if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) > 3000 ) { logger.info("[{}][{}] term={} ledgerBeforeBegin={} ledgerEnd={} committed={} watermarks={} appliedIndex={}" , memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeforeBeginIndex(), dLedgerStore.getLedgerEndIndex(), memberState.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm), memberState.getAppliedIndex()); lastPrintWatermarkTimeMs = System.currentTimeMillis(); } long currTerm = memberState.currTerm(); checkTermForPendingMap(currTerm, "QuorumAckChecker" ); checkTermForWaterMark(currTerm, "QuorumAckChecker" ); if (pendingClosure.size() > 1 ) { for (Long term : pendingClosure.keySet()) { if (term == currTerm) { continue ; } for (Map.Entry<Long, Closure> futureEntry : pendingClosure.get(term).entrySet()) { logger.info("[TermChange] Will clear the pending closure index={} for term changed from {} to {}" , futureEntry.getKey(), term, currTerm); futureEntry.getValue().done(Status.error(DLedgerResponseCode.EXPIRED_TERM)); } pendingClosure.remove(term); } } if (peerWaterMarksByTerm.size() > 1 ) { for (Long term : peerWaterMarksByTerm.keySet()) { if (term == currTerm) { continue ; } logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}" , term, currTerm); peerWaterMarksByTerm.remove(term); } } if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000 ) { checkResponseFuturesElapsed(DLedgerEntryPusher.this .memberState.getAppliedIndex()); lastCheckLeakTimeMs = System.currentTimeMillis(); } if (DLedgerUtils.elapsed(lastCheckTimeoutTimeMs) > 1000 ) { checkResponseFuturesTimeout(DLedgerEntryPusher.this .memberState.getAppliedIndex() + 1 ); lastCheckTimeoutTimeMs = System.currentTimeMillis(); } if (!memberState.isLeader()) { waitForRunning(1 ); return ; } updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex()); Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm); List<Long> sortedWaterMarks = peerWaterMarks.values() .stream() .sorted(Comparator.reverseOrder()) .collect(Collectors.toList()); long quorumIndex = sortedWaterMarks.get(sortedWaterMarks.size() / 2 ); if (DLedgerEntryPusher.this .memberState.leaderUpdateCommittedIndex(currTerm, quorumIndex)) { DLedgerEntryPusher.this .fsmCaller.onCommitted(quorumIndex); } else { waitForRunning(1 ); } } catch (Throwable t) { DLedgerEntryPusher.LOGGER.error("Error in {}" , getName(), t); DLedgerUtils.sleep(100 ); } }
四. 总结 本文详细阐述了 RocketMQ DLedger 中的日志复制流程。
日志复制的设计理念包括:为每条日志编号以便管理与辨别;引入已提交指针来避免数据不一致问题,只有超过半数节点追加完成才向客户端返回写入成功;通过向从节点源源不断转发日志,并处理从节点落后时的数据同步与删除,保证日志一致性。
日志复制类的设计体系中,DledgerEntryPusher
是核心类,构建了 EntryDispatcher
、QuorumAckChecker
和 EntryHandler
三个对象,分别对应不同线程,处理不同任务。
日志复制流程如下:
Leader 节点将日志推送到从节点:
日志转发由 EntryDispatcher
实现,其具有多种推送请求类型,初始状态为 COMPARE
。
doCompare
方法用于 Leader 向从节点发送 COMPARE
请求以比较数据差异,找到共识点后状态变更为 TRUNCATE
。
doTruncate
方法处理发送 TRUNCATE
请求,指示从节点删除多余日志,处理完成后状态变更为 APPEND
。
doAppend
方法在确认主从数据一致后向从节点推送日志,内部有流控和重试机制。
从节点收到 Leader 节点推送的日志并存储,然后向 Leader 节点汇报日志复制结果:
从节点的日志接收由 EntryHandler
实现,其通过 handlePush
方法接收请求并放入相应队列,doWork
方法进行任务分发处理。
handleDoCompare
方法响应 Leader 的 COMPARE
请求,寻找共识点。
handleDoTruncate
方法响应 TRUNCATE
请求,删除多余日志。
handleDoAppend
方法响应 APPEND
请求,追加日志。
还具有异常检测机制,检查从节点日志复制是否异常。
Leader 节点对日志复制进行仲裁:主要由 QuorumAckChecker
实现,计算所有节点水位线的中位数,根据结果更新已提交索引。