Skip to content

Commit

Permalink
*: set store labels using pd api. (tikv#689)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Jul 31, 2017
1 parent 7d42cd0 commit 2710f22
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 3 deletions.
27 changes: 26 additions & 1 deletion pdctl/command/store_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package command
import (
"fmt"
"net/http"
"path"
"strconv"

"github.com/spf13/cobra"
Expand All @@ -29,11 +30,12 @@ var (
// NewStoreCommand return a store subcommand of rootCmd
func NewStoreCommand() *cobra.Command {
s := &cobra.Command{
Use: "store [delete] <store_id>",
Use: "store [delete|label] <store_id>",
Short: "show the store status",
Run: showStoreCommandFunc,
}
s.AddCommand(NewDeleteStoreCommand())
s.AddCommand(NewLabelStoreCommand())
return s
}

Expand All @@ -47,6 +49,16 @@ func NewDeleteStoreCommand() *cobra.Command {
return d
}

// NewLabelStoreCommand returns a label subcommand of storeCmd.
func NewLabelStoreCommand() *cobra.Command {
l := &cobra.Command{
Use: "label <store_id> <key> <value>",
Short: "set a store's label value",
Run: labelStoreCommandFunc,
}
return l
}

func showStoreCommandFunc(cmd *cobra.Command, args []string) {
var prefix string
prefix = storesPrefix
Expand Down Expand Up @@ -82,3 +94,16 @@ func deleteStoreCommandFunc(cmd *cobra.Command, args []string) {
}
fmt.Println("Success!")
}

func labelStoreCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 3 {
fmt.Println("Usage: store label <store_id> <key> <value>")
return
}
if _, err := strconv.Atoi(args[0]); err != nil {
fmt.Println("store_id should be a number")
return
}
prefix := fmt.Sprintf(path.Join(storePrefix, "label"), args[0])
postJSON(cmd, prefix, map[string]interface{}{args[1]: args[2]})
}
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
storeHandler := newStoreHandler(svr, rd)
router.HandleFunc("/api/v1/store/{id}", storeHandler.Get).Methods("GET")
router.HandleFunc("/api/v1/store/{id}", storeHandler.Delete).Methods("DELETE")
router.HandleFunc("/api/v1/store/{id}/label", storeHandler.SetLabels).Methods("POST")
router.Handle("/api/v1/stores", newStoresHandler(svr, rd)).Methods("GET")

labelsHandler := newLabelsHandler(svr, rd)
Expand Down
36 changes: 36 additions & 0 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,42 @@ func (h *storeHandler) Delete(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, nil)
}

func (h *storeHandler) SetLabels(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
}

vars := mux.Vars(r)
storeIDStr := vars["id"]
storeID, err := strconv.ParseUint(storeIDStr, 10, 64)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

var input map[string]string
if err := readJSON(r.Body, &input); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
var labels []*metapb.StoreLabel
for k, v := range input {
labels = append(labels, &metapb.StoreLabel{
Key: k,
Value: v,
})
}

if err := cluster.UpdateStoreLabels(storeID, labels); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

h.rd.JSON(w, http.StatusOK, nil)
}

type storesHandler struct {
svr *server.Server
rd *render.Render
Expand Down
42 changes: 42 additions & 0 deletions server/api/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package api

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -99,11 +100,13 @@ func (s *testStoreSuite) TestStoresList(c *C) {
checkStoresInfo(c, info.Stores, s.stores[:3])

url = fmt.Sprintf("%s/stores?state=0", s.urlPrefix)
info = new(storesInfo)
err = readJSONWithURL(url, info)
c.Assert(err, IsNil)
checkStoresInfo(c, info.Stores, s.stores[:2])

url = fmt.Sprintf("%s/stores?state=1", s.urlPrefix)
info = new(storesInfo)
err = readJSONWithURL(url, info)
c.Assert(err, IsNil)
checkStoresInfo(c, info.Stores, s.stores[2:3])
Expand All @@ -118,6 +121,45 @@ func (s *testStoreSuite) TestStoreGet(c *C) {
checkStoresInfo(c, []*storeInfo{info}, s.stores[:1])
}

func (s *testStoreSuite) TestStoreLabel(c *C) {
url := fmt.Sprintf("%s/store/1", s.urlPrefix)
var info storeInfo
err := readJSONWithURL(url, &info)
c.Assert(err, IsNil)
c.Assert(info.Store.Labels, HasLen, 0)

// Test set.
labels := map[string]string{"zone": "cn", "host": "local"}
b, err := json.Marshal(labels)
c.Assert(err, IsNil)
err = postJSON(&http.Client{}, url+"/label", b)
c.Assert(err, IsNil)

err = readJSONWithURL(url, &info)
c.Assert(err, IsNil)
c.Assert(info.Store.Labels, HasLen, len(labels))
for _, l := range info.Store.Labels {
c.Assert(labels[l.Key], Equals, l.Value)
}

// Test merge.
labels = map[string]string{"zack": "zack1", "host": "host1"}
b, err = json.Marshal(labels)
c.Assert(err, IsNil)
err = postJSON(&http.Client{}, url+"/label", b)
c.Assert(err, IsNil)

expectLabel := map[string]string{"zone": "cn", "zack": "zack1", "host": "host1"}
err = readJSONWithURL(url, &info)
c.Assert(err, IsNil)
c.Assert(info.Store.Labels, HasLen, len(expectLabel))
for _, l := range info.Store.Labels {
c.Assert(expectLabel[l.Key], Equals, l.Value)
}

s.stores[0].Labels = info.Store.Labels
}

func (s *testStoreSuite) TestStoreDelete(c *C) {
table := []struct {
id int
Expand Down
17 changes: 15 additions & 2 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,19 @@ func (c *RaftCluster) GetStore(storeID uint64) (*metapb.Store, *StoreStatus, err
return store.Store, store.status, nil
}

// UpdateStoreLabels updates a store's location labels.
func (c *RaftCluster) UpdateStoreLabels(storeID uint64, labels []*metapb.StoreLabel) error {
store := c.cachedCluster.getStore(storeID)
if store == nil {
return errors.Errorf("invalid store ID %d, not found", storeID)
}
storeMeta := store.Store
storeMeta.Labels = labels
// putStore will perform label merge.
err := c.putStore(storeMeta)
return errors.Trace(err)
}

func (c *RaftCluster) putStore(store *metapb.Store) error {
c.Lock()
defer c.Unlock()
Expand Down Expand Up @@ -430,13 +443,13 @@ func (c *RaftCluster) putStore(store *metapb.Store) error {
} else {
// Update an existed store.
s.Address = store.Address
s.Labels = store.Labels
s.mergeLabels(store.Labels)
}

// Check location labels.
for _, k := range c.s.cfg.Replication.LocationLabels {
if v := s.getLabelValue(k); len(v) == 0 {
return errors.Errorf("missing location label %q in store %v", k, s)
log.Warnf("missing location label %q in store %v", k, s)
}
}

Expand Down
13 changes: 13 additions & 0 deletions server/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,19 @@ func (s *storeInfo) compareLocation(other *storeInfo, labels []string) int {
return -1
}

func (s *storeInfo) mergeLabels(labels []*metapb.StoreLabel) {
L:
for _, newLabel := range labels {
for _, label := range s.Labels {
if label.Key == newLabel.Key {
label.Value = newLabel.Value
continue L
}
}
s.Labels = append(s.Labels, newLabel)
}
}

// StoreStatus contains information about a store's status.
type StoreStatus struct {
*pdpb.StoreStats
Expand Down

0 comments on commit 2710f22

Please sign in to comment.