forked from cubefs/cubefs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpartition.go
182 lines (150 loc) · 5.05 KB
/
partition.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
// Copyright 2018 The Chubao Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package raftstore
import (
"os"
"github.com/tiglabs/raft"
"github.com/tiglabs/raft/proto"
)
// PartitionStatus is a type alias of raft.Status
type PartitionStatus = raft.Status
// PartitionFsm wraps necessary methods include both FSM implementation
// and data storage operation for raft store partition.
// It extends from raft StateMachine and Store.
type PartitionFsm = raft.StateMachine
// Partition wraps necessary methods for raft store partition operation.
// Partition is a shard for multi-raft in RaftSore. RaftStore is based on multi-raft which
// manages multiple raft replication groups at same time through a single
// raft server instance and system resource.
type Partition interface {
// Submit submits command data to raft log.
Submit(cmd []byte) (resp interface{}, err error)
// ChaneMember submits member change event and information to raft log.
ChangeMember(changeType proto.ConfChangeType, peer proto.Peer, context []byte) (resp interface{}, err error)
// Stop removes the raft partition from raft server and shuts down this partition.
Stop() error
// Delete stops and deletes the partition.
Delete() error
// Status returns the current raft status.
Status() (status *PartitionStatus)
// LeaderTerm returns the current term of leader in the raft group. TODO what is term?
LeaderTerm() (leaderID, term uint64)
// IsRaftLeader returns true if this node is the leader of the raft group it belongs to.
IsRaftLeader() bool
// AppliedIndex returns the current index of the applied raft log in the raft store partition.
AppliedIndex() uint64
// CommittedIndex returns the current index of the applied raft log in the raft store partition.
CommittedIndex() uint64
// Truncate raft log
Truncate(index uint64)
TryToLeader(nodeID uint64) error
IsOfflinePeer() bool
}
// Default implementation of the Partition interface.
type partition struct {
id uint64
raft *raft.RaftServer
walPath string
config *PartitionConfig
}
// ChaneMember submits member change event and information to raft log.
func (p *partition) ChangeMember(changeType proto.ConfChangeType, peer proto.Peer, context []byte) (
resp interface{}, err error) {
if !p.IsRaftLeader() {
err = raft.ErrNotLeader
return
}
future := p.raft.ChangeMember(p.id, changeType, peer, context)
resp, err = future.Response()
return
}
// Stop removes the raft partition from raft server and shuts down this partition.
func (p *partition) Stop() (err error) {
err = p.raft.RemoveRaft(p.id)
return
}
func (p *partition) TryToLeader(nodeID uint64) (err error) {
future := p.raft.TryToLeader(nodeID)
_, err = future.Response()
return
}
// Delete stops and deletes the partition.
func (p *partition) Delete() (err error) {
if err = p.Stop(); err != nil {
return
}
err = os.RemoveAll(p.walPath)
return
}
// Status returns the current raft status.
func (p *partition) Status() (status *PartitionStatus) {
status = p.raft.Status(p.id)
return
}
// LeaderTerm returns the current term of leader in the raft group.
func (p *partition) LeaderTerm() (leaderID, term uint64) {
leaderID, term = p.raft.LeaderTerm(p.id)
return
}
func (p *partition) IsOfflinePeer() bool {
status := p.Status()
active := 0
sumPeers := 0
for _, peer := range status.Replicas {
if peer.Active == true {
active++
}
sumPeers++
}
return active >= (int(sumPeers)/2 + 1)
}
// IsRaftLeader returns true if this node is the leader of the raft group it belongs to.
func (p *partition) IsRaftLeader() (isLeader bool) {
isLeader = p.raft != nil && p.raft.IsLeader(p.id)
return
}
// AppliedIndex returns the current index of the applied raft log in the raft store partition.
func (p *partition) AppliedIndex() (applied uint64) {
applied = p.raft.AppliedIndex(p.id)
return
}
// CommittedIndex returns the current index of the applied raft log in the raft store partition.
func (p *partition) CommittedIndex() (applied uint64) {
applied = p.raft.CommittedIndex(p.id)
return
}
// Submit submits command data to raft log.
func (p *partition) Submit(cmd []byte) (resp interface{}, err error) {
if !p.IsRaftLeader() {
err = raft.ErrNotLeader
return
}
future := p.raft.Submit(p.id, cmd)
resp, err = future.Response()
return
}
// Truncate truncates the raft log
func (p *partition) Truncate(index uint64) {
if p.raft != nil {
p.raft.Truncate(p.id, index)
}
}
func newPartition(cfg *PartitionConfig, raft *raft.RaftServer, walPath string) Partition {
return &partition{
id: cfg.ID,
raft: raft,
walPath: walPath,
config: cfg,
}
}