forked from travisjeffery/jocko
-
Notifications
You must be signed in to change notification settings - Fork 0
/
jocko.go
234 lines (199 loc) · 6.15 KB
/
jocko.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
package jocko
import (
"encoding/json"
"fmt"
"io"
"net"
"github.com/travisjeffery/jocko/protocol"
)
// CommitLog abstracts the log that stores a partition's data. Partition
// forwards its append, read, truncate, delete and offset queries to the
// CommitLog it holds.
type CommitLog interface {
	// Append adds the given bytes to the log.
	// NOTE(review): the int64 result is presumably the offset assigned to
	// the appended data — confirm against the implementation.
	Append([]byte) (int64, error)
	// NewReader returns a reader that starts at offset and reads up to
	// maxBytes.
	NewReader(offset int64, maxBytes int32) (io.Reader, error)
	// OldestOffset reports the oldest offset held by the log.
	OldestOffset() int64
	// NewestOffset reports the newest offset held by the log.
	NewestOffset() int64
	// TruncateTo truncates the log relative to the given offset.
	TruncateTo(int64) error
	// DeleteAll removes the log's data.
	DeleteAll() error
}
// Proxy forwards requests to an existing Jocko server and hands the
// server's response back to the caller. Every method takes the client ID
// together with the protocol request to forward.
type Proxy interface {
	// CreateTopic forwards a create-topic request.
	CreateTopic(
		clientID string,
		createRequest *protocol.CreateTopicRequest,
	) (*protocol.CreateTopicsResponse, error)
	// FetchMessages forwards a fetch request.
	FetchMessages(
		clientID string,
		fetchRequest *protocol.FetchRequest,
	) (*protocol.FetchResponses, error)
	// others (placeholder kept from the original declaration)
}
// Partition is the unit of storage in Jocko. It is identified by a topic
// name plus a numeric ID, records which brokers replicate and lead it, and
// holds the commit log and connection that its methods delegate to. Fields
// tagged `json:"-"` are left out of the JSON encoding.
type Partition struct {
	// Topic is the name of the topic the partition belongs to.
	Topic string `json:"topic"`
	// ID is the partition's number; String renders it after the topic.
	ID int32 `json:"id"`
	// Replicas lists broker IDs; IsFollowing reports membership in it.
	Replicas []int32 `json:"replicas"`
	// ISR is presumably the in-sync subset of Replicas — TODO confirm; it
	// is not read by the methods visible in this file.
	ISR []int32 `json:"isr"`
	// Leader is the broker ID of the partition's leader (see IsLeader).
	Leader int32 `json:"leader"`
	// PreferredLeader is a broker ID.
	// NOTE(review): not read by the methods visible here; confirm how the
	// preferred leader is chosen and used.
	PreferredLeader int32 `json:"preferred_leader"`
	// LeaderAndISRVersionInZK is excluded from JSON.
	// NOTE(review): presumably a version counter for the leader/ISR state
	// kept in ZooKeeper — confirm.
	LeaderAndISRVersionInZK int32 `json:"-"`
	// CommitLog stores the partition's data. NewPartition leaves it nil;
	// IsOpen reports whether it has been set.
	CommitLog CommitLog `json:"-"`
	// Conn is the stream that Read and Write use to talk to the
	// partition's leader.
	Conn io.ReadWriter `json:"-"`
}
// NewPartition returns a pointer to a fresh Partition carrying the given
// topic and id. Every other field keeps its zero value, so CommitLog and
// Conn are nil until the caller sets them.
func NewPartition(topic string, id int32) *Partition {
	p := new(Partition)
	p.Topic = topic
	p.ID = id
	return p
}
// Delete removes the partition's stored data by calling DeleteAll on its
// commit log, and returns whatever error that call reports.
func (p *Partition) Delete() error {
	cl := p.CommitLog
	return cl.DeleteAll()
}
// NewReader asks the partition's commit log for a reader that starts at
// offset and reads up to maxBytes, and returns the log's reader and error
// unchanged.
func (p *Partition) NewReader(offset int64, maxBytes int32) (io.Reader, error) {
	rd, err := p.CommitLog.NewReader(offset, maxBytes)
	return rd, err
}
// String renders the partition as "<topic>/<id>".
func (p *Partition) String() string {
	const layout = "%s/%d"
	return fmt.Sprintf(layout, p.Topic, p.ID)
}
// IsOpen reports whether the partition's commit log has been initialized,
// i.e. whether the CommitLog field is non-nil.
func (p *Partition) IsOpen() bool {
	opened := p.CommitLog != nil
	return opened
}
// IsLeader reports whether the broker with the given ID is the
// partition's leader.
func (p *Partition) IsLeader(id int32) bool {
	leader := p.Leader
	return leader == id
}
// IsFollowing reports whether the broker with the given ID should
// follow/replicate the leader. The check is membership in Replicas only;
// it does not compare id against Leader.
func (p *Partition) IsFollowing(id int32) bool {
	for i := range p.Replicas {
		if p.Replicas[i] == id {
			return true
		}
	}
	return false
}
// HighWatermark returns the newest offset of the partition as reported by
// its commit log.
func (p *Partition) HighWatermark() int64 {
	newest := p.CommitLog.NewestOffset()
	return newest
}
// LowWatermark returns the oldest offset of the partition as reported by
// its commit log.
func (p *Partition) LowWatermark() int64 {
	oldest := p.CommitLog.OldestOffset()
	return oldest
}
// TruncateTo truncates the partition's log before the given offset by
// forwarding offset to the commit log's TruncateTo; the exact boundary is
// whatever the CommitLog implementation defines. The log's error, if any,
// is returned.
func (p *Partition) TruncateTo(offset int64) error {
	if err := p.CommitLog.TruncateTo(offset); err != nil {
		return err
	}
	return nil
}
// Write sends b directly to the partition's leader through Conn and
// returns the byte count and error reported by Conn.
func (p *Partition) Write(b []byte) (int, error) {
	n, err := p.Conn.Write(b)
	return n, err
}
// Read reads directly from the partition's leader through Conn into b and
// returns the byte count and error reported by Conn.
func (p *Partition) Read(b []byte) (int, error) {
	n, err := p.Conn.Read(b)
	return n, err
}
// Append appends the message set ms to the partition by handing it to the
// commit log, and returns the log's int64 result and error unchanged.
func (p *Partition) Append(ms []byte) (int64, error) {
	offset, err := p.CommitLog.Append(ms)
	return offset, err
}
// LeaderID returns the broker ID stored in the partition's Leader field.
func (p *Partition) LeaderID() int32 {
	leader := p.Leader
	return leader
}
// MemberStatus is the state that a cluster member is in.
type MemberStatus int

