From e0447126759da01676a54b962a8c98389db20a5a Mon Sep 17 00:00:00 2001 From: Travis Turner Date: Fri, 26 Jul 2019 11:34:29 -0500 Subject: [PATCH] add close() method to api --- api.go | 11 +++++++++++ server/server.go | 1 + 2 files changed, 12 insertions(+) diff --git a/api.go b/api.go index 7b03d39e4..913868414 100644 --- a/api.go +++ b/api.go @@ -25,6 +25,7 @@ import ( "io/ioutil" "strconv" "strings" + "sync" "time" "github.com/pilosa/pilosa/pql" @@ -42,6 +43,7 @@ type API struct { cluster *cluster server *Server + importWorkersWG sync.WaitGroup importWorkerPoolSize int importWork chan importJob @@ -83,8 +85,10 @@ func NewAPI(opts ...apiOption) (*API, error) { api.importWork = make(chan importJob, api.importWorkerPoolSize) for i := 0; i < api.importWorkerPoolSize; i++ { + api.importWorkersWG.Add(1) go func() { importWorker(api.importWork) + defer api.importWorkersWG.Done() }() } @@ -119,6 +123,13 @@ func (api *API) validate(f apiMethod) error { return newAPIMethodNotAllowedError(errors.Errorf("api method %s not allowed in state %s", f, state)) } +// Close closes the api and waits for it to shutdown. +func (api *API) Close() error { + close(api.importWork) + api.importWorkersWG.Wait() + return nil +} + // Query parses a PQL query out of the request and executes it. func (api *API) Query(ctx context.Context, req *QueryRequest) (QueryResponse, error) { span, ctx := tracing.StartSpanFromContext(ctx, "API.Query") diff --git a/server/server.go b/server/server.go index 635d4fde7..b4f05bbba 100644 --- a/server/server.go +++ b/server/server.go @@ -378,6 +378,7 @@ func (m *Command) Close() error { eg := errgroup.Group{} eg.Go(m.Handler.Close) eg.Go(m.Server.Close) + eg.Go(m.API.Close) if m.gossipMemberSet != nil { eg.Go(m.gossipMemberSet.Close) }