Skip to content

Commit

Permalink
OSV certifier: bulk ingest (guacsec#1309)
Browse files Browse the repository at this point in the history
* OSV certifier: bulk ingest

Signed-off-by: mrizzi <[email protected]>

* OSV certifier: MergedIngest

Signed-off-by: mrizzi <[email protected]>

* OSV certifier: limit transfer

Signed-off-by: mrizzi <[email protected]>

---------

Signed-off-by: mrizzi <[email protected]>
  • Loading branch information
mrizzi authored Sep 28, 2023
1 parent 77475db commit 493c117
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 7 deletions.
16 changes: 9 additions & 7 deletions cmd/guacone/cmd/osv.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,12 @@ var osvCmd = &cobra.Command{
packageQuery := root_package.NewPackageQuery(gqlclient, 0)

totalNum := 0
var totalDocs []*processor.Document
gotErr := false
// Set emit function to go through the entire pipeline
emit := func(d *processor.Document) error {
totalNum += 1
err := ingestor.Ingest(ctx, d, opts.graphqlEndpoint, csubClient)

if err != nil {
gotErr = true
return fmt.Errorf("unable to ingest document: %v", err)
}
totalDocs = append(totalDocs, d)
return nil
}

Expand Down Expand Up @@ -127,9 +123,15 @@ var osvCmd = &cobra.Command{
case <-done:
logger.Infof("All certifiers completed")
}
cf()
wg.Wait()

err = ingestor.MergedIngest(ctx, totalDocs, opts.graphqlEndpoint, csubClient)
if err != nil {
gotErr = true
logger.Errorf("unable to ingest documents: %v", err)
}
cf()

if gotErr {
logger.Errorf("completed ingestion with errors")
} else {
Expand Down
71 changes: 71 additions & 0 deletions pkg/ingestor/ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,77 @@ func Ingest(ctx context.Context, d *processor.Document, graphqlEndpoint string,
return nil
}

func MergedIngest(ctx context.Context, docs []*processor.Document, graphqlEndpoint string, csubClient csub_client.Client) error {
logger := logging.FromContext(ctx)
// Get pipeline of components
processorFunc := GetProcessor(ctx)
ingestorFunc := GetIngestor(ctx)
collectSubEmitFunc := GetCollectSubEmit(ctx, csubClient)
assemblerFunc := GetAssembler(ctx, graphqlEndpoint)

start := time.Now()

var predicates = make([]assembler.IngestPredicates, 1)
totalPredicates := 0
var idstrings []*parser_common.IdentifierStrings
for _, d := range docs {
docTree, err := processorFunc(d)
if err != nil {
return fmt.Errorf("unable to process doc: %v, format: %v, document: %v", err, d.Format, d.Type)
}

preds, idstrs, err := ingestorFunc(docTree)
if err != nil {
return fmt.Errorf("unable to ingest doc tree: %v", err)
}
for i := range preds {
predicates[0].CertifyScorecard = append(predicates[0].CertifyScorecard, preds[i].CertifyScorecard...)
predicates[0].IsDependency = append(predicates[0].IsDependency, preds[i].IsDependency...)
predicates[0].IsOccurrence = append(predicates[0].IsOccurrence, preds[i].IsOccurrence...)
predicates[0].HasSlsa = append(predicates[0].HasSlsa, preds[i].HasSlsa...)
predicates[0].CertifyVuln = append(predicates[0].CertifyVuln, preds[i].CertifyVuln...)
predicates[0].VulnEqual = append(predicates[0].VulnEqual, preds[i].VulnEqual...)
predicates[0].HasSourceAt = append(predicates[0].HasSourceAt, preds[i].HasSourceAt...)
predicates[0].CertifyBad = append(predicates[0].CertifyBad, preds[i].CertifyBad...)
predicates[0].CertifyGood = append(predicates[0].CertifyGood, preds[i].CertifyGood...)
predicates[0].HasSBOM = append(predicates[0].HasSBOM, preds[i].HasSBOM...)
predicates[0].HashEqual = append(predicates[0].HashEqual, preds[i].HashEqual...)
predicates[0].PkgEqual = append(predicates[0].PkgEqual, preds[i].PkgEqual...)
predicates[0].Vex = append(predicates[0].Vex, preds[i].Vex...)
predicates[0].PointOfContact = append(predicates[0].PointOfContact, preds[i].PointOfContact...)
predicates[0].VulnMetadata = append(predicates[0].VulnMetadata, preds[i].VulnMetadata...)
predicates[0].HasMetadata = append(predicates[0].HasMetadata, preds[i].HasMetadata...)
predicates[0].CertifyLegal = append(predicates[0].CertifyLegal, preds[i].CertifyLegal...)
totalPredicates += 1
// enough predicates have been collected, worth sending them to GraphQL server
if totalPredicates == 5000 {
err = assemblerFunc(predicates)
if err != nil {
return fmt.Errorf("unable to assemble graphs: %v", err)
}
// reset counter and predicates
totalPredicates = 0
predicates[0] = assembler.IngestPredicates{}
}
}
idstrings = append(idstrings, idstrs...)
}

err := collectSubEmitFunc(idstrings)
if err != nil {
logger.Infof("unable to create entries in collectsub server, but continuing: %v", err)
}

err = assemblerFunc(predicates)
if err != nil {
return fmt.Errorf("unable to assemble graphs: %v", err)
}
t := time.Now()
elapsed := t.Sub(start)
logger.Infof("[%v] completed docs %+v", elapsed, len(docs))
return nil
}

func GetProcessor(ctx context.Context) func(*processor.Document) (processor.DocumentTree, error) {
return func(d *processor.Document) (processor.DocumentTree, error) {
return process.Process(ctx, d)
Expand Down

0 comments on commit 493c117

Please sign in to comment.