本文所涉及的注释源码:bigcoder84/dledger

Raft 协议主要包含两个部分:Leader选举和日志复制。

前面我们在 Raft协议深度解析:RocketMQ中的自动Leader选举与故障转移 一文中已经详细介绍了DLedger如何实现Leader选举的,而本文主要聚焦于Leader选举完成后的日志复制的过程。

一. RocketMQ DLedger 存储实现

说起日志的复制,就必须要从日志存储实现说起,它约束着Raft每一个结点如何存储数据。下面先介绍一次Raft存储的核心实现类:

1.1 存储实现核心类

  • DLedgerStore:存储抽象类,该类有如下核心抽象方法:

    • getMemberState: 获取节点状态机
    • appendAsLeader:向主节点追加日志(数据)
    • appendAsFollower:向从节点广播日志(数据)
    • get:根据日志下标查找日志
    • getLedgerEndTerm:获取Leader节点当前最大的投票轮次
    • getLedgerEndIndex:获取Leader节点下一条日志写入的日志序号
    • truncate:删除日志
    • getFirstLogOfTargetTerm:从endIndex开始,向前追溯targetTerm任期的第一个日志
    • updateLedgerEndIndexAndTerm:更新 Leader 节点维护的ledgerEndIndex和ledgerEndTerm
    • startup:启动存储管理器
    • shutdown:关闭存储管理器
  • DLedgerMemoryStore:DLedger基于内存实现的日志存储实现类。

  • DLedgerMmapFileStore:基于文件内存映射机制的存储实现,核心属性如下:

    • ledgerBeforeBeginIndex:日志的起始序号
    • ledgerBeforeBeginTerm:日志起始的投票轮次
    • ledgerEndIndex:下一条日志下标(序号)
    • ledgerEndTerm:当前最大的投票轮次
  • DLedgerConfig:DLedger的配置信息

RocketMQ DLedger的上述核心类与RocketMQ存储模块的对应关系

RocketMQ存储模块 DLedger存储模块 描述
MappedFile DefaultMmapFile 表示一个物理文件
MappedFileQueue MmapFileList 表示逻辑上连续多个物理文件
DefaultMessageStore DLedgerMmapFileStore 存储实现类
CommitLog#FlushCommitLogService DLedgerMmapFileStore#FlushDataService 实现文件刷盘机制
DefaultMessageStore#CleanCommitLogService DLedgerMmapFileStore#CleanSpaceService 清理过期文件

1.2 数据存储协议

RocketMQ DLedger数据存储协议如下图:

  1. magic:魔数,4字节。

  2. size:条目总长度,包含header(协议头)+body(消息体),占4字节。

  3. index:当前条目的日志序号,占8字节。

  4. term:条目所属的投票轮次,占8字节。

  5. pos:条目的物理偏移量,类似CommitLog文件的物理偏移量,占8字节。

  6. channel:保留字段,当前版本未使用,占4字节。

  7. chain crc:当前版本未使用,占4字节。

  8. body crc:消息体的CRC校验和,用来区分数据是否损坏,占4字节。

  9. body size:用来存储消息体的长度,占4个字节。

  10. body:消息体的内容。

RocketMQ DLedger 中日志实例用 DLedgerEntry 表示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class DLedgerEntry {

public final static int POS_OFFSET = 4 + 4 + 8 + 8;
public final static int HEADER_SIZE = POS_OFFSET + 8 + 4 + 4 + 4;
public final static int BODY_OFFSET = HEADER_SIZE + 4;

private int magic = DLedgerEntryType.NORMAL.getMagic();
private int size;
private long index;
private long term;
private long pos; //used to validate data
private int channel; //reserved
private int chainCrc; //like the block chain, this crc indicates any modification before this entry.
private int bodyCrc; //the crc of the body
private byte[] body;
}

解码流程参考:io.openmessaging.storage.dledger.entry.DLedgerEntryCoder#decode(java.nio.ByteBuffer, boolean):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static DLedgerEntry decode(ByteBuffer byteBuffer, boolean readBody) {
DLedgerEntry entry = new DLedgerEntry();
entry.setMagic(byteBuffer.getInt());
entry.setSize(byteBuffer.getInt());
entry.setIndex(byteBuffer.getLong());
entry.setTerm(byteBuffer.getLong());
entry.setPos(byteBuffer.getLong());
entry.setChannel(byteBuffer.getInt());
entry.setChainCrc(byteBuffer.getInt());
entry.setBodyCrc(byteBuffer.getInt());
int bodySize = byteBuffer.getInt();
if (readBody && bodySize < entry.getSize()) {
byte[] body = new byte[bodySize];
byteBuffer.get(body);
entry.setBody(body);
}
return entry;
}

1.3 索引存储协议

RocketMQ DLedger索引的存储协议如下图:

存储协议中各个字段的含义如下。

  1. magic:魔数。
  2. pos:条目的物理偏移量,类似CommitLog文件的物理偏移量,占8字节。
  3. size:条目长度。
  4. index:当前条目的日志序号,占8字节。
  5. term:条目所属的投票轮次,占8字节。

索引条目采用定长的方式进行的存储,目的是为了加速日志条目的查找的速度

我们假设一种场景需要查询 index 下标对应的日志数据,由于日志条目时变长的,如果没有索引文件,我们需要在索引文件上一个一个条目的去遍历查找,这样的效率很低。

有了索引文件后,我们可以通过 index * 32 找到Index所对应的索引存储的物理偏移量,这样我们可以轻松获取日志索引中存储的索引所处理的物理偏移量pos,然后通过日志的物理偏移量就可以直接获取到日志记录了。

RocketMQ DLedger 中索引实例用 DLedgerIndexEntry 表示:

1
2
3
4
5
6
7
8
9
10
11
12
public class DLedgerIndexEntry {

private int magic;

private long position;

private int size;

private long index;

private long term;
}

解码流程参考:io.openmessaging.storage.dledger.entry.DLedgerEntryCoder#decodeIndex:

1
2
3
4
5
6
7
8
9
public static DLedgerIndexEntry decodeIndex(ByteBuffer byteBuffer) {
DLedgerIndexEntry indexEntry = new DLedgerIndexEntry();
indexEntry.setMagic(byteBuffer.getInt());
indexEntry.setPosition(byteBuffer.getLong());
indexEntry.setSize(byteBuffer.getInt());
indexEntry.setIndex(byteBuffer.getLong());
indexEntry.setTerm(byteBuffer.getLong());
return indexEntry;
}

二. RocketMQ DLedger主从切换之日志追加

Raft协议负责组主要包含两个步骤:Leader选举和日志复制。使用Raft协议的集群在向外提供服务之前需要先在集群中进行Leader选举,推举一个主节点接受客户端的读写请求。Raft协议负责组的其他节点只需要复制数据,不对外提供服务。当Leader节点接受客户端的写请求后,先将数据存储在Leader节点上,然后将日志数据广播给它的从节点,只有超过半数的节点都成功存储了该日志,Leader节点才会向客户端返回写入成功。

2.1 日志追加流程概述

Leader节点处理日志写入请求的入口为DLedgerServer的handleAppend()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
   // io.openmessaging.storage.dledger.DLedgerServer#handleAppend

@Override
public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest request) throws IOException {
try {
// 如果请求目的节点不是当前节点,返回错误
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
// 如果请求的集群不是当前节点所在的集群,则返回错误
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
// 如果当前节点不是leader节点,则抛出异常
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
PreConditions.check(memberState.getTransferee() == null, DLedgerResponseCode.LEADER_TRANSFERRING);
long currTerm = memberState.currTerm();
// 消息的追加是一个异步的过程,会将内容暂存到内存队列中。首先检查内存队列是否已满,如果已满则向客户端返回错误码,表示本次发送失败。如果未满,
// 则先将数据追加到Leader节点的PageCache中,然后转发到Leader的所有从节点,最后Leader节点等待从节点日志复制结果。
if (dLedgerEntryPusher.isPendingFull(currTerm)) {
AppendEntryResponse appendEntryResponse = new AppendEntryResponse();
appendEntryResponse.setGroup(memberState.getGroup());
appendEntryResponse.setCode(DLedgerResponseCode.LEADER_PENDING_FULL.getCode());
appendEntryResponse.setTerm(currTerm);
appendEntryResponse.setLeaderId(memberState.getSelfId());
return AppendFuture.newCompletedFuture(-1, appendEntryResponse);
}
AppendFuture<AppendEntryResponse> future;
if (request instanceof BatchAppendEntryRequest) {
BatchAppendEntryRequest batchRequest = (BatchAppendEntryRequest) request;
if (batchRequest.getBatchMsgs() == null || batchRequest.getBatchMsgs().isEmpty()) {
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" +
" with empty bodys");
}
// 将消息追加到Leader节点中
future = appendAsLeader(batchRequest.getBatchMsgs());
} else {
// 将消息追加到Leader节点中
future = appendAsLeader(request.getBody());
}
return future;
} catch (DLedgerException e) {
LOGGER.error("[{}][HandleAppend] failed", memberState.getSelfId(), e);
AppendEntryResponse response = new AppendEntryResponse();
response.copyBaseInfo(request);
response.setCode(e.getCode().getCode());
response.setLeaderId(memberState.getLeaderId());
return AppendFuture.newCompletedFuture(-1, response);
}
}

第一步:验证请求的合理性。

  1. 如果请求目的节点不是当前节点,返回错误。
  2. 如果请求的集群不是当前节点所在的集群,则返回错误。
  3. 如果当前节点不是leader节点,则抛出异常。

