Skip to content

Commit

Permalink
add function[send query to specified node]
Browse files Browse the repository at this point in the history
  • Loading branch information
flike committed Aug 7, 2015
1 parent 1f4a56c commit 5a3213f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
3 changes: 3 additions & 0 deletions mysql/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,7 @@ var (
"select": KS_TK_SELECT,
"use": KS_TK_USE,
}
// '/'
COMMENT_PREFIX uint8 = 47
COMMENT_STRING = "/*"
)
37 changes: 25 additions & 12 deletions proxy/server/conn_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ func (c *ClientConn) handleQuery(sql string) (err error) {
}()

sql = strings.TrimRight(sql, ";") //删除sql语句最后的分号

hasHandled, err := c.handleUnsupport(sql)
hasHandled, err := c.handleUnShard(sql)
if err != nil {
golog.Error("server", "parse", err.Error(), 0, "hasHandled", hasHandled)
return err
Expand Down Expand Up @@ -326,9 +325,11 @@ func (c *ClientConn) newEmptyResultset(stmt *sqlparser.Select) *Resultset {
}

//返回true表示已经处理,false表示未处理
func (c *ClientConn) handleUnsupport(sql string) (bool, error) {
func (c *ClientConn) handleUnShard(sql string) (bool, error) {
var rs []*Result
var TK_FROM string = "from"
var execNode *backend.Node
var fromSlave bool = false

sql = strings.ToLower(sql)
tokens := strings.Fields(sql)
Expand All @@ -348,26 +349,39 @@ func (c *ClientConn) handleUnsupport(sql string) (bool, error) {
}
}
}
//get node
if 2 <= tokensLen {
if tokens[0][0] == COMMENT_PREFIX {
nodeName := strings.Trim(tokens[0], COMMENT_STRING)
if c.schema.nodes[nodeName] != nil {
execNode = c.schema.nodes[nodeName]
}
//select
if WHITE_TOKEN_MAP[tokens[1]] == 2 {
fromSlave = true
}
}
}

defaultRule := c.schema.rule.DefaultRule
if len(defaultRule.Nodes) == 0 {

return false, ErrNoDefaultNode
if execNode == nil {
defaultRule := c.schema.rule.DefaultRule
if len(defaultRule.Nodes) == 0 {
return false, ErrNoDefaultNode
}
execNode = c.proxy.GetNode(defaultRule.Nodes[0])
}
defaultNode := c.proxy.GetNode(defaultRule.Nodes[0])

//execute in Master DB
conn, err := c.getBackendConn(defaultNode, false)
conn, err := c.getBackendConn(execNode, fromSlave)
if err != nil {
return false, err
}

rs, err = c.executeInNode(conn, sql, nil)
if err != nil {
return false, err
}

c.closeConn(conn, false)

if len(rs) == 0 {
msg := fmt.Sprintf("result is empty")
golog.Error("ClientConn", "handleUnsupport", msg, c.connectionId)
Expand All @@ -379,7 +393,6 @@ func (c *ClientConn) handleUnsupport(sql string) (bool, error) {
} else {
err = c.writeOK(rs[0])
}

if err != nil {
return false, err
}
Expand Down

0 comments on commit 5a3213f

Please sign in to comment.