Introduce ServiceManager and Refactor DAX Integration tests (FeatureBaseDB#2320)

* Introduce ServiceManager and Refactor DAX Integration tests

The ServiceManager provides an interface with which to manage
featurebase (dax) services (mds, queryer, computer). It replaces the
confusing interface implementations in /dax/server/server.go (which
optionally used pointers to in-process objects to satisfy an interface)
with (for now) http implementations. The thought is that even if we're
running all services in-process, we should communicate between services
over http in order to mirror what we would do in a production
environment where the services are running on different nodes.
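
As a rough, hedged sketch of the idea (the `Service` interface and `StartAll` method below are illustrative assumptions, not the actual FeatureBase API): each service is addressed over HTTP even when it runs in-process, and the manager starts them as a unit.

```go
// Illustrative sketch only: a ServiceManager that owns mds, queryer, and
// computer services and starts them together. All names here are assumed.
package daxsketch

import "context"

// Service is a minimal lifecycle a managed dax service might expose.
type Service interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	Address() string // e.g. "http://localhost:8080/mds"
}

// ServiceManager starts and stops a set of services as a unit.
type ServiceManager struct {
	MDS       Service
	Queryer   Service
	Computers []Service
}

// StartAll starts every configured service, returning the first error.
func (sm *ServiceManager) StartAll(ctx context.Context) error {
	all := append([]Service{sm.MDS, sm.Queryer}, sm.Computers...)
	for _, svc := range all {
		if svc == nil {
			continue
		}
		if err := svc.Start(ctx); err != nil {
			return err
		}
	}
	return nil
}
```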

This batch of commits does quite a lot, most of which is captured here:

- Added `path` support to `dax.Address`. Address is now a string of the form [scheme]://[host]:[port]/[path] (see the sketch after this list).
- Added `Holder.directiveApplied` to determine (in tests) if the computer has completed applying the latest directive. This is somewhat temporary until we improve the mds-to-computer logic.
- Removed the "service prefix" code which was prepending client URL paths with the prefix. Instead, the serviceType (mds, queryer, computer[n]) is now part of `dax.Address`.
- Removed, from the dax config, the top level `StorageMethod` and `StorageDSN` and now just have `MDS.Config.DataDir`.
- Added `Computer.Config.N` to specify the number of computers to run in-process.
- Moved the `pilosa.MDS` interface to `computer.Registrar`. This is an example of getting the interfaces defined in the right packages.
- Added `SnapshotTable()` method to the mds client (to align with its API).
- Changed `Balancer.AddJob()` to `Balancer.AddJobs()` to support, for example, adding 256 partitions in a single call. Refactored some of the naive Balancer to account for this.
- Added a `Seed` to the top-level config. It's not really useful because of package `crypto/rand`.
- Added an in-memory implementation of the DisCo interface and disabled etcd in a computer service.
- Created separate data-dirs for each in-process computer.
- Disabled grpc in dax.
- Modified the sql3 test definition format to support multiple insert steps and separate query results (to align with those steps).
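
A minimal sketch of the `[scheme]://[host]:[port]/[path]` address form mentioned above; the `Path` method here is an assumption for illustration, not the real `dax.Address` API:

```go
// Illustrative only: extract the service path ("mds", "queryer",
// "computer0", ...) from an address of the form scheme://host:port/path.
package main

import (
	"fmt"
	"net/url"
	"strings"
)

type Address string

// Path returns the path portion of the address without the leading slash.
func (a Address) Path() (string, error) {
	u, err := url.Parse(string(a))
	if err != nil {
		return "", fmt.Errorf("parsing address %q: %w", a, err)
	}
	return strings.TrimPrefix(u.Path, "/"), nil
}

func main() {
	addr := Address("http://localhost:8080/computer0")
	p, _ := addr.Path()
	fmt.Println(p) // computer0
}
```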

* Changes necessary to get multiple computer instances running in-process

For now the config looks like this:

```
[computer]
run = true
n = 4
```

but we can probably just change that to be something like:

```
[computer]
run = 4
```
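
For reference, a hedged guess at how the `[computer]` section maps onto a config struct given the `computer.run` and `computer.n` flags in this diff (toml tags and field placement are assumptions):

```go
// Sketch of the computer config section under the current run/n shape.
package daxsketch

type ComputerConfig struct {
	Run bool `toml:"run"` // run the Computer service in-process
	N   int  `toml:"n"`   // number of Computer services to run in-process
}
```

Collapsing to `run = 4` would presumably mean replacing the bool with an int where 0 means "don't run".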

*Issues found running multiple "computers" in-process*
- grpc was trying to bind on the same port
  - changed GRPCListener from `*net.TCPListener` to `net.Listener`
  - created a nopListener and set GRPCListener to that for now (i.e. disabled grpc); see the sketch after this list
- etcd was starting more than once
  - changed dax to use in-memory implementations of the disco interfaces (i.e. stop using etcd)
- IDAllocator (which uses boltdb) was trying to open the `idalloc.db` file more than once
  - realized we have to set separate data-dirs for each holder. that fixed it.
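
The nop listener referenced in the grpc item is roughly this shape — a `net.Listener` whose `Accept` blocks until `Close` — though the actual type in the codebase may differ:

```go
// Sketch of a do-nothing net.Listener: Accept blocks until Close is called,
// which effectively disables the gRPC server without changing callers that
// expect a listener.
package daxsketch

import "net"

type nopListener struct {
	done chan struct{}
}

func newNopListener() *nopListener {
	return &nopListener{done: make(chan struct{})}
}

// Accept blocks until Close, then reports the listener as closed.
func (l *nopListener) Accept() (net.Conn, error) {
	<-l.done
	return nil, net.ErrClosed
}

// Close is safe to call more than once.
func (l *nopListener) Close() error {
	select {
	case <-l.done:
	default:
		close(l.done)
	}
	return nil
}

// Addr returns a placeholder address.
func (l *nopListener) Addr() net.Addr {
	return &net.TCPAddr{IP: net.IPv4zero, Port: 0}
}

var _ net.Listener = (*nopListener)(nil)
```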

* Port dax integration tests to ManagedCommand

* Modify Balancer-related methods like AddJob to AddJobs

There were (and still are) a lot of places where we were adding one job
at a time, even when we had a long list of jobs to add. This resulted in
every job add (for example, adding 1 of 256 shards) taking ~40ms, or over
10s to create a keyed table. One reason was that each job add was
making multiple boltdb transactions.
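
The gist of the fix, sketched against a hypothetical bolt-backed balancer (bucket and key layout are invented; the real Balancer differs): accept a slice of jobs and commit them in one transaction.

```go
// Sketch: batch job adds into a single bolt transaction instead of one
// transaction per job, so adding 256 partitions costs one commit.
package daxsketch

import bolt "go.etcd.io/bbolt"

type Balancer struct {
	db *bolt.DB
}

// AddJobs records all jobs for a worker in one transaction.
func (b *Balancer) AddJobs(worker string, jobs ...string) error {
	return b.db.Update(func(tx *bolt.Tx) error {
		bkt, err := tx.CreateBucketIfNotExists([]byte("jobs"))
		if err != nil {
			return err
		}
		for _, job := range jobs {
			if err := bkt.Put([]byte(job), []byte(worker)); err != nil {
				return err
			}
		}
		return nil
	})
}
```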

* Port over more dax integration test stuff

* Add DirectiveApplied to signify that snapshot/writes have loaded.

We use this in tests to avoid using sleeps.
This should be considered temporary; we're going to need a more robust
solution for determining when a computer node is ready to serve complete
data.
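
In practice this looks like a poll loop in test helpers rather than a sleep; a sketch against the `DirectiveApplied(ctx)` method added in this commit (the helper itself is hypothetical):

```go
// waitForDirective polls DirectiveApplied until it reports true or the
// context is done, replacing fixed sleeps in tests.
package daxsketch

import (
	"context"
	"fmt"
	"time"
)

type directiveAPI interface {
	DirectiveApplied(ctx context.Context) (bool, error)
}

func waitForDirective(ctx context.Context, api directiveAPI) error {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		applied, err := api.DirectiveApplied(ctx)
		if err != nil {
			return fmt.Errorf("checking directive: %w", err)
		}
		if applied {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}
```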

* Finish porting dax integration tests

* Improve godocs

* Remove docker-based DAX integration tests.

* go mod tidy

* Move test/managed.go to avoid package conflicts

* Modify IDK integration tests to work with ServiceManager changes

This is really just renaming `computer` -> `computer0`, plus the MDS
DataDir config change.

* cleanup found during review

* echo $CI_COMMIT_REF_SLUG in CI

* remove docker image arg, use build instead

(cherry picked from commit 2843f21)
travisturner authored and fhaynes committed Dec 12, 2022
1 parent d6d5ddb commit a44b622
Showing 68 changed files with 2,454 additions and 3,369 deletions.
12 changes: 12 additions & 0 deletions api.go
@@ -3091,10 +3091,19 @@ func (api *API) RBFDebugInfo() map[string]*rbf.DebugInfo {
return infos
}

// Directive applies the provided Directive to the local computer.
func (api *API) Directive(ctx context.Context, d *dax.Directive) error {
return api.ApplyDirective(ctx, d)
}

// DirectiveApplied returns true if the computer's current Directive has been
// applied and is ready to be queried. This is temporary (primarily for tests)
// and needs to be refactored as we improve the logic around mds-to-computer
// communication.
func (api *API) DirectiveApplied(ctx context.Context) (bool, error) {
return api.holder.DirectiveApplied(), nil
}

// SnapshotShardData triggers the node to perform a shard snapshot based on the
// provided SnapshotShardDataRequest.
func (api *API) SnapshotShardData(ctx context.Context, req *dax.SnapshotShardDataRequest) error {
@@ -3135,6 +3144,7 @@ func (api *API) SnapshotShardData(ctx context.Context, req *dax.SnapshotShardDat

// Update the cached directive on the holder.
api.holder.SetDirective(&req.Directive)
api.holder.SetDirectiveApplied(true)

// Finally, delete the log file for the previous version.
return api.writeLogWriter.DeleteShard(ctx, qtid, partitionNum, req.ShardNum, req.FromVersion)
@@ -3183,6 +3193,7 @@ func (api *API) SnapshotTableKeys(ctx context.Context, req *dax.SnapshotTableKey

// Update the cached directive on the holder.
api.holder.SetDirective(&req.Directive)
api.holder.SetDirectiveApplied(true)

// Finally, delete the log file for the previous version.
return api.writeLogWriter.DeleteTableKeys(ctx, qtid, req.PartitionNum, req.FromVersion)
@@ -3224,6 +3235,7 @@ func (api *API) SnapshotFieldKeys(ctx context.Context, req *dax.SnapshotFieldKey

// Update the cached directive on the holder.
api.holder.SetDirective(&req.Directive)
api.holder.SetDirectiveApplied(true)

// Finally, delete the log file for the previous version.
return api.writeLogWriter.DeleteFieldKeys(ctx, qtid, req.Field, req.FromVersion)
1 change: 1 addition & 0 deletions api_directive.go
@@ -66,6 +66,7 @@ func (api *API) ApplyDirective(ctx context.Context, d *dax.Directive) error {
// TODO(tlt): despite what this comment says, this logic is not sound; we
// shouldn't be setting the directive until enactDirective() succeeds.
api.holder.SetDirective(d)
defer api.holder.SetDirectiveApplied(true)

return api.enactDirective(ctx, &previousDirective, d)
}
63 changes: 35 additions & 28 deletions client/client.go
@@ -311,7 +311,7 @@ func (c *Client) Query(query PQLQuery, options ...interface{}) (*QueryResponse,
if err != nil {
return nil, errors.Wrap(err, "making request data")
}
path := fmt.Sprintf("%s/index/%s/query", c.prefix(), query.Index().name)
path := fmt.Sprintf("/index/%s/query", query.Index().name)
_, respData, err := c.HTTPRequest("POST", path, reqData, c.augmentHeaders(defaultProtobufHeaders()))
if err != nil {
return nil, err
@@ -334,7 +334,7 @@ func (c *Client) CreateIndex(index *Index) error {
defer span.Finish()

data := []byte(index.options.String())
path := fmt.Sprintf("%s/index/%s", c.prefix(), index.name)
path := fmt.Sprintf("/index/%s", index.name)
status, body, err := c.HTTPRequest("POST", path, data, c.augmentHeaders(nil))
if err != nil {
return errors.Wrapf(err, "creating index: %s", index.name)
@@ -358,7 +358,7 @@ func (c *Client) CreateField(field *Field) error {
defer span.Finish()

data := []byte(field.options.String())
path := fmt.Sprintf("%s/index/%s/field/%s", c.prefix(), field.index.name, field.name)
path := fmt.Sprintf("/index/%s/field/%s", field.index.name, field.name)
status, body, err := c.HTTPRequest("POST", path, data, c.augmentHeaders(nil))
if err != nil {
return errors.Wrapf(err, "creating field: %s in index: %s", field.name, field.index.name)
@@ -426,7 +426,7 @@ func (c *Client) DeleteIndexByName(index string) error {
span := c.tracer.StartSpan("Client.DeleteIndex")
defer span.Finish()

path := fmt.Sprintf("%s/index/%s", c.prefix(), index)
path := fmt.Sprintf("/index/%s", index)
_, _, err := c.HTTPRequest("DELETE", path, nil, c.augmentHeaders(nil))
return err
}
@@ -436,7 +436,7 @@ func (c *Client) DeleteField(field *Field) error {
span := c.tracer.StartSpan("Client.DeleteField")
defer span.Finish()

path := fmt.Sprintf("%s/index/%s/field/%s", c.prefix(), field.index.name, field.name)
path := fmt.Sprintf("/index/%s/field/%s", field.index.name, field.name)
_, _, err := c.HTTPRequest("DELETE", path, nil, c.augmentHeaders(nil))
return err
}
@@ -547,7 +547,7 @@ func (c *Client) EncodeImport(field *Field, shard uint64, vals, ids []uint64, cl
if err != nil {
return "", nil, errors.Wrap(err, "marshaling Import to protobuf")
}
path = fmt.Sprintf("%s/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", c.prefix(), field.index.Name(), field.Name(), strconv.FormatBool(clear))
path = fmt.Sprintf("/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", field.index.Name(), field.Name(), strconv.FormatBool(clear))
return path, data, nil
}

@@ -594,7 +594,7 @@ func (c *Client) EncodeImportValues(field *Field, shard uint64, vals []int64, id
if err != nil {
return "", nil, errors.Wrap(err, "marshaling ImportValue to protobuf")
}
path = fmt.Sprintf("%s/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", c.prefix(), field.index.Name(), field.Name(), strconv.FormatBool(clear))
path = fmt.Sprintf("/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", field.index.Name(), field.Name(), strconv.FormatBool(clear))
return path, data, nil
}

@@ -625,7 +625,7 @@ func (c *Client) fetchFragmentNodes(indexName string, shard uint64) ([]fragmentN
if c.manualFragmentNode != nil {
return []fragmentNode{*c.manualFragmentNode}, nil
}
path := fmt.Sprintf("%s/internal/fragment/nodes?shard=%d&index=%s", c.prefix(), shard, indexName)
path := fmt.Sprintf("/internal/fragment/nodes?shard=%d&index=%s", shard, indexName)
_, body, err := c.HTTPRequest("GET", path, []byte{}, c.augmentHeaders(nil))
if err != nil {
return nil, err
@@ -688,7 +688,7 @@ func (c *Client) ImportRoaringShard(index string, shard uint64, request *pilosa.
for _, uri := range uris {
uri := uri
eg.Go(func() error {
return c.importData(uri, fmt.Sprintf("%s/index/%s/shard/%d/import-roaring", c.prefix(), index, shard), data)
return c.importData(uri, fmt.Sprintf("/index/%s/shard/%d/import-roaring", index, shard), data)
})
}
err = eg.Wait()
@@ -722,7 +722,7 @@ func (c *Client) importRoaringBitmap(uri *pnet.URI, field *Field, shard uint64,
}
params := url.Values{}
params.Add("clear", strconv.FormatBool(options.clear))
path := makeRoaringImportPath(field, shard, params, c.prefix())
path := makeRoaringImportPath(field, shard, params)
req := &pb.ImportRoaringRequest{
Clear: options.clear,
Views: protoViews,
@@ -776,7 +776,7 @@ func (c *Client) Info() (Info, error) {
span := c.tracer.StartSpan("Client.Info")
defer span.Finish()

path := fmt.Sprintf("%s/info", c.prefix())
path := "/info"
_, data, err := c.HTTPRequest("GET", path, nil, c.augmentHeaders(nil))
if err != nil {
return Info{}, errors.Wrap(err, "requesting /info")
@@ -794,7 +794,7 @@ func (c *Client) Status() (Status, error) {
span := c.tracer.StartSpan("Client.Status")
defer span.Finish()

path := fmt.Sprintf("%s/status", c.prefix())
path := "/status"
_, data, err := c.HTTPRequest("GET", path, nil, nil)
if err != nil {
return Status{}, errors.Wrap(err, "requesting /status")
@@ -808,7 +808,7 @@ func (c *Client) Status() (Status, error) {
}

func (c *Client) readSchema() ([]SchemaIndex, error) {
path := fmt.Sprintf("%s/schema", c.prefix())
path := "/schema"
_, data, err := c.HTTPRequest("GET", path, nil, c.augmentHeaders(nil))
if err != nil {
return nil, errors.Wrap(err, "requesting /schema")
@@ -822,7 +822,7 @@ func (c *Client) readSchema() ([]SchemaIndex, error) {
}

func (c *Client) shardsMax() (map[string]uint64, error) {
path := fmt.Sprintf("%s/internal/shards/max", c.prefix())
path := "/internal/shards/max"
_, data, err := c.HTTPRequest("GET", path, nil, nil)
if err != nil {
return nil, errors.Wrap(err, "requesting /internal/shards/max")
@@ -866,7 +866,7 @@ func (c *Client) httpRequest(method string, path string, data []byte, headers ma
// doRequest implements exponential backoff
status, body, err = c.doRequest(host, method, path, c.augmentHeaders(headers), data)
// conditions when primary should not be tried
pathCheck := fmt.Sprintf("%s/status", c.prefix())
pathCheck := "/status"
if err == nil || usePrimary || path == pathCheck {
break
}
@@ -902,8 +902,9 @@ func (c *Client) host(usePrimary bool) (*pnet.URI, error) {
c.primaryLock.Unlock()
return nil, errors.Wrap(err, "fetching primary node")
}
if host, err = pnet.NewURIFromAddress(fmt.Sprintf("%s://%s:%d", node.Scheme, node.Host, node.Port)); err != nil {
return nil, errors.Wrap(err, "parsing primary node URL")
addr := fmt.Sprintf("%s://%s:%d", node.Scheme, node.Host, node.Port)
if host, err = pnet.NewURIFromAddress(addr); err != nil {
return nil, errors.Wrapf(err, "parsing primary node URL: %s", addr)
}
} else {
host = c.primaryURI
@@ -930,6 +931,12 @@ func (c *Client) doRequest(host *pnet.URI, method, path string, headers map[stri
sleepTime time.Duration
rand = rand.New(rand.NewSource(time.Now().UnixNano()))
)

// We have to add the service prefix to the path here (where applicable)
// because the pnet.URI type doesn't support the path portion of an address.
// Where needed, we already have a service prefix set on the Client.
path = c.prefix() + path

for retry := 0; ; {
if req, err = buildRequest(host, method, path, headers, data); err != nil {
return 0, nil, errors.Wrap(err, "building request")
@@ -1050,7 +1057,7 @@ func (c *Client) augmentHeaders(headers map[string]string) map[string]string {
// FindFieldKeys looks up the IDs associated with specified keys in a field.
// If a key does not exist, the result will not include it.
func (c *Client) FindFieldKeys(field *Field, keys ...string) (map[string]uint64, error) {
path := fmt.Sprintf("%s/internal/translate/field/%s/%s/keys/find", c.prefix(), field.index.name, field.name)
path := fmt.Sprintf("/internal/translate/field/%s/%s/keys/find", field.index.name, field.name)

reqData, err := json.Marshal(keys)
if err != nil {
@@ -1082,7 +1089,7 @@ func (c *Client) FindFieldKeys(field *Field, keys ...string) (map[string]uint64,
// CreateFieldKeys looks up the IDs associated with specified keys in a field.
// If a key does not exist, it will be created.
func (c *Client) CreateFieldKeys(field *Field, keys ...string) (map[string]uint64, error) {
path := fmt.Sprintf("%s/internal/translate/field/%s/%s/keys/create", c.prefix(), field.index.name, field.name)
path := fmt.Sprintf("/internal/translate/field/%s/%s/keys/create", field.index.name, field.name)

reqData, err := json.Marshal(keys)
if err != nil {
@@ -1114,7 +1121,7 @@ func (c *Client) CreateFieldKeys(field *Field, keys ...string) (map[string]uint6
// FindIndexKeys looks up the IDs associated with specified column keys in an index.
// If a key does not exist, the result will not include it.
func (c *Client) FindIndexKeys(idx *Index, keys ...string) (map[string]uint64, error) {
path := fmt.Sprintf("%s/internal/translate/index/%s/keys/find", c.prefix(), idx.name)
path := fmt.Sprintf("/internal/translate/index/%s/keys/find", idx.name)

reqData, err := json.Marshal(keys)
if err != nil {
@@ -1146,7 +1153,7 @@ func (c *Client) FindIndexKeys(idx *Index, keys ...string) (map[string]uint64, e
// CreateIndexKeys looks up the IDs associated with specified column keys in an index.
// If a key does not exist, it will be created.
func (c *Client) CreateIndexKeys(idx *Index, keys ...string) (map[string]uint64, error) {
path := fmt.Sprintf("%s/internal/translate/index/%s/keys/create", c.prefix(), idx.name)
path := fmt.Sprintf("/internal/translate/index/%s/keys/create", idx.name)

reqData, err := json.Marshal(keys)
if err != nil {
@@ -1200,7 +1207,7 @@ func (c *Client) startTransaction(id string, timeout time.Duration, exclusive bo
return nil, errors.Wrap(err, "marshalling transaction")
}

path := fmt.Sprintf("%s/transaction", c.prefix())
path := "/transaction"
status, data, err := c.httpRequest("POST", path, bod, c.augmentHeaders(defaultJSONHeaders()), true)
if status == http.StatusConflict && time.Now().Before(deadline) {
// if we're getting StatusConflict after all the usual timeouts/retries, keep retrying until the deadline
@@ -1228,7 +1235,7 @@ func (c *Client) startTransaction(id string, timeout time.Duration, exclusive bo
}

func (c *Client) FinishTransaction(id string) (*pilosa.Transaction, error) {
path := fmt.Sprintf("%s/transaction/%s/finish", c.prefix(), id)
path := fmt.Sprintf("/transaction/%s/finish", id)
_, data, err := c.httpRequest("POST", path, nil, c.augmentHeaders(defaultJSONHeaders()), true)
if err != nil && len(data) == 0 {
return nil, err
@@ -1251,7 +1258,7 @@ func (c *Client) FinishTransaction(id string) (*pilosa.Transaction, error) {
}

func (c *Client) Transactions() (map[string]*pilosa.Transaction, error) {
path := fmt.Sprintf("%s/transactions", c.prefix())
path := "/transactions"
_, respData, err := c.httpRequest("GET", path, nil, c.augmentHeaders(defaultJSONHeaders()), true)
if err != nil {
return nil, errors.Wrap(err, "getting transactions")
@@ -1266,7 +1273,7 @@ func (c *Client) Transactions() (map[string]*pilosa.Transaction, error) {
}

func (c *Client) GetTransaction(id string) (*pilosa.Transaction, error) {
path := fmt.Sprintf("%s/transaction/%s", c.prefix(), id)
path := fmt.Sprintf("/transaction/%s", id)
_, data, err := c.httpRequest("GET", path, nil, c.augmentHeaders(defaultJSONHeaders()), true)
if err != nil {
return nil, err
@@ -1344,9 +1351,9 @@ func makeRequestData(query string, options *QueryOptions) ([]byte, error) {
return r, nil
}

func makeRoaringImportPath(field *Field, shard uint64, params url.Values, pathPrefix string) string {
return fmt.Sprintf("%s/index/%s/field/%s/import-roaring/%d?%s",
pathPrefix, field.index.name, field.name, shard, params.Encode())
func makeRoaringImportPath(field *Field, shard uint64, params url.Values) string {
return fmt.Sprintf("/index/%s/field/%s/import-roaring/%d?%s",
field.index.name, field.name, shard, params.Encode())
}

type viewImports map[string]*roaring.Bitmap
5 changes: 2 additions & 3 deletions ctl/dax.go
@@ -14,12 +14,10 @@ func BuildDAXFlags(cmd *cobra.Command, srv *server.Command) {
flags.BoolVar(&srv.Config.Verbose, "verbose", srv.Config.Verbose, "Enable verbose logging")
flags.StringVar(&srv.Config.LogPath, "log-path", srv.Config.LogPath, "Log path")

flags.StringVar(&srv.Config.StorageMethod, "storage-method", srv.Config.StorageMethod, "Method to use for persistent storage.")
flags.StringVar(&srv.Config.StorageDSN, "storage-dsn", srv.Config.StorageDSN, "Datasource Name when using an applicable storage method.")

// MDS
flags.BoolVar(&srv.Config.MDS.Run, "mds.run", srv.Config.MDS.Run, "Run the MDS service in process.")
flags.DurationVar(&srv.Config.MDS.Config.RegistrationBatchTimeout, "mds.config.registration-batch-timeout", srv.Config.MDS.Config.RegistrationBatchTimeout, "Timeout for node registration batches.")
flags.StringVar(&srv.Config.MDS.Config.DataDir, "mds.config.data-dir", srv.Config.MDS.Config.DataDir, "MDS directory to use in process.")

// WriteLogger
flags.BoolVar(&srv.Config.WriteLogger.Run, "writelogger.run", srv.Config.WriteLogger.Run, "Run the WriteLogger service in process.")
@@ -35,5 +33,6 @@ func BuildDAXFlags(cmd *cobra.Command, srv *server.Command) {

// Computer
flags.BoolVar(&srv.Config.Computer.Run, "computer.run", srv.Config.Computer.Run, "Run the Computer service in process.")
flags.IntVar(&srv.Config.Computer.N, "computer.n", srv.Config.Computer.N, "The number of Computer services to run in process.")
flags.AddFlagSet(serverFlagSet(&srv.Config.Computer.Config, "computer.config"))
}