第二步:消息的追加是一个异步过程,会将内容暂存到内存队列中。首先检查内存队列是否已满,如果已满则向客户端返回错误码,表示本次消息发送失败。如果队列未满,则先将数据追加到Leader节点的PageCache中,然后转发给Leader的所有从节点,最后Leader节点等待从节点日志复制的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// io.openmessaging.storage.dledger.DLedgerServer#appendAsLeader(java.util.List<byte[]>)
public AppendFuture<AppendEntryResponse> appendAsLeader(List<byte[]> bodies) throws DLedgerException {
// 判断当前节点是否是Leader,如果不是则报错
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
// 消息不能为空
if (bodies.size() == 0) {
return AppendFuture.newCompletedFuture(-1, null);
}
AppendFuture<AppendEntryResponse> future;
StopWatch watch = StopWatch.createStarted();
DLedgerEntry entry = new DLedgerEntry();
long totalBytes = 0;
if (bodies.size() > 1) {
long[] positions = new long[bodies.size()];
// 追加多个消息
for (int i = 0; i < bodies.size(); i++) {
totalBytes += bodies.get(i).length;
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(bodies.get(i));
entry = dLedgerStore.appendAsLeader(dLedgerEntry);
positions[i] = entry.getPos();
}
// only wait last entry ack is ok
future = new BatchAppendFuture<>(positions);
} else {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
totalBytes += bodies.get(0).length;
dLedgerEntry.setBody(bodies.get(0));
// 底层调用 appendAsLeader 追加日志
entry = dLedgerStore.appendAsLeader(dLedgerEntry);
future = new AppendFuture<>();
}
final DLedgerEntry finalResEntry = entry;
final AppendFuture<AppendEntryResponse> finalFuture = future;
final long totalBytesFinal = totalBytes;
finalFuture.handle((r, e) -> {
if (e == null && r.getCode() == DLedgerResponseCode.SUCCESS.getCode()) {
Attributes attributes = DLedgerMetricsManager.newAttributesBuilder().build();
// 监控上报
DLedgerMetricsManager.appendEntryLatency.record(watch.getTime(TimeUnit.MICROSECONDS), attributes);
DLedgerMetricsManager.appendEntryBatchCount.record(bodies.size(), attributes);
DLedgerMetricsManager.appendEntryBatchBytes.record(totalBytesFinal, attributes);
}
return r;
});
Closure closure = new Closure() {
@Override
public void done(Status status) {
AppendEntryResponse response = new AppendEntryResponse();
response.setGroup(DLedgerServer.this.memberState.getGroup());
response.setTerm(DLedgerServer.this.memberState.currTerm());
response.setIndex(finalResEntry.getIndex());
response.setLeaderId(DLedgerServer.this.memberState.getLeaderId());
response.setPos(finalResEntry.getPos());
response.setCode(status.code.getCode());
finalFuture.complete(response);
}
};
dLedgerEntryPusher.appendClosure(closure, finalResEntry.getTerm(), finalResEntry.getIndex());
return finalFuture;
}

日志追加时会有两种模式:单条追加和批量追加。appendAsLeader 方法主要将两种模式的追加进行统一封装,最后调用 DLedgerStore#appendAsLeader 将日志存储到指定位置。

2.2 Leader节点日志存储

Leader节点的数据存储主要由DLedgerStore的appendAsLeader() 方法实现。DLedger提供了基于内存和基于文件两种持久化实现,本节重点关注基于文件的存储实现方法,其实现类为 DLedgerMmapFileStore。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore#appendAsLeader

@Override
public DLedgerEntry appendAsLeader(DLedgerEntry entry) {
// 第一步:判断当前节点是否是Leader,如果不是则报错
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
// 第二步:判断磁盘是否已满
PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);
// 从本地线程变量中获取一个存储数据用的ByteBuffer和一个存储索引用的ByteBuffer。存储数据用的ByteBuffer大小为4MB,存储索引用的ByteBuffer大小为64B。
ByteBuffer dataBuffer = localEntryBuffer.get();
ByteBuffer indexBuffer = localIndexBuffer.get();
// 对客户端发来的日志进行编码,并将编码后的日志数据写入ByteBuffer中。
DLedgerEntryCoder.encode(entry, dataBuffer);
int entrySize = dataBuffer.remaining();
// 锁定状态机
synchronized (memberState) {
// 再一次判断是否是Leader节点
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, null);
PreConditions.check(memberState.getTransferee() == null, DLedgerResponseCode.LEADER_TRANSFERRING, null);
// 为当前日志条目设置序号、投票轮次等信息
long nextIndex = ledgerEndIndex + 1;
entry.setIndex(nextIndex);
entry.setTerm(memberState.currTerm());
// 将当前日志(包括序号、投票轮次等)写入索引ByteBuffer中。
DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex, memberState.currTerm(), entry.getMagic());
// 计算消息的起始物理偏移量,与CommitLog文件的物理偏移量设计思想相同
long prePos = dataFileList.preAppend(dataBuffer.remaining());
entry.setPos(prePos);
PreConditions.check(prePos != -1, DLedgerResponseCode.DISK_ERROR, null);
// 将该偏移量写入数据ByteBuffer中
DLedgerEntryCoder.setPos(dataBuffer, prePos);
for (AppendHook writeHook : appendHooks) {
writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET);
}
// 调用DataFileList的append方法,将日志追加到PageCache中,此时数据还没有刷写到硬盘中。
long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());
PreConditions.check(dataPos != -1, DLedgerResponseCode.DISK_ERROR, null);
PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);
DLedgerEntryCoder.encodeIndex(dataPos, entrySize, DLedgerEntryType.NORMAL.getMagic(), nextIndex, memberState.currTerm(), indexBuffer);
// 将索引的ByteBuffer写入PageCache中
long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);
if (LOGGER.isDebugEnabled()) {
LOGGER.info("[{}] Append as Leader {} {}", memberState.getSelfId(), entry.getIndex(), entry.getBody().length);
}
// 日志序号+1
ledgerEndIndex++;
// 记录当前最大的投票轮次
ledgerEndTerm = memberState.currTerm();
updateLedgerEndIndexAndTerm();
return entry;
}
}

在该方法中,主要执行以下逻辑:

  1. 检查Leader状态:首先,方法检查当前节点是否是集群中的Leader节点,如果不是则抛出错误。
  2. 检查磁盘空间:接着,检查磁盘是否已满,如果已满则抛出错误。
  3. 获取缓冲区:从本地线程变量中获取用于存储数据和索引的ByteBuffer,数据缓冲区大小为4MB,索引缓冲区大小为64B。
  4. 编码日志条目:将传入的日志条目进行编码,并写入数据ByteBuffer中。
  5. 设置日志条目信息:在同步块中,再次检查Leader状态,确保没有发生领导者转移。然后为日志条目设置索引、投票轮次等信息。
  6. 计算物理偏移量:计算日志条目的起始物理偏移量,并设置到日志条目中。
  7. 执行写入钩子:如果有注册的写入钩子(AppendHook),则执行这些钩子。
  8. 追加数据到PageCache:将编码后的数据追加到PageCache中,需要注意此时数据尚未写入硬盘。
  9. 编码索引信息:将索引信息编码,包括数据位置、日志大小、日志类型、索引和投票轮次。
  10. 写入索引到PageCache:将索引信息追加到索引文件列表的PageCache中。
  11. 日志和索引位置检查:检查索引写入的位置是否正确。
  12. 更新日志存储状态:更新日志的结束索引和投票轮次,并将这些信息持久化。
  13. 返回日志条目:最后,方法返回追加的日志条目。

日志追加到Leader节点的PageCache后,将异步转发给它所有的从节点,然后等待各从节点的反馈,并对这些反馈结果进行仲裁,只有集群内超过半数的节点存储了该条日志,Leader节点才可以向客户端返回日志写入成功,日志的复制将在后面详细介绍,在介绍Leader节点如何等待从节点复制、响应ACK之前,我们再介绍一下与存储相关的两个核心方法:DataFileList的preAppend()与append()方法。

2.2.1 DataFileList#preAppend

DataFileList的preAppend()方法为预写入,主要是根据当前日志的长度计算该条日志的物理偏移量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// io.openmessaging.storage.dledger.store.file.MmapFileList#preAppend(int, boolean)

/**
* 日志预写入,主要是根据当前日志的长度计算该条日志的物理偏移量,该方法主要处理写入动作处于文件末尾的场景。
* 因为会存在日志写入时,当前文件容纳不下的情况,如果出现这种情况会新建一个新的文件,并返回新文件的起始位置作为写入位置。
*
* @param len 需要申请的长度
* @param useBlank 是否需要填充
* @return
*/
public long preAppend(int len, boolean useBlank) {
// 获取逻辑文件中最后一个物理文件
MmapFile mappedFile = getLastMappedFile();
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = getLastMappedFile(0);
}
if (null == mappedFile) {
LOGGER.error("Create mapped file for {}", storePath);
return -1;
}
int blank = useBlank ? MIN_BLANK_LEN : 0;
if (len + blank > mappedFile.getFileSize() - mappedFile.getWrotePosition()) {
// 如果当前文件剩余空间已不足以存放一条消息
if (blank < MIN_BLANK_LEN) {
// 如果当前文件剩余的空间少于MIN_BLANK_LEN,将返回-1,表 示存储错误,需要人工干预,正常情况下是不会出现这种情况的,
// 因为写入一条消息之前会确保能容纳待写入的消息,并且还需要空余MIN_BLANK_LEN个字节,因为一个独立的物理文件,
// 默认会填充文件结尾魔数(BLANK_MAGIC_CODE)。
LOGGER.error("Blank {} should ge {}", blank, MIN_BLANK_LEN);
return -1;
} else {
// 如果空余空间大于MIN_BLANK_LEN,会首先写入文件结尾魔数(4字节),然后将该文件剩余的字节数写入接下来的4个字节,表示该文件全部用完。
// 后面创建一个新文件,使得当前日志能够写入新的文件中。
ByteBuffer byteBuffer = ByteBuffer.allocate(mappedFile.getFileSize() - mappedFile.getWrotePosition());
byteBuffer.putInt(BLANK_MAGIC_CODE);
byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition());
if (mappedFile.appendMessage(byteBuffer.array())) {
//need to set the wrote position
// 将写指针置入文件末尾,这样在下一次调用 getLastMappedFile 方法时就会创建一个新的文件
mappedFile.setWrotePosition(mappedFile.getFileSize());
} else {
LOGGER.error("Append blank error for {}", storePath);
return -1;
}
// 如果文件以写满,这里会创建一个新的文件,
mappedFile = getLastMappedFile(0);
if (null == mappedFile) {
LOGGER.error("Create mapped file for {}", storePath);
return -1;
}
}
}
// 如果当前文件有剩余的空间容纳当前日志,则返回待写入消息的物理起始偏移量
return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
}

  1. 如果当前文件剩余的空间少于MIN_BLANK_LEN,将返回-1,表示存储错误,需要人工干预,正常情况下是不会出现这种情况的,因为写入一条消息之前会确保能容纳待写入的消息,并且还需要空余 MIN_BLANK_LEN 个字节,因为一个独立的物理文件,默认会填充文件结尾魔数(BLANK_MAGIC_CODE)。
  2. 如果空余空间大于MIN_BLANK_LEN,会首先写入文件结尾魔数(4字节),然后将该文件剩余的字节数写入接下来的4个字节,表示该文件全部用完。然后创建一个新的文件,并返回新文件的起始位置,表示这条日志写入新文件起始位置。

2.2.2 DataFileList#append

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
   
