diff --git a/cmd/guacone/cmd/osv.go b/cmd/guacone/cmd/osv.go index b1f91028cf..da0bb93ca3 100644 --- a/cmd/guacone/cmd/osv.go +++ b/cmd/guacone/cmd/osv.go @@ -83,16 +83,12 @@ var osvCmd = &cobra.Command{ packageQuery := root_package.NewPackageQuery(gqlclient, 0) totalNum := 0 + var totalDocs []*processor.Document gotErr := false // Set emit function to go through the entire pipeline emit := func(d *processor.Document) error { totalNum += 1 - err := ingestor.Ingest(ctx, d, opts.graphqlEndpoint, csubClient) - - if err != nil { - gotErr = true - return fmt.Errorf("unable to ingest document: %v", err) - } + totalDocs = append(totalDocs, d) return nil } @@ -127,9 +123,15 @@ var osvCmd = &cobra.Command{ case <-done: logger.Infof("All certifiers completed") } - cf() wg.Wait() + err = ingestor.MergedIngest(ctx, totalDocs, opts.graphqlEndpoint, csubClient) + if err != nil { + gotErr = true + logger.Errorf("unable to ingest documents: %v", err) + } + cf() + if gotErr { logger.Errorf("completed ingestion with errors") } else { diff --git a/pkg/ingestor/ingestor.go b/pkg/ingestor/ingestor.go index c63999f02f..df78089d7c 100644 --- a/pkg/ingestor/ingestor.go +++ b/pkg/ingestor/ingestor.go @@ -69,6 +69,77 @@ func Ingest(ctx context.Context, d *processor.Document, graphqlEndpoint string, return nil } +func MergedIngest(ctx context.Context, docs []*processor.Document, graphqlEndpoint string, csubClient csub_client.Client) error { + logger := logging.FromContext(ctx) + // Get pipeline of components + processorFunc := GetProcessor(ctx) + ingestorFunc := GetIngestor(ctx) + collectSubEmitFunc := GetCollectSubEmit(ctx, csubClient) + assemblerFunc := GetAssembler(ctx, graphqlEndpoint) + + start := time.Now() + + var predicates = make([]assembler.IngestPredicates, 1) + totalPredicates := 0 + var idstrings []*parser_common.IdentifierStrings + for _, d := range docs { + docTree, err := processorFunc(d) + if err != nil { + return fmt.Errorf("unable to process doc: %v, format: %v, document: %v", err, d.Format, d.Type) + } + + preds, idstrs, err := ingestorFunc(docTree) + if err != nil { + return fmt.Errorf("unable to ingest doc tree: %v", err) + } + for i := range preds { + predicates[0].CertifyScorecard = append(predicates[0].CertifyScorecard, preds[i].CertifyScorecard...) + predicates[0].IsDependency = append(predicates[0].IsDependency, preds[i].IsDependency...) + predicates[0].IsOccurrence = append(predicates[0].IsOccurrence, preds[i].IsOccurrence...) + predicates[0].HasSlsa = append(predicates[0].HasSlsa, preds[i].HasSlsa...) + predicates[0].CertifyVuln = append(predicates[0].CertifyVuln, preds[i].CertifyVuln...) + predicates[0].VulnEqual = append(predicates[0].VulnEqual, preds[i].VulnEqual...) + predicates[0].HasSourceAt = append(predicates[0].HasSourceAt, preds[i].HasSourceAt...) + predicates[0].CertifyBad = append(predicates[0].CertifyBad, preds[i].CertifyBad...) + predicates[0].CertifyGood = append(predicates[0].CertifyGood, preds[i].CertifyGood...) + predicates[0].HasSBOM = append(predicates[0].HasSBOM, preds[i].HasSBOM...) + predicates[0].HashEqual = append(predicates[0].HashEqual, preds[i].HashEqual...) + predicates[0].PkgEqual = append(predicates[0].PkgEqual, preds[i].PkgEqual...) + predicates[0].Vex = append(predicates[0].Vex, preds[i].Vex...) + predicates[0].PointOfContact = append(predicates[0].PointOfContact, preds[i].PointOfContact...) + predicates[0].VulnMetadata = append(predicates[0].VulnMetadata, preds[i].VulnMetadata...) + predicates[0].HasMetadata = append(predicates[0].HasMetadata, preds[i].HasMetadata...) + predicates[0].CertifyLegal = append(predicates[0].CertifyLegal, preds[i].CertifyLegal...) + totalPredicates += 1 + // enough predicates have been collected, worth sending them to GraphQL server + if totalPredicates == 5000 { + err = assemblerFunc(predicates) + if err != nil { + return fmt.Errorf("unable to assemble graphs: %v", err) + } + // reset counter and predicates + totalPredicates = 0 + predicates[0] = assembler.IngestPredicates{} + } + } + idstrings = append(idstrings, idstrs...) + } + + err := collectSubEmitFunc(idstrings) + if err != nil { + logger.Infof("unable to create entries in collectsub server, but continuing: %v", err) + } + + err = assemblerFunc(predicates) + if err != nil { + return fmt.Errorf("unable to assemble graphs: %v", err) + } + t := time.Now() + elapsed := t.Sub(start) + logger.Infof("[%v] completed docs %+v", elapsed, len(docs)) + return nil +} + func GetProcessor(ctx context.Context) func(*processor.Document) (processor.DocumentTree, error) { return func(d *processor.Document) (processor.DocumentTree, error) { return process.Process(ctx, d)