Skip to content

Commit

Permalink
Updated to version 1.9.7
Browse files Browse the repository at this point in the history
  • Loading branch information
olegator77 committed Sep 28, 2018
1 parent 076baef commit 2a1f776
Show file tree
Hide file tree
Showing 89 changed files with 1,748 additions and 899 deletions.
87 changes: 59 additions & 28 deletions bindings/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package builtin
// #include <stdlib.h>
import "C"
import (
"encoding/json"
"errors"
"fmt"
"net/url"
Expand All @@ -31,6 +32,7 @@ var bufPool sync.Pool

type Builtin struct {
cgoLimiter chan struct{}
rx C.uintptr_t
}

type RawCBuffer struct {
Expand Down Expand Up @@ -60,7 +62,6 @@ func newRawCBuffer() *RawCBuffer {
}

func init() {
C.init_reindexer()
bindings.RegisterBinding("builtin", &Builtin{})
}

Expand Down Expand Up @@ -115,23 +116,30 @@ func bool2cint(v bool) C.int {
return 0
}

func (binding *Builtin) SetReindexerInstance(p unsafe.Pointer) {
C.change_reindexer_instance(p)
}

func (binding *Builtin) Init(u *url.URL, options ...interface{}) error {
if binding.rx != 0 {
return bindings.NewError("already initialized", bindings.ErrConflict)
}

cgoLimit := defCgoLimit
var rx uintptr = 0
for _, option := range options {
switch v := option.(type) {
case bindings.OptionCgoLimit:
cgoLimit = v.CgoLimit
case bindings.OptionBuiltinWithServer:
case bindings.OptionReindexerInstance:
rx = v.Instance
default:
fmt.Printf("Unknown builtin option: %v\n", option)
}
}

if rx == 0 {
binding.rx = C.init_reindexer()
} else {
binding.rx = C.uintptr_t(rx)
}

binding.cgoLimiter = make(chan struct{}, cgoLimit)
if len(u.Path) != 0 && u.Path != "/" {
err := binding.EnableStorage(u.Path)
Expand All @@ -140,17 +148,21 @@ func (binding *Builtin) Init(u *url.URL, options ...interface{}) error {
}
}

return err2go(C.reindexer_init_system_namespaces())
return err2go(C.reindexer_init_system_namespaces(binding.rx))
}

func (binding *Builtin) Clone() bindings.RawBinding {
return &Builtin{}
}

func (binding *Builtin) Ping() error {
return nil
return err2go(C.reindexer_ping(binding.rx))
}

func (binding *Builtin) ModifyItem(nsHash int, data []byte, mode int) (bindings.RawBuffer, error) {
binding.cgoLimiter <- struct{}{}
defer func() { <-binding.cgoLimiter }()
return ret2go(C.reindexer_modify_item(buf2c(data), C.int(mode)))
return ret2go(C.reindexer_modify_item(binding.rx, buf2c(data), C.int(mode)))
}

func (binding *Builtin) OpenNamespace(namespace string, enableStorage, dropOnFormatError bool, cacheMode uint8) error {
Expand All @@ -159,71 +171,84 @@ func (binding *Builtin) OpenNamespace(namespace string, enableStorage, dropOnFor
opts := C.StorageOpts{
options: C.uint8_t(storageOptions),
}
return err2go(C.reindexer_open_namespace(str2c(namespace), opts, C.uint8_t(cacheMode)))
return err2go(C.reindexer_open_namespace(binding.rx, str2c(namespace), opts, C.uint8_t(cacheMode)))
}
func (binding *Builtin) CloseNamespace(namespace string) error {
return err2go(C.reindexer_close_namespace(str2c(namespace)))
return err2go(C.reindexer_close_namespace(binding.rx, str2c(namespace)))
}

func (binding *Builtin) DropNamespace(namespace string) error {
return err2go(C.reindexer_drop_namespace(str2c(namespace)))
return err2go(C.reindexer_drop_namespace(binding.rx, str2c(namespace)))
}

func (binding *Builtin) EnableStorage(path string) error {
l := len(path)
if l > 0 && path[l-1] != '/' {
path += "/"
}
return err2go(C.reindexer_enable_storage(str2c(path)))
return err2go(C.reindexer_enable_storage(binding.rx, str2c(path)))
}

func (binding *Builtin) AddIndex(namespace string, indexDef bindings.IndexDef) error {
bIndexDef, err := json.Marshal(indexDef)
if err != nil {
return err
}

sIndexDef := string(bIndexDef)
err = err2go(C.reindexer_add_index(binding.rx, str2c(namespace), str2c(sIndexDef)))

return err
}

func (binding *Builtin) AddIndex(namespace, index, jsonPath, indexType, fieldType string, indexOpts bindings.IndexOptions, collateMode int, sortOrderStr string) error {
opts := C.IndexOptsC{
options: C.uint8_t(indexOpts),
collate: C.uint8_t(collateMode),
sortOrderLetters: C.CString(sortOrderStr),
func (binding *Builtin) UpdateIndex(namespace string, indexDef bindings.IndexDef) error {
bIndexDef, err := json.Marshal(indexDef)
if err != nil {
return err
}
err := err2go(C.reindexer_add_index(str2c(namespace), str2c(index), str2c(jsonPath), str2c(indexType), str2c(fieldType), opts))
C.free(unsafe.Pointer(opts.sortOrderLetters))

sIndexDef := string(bIndexDef)
err = err2go(C.reindexer_update_index(binding.rx, str2c(namespace), str2c(sIndexDef)))

return err
}

func (binding *Builtin) DropIndex(namespace, index string) error {
return err2go(C.reindexer_drop_index(str2c(namespace), str2c(index)))
return err2go(C.reindexer_drop_index(binding.rx, str2c(namespace), str2c(index)))
}

func (binding *Builtin) ConfigureIndex(namespace, index, config string) error {
return err2go(C.reindexer_configure_index(str2c(namespace), str2c(index), str2c(config)))
return err2go(C.reindexer_configure_index(binding.rx, str2c(namespace), str2c(index), str2c(config)))
}

func (binding *Builtin) PutMeta(namespace, key, data string) error {
return err2go(C.reindexer_put_meta(str2c(namespace), str2c(key), str2c(data)))
return err2go(C.reindexer_put_meta(binding.rx, str2c(namespace), str2c(key), str2c(data)))
}

func (binding *Builtin) GetMeta(namespace, key string) (bindings.RawBuffer, error) {
return ret2go(C.reindexer_get_meta(str2c(namespace), str2c(key)))
return ret2go(C.reindexer_get_meta(binding.rx, str2c(namespace), str2c(key)))
}

func (binding *Builtin) Select(query string, withItems bool, ptVersions []int32, fetchCount int) (bindings.RawBuffer, error) {
binding.cgoLimiter <- struct{}{}
defer func() { <-binding.cgoLimiter }()
return ret2go(C.reindexer_select(str2c(query), bool2cint(withItems), (*C.int32_t)(unsafe.Pointer(&ptVersions[0])), C.int(len(ptVersions))))
return ret2go(C.reindexer_select(binding.rx, str2c(query), bool2cint(withItems), (*C.int32_t)(unsafe.Pointer(&ptVersions[0])), C.int(len(ptVersions))))
}

func (binding *Builtin) SelectQuery(data []byte, withItems bool, ptVersions []int32, fetchCount int) (bindings.RawBuffer, error) {
binding.cgoLimiter <- struct{}{}
defer func() { <-binding.cgoLimiter }()
return ret2go(C.reindexer_select_query(buf2c(data), bool2cint(withItems), (*C.int32_t)(unsafe.Pointer(&ptVersions[0])), C.int(len(ptVersions))))
return ret2go(C.reindexer_select_query(binding.rx, buf2c(data), bool2cint(withItems), (*C.int32_t)(unsafe.Pointer(&ptVersions[0])), C.int(len(ptVersions))))
}

func (binding *Builtin) DeleteQuery(nsHash int, data []byte) (bindings.RawBuffer, error) {
binding.cgoLimiter <- struct{}{}
defer func() { <-binding.cgoLimiter }()
return ret2go(C.reindexer_delete_query(buf2c(data)))
return ret2go(C.reindexer_delete_query(binding.rx, buf2c(data)))
}

func (binding *Builtin) Commit(namespace string) error {
return err2go(C.reindexer_commit(str2c(namespace)))
return err2go(C.reindexer_commit(binding.rx, str2c(namespace)))
}

// CGoLogger logger function for C
Expand All @@ -244,6 +269,12 @@ func (binding *Builtin) DisableLogger() {
logger = nil
}

func (binding *Builtin) Finalize() error {
C.destroy_reindexer(binding.rx)
binding.rx = 0
return nil
}

func newBufFreeBatcher() (bf *bufFreeBatcher) {
bf = &bufFreeBatcher{
bufs: make([]*RawCBuffer, 0, 100),
Expand Down
72 changes: 60 additions & 12 deletions bindings/builtinserver/builtinserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/url"
"reflect"
"sync"
"time"
"unsafe"

Expand All @@ -16,6 +17,7 @@ import (
)

var defaultStartupTimeout time.Duration = time.Minute * 3
var defaultShutdownTimeout time.Duration = defaultStartupTimeout

func init() {
C.init_reindexer_server()
Expand All @@ -40,14 +42,38 @@ func checkStorageReady() bool {
}

type BuiltinServer struct {
builtin bindings.RawBinding
ready chan bool
builtin bindings.RawBinding
wg sync.WaitGroup
shutdownTimeout time.Duration
}

func (server *BuiltinServer) stopServer(timeout time.Duration) error {
if err := err2go(C.stop_reindexer_server()); err != nil {
return err
}

c := make(chan struct{})
go func() {
defer close(c)
server.wg.Wait()
}()

select {
case <-c:
return nil
case <-time.After(timeout):
return bindings.NewError("Shutdown server timeout is expired", bindings.ErrLogic)
}
}

func (server *BuiltinServer) Init(u *url.URL, options ...interface{}) error {
server.builtin = &builtin.Builtin{}
if server.builtin != nil {
return bindings.NewError("already initialized", bindings.ErrConflict)
}

server.builtin = &builtin.Builtin{}
startupTimeout := defaultStartupTimeout
server.shutdownTimeout = defaultShutdownTimeout
serverCfg := config.DefaultServerConfig()

for _, option := range options {
Expand All @@ -60,6 +86,9 @@ func (server *BuiltinServer) Init(u *url.URL, options ...interface{}) error {
if v.ServerConfig != nil {
serverCfg = v.ServerConfig
}
if v.ShutdownTimeout != 0 {
server.shutdownTimeout = v.ShutdownTimeout
}
default:
fmt.Printf("Unknown builtinserver option: %v\n", option)
}
Expand All @@ -70,7 +99,9 @@ func (server *BuiltinServer) Init(u *url.URL, options ...interface{}) error {
return err
}

server.wg.Add(1)
go func() {
defer server.wg.Done()
err := err2go(C.start_reindexer_server(str2c(yamlStr)))
if err != nil {
panic(err)
Expand All @@ -86,18 +117,21 @@ func (server *BuiltinServer) Init(u *url.URL, options ...interface{}) error {
}

pass, _ := u.User.Password()
server.builtin.(*builtin.Builtin).SetReindexerInstance(
unsafe.Pointer(C.get_reindexer_instance(str2c(u.Host), str2c(u.User.Username()), str2c(pass))),
)

var rx C.uintptr_t = 0
if err := err2go(C.get_reindexer_instance(str2c(u.Host), str2c(u.User.Username()), str2c(pass), &rx)); err != nil {
return err
}

url := *u
url.Path = ""

if err := server.builtin.Init(&url, options...); err != nil {
return err
}
options = append(options, bindings.OptionReindexerInstance{Instance: uintptr(rx)})
return server.builtin.Init(&url, options...)
}

return nil
func (server *BuiltinServer) Clone() bindings.RawBinding {
return &BuiltinServer{}
}

func (server *BuiltinServer) OpenNamespace(namespace string, enableStorage, dropOnFileFormatError bool, cacheMode uint8) error {
Expand All @@ -116,8 +150,12 @@ func (server *BuiltinServer) EnableStorage(namespace string) error {
return server.builtin.EnableStorage(namespace)
}

func (server *BuiltinServer) AddIndex(namespace, index, jsonPath, indexType, fieldType string, opts bindings.IndexOptions, collateMode int, sortOrderStr string) error {
return server.builtin.AddIndex(namespace, index, jsonPath, indexType, fieldType, opts, collateMode, sortOrderStr)
func (server *BuiltinServer) AddIndex(namespace string, indexDef bindings.IndexDef) error {
return server.builtin.AddIndex(namespace, indexDef)
}

func (server *BuiltinServer) UpdateIndex(namespace string, indexDef bindings.IndexDef) error {
return server.builtin.UpdateIndex(namespace, indexDef)
}

func (server *BuiltinServer) DropIndex(namespace, index string) error {
Expand Down Expand Up @@ -164,6 +202,16 @@ func (server *BuiltinServer) DisableLogger() {
server.builtin.DisableLogger()
}

func (server *BuiltinServer) Finalize() error {
if err := server.stopServer(server.shutdownTimeout); err != nil {
return err
}
C.destroy_reindexer_server()
server.builtin = nil
server.shutdownTimeout = 0
return nil
}

func (server *BuiltinServer) Ping() error {
return server.builtin.Ping()
}
1 change: 1 addition & 0 deletions bindings/cproto/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
cmdEnumNamespaces = 22
cmdConfigureIndex = 23
cmdDropIndex = 24
cmdUpdateIndex = 25
cmdCommit = 32
cmdModifyItem = 33
cmdDeleteQuery = 34
Expand Down
Loading

0 comments on commit 2a1f776

Please sign in to comment.