//io.openmessaging.storage.dledger.store.file.MmapFileList#append(byte[], int, int, boolean)
public long append(byte[] data, int pos, int len, boolean useBlank) {
if (preAppend(len, useBlank) == -1) {
return -1;
}
MmapFile mappedFile = getLastMappedFile();
long currPosition = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
// 追加数据至文件末尾
if (!mappedFile.appendMessage(data, pos, len)) {
LOGGER.error("Append error for {}", storePath);
return -1;
}
return currPosition;
}


// io.openmessaging.storage.dledger.store.file.DefaultMmapFile#appendMessage(byte[], int, int)
/**
* Content of data from offset to offset + length will be written to file.
*
* @param offset The offset of the subarray to be used.
* @param length The length of the subarray to be used.
*/
@Override
public boolean appendMessage(final byte[] data, final int offset, final int length) {
int currentPos = this.wrotePosition;

if ((currentPos + length) <= this.fileSize) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
byteBuffer.put(data, offset, length);
WROTE_POSITION_UPDATER.addAndGet(this, length);
return true;
}
return false;
}

三. RocketMQ DLedger 主从切换之日志复制

Leader节点首先将客户端发送过来的日志按照指定格式存储在Leader节点上,但此时并不会向客户端返回写入成功,而是需要将日志转发给它的所有从节点,只有超过半数的节点都存储了该条日志,Leader节点才会向客户端返回日志写入成功。

日志的复制主要包括如下3个步骤:

  1. Leader节点将日志推送到从节点。
  2. 从节点收到Leader节点推送的日志并存储,然后向Leader节点汇报日志复制结果。
  3. Leader节点对日志复制进行仲裁,如果成功存储该条日志的节点超过半数,则向客户端返回写入成功。

3.1 日志复制设计理念

3.1.1 日志编号

为了方便对日志进行管理与辨别,Raft协议对每条日志进行编号,每一条消息到达主节点时会生成一个全局唯一的递增号,这样可以根据日志序号来快速判断日志中的数据在主从复制过程中是否保持一致,在 DLedger 的实现中对应 DLedgerMemoryStore 中的 ledgerBeforeBeginIndex、ledgerEndIndex,分别表示当前节点最小的日志序号与最大的日志序号,下一条日志的序号为ledgerEndIndex+1。

3.1.2 日志追加与提交机制

Leader节点收到客户端的数据写入请求后,先通过解析请求提取数据,构建日志对象,并生成日志序号,用seq表示。然后将日志存储到Leader节点内,将日志广播(推送)给其所有从节点。这个过程存在网络延时,如果客户端向主节点查询日志序号为seq的日志,日志已经存储在Leader节点中了,直接返回给客户端显然是有问题的,这是因为网络等原因导致从节点未能正常存储该日志,导致数据不一致,该如何避免出现这个问题呢?

为了解决上述问题,DLedger引入了已提交指针(committedIndex)。当主节点收到客户端的请求时,先将数据进行存储,此时数据是未提交的,这一过程被称为日志追加(已在第四节中介绍了),此时该条日志对客户端不可见,只有当集群内超过半数的节点都将日志追加完成后,才会更新committedIndex指针,该条日志才会向客户端返回写入成功。一条日志被提交成功的充分必要条件是已超过集群内半数节点成功追加日志。

3.1.3 保证日志一致性

一个拥有3个节点的Raft集群,只需要主节点和其中一个从节点成功追加日志,就可以认为是成功提交了日志,客户端即可通过主节点访问该日志。因为部分数据存在延迟,所以在DLedger的实现中,读写请求都将由Leader节点负责。那么落后的从节点如何再次跟上集群的进度呢?

DLedger的实现思路是按照日志序号向从节点源源不断地转发日志,从节点接收日志后,将这些待追加的数据放入一个待写队列。从节点并不是从挂起队列中处理一个个追加请求的,而是先查找从节点当前已追加的最大日志序号,用ledgerEndIndex表示,然后尝试追加ledgerEndIndex+1的日志,根据日志序号从待写队列中查找日志,如果该队列不为空,并且待写日志不在待写队列中,说明从节点未接收到这条日志,发生了数据缺失。从节点在响应主节点的append请求时会告知数据不一致,然后主节点的日志转发线程状态变更为COMPARE,向该从节点发送COMPARE命令,用来比较主从节点的数据差异。根据比较出的差异重新从主节点同步数据或删除从节点上多余的数据,最终达到一致。同时,主节点也会对推送超时的消息发起重推,尽最大可能帮助从节点及时更新到主节点的数据。

3.2 日志复制类设计体系

DledgerEntryPusher 是 DLedger 日志转发与处理核心类,该类构建如下3个对象,每一个对象对应一个线程,复制处理不同的事情:

  1. EntryDispatcher:日志转发线程,当前节点为主节点时追加。

  2. QuorumAckChecker:日志追加ACK投票仲裁线程,当前节点为主节点时激活。

  3. EntryHandler:日志接收处理线程,当节点为从节点时激活。

DLedger的日志复制使用推送模式,其核心入口为DLedgerEntryPusher,下面逐一介绍上述核心类及核心属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class DLedgerEntryPusher {

private static final Logger LOGGER = LoggerFactory.getLogger(DLedgerEntryPusher.class);

/**
* 多副本相关配置。
*/
private final DLedgerConfig dLedgerConfig;
/**
* 存储实现类。
*/
private final DLedgerStore dLedgerStore;
/**
* 节点状态机。
*/
private final MemberState memberState;
/**
* RPC 服务实现类,用于集群内的其他节点进行网络通讯。
*/
private final DLedgerRpcService dLedgerRpcService;

/**
* 每个节点基于投票轮次的当前水位线标记。
* 用于记录从节点已复制的日志序号
*/
private final Map<Long/*term*/, ConcurrentMap<String/*peer id*/, Long/*match index*/>> peerWaterMarksByTerm = new ConcurrentHashMap<>();

/**
* 正在处理的 apend 请求的回调函数。放在这里的index所指向的日志是待确认的日志,也就是说客户端目前正处在阻塞状态,等待从节点接收日志。
*
* 当日志写入Leader节点后,会异步将日志发送给Follower节点,当集群中大多数节点成功写入该日志后,会回调这里暂存的回调函数,从而返回客户端成功写入的状态。
*/
private final Map<Long/*term*/, ConcurrentMap<Long/*index*/, Closure/*upper callback*/>> pendingClosure = new ConcurrentHashMap<>();

/**
* 从节点上开启的线程,用于接收主节点的 push 请求(append、commit)。
*/
private final EntryHandler entryHandler;
/**
* 日志追加ACK投票仲裁线程,用于判断日志是否可提交,当前节点为主节点时激活
*/
private final QuorumAckChecker quorumAckChecker;
/**
* 日志请求转发器,负责向从节点转发日志,主节点为每一个从节点构建一个EntryDispatcher,EntryDispatcher是一个线程
*/
private final Map<String/*peer id*/, EntryDispatcher/*entry dispatcher for each peer*/> dispatcherMap = new HashMap<>();
/**
* 当前节点的ID
*/
private final String selfId;
/**
* 通过任务队列修改状态机状态,保证所有修改状态机状态的任务按顺序执行
*/
private StateMachineCaller fsmCaller;
}

通常了解一个类需要从其构造函数开始,我们先看一下DLedgerEntryPusher的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore,
DLedgerRpcService dLedgerRpcService) {
this.dLedgerConfig = dLedgerConfig;
this.selfId = this.dLedgerConfig.getSelfId();
this.memberState = memberState;
this.dLedgerStore = dLedgerStore;
this.dLedgerRpcService = dLedgerRpcService;
// 为每一个Follower节点创建一个EntryDispatcher线程,复制向Follower节点推送日志
for (String peer : memberState.getPeerMap().keySet()) {
if (!peer.equals(memberState.getSelfId())) {
dispatcherMap.put(peer, new EntryDispatcher(peer, LOGGER));
}
}
this.entryHandler = new EntryHandler(LOGGER);
this.quorumAckChecker = new QuorumAckChecker(LOGGER);
}

这里主要是根据集群的配置,为每一个从节点创建一个 EntryDispatcher 转发线程,即每一个从节点的日志转发相互不干扰。

接下来我们看一下 startup 方法:

1
2
3
4
5
6
7
8
9
10
11
   // io.openmessaging.storage.dledger.DLedgerEntryPusher#startup
public void startup() {
// 启动 EntryHandler,负责接受Leader节点推送的日志,如果节点不是Follower节点现成也会启动,但是不会执行任何逻辑,直到身份变成Follower节点。
entryHandler.start();
// 启动 日志追加ACK投票仲裁线程,用于判断日志是否可提交,当前节点为Leader节点时激活
quorumAckChecker.start();
// 启动 日志分发线程,用于向Follower节点推送日志,当前节点为Leader节点时激活
for (EntryDispatcher dispatcher : dispatcherMap.values()) {
dispatcher.start();
}
}

在 EntryDispatcher 启动时会启动三类线程:

  • EntryDispatcher:日志请求转发器,负责向从节点转发日志,主节点为每一个从节点构建一个 EntryDispatcher 线程,每个从节点独立发送互不干扰;
  • QuorumAckChecker:日志追加ACK投票仲裁线程,用于判断日志是否可提交,当前节点为主节点时激活;
  • EntryHandler:从节点上开启的线程,用于接收主节点的 push 请求(append、commit);

需要注意的是由于节点身份的不同所生效的线程类型也并不相同,你如如果是Follower节点,那就只有 EntryHandler 现成生效,没有生效的线程会间隔1ms进行空转,这样做的目的是当节点身份发生变化时能及时反应。

3.3 日志转发(Leader向Follower发送日志)

3.3.1 EntryDispatcher核心属性

日志转发由 EntryDispatcher 实现,EntryDispatcher 有如下核心属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
private class EntryDispatcher extends ShutdownAbleThread {

/**
* 向从节点发送命令的类型
*/
private final AtomicReference<EntryDispatcherState> type = new AtomicReference<>(EntryDispatcherState.COMPARE);
/**
* 上一次发送commit请求的时间戳。
*/
private long lastPushCommitTimeMs = -1;
/**
* 目标节点ID
*/
private final String peerId;

/**
* 已写入的日志序号
*/
private long writeIndex = DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex() + 1;

/**
* the index of the last entry to be pushed to this peer(initialized to -1)
*/
private long matchIndex = -1;

private final int maxPendingSize = 1000;
/**
* Leader节点当前的投票轮次
*/
private long term = -1;
/**
* Leader节点ID
*/
private String leaderId = null;
/**
* 上次检测泄露的时间,所谓泄露,指的是挂起的日志请求数量超过了maxPendingSize。
*/
private long lastCheckLeakTimeMs = System.currentTimeMillis();

/**
* 记录日志的挂起时间,key表示日志的序列(entryIndex),value表示挂起时间戳。
*/
private final ConcurrentMap<Long/*index*/, Pair<Long/*send timestamp*/, Integer/*entries count in req*/>> pendingMap = new ConcurrentHashMap<>();
/**
* 需要批量push的日志数据
*/
private final PushEntryRequest batchAppendEntryRequest = new PushEntryRequest();

private long lastAppendEntryRequestSendTimeMs = -1;

/**
* 配额。
*/
private final Quota quota = new Quota(dLedgerConfig.getPeerPushQuota());

.........
}

