add blog about tikv raft message, raft client
Xie Yu committed Jul 21, 2021
1 parent 7778e21 commit 03b3878
Showing 63 changed files with 8,577 additions and 1,629 deletions.
19 changes: 11 additions & 8 deletions src/SUMMARY.md
@@ -81,16 +81,19 @@
- [draft](./tikv/draft/index.md)
- [PeerFsm](./tikv/PeerFsm.md)
- [MvccReader](./tikv/mvcc_reader.md)
- [raft-rs](./tikv/raft.md)
- [BatchSystem](./tikv/batch-system.md)
- [BatchSystem](./tikv/batch-system.md)
- [Poller](./tikv/poller.md)
- [Region](./tikv/region.md)
- [Peer Storage](./tikv/peer_storage.md)
- [Raft Client](./tikv/raft_client.md)
- [Conf Change](./tikv/conf-change.md)
- [Split Region](./tikv/split-region.md)
- [Merge Region](./tikv/merge-region.md)
- [RaftLogEngine](./tikv/raft_log_engine.md)
- [raft-rs](./tikv/raft.md)
- [RaftKV](./tikv/raft-kv.md)
- [BatchSystem](./tikv/batch-system2.md)
- [RaftMessage](./tikv/raft_message.md)
- [Raft Client](./tikv/raft_client.md)
- [PeerStorage](./tikv/peer_storage.md)
- [Region](./tikv/region.md)
- [Conf Change](./tikv/conf-change.md)
- [Split Region](./tikv/split-region.md)
- [Merge Region](./tikv/merge-region.md)
- [async snapshot](./tikv/async_snapshot.md)
- [async write](./tikv/async_write.md)
- [Storage](./tikv/storage.md)
80 changes: 80 additions & 0 deletions src/tikv/batch-system2.md
@@ -0,0 +1,80 @@
# BatchSystem

<!-- toc -->

## BatchSystem init

![](./dot/batch_system_create.svg)

## Router normals initialization

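RaftPollerBuilder::init scans the `CF_RAFT` column family of the kv engine and loads all Regions.
For each Region it calls PeerFsm::create, which creates a PeerFsm together with the
`loose_bounded` channel used to communicate with it. The tx half is wrapped in a BasicMailbox, which is then placed in the normals map of the RaftRouter.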

The call flow is shown in the figure below:


![](./dot/BatchSystem_Router_normals.svg)
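
The registration step above can be sketched with the standard library alone. This is a simplified illustration, not TiKV's code: `std::sync::mpsc::sync_channel` stands in for `loose_bounded`, the `Mailbox` here carries only the tx (the real BasicMailbox also holds an FsmState), `region_ids` stands in for the result of scanning `CF_RAFT`, and `init_router` is a hypothetical helper name.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical stand-in for TiKV's PeerMsg.
#[derive(Debug, PartialEq)]
enum PeerMsg {
    Tick,
}

/// Simplified PeerFsm: it owns the receiving half of its channel.
struct PeerFsm {
    region_id: u64,
    rx: Receiver<PeerMsg>,
}

impl PeerFsm {
    /// Same shape as described for PeerFsm::create: build the channel,
    /// keep rx inside the fsm, and return tx to the caller.
    fn create(region_id: u64) -> (SyncSender<PeerMsg>, PeerFsm) {
        // Assumption: a bounded std channel replaces loose_bounded.
        let (tx, rx) = sync_channel(1024);
        (tx, PeerFsm { region_id, rx })
    }
}

/// Simplified mailbox that only wraps the sender.
struct Mailbox {
    tx: SyncSender<PeerMsg>,
}

/// Simplified router: maps region_id to the mailbox of that region.
#[derive(Default)]
struct Router {
    normals: HashMap<u64, Mailbox>,
}

/// Creates one fsm per region and registers its mailbox in `normals`.
fn init_router(region_ids: &[u64]) -> (Router, Vec<PeerFsm>) {
    let mut router = Router::default();
    let mut fsms = Vec::new();
    for &id in region_ids {
        let (tx, fsm) = PeerFsm::create(id);
        router.normals.insert(id, Mailbox { tx });
        fsms.push(fsm);
    }
    (router, fsms)
}
```

After `init_router(&[1, 2, 3])`, sending through `router.normals[&2].tx` delivers the message to the rx of the fsm created for region 2 and to no other fsm.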

## Message sending and handling flow

TODO: how is the regionID found from a key? This flow still needs to be clarified.

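Sending a PeerMsg to the PeerFsm of a given `region_id` works as follows:

1. Look up the mailbox for `region_id` in the RaftRouter and use it to send the message to the msg channel of the PeerFsm.
2. If the FsmState in the Mailbox is Idle, also use `RaftRouter::normalScheduler` to send the PeerFsm to the NormalChannel.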

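Messages are handled as follows:
1. When a thread of the poller pool polls, it calls `fetch_fsm` to read a batch of PeerFsm from the Normal Channel.
2. The poller calls RaftPoller.begin to start handling the messages of this batch of PeerFsm.
3. The poller reads the pending PeerMsg from the rx of each PeerFsm in the batch without blocking. `RaftPoller::handle_normals` handles the messages
and writes the resulting changes into a write batch.
4. Once the batch of messages has been handled, the poller calls RaftPoller.end, which writes the write batch and related data to disk.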

![](./dot/batch_system2_flow.svg)
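
The four handling steps can be sketched as a single poll round. This is a minimal sketch under stated assumptions, not TiKV's implementation: the message type, the `disk` vector that stands in for the engine, and `poll_once` are hypothetical, and `fetch_fsm` here never blocks, which keeps the example easy to run.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical message: put `value` under `key`.
type PeerMsg = (String, String);

/// Simplified PeerFsm holding only its id and its message receiver.
struct PeerFsm {
    region_id: u64,
    rx: Receiver<PeerMsg>,
}

/// Simplified poll handler: buffers writes, flushes them in `end`.
#[derive(Default)]
struct RaftPoller {
    write_batch: Vec<(u64, String, String)>,
    disk: Vec<(u64, String, String)>, // assumption: stands in for the engine
}

impl RaftPoller {
    /// Step 2: start a round with an empty write batch.
    fn begin(&mut self) {
        self.write_batch.clear();
    }

    /// Step 3: drain the fsm's rx without blocking and record the changes.
    fn handle_normal(&mut self, fsm: &mut PeerFsm) {
        while let Ok((key, value)) = fsm.rx.try_recv() {
            self.write_batch.push((fsm.region_id, key, value));
        }
    }

    /// Step 4: flush the whole write batch at once.
    fn end(&mut self) {
        self.disk.append(&mut self.write_batch);
    }
}

/// Step 1: take up to `max_batch` fsms from the normal channel.
fn fetch_fsm(normal_rx: &Receiver<PeerFsm>, max_batch: usize) -> Vec<PeerFsm> {
    let mut batch = Vec::new();
    while batch.len() < max_batch {
        match normal_rx.try_recv() {
            Ok(fsm) => batch.push(fsm),
            Err(_) => break, // channel is empty or closed
        }
    }
    batch
}

/// Runs one round and returns how many fsms were handled.
fn poll_once(normal_rx: &Receiver<PeerFsm>, poller: &mut RaftPoller, max_batch: usize) -> usize {
    let mut batch = fetch_fsm(normal_rx, max_batch);
    poller.begin();
    for fsm in batch.iter_mut() {
        poller.handle_normal(fsm);
    }
    poller.end();
    batch.len()
}
```

Flushing once per round rather than once per message is the point of the begin/end pair: writes from every fsm in the batch share one flush.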


## `Router::try_send`: sending a message to an Fsm

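The main job of a poller thread is to fetch a batch of fsms from the channel, then take the messages from the rx of each fsm and handle them.
To guarantee that an fsm can be fetched by a poller after a message has been sent to it, the fsm itself must be
sent to the poller's channel (an FsmScheduler is used for this).

To avoid sending the same fsm to the channel repeatedly, TiKV wraps the tx that delivers messages to the fsm in a BasicMailbox
and pairs it with an FsmState that marks the state of the Fsm.

Notified means the fsm has already been sent to the poller's channel; Idle means it has not. When BasicMailbox sends a message
and the FsmState is Idle, it must also use the FsmScheduler to send the fsm to the poller's channel.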

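```rust
pub struct BasicMailbox<Owner: Fsm> {
    // tx half of the channel used to send messages to the fsm
    sender: mpsc::LooseBoundedSender<Owner::Message>,
    // marks whether the fsm is Idle or already Notified
    state: Arc<FsmState<Owner>>,
}
```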
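
The Idle/Notified check can be illustrated with an atomic flag. This is a hedged sketch, not the real BasicMailbox: the `IDLE` and `NOTIFIED` constants, the `Mailbox` type, and the `release` method are hypothetical, a std channel replaces the loose bounded sender, and the scheduler channel carries a `region_id` token instead of the fsm itself.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{SendError, Sender};
use std::sync::Arc;

// Hypothetical state values for this sketch.
const IDLE: usize = 0;
const NOTIFIED: usize = 1;

/// Simplified mailbox: a sender plus a shared state flag.
struct Mailbox<M> {
    region_id: u64,
    sender: Sender<M>,
    state: Arc<AtomicUsize>,
}

impl<M> Mailbox<M> {
    fn new(region_id: u64, sender: Sender<M>) -> Self {
        Mailbox {
            region_id,
            sender,
            state: Arc::new(AtomicUsize::new(IDLE)),
        }
    }

    /// Sends `msg`, then schedules the fsm only if it was Idle.
    /// Returns Ok(true) when this call did the scheduling.
    fn try_send(&self, msg: M, scheduler: &Sender<u64>) -> Result<bool, SendError<M>> {
        self.sender.send(msg)?;
        // Only the caller that flips Idle -> Notified schedules the fsm,
        // so the fsm is never queued twice for the same round.
        let scheduled = self
            .state
            .compare_exchange(IDLE, NOTIFIED, Ordering::AcqRel, Ordering::Acquire)
            .is_ok();
        if scheduled {
            // Assumption: a region_id token stands in for the fsm.
            let _ = scheduler.send(self.region_id);
        }
        Ok(scheduled)
    }

    /// Called once the poller has finished with the fsm.
    fn release(&self) {
        self.state.store(IDLE, Ordering::Release);
    }
}
```

Two consecutive `try_send` calls on an idle mailbox deliver two messages but put only one entry on the scheduler channel; after `release`, the next `try_send` schedules the fsm again.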


![](./dot/router-try-send.svg)

## Poller


![](./dot/poller_fetch_fsm.svg)

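PollHandler has two implementations, RaftPoller and ApplyPoller. RaftPoller handles RaftCmd and RaftMessage, persists the raft log,
and drives the raft state machine. Once a raft log entry is committed, it is handed to ApplyPoller.

ApplyPoller writes the key/value changes into the KvEngine and sends an ApplyRes back to RaftPoller to report the apply result.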

![](./dot/raft_apply_poller.svg)
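
The hand-off between the two pollers can be sketched with two channels and one thread. Everything here is an illustrative assumption rather than TiKV's types: `CommittedEntry`, the fields of `ApplyRes`, the `HashMap` standing in for the KvEngine, and the function names are all hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical committed raft log entry carrying one put.
struct CommittedEntry {
    region_id: u64,
    index: u64,
    key: String,
    value: String,
}

/// Hypothetical apply result reported back to the raft side.
#[derive(Debug, PartialEq)]
struct ApplyRes {
    region_id: u64,
    applied_index: u64,
}

/// Apply side: writes each entry into the map, then reports the result.
fn run_apply_poller(
    entries: Receiver<CommittedEntry>,
    results: Sender<ApplyRes>,
) -> HashMap<String, String> {
    let mut kv = HashMap::new(); // assumption: stands in for the KvEngine
    // The loop ends when every sender of `entries` has been dropped.
    for entry in entries {
        kv.insert(entry.key, entry.value);
        let res = ApplyRes {
            region_id: entry.region_id,
            applied_index: entry.index,
        };
        if results.send(res).is_err() {
            break; // the raft side is gone
        }
    }
    kv
}

/// Raft side: hands committed entries to the apply thread and
/// collects the ApplyRes messages it sends back.
fn commit_and_apply(entries: Vec<CommittedEntry>) -> (HashMap<String, String>, Vec<ApplyRes>) {
    let (entry_tx, entry_rx) = channel();
    let (res_tx, res_rx) = channel();
    let apply = thread::spawn(move || run_apply_poller(entry_rx, res_tx));
    for entry in entries {
        entry_tx.send(entry).expect("apply poller stopped");
    }
    drop(entry_tx); // lets the apply loop finish
    let kv = apply.join().expect("apply poller panicked");
    let results: Vec<ApplyRes> = res_rx.try_iter().collect();
    (kv, results)
}
```

Entries are applied in the order they were committed, so a later put to the same key overwrites the earlier one and each entry produces one ApplyRes.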

### RaftPoller

![](./dot/raft_poller.svg)


### ApplyPoller

![](./dot/apply_poller.svg)
72 changes: 72 additions & 0 deletions src/tikv/dot/BatchSystem_Router_normals.dot
@@ -0,0 +1,72 @@
#include "styles.h"
digraph BatchSystem_router_normals{
node[shape=box;style="rounded";color="#1c2123";fontcolor="#2f3638"];
edge[color=gray40];
newrank=true;
rankdir=LR;

RaftBatchSystem_spawn -> {
RaftPollerBuilder_init;
RaftBatchSystem_start_system;
}
Vec_SenderFsmPair;
RaftBatchSystem_start_system -> {
RaftRouter_register_all;
BasicMailbox_new;
};
BasicMailbox_new -> RaftRouter_register_all[style_edge_data];
RaftRouter_register_all -> normals;
RaftPollerBuilder_init -> Vec_SenderFsmPair[style_edge_data];
RaftPollerBuilder_init -> {
kv_engine_scan_cf;
PeerFsm_create;
}
PeerFsm_create -> {
loose_bounded;
}
loose_bounded[style_func;label="{{
loose_bounded|
create the tx and rx used for communication
}}"]
PeerFsm_create[style_func;label="{{
PeerFsm::create|
create the PeerFsm; rx is stored in the PeerFsm\l
tx is returned to the caller\l
}}"]
PeerFsm_create -> tx;
kv_engine_scan_cf[style_func;label="{{
kv_engine.scan_cf(CF_RAFT,...)|
scan CF_RAFT,\l
load all regions\l
create a PeerFsm for each Region\l
}}"]
tx -> Vec_SenderFsmPair -> BasicMailbox_new[style_edge_data];
kv_engine_scan_cf -> PeerFsm_create[style_edge_data];
RaftBatchSystem_spawn[style_func;label="{{
RaftBatchSystem\lspawn
}}"]
RaftPollerBuilder_init[style_func;label="{{
RaftPollerBuilder\linit\l
}}"]
Vec_SenderFsmPair[style_func;label="{{
Vec\<SenderFsmPair\>|
tx, PeerFsm
}}"]
RaftBatchSystem_start_system[style_func;label="{{
RaftBatchSystem\lstart_system
}}"]
RaftRouter_register_all[style_func;label="{{
RaftRouter\l|
register_all\l
register the mailbox in the\l
normals map\l
}}"]

normals[style_blue1;label="{{
RaftRouter::normals|
{region1|{BasicMailbox|{FsmState\lmarks whether the Fsm has been sent to\l NormalChannel\l|<tx1> tx}}}|
{region2|{BasicMailbox|{FsmState|<tx2> tx}}}|
{region3|{BasicMailbox|{FsmState|<tx3> tx}}}|
...
}}"]
}