Skip to content

Commit

Permalink
removing MAX_CONCURRENT_JOBS (guacsec#1682)
Browse files (browse the repository at this point in the history)
* removing MAX_CONCURRENT_JOBS

Signed-off-by: Soham Arora <[email protected]>

* removing unnecessary import

Signed-off-by: Soham Arora <[email protected]>

* using regular ctx

Signed-off-by: Soham Arora <[email protected]>

---------

Signed-off-by: Soham Arora <[email protected]>
  • Loading branch information
arorasoham9 authored Feb 5, 2024
1 parent: 75a5ae7 · commit: db6cfcc
Showing 1 changed file with 2 additions and 25 deletions.
27 changes: 2 additions & 25 deletions cmd/guacone/cmd/files.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"os"
"strconv"
"strings"
"time"

Expand All @@ -38,11 +37,8 @@ import (
"github.com/guacsec/guac/pkg/logging"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"
)

const maxConcurrentJobsString string = "MAX_CONCURRENT_JOBS"

type fileOptions struct {
// path to the pem file
keyPath string
Expand All @@ -57,7 +53,7 @@ type fileOptions struct {
}

var filesCmd = &cobra.Command{
Use: "files [flags] file_path (set environment variable MAX_CONCURRENT_JOBS to increase the number of documents to ingest in parallel. Default: 1)",
Use: "files [flags] file_path",
Short: "take a folder of files and create a GUAC graph, this command talks directly to the graphQL endpoint",
Run: func(cmd *cobra.Command, args []string) {
ctx := logging.WithLogger(context.Background())
Expand Down Expand Up @@ -118,29 +114,15 @@ var filesCmd = &cobra.Command{
defer csubClient.Close()
}

files, filesCtx := errgroup.WithContext(ctx)

totalNum := 0
totalSuccess := 0
var filesWithErrors []string

gotErr := false

// Backend can only process a few files at a time. Increasing this might cause timeout errors in the database
maxConcurrentJobs, found := os.LookupEnv(maxConcurrentJobsString)
if found {
jobs, err := strconv.Atoi(maxConcurrentJobs)
if err != nil {
logger.Fatalf("failed to convert concurrent jobs value to integer ")
}
files.SetLimit(jobs)
} else {
files.SetLimit(1)
}

emit := func(d *processor.Document) error {
totalNum += 1
err := ingestor.Ingest(filesCtx, d, opts.graphqlEndpoint, csubClient)
err := ingestor.Ingest(ctx, d, opts.graphqlEndpoint, csubClient)

if err != nil {
gotErr = true
Expand All @@ -164,11 +146,6 @@ var filesCmd = &cobra.Command{
logger.Fatal(err)
}

err = files.Wait()
if err != nil {
logger.Fatal(err)
}

if gotErr {
logger.Fatalf("completed ingestion with error, %v of %v were successful - the following files did not ingest successfully: %v", totalSuccess, totalNum, printErrors(filesWithErrors))
} else {
Expand Down

0 comments on commit db6cfcc

Please sign in to comment.