3.3.2 推送请求类型

在详细介绍日志转发流程之前,先介绍一下主节点向从节点发送推送请求的类型,在 PushEntryRequest.Type 中定义,可选值如下:

1
2
3
4
5
6
7
8
   // io.openmessaging.storage.dledger.protocol.PushEntryRequest.Type
public enum Type {
APPEND,
COMMIT,
COMPARE,
TRUNCATE,
INSTALL_SNAPSHOT
}
  1. APPEND:将日志条目追加到从节点。
  2. COMMIT:通常Leader节点会将提交的索引附加到append请求, 如果append请求很少且分散,Leader节点将发送一个单独的请求来通 知从节点提交索引。
  3. COMPARE:如果Leader节点发生变化,新的Leader节点需要与它的从节点日志条目进行比较,以便截断从节点多余的数据。
  4. TRUNCATE:如果Leader节点通过索引完成日志对比后,发现从节点存在多余的数据(未提交的数据),则 Leader 节点将发送 TRUNCATE给它的从节点,删除多余的数据,实现主从节点数据一致性。
  5. INSTALL_SNAPSHOT:将从节点数据存入快照。

3.3.3 Leader节点日志转发入口

EntryDispatcher 是一个线程类,继承自 ShutdownAbleThread,其 run() 方法会循环执行 doWork() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryDispatcher#doWork
@Override
public void doWork() {
try {
// 检查当前节点状态
if (checkNotLeaderAndFreshState()) {
waitForRunning(1);
return;
}
switch (type.get()) {
// 根据类型,做不同的操作
case COMPARE:
doCompare();
break;
case TRUNCATE:
doTruncate();
break;
case APPEND:
doAppend();
break;
case INSTALL_SNAPSHOT:
doInstallSnapshot();
break;
case COMMIT:
doCommit();
break;
}
waitForRunning(1);
} catch (Throwable t) {
DLedgerEntryPusher.LOGGER.error("[Push-{}]Error in {} writeIndex={} matchIndex={}", peerId, getName(), writeIndex, matchIndex, t);
changeState(EntryDispatcherState.COMPARE);
DLedgerUtils.sleep(500);
}
}

该方法主要完成如下两件事。

  1. 检查当前节点的状态,确定当前节点状态是否可以发送 append、compare、truncate 请求。

  2. 根据当前转发器的状态向从节点发送 append、compare、truncate 请求。

checkAndFreshState()方法不只是简单地检测一下状态,而是会根据运行状态改变日志转发器的状态,从而驱动转发器是发送 append 请求还是发送compare请求,下面详细看一下该方法的实现细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryDispatcher#checkNotLeaderAndFreshState
private boolean checkNotLeaderAndFreshState() {
if (!memberState.isLeader()) {
// 如果当前节点的状态不是Leader则直接返回。
return true;
}
if (term != memberState.currTerm() || leaderId == null || !leaderId.equals(memberState.getLeaderId())) {
// 如果日志转发器(EntryDispatcher)的投票轮次为空或与状态机的投票轮次不相等,
// 将日志转发器的term、leaderId与状态机同步,即发送compare请求。这种情况通常
// 是由于集群触发了重新选举,当前节点刚被选举成 Leader节点。
synchronized (memberState) {
if (!memberState.isLeader()) {
return true;
}
PreConditions.check(memberState.getSelfId().equals(memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN);
logger.info("[Push-{}->{}]Update term: {} and leaderId: {} to new term: {}, new leaderId: {}", selfId, peerId, term, leaderId, memberState.currTerm(), memberState.getLeaderId());
term = memberState.currTerm();
leaderId = memberState.getSelfId();
// 改变日志转发器的状态,该方法非常重要
changeState(EntryDispatcherState.COMPARE);
}
}
return false;
}

如果当前节点的状态不是Leader则直接返回;如果日志转发器(EntryDispatcher)的投票轮次为空或与状态机的投票轮次不相等,这种情况通常是由于集群触发了重新选举,当前节点刚被选举成 Leader节点,此时需要将日志转发器的term、leaderId与状态机同步,然后将同步模式改为Compare,目的是让新上任的Leader节点寻找自己与Follower节点的共识点在哪,说白了就是找到其他 Follower 节点多余未提交的的日志Index,为后续 truncate 请求做铺垫。

changeState改变日志转发器的状态,该方法非常重要,我们来看一下状态转换过程中需要处理的核心逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryDispatcher#changeState
private synchronized void changeState(EntryDispatcherState target) {
logger.info("[Push-{}]Change state from {} to {}, matchIndex: {}, writeIndex: {}", peerId, type.get(), target, matchIndex, writeIndex);
switch (target) {
case APPEND:
resetBatchAppendEntryRequest();
break;
case COMPARE:
if (this.type.compareAndSet(EntryDispatcherState.APPEND, EntryDispatcherState.COMPARE)) {
writeIndex = dLedgerStore.getLedgerEndIndex() + 1;
pendingMap.clear();
}
break;
default:
break;
}
type.set(target);
}

3.3.4 Leader节点发送Compare请求(doCompare)

日志转发器EntryDispatcher的初始状态为 COMPARE,当一个节点被选举为Leader后,日志转发器的状态同样会先设置为COMPARE,Leader节点先向从节点发送该请求的目的是比较主、从节点之间数据的差异,以此确保发送主从切换时不会丢失数据,并且重新确定待转发的日志序号。

通过EntryDispatcher的doWork()方法可知,如果节点状态为COMPARE,会调用 doCompare() 方法。doCompare()方法内部代码都是while(true)包裹,在查看其代码时注意其退出条件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryDispatcher#doCompare 
/**
* 该方法用于Leader节点向从节点发送Compare请求,目的是为了找到与从节点的共识点,
* 也就是找到从节点未提交的日志Index,从而实现删除从节点未提交的数据。
*
* @throws Exception
*/
private void doCompare() throws Exception {
// 注意这里是while(true),所以需要注意循环退出条件
while (true) {
if (checkNotLeaderAndFreshState()) {
break;
}
// 判断请求类型是否为Compare,如果不是则退出循环
if (this.type.get() != EntryDispatcherState.COMPARE) {
break;
}
// ledgerEndIndex== -1 表示Leader中没有存储数据,是一个新的集群,所以无需比较主从是否一致
if (dLedgerStore.getLedgerEndIndex() == -1) {
break;
}

// compare process start from the [nextIndex -1]
PushEntryRequest request;
// compareIndex 代表正在比对的索引下标,对比前一条日志,term 和 index 是否一致
long compareIndex = writeIndex - 1;
long compareTerm = -1;
if (compareIndex < dLedgerStore.getLedgerBeforeBeginIndex()) {
// 需要比较的条目已被压缩删除,只需更改状态即可安装快照
changeState(EntryDispatcherState.INSTALL_SNAPSHOT);
return;
} else if (compareIndex == dLedgerStore.getLedgerBeforeBeginIndex()) {
compareTerm = dLedgerStore.getLedgerBeforeBeginTerm();
request = buildCompareOrTruncatePushRequest(compareTerm, compareIndex, PushEntryRequest.Type.COMPARE);
} else {
// 获取正在比对的日志信息
DLedgerEntry entry = dLedgerStore.get(compareIndex);
PreConditions.check(entry != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);
// 正在比对的日志所处的选举轮次
compareTerm = entry.getTerm();
request = buildCompareOrTruncatePushRequest(compareTerm, entry.getIndex(), PushEntryRequest.Type.COMPARE);
}
CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request);
PushEntryResponse response = responseFuture.get(3, TimeUnit.SECONDS);
PreConditions.check(response != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);
PreConditions.check(response.getCode() == DLedgerResponseCode.INCONSISTENT_STATE.getCode() || response.getCode() == DLedgerResponseCode.SUCCESS.getCode()
, DLedgerResponseCode.valueOf(response.getCode()), "compareIndex=%d", compareIndex);

// fast backup algorithm to locate the match index
if (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) {
// 证明找到了与Follower节点的共识点
matchIndex = compareIndex;
// 此时更新这个Follower节点的水位线
updatePeerWaterMark(compareTerm, peerId, matchIndex);
// 将发送模式改成truncate,以将从节点的未提交的日志删除
changeState(EntryDispatcherState.TRUNCATE);
return;
}

// 证明在compareIndex日志上,Follower与当前Leader所处选举轮次并不一致,证明从节点这条日志是需要被删除,然后才会将主节点已提交的日志再次同步到follower上
if (response.getXTerm() != -1) {
// response.getXTerm() != -1 代表当前对比index 所处的任期和Leader节点不一致,
// 此时 response.getXIndex() 返回的是当前对比任期在从节点结束的位置,所以将指针移到从节点在当前轮次的结束处,再次进行对比。
writeIndex = response.getXIndex();
} else {
// response.getXTerm() == -1 代表从节点上的 leaderEndIndex 比当前对比的index小,
// 则把对比指针,移到从节点末尾的 leaderEndIndex上
writeIndex = response.getEndIndex() + 1;
}
}
}

在该方法中首先是对基本状态做了一些校验:

  1. 如果当前节点的状态不是 Leader 则退出循环;
  2. 判断请求类型是否为 Compare,如果不是则退出循环;
  3. ledgerEndIndex== -1 表示Leader中没有存储数据,是一个新的集群,所以无需比较主从是否一致;

然后构建当前正在比对的compareIndex所对应的日志的所处轮次信息,将compareIndex对应的存储轮次发送给Follower节点后,Follower节点会对比自己与Leader在相同的Index上的存储轮次信息是否相同:

  • 如果相同则证明此条日志与Leader节点保持一致,返回SUCCESS,此时则证明找到了共识点,将状态改成truncate模式以删除从节点多余日志;
  • 如果不同会有两种情况:
    • 主节点发送的index在从节点上还不存在,这样从节点会将自己的末尾指针返回给Leader,Leader会从Follower节点的末尾指针重新开始对比;
    • 主节点发送的index在从节点上存在,但是所处的轮次并不一致,证明从节点这条日志是需要被删除,Follower节点会找到Leader对比轮次所在的最后一个日志索引并返回给Leader,Leader会从这个索引位置继续开始对比,直到找对最终的共识点。

