Skip to content

Commit

Permalink
Changing the CopyTo method signature to accept an
Browse files Browse the repository at this point in the history
index.Directory interface implementation for the
destination.

Add a default FileSystemDirectory implementation
for the index.Directory interface.
  • Loading branch information
sreekanth-cb committed Aug 6, 2021
1 parent 51f17a9 commit c13a140
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 89 deletions.
18 changes: 8 additions & 10 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package bleve

import (
"context"
"io"

"github.com/blevesearch/bleve/v2/index/upsidedown"

Expand Down Expand Up @@ -310,14 +309,13 @@ func NewBuilder(path string, mapping mapping.IndexMapping, config map[string]int
}

// IndexCopyable is an index which supports an online copy operation
// of the index. This is an experimental api and could potentially get
// changed or deprecated in future.
// of the index.
type IndexCopyable interface {
// CopyTo creates a fully functional copy of the index using the
// specified destination writer builder callback.
// The index implementation would trigger the given builder callback for
// each index file and it is the callback implementation's responsibility
// to build and return the respective target writer for the file given
// in the callback.
CopyTo(func(string) io.WriteCloser) error
// CopyTo creates a fully functional copy of the index at the
// specified destination directory implementation.
CopyTo(d index.Directory) error
}

// FileSystemDirectory is the default implementation for the
// index.Directory interface. Its string value is a file system path
// that serves as the destination root, e.g.
// CopyTo(FileSystemDirectory(backupIndexPath)).
type FileSystemDirectory string
62 changes: 41 additions & 21 deletions index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,20 +428,26 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
return true, nil
}

