Skip to content

Commit

Permalink
GOCBC-95: Implemented Index Management API.
Browse files Browse the repository at this point in the history
Change-Id: I86c78868ba040f4583a301598d81f49d0a715aac
Reviewed-on: http://review.couchbase.org/63148
Reviewed-by: Mark Nunberg <[email protected]>
Tested-by: Brett Lawson <[email protected]>
  • Loading branch information
brett19 committed Apr 20, 2016
1 parent 9ec8767 commit 90c0f08
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 0 deletions.
240 changes: 240 additions & 0 deletions bucketmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"io"
"io/ioutil"
"net/http"
"strings"
"time"
)

type View struct {
Expand All @@ -24,6 +26,16 @@ type DesignDocument struct {
SpatialViews map[string]View `json:"spatial,omitempty"`
}

type IndexInfo struct {
Name string `json:"name"`
IsPrimary bool `json:"is_primary"`
Type IndexType `json:"using"`
State string `json:"state"`
Keyspace string `json:"keyspace_id"`
Namespace string `json:"namespace_id"`
IndexKey []string `json:"index_key"`
}

type BucketManager struct {
bucket *Bucket
username string
Expand Down Expand Up @@ -215,3 +227,231 @@ func (bm *BucketManager) RemoveDesignDocument(name string) error {

return nil
}

func (bm *BucketManager) createIndex(indexName string, fields []string, ignoreIfExists, deferred bool) error {
var qs string

if len(fields) == 0 {
qs += "CREATE PRIMARY INDEX"
} else {
qs += "CREATE INDEX"
}
if indexName != "" {
qs += " `" + indexName + "`"
}
qs += " ON `" + bm.bucket.name + "`"
if len(fields) > 0 {
qs += " ("
for i := 0; i < len(fields); i++ {
if i > 0 {
qs += ", "
}
qs += "`" + fields[i] + "`"
}
qs += ")"
}
if deferred {
qs += " WITH {\"defer_build\": true}"
}

rows, err := bm.bucket.ExecuteN1qlQuery(NewN1qlQuery(qs), nil)
if err != nil {
if strings.Contains(err.Error(), "already exist") {
if ignoreIfExists {
return nil
}
return ErrIndexAlreadyExists
}
return err
}

if err := rows.Close(); err != nil {
return err
}
return nil
}

// *VOLATILE*
// Creates an index over the specified fields.
func (bm *BucketManager) CreateIndex(indexName string, fields []string, ignoreIfExists, deferred bool) error {
if indexName == "" {
return ErrIndexInvalidName
}
if len(fields) <= 0 {
return ErrIndexNoFields
}
return bm.createIndex(indexName, fields, ignoreIfExists, deferred)
}

// *VOLATILE*
// Creates a primary index. An empty customName uses the default naming.
func (bm *BucketManager) CreatePrimaryIndex(customName string, ignoreIfExists, deferred bool) error {
return bm.createIndex(customName, nil, ignoreIfExists, deferred)
}

func (bm *BucketManager) dropIndex(indexName string, ignoreIfNotExists bool) error {
var qs string

if indexName == "" {
qs += "DROP PRIMARY INDEX `" + bm.bucket.name + "`"
} else {
qs += "DROP INDEX `" + bm.bucket.name + "`.`" + indexName + "`"
}

rows, err := bm.bucket.ExecuteN1qlQuery(NewN1qlQuery(qs), nil)
if err != nil {
if strings.Contains(err.Error(), "not found") {
if ignoreIfNotExists {
return nil
}
return ErrIndexNotFound
}
return err
}

if err := rows.Close(); err != nil {
return err
}
return nil
}

// *VOLATILE*
// Drops a specific index by name.
func (bm *BucketManager) DropIndex(indexName string, ignoreIfNotExists bool) error {
if indexName == "" {
return ErrIndexInvalidName
}
return bm.dropIndex(indexName, ignoreIfNotExists)
}

// *VOLATILE*
// Drops the primary index. Pass an empty customName for unnamed primary indexes.
func (bm *BucketManager) DropPrimaryIndex(customName string, ignoreIfNotExists bool) error {
return bm.dropIndex(customName, ignoreIfNotExists)
}

// *VOLATILE*
// Returns a list of all currently registered indexes.
func (bm *BucketManager) GetIndexes() ([]IndexInfo, error) {
q := NewN1qlQuery("SELECT `indexes`.* FROM system:indexes")
rows, err := bm.bucket.ExecuteN1qlQuery(q, nil)
if err != nil {
return nil, err
}

var indexes []IndexInfo
var index IndexInfo
for rows.Next(&index) {
indexes = append(indexes, index)
index = IndexInfo{}
}
if err := rows.Close(); err != nil {
return nil, err
}

return indexes, nil
}

// *VOLATILE*
// Builds all indexes which are currently in deferred state.
func (bm *BucketManager) BuildDeferredIndexes() ([]string, error) {
indexList, err := bm.GetIndexes()
if err != nil {
return nil, err
}

var deferredList []string
for i := 0; i < len(indexList); i++ {
var index = indexList[i]
if index.State == "deferred" || index.State == "pending" {
deferredList = append(deferredList, index.Name)
}
}

if len(deferredList) == 0 {
// Don't try to build an empty index list
return nil, nil
}

var qs string
qs += "BUILD INDEX ON `" + bm.bucket.name + "`("
for i := 0; i < len(deferredList); i++ {
if i > 0 {
qs += ", "
}
qs += "`" + deferredList[i] + "`"
}
qs += ")"

rows, err := bm.bucket.ExecuteN1qlQuery(NewN1qlQuery(qs), nil)
if err != nil {
return nil, err
}

if err := rows.Close(); err != nil {
return nil, err
}

return deferredList, nil
}

func checkIndexesActive(indexes []IndexInfo, checkList []string) (bool, error) {
var checkIndexes []IndexInfo
for i := 0; i < len(checkList); i++ {
indexName := checkList[i]

for j := 0; j < len(indexes); j++ {
if indexes[j].Name == indexName {
checkIndexes = append(checkIndexes, indexes[j])
break
}
}
}

if len(checkIndexes) != len(checkList) {
return false, ErrIndexNotFound
}

for i := 0; i < len(checkIndexes); i++ {
if checkIndexes[i].State != "online" {
return false, nil
}
}
return true, nil
}

// *VOLATILE*
// Waits for a set of indexes to come online
func (bm *BucketManager) WatchIndexes(watchList []string, watchPrimary bool, timeout time.Duration) error {
if watchPrimary {
watchList = append(watchList, "#primary")
}

curInterval := 50 * time.Millisecond
timeoutTime := time.Now().Add(timeout)
for {
indexes, err := bm.GetIndexes()
if err != nil {
return err
}

allOnline, err := checkIndexesActive(indexes, watchList)
if allOnline {
break
}

curInterval += 500 * time.Millisecond
if curInterval > 1000 {
curInterval = 1000
}

if time.Now().Add(curInterval).After(timeoutTime) {
return ErrTimeout
}

// Wait till our next poll interval
time.Sleep(curInterval)
}

return nil
}
7 changes: 7 additions & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,10 @@ const (
// Common Flag Compressions
cfCmprNone = 0 << 29
)

type IndexType string

const (
IndexTypeN1ql = IndexType("gsi")
IndexTypeView = IndexType("views")
)
5 changes: 5 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ var (
ErrNoResults = errors.New("No results returned.")
ErrNoOpenBuckets = errors.New("You must open a bucket before you can perform cluster level operations.")

ErrIndexInvalidName = errors.New("An invalid index name was specified.")
ErrIndexNoFields = errors.New("You must specify at least one field to index.")
ErrIndexNotFound = errors.New("The index specified does not exist.")
ErrIndexAlreadyExists = errors.New("The index specified already exists.")

ErrDispatchFail = gocbcore.ErrDispatchFail
ErrBadHosts = gocbcore.ErrBadHosts
ErrProtocol = gocbcore.ErrProtocol
Expand Down

0 comments on commit 90c0f08

Please sign in to comment.