3.3.5 Leader节点发送truncate请求(doTruncate)

Leader节点在发送compare请求后,得知与从节点的数据存在差异,将向从节点发送truncate请求,指示从节点应该将truncateIndex 及以后的日志删除:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryDispatcher#doTruncate 
/**
* 发起truncate请求,用于删除Follower节点未提交的日志
* @throws Exception
*/
private void doTruncate() throws Exception {
// 检测当前状态是否为Truncate
PreConditions.check(type.get() == EntryDispatcherState.TRUNCATE, DLedgerResponseCode.UNKNOWN);
// 删除共识点以后得所有日志,truncateIndex代表删除的起始位置
long truncateIndex = matchIndex + 1;
logger.info("[Push-{}]Will push data to truncate truncateIndex={}", peerId, truncateIndex);
// 构建truncate请求
PushEntryRequest truncateRequest = buildCompareOrTruncatePushRequest(-1, truncateIndex, PushEntryRequest.Type.TRUNCATE);
// 发送请求,等待Follower响应
PushEntryResponse truncateResponse = dLedgerRpcService.push(truncateRequest).get(3, TimeUnit.SECONDS);

PreConditions.check(truncateResponse != null, DLedgerResponseCode.UNKNOWN, "truncateIndex=%d", truncateIndex);
PreConditions.check(truncateResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(truncateResponse.getCode()), "truncateIndex=%d", truncateIndex);
// 更新 lastPushCommitTimeMs 时间
lastPushCommitTimeMs = System.currentTimeMillis();
// 将状态改为Append,Follower节点的多余日志删除完成后,就需要Leader节点同步数据给Follower了
changeState(EntryDispatcherState.APPEND);
}

该方法的实现比较简单,主节点构建truncate请求包并通过网络向从节点发送请求,从节点在收到请求后会清理多余的数据,使主从节点数据保持一致。日志转发器在处理完truncate请求后,状态将变更为APPEND,开始向从节点转发日志。

3.3.6 Leader节点向Follower节点推送日志(doAppend)

Leader节点在确认主从数据一致后,开始将新的消息转发到从节点。doAppend()方法内部的逻辑被包裹在while(true)中,故在查看其代码时应注意退出条件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryDispatcher#doAppend
private void doAppend() throws Exception {
while (true) {
if (checkNotLeaderAndFreshState()) {
break;
}
// 第一步:校验当前状态是否是Append,如果不是则退出循环
if (type.get() != EntryDispatcherState.APPEND) {
break;
}
// 第二步:检查从节点未接收的第一个append请求是否超时,如果超时,则重推
doCheckAppendResponse();
// 第三步:writeIndex表示当前已追加到从节点的日志序号。通常情况下,主节点向从节点发送append请求时会带上主节点已提交的指针,
// 但如果append请求发送不频繁,pending请求超过其队列长度(默认为1万字节)时,会阻止数据的追加,
// 此时有可能会出现writeIndex大于leaderEndIndex的情况
if (writeIndex > dLedgerStore.getLedgerEndIndex()) {
if (this.batchAppendEntryRequest.getCount() > 0) {
sendBatchAppendEntryRequest();
} else {
doCommit();
}
break;
}
// check if now not entries in store can be sent
if (writeIndex <= dLedgerStore.getLedgerBeforeBeginIndex()) {
logger.info("[Push-{}]The ledgerBeginBeginIndex={} is less than or equal to writeIndex={}", peerId, dLedgerStore.getLedgerBeforeBeginIndex(), writeIndex);
changeState(EntryDispatcherState.INSTALL_SNAPSHOT);
break;
}
// 第四步:检测pendingMap(挂起的请求数量)是否发生泄露,正常来说发送给从节点的请求如果成功响应就会从pendingMap中移除,这里是一种兜底操作
// 获取当前节点的水位线(已成功append请求的日志序号),如果挂起请求的日志序号小于水位线,则丢弃,并记录最后一次检查的时间戳
if (pendingMap.size() >= maxPendingSize || DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000) {
long peerWaterMark = getPeerWaterMark(term, peerId);
for (Map.Entry<Long, Pair<Long, Integer>> entry : pendingMap.entrySet()) {
if (entry.getKey() + entry.getValue().getValue() - 1 <= peerWaterMark) {
// 被Follower节点成功接收的日志条目需要从pendingMap中移除
pendingMap.remove(entry.getKey());
}
}
lastCheckLeakTimeMs = System.currentTimeMillis();
}
if (pendingMap.size() >= maxPendingSize) {
// 第五步:如果挂起的请求数仍然大于阈值,则再次检查这些请求是否超时(默认超时时间为1s),如果超时则会触发重发机制
doCheckAppendResponse();
break;
}
// 第六步:循环同步数据至从节点,方法内部会优化,会按照配置收集一批需要发送的日志,等到到达发送阈值则一起发送,而不是一条条发送
long lastIndexToBeSend = doAppendInner(writeIndex);
if (lastIndexToBeSend == -1) {
break;
}
// 第七步:移动写指针
writeIndex = lastIndexToBeSend + 1;
}
}
  • 第一步:再次判断节点状态,确保当前节点是Leader节点并且日志转发器内部的状态为APPEND。

  • 第二步:检查从节点未接收的第一个append请求是否超时,如果超时,则重推。

  • 第三步:writeIndex表示当前已追加到从节点的日志序号。通常情况下,主节点向从节点发送append请求时会带上主节点已提交的指针,但如果append请求发送不频繁,pending请求超过其队列长度(默认为1万字节)时,会阻止数据的追加,此时有可能会出现writeIndex 大于leaderEndIndex的情况,需要单独发送commit请求,并检查 append 请求响应。

  • 第四步:检测pendingMap(挂起的请求数量)是否发生泄露,正常来说发送给从节点的请求如果成功响应就会从pendingMap中移除,这里是一种兜底操作。获取当前节点的水位线(已成功append请求的日志序号),如果挂起请求的日志序号小于水位线,则丢弃,并记录最后一次检查的时间戳。

  • 第五步:如果挂起的请求数仍然大于阈值,则再次检查这些请求是否超时(默认超时时间为1s),如果超时则会触发重发机制

  • 第六步:循环同步数据至从节点,方法内部会优化,会按照配置收集一批需要发送的日志,等到到达发送阈值则一起发送,而不是一条条发送。

  • 第七步:移动写指针。

3.3.6.1 日志推送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryDispatcher#doAppendInner
/**
* 将条目追加到follower,将其追加到内存中,直到达到阈值,才会真正发送给Follower
*
* @param index 从哪个索引追加
* @return 最后一个要追加的条目的索引
* @throws Exception
*/
private long doAppendInner(long index) throws Exception {
// 从磁盘中读取将要发送的日志信息
DLedgerEntry entry = getDLedgerEntryForAppend(index);
if (null == entry) {
// means should install snapshot
logger.error("[Push-{}]Get null entry from index={}", peerId, index);
changeState(EntryDispatcherState.INSTALL_SNAPSHOT);
return -1;
}
// 流控检查
checkQuotaAndWait(entry);
batchAppendEntryRequest.addEntry(entry);
// 检查此次操作是否真正触发发送动作
if (!dLedgerConfig.isEnableBatchAppend() || batchAppendEntryRequest.getTotalSize() >= dLedgerConfig.getMaxBatchAppendSize()
|| DLedgerUtils.elapsed(this.lastAppendEntryRequestSendTimeMs) >= dLedgerConfig.getMaxBatchAppendIntervalMs()) {
// 未开启批量发送 或者 批量发送数量超过阈值 或者 上一次发送时间超过1s
// 发送日志
sendBatchAppendEntryRequest();
}
// 返回最后一个要追加的索引
return entry.getIndex();
}
  • 第一步:获取磁盘中读取将要发送的日志信息。

  • 第二步:进行日志推送流控检查,如果触发流控,现成就会阻塞等待下一个时间窗口(滑动窗口限流)。

  • 第三步:发送日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
//io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryDispatcher#sendBatchAppendEntryRequest
/**
* 执行真正的日志发送操作,将Leader节点的日志发送到Follower节点
* @throws Exception
*/
private void sendBatchAppendEntryRequest() throws Exception {
// 设置committedIndex,这样Follower节点收到Append请求后能够顺道更新自己的committedIndex
batchAppendEntryRequest.setCommitIndex(memberState.getCommittedIndex());
// 此次批量发送的第一条日志的下标
final long firstIndex = batchAppendEntryRequest.getFirstEntryIndex();
// 此次批量发送的最后一条日志的下标
final long lastIndex = batchAppendEntryRequest.getLastEntryIndex();
// 当前发送日志所处的选举轮次
final long lastTerm = batchAppendEntryRequest.getLastEntryTerm();
// 此次批量发送的日志数量
final long entriesCount = batchAppendEntryRequest.getCount();
// 此次批量发送的日志大小
final long entriesSize = batchAppendEntryRequest.getTotalSize();
StopWatch watch = StopWatch.createStarted();
// 通过dLedgerRpcService发送日志
CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(batchAppendEntryRequest);
// 将请求加入pendingMap,用于后续检查超时,一旦请求正常返回则删除这条记录
pendingMap.put(firstIndex, new Pair<>(System.currentTimeMillis(), batchAppendEntryRequest.getCount()));
responseFuture.whenComplete((x, ex) -> {
try {
PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
switch (responseCode) {
case SUCCESS:
// Follower节点返回成功
// 监控指标上报
Attributes attributes = DLedgerMetricsManager.newAttributesBuilder().put(LABEL_REMOTE_ID, this.peerId).build();
DLedgerMetricsManager.replicateEntryLatency.record(watch.getTime(TimeUnit.MICROSECONDS), attributes);
DLedgerMetricsManager.replicateEntryBatchCount.record(entriesCount, attributes);
DLedgerMetricsManager.replicateEntryBatchBytes.record(entriesSize, attributes);
// 发送成功后,删除挂起请求记录。
pendingMap.remove(firstIndex);
if (lastIndex > matchIndex) {
// 更新 matchIndex
matchIndex = lastIndex;
// 更新当前Follower的水位线
updatePeerWaterMark(lastTerm, peerId, matchIndex);
}
break;
case INCONSISTENT_STATE:
logger.info("[Push-{}]Get INCONSISTENT_STATE when append entries from {} to {} when term is {}", peerId, firstIndex, lastIndex, term);
// 从节点返回INCONSISTENT_STATE,说明Follower节点的日志和Leader节点的不一致,需要重新比较
changeState(EntryDispatcherState.COMPARE);
break;
default:
logger.warn("[Push-{}]Get error response code {} {}", peerId, responseCode, x.baseInfo());
break;
}
} catch (Throwable t) {
logger.error("Failed to deal with the callback when append request return", t);
}
});
// 更新 上一次发送commit请求的时间戳。
lastPushCommitTimeMs = System.currentTimeMillis();
// 清空请求缓存
batchAppendEntryRequest.clear();
}
3.3.6.2 日志推送的流控机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryDispatcher#checkQuotaAndWait
/**
* 在checkQuotaAndWait方法中,如果当前待发送的日志条目数量超过了最大允许的待发送数量(maxPendingSize),
* 则会检查流控。如果触发了流控(validateNow返回true),则会记录警告信息,并根据leftNow方法返回的剩余时间进行等待。
* @param entry
*/
private void checkQuotaAndWait(DLedgerEntry entry) {
// 如果剩余发送的日志条目数量小于最大允许的待发送日志数量,则跳过流控检查
if (dLedgerStore.getLedgerEndIndex() - entry.getIndex() <= maxPendingSize) {
return;
}
// 记录当前时间窗口的日志条目数量。如果当前时间窗口是新的(即timeVec中的记录不是当前秒),则重置该窗口的计数;如果是同一时间窗口,则累加日志条目数量
quota.sample(entry.getSize());
// 检查当前时间窗口是否已达到最大限制。如果是,则返回true,表示触发了流控。
if (quota.validateNow()) {
// 计算当前时间窗口剩余的时间(毫秒),如果已达到最大限制,则可能需要等待直到下一个时间窗口开始。
long leftNow = quota.leftNow();
logger.warn("[Push-{}]Quota exhaust, will sleep {}ms", peerId, leftNow);
DLedgerUtils.sleep(leftNow);
}
}

