forked from go-mysql-org/go-mysql
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
174 lines (142 loc) · 3.39 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package failover
import (
"fmt"
"github.com/go-mysql-org/go-mysql/client"
. "github.com/go-mysql-org/go-mysql/mysql"
)
// User holds the credentials of a MySQL account.
type User struct {
	// Name is the account name.
	Name string
	// Password is the account password.
	Password string
}
// Server describes one MySQL server and carries the connection used to run
// statements against it.
type Server struct {
	// Addr is the address passed to client.Connect.
	Addr string

	// User is the account used to open the connection.
	User User
	// ReplUser is a second account stored alongside User; it is not used by
	// the methods visible in this file (presumably the replication account —
	// confirm against callers).
	ReplUser User

	// conn is opened on demand by Execute and is nil until then.
	conn *client.Conn
}
// NewServer returns a Server for addr with the given accounts. No connection
// is opened here; the returned Server has a nil connection.
func NewServer(addr string, user User, replUser User) *Server {
	return &Server{
		Addr:     addr,
		User:     user,
		ReplUser: replUser,
	}
}
// Close closes the server connection if one is open and forgets it, so that a
// later Execute opens a fresh connection instead of reusing the closed one.
// Calling Close on a Server without a connection is a no-op.
func (s *Server) Close() {
	if s.conn == nil {
		return
	}

	// The result of closing is deliberately ignored: Close has no error
	// return, and the connection is being discarded either way.
	s.conn.Close()
	s.conn = nil
}
// Execute runs cmd with the optional args on the server connection, opening
// the connection first if there is none.
//
// When the statement fails with ErrBadConn the connection is closed and
// dropped, and the statement is retried on a new connection, up to three
// attempts in total. Any other outcome (success or a different error) is
// returned immediately. A failure to connect is returned without retrying.
// If every attempt ends in ErrBadConn, the result and error of the last
// attempt are returned.
func (s *Server) Execute(cmd string, args ...interface{}) (r *Result, err error) {
	const retryNum = 3

	for i := 0; i < retryNum; i++ {
		if s.conn == nil {
			s.conn, err = client.Connect(s.Addr, s.User.Name, s.User.Password, "")
			if err != nil {
				return nil, err
			}
		}

		r, err = s.conn.Execute(cmd, args...)
		if err == nil || !ErrorEqual(err, ErrBadConn) {
			// Success, or an error that reconnecting cannot fix.
			return r, err
		}

		// The connection is unusable: release it and let the next iteration
		// reconnect. The close error is ignored because the connection is
		// being thrown away regardless.
		s.conn.Close()
		s.conn = nil
	}

	return r, err
}
// StartSlave executes START SLAVE on the server and returns the error, if
// any, reported by Execute.
func (s *Server) StartSlave() error {
	const query = "START SLAVE"

	_, err := s.Execute(query)
	return err
}
// StopSlave executes STOP SLAVE on the server and returns the error, if any,
// reported by Execute.
func (s *Server) StopSlave() error {
	const query = "STOP SLAVE"

	_, err := s.Execute(query)
	return err
}
// StopSlaveIOThread executes STOP SLAVE IO_THREAD on the server and returns
// the error, if any, reported by Execute.
func (s *Server) StopSlaveIOThread() error {
	const query = "STOP SLAVE IO_THREAD"

	_, err := s.Execute(query)
	return err
}
// SlaveStatus executes SHOW SLAVE STATUS and returns the result set of the
// reply. On failure it returns a nil result set and the error from Execute.
func (s *Server) SlaveStatus() (*Resultset, error) {
	res, err := s.Execute("SHOW SLAVE STATUS")
	if err != nil {
		return nil, err
	}
	return res.Resultset, nil
}
// MasterStatus executes SHOW MASTER STATUS and returns the result set of the
// reply. On failure it returns a nil result set and the error from Execute.
func (s *Server) MasterStatus() (*Resultset, error) {
	res, err := s.Execute("SHOW MASTER STATUS")
	if err != nil {
		return nil, err
	}
	return res.Resultset, nil
}
// ResetSlave executes RESET SLAVE on the server and returns the error, if
// any, reported by Execute.
func (s *Server) ResetSlave() error {
	const query = "RESET SLAVE"

	_, err := s.Execute(query)
	return err
}
// ResetSlaveALL executes RESET SLAVE ALL on the server and returns the error,
// if any, reported by Execute.
func (s *Server) ResetSlaveALL() error {
	const query = "RESET SLAVE ALL"

	_, err := s.Execute(query)
	return err
}
// ResetMaster executes RESET MASTER on the server and returns the error, if
// any, reported by Execute.
func (s *Server) ResetMaster() error {
	const query = "RESET MASTER"

	_, err := s.Execute(query)
	return err
}
// MysqlGTIDMode queries @@gtid_mode and reports it as either GTIDModeOn or
// GTIDModeOff. Any value other than GTIDModeOn is reported as GTIDModeOff.
//
// If the query fails, or the value cannot be read from the result, it returns
// GTIDModeOff together with the error so that callers do not mistake an
// unreadable answer for a confirmed "off".
func (s *Server) MysqlGTIDMode() (string, error) {
	r, err := s.Execute("SELECT @@gtid_mode")
	if err != nil {
		return GTIDModeOff, err
	}

	mode, err := r.GetString(0, 0)
	if err != nil {
		return GTIDModeOff, err
	}

	if mode == GTIDModeOn {
		return GTIDModeOn, nil
	}
	return GTIDModeOff, nil
}
// SetReadonly sets the global read_only variable: ON when b is true, OFF
// otherwise. It returns the error, if any, reported by Execute.
func (s *Server) SetReadonly(b bool) error {
	query := "SET GLOBAL read_only = OFF"
	if b {
		query = "SET GLOBAL read_only = ON"
	}

	_, err := s.Execute(query)
	return err
}
// LockTables executes FLUSH TABLES WITH READ LOCK on the server and returns
// the error, if any, reported by Execute.
func (s *Server) LockTables() error {
	const query = "FLUSH TABLES WITH READ LOCK"

	_, err := s.Execute(query)
	return err
}
// UnlockTables executes UNLOCK TABLES on the server and returns the error, if
// any, reported by Execute.
func (s *Server) UnlockTables() error {
	const query = "UNLOCK TABLES"

	_, err := s.Execute(query)
	return err
}
// FetchSlaveReadPos returns the binlog file name and position that the slave
// has read from its master, taken from the Master_Log_File and
// Read_Master_Log_Pos columns of SHOW SLAVE STATUS.
//
// It returns an error, and a zero Position, when the status query fails, when
// either column cannot be read from the first row (for example because the
// status result is empty), or when the position does not fit in the uint32
// used by Position.
func (s *Server) FetchSlaveReadPos() (Position, error) {
	r, err := s.SlaveStatus()
	if err != nil {
		return Position{}, err
	}

	fname, err := r.GetStringByName(0, "Master_Log_File")
	if err != nil {
		return Position{}, err
	}

	pos, err := r.GetIntByName(0, "Read_Master_Log_Pos")
	if err != nil {
		return Position{}, err
	}

	// Position.Pos is a uint32; refuse values a plain conversion would wrap.
	const maxPos = 1<<32 - 1
	if pos < 0 || pos > maxPos {
		return Position{}, fmt.Errorf("slave status: Read_Master_Log_Pos %d does not fit in uint32", pos)
	}

	return Position{Name: fname, Pos: uint32(pos)}, nil
}
// FetchSlaveExecutePos returns the master binlog file name and position up to
// which the slave has executed, taken from the Relay_Master_Log_File and
// Exec_Master_Log_Pos columns of SHOW SLAVE STATUS.
//
// It returns an error, and a zero Position, when the status query fails, when
// either column cannot be read from the first row (for example because the
// status result is empty), or when the position does not fit in the uint32
// used by Position.
func (s *Server) FetchSlaveExecutePos() (Position, error) {
	r, err := s.SlaveStatus()
	if err != nil {
		return Position{}, err
	}

	fname, err := r.GetStringByName(0, "Relay_Master_Log_File")
	if err != nil {
		return Position{}, err
	}

	pos, err := r.GetIntByName(0, "Exec_Master_Log_Pos")
	if err != nil {
		return Position{}, err
	}

	// Position.Pos is a uint32; refuse values a plain conversion would wrap.
	const maxPos = 1<<32 - 1
	if pos < 0 || pos > maxPos {
		return Position{}, fmt.Errorf("slave status: Exec_Master_Log_Pos %d does not fit in uint32", pos)
	}

	return Position{Name: fname, Pos: uint32(pos)}, nil
}
// MasterPosWait executes SELECT MASTER_POS_WAIT for the binlog file and
// position in pos, passing timeout as the third argument, and returns the
// error, if any, reported by Execute.
//
// NOTE(review): the value returned by MASTER_POS_WAIT is discarded, so only a
// failure to execute the statement is reported. Confirm whether callers need
// to distinguish a timeout or NULL result from success.
func (s *Server) MasterPosWait(pos Position, timeout int) error {
	query := fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d, %d)", pos.Name, pos.Pos, timeout)

	_, err := s.Execute(query)
	return err
}