// The possible states of a serf member. Values are consecutive and start
// at zero, so the zero value of MemberStatus is StatusNone.
const (
	StatusNone = MemberStatus(iota)
	StatusAlive
	StatusLeaving
	StatusLeft
	StatusFailed
	StatusReap
)
// Serf wraps the Serf methods that Jocko nodes use to manage cluster
// membership.
type Serf interface {
	// ID returns an int32 identifier.
	// NOTE(review): presumably the local node's member ID — confirm.
	ID() int32
	// Bootstrap receives the node and reconcileCh, a send-only channel of
	// cluster members.
	Bootstrap(node *ClusterMember, reconcileCh chan<- *ClusterMember) error
	// Join takes one or more addresses.
	// NOTE(review): the int result is presumably the number of addresses
	// joined successfully — confirm against the implementation.
	Join(addrs ...string) (int, error)
	// Cluster returns the known members.
	Cluster() []*ClusterMember
	// Member looks a member up by its ID.
	Member(memberID int32) *ClusterMember
	// Shutdown stops the Serf instance.
	Shutdown() error
}
// RaftCmdType identifies the kind of a RaftCommand; it is the type of the
// RaftCommand.Cmd field.
type RaftCmdType int
// RaftCommand is the envelope exchanged with Raft: it is the argument of
// Raft.Apply and the element type of the command channel handed to
// Raft.Bootstrap.
type RaftCommand struct {
	// Cmd is the command's type, encoded under the JSON key "type".
	Cmd RaftCmdType `json:"type"`
	// Data is the command's payload, kept as raw (undecoded) JSON under
	// the key "data".
	Data *json.RawMessage `json:"data"`
}
// Raft wraps the Raft methods used to manage consensus for the Jocko
// cluster.
type Raft interface {
	// Addr returns an address string.
	// NOTE(review): presumably this node's Raft address — confirm.
	Addr() string
	// Bootstrap receives a Serf, a receive-only channel of member events
	// and a send-only channel of commands.
	Bootstrap(serf Serf, serfEventCh <-chan *ClusterMember, commandCh chan<- RaftCommand) error
	// IsLeader reports whether this node is the leader.
	IsLeader() bool
	// LeaderID returns the leader's ID as a string.
	LeaderID() string
	// Apply submits cmd.
	Apply(cmd RaftCommand) error
	// Shutdown stops the Raft instance.
	Shutdown() error
}
// Broker is the interface that wraps the Broker's methods.
type Broker interface {
ID() int32
IsController() bool
CreateTopic(topic string, partitions int32) error
StartReplica(*Partition) error
DeleteTopic(topic string) error
Partition(topic string, id int32) (*Partition, error)
ClusterMember(brokerID int32) *ClusterMember
BecomeLeader(topic string, id int32, command *protocol.PartitionState) error
BecomeFollower(topic string, id int32, command *protocol.PartitionState) error
Join(addr ...string) (int, error)
Cluster() []*ClusterMember
TopicPartitions(topic string) ([]*Partition, error)
IsLeaderOfPartition(topic string, id int32, leaderID int32) bool
}
// ClusterMember is a wrapper around a broker's info and a lazily opened
// TCP connection to it. Only ID, Port and IP (under the key "addr") take
// part in JSON encoding; the other fields are skipped.
//
// The lazy dial in Read and Write performs no locking, so callers that
// share a ClusterMember across goroutines must synchronize themselves.
type ClusterMember struct {
	// ID is the broker's identifier.
	ID int32 `json:"id"`
	// Port is the TCP port reported by Addr and dialed by Read and Write.
	Port int `json:"port"`
	// IP is the member's IP address in textual form; it is parsed with
	// net.ParseIP whenever an address is built.
	IP string `json:"addr"`
	// SerfPort is excluded from JSON.
	// NOTE(review): presumably the member's Serf port — confirm.
	SerfPort int `json:"-"`
	// RaftPort is excluded from JSON.
	// NOTE(review): presumably the member's Raft port — confirm.
	RaftPort int `json:"-"`
	// Status is the member's state; excluded from JSON.
	Status MemberStatus `json:"-"`

	// conn is the connection to the member; nil until the first Read or
	// Write dials it.
	conn net.Conn
}