在前面介绍日志推送流程中,会在通过网络发送之前,调用 checkQuotaAndWait 进行一定的流控操作,流控是使用的滑动窗口实现的,在该方法中首先会记录当前时间窗口的日志条目数量。如果当前时间窗口是新的(即timeVec中的记录不是当前秒),则重置该窗口的计数;如果是同一时间窗口,则累加日志条目数量。然后计算当前时间窗口剩余的时间(毫秒),如果已达到最大限制,则可能需要等待直到下一个时间窗口开始。

其中 Quota 中核心属性如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Quota {

/**
* 在指定时间窗口内允许的最大日志条目数量。
*/
private final int max;

/**
* 一个数组,用于存储每个时间窗口的日志条目数量。
*/
private final int[] samples;
/**
* 一个数组,记录每个时间窗口的开始时间(秒)。
*/
private final long[] timeVec;
/**
* 窗口数量
*/
private final int window;
}
3.3.6.3 日志推送的重试机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Leader节点在向从节点转发日志后,会存储该日志的推送时间戳到pendingMap,
* 当pendingMap的积压超过1000ms时会触发重推机制,该逻辑封装在当前方法中
* @throws Exception
*/
private void doCheckAppendResponse() throws Exception {
// 获取从节点已复制的日志序号
long peerWaterMark = getPeerWaterMark(term, peerId);
// 尝试获取从节点已复制序号+1的记录,如果能找到,说明从服务下一条需要追加的消息已经存储在主节点中,
// 接着在尝试推送,如果该条推送已经超时,默认超时时间为1s,调用doAppendInner重新推送
Pair<Long, Integer> pair = pendingMap.get(peerWaterMark + 1);
if (pair == null)
return;
long sendTimeMs = pair.getKey();
if (DLedgerUtils.elapsed(sendTimeMs) > dLedgerConfig.getMaxPushTimeOutMs()) {
// 发送如果超时,则重置writeIndex指针,重发消息
batchAppendEntryRequest.clear();
writeIndex = peerWaterMark + 1;
logger.warn("[Push-{}]Reset write index to {} for resending the entries which are timeout", peerId, peerWaterMark + 1);
}
}

如果因网络等原因,主节点在向从节点追加日志时失败,该如何保证从节点与主节点一致呢?从上文我们可以得知,Leader节点在向 从节点转发日志后,会存储该日志的推送时间戳到pendingMap,当pendingMap的积压超过1000ms时会触发重推机制,将writeIndex指针重置为超时的Index上。

3.3.7 日志转发整体流程

3.4 日志复制(Follower节点接收日志并存储)

Leader节点实时向从节点转发消息,从节点接收到日志后进行存储,然后向Leader节点反馈复制进度,从节点的日志接收主要由EntryHandler实现。

3.4.1 EntryHandler核心属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
   //io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryHandler

/**
* 从节点收到Leader节点推送的日志并存储,然后向Leader节点汇报日志复制结果。
* This thread will be activated by the follower.
* Accept the push request and order it by the index, then append to ledger store one by one.
*/
private class EntryHandler extends ShutdownAbleThread {

/**
* 上一次检查主服务器是否有推送消息的时间戳。
*/
private long lastCheckFastForwardTimeMs = System.currentTimeMillis();

/**
* append请求处理队列。
*/
ConcurrentMap<Long/*index*/, Pair<PushEntryRequest/*request*/, CompletableFuture<PushEntryResponse/*complete future*/>>> writeRequestMap = new ConcurrentHashMap<>();
/**
* COMMIT、COMPARE、TRUNCATE相关请求的处理队列。
*/
BlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>>
compareOrTruncateRequests = new ArrayBlockingQueue<>(1024);
}

3.4.2 Follower日志复制入口

从节点收到Leader节点的推送请求后(无论是APPEND、COMMIT、COMPARE、TRUNCATE),由EntryHandler的handlePush()方法执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryHandler#handlePush
/**
* 处理Leader节点发送到当前Follower节点的请求
* @param request
* @return
* @throws Exception
*/
public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {
// The timeout should smaller than the remoting layer's request timeout
CompletableFuture<PushEntryResponse> future = new TimeoutFuture<>(1000);
switch (request.getType()) {
case APPEND:
PreConditions.check(request.getCount() > 0, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
long index = request.getFirstEntryIndex();
// 将请求放入队列中,由doWork方法异步处理
Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> old = writeRequestMap.putIfAbsent(index, new Pair<>(request, future));
if (old != null) {
// 表示重复推送
logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", index, old.getKey().baseInfo(), request.baseInfo());
future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode()));
}
break;
case COMMIT:
synchronized (this) {
// 将commit放入请求队列,由doWork方法异步处理
if (!compareOrTruncateRequests.offer(new Pair<>(request, future))) {
logger.warn("compareOrTruncateRequests blockingQueue is full when put commit request");
future.complete(buildResponse(request, DLedgerResponseCode.PUSH_REQUEST_IS_FULL.getCode()));
}
}
break;
case COMPARE:
case TRUNCATE:
// 如果是compare或truncate请求,则清除append队列中所有的请求
writeRequestMap.clear();
synchronized (this) {
// 并将 compare或truncate 请求放入队列中,由doWork方法异步处理
if (!compareOrTruncateRequests.offer(new Pair<>(request, future))) {
logger.warn("compareOrTruncateRequests blockingQueue is full when put compare or truncate request");
future.complete(buildResponse(request, DLedgerResponseCode.PUSH_REQUEST_IS_FULL.getCode()));
}
}
break;
default:
logger.error("[BUG]Unknown type {} from {}", request.getType(), request.baseInfo());
future.complete(buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));
break;
}
wakeup();
return future;
}

handlePush()方法的主要职责是将处理请求放入队列,由doWork()方法从处理队列中拉取任务进行处理。

  1. 如果是append请求,将请求放入writeRequestMap集合,如果已存在该条日志的推送请求,表示Leader重复推送,则返回状态码REPEATED_PUSH。
  2. 如果是commit请求,将请求存入compareOrTruncateRequests 请求处理队列。
  3. 如果是compare或truncate请求,将待追加队列writeRequestMap清空,并将请求放入compareOrTruncateRequests请求队列,由doWork()方法进行异步处理。

3.4.3 EntryHandler任务分发机制

EntryHandler的handlePush()方法主要是接收请求并将其放入队列的处理队列,而doWork()方法是从指定队列中获取待执行任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryHandler#doWork
@Override
public void doWork() {
try {
// 第一步:校验是否是Follower节点
if (!memberState.isFollower()) {
clearCompareOrTruncateRequestsIfNeed();
waitForRunning(1);
return;
}
// deal with install snapshot request first
Pair<InstallSnapshotRequest, CompletableFuture<InstallSnapshotResponse>> installSnapshotPair = null;
this.inflightInstallSnapshotRequestLock.lock();
try {
if (inflightInstallSnapshotRequest != null && inflightInstallSnapshotRequest.getKey() != null && inflightInstallSnapshotRequest.getValue() != null) {
installSnapshotPair = inflightInstallSnapshotRequest;
inflightInstallSnapshotRequest = new Pair<>(null, null);
}
} finally {
this.inflightInstallSnapshotRequestLock.unlock();
}
if (installSnapshotPair != null) {
handleDoInstallSnapshot(installSnapshotPair.getKey(), installSnapshotPair.getValue());
}
// 第二步:处理 TRUNCATE、COMPARE、COMMIT 请求
if (compareOrTruncateRequests.peek() != null) {
Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = compareOrTruncateRequests.poll();
PreConditions.check(pair != null, DLedgerResponseCode.UNKNOWN);
switch (pair.getKey().getType()) {
case TRUNCATE:
handleDoTruncate(pair.getKey().getPreLogIndex(), pair.getKey(), pair.getValue());
break;
case COMPARE:
handleDoCompare(pair.getKey(), pair.getValue());
break;
case COMMIT:
handleDoCommit(pair.getKey().getCommitIndex(), pair.getKey(), pair.getValue());
break;
default:
break;
}
return;
}
long nextIndex = dLedgerStore.getLedgerEndIndex() + 1;
// 从请求队列中获取这一个日志索引所对应的请求
Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.remove(nextIndex);
if (pair == null) {
// 检查追加请求是否丢失
checkAbnormalFuture(dLedgerStore.getLedgerEndIndex());
// 如果下一个日志索引不在队列中,则证明主节点还没有把这条日志推送过来,此时我们等待
waitForRunning(1);
// 如果这一个索引在队列中不存在,则退出。等待下一次检查
return;
}
PushEntryRequest request = pair.getKey();
// 执行日志追加
handleDoAppend(nextIndex, request, pair.getValue());
} catch (Throwable t) {
DLedgerEntryPusher.LOGGER.error("Error in {}", getName(), t);
DLedgerUtils.sleep(100);
}
}
}
  • 第一步:如果当前节点的状态不是从节点,则跳出
  • 第二步:如果compareOrTruncateRequests队列不为空,优先处理COMMIT、COMPARE、TRUNCATE等请求。值得注意的是,这里使用的是peek、poll等非阻塞方法,所以队列为空不会阻塞线程使得append请求能够正常处理。
  • 第三步:处理日志追加append请求,根据当前节点 已存储的最大日志序号计算下一条待写日志的日志序号,从待写队列 中获取日志的处理请求。如果能查找到对应日志的追加请求,则执行doAppend()方法追加日志;如果从待写队列中没有找到对应的追加请 求,则调用checkAbnormalFuture检查追加请求是否丢失。