func copyFile(src string, dest io.WriteCloser) (int64, error) {
if dest == nil {
return 0, fmt.Errorf("invalid writer for file: %s", src)
func copyToDirectory(srcPath string, d index.Directory) (int64, error) {
if d == nil {
return 0, nil
}

dest, err := d.GetWriter(filepath.Join("store", filepath.Base(srcPath)))
if err != nil {
return 0, fmt.Errorf("GetWriter err: %v", err)
}
sourceFileStat, err := os.Stat(src)

sourceFileStat, err := os.Stat(srcPath)
if err != nil {
return 0, err
}

if !sourceFileStat.Mode().IsRegular() {
return 0, fmt.Errorf("%s is not a regular file", src)
return 0, fmt.Errorf("%s is not a regular file", srcPath)
}

source, err := os.Open(src)
source, err := os.Open(srcPath)
if err != nil {
return 0, err
}
Expand All @@ -450,8 +456,30 @@ func copyFile(src string, dest io.WriteCloser) (int64, error) {
return io.Copy(dest, source)
}

// persistToDirectory persists the given unpersisted segment.
//
// When d is nil the segment is written straight to path via seg.Persist.
// Otherwise the segment must implement io.WriterTo; its contents are
// streamed into the writer that d hands out for "store/<base name of path>".
//
// It returns an error if the segment does not implement io.WriterTo, if the
// directory cannot supply a writer, or if writing or closing the writer fails.
func persistToDirectory(seg segment.UnpersistedSegment, d index.Directory,
	path string) error {
	if d == nil {
		return seg.Persist(path)
	}

	sg, ok := seg.(io.WriterTo)
	if !ok {
		return fmt.Errorf("no io.WriterTo segment implementation found")
	}

	target := filepath.Join("store", filepath.Base(path))
	w, err := d.GetWriter(target)
	if err != nil {
		return err
	}
	if w == nil {
		// A Directory implementation may return (nil, nil); fail cleanly
		// instead of panicking on the nil writer below.
		return fmt.Errorf("invalid writer for file: %s", target)
	}

	_, err = sg.WriteTo(w)
	// Always close the writer, and surface a close failure when the write
	// itself succeeded: buffered data may only hit the target on Close.
	if cerr := w.Close(); err == nil {
		err = cerr
	}

	return err
}

func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
segPlugin SegmentPlugin, getWriter func(string) io.WriteCloser) (
segPlugin SegmentPlugin, d index.Directory) (
[]string, map[uint64]string, error) {
snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket)
if err != nil {
Expand Down Expand Up @@ -505,11 +533,9 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
switch seg := segmentSnapshot.segment.(type) {
case segment.PersistedSegment:
segPath := seg.Path()
if getWriter != nil {
_, err := copyFile(segPath, getWriter(segPath))
if err != nil {
return nil, nil, fmt.Errorf("segment: %s copy err: %v", segPath, err)
}
_, err = copyToDirectory(segPath, d)
if err != nil {
return nil, nil, fmt.Errorf("segment: %s copy err: %v", segPath, err)
}
filename := filepath.Base(segPath)
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
Expand All @@ -519,17 +545,11 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
filenames = append(filenames, filename)
case segment.UnpersistedSegment:
// need to persist this to disk
var err error
filename := zapFileName(segmentSnapshot.id)
path := path + string(os.PathSeparator) + filename
if getWriter != nil {
err = seg.PersistToWriter(getWriter(path))
} else {
err = seg.Persist(path)
}

path := filepath.Join(path, filename)
err := persistToDirectory(seg, d, path)
if err != nil {
return nil, nil, fmt.Errorf("error persisting segment: %v", err)
return nil, nil, fmt.Errorf("segment: %s persist err: %v", path, err)
}
newSegmentPaths[segmentSnapshot.id] = path
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
Expand Down
14 changes: 7 additions & 7 deletions index/scorch/snapshot_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"container/heap"
"encoding/binary"
"fmt"
"io"
"os"
"path/filepath"
"reflect"
"sort"
"sync"
Expand Down Expand Up @@ -766,15 +766,15 @@ OUTER:
return rv
}

func (i *IndexSnapshot) CopyTo(getWriter func(string) io.WriteCloser) (err error) {
func (i *IndexSnapshot) CopyTo(d index.Directory) error {
// get the root bolt file.
w := getWriter("root.bolt")
if w == nil {
fmt.Errorf("failed to create the root.bolt file")
w, err := d.GetWriter(filepath.Join("store", "root.bolt"))
if err != nil || w == nil {
return fmt.Errorf("failed to create the root.bolt file, err: %v", err)
}
rootFile, ok := w.(*os.File)
if !ok {
fmt.Errorf("failed to create the root.bolt file")
return fmt.Errorf("invalid root.bolt file found")
}

copyBolt, err := bolt.Open(rootFile.Name(), 0600, nil)
Expand All @@ -794,7 +794,7 @@ func (i *IndexSnapshot) CopyTo(getWriter func(string) io.WriteCloser) (err error
return err
}

_, _, err = prepareBoltSnapshot(i, tx, "", i.parent.segPlugin, getWriter)
_, _, err = prepareBoltSnapshot(i, tx, "", i.parent.segPlugin, d)
if err != nil {
_ = tx.Rollback()
return fmt.Errorf("error backing up index snapshot: %v", err)
Expand Down
21 changes: 18 additions & 3 deletions index_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -912,7 +913,7 @@ func (m *searchHitSorter) Less(i, j int) bool {
return c < 0
}

func (i *indexImpl) CopyTo(getWriter func(string) io.WriteCloser) (err error) {
func (i *indexImpl) CopyTo(d index.Directory) (err error) {
i.mutex.RLock()
defer i.mutex.RUnlock()

Expand All @@ -935,11 +936,25 @@ func (i *indexImpl) CopyTo(getWriter func(string) io.WriteCloser) (err error) {
return fmt.Errorf("index implementation does not support copy")
}

err = irc.CopyTo(getWriter)
err = irc.CopyTo(d)
if err != nil {
return fmt.Errorf("error copying index metadata: %v", err)
}

// copy the metadata
return i.meta.SaveToWriter(getWriter)
return i.meta.CopyTo(d)
}

// GetWriter returns a writer for filePath, resolved relative to the
// directory named by f. If filePath has a directory component, that
// directory is created under f first (including any missing parents).
// The file is opened read-write and created with mode 0600 if it does
// not exist; existing content is not truncated.
//
// On failure the returned writer is a literal nil, so callers may safely
// test either the writer or the error.
func (f FileSystemDirectory) GetWriter(filePath string) (io.WriteCloser,
	error) {
	dir, file := filepath.Split(filePath)
	if dir != "" {
		err := os.MkdirAll(filepath.Join(string(f), dir), os.ModePerm)
		if err != nil {
			return nil, err
		}
	}

	fh, err := os.OpenFile(filepath.Join(string(f), dir, file),
		os.O_RDWR|os.O_CREATE, 0600)
	if err != nil {
		// Returning the (*os.File)(nil) from OpenFile directly would wrap
		// it in a non-nil io.WriteCloser; return an untyped nil instead.
		return nil, err
	}

	return fh, nil
}
11 changes: 6 additions & 5 deletions index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package bleve
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"

"github.com/blevesearch/bleve/v2/index/upsidedown"
index "github.com/blevesearch/bleve_index_api"
)

const metaFilename = "index_meta.json"
Expand Down Expand Up @@ -94,15 +94,16 @@ func (i *indexMeta) Save(path string) (err error) {
return nil
}

func (i *indexMeta) SaveToWriter(getWriter func(string) io.WriteCloser) (err error) {
func (i *indexMeta) CopyTo(d index.Directory) (err error) {
metaBytes, err := json.Marshal(i)
if err != nil {
return err
}

w := getWriter(metaFilename)
if w == nil {
return fmt.Errorf("invalid writer for file: %s", metaFilename)
w, err := d.GetWriter(metaFilename)
if w == nil || err != nil {
return fmt.Errorf("invalid writer for file: %s, err: %v",
metaFilename, err)
}
defer w.Close()

Expand Down
66 changes: 23 additions & 43 deletions index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"math"
Expand Down Expand Up @@ -2425,83 +2424,64 @@ func TestCopyIndex(t *testing.T) {
backupIndexPath := createTmpIndexPath(t)
defer cleanupTmpIndexPath(t, backupIndexPath)

getWriter := func(filePath string) io.WriteCloser {
var path string
if strings.Contains(filePath, "store") ||
strings.Contains(filePath, "root.bolt") {
path = filepath.Join(backupIndexPath, "store",
filepath.Base(filePath))
_ = os.MkdirAll(filepath.Join(backupIndexPath, "store"), 0700)
} else {
path = filepath.Join(backupIndexPath,
filepath.Base(filePath))
}
f, err := os.OpenFile(path,
os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return nil
}
return f
}

err = copyableIndex.CopyTo(getWriter)
err = copyableIndex.CopyTo(FileSystemDirectory(backupIndexPath))
if err != nil {
t.Fatalf("error backing up index: %v", err)
t.Fatalf("error copying the index: %v", err)
}

// open the copied index
idxBackup, err := Open(backupIndexPath)
idxCopied, err := Open(backupIndexPath)
if err != nil {
t.Fatalf("unable to open backup index")
t.Fatalf("unable to open copy index")
}
defer func() {
err := idxBackup.Close()
err := idxCopied.Close()
if err != nil {
t.Fatalf("error closing backup index: %v", err)
t.Fatalf("error closing copy index: %v", err)
}
}()

// assertions on copied index
backupCount, err := idxBackup.DocCount()
copyCount, err := idxCopied.DocCount()
if err != nil {
t.Fatal(err)
}
if backupCount != 2 {
t.Errorf("expected doc count 2, got %d", backupCount)
if copyCount != 2 {
t.Errorf("expected doc count 2, got %d", copyCount)
}

backupDoc, err := idxBackup.Document("a")
copyDoc, err := idxCopied.Document("a")
if err != nil {
t.Fatal(err)
}
backupFoundNameField := false
backupDoc.VisitFields(func(field index.Field) {
copyFoundNameField := false
copyDoc.VisitFields(func(field index.Field) {
if field.Name() == "name" && string(field.Value()) == "tester" {
backupFoundNameField = true
copyFoundNameField = true
}
})
if !backupFoundNameField {
t.Errorf("expected backup to find field named 'name' with value 'tester'")
if !copyFoundNameField {
t.Errorf("expected copy index to find field named 'name' with value 'tester'")
}

backupFields, err := idx.Fields()
copyFields, err := idx.Fields()
if err != nil {
t.Fatal(err)
}
backupExpectedFields := map[string]bool{
copyExpectedFields := map[string]bool{
"_all": false,
"name": false,
"desc": false,
}
if len(backupFields) < len(backupExpectedFields) {
t.Fatalf("expected %d fields got %d", len(backupExpectedFields), len(backupFields))
if len(copyFields) < len(copyExpectedFields) {
t.Fatalf("expected %d fields got %d", len(copyExpectedFields), len(copyFields))
}
for _, f := range backupFields {
backupExpectedFields[f] = true
for _, f := range copyFields {
copyExpectedFields[f] = true
}
for ef, efp := range backupExpectedFields {
for ef, efp := range copyExpectedFields {
if !efp {
t.Errorf("backup field %s is missing", ef)
t.Errorf("copy field %s is missing", ef)
}
}
}

0 comments on commit c13a140

Please sign in to comment.