// Addr is used to get the address of the member. A new TCPAddr is built
// from IP and Port on every call.
//
// NOTE(review): net.ParseIP yields nil when IP is not a valid textual IP
// address, and net.DialTCP treats a nil IP as the local system — confirm
// that IP never carries a hostname or an empty string.
func (b *ClusterMember) Addr() *net.TCPAddr {
	return &net.TCPAddr{IP: net.ParseIP(b.IP), Port: b.Port}
}

// Write is used to write to the member. It dials the member first when no
// connection is open yet; if dialing fails it returns 0 and the dial
// error without writing anything.
func (b *ClusterMember) Write(p []byte) (int, error) {
	if err := b.ensureConn(); err != nil {
		return 0, err
	}
	return b.conn.Write(p)
}

// Read is used to read from the member. It dials the member first when no
// connection is open yet; if dialing fails it returns 0 and the dial
// error without reading anything.
func (b *ClusterMember) Read(p []byte) (int, error) {
	if err := b.ensureConn(); err != nil {
		return 0, err
	}
	return b.conn.Read(p)
}

// ensureConn dials the member unless a connection is already stored.
func (b *ClusterMember) ensureConn() error {
	if b.conn != nil {
		return nil
	}
	return b.connect()
}

// connect dials the member's address over TCP and stores the resulting
// connection. On failure the stored connection is left untouched, so a
// later Read or Write retries the dial.
func (b *ClusterMember) connect() error {
	// Reuse Addr so the dialed address can never drift from the one
	// reported to callers.
	conn, err := net.DialTCP("tcp", nil, b.Addr())
	if err != nil {
		return err
	}
	b.conn = conn
	return nil
}