3.4.4 compare请求响应

从上文得知,Leader节点首先会向从节点发送compare请求,以此比较两者的数据是否存在差异,这一步由EntryHandler的handleDoCompare()方法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryHandler#handleDoCompare
/**
* Follower端处理Leader端发起的Compare请求
*
* @param request
* @param future
* @return
*/
private CompletableFuture<PushEntryResponse> handleDoCompare(PushEntryRequest request,
CompletableFuture<PushEntryResponse> future) {
try {
PreConditions.check(request.getType() == PushEntryRequest.Type.COMPARE, DLedgerResponseCode.UNKNOWN);
// Leader端发来需要对比的日志索引值
long preLogIndex = request.getPreLogIndex();
// Leader端Index日志所处的任期
long preLogTerm = request.getPreLogTerm();
if (preLogTerm == -1 && preLogIndex == -1) {
// leader节点日志为空,则直接返回
future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
return future;
}
if (dLedgerStore.getLedgerEndIndex() >= preLogIndex) {
long compareTerm = 0;
// 找到指定Index在当前节点的日志中的任期
if (dLedgerStore.getLedgerBeforeBeginIndex() == preLogIndex) {
// 如果查找的Index刚好是当前节点存储的第一条日志,则不用读取磁盘获取日志任期
compareTerm = dLedgerStore.getLedgerBeforeBeginTerm();
} else {
// 从磁盘中读取日志内容,然后获取到日志任期
DLedgerEntry local = dLedgerStore.get(preLogIndex);
compareTerm = local.getTerm();
}
if (compareTerm == preLogTerm) {
// 如果任期相同,则认为Follower节点的日志和Leader节点是相同的,也就证明找到了共识点
future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
return future;
}
// 如果任期不相同,则从preLogIndex开始,向前追溯compareTerm任期的第一个日志
DLedgerEntry firstEntryWithTargetTerm = dLedgerStore.getFirstLogOfTargetTerm(compareTerm, preLogIndex);
PreConditions.check(firstEntryWithTargetTerm != null, DLedgerResponseCode.INCONSISTENT_STATE);
PushEntryResponse response = buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode());
// 设置Leader节点对比的Index在当前节点所处的任期
response.setXTerm(compareTerm);
// 设置Leader节点对比任期,在当前节点最大的index值
response.setXIndex(firstEntryWithTargetTerm.getIndex());
future.complete(response);
return future;
}
// dLedgerStore.getLedgerEndIndex() < preLogIndex,代表Leader想要对比的日志在当前节点不存咋,则返回当前节点的endIndex
PushEntryResponse response = buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode());
response.setEndIndex(dLedgerStore.getLedgerEndIndex());
future.complete(response);
} catch (Throwable t) {
logger.error("[HandleDoCompare] preLogIndex={}, preLogTerm={}", request.getPreLogIndex(), request.getPreLogTerm(), t);
future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
}
return future;
}

该方法最终目的是为了找到Leader节点和当前Follower节点的共识点,在该方法中会对比Leader端发来的Index对应选举任期和当前Follower节点这个Index对应的选举任期是否相同,会有如下情况:

  1. Leader想要对比的Index在Follower节点不存在:则Follower节点返回当前节点 ledgerEndIndex 给Leader,意思是让Leader节点从我自己的最末尾Index进行对比。
  2. Leader想要对比Index在Follower节点中存在:
    • Index对应的任期相同:意味着找到了共识点,返回SUCCESS。这样Leader节点就会从这个共识点删除从节点多余的日志,然后重新追加日志。
    • Index对应的任期不同:从preLogIndex开始,向前追溯从节点Index所处任期的第一个日志。这样Leader节点就会从这个点重新开始对比,这也可以看到日志对比并不是一个日志一个日志依次对比,这样做效率会很低,当遇到任期不一致的情况时,Follower节点就会跳过当前任期,对比前一个任期日志是否一致。

3.4.5 truncate请求响应

Leader节点与从节点进行数据对比后,如果发现数据有差异,将计算出需要截断的日志序号,发送truncate请求给从节点,从节点对多余的日志进行截断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryHandler#handleDoTruncate
/**
* 该方法时Follower节点收到Leader节点的Truncate请求所执行的方法
* @param truncateIndex
* @param request
* @param future
* @return
*/
private CompletableFuture<PushEntryResponse> handleDoTruncate(long truncateIndex, PushEntryRequest request,
CompletableFuture<PushEntryResponse> future) {
try {
logger.info("[HandleDoTruncate] truncateIndex={}", truncateIndex);
PreConditions.check(request.getType() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);
// 删除truncateIndex之后的日志
long index = dLedgerStore.truncate(truncateIndex);
PreConditions.check(index == truncateIndex - 1, DLedgerResponseCode.INCONSISTENT_STATE);
// 删除成功,则返回成功
future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
// 更新本地的已提交索引,如果Leader已提交索引大于本地的最大索引,则证明本地的所有日志都处于已提交状态,反之则更新已提交索引为Leader的已提交索引
long committedIndex = request.getCommitIndex() <= dLedgerStore.getLedgerEndIndex() ? request.getCommitIndex() : dLedgerStore.getLedgerEndIndex();
// 更新状态机中的已提交索引
if (DLedgerEntryPusher.this.memberState.followerUpdateCommittedIndex(committedIndex)) {
// todo 该方法待定
DLedgerEntryPusher.this.fsmCaller.onCommitted(committedIndex);
}
} catch (Throwable t) {
logger.error("[HandleDoTruncate] truncateIndex={}", truncateIndex, t);
future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
}
return future;
}

Follower节点在收到Leader节点发来的truncate请求后,会将truncateIndex及以后得所有日志全部删除,并更新本地已提交日志的索引指针。而日志的删除操作是由 DLedgerMmapFileStore#truncate 实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore#truncate(long)
@Override
public long truncate(long truncateIndex) {
// 如果需要删除的index在ledgerEndIndex之后,直接返回ledgerEndIndex,不用继续执行删除流程
if (truncateIndex > this.ledgerEndIndex) {
return this.ledgerEndIndex;
}
// 获取truncateIndex所对应的日志
DLedgerEntry firstTruncateEntry = this.get(truncateIndex);
// 获取物理偏移量
long truncateStartPos = firstTruncateEntry.getPos();
synchronized (this.memberState) {
// 加锁后再次比较,如果需要删除的index在ledgerEndIndex之后,直接返回ledgerEndIndex,不用继续执行删除流程
if (truncateIndex > this.ledgerEndIndex) {
return this.ledgerEndIndex;
}
//从物理文件中删除指定物理偏移量之后的数据
dataFileList.truncateOffset(truncateStartPos);
if (dataFileList.getMaxWrotePosition() != truncateStartPos) {
LOGGER.warn("[TRUNCATE] truncate for data file error, try to truncate pos: {}, but after truncate, max wrote pos: {}, now try to rebuild", truncateStartPos, dataFileList.getMaxWrotePosition());
PreConditions.check(dataFileList.rebuildWithPos(truncateStartPos), DLedgerResponseCode.DISK_ERROR, "rebuild data file truncatePos=%d", truncateStartPos);
}
// 重置数据文件的写指针
reviseDataFileListFlushedWhere(truncateStartPos);

// 删除索引文件对应的数据
long truncateIndexFilePos = truncateIndex * INDEX_UNIT_SIZE;
indexFileList.truncateOffset(truncateIndexFilePos);
if (indexFileList.getMaxWrotePosition() != truncateIndexFilePos) {
LOGGER.warn("[TRUNCATE] truncate for index file error, try to truncate pos: {}, but after truncate, max wrote pos: {}, now try to rebuild", truncateIndexFilePos, indexFileList.getMaxWrotePosition());
PreConditions.check(dataFileList.rebuildWithPos(truncateStartPos), DLedgerResponseCode.DISK_ERROR, "rebuild index file truncatePos=%d", truncateIndexFilePos);
}
// 重置索引文件的写指针
reviseIndexFileListFlushedWhere(truncateIndexFilePos);

// update store end index and its term
if (truncateIndex == 0) {
// truncateIndex == 0 代表清除所有数据
ledgerEndTerm = -1;
ledgerEndIndex = -1;
} else {
// 删除后更新 ledgerEndTerm、ledgerEndIndex
SelectMmapBufferResult endIndexBuf = indexFileList.getData((truncateIndex - 1) * INDEX_UNIT_SIZE, INDEX_UNIT_SIZE);
ByteBuffer buffer = endIndexBuf.getByteBuffer();
DLedgerIndexEntry indexEntry = DLedgerEntryCoder.decodeIndex(buffer);
ledgerEndTerm = indexEntry.getTerm();
ledgerEndIndex = indexEntry.getIndex();
}
}
LOGGER.info("[TRUNCATE] truncateIndex: {}, after truncate, ledgerEndIndex: {} ledgerEndTerm: {}", truncateIndex, ledgerEndIndex, ledgerEndTerm);
return ledgerEndIndex;
}

3.4.6 append请求响应

Leader节点与从节点进行差异对比,截断从节点多余的数据文件后,会实时转发日志到从节点,具体由EntryHandler的handleDoAppend()方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryHandler#handleDoAppend
private void handleDoAppend(long writeIndex, PushEntryRequest request,
CompletableFuture<PushEntryResponse> future) {
try {
PreConditions.check(writeIndex == request.getFirstEntryIndex(), DLedgerResponseCode.INCONSISTENT_STATE);
for (DLedgerEntry entry : request.getEntries()) {
// 将日志信息存储的Follower节点上
dLedgerStore.appendAsFollower(entry, request.getTerm(), request.getLeaderId());
}
// 返回成功响应。
future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
// 计算已提交指针
long committedIndex = Math.min(dLedgerStore.getLedgerEndIndex(), request.getCommitIndex());
if (DLedgerEntryPusher.this.memberState.followerUpdateCommittedIndex(committedIndex)) {
// 更新已提交指针
DLedgerEntryPusher.this.fsmCaller.onCommitted(committedIndex);
}
} catch (Throwable t) {
logger.error("[HandleDoAppend] writeIndex={}", writeIndex, t);
future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
}
}

