diff --git a/cmd/collectors/keyperf/keyperf.go b/cmd/collectors/keyperf/keyperf.go index 9c7018410..ebecfcdc2 100644 --- a/cmd/collectors/keyperf/keyperf.go +++ b/cmd/collectors/keyperf/keyperf.go @@ -8,6 +8,7 @@ import ( "github.com/netapp/harvest/v2/pkg/conf" "github.com/netapp/harvest/v2/pkg/errs" "github.com/netapp/harvest/v2/pkg/matrix" + "github.com/netapp/harvest/v2/pkg/set" "github.com/netapp/harvest/v2/pkg/slogx" "github.com/tidwall/gjson" "log/slog" @@ -302,6 +303,11 @@ func (kp *KeyPerf) pollData( ) prevMat = kp.Matrix[kp.Object] + // Track old instances before processing batches + oldInstances := set.New() + for key := range prevMat.GetInstances() { + oldInstances.Add(key) + } // clone matrix without numeric data curMat = prevMat.Clone(matrix.With{Data: false, Metrics: true, Instances: true, ExportInstances: true}) @@ -314,10 +320,10 @@ func (kp *KeyPerf) pollData( if len(perfRecords) == 0 { return nil, errs.New(errs.ErrNoInstance, "no "+kp.Object+" instances on cluster") } - count, numPartials = kp.HandleResults(curMat, perfRecords, kp.Prop, false) + count, numPartials = kp.HandleResults(curMat, perfRecords, kp.Prop, false, oldInstances) // process endpoints - eCount, endpointAPID := kp.ProcessEndPoints(curMat, endpointFunc) + eCount, endpointAPID := kp.ProcessEndPoints(curMat, endpointFunc, oldInstances) count += eCount parseD = time.Since(startTime) diff --git a/cmd/collectors/rest/rest.go b/cmd/collectors/rest/rest.go index b723bb1bc..9051ae792 100644 --- a/cmd/collectors/rest/rest.go +++ b/cmd/collectors/rest/rest.go @@ -380,65 +380,79 @@ func (r *Rest) updateHref() { } func (r *Rest) PollData() (map[string]*matrix.Matrix, error) { - var ( - startTime time.Time - err error - records []gjson.Result + apiD, parseD time.Duration + metricCount uint64 ) - r.Matrix[r.Object].Reset() r.Client.Metadata.Reset() - startTime = time.Now() - - if records, err = r.GetRestData(r.Prop.Href); err != nil { - return nil, err - } - - if len(records) == 0 { - return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster") + // Track old instances before processing batches + oldInstances := set.New() + for key := range r.Matrix[r.Object].GetInstances() { + oldInstances.Add(key) } - return r.pollData(startTime, records, func(e *EndPoint) ([]gjson.Result, time.Duration, error) { - return r.ProcessEndPoint(e) - }) -} + processBatch := func(records []gjson.Result) error { + if len(records) == 0 { + return errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster") + } -func (r *Rest) pollData( - startTime time.Time, - records []gjson.Result, - endpointFunc func(e *EndPoint) ([]gjson.Result, time.Duration, error), -) (map[string]*matrix.Matrix, error) { + // Process the current batch of records + count, batchParseD := r.pollData(records, oldInstances) + metricCount += count + parseD += batchParseD + apiD -= batchParseD + return nil + } - var ( - count uint64 - apiD, parseD time.Duration - ) + startTime := time.Now() + if err := rest.FetchAllStream(r.Client, r.Prop.Href, processBatch); err != nil { + return nil, err + } + apiD += time.Since(startTime) - apiD = time.Since(startTime) - startTime = time.Now() - mat := r.Matrix[r.Object] + // Process endpoints after all batches have been processed + eCount, endpointAPID := r.ProcessEndPoints(r.Matrix[r.Object], r.ProcessEndPoint, oldInstances) + metricCount += eCount + apiD += endpointAPID - count, _ = r.HandleResults(mat, records, r.Prop, false) + r.postPollData(apiD, parseD, metricCount, oldInstances) + return r.Matrix, nil +} - // process endpoints - eCount, endpointAPID := r.ProcessEndPoints(mat, endpointFunc) - count += eCount - parseD = time.Since(startTime) +func (r *Rest) postPollData(apiD time.Duration, parseD time.Duration, metricCount uint64, oldInstances *set.Set) { + // Remove old instances that are not found in new instances + for key := range oldInstances.Iter() { + r.Matrix[r.Object].RemoveInstance(key) + } numRecords := len(r.Matrix[r.Object].GetInstances()) - _ = r.Metadata.LazySetValueInt64("api_time", "data", (apiD + endpointAPID).Microseconds()) + _ = r.Metadata.LazySetValueInt64("api_time", "data", apiD.Microseconds()) _ = r.Metadata.LazySetValueInt64("parse_time", "data", parseD.Microseconds()) - _ = r.Metadata.LazySetValueUint64("metrics", "data", count) + _ = r.Metadata.LazySetValueUint64("metrics", "data", metricCount) _ = r.Metadata.LazySetValueUint64("instances", "data", uint64(numRecords)) _ = r.Metadata.LazySetValueUint64("bytesRx", "data", r.Client.Metadata.BytesRx) _ = r.Metadata.LazySetValueUint64("numCalls", "data", r.Client.Metadata.NumCalls) - r.AddCollectCount(count) + r.AddCollectCount(metricCount) +} + +func (r *Rest) pollData(records []gjson.Result, oldInstances *set.Set) (uint64, time.Duration) { - return r.Matrix, nil + var ( + count uint64 + parseD time.Duration + ) + + startTime := time.Now() + mat := r.Matrix[r.Object] + + count, _ = r.HandleResults(mat, records, r.Prop, false, oldInstances) + parseD = time.Since(startTime) + + return count, parseD } func (r *Rest) ProcessEndPoint(e *EndPoint) ([]gjson.Result, time.Duration, error) { @@ -450,7 +464,7 @@ func (r *Rest) ProcessEndPoint(e *EndPoint) ([]gjson.Result, time.Duration, erro return data, time.Since(now), nil } -func (r *Rest) ProcessEndPoints(mat *matrix.Matrix, endpointFunc func(e *EndPoint) ([]gjson.Result, time.Duration, error)) (uint64, time.Duration) { +func (r *Rest) ProcessEndPoints(mat *matrix.Matrix, endpointFunc func(e *EndPoint) ([]gjson.Result, time.Duration, error), oldInstances *set.Set) (uint64, time.Duration) { var ( err error count uint64 @@ -475,7 +489,7 @@ func (r *Rest) ProcessEndPoints(mat *matrix.Matrix, endpointFunc func(e *EndPoin r.Logger.Debug("no instances on cluster", slog.String("APIPath", endpoint.prop.Query)) continue } - count, _ = r.HandleResults(mat, records, endpoint.prop, true) + count, _ = r.HandleResults(mat, records, endpoint.prop, true, oldInstances) } return count, totalAPID @@ -531,21 +545,15 @@ func (r *Rest) LoadPlugin(kind string, abc *plugin.AbstractPlugin) plugin.Plugin // HandleResults function is used for handling the rest response for parent as well as endpoints calls, // isEndPoint would be true only for the endpoint call, and it can't create/delete instance. -func (r *Rest) HandleResults(mat *matrix.Matrix, result []gjson.Result, prop *prop, isEndPoint bool) (uint64, uint64) { +func (r *Rest) HandleResults(mat *matrix.Matrix, result []gjson.Result, prop *prop, isEndPoint bool, oldInstances *set.Set) (uint64, uint64) { var ( err error count uint64 numPartials uint64 ) - oldInstances := set.New() currentInstances := set.New() - // copy keys of current instances. This is used to remove deleted instances from matrix later - for key := range mat.GetInstances() { - oldInstances.Add(key) - } - for _, instanceData := range result { var ( instanceKey string @@ -591,11 +599,10 @@ func (r *Rest) HandleResults(mat *matrix.Matrix, result []gjson.Result, prop *pr } else { currentInstances.Add(instanceKey) } - oldInstances.Remove(instanceKey) - // clear all instance labels as there are some fields which may be missing between polls // Don't remove instance labels when endpoints are being processed because endpoints uses parent instance only. if !isEndPoint { + oldInstances.Remove(instanceKey) instance.ClearLabels() } for label, display := range prop.InstanceLabels { @@ -671,14 +678,6 @@ func (r *Rest) HandleResults(mat *matrix.Matrix, result []gjson.Result, prop *pr } } - // Used for parent as we don't want to remove instances for endpoints - if !isEndPoint { - // remove deleted instances - for key := range oldInstances.Iter() { - mat.RemoveInstance(key) - } - } - return count, numPartials } diff --git a/cmd/collectors/rest/rest_test.go b/cmd/collectors/rest/rest_test.go index 01391086f..77b35901b 100644 --- a/cmd/collectors/rest/rest_test.go +++ b/cmd/collectors/rest/rest_test.go @@ -7,6 +7,7 @@ import ( "github.com/netapp/harvest/v2/cmd/poller/options" "github.com/netapp/harvest/v2/pkg/conf" "github.com/netapp/harvest/v2/pkg/matrix" + "github.com/netapp/harvest/v2/pkg/set" "github.com/netapp/harvest/v2/pkg/util" "github.com/tidwall/gjson" "os" @@ -73,31 +74,18 @@ func TestMain(m *testing.M) { benchRest = newRest("Volume", "volume.yaml", "testdata/conf") fullPollData = collectors.JSONToGson("testdata/volume-1.json.gz", true) - now := time.Now().Truncate(time.Second) - _, _ = benchRest.pollData(now, fullPollData, volumeEndpoints) + _, _ = benchRest.pollData(fullPollData, set.New()) os.Exit(m.Run()) } func BenchmarkRestPerf_PollData(b *testing.B) { - var err error ms = make([]*matrix.Matrix, 0) now := time.Now().Truncate(time.Second) for range b.N { now = now.Add(time.Minute * 15) - mi, _ := benchRest.pollData(now, fullPollData, volumeEndpoints) - - for _, mm := range mi { - ms = append(ms, mm) - } - mi, err = benchRest.pollData(now, fullPollData, volumeEndpoints) - if err != nil { - b.Errorf("error: %v", err) - } - for _, mm := range mi { - ms = append(ms, mm) - } + _, _ = benchRest.pollData(fullPollData, set.New()) } } @@ -121,14 +109,14 @@ func Test_pollDataVolume(t *testing.T) { t.Run(tt.name, func(t *testing.T) { r := newRest("Volume", "volume.yaml", "testdata/conf") - now := time.Now().Truncate(time.Second) pollData := collectors.JSONToGson(tt.pollDataPath1, true) - mm, err := r.pollData(now, pollData, volumeEndpoints) - if err != nil { - t.Fatal(err) - } - m := mm["Volume"] + mcount, parseD := r.pollData(pollData, set.New()) + mecount, apiD := r.ProcessEndPoints(r.Matrix[r.Object], volumeEndpoints, set.New()) + + metricCount := mcount + mecount + r.postPollData(apiD, parseD, metricCount, set.New()) + m := r.Matrix["Volume"] if len(m.GetInstances()) != tt.numInstances { t.Errorf("pollData() numInstances got=%v, want=%v", len(m.GetInstances()), tt.numInstances) diff --git a/cmd/tools/rest/rest.go b/cmd/tools/rest/rest.go index 75488c18c..5a89163a3 100644 --- a/cmd/tools/rest/rest.go +++ b/cmd/tools/rest/rest.go @@ -470,6 +470,56 @@ func FetchAnalytics(client *Client, href string) ([]gjson.Result, gjson.Result, return result, *analytics, nil } +func FetchAllStream(client *Client, href string, processBatch func([]gjson.Result) error, headers ...map[string]string) error { + var prevLink string + nextLink := href + + for { + var records []gjson.Result + response, err := client.GetRest(nextLink, headers...) + if err != nil { + return fmt.Errorf("error making request %w", err) + } + + output := gjson.ParseBytes(response) + data := output.Get("records") + numRecords := output.Get("num_records") + next := output.Get("_links.next.href") + + if data.Exists() { + if numRecords.Int() > 0 { + // Process the current batch of records + if err := processBatch(data.Array()); err != nil { + return err + } + } + + prevLink = nextLink + // If there is a next link, follow it + nextLink = next.String() + if nextLink == "" || nextLink == prevLink { + // no nextLink or nextLink is the same as the previous link, no progress is being made, exit + break + } + } else { + contentJSON := `{"records":[]}` + response, err := sjson.SetRawBytes([]byte(contentJSON), "records.-1", response) + if err != nil { + return fmt.Errorf("error setting record %w", err) + } + value := gjson.GetBytes(response, "records") + records = append(records, value.Array()...) + // Process the current batch of records + if err := processBatch(records); err != nil { + return err + } + break + } + } + + return nil +} + func fetchAll(client *Client, href string, records *[]gjson.Result, headers ...map[string]string) error { var prevLink string