将从Leader节点的日志追加到从节点,具体调用DLedgerStore的appendAsFollower()方法实现,其实现细节与服务端追加日志的流程基本类似,只是少了日志转发这个流程。然后使用Leader节点的已提交指针更新从节点的已提交指针,即append请求会附带有commit请求的效果。

3.4.7 从节点日志复制异常检测机制

收到Leader节点的append请求后,从节点首先会将这些写入请求 存储在writeRequestMap处理队列中,从节点并不是直接从该队列中获取一个待写入处理请求进行数据追加,而是查找当前节点已存储的最大日志序号leaderEndIndex,然后加1得出下一条待追加的日志序号nextIndex。如果该日志序号在writeRequestMap中不存在日志推送请求,则有可能是因为发生了推送请求丢失,在这种情况下,需要进行异常检测,以便尽快恢复异常,使主节点与从节点最终保持一致性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryHandler#checkAbnormalFuture
/**
* 检查append请求是否丢失leader向follower推送的日志,并记录推送的索引。
* 但在以下情况下,推送可能会停止:
* 1. 如果追随者异常关闭,其日志结束索引可能会比之前更小。这时,领导者可能会推送快进条目,并一直重试。
* 2. 如果最后一个确认应答丢失,并且没有新消息传入,领导者可能会重试推送最后一条消息,但追随者会忽略它。
*
* @param endIndex
*/
private void checkAbnormalFuture(long endIndex) {
if (DLedgerUtils.elapsed(lastCheckFastForwardTimeMs) < 1000) {
// 上次检查距离现在不足1s,则跳过检查
return;
}
lastCheckFastForwardTimeMs = System.currentTimeMillis();
if (writeRequestMap.isEmpty()) {
// 当前没有积压的append的请求,可以证明主节点没有推送新的日志,所以不用检查
return;
}
// 执行检查
checkAppendFuture(endIndex);
}


/**
*
* @param endIndex 从节点当前存储的最大日志序号
*/
private void checkAppendFuture(long endIndex) {
long minFastForwardIndex = Long.MAX_VALUE;
for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair : writeRequestMap.values()) {
// 批量发送的第一条日志的index
long firstEntryIndex = pair.getKey().getFirstEntryIndex();
// 批量发送最后一条日志的index
long lastEntryIndex = pair.getKey().getLastEntryIndex();
// 清除旧的推送请求
if (lastEntryIndex <= endIndex) {
try {
for (DLedgerEntry dLedgerEntry : pair.getKey().getEntries()) {
// 如果接收的日志和当前存储的日志所属选举轮次并不一致,则响应INCONSISTENT_STATE错误码
PreConditions.check(dLedgerEntry.equals(dLedgerStore.get(dLedgerEntry.getIndex())), DLedgerResponseCode.INCONSISTENT_STATE);
}
// 否则,响应成功
pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode()));
logger.warn("[PushFallBehind]The leader pushed an batch append entry last index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", lastEntryIndex, endIndex);
} catch (Throwable t) {
logger.error("[PushFallBehind]The leader pushed an batch append entry last index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", lastEntryIndex, endIndex, t);
pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
}
// 清除旧的的请求
writeRequestMap.remove(pair.getKey().getFirstEntryIndex());
continue;
}
// 如果待追加的日志序号等于endIndex+1,即从节点当前存储的最大日志序号加1,表示从节点下一条期望追加的日志Leader节点已经推送过来了
if (firstEntryIndex == endIndex + 1) {
return;
}
// 清除超时的推送请求
TimeoutFuture<PushEntryResponse> future = (TimeoutFuture<PushEntryResponse>) pair.getValue();
if (!future.isTimeOut()) {
continue;
}
// 记录最小的推送的索引
if (firstEntryIndex < minFastForwardIndex) {
minFastForwardIndex = firstEntryIndex;
}
}
// 主要处理待追加日志的序号大于endIndex+1的情况,可以认为有追加积压
if (minFastForwardIndex == Long.MAX_VALUE) {
return;
}
Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.remove(minFastForwardIndex);
if (pair == null) {
return;
}
// 此时,返回错误码,让Leader转变为Compare模式,重新寻找共识点
logger.warn("[PushFastForward] ledgerEndIndex={} entryIndex={}", endIndex, minFastForwardIndex);
pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
}

3.4.8 日志复制整体流程

3.5 日志复制仲裁

Raft协议判断一条日志写入成功的标准是集群中超过半数的节点存储了该日志,Leader节点首先存储数据,然后异步向它所有的从节点推送日志。不需要所有的从节点都返回日志追加成功才认为是成功写入,故Leader节点需要对返回结果进行仲裁,这部分功能主要由 QuorumAckChecker 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// io.openmessaging.storage.dledger.DLedgerEntryPusher.QuorumAckChecker#doWork
@Override
public void doWork() {
try {
if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) > 3000) {
logger.info("[{}][{}] term={} ledgerBeforeBegin={} ledgerEnd={} committed={} watermarks={} appliedIndex={}",
memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeforeBeginIndex(), dLedgerStore.getLedgerEndIndex(), memberState.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm), memberState.getAppliedIndex());
lastPrintWatermarkTimeMs = System.currentTimeMillis();
}
// 当前节点所处轮次
long currTerm = memberState.currTerm();
checkTermForPendingMap(currTerm, "QuorumAckChecker");
checkTermForWaterMark(currTerm, "QuorumAckChecker");
// clear pending closure in old term
if (pendingClosure.size() > 1) {
for (Long term : pendingClosure.keySet()) {
if (term == currTerm) {
continue;
}
for (Map.Entry<Long, Closure> futureEntry : pendingClosure.get(term).entrySet()) {
logger.info("[TermChange] Will clear the pending closure index={} for term changed from {} to {}", futureEntry.getKey(), term, currTerm);
// 如果是之前的轮次,则调用请求完成回调,调用后,被hold住的请求会被释放
futureEntry.getValue().done(Status.error(DLedgerResponseCode.EXPIRED_TERM));
}
pendingClosure.remove(term);
}
}
// clear peer watermarks in old term
if (peerWaterMarksByTerm.size() > 1) {
for (Long term : peerWaterMarksByTerm.keySet()) {
if (term == currTerm) {
continue;
}
logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}", term, currTerm);
// 清除老的选举周期中从节点的水位线
peerWaterMarksByTerm.remove(term);
}
}

// clear the pending closure which index <= applyIndex
if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000) {
checkResponseFuturesElapsed(DLedgerEntryPusher.this.memberState.getAppliedIndex());
lastCheckLeakTimeMs = System.currentTimeMillis();
}
if (DLedgerUtils.elapsed(lastCheckTimeoutTimeMs) > 1000) {
// clear the timeout pending closure should check all since it can timeout for different index
checkResponseFuturesTimeout(DLedgerEntryPusher.this.memberState.getAppliedIndex() + 1);
lastCheckTimeoutTimeMs = System.currentTimeMillis();
}
if (!memberState.isLeader()) {
// 如果不是Leader节点,则返回,不再继续执行生效的逻辑了。
waitForRunning(1);
return;
}

// 从这里开始的逻辑都是Leader节点中才会执行的

// 更新当前节点的水位线
updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());

// 计算所有节点水位线的中位数,那么理论上比这个中位数小的index来说都已经存储在集群中大多数节点上了。
Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);
List<Long> sortedWaterMarks = peerWaterMarks.values()
.stream()
.sorted(Comparator.reverseOrder())
.collect(Collectors.toList());
long quorumIndex = sortedWaterMarks.get(sortedWaterMarks.size() / 2);

// advance the commit index
// we can only commit the index whose term is equals to current term (refer to raft paper 5.4.2)
if (DLedgerEntryPusher.this.memberState.leaderUpdateCommittedIndex(currTerm, quorumIndex)) {
// 更新已提交的索引,此时只更新了Leader的 CommittedIndex指针,从节点的CommittedIndex会在后面‘
// DLedgerEntryPusher.EntryDispatcher 发送Append请求和Commit请求中得到更新
DLedgerEntryPusher.this.fsmCaller.onCommitted(quorumIndex);
} else {
// If the commit index is not advanced, we should wait for the next round
waitForRunning(1);
}
} catch (Throwable t) {
DLedgerEntryPusher.LOGGER.error("Error in {}", getName(), t);
DLedgerUtils.sleep(100);
}
}

四. 总结

本文详细阐述了 RocketMQ DLedger 中的日志复制流程。

日志复制的设计理念包括:为每条日志编号以便管理与辨别;引入已提交指针来避免数据不一致问题,只有超过半数节点追加完成才向客户端返回写入成功;通过向从节点源源不断转发日志,并处理从节点落后时的数据同步与删除,保证日志一致性。

日志复制类的设计体系中,DledgerEntryPusher 是核心类,构建了 EntryDispatcherQuorumAckCheckerEntryHandler 三个对象,分别对应不同线程,处理不同任务。

日志复制流程如下:

  1. Leader 节点将日志推送到从节点:
    • 日志转发由 EntryDispatcher 实现,其具有多种推送请求类型,初始状态为 COMPARE
    • doCompare 方法用于 Leader 向从节点发送 COMPARE 请求以比较数据差异,找到共识点后状态变更为 TRUNCATE
    • doTruncate 方法处理发送 TRUNCATE 请求,指示从节点删除多余日志,处理完成后状态变更为 APPEND
    • doAppend 方法在确认主从数据一致后向从节点推送日志,内部有流控和重试机制。
  2. 从节点收到 Leader 节点推送的日志并存储,然后向 Leader 节点汇报日志复制结果:
    • 从节点的日志接收由 EntryHandler 实现,其通过 handlePush 方法接收请求并放入相应队列,doWork 方法进行任务分发处理。
    • handleDoCompare 方法响应 Leader 的 COMPARE 请求,寻找共识点。
    • handleDoTruncate 方法响应 TRUNCATE 请求,删除多余日志。
    • handleDoAppend 方法响应 APPEND 请求,追加日志。
    • 还具有异常检测机制,检查从节点日志复制是否异常。
  3. Leader 节点对日志复制进行仲裁:主要由 QuorumAckChecker 实现,计算所有节点水位线的中位数,根据结果更新已提交索引。