diff --git a/cmd/README.md b/cmd/README.md index 70fd1df9d4..6c9dd1a679 100644 --- a/cmd/README.md +++ b/cmd/README.md @@ -98,7 +98,7 @@ not required. therefore don't use camelCase anywhere. - Descriptive on its own - The flag names are also used in the guac.yaml config - file. Therefore a name should be self descriptive. Good: `nats-addr`, Bad: + file. Therefore a name should be self descriptive. Good: `pubsub-addr`, Bad: `type`. If it is something that has the same meaning everywhere, it is ok to be short: ex: interval. diff --git a/cmd/guaccollect/cmd/deps_dev.go b/cmd/guaccollect/cmd/deps_dev.go index fcfd711f77..5c59035190 100644 --- a/cmd/guaccollect/cmd/deps_dev.go +++ b/cmd/guaccollect/cmd/deps_dev.go @@ -23,7 +23,6 @@ import ( "time" "github.com/guacsec/guac/pkg/cli" - "github.com/guacsec/guac/pkg/collectsub/client" csubclient "github.com/guacsec/guac/pkg/collectsub/client" "github.com/guacsec/guac/pkg/collectsub/datasource" "github.com/guacsec/guac/pkg/collectsub/datasource/csubsource" @@ -38,8 +37,8 @@ import ( type depsDevOptions struct { // datasource for the collector dataSource datasource.CollectSource - // address for NATS connection - natsAddr string + // address for pubsub connection + pubsubAddr string // address for blob store blobAddr string // run as poll collector @@ -75,7 +74,7 @@ you have access to read and write to the respective blob store.`, logger := logging.FromContext(ctx) opts, err := validateDepsDevFlags( - viper.GetString("nats-addr"), + viper.GetString("pubsub-addr"), viper.GetString("blob-addr"), viper.GetString("csub-addr"), viper.GetBool("csub-tls"), @@ -111,22 +110,22 @@ you have access to read and write to the respective blob store.`, }() } - initializeNATsandCollector(ctx, opts.natsAddr, opts.blobAddr) + initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr) }, } -func validateDepsDevFlags(natsAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, retrieveDependencies bool, args []string, +func validateDepsDevFlags(pubsubAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, retrieveDependencies bool, args []string, enablePrometheus bool, prometheusPort int, ) (depsDevOptions, error) { var opts depsDevOptions - opts.natsAddr = natsAddr + opts.pubsubAddr = pubsubAddr opts.blobAddr = blobAddr opts.poll = poll opts.retrieveDependencies = retrieveDependencies opts.enablePrometheus = enablePrometheus opts.prometheusPort = prometheusPort if useCsub { - csubOpts, err := client.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify) + csubOpts, err := csubclient.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify) if err != nil { return opts, fmt.Errorf("unable to validate csub client flags: %w", err) } diff --git a/cmd/guaccollect/cmd/files.go b/cmd/guaccollect/cmd/files.go index d36aa22b3b..6af784cee0 100644 --- a/cmd/guaccollect/cmd/files.go +++ b/cmd/guaccollect/cmd/files.go @@ -20,6 +20,7 @@ import ( "fmt" "os" "os/signal" + "strings" "sync" "syscall" "time" @@ -37,8 +38,8 @@ import ( type filesOptions struct { // path to folder with documents to collect path string - // address for NATS connection - natsAddr string + // address for pubsub connection + pubsubAddr string // address for blob store blobAddr string // poll location @@ -66,7 +67,7 @@ you have access to read and write to the respective blob store.`, Run: func(cmd *cobra.Command, args []string) { opts, err := validateFilesFlags( - viper.GetString("nats-addr"), + viper.GetString("pubsub-addr"), viper.GetString("blob-addr"), viper.GetBool("service-poll"), args) @@ -86,14 +87,14 @@ you have access to read and write to the respective blob store.`, logger.Errorf("unable to register file collector: %v", err) } - initializeNATsandCollector(ctx, opts.natsAddr, opts.blobAddr) + initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr) }, } -func validateFilesFlags(natsAddr string, blobAddr string, poll bool, args []string) (filesOptions, error) { +func validateFilesFlags(pubsubAddr string, blobAddr string, poll bool, args []string) (filesOptions, error) { var opts filesOptions - opts.natsAddr = natsAddr + opts.pubsubAddr = pubsubAddr opts.blobAddr = blobAddr opts.poll = poll @@ -106,33 +107,37 @@ func validateFilesFlags(natsAddr string, blobAddr string, poll bool, args []stri return opts, nil } -func getCollectorPublish(ctx context.Context) (func(*processor.Document) error, error) { +func getCollectorPublish(ctx context.Context, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) (func(*processor.Document) error, error) { return func(d *processor.Document) error { - return collector.Publish(ctx, d) + return collector.Publish(ctx, d, blobStore, pubsub) }, nil } -func initializeNATsandCollector(ctx context.Context, natsAddr string, blobAddr string) { +func initializeNATsandCollector(ctx context.Context, pubsubAddr string, blobAddr string) { logger := logging.FromContext(ctx) - // initialize jetstream - // TODO: pass in credentials file for NATS secure login - jetStream := emitter.NewJetStream(natsAddr, "", "") - ctx, err := jetStream.JetStreamInit(ctx) - if err != nil { - logger.Errorf("jetStream initialization failed with error: %v", err) - os.Exit(1) + + if strings.HasPrefix(pubsubAddr, "nats://") { + // initialize jetstream + // TODO: pass in credentials file for NATS secure login + jetStream := emitter.NewJetStream(pubsubAddr, "", "") + if err := jetStream.JetStreamInit(ctx); err != nil { + logger.Errorf("jetStream initialization failed with error: %v", err) + os.Exit(1) + } + defer jetStream.Close() } - defer jetStream.Close() + // initialize blob store blobStore, err := blob.NewBlobStore(ctx, blobAddr) if err != nil { logger.Errorf("unable to connect to blog store: %v", err) } - ctx = blob.WithBlobStore(ctx, blobStore) + // initialize pubsub + pubsub := emitter.NewEmitterPubSub(ctx, pubsubAddr) // Get pipeline of components - collectorPubFunc, err := getCollectorPublish(ctx) + collectorPubFunc, err := getCollectorPublish(ctx, blobStore, pubsub) if err != nil { logger.Errorf("error: %v", err) os.Exit(1) diff --git a/cmd/guaccollect/cmd/github.go b/cmd/guaccollect/cmd/github.go index e452deef2e..b16e34943c 100644 --- a/cmd/guaccollect/cmd/github.go +++ b/cmd/guaccollect/cmd/github.go @@ -22,7 +22,6 @@ import ( "time" "github.com/guacsec/guac/internal/client/githubclient" - "github.com/guacsec/guac/pkg/collectsub/client" csubclient "github.com/guacsec/guac/pkg/collectsub/client" "github.com/guacsec/guac/pkg/collectsub/datasource" "github.com/guacsec/guac/pkg/collectsub/datasource/csubsource" @@ -37,8 +36,8 @@ import ( type githubOptions struct { // datasource for the collector dataSource datasource.CollectSource - // address for NATS connection - natsAddr string + // address for pubsub connection + pubsubAddr string // address for blob store blobAddr string // run as poll collector @@ -68,7 +67,7 @@ you have access to read and write to the respective blob store.`, logger := logging.FromContext(ctx) opts, err := validateGithubFlags( - viper.GetString("nats-addr"), + viper.GetString("pubsub-addr"), viper.GetString("blob-addr"), viper.GetString("csub-addr"), viper.GetBool("csub-tls"), @@ -109,18 +108,18 @@ you have access to read and write to the respective blob store.`, logger.Errorf("unable to register Github collector: %v", err) } - initializeNATsandCollector(ctx, opts.natsAddr, opts.blobAddr) + initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr) }, } -func validateGithubFlags(natsAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, args []string) (githubOptions, error) { +func validateGithubFlags(pubsubAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, args []string) (githubOptions, error) { var opts githubOptions - opts.natsAddr = natsAddr + opts.pubsubAddr = pubsubAddr opts.blobAddr = blobAddr opts.poll = poll if useCsub { - csubOpts, err := client.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify) + csubOpts, err := csubclient.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify) if err != nil { return opts, fmt.Errorf("unable to validate csub client flags: %w", err) } diff --git a/cmd/guaccollect/cmd/oci.go b/cmd/guaccollect/cmd/oci.go index f277d36d20..8eeffb2532 100644 --- a/cmd/guaccollect/cmd/oci.go +++ b/cmd/guaccollect/cmd/oci.go @@ -21,7 +21,6 @@ import ( "os" "time" - "github.com/guacsec/guac/pkg/collectsub/client" csubclient "github.com/guacsec/guac/pkg/collectsub/client" "github.com/guacsec/guac/pkg/collectsub/datasource" "github.com/guacsec/guac/pkg/collectsub/datasource/csubsource" @@ -37,8 +36,8 @@ import ( type ociOptions struct { // datasource for the collector dataSource datasource.CollectSource - // address for NATS connection - natsAddr string + // address for pubsub connection + pubsubAddr string // address for blob store blobAddr string // run as poll collector @@ -68,7 +67,7 @@ you have access to read and write to the respective blob store.`, logger := logging.FromContext(ctx) opts, err := validateOCIFlags( - viper.GetString("nats-addr"), + viper.GetString("pubsub-addr"), viper.GetString("blob-addr"), viper.GetString("csub-addr"), viper.GetBool("csub-tls"), @@ -92,18 +91,18 @@ you have access to read and write to the respective blob store.`, logger.Errorf("unable to register oci collector: %v", err) } - initializeNATsandCollector(ctx, opts.natsAddr, opts.blobAddr) + initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr) }, } -func validateOCIFlags(natsAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, args []string) (ociOptions, error) { +func validateOCIFlags(pubsubAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, args []string) (ociOptions, error) { var opts ociOptions - opts.natsAddr = natsAddr + opts.pubsubAddr = pubsubAddr opts.blobAddr = blobAddr opts.poll = poll if useCsub { - csubOpts, err := client.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify) + csubOpts, err := csubclient.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify) if err != nil { return opts, fmt.Errorf("unable to validate csub client flags: %w", err) } diff --git a/cmd/guaccollect/cmd/root.go b/cmd/guaccollect/cmd/root.go index c218c5d210..dbda7cc1e2 100644 --- a/cmd/guaccollect/cmd/root.go +++ b/cmd/guaccollect/cmd/root.go @@ -29,7 +29,7 @@ import ( func init() { cobra.OnInitialize(cli.InitConfig) - set, err := cli.BuildFlags([]string{"nats-addr", "blob-addr", "csub-addr", "use-csub", "service-poll", "enable-prometheus", "prometheus-addr"}) + set, err := cli.BuildFlags([]string{"pubsub-addr", "blob-addr", "csub-addr", "use-csub", "service-poll", "enable-prometheus", "prometheus-addr"}) if err != nil { fmt.Fprintf(os.Stderr, "failed to setup flag: %v", err) os.Exit(1) diff --git a/cmd/guacingest/cmd/ingest.go b/cmd/guacingest/cmd/ingest.go index 5bb9159fe5..76cb2f6980 100644 --- a/cmd/guacingest/cmd/ingest.go +++ b/cmd/guacingest/cmd/ingest.go @@ -20,6 +20,7 @@ import ( "fmt" "os" "os/signal" + "strings" "sync" "syscall" @@ -36,7 +37,7 @@ import ( ) type options struct { - natsAddr string + pubsubAddr string blobAddr string csubClientOptions client.CsubClientOptions graphqlEndpoint string @@ -45,7 +46,7 @@ type options struct { func ingest(cmd *cobra.Command, args []string) { opts, err := validateFlags( - viper.GetString("nats-addr"), + viper.GetString("pubsub-addr"), viper.GetString("blob-addr"), viper.GetString("csub-addr"), viper.GetBool("csub-tls"), @@ -61,22 +62,25 @@ func ingest(cmd *cobra.Command, args []string) { ctx, cf := context.WithCancel(logging.WithLogger(context.Background())) logger := logging.FromContext(ctx) - // initialize jetstream - // TODO: pass in credentials file for NATS secure login - jetStream := emitter.NewJetStream(opts.natsAddr, "", "") - ctx, err = jetStream.JetStreamInit(ctx) - if err != nil { - logger.Errorf("jetStream initialization failed with error: %v", err) - os.Exit(1) + if strings.HasPrefix(opts.pubsubAddr, "nats://") { + // initialize jetstream + // TODO: pass in credentials file for NATS secure login + jetStream := emitter.NewJetStream(opts.pubsubAddr, "", "") + if err := jetStream.JetStreamInit(ctx); err != nil { + logger.Errorf("jetStream initialization failed with error: %v", err) + os.Exit(1) + } + defer jetStream.Close() } - defer jetStream.Close() + // initialize blob store blobStore, err := blob.NewBlobStore(ctx, opts.blobAddr) if err != nil { logger.Errorf("unable to connect to blog store: %v", err) } - ctx = blob.WithBlobStore(ctx, blobStore) + // initialize pubsub + pubsub := emitter.NewEmitterPubSub(ctx, opts.pubsubAddr) // initialize collectsub client csubClient, err := csub_client.NewClient(opts.csubClientOptions) @@ -95,7 +99,7 @@ func ingest(cmd *cobra.Command, args []string) { wg.Add(1) go func() { defer wg.Done() - if err := process.Subscribe(ctx, emit); err != nil { + if err := process.Subscribe(ctx, emit, blobStore, pubsub); err != nil { logger.Errorf("processor ended with error: %v", err) } }() @@ -110,9 +114,9 @@ func ingest(cmd *cobra.Command, args []string) { wg.Wait() } -func validateFlags(natsAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, graphqlEndpoint string, args []string) (options, error) { +func validateFlags(pubsubAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, graphqlEndpoint string, args []string) (options, error) { var opts options - opts.natsAddr = natsAddr + opts.pubsubAddr = pubsubAddr opts.blobAddr = blobAddr csubOpts, err := client.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify) if err != nil { diff --git a/cmd/guacingest/cmd/root.go b/cmd/guacingest/cmd/root.go index fbcbab4e64..54c70a8dd4 100644 --- a/cmd/guacingest/cmd/root.go +++ b/cmd/guacingest/cmd/root.go @@ -30,7 +30,7 @@ import ( func init() { cobra.OnInitialize(cli.InitConfig) - set, err := cli.BuildFlags([]string{"nats-addr", "blob-addr", "csub-addr", "gql-addr"}) + set, err := cli.BuildFlags([]string{"pubsub-addr", "blob-addr", "csub-addr", "gql-addr"}) if err != nil { fmt.Fprintf(os.Stderr, "failed to setup flag: %v", err) os.Exit(1) diff --git a/container_files/arango/guac.yaml b/container_files/arango/guac.yaml index b9f73809bb..f0d55f8654 100644 --- a/container_files/arango/guac.yaml +++ b/container_files/arango/guac.yaml @@ -1,5 +1,5 @@ -# nats -nats-addr: nats://nats:4222 +# pubsub - default nats +pubsub-addr: nats://nats:4222 # blob store setup. Setup with blob store of choice via https://gocloud.dev/howto/blob/ blob-addr: file:///path/to/dir diff --git a/container_files/ent/guac.yaml b/container_files/ent/guac.yaml index 74c112bdaf..b55f9baf6c 100644 --- a/container_files/ent/guac.yaml +++ b/container_files/ent/guac.yaml @@ -1,5 +1,5 @@ -# nats -nats-addr: nats://nats:4222 +# pubsub - default nats +pubsub-addr: nats://nats:4222 # blob store setup. Setup with blob store of choice via https://gocloud.dev/howto/blob/ blob-addr: file:///path/to/dir diff --git a/container_files/guac/guac.yaml b/container_files/guac/guac.yaml index 24e8058d3f..38c62ab183 100644 --- a/container_files/guac/guac.yaml +++ b/container_files/guac/guac.yaml @@ -1,5 +1,5 @@ -# nats -nats-addr: nats://nats:4222 +# pubsub - default nats +pubsub-addr: nats://nats:4222 # blob store setup. Setup with blob store of choice via https://gocloud.dev/howto/blob/ blob-addr: file:///path/to/dir diff --git a/container_files/nats/js.conf b/container_files/nats/js.conf index fe481c29bd..ab98e643fa 100644 --- a/container_files/nats/js.conf +++ b/container_files/nats/js.conf @@ -1,4 +1,4 @@ // change max payload from default 1MB to 64MB -max_payload: 64MB +max_payload: 1MB // enables jetstream, an empty block will enable and use defaults jetstream {} diff --git a/container_files/neo4j/guac.yaml b/container_files/neo4j/guac.yaml index 0ef073bd07..8dcdfccb9b 100644 --- a/container_files/neo4j/guac.yaml +++ b/container_files/neo4j/guac.yaml @@ -1,5 +1,5 @@ -# nats -nats-addr: nats://nats:4222 +# pubsub - default nats +pubsub-addr: nats://nats:4222 # blob store setup. Setup with blob store of choice via https://gocloud.dev/howto/blob/ blob-addr: file:///path/to/dir diff --git a/container_files/redis/guac.yaml b/container_files/redis/guac.yaml index 45355656d3..bc358d96e2 100644 --- a/container_files/redis/guac.yaml +++ b/container_files/redis/guac.yaml @@ -1,5 +1,5 @@ -# nats -nats-addr: nats://nats:4222 +# pubsub - default nats +pubsub-addr: nats://nats:4222 # blob store setup. Setup with blob store of choice via https://gocloud.dev/howto/blob/ blob-addr: file:///path/to/dir diff --git a/container_files/tikv/guac.yaml b/container_files/tikv/guac.yaml index a5c40d5bc8..800b6741b7 100644 --- a/container_files/tikv/guac.yaml +++ b/container_files/tikv/guac.yaml @@ -1,5 +1,5 @@ -# nats -nats-addr: nats://nats:4222 +# pubsub - default nats +pubsub-addr: nats://nats:4222 # blob store setup. Setup with blob store of choice via https://gocloud.dev/howto/blob/ blob-addr: file:///path/to/dir diff --git a/go.mod b/go.mod index 2c2941c3a1..e2b45ef110 100644 --- a/go.mod +++ b/go.mod @@ -50,14 +50,18 @@ require ( ariga.io/atlas v0.14.1-0.20230918065911-83ad451a4935 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect dario.cat/mergo v1.0.0 // indirect + github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.5.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0 // indirect + github.com/Azure/go-amqp v1.0.2 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.2.0 // indirect github.com/BurntSushi/toml v1.3.2 // indirect + github.com/IBM/sarama v1.42.1 // indirect github.com/KyleBanks/depth v1.2.1 // indirect github.com/Masterminds/semver/v3 v3.2.1 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect @@ -81,6 +85,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.10 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.10 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.10 // indirect + github.com/aws/aws-sdk-go-v2/service/sns v1.26.5 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.18.7 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.7 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.26.7 // indirect @@ -111,6 +116,9 @@ require ( github.com/docker/docker v24.0.7+incompatible // indirect github.com/docker/docker-credential-helpers v0.7.0 // indirect github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7 // indirect + github.com/eapache/go-resiliency v1.4.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/fatih/color v1.15.0 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect @@ -133,6 +141,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang-jwt/jwt/v5 v5.1.0 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.1.2 // indirect github.com/google/go-containerregistry v0.17.0 // indirect github.com/google/go-github/v53 v53.2.0 // indirect @@ -147,11 +156,17 @@ require ( github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-retryablehttp v0.7.4 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/hashicorp/golang-lru/v2 v2.0.3 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/hcl/v2 v2.13.0 // indirect github.com/ianlancetaylor/demangle v0.0.0-20231023195312-e2daf7ba7156 // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect @@ -173,7 +188,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nats-io/jwt/v2 v2.5.3 // indirect - github.com/nats-io/nkeys v0.4.6 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/onsi/ginkgo/v2 v2.13.1 // indirect @@ -183,7 +198,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/owenrumney/go-sarif/v2 v2.3.0 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect - github.com/pierrec/lz4/v4 v4.1.15 // indirect + github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect github.com/pingcap/kvproto v0.0.0-20230904082117-ecdbf1f8c130 // indirect @@ -194,6 +209,8 @@ require ( github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + github.com/rabbitmq/amqp091-go v1.9.0 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/rhysd/actionlint v1.6.26 // indirect github.com/rivo/uniseg v0.4.4 // indirect @@ -288,11 +305,12 @@ require ( github.com/lib/pq v1.10.9 github.com/manifoldco/promptui v0.9.0 github.com/mitchellh/go-homedir v1.1.0 - github.com/nats-io/nats-server/v2 v2.10.5 - github.com/nats-io/nats.go v1.31.0 + github.com/nats-io/nats-server/v2 v2.10.9 + github.com/nats-io/nats.go v1.32.0 github.com/openvex/go-vex v0.2.5 github.com/ossf/scorecard/v4 v4.13.1 github.com/package-url/packageurl-go v0.1.2 + github.com/pitabwire/natspubsub v0.1.1 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.18.0 github.com/redis/go-redis/v9 v9.4.0 @@ -307,6 +325,8 @@ require ( github.com/tikv/client-go/v2 v2.0.8-0.20231115083414-7c96dfd783fb github.com/vektah/gqlparser/v2 v2.5.10 gocloud.dev v0.36.0 + gocloud.dev/pubsub/kafkapubsub v0.36.0 + gocloud.dev/pubsub/rabbitpubsub v0.36.0 golang.org/x/exp v0.0.0-20231006140011-7918f672742d gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 4c2525b158..e3c35f02f7 100644 --- a/go.sum +++ b/go.sum @@ -23,20 +23,33 @@ entgo.io/ent v0.12.5 h1:KREM5E4CSoej4zeGa88Ou/gfturAnpUv0mzAjch1sj4= entgo.io/ent v0.12.5/go.mod h1:Y3JVAjtlIk8xVZYSn3t3mf8xlZIn5SAOXZQxD6kKI+Q= github.com/99designs/gqlgen v0.17.41 h1:C1/zYMhGVP5TWNCNpmZ9Mb6CqT1Vr5SHEWoTOEJ3v3I= github.com/99designs/gqlgen v0.17.41/go.mod h1:GQ6SyMhwFbgHR0a8r2Wn8fYgEwPxxmndLFPhU63+cJE= +github.com/Azure/azure-amqp-common-go/v3 v3.2.3 h1:uDF62mbd9bypXWi19V1bN5NZEO84JqgmI5G73ibAmrk= +github.com/Azure/azure-amqp-common-go/v3 v3.2.3/go.mod h1:7rPmbSfszeovxGfc5fSAXE4ehlXQZHpMja2OtxC2Tas= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.0 h1:fb8kj/Dh4CSwgsOzHeZY4Xh68cFVbzXx+ONXGMY//4w= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.0/go.mod h1:uReU2sSxZExRPBAg3qKzmAucSi51+SP1OhohieR821Q= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0 h1:BMAjVKJM0U/CYF27gA0ZMmXGkOcvfFtD0oHVZ1TIPRI= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0/go.mod h1:1fXstnBMas5kzG+S3q8UoJcmyU6nUeunJcMDHcRYHhs= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0 h1:d81/ng9rET2YqdVkVwkb6EXeRrLJIwyGnJcAlAWKwhs= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0/go.mod h1:s4kgfzA0covAXNicZHDMN58jExvcng2mC/DepXiF1EI= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.5.0 h1:HKHkea1fdm18LT8VAxTVZgJpPsLgv+0NZhmtus1UqJQ= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.5.0/go.mod h1:4BbKA+mRmmTP8VaLfDPNF5nOdhRm5upG3AXVWfv1dxc= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.2.0 h1:Ma67P/GGprNwsslzEH6+Kb8nybI8jpDTm4Wmzu2ReK8= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.2.0/go.mod h1:c+Lifp3EDEamAkPVzMooRNOK6CZjNSdEnf1A7jsI9u4= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0 h1:gggzg0SUMs6SQbEw+3LoSsYf9YMjkupeAnHMX8O9mmY= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0/go.mod h1:+6KLcKIVgxoBDMqMO/Nvy7bZ9a0nbU3I1DtFQK3YvB4= +github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg= +github.com/Azure/go-amqp v1.0.2 h1:zHCHId+kKC7fO8IkwyZJnWMvtRXhYC0VJtD0GYkHc6M= +github.com/Azure/go-amqp v1.0.2/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= +github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA= +github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M= +github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74= +github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk= github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE= +github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8= +github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.0 h1:hVeq+yCyUi+MsoO/CU95yqCIcdzra5ovzk8Q2BBpV2M= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.0/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= @@ -47,6 +60,8 @@ github.com/CycloneDX/cyclonedx-go v0.8.0 h1:FyWVj6x6hoJrui5uRQdYZcSievw3Z32Z88uY github.com/CycloneDX/cyclonedx-go v0.8.0/go.mod h1:K2bA+324+Og0X84fA8HhN2X066K7Bxz4rpMQ4ZhjtSk= github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= +github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ= +github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/Khan/genqlient v0.6.0 h1:Bwb1170ekuNIVIwTJEqvO8y7RxBxXu639VJOkKSrwAk= github.com/Khan/genqlient v0.6.0/go.mod h1:rvChwWVTqXhiapdhLDV4bp9tz/Xvtewwkon4DpWWCRM= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= @@ -121,6 +136,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.10 h1:KOxnQeWy5sXyS github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.10/go.mod h1:jMx5INQFYFYB3lQD9W0D8Ohgq6Wnl7NYOJ2TQndbulI= github.com/aws/aws-sdk-go-v2/service/s3 v1.48.1 h1:5XNlsBsEvBZBMO6p82y+sqpWg8j5aBCe+5C2GBFgqBQ= github.com/aws/aws-sdk-go-v2/service/s3 v1.48.1/go.mod h1:4qXHrG1Ne3VGIMZPCB8OjH/pLFO94sKABIusjh0KWPU= +github.com/aws/aws-sdk-go-v2/service/sns v1.26.5 h1:umyC9zH/A1w8AXrrG7iMxT4Rfgj80FjfvLannWt5vuE= +github.com/aws/aws-sdk-go-v2/service/sns v1.26.5/go.mod h1:IrcbquqMupzndZ20BXxDxjM7XenTRhbwBOetk4+Z5oc= github.com/aws/aws-sdk-go-v2/service/sqs v1.29.7 h1:tRNrFDGRm81e6nTX5Q4CFblea99eAfm0dxXazGpLceU= github.com/aws/aws-sdk-go-v2/service/sqs v1.29.7/go.mod h1:8GWUDux5Z2h6z2efAtr54RdHXtLm8sq7Rg85ZNY/CZM= github.com/aws/aws-sdk-go-v2/service/sso v1.18.7 h1:eajuO3nykDPdYicLlP3AGgOyVN3MOlFmZv7WGTuJPow= @@ -202,6 +219,7 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY= github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= @@ -222,6 +240,12 @@ github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7 h1:UhxFibDNY/bfvqU github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= +github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a h1:mATvB/9r/3gvcejNsXKSkQ6lcIaNec2nyfOdlTBR2lU= github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM= github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= @@ -236,6 +260,9 @@ github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -350,6 +377,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -396,6 +424,8 @@ github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyE github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= @@ -414,6 +444,9 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA= github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru/v2 v2.0.3 h1:kmRrRLlInXvng0SmLxmQpQkpbYAvcXm7NPDrgxJa9mE= github.com/hashicorp/golang-lru/v2 v2.0.3/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= @@ -431,6 +464,18 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jedib0t/go-pretty/v6 v6.5.3 h1:GIXn6Er/anHTkVUoufs7ptEvxdD6KIhR7Axa2wYCPF0= github.com/jedib0t/go-pretty/v6 v6.5.3/go.mod h1:5LQIxa52oJ/DlDSLv0HEkWOFMDGoWkJb9ss5KqPpJBg= github.com/jeremywohl/flatten v1.0.1 h1:LrsxmB3hfwJuE+ptGOijix1PIfOoKLJ3Uee/mzbgtrs= @@ -441,6 +486,8 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/jmhodges/clock v1.2.0 h1:eq4kys+NI0PLngzaHEe7AmPT90XMGIEySD1JfV1PDIs= github.com/jmhodges/clock v1.2.0/go.mod h1:qKjhA7x7u/lQpPB1XAqX1b1lCI/w3/fNuYpI/ZjLynI= +github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= +github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -521,12 +568,12 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo= github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4= -github.com/nats-io/nats-server/v2 v2.10.5 h1:hhWt6m9ja/mNnm6ixc85jCthDaiUFPaeJI79K/MD980= -github.com/nats-io/nats-server/v2 v2.10.5/go.mod h1:xUMTU4kS//SDkJCSvFwN9SyJ9nUuLhSkzB/Qz0dvjjg= -github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= -github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= -github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= -github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= +github.com/nats-io/nats-server/v2 v2.10.9 h1:VEW43Zz+p+9lARtiPM9ctd6ckun+92ZT2T17HWtwiFI= +github.com/nats-io/nats-server/v2 v2.10.9/go.mod h1:oorGiV9j3BOLLO3ejQe+U7pfAGyPo+ppD7rpgNF6KTQ= +github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0= +github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/neo4j/neo4j-go-driver/v4 v4.4.7 h1:6D0DPI7VOVF6zB8eubY1lav7RI7dZ2mytnr3fj369Ow= @@ -567,8 +614,9 @@ github.com/package-url/packageurl-go v0.1.2 h1:0H2DQt6DHd/NeRlVwW4EZ4oEI6Bn40XlN github.com/package-url/packageurl-go v0.1.2/go.mod h1:uQd4a7Rh3ZsVg5j0lNyAfyxIeGde9yrlhjF78GzeW0c= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= -github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= +github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTmyFqUwr+jcCvpVkK7sumiz+ko5H9eq4= @@ -581,6 +629,8 @@ github.com/pingcap/kvproto v0.0.0-20230904082117-ecdbf1f8c130 h1:qbLm5cOdCWxZ0mt github.com/pingcap/kvproto v0.0.0-20230904082117-ecdbf1f8c130/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= +github.com/pitabwire/natspubsub v0.1.1 h1:YYGGlwOomhQ4I18sHR5/soAEVIjh1hLC90vibwqy66M= +github.com/pitabwire/natspubsub v0.1.1/go.mod h1:JPpKrslFc7gBNcfh91SbLb0Ln1sssiyN9tLMbGj/e50= github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4= github.com/pjbgf/sha1cd v0.3.0/go.mod h1:nZ1rrWOcGJ5uZgEEVL1VUM9iRQiZvWdbZjkKyFzPPsI= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= @@ -602,6 +652,10 @@ github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lne github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk= github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/regclient/regclient v0.5.5 h1:fDh5afBCRbeSU71vcvbN0fI88D1fTfp2m39BPqki7EU= @@ -777,8 +831,8 @@ go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= -go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= -go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= @@ -791,15 +845,21 @@ go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= gocloud.dev v0.36.0 h1:q5zoXux4xkOZP473e1EZbG8Gq9f0vlg1VNH5Du/ybus= gocloud.dev v0.36.0/go.mod h1:bLxah6JQVKBaIxzsr5BQLYB4IYdWHkMZdzCXlo6F0gg= +gocloud.dev/pubsub/kafkapubsub v0.36.0 h1:LkS3DncPCOPQYs/fg9oQfLOTMDhOCUVAwzIUPwIZZps= +gocloud.dev/pubsub/kafkapubsub v0.36.0/go.mod h1:jcAEkT/H0k0FYyGdJDnrHkcs7itDc0CdHbIFtmReTWg= +gocloud.dev/pubsub/rabbitpubsub v0.36.0 h1:f7KXLhk5HVZ81a2LKfTQY/HBDyXF0teQ7aMuJN+iK7I= +gocloud.dev/pubsub/rabbitpubsub v0.36.0/go.mod h1:xCbgXeLHXrb2Yqs3WOTvmTvk6WeIfMMn3uq34uOk9yM= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k= golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= @@ -826,6 +886,7 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= @@ -839,6 +900,7 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= @@ -1024,6 +1086,8 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= mvdan.cc/sh/v3 v3.7.0 h1:lSTjdP/1xsddtaKfGg7Myu7DnlHItd3/M2tomOcNNBg= mvdan.cc/sh/v3 v3.7.0/go.mod h1:K2gwkaesF/D7av7Kxl0HbF5kGOd2ArupNTX3X44+8l8= +nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= +nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= sigs.k8s.io/release-utils v0.7.3 h1:6pS8x6c5RmdUgR9qcg1LO6hjUzuE4Yo9TGZ3DemrZdM= sigs.k8s.io/release-utils v0.7.3/go.mod h1:n0mVez/1PZYZaZUTJmxewxH3RJ/Lf7JUDh7TG1CASOE= diff --git a/guac.yaml b/guac.yaml index 11020eb0f1..6968b95265 100644 --- a/guac.yaml +++ b/guac.yaml @@ -17,8 +17,8 @@ neptune-port: 8182 neptune-region: us-east-1 neptune-realm: neptune -# Nats setup -nats-addr: nats://localhost:4222 +# pubsub setup +pubsub-addr: nats://localhost:4222 # blob store setup. Setup with blob store of choice via https://gocloud.dev/howto/blob/ blob-addr: file:///path/to/dir diff --git a/internal/testing/cmd/pubsub_test/cmd/files.go b/internal/testing/cmd/pubsub_test/cmd/files.go index f9cf4bd4a0..fb1d35e53b 100644 --- a/internal/testing/cmd/pubsub_test/cmd/files.go +++ b/internal/testing/cmd/pubsub_test/cmd/files.go @@ -19,20 +19,19 @@ import ( "context" "fmt" "os" + "os/signal" + "strings" "sync" + "syscall" "time" jsoniter "github.com/json-iterator/go" - "github.com/guacsec/guac/pkg/assembler" + "github.com/guacsec/guac/pkg/blob" "github.com/guacsec/guac/pkg/emitter" "github.com/guacsec/guac/pkg/handler/collector" "github.com/guacsec/guac/pkg/handler/collector/file" "github.com/guacsec/guac/pkg/handler/processor" - "github.com/guacsec/guac/pkg/handler/processor/process" - "github.com/guacsec/guac/pkg/ingestor/parser" - "github.com/guacsec/guac/pkg/ingestor/parser/common" - parser_common "github.com/guacsec/guac/pkg/ingestor/parser/common" "github.com/guacsec/guac/pkg/logging" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -48,7 +47,9 @@ type options struct { // path to folder with documents to collect path string // nats - natsAddr string + pubsubAddr string + // address for blob store + blobAddr string // osv/scorecard certifier poll bool @@ -65,7 +66,8 @@ var filesCmd = &cobra.Command{ viper.GetString("gdbpass"), viper.GetString("gdbaddr"), viper.GetString("realm"), - viper.GetString("natsaddr"), + viper.GetString("pubsubAddr"), + viper.GetString("blob-addr"), args) if err != nil { fmt.Printf("unable to validate flags: %v\n", err) @@ -77,132 +79,24 @@ var filesCmd = &cobra.Command{ logger := logging.FromContext(ctx) // Register collector - fileCollector := file.NewFileCollector(ctx, opts.path, false, time.Second) + fileCollector := file.NewFileCollector(ctx, opts.path, opts.poll, 30*time.Second) err = collector.RegisterDocumentCollector(fileCollector, file.FileCollector) if err != nil { logger.Errorf("unable to register file collector: %v", err) } - // initialize jetstream - // TODO: pass in credentials file for NATS secure login - jetStream := emitter.NewJetStream(opts.natsAddr, "", "") - ctx, err = jetStream.JetStreamInit(ctx) - if err != nil { - logger.Errorf("jetStream initialization failed with error: %v", err) - os.Exit(1) - } - // recreate stream to remove any old lingering documents - // NOT TO BE USED IN PRODUCTION - err = jetStream.RecreateStream(ctx) - if err != nil { - logger.Errorf("unexpected error recreating jetstream: %v", err) - } - defer jetStream.Close() - - // Get pipeline of components - collectorPubFunc, err := getCollectorPublish(ctx) - if err != nil { - logger.Errorf("error: %v", err) - os.Exit(1) - } - - assemblerFunc, err := getAssembler(opts) - if err != nil { - logger.Errorf("error: %v", err) - os.Exit(1) - } - - // for pubsub_test we ignore identifier strings as we don't connect to a collectsub service - ingestorTransportFunc := func(d []assembler.IngestPredicates, i []*common.IdentifierStrings) error { - err := assemblerFunc(d) - if err != nil { - return err - } - return nil - } - - ingestorFunc, err := getIngestor(ctx, ingestorTransportFunc) - if err != nil { - logger.Errorf("error: %v", err) - os.Exit(1) - } - - // Set emit function to go through the entire pipeline - emit := func(d *processor.Document) error { - err = collectorPubFunc(d) - if err != nil { - logger.Errorf("collector ended with error: %v", err) - os.Exit(1) - } - return nil - } - - // Collect - errHandler := func(err error) bool { - if err == nil { - logger.Info("collector ended gracefully") - return true - } - logger.Errorf("collector ended with error: %v", err) - return false - } - - ingest := func(d *processor.Document) error { - docTree, err := process.Process(ctx, d) - if err != nil { - logger.Error("[processor] failed process document: %v", err) - return nil - } - - docTreeBytes, err := json.Marshal(d) - if err != nil { - return fmt.Errorf("failed marshal of document: %w", err) - } - err = emitter.Publish(ctx, emitter.SubjectNameDocProcessed, docTreeBytes) - if err != nil { - logger.Error("[processor] failed transportFunc: %v", err) - return nil - } - - logger.Infof("[processor] docTree Processed: %+v", docTree.Document.SourceInformation) - return nil - } - - // Assuming that publisher and consumer are different processes. - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err := process.Subscribe(ctx, ingest) - if err != nil { - logger.Errorf("processor ended with error: %v", err) - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - err := ingestorFunc() - if err != nil { - logger.Errorf("parser ended with error: %v", err) - } - }() - - if err := collector.Collect(ctx, emit, errHandler); err != nil { - logger.Fatal(err) - } - - wg.Wait() + initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr) }, } -func validateFlags(user string, pass string, dbAddr string, realm string, natsAddr string, args []string) (options, error) { +func validateFlags(user string, pass string, dbAddr string, realm string, pubsubAddr string, blobAddr string, args []string) (options, error) { var opts options opts.user = user opts.pass = pass + opts.blobAddr = blobAddr opts.dbAddr = dbAddr opts.realm = realm - opts.natsAddr = natsAddr + opts.pubsubAddr = pubsubAddr if len(args) != 1 { return opts, fmt.Errorf("expected positional argument for file_path") @@ -213,25 +107,82 @@ func validateFlags(user string, pass string, dbAddr string, realm string, natsAd return opts, nil } -func getCollectorPublish(ctx context.Context) (func(*processor.Document) error, error) { +func getCollectorPublish(ctx context.Context, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) (func(*processor.Document) error, error) { return func(d *processor.Document) error { - return collector.Publish(ctx, d) + return collector.Publish(ctx, d, blobStore, pubsub) }, nil } -func getIngestor(ctx context.Context, transportFunc func([]assembler.IngestPredicates, []*parser_common.IdentifierStrings) error) (func() error, error) { - return func() error { - err := parser.Subscribe(ctx, transportFunc) +func initializeNATsandCollector(ctx context.Context, pubsubAddr string, blobAddr string) { + logger := logging.FromContext(ctx) + + if strings.HasPrefix(pubsubAddr, "nats://") { + // initialize jetstream + // TODO: pass in credentials file for NATS secure login + jetStream := emitter.NewJetStream(pubsubAddr, "", "") + if err := jetStream.JetStreamInit(ctx); err != nil { + logger.Errorf("jetStream initialization failed with error: %v", err) + os.Exit(1) + } + defer jetStream.Close() + } + + blobStore, err := blob.NewBlobStore(ctx, blobAddr) + if err != nil { + logger.Errorf("unable to connect to blog store: %v", err) + } + + pubsub := emitter.NewEmitterPubSub(ctx, pubsubAddr) + + // Get pipeline of components + collectorPubFunc, err := getCollectorPublish(ctx, blobStore, pubsub) + if err != nil { + logger.Errorf("error: %v", err) + os.Exit(1) + } + + // Set emit function to go through the entire pipeline + emit := func(d *processor.Document) error { + err = collectorPubFunc(d) if err != nil { - return err + logger.Errorf("error publishing document from collector: %v", err) + os.Exit(1) } return nil - }, nil -} + } + + // Collect + errHandler := func(err error) bool { + if err == nil { + logger.Info("collector ended gracefully") + return true + } + logger.Errorf("collector ended with error: %v", err) + // Continue to emit any documents still in the docChan + return true + } -func getAssembler(opts options) (func([]assembler.IngestPredicates) error, error) { - // TODO(bulldozer): return assembler func to talk to graphQL ingestion - return func(_ []assembler.IngestPredicates) error { return nil }, nil + ctx, cf := context.WithCancel(ctx) + var wg sync.WaitGroup + done := make(chan bool, 1) + wg.Add(1) + go func() { + defer wg.Done() + if err := collector.Collect(ctx, emit, errHandler); err != nil { + logger.Errorf("Unhandled error in the collector: %s", err) + } + done <- true + }() + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + select { + case s := <-sigs: + logger.Infof("Signal received: %s, shutting down gracefully\n", s.String()) + case <-done: + logger.Infof("All Collectors completed") + } + cf() + wg.Wait() } func init() { diff --git a/internal/testing/cmd/pubsub_test/cmd/osv.go b/internal/testing/cmd/pubsub_test/cmd/osv.go index 109469063e..6e83660cf0 100644 --- a/internal/testing/cmd/pubsub_test/cmd/osv.go +++ b/internal/testing/cmd/pubsub_test/cmd/osv.go @@ -20,19 +20,20 @@ import ( "fmt" "net/http" "os" + "os/signal" + "strings" "sync" + "syscall" "time" "github.com/Khan/genqlient/graphql" - "github.com/guacsec/guac/pkg/assembler" + "github.com/guacsec/guac/pkg/blob" "github.com/guacsec/guac/pkg/certifier" "github.com/guacsec/guac/pkg/certifier/certify" "github.com/guacsec/guac/pkg/certifier/components/root_package" "github.com/guacsec/guac/pkg/certifier/osv" "github.com/guacsec/guac/pkg/emitter" "github.com/guacsec/guac/pkg/handler/processor" - "github.com/guacsec/guac/pkg/handler/processor/process" - parser_common "github.com/guacsec/guac/pkg/ingestor/parser/common" "github.com/guacsec/guac/pkg/logging" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -50,7 +51,7 @@ var osvCmd = &cobra.Command{ viper.GetString("gdbpass"), viper.GetString("gdbaddr"), viper.GetString("realm"), - viper.GetString("natsaddr"), + viper.GetString("pubsubAddr"), viper.GetBool("poll"), viper.GetInt("interval"), ) @@ -65,144 +66,26 @@ var osvCmd = &cobra.Command{ logger.Fatalf("unable to register certifier: %v", err) } - // TODO: Fix this with the graphQL endpoint - httpClient := http.Client{} - gqlclient := graphql.NewClient("", &httpClient) - - // initialize jetstream - // TODO: pass in credentials file for NATS secure login - jetStream := emitter.NewJetStream(opts.natsAddr, "", "") - ctx, err = jetStream.JetStreamInit(ctx) - if err != nil { - logger.Errorf("jetStream initialization failed with error: %v", err) - os.Exit(1) - } - // recreate stream to remove any old lingering documents - // NOT TO BE USED IN PRODUCTION - err = jetStream.RecreateStream(ctx) - if err != nil { - logger.Errorf("unexpected error recreating jetstream: %v", err) - } - defer jetStream.Close() - - certifierPubFunc, err := getCertifierPublish(ctx) - if err != nil { - logger.Errorf("error: %v", err) - os.Exit(1) - } - - assemblerFunc, err := getAssembler(opts) - if err != nil { - logger.Errorf("error: %v", err) - os.Exit(1) - } - - ingest := func(d *processor.Document) error { - docTree, err := process.Process(ctx, d) - if err != nil { - logger.Error("[processor] failed process document: %v", err) - return nil - } - - docTreeBytes, err := json.Marshal(d) - if err != nil { - return fmt.Errorf("failed marshal of document: %w", err) - } - err = emitter.Publish(ctx, emitter.SubjectNameDocProcessed, docTreeBytes) - if err != nil { - logger.Error("[processor] failed transportFunc: %v", err) - return nil - } - - logger.Infof("[processor] docTree Processed: %+v", docTree.Document.SourceInformation) - return nil - } - - // for pubsub_test we ignore identifier strings as we don't connect to a collectsub service - ingestorTransportFunc := func(d []assembler.IngestPredicates, i []*parser_common.IdentifierStrings) error { - err := assemblerFunc(d) - if err != nil { - return err - } - return nil - } - - ingestorFunc, err := getIngestor(ctx, ingestorTransportFunc) - if err != nil { - logger.Errorf("error: %v", err) - os.Exit(1) - } - - packageQueryFunc, err := getPackageQuery(gqlclient) - if err != nil { - logger.Errorf("error: %v", err) - os.Exit(1) - } - - // Set emit function to go through the entire pipeline - emit := func(d *processor.Document) error { - err = certifierPubFunc(d) - if err != nil { - logger.Errorf("collector ended with error: %v", err) - os.Exit(1) - } - return nil - } - - // Collect - errHandler := func(err error) bool { - if err == nil { - logger.Info("collector ended gracefully") - return true - } - logger.Errorf("collector ended with error: %v", err) - return false - } - - // Assuming that publisher and consumer are different processes. - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err := process.Subscribe(ctx, ingest) - if err != nil { - logger.Errorf("processor ended with error: %v", err) - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - err := ingestorFunc() - if err != nil { - logger.Errorf("parser ended with error: %v", err) - } - }() - - if err := certify.Certify(ctx, packageQueryFunc(), emit, errHandler, opts.poll, time.Minute*time.Duration(opts.interval)); err != nil { - logger.Fatal(err) - } - - wg.Wait() + initializeNATsandCertifier(ctx, opts) }, } -func validateOsvFlags(user string, pass string, dbAddr string, realm string, natsAddr string, poll bool, interval int) (options, error) { +func validateOsvFlags(user string, pass string, dbAddr string, realm string, pubsubAddr string, poll bool, interval int) (options, error) { var opts options opts.user = user opts.pass = pass opts.dbAddr = dbAddr opts.realm = realm - opts.natsAddr = natsAddr + opts.pubsubAddr = pubsubAddr opts.poll = poll opts.interval = interval return opts, nil } -func getCertifierPublish(ctx context.Context) (func(*processor.Document) error, error) { +func getCertifierPublish(ctx context.Context, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) (func(*processor.Document) error, error) { return func(d *processor.Document) error { - return certify.Publish(ctx, d) + return certify.Publish(ctx, d, blobStore, pubsub) }, nil } @@ -213,6 +96,86 @@ func getPackageQuery(client graphql.Client) (func() certifier.QueryComponents, e }, nil } +func initializeNATsandCertifier(ctx context.Context, opts options) { + logger := logging.FromContext(ctx) + + if strings.Contains(opts.pubsubAddr, "nats://") { + // initialize jetstream + // TODO: pass in credentials file for NATS secure login + jetStream := emitter.NewJetStream(opts.pubsubAddr, "", "") + if err := jetStream.JetStreamInit(ctx); err != nil { + logger.Errorf("jetStream initialization failed with error: %v", err) + os.Exit(1) + } + defer jetStream.Close() + } + + blobStore, err := blob.NewBlobStore(ctx, opts.blobAddr) + if err != nil { + logger.Errorf("unable to connect to blog store: %v", err) + } + + pubsub := emitter.NewEmitterPubSub(ctx, opts.pubsubAddr) + + httpClient := http.Client{} + gqlclient := graphql.NewClient(opts.dbAddr, &httpClient) + + certifierPubFunc, err := getCertifierPublish(ctx, blobStore, pubsub) + if err != nil { + logger.Errorf("error: %v", err) + os.Exit(1) + } + + packageQueryFunc, err := getPackageQuery(gqlclient) + if err != nil { + logger.Errorf("error: %v", err) + os.Exit(1) + } + + // Set emit function to go through the entire pipeline + emit := func(d *processor.Document) error { + err = certifierPubFunc(d) + if err != nil { + logger.Errorf("error publishing document from collector: %v", err) + os.Exit(1) + } + return nil + } + + // Collect + errHandler := func(err error) bool { + if err == nil { + logger.Info("collector ended gracefully") + return true + } + logger.Errorf("collector ended with error: %v", err) + // Continue to emit any documents still in the docChan + return true + } + + ctx, cf := context.WithCancel(ctx) + var wg sync.WaitGroup + done := make(chan bool, 1) + wg.Add(1) + go func() { + defer wg.Done() + if err := certify.Certify(ctx, packageQueryFunc(), emit, errHandler, opts.poll, time.Minute*time.Duration(opts.interval)); err != nil { + logger.Fatal(err) + } + done <- true + }() + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + select { + case s := <-sigs: + logger.Infof("Signal received: %s, shutting down gracefully\n", s.String()) + case <-done: + logger.Infof("All Collectors completed") + } + cf() + wg.Wait() +} + func init() { rootCmd.AddCommand(osvCmd) } diff --git a/internal/testing/cmd/pubsub_test/cmd/root.go b/internal/testing/cmd/pubsub_test/cmd/root.go index 4091a7327d..97137274e6 100644 --- a/internal/testing/cmd/pubsub_test/cmd/root.go +++ b/internal/testing/cmd/pubsub_test/cmd/root.go @@ -36,7 +36,7 @@ var flags = struct { realm string // nats - natsAddr string + pubsubAddr string // run as poll certifier poll bool @@ -50,12 +50,12 @@ func init() { persistentFlags.StringVar(&flags.gdbuser, "gdbuser", "", "neo4j user credential to connect to graph db") persistentFlags.StringVar(&flags.gdbpass, "gdbpass", "", "neo4j password credential to connect to graph db") persistentFlags.StringVar(&flags.realm, "realm", "neo4j", "realm to connect to graph db") - persistentFlags.StringVar(&flags.natsAddr, "natsaddr", "nats://127.0.0.1:4222", "address to connect to NATs Server") + persistentFlags.StringVar(&flags.pubsubAddr, "pubsubAddr", "nats://127.0.0.1:4222", "address to connect to NATs Server") // certifier flags persistentFlags.BoolVarP(&flags.poll, "poll", "p", true, "sets the certifier to polling mode") persistentFlags.IntVarP(&flags.interval, "interval", "i", 5, "if polling set interval in minutes") - flagNames := []string{"gdbaddr", "gdbuser", "gdbpass", "realm", "natsaddr", "poll", "interval"} + flagNames := []string{"gdbaddr", "gdbuser", "gdbpass", "realm", "pubsubAddr", "poll", "interval"} for _, name := range flagNames { if flag := persistentFlags.Lookup(name); flag != nil { if err := viper.BindPFlag(name, flag); err != nil { diff --git a/k8s/k8s.yaml b/k8s/k8s.yaml index 3e27711de3..5e0d778118 100644 --- a/k8s/k8s.yaml +++ b/k8s/k8s.yaml @@ -113,7 +113,7 @@ spec: command: ["/opt/guac/guacingest"] workingDir: /tmp env: - - name: GUAC_NATS_ADDR + - name: GUAC_PUBSUB_ADDR value: nats://nats:4222 --- apiVersion: apps/v1 @@ -136,7 +136,7 @@ spec: command: ["/opt/guac/guaccollect", "image"] workingDir: /tmp env: - - name: GUAC_NATS_ADDR + - name: GUAC_PUBSUB_ADDR value: nats://nats:4222 - name: GUAC_CSUB_ADDR value: guac-collectsub:2782 @@ -161,7 +161,7 @@ spec: command: ["/opt/guac/guaccollect", "deps_dev"] workingDir: /tmp env: - - name: GUAC_NATS_ADDR + - name: GUAC_PUBSUB_ADDR value: nats://nats:4222 - name: GUAC_CSUB_ADDR value: guac-collectsub:2782 diff --git a/pkg/blob/blob.go b/pkg/blob/blob.go index d3832d1873..34728caf05 100644 --- a/pkg/blob/blob.go +++ b/pkg/blob/blob.go @@ -28,7 +28,7 @@ import ( _ "gocloud.dev/blob/s3blob" ) -type blobStore struct { +type BlobStore struct { bucket *blob.Bucket } @@ -37,18 +37,18 @@ type blobStore struct { // such as S3, google cloud bucket, azure blob store can be used. // Authentication is setup via environment variables. Please refer to for // full documentation https://gocloud.dev/howto/blob/ -func NewBlobStore(ctx context.Context, url string) (*blobStore, error) { +func NewBlobStore(ctx context.Context, url string) (*BlobStore, error) { bucket, err := blob.OpenBucket(ctx, url) if err != nil { return nil, fmt.Errorf("failed to open bucket with error: %w", err) } - return &blobStore{ + return &BlobStore{ bucket: bucket, }, nil } // Write uses the key and value to write the data to the initialized blob store (via the authentication provided) -func (b *blobStore) Write(ctx context.Context, key string, value []byte) error { +func (b *BlobStore) Write(ctx context.Context, key string, value []byte) error { w, err := b.bucket.NewWriter(ctx, key, nil) if err != nil { return fmt.Errorf("failed to write to bucket with error: %w", err) @@ -67,7 +67,7 @@ func (b *blobStore) Write(ctx context.Context, key string, value []byte) error { } // Read uses the key read the data from the initialized blob store (via the authentication provided) -func (b *blobStore) Read(ctx context.Context, key string) ([]byte, error) { +func (b *BlobStore) Read(ctx context.Context, key string) ([]byte, error) { r, err := b.bucket.NewReader(ctx, key, nil) if err != nil { return nil, fmt.Errorf("failed to read to bucket with error: %w", err) @@ -80,16 +80,3 @@ func (b *blobStore) Read(ctx context.Context, key string) ([]byte, error) { } return buf.Bytes(), nil } - -// WithBlobStore stores the initialized blobStore in the context such that it can be retrieved later when needed -func WithBlobStore(ctx context.Context, bs *blobStore) context.Context { - return context.WithValue(ctx, blobStore{}, bs) -} - -// FromContext allows for the blobStore to be pulled from the context -func FromContext(ctx context.Context) *blobStore { - if bs, ok := ctx.Value(blobStore{}).(*blobStore); ok { - return bs - } - return nil -} diff --git a/pkg/blob/blob_test.go b/pkg/blob/blob_test.go index cc423d9e31..b10b2d650a 100644 --- a/pkg/blob/blob_test.go +++ b/pkg/blob/blob_test.go @@ -28,7 +28,7 @@ import ( _ "gocloud.dev/blob/s3blob" ) -func initializeInMemBlobStore(ctx context.Context) (*blobStore, error) { +func initializeInMemBlobStore(ctx context.Context) (*BlobStore, error) { blobStore, err := NewBlobStore(ctx, "mem://") if err != nil { return nil, fmt.Errorf("unable to connect to blog store: %w", err) @@ -42,7 +42,6 @@ func Test_blobStore_Write_Read(t *testing.T) { if err != nil { t.Fatalf("failed to initialize blob store with error: %v", err) } - ctx = WithBlobStore(ctx, inmemBlog) type args struct { key string value []byte @@ -74,11 +73,10 @@ func Test_blobStore_Write_Read(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - b := FromContext(ctx) - if err := b.Write(ctx, tt.args.key, tt.args.value); err != nil { + if err := inmemBlog.Write(ctx, tt.args.key, tt.args.value); err != nil { t.Errorf("blobStore.Write() error = %v, wantErr %v", err, tt.wantErr) } - got, err := b.Read(ctx, tt.searchKey) + got, err := inmemBlog.Read(ctx, tt.searchKey) if (err != nil) != tt.wantErr { t.Errorf("blobStore.Read() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/certifier/certify/certify.go b/pkg/certifier/certify/certify.go index 2be76a3829..904c0ad07f 100644 --- a/pkg/certifier/certify/certify.go +++ b/pkg/certifier/certify/certify.go @@ -22,8 +22,10 @@ import ( jsoniter "github.com/json-iterator/go" + "github.com/guacsec/guac/pkg/blob" "github.com/guacsec/guac/pkg/certifier" "github.com/guacsec/guac/pkg/emitter" + "github.com/guacsec/guac/pkg/events" "github.com/guacsec/guac/pkg/handler/processor" "github.com/guacsec/guac/pkg/logging" ) @@ -164,15 +166,34 @@ func generateDocuments(ctx context.Context, collectedComponent interface{}, emit } // Publish is used by NATS JetStream to stream the documents and send them to the processor -func Publish(ctx context.Context, d *processor.Document) error { +func Publish(ctx context.Context, d *processor.Document, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) error { logger := logging.FromContext(ctx) + docByte, err := json.Marshal(d) if err != nil { return fmt.Errorf("failed marshal of document: %w", err) } - err = emitter.Publish(ctx, emitter.SubjectNameDocCollected, docByte) + + key := events.GetKey(d.Blob) + + if err = blobStore.Write(ctx, key, docByte); err != nil { + return fmt.Errorf("failed write document to blob store: %w", err) + } + + cdEvent, err := events.CreateArtifactPubEvent(ctx, key) if err != nil { - return err + return fmt.Errorf("failed create an event: %w", err) + } + + keyByte, err := json.Marshal(cdEvent) + if err != nil { + return fmt.Errorf("failed marshal of document key: %w", err) + } + + if err := pubsub.Publish(ctx, keyByte); err != nil { + if err != nil { + return fmt.Errorf("failed to publish event with error: %w", err) + } } logger.Debugf("doc published: %+v", d.SourceInformation.Source) return nil diff --git a/pkg/certifier/certify/certify_test.go b/pkg/certifier/certify/certify_test.go index 8449299fe1..bf4ba38ffc 100644 --- a/pkg/certifier/certify/certify_test.go +++ b/pkg/certifier/certify/certify_test.go @@ -26,10 +26,12 @@ import ( "github.com/guacsec/guac/internal/testing/dochelper" nats_test "github.com/guacsec/guac/internal/testing/nats" "github.com/guacsec/guac/internal/testing/testdata" + "github.com/guacsec/guac/pkg/blob" "github.com/guacsec/guac/pkg/certifier" "github.com/guacsec/guac/pkg/certifier/components/root_package" "github.com/guacsec/guac/pkg/certifier/osv" "github.com/guacsec/guac/pkg/emitter" + "github.com/guacsec/guac/pkg/events" "github.com/guacsec/guac/pkg/handler/processor" "github.com/guacsec/guac/pkg/logging" ) @@ -227,8 +229,7 @@ func Test_Publish(t *testing.T) { ctx := context.Background() jetStream := emitter.NewJetStream(url, "", "") - ctx, err = jetStream.JetStreamInit(ctx) - if err != nil { + if err := jetStream.JetStreamInit(ctx); err != nil { t.Fatalf("unexpected error initializing jetstream: %v", err) } err = jetStream.RecreateStream(ctx) @@ -236,7 +237,15 @@ func Test_Publish(t *testing.T) { t.Fatalf("unexpected error recreating jetstream: %v", err) } defer jetStream.Close() - err = Publish(ctx, &testdata.Ite6SLSADoc) + + blobStore, err := blob.NewBlobStore(ctx, "mem://") + if err != nil { + t.Fatalf("unable to connect to blog store: %v", err) + } + + pubsub := emitter.NewEmitterPubSub(ctx, url) + + err = Publish(ctx, &testdata.Ite6SLSADoc, blobStore, pubsub) if err != nil { t.Fatalf("unexpected error on emit: %v", err) } @@ -253,7 +262,7 @@ func Test_Publish(t *testing.T) { return nil } - err = testSubscribe(ctx, transportFunc) + err = testSubscribe(ctx, transportFunc, blobStore, pubsub) if err != nil { if err != nil && !errors.Is(err, context.DeadlineExceeded) { t.Errorf("nats emitter Subscribe test errored = %v", err) @@ -261,23 +270,35 @@ func Test_Publish(t *testing.T) { } } -func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error) error { +func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) error { logger := logging.FromContext(ctx) + uuid, err := uuid.NewV4() if err != nil { return fmt.Errorf("failed to get uuid with the following error: %w", err) } uuidString := uuid.String() - psub, err := emitter.NewPubSub(ctx, uuidString, emitter.SubjectNameDocCollected, emitter.DurableProcessor, emitter.BackOffTimer) + sub, err := pubsub.Subscribe(ctx, uuidString) if err != nil { return err } - processFunc := func(d []byte) error { + + blobStoreKey, err := events.DecodeEventSubject(ctx, d) + if err != nil { + logger.Errorf("[processor: %s] failed decode event: %v", uuidString, err) + return nil + } + + documentBytes, err := blobStore.Read(ctx, blobStoreKey) + if err != nil { + return fmt.Errorf("failed read document to blob store: %w", err) + } + doc := processor.Document{} - err := json.Unmarshal(d, &doc) + err = json.Unmarshal(documentBytes, &doc) if err != nil { - fmtErrString := fmt.Sprintf("[processor: %s] failed unmarshal the document bytes", uuidString) + fmtErrString := fmt.Sprintf("[processor: %s] failed unmarshal the document bytes: %v", uuidString, err) logger.Errorf(fmtErrString+": %v", err) return fmt.Errorf(fmtErrString+": %w", err) } @@ -294,13 +315,16 @@ func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTre logger.Errorf(fmtErrString+": %v", err) return fmt.Errorf(fmtErrString+": %w", err) } + logger.Infof("[processor: %s] docTree Processed: %+v", uuidString, docTree.Document.SourceInformation) return nil } - err = psub.GetDataFromNats(ctx, processFunc) - if err != nil { - return err + if err := sub.GetDataFromSubscriber(ctx, processFunc); err != nil { + return fmt.Errorf("failed to get data from subscriber with error: %w", err) + } + if err := sub.CloseSubscriber(ctx); err != nil { + return fmt.Errorf("failed to close subscriber with error: %w", err) } return nil } diff --git a/pkg/cli/store.go b/pkg/cli/store.go index 34cdc05aab..2ad702894b 100644 --- a/pkg/cli/store.go +++ b/pkg/cli/store.go @@ -35,7 +35,7 @@ func init() { // Set of all flags used across GUAC clis and subcommands. Use consistent // names for config file. - set.String("nats-addr", "nats://127.0.0.1:4222", "address to connect to NATs Server") + set.String("pubsub-addr", "nats://127.0.0.1:4222", "gocloud connection string for pubsub configured via https://gocloud.dev/howto/pubsub/ (default is nats://127.0.0.1:4222)") set.String("csub-addr", "localhost:2782", "address to connect to collect-sub service") set.Bool("csub-tls", false, "enable tls connection to the server") set.Bool("csub-tls-skip-verify", false, "skip verifying server certificate (for self-signed certificates for example)") @@ -58,7 +58,7 @@ func init() { set.String("neo4j-realm", "neo4j", "realm to connect to graph db") // blob store address - set.String("blob-addr", "file:///path/to/dir", "address to the blob store configured via https://gocloud.dev/howto/blob/") + set.String("blob-addr", "file:///path/to/dir", "gocloud connection string for blob store configured via https://gocloud.dev/howto/blob/") set.String("neptune-endpoint", "localhost", "address to neptune db") set.Int("neptune-port", 8182, "port used for neptune db connection") diff --git a/pkg/emitter/emitter.go b/pkg/emitter/emitter.go new file mode 100644 index 0000000000..99ea718030 --- /dev/null +++ b/pkg/emitter/emitter.go @@ -0,0 +1,193 @@ +// +// Copyright 2024 The GUAC Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package emitter + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "gocloud.dev/pubsub" + + _ "github.com/pitabwire/natspubsub" + _ "gocloud.dev/pubsub/awssnssqs" + _ "gocloud.dev/pubsub/azuresb" + _ "gocloud.dev/pubsub/gcppubsub" + _ "gocloud.dev/pubsub/kafkapubsub" + _ "gocloud.dev/pubsub/mempubsub" + _ "gocloud.dev/pubsub/rabbitpubsub" +) + +// EmitterPubSub stores the serviceURL such that the topic and subscription can be reopened +type EmitterPubSub struct { + ServiceURL string +} + +// DataFunc determines how the data return from NATS is transformed based on implementation per module +type DataFunc func([]byte) error + +// subscriber provides dataChan to read the collected data from the stream, errChan for any error that return and +// the pubsub.Subscription to close the subscription once complete +type subscriber struct { + dataChan <-chan []byte + errChan <-chan error + subscription *pubsub.Subscription +} + +// NewEmitterPubSub initializes the blob store based on the url. +// utilizing gocloud (https://gocloud.dev/howto/pubsub/publish/) various pubsub providers +// such as sqs, google pubsub, azure service bus, NATS and Kafka can be used. +// Authentication is setup via environment variables. Please refer to for +// full documentation https://gocloud.dev/howto/pubsub/ +func NewEmitterPubSub(_ context.Context, serviceURL string) *EmitterPubSub { + return &EmitterPubSub{ + ServiceURL: serviceURL, + } +} + +// buildTopicURL constructs the full URL for a topic. +// If using NATS, additional parameters are needed for jetstream +func buildTopicURL(serviceURL string) string { + if strings.HasPrefix(serviceURL, "nats://") { + return fmt.Sprintf("%s?subject=%s", serviceURL, subjectNameDocCollected) + } else { + return serviceURL + } +} + +// buildSubscriptionURL constructs the full URL for subscription. +// If using NATS, additional parameters are needed for jetstream +func buildSubscriptionURL(serviceURL string) string { + if strings.HasPrefix(serviceURL, "nats://") { + return fmt.Sprintf("%s?%s&subject=%s&consumer_durable=%s&stream_name=%s&stream_subjects=%s", serviceURL, "jetstream", subjectNameDocCollected, durableProcessor, streamName, streamSubjects) + } else { + return serviceURL + } +} + +// Publish publishes the data onto the pubsub stream for consumption by upstream services +func (e *EmitterPubSub) Publish(ctx context.Context, data []byte) error { + // pubsub.OpenTopic creates a *pubsub.Topic from a URL. + topicURL := buildTopicURL(e.ServiceURL) + + // Initialize a topic + topic, err := pubsub.OpenTopic(ctx, topicURL) + if err != nil { + return fmt.Errorf("failed to open topic with url: %s, with error: %w", topicURL, err) + } + + // Publish a message + if err := topic.Send(ctx, &pubsub.Message{Body: data}); err != nil { + return fmt.Errorf("failed to open publish with url: %s, with error: %w", topicURL, err) + } + + if err := topic.Shutdown(ctx); err != nil { + return fmt.Errorf("failed to shutdown topic: %s, with error: %w", e.ServiceURL, err) + } + return nil +} + +// Subscribe subscribes to the pubsub stream and receives events as they flow through +func (e *EmitterPubSub) Subscribe(ctx context.Context, id string) (*subscriber, error) { + subscriptionURL := buildSubscriptionURL(e.ServiceURL) + + // Initialize a subscription + subscription, err := pubsub.OpenSubscription(ctx, subscriptionURL) + if err != nil { + return nil, fmt.Errorf("failed to open subscription with url: %s, with error: %w", subscriptionURL, err) + } + + dataChan, errchan, err := createSubscriber(ctx, subscription, id) + if err != nil { + return nil, err + } + return &subscriber{ + dataChan: dataChan, + errChan: errchan, + subscription: subscription, + }, nil +} + +// GetDataFromSubscriber retrieves the data from the channels and transforms it via the DataFunc defined per module +func (s *subscriber) GetDataFromSubscriber(ctx context.Context, dataFunc DataFunc) error { + for { + select { + case d := <-s.dataChan: + if err := dataFunc(d); err != nil { + return err + } + case err := <-s.errChan: + for len(s.dataChan) > 0 { + d := <-s.dataChan + if err := dataFunc(d); err != nil { + return err + } + } + return err + case <-ctx.Done(): + for len(s.dataChan) > 0 { + d := <-s.dataChan + if err := dataFunc(d); err != nil { + return err + } + } + return ctx.Err() + } + } +} + +// CloseSubscriber closes the pubsub.Subscription +func (s *subscriber) CloseSubscriber(ctx context.Context) error { + return s.subscription.Shutdown(ctx) +} + +// createSubscriber receives from the subscription and use the dataChan and errChan to continuously send collected data or errors +func createSubscriber(ctx context.Context, subscription *pubsub.Subscription, id string) (<-chan []byte, <-chan error, error) { + // docChan to collect artifacts + dataChan := make(chan []byte, bufferChannelSize) + // errChan to receive error from collectors + errChan := make(chan error, 1) + go func() { + for { + // if the context is canceled we want to break out of the loop + if ctx.Err() != nil { + errChan <- ctx.Err() + return + } + msg, err := subscription.Receive(ctx) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + // if we get a timeout, we want to try again + select { + case <-ctx.Done(): + errChan <- ctx.Err() + return + case <-time.After(backOffTimer): + } + continue + } else { + errChan <- fmt.Errorf("[%s: %s] unexpected Receive error: %w", durableProcessor, id, err) + return + } + } + msg.Ack() + dataChan <- msg.Body + } + }() + return dataChan, errChan, nil +} diff --git a/pkg/emitter/emitter_test.go b/pkg/emitter/emitter_test.go new file mode 100644 index 0000000000..ed3c532852 --- /dev/null +++ b/pkg/emitter/emitter_test.go @@ -0,0 +1,82 @@ +// +// Copyright 2024 The GUAC Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package emitter + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/guacsec/guac/internal/testing/dochelper" + nats_test "github.com/guacsec/guac/internal/testing/nats" + "github.com/guacsec/guac/pkg/handler/processor" + _ "github.com/pitabwire/natspubsub" + _ "gocloud.dev/pubsub/awssnssqs" + _ "gocloud.dev/pubsub/azuresb" + _ "gocloud.dev/pubsub/gcppubsub" + _ "gocloud.dev/pubsub/kafkapubsub" + _ "gocloud.dev/pubsub/mempubsub" + _ "gocloud.dev/pubsub/rabbitpubsub" +) + +func TestEmitter_PublishOnEmit(t *testing.T) { + expectedDocTree := dochelper.DocNode(&ite6SLSADoc) + + natsTest := nats_test.NewNatsTestServer() + url, err := natsTest.EnableJetStreamForTest() + if err != nil { + t.Fatal(err) + } + defer natsTest.Shutdown() + + ctx := context.Background() + jetStream := NewJetStream(url, "", "") + if err := jetStream.JetStreamInit(ctx); err != nil { + t.Fatalf("unexpected error initializing jetstream: %v", err) + } + err = jetStream.RecreateStream(ctx) + if err != nil { + t.Fatalf("unexpected error recreating jetstream: %v", err) + } + defer jetStream.Close() + + pubsub := NewEmitterPubSub(ctx, url) + + err = testPublish(ctx, &ite6SLSADoc, pubsub) + if err != nil { + t.Fatalf("unexpected error on emit: %v", err) + } + + var cancel context.CancelFunc + + ctx, cancel = context.WithTimeout(ctx, time.Second) + defer cancel() + + transportFunc := func(d processor.DocumentTree) error { + if !dochelper.DocTreeEqual(d, expectedDocTree) { + t.Errorf("doc tree did not match up, got\n%s, \nexpected\n%s", dochelper.StringTree(d), dochelper.StringTree(expectedDocTree)) + } + return nil + } + + err = testSubscribe(ctx, transportFunc, pubsub) + if err != nil { + if err != nil && !errors.Is(err, context.DeadlineExceeded) { + t.Errorf("nats emitter Subscribe test errored = %v", err) + } + } +} diff --git a/pkg/emitter/nats_data.go b/pkg/emitter/nats_data.go deleted file mode 100644 index f7ec41713b..0000000000 --- a/pkg/emitter/nats_data.go +++ /dev/null @@ -1,70 +0,0 @@ -// -// Copyright 2022 The GUAC Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package emitter - -import ( - "context" - "time" -) - -// DataFunc determines how the data return from NATS is transformed based on implementation per module -type DataFunc func([]byte) error - -type pubSub struct { - dataChan <-chan []byte - errChan <-chan error -} - -// NewPubSub initializes the subscriber via the valid subject and durable string. Returning a dataChan and errChan to fetch -// data on the stream -func NewPubSub(ctx context.Context, id string, subj string, durable string, backOffTimer time.Duration) (*pubSub, error) { - dataChan, errchan, err := createSubscriber(ctx, id, subj, durable, backOffTimer) - if err != nil { - return nil, err - } - return &pubSub{ - dataChan: dataChan, - errChan: errchan, - }, nil -} - -// GetDataFromNats retrieves the data from the channels and transforms it via the DataFunc defined per module -func (psub *pubSub) GetDataFromNats(ctx context.Context, dataFunc DataFunc) error { - for { - select { - case d := <-psub.dataChan: - if err := dataFunc(d); err != nil { - return err - } - case err := <-psub.errChan: - for len(psub.dataChan) > 0 { - d := <-psub.dataChan - if err := dataFunc(d); err != nil { - return err - } - } - return err - case <-ctx.Done(): - for len(psub.dataChan) > 0 { - d := <-psub.dataChan - if err := dataFunc(d); err != nil { - return err - } - } - return ctx.Err() - } - } -} diff --git a/pkg/emitter/nats_emitter.go b/pkg/emitter/nats_emitter.go index 113f9e5609..39122081ea 100644 --- a/pkg/emitter/nats_emitter.go +++ b/pkg/emitter/nats_emitter.go @@ -17,8 +17,6 @@ package emitter import ( "context" - "crypto/sha256" - "encoding/hex" "errors" "fmt" "time" @@ -29,16 +27,12 @@ import ( // NATS stream const ( - NatsName string = "GUAC" - StreamName string = "DOCUMENTS" - StreamSubjects string = "DOCUMENTS.*" - SubjectNameDocCollected string = "DOCUMENTS.collected" - SubjectNameDocProcessed string = "DOCUMENTS.processed" - SubjectNameDocParsed string = "DOCUMENTS.parsed" - DurableProcessor string = "processor" - DurableIngestor string = "ingestor" - BufferChannelSize int = 1000 - BackOffTimer time.Duration = 1 * time.Second + streamName string = "DOCUMENTS" + streamSubjects string = "DOCUMENTS.*" + subjectNameDocCollected string = "DOCUMENTS.collected" + durableProcessor string = "processor" + bufferChannelSize int = 1000 + backOffTimer time.Duration = 1 * time.Second ) type jetStream struct { @@ -66,21 +60,21 @@ func NewJetStream(url string, creds string, nKeyFile string) *jetStream { } // JetStreamInit initializes NATS and enabled Jet Stream to be used for GUAC -func (j *jetStream) JetStreamInit(ctx context.Context) (context.Context, error) { +func (j *jetStream) JetStreamInit(ctx context.Context) error { var err error // Connect Options. - opts := []nats.Option{nats.Name(NatsName)} + var opts []nats.Option // Use UserCredentials if j.creds != "" { - opts = append(opts, nats.UserCredentials(j.creds)) + opts = []nats.Option{nats.UserCredentials(j.creds)} } // Use Nkey authentication. if j.nKeyFile != "" { opt, err := nats.NkeyOptionFromSeed(j.nKeyFile) if err != nil { - return ctx, fmt.Errorf("failed to load nKeyFile for nats: %w", err) + return fmt.Errorf("failed to load nKeyFile for nats: %w", err) } opts = append(opts, opt) } @@ -88,40 +82,40 @@ func (j *jetStream) JetStreamInit(ctx context.Context) (context.Context, error) // Connect to NATS nc, err := nats.Connect(j.url, opts...) if err != nil { - return ctx, fmt.Errorf("unable to connect to nats server: %w", err) + return fmt.Errorf("unable to connect to nats server with address: %s, with error: %w", j.url, err) } // Create JetStream Context js, err := nc.JetStream() if err != nil { nc.Close() - return ctx, fmt.Errorf("unable to connect to nats jetstream: %w", err) + return fmt.Errorf("unable to connect to nats jetstream with address: %s, with error: %w", j.url, err) } err = createStreamOrExists(ctx, js) if err != nil { nc.Close() - return ctx, fmt.Errorf("failed to create stream: %w", err) + return fmt.Errorf("failed to create stream: %w", err) } j.nc = nc j.js = js - return withJetstream(ctx, js), nil + return nil } func createStreamOrExists(ctx context.Context, js nats.JetStreamContext) error { logger := logging.FromContext(ctx) - _, err := js.StreamInfo(StreamName) + _, err := js.StreamInfo(streamName) if err != nil && !errors.Is(err, nats.ErrStreamNotFound) { return err } // stream not found, create it if errors.Is(err, nats.ErrStreamNotFound) { - logger.Infof("creating stream %q and subjects %q", StreamName, StreamSubjects) + logger.Infof("creating stream %q and subjects %q", streamName, streamSubjects) _, err = js.AddStream(&nats.StreamConfig{ - Name: StreamName, - Subjects: []string{StreamSubjects}, + Name: streamName, + Subjects: []string{streamSubjects}, Retention: nats.WorkQueuePolicy, // window to track duplicates in the stream. // see https://github.com/nats-io/nats.docs/blob/master/using-nats/jetstream/model_deep_dive.md#message-deduplication @@ -144,7 +138,7 @@ func (j *jetStream) Close() { // RecreateStream deletes the current existing stream and recreates it func (j *jetStream) RecreateStream(ctx context.Context) error { if j.js != nil { - err := j.js.DeleteStream(StreamName) + err := j.js.DeleteStream(streamName) if err != nil && !errors.Is(err, nats.ErrStreamNotFound) { return fmt.Errorf("failed to delete stream: %w", err) } @@ -156,85 +150,3 @@ func (j *jetStream) RecreateStream(ctx context.Context) error { } return nil } - -func withJetstream(ctx context.Context, js nats.JetStreamContext) context.Context { - return context.WithValue(ctx, jetStream{}, js) -} - -// FromContext allows for the JetStreamContext to be pulled from the context -func FromContext(ctx context.Context) nats.JetStreamContext { - if js, ok := ctx.Value(jetStream{}).(nats.JetStreamContext); ok { - return js - } - return nil -} - -func createSubscriber(ctx context.Context, id string, subj string, durable string, backOffTimer time.Duration) (<-chan []byte, <-chan error, error) { - // docChan to collect artifacts - dataChan := make(chan []byte, BufferChannelSize) - // errChan to receive error from collectors - errChan := make(chan error, 1) - logger := logging.FromContext(ctx) - js := FromContext(ctx) - sub, err := js.PullSubscribe(subj, durable) - if err != nil { - logger.Errorf("%s subscribe failed: %v", durable, err) - return nil, nil, err - } - go func() { - for { - // if the context is canceled we want to break out of the loop - if ctx.Err() != nil { - errChan <- ctx.Err() - return - } - msgs, err := sub.Fetch(1, nats.Context(ctx)) - if err != nil { - if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) { - // if we get a timeout, we want to try again - select { - case <-ctx.Done(): - errChan <- ctx.Err() - return - case <-time.After(backOffTimer): - } - continue - } else { - errChan <- fmt.Errorf("[%s: %s] unexpected NATS fetch error: %w", durable, id, err) - return - } - } - if len(msgs) > 0 { - err := msgs[0].Ack() - if err != nil { - fmtErrString := fmt.Sprintf("[%s: %v] unable to Ack", durable, id) - logger.Errorf(fmtErrString+": %v", err) - errChan <- fmt.Errorf(fmtErrString+": %w", err) - return - } - dataChan <- msgs[0].Data - } - } - }() - return dataChan, errChan, nil -} - -// Publish publishes the data onto the NATS stream for consumption by upstream services -func Publish(ctx context.Context, subj string, data []byte) error { - js := FromContext(ctx) - if js == nil { - return errors.New("jetstream not found from context") - } - // messageID set using the hash to check for duplicate data on the stream - // see: https://github.com/nats-io/nats.docs/blob/master/using-nats/jetstream/model_deep_dive.md#message-deduplication - _, err := js.Publish(subj, data, nats.MsgId(getHash(data))) - if err != nil { - return fmt.Errorf("failed to publish document on stream: %w", err) - } - return nil -} - -func getHash(data []byte) string { - sha256sum := sha256.Sum256(data) - return hex.EncodeToString(sha256sum[:]) -} diff --git a/pkg/emitter/nats_emitter_test.go b/pkg/emitter/nats_emitter_test.go index 9926936e2d..b528152712 100644 --- a/pkg/emitter/nats_emitter_test.go +++ b/pkg/emitter/nats_emitter_test.go @@ -20,12 +20,10 @@ import ( "errors" "fmt" "testing" - "time" jsoniter "github.com/json-iterator/go" uuid "github.com/gofrs/uuid" - "github.com/guacsec/guac/internal/testing/dochelper" nats_test "github.com/guacsec/guac/internal/testing/nats" "github.com/guacsec/guac/pkg/handler/processor" "github.com/guacsec/guac/pkg/logging" @@ -79,120 +77,6 @@ var ( } ) -func TestNatsEmitter_PublishOnEmit(t *testing.T) { - expectedDocTree := dochelper.DocNode(&ite6SLSADoc) - - natsTest := nats_test.NewNatsTestServer() - url, err := natsTest.EnableJetStreamForTest() - if err != nil { - t.Fatal(err) - } - defer natsTest.Shutdown() - - ctx := context.Background() - jetStream := NewJetStream(url, "", "") - ctx, err = jetStream.JetStreamInit(ctx) - if err != nil { - t.Fatalf("unexpected error initializing jetstream: %v", err) - } - err = jetStream.RecreateStream(ctx) - if err != nil { - t.Fatalf("unexpected error recreating jetstream: %v", err) - } - defer jetStream.Close() - err = testPublish(ctx, &ite6SLSADoc) - if err != nil { - t.Fatalf("unexpected error on emit: %v", err) - } - - var cancel context.CancelFunc - - ctx, cancel = context.WithTimeout(ctx, time.Second) - defer cancel() - - transportFunc := func(d processor.DocumentTree) error { - if !dochelper.DocTreeEqual(d, expectedDocTree) { - t.Errorf("doc tree did not match up, got\n%s, \nexpected\n%s", dochelper.StringTree(d), dochelper.StringTree(expectedDocTree)) - } - return nil - } - - err = testSubscribe(ctx, transportFunc) - if err != nil { - if err != nil && !errors.Is(err, context.DeadlineExceeded) { - t.Errorf("nats emitter Subscribe test errored = %v", err) - } - } -} - -func TestNatsEmitter_PublishOnEmit_DeDuplication(t *testing.T) { - expectedDocTree := dochelper.DocNode(&ite6SLSADoc) - - natsTest := nats_test.NewNatsTestServer() - url, err := natsTest.EnableJetStreamForTest() - if err != nil { - t.Fatal(err) - } - defer natsTest.Shutdown() - - ctx := context.Background() - jetStream := NewJetStream(url, "", "") - ctx, err = jetStream.JetStreamInit(ctx) - if err != nil { - t.Fatalf("unexpected error initializing jetstream: %v", err) - } - err = jetStream.RecreateStream(ctx) - if err != nil { - t.Fatalf("unexpected error recreating jetstream: %v", err) - } - defer jetStream.Close() - - // publish document once - err = testPublish(ctx, &ite6SLSADoc) - if err != nil { - t.Fatalf("unexpected error on emit: %v", err) - } - - // publish same document again to check that data deduplication works - err = testPublish(ctx, &ite6SLSADoc) - if err != nil { - t.Fatalf("unexpected error on emit: %v", err) - } - - // publish third time the same document to check that data deduplication works - err = testPublish(ctx, &ite6SLSADoc) - if err != nil { - t.Fatalf("unexpected error on emit: %v", err) - } - - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, 2*time.Second) - defer cancel() - - listDocument := []processor.DocumentTree{} - - transportFunc := func(d processor.DocumentTree) error { - listDocument = append(listDocument, d) - return nil - } - - err = testSubscribe(ctx, transportFunc) - if err != nil { - if errors.Is(err, context.DeadlineExceeded) { - if len(listDocument) != 1 { - t.Error("expected only 1 document fetched") - } - for _, d := range listDocument { - if !dochelper.DocTreeEqual(d, expectedDocTree) { - t.Errorf("doc tree did not match up, got\n%s, \nexpected\n%s", dochelper.StringTree(d), dochelper.StringTree(expectedDocTree)) - } - } - } else { - t.Errorf("nats emitter Subscribe test errored = %v", err) - } - } -} - func TestNatsEmitter_RecreateStream(t *testing.T) { natsTest := nats_test.NewNatsTestServer() url, err := natsTest.EnableJetStreamForTest() @@ -203,8 +87,7 @@ func TestNatsEmitter_RecreateStream(t *testing.T) { ctx := context.Background() jetStream := NewJetStream(url, "", "") - ctx, err = jetStream.JetStreamInit(ctx) - if err != nil { + if err := jetStream.JetStreamInit(ctx); err != nil { t.Fatalf("unexpected error initializing jetstream: %v", err) } defer jetStream.Close() @@ -224,11 +107,11 @@ func TestNatsEmitter_RecreateStream(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if tt.deleteStream { - err := jetStream.js.DeleteStream(StreamName) + err := jetStream.js.DeleteStream(streamName) if err != nil { t.Errorf("failed to delete stream: %v", err) } - _, err = jetStream.js.StreamInfo(StreamName) + _, err = jetStream.js.StreamInfo(streamName) if err == nil || (err != nil) && !errors.Is(err, tt.wantErrMessage) { t.Errorf("RecreateStream() error = %v, wantErr %v", err, tt.wantErrMessage) return @@ -238,7 +121,7 @@ func TestNatsEmitter_RecreateStream(t *testing.T) { if err != nil { t.Fatalf("unexpected error recreating jetstream: %v", err) } - _, err = jetStream.js.StreamInfo(StreamName) + _, err = jetStream.js.StreamInfo(streamName) if err != nil { t.Errorf("RecreateStream() failed to create stream with error = %v", err) return @@ -247,13 +130,14 @@ func TestNatsEmitter_RecreateStream(t *testing.T) { } } -func testPublish(ctx context.Context, d *processor.Document) error { +func testPublish(ctx context.Context, d *processor.Document, pubsub *EmitterPubSub) error { logger := logging.FromContext(ctx) + docByte, err := json.Marshal(d) if err != nil { return fmt.Errorf("failed marshal of document: %w", err) } - err = Publish(ctx, SubjectNameDocCollected, docByte) + err = pubsub.Publish(ctx, docByte) if err != nil { return fmt.Errorf("failed to publish document on stream: %w", err) } @@ -261,18 +145,18 @@ func testPublish(ctx context.Context, d *processor.Document) error { return nil } -func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error) error { +func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error, pubsub *EmitterPubSub) error { logger := logging.FromContext(ctx) + uuid, err := uuid.NewV4() if err != nil { return fmt.Errorf("failed to get uuid with the following error: %w", err) } uuidString := uuid.String() - psub, err := NewPubSub(ctx, uuidString, SubjectNameDocCollected, DurableProcessor, BackOffTimer) + sub, err := pubsub.Subscribe(ctx, uuidString) if err != nil { return err } - processFunc := func(d []byte) error { doc := processor.Document{} err := json.Unmarshal(d, &doc) @@ -298,9 +182,11 @@ func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTre return nil } - err = psub.GetDataFromNats(ctx, processFunc) - if err != nil { - return err + if err := sub.GetDataFromSubscriber(ctx, processFunc); err != nil { + return fmt.Errorf("failed to get data from subscriber with error: %w", err) + } + if err := sub.CloseSubscriber(ctx); err != nil { + return fmt.Errorf("failed to close subscriber with error: %w", err) } return nil } diff --git a/pkg/handler/collector/collector.go b/pkg/handler/collector/collector.go index b67200f7ec..f3a93a07c7 100644 --- a/pkg/handler/collector/collector.go +++ b/pkg/handler/collector/collector.go @@ -119,9 +119,8 @@ func Collect(ctx context.Context, emitter Emitter, handleErr ErrHandler) error { // retrieval by the processor/ingestor. A CDEvent is created to transmit the key (which is the // sha256 of the collected "document"). This also fixes the issues where the "document" was too large // to be sent across the event stream. -func Publish(ctx context.Context, d *processor.Document) error { +func Publish(ctx context.Context, d *processor.Document, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) error { logger := logging.FromContext(ctx) - blobStore := blob.FromContext(ctx) docByte, err := json.Marshal(d) if err != nil { @@ -144,7 +143,7 @@ func Publish(ctx context.Context, d *processor.Document) error { return fmt.Errorf("failed marshal of document key: %w", err) } - if err := emitter.Publish(ctx, emitter.SubjectNameDocCollected, keyByte); err != nil { + if err := pubsub.Publish(ctx, keyByte); err != nil { if err != nil { return fmt.Errorf("failed to publish event with error: %w", err) } diff --git a/pkg/handler/collector/collector_test.go b/pkg/handler/collector/collector_test.go index 1d956691fc..7ffc9679ad 100644 --- a/pkg/handler/collector/collector_test.go +++ b/pkg/handler/collector/collector_test.go @@ -102,8 +102,7 @@ func Test_Publish(t *testing.T) { ctx := context.Background() jetStream := emitter.NewJetStream(url, "", "") - ctx, err = jetStream.JetStreamInit(ctx) - if err != nil { + if err := jetStream.JetStreamInit(ctx); err != nil { t.Fatalf("unexpected error initializing jetstream: %v", err) } err = jetStream.RecreateStream(ctx) @@ -117,9 +116,9 @@ func Test_Publish(t *testing.T) { t.Fatalf("unable to connect to blog store: %v", err) } - ctx = blob.WithBlobStore(ctx, blobStore) + pubsub := emitter.NewEmitterPubSub(ctx, url) - err = Publish(ctx, &testdata.Ite6SLSADoc) + err = Publish(ctx, &testdata.Ite6SLSADoc, blobStore, pubsub) if err != nil { t.Fatalf("unexpected error on emit: %v", err) } @@ -136,7 +135,7 @@ func Test_Publish(t *testing.T) { return nil } - err = testSubscribe(ctx, transportFunc) + err = testSubscribe(ctx, transportFunc, blobStore, pubsub) if err != nil { if err != nil && !errors.Is(err, context.DeadlineExceeded) { t.Errorf("nats emitter Subscribe test errored = %v", err) @@ -144,20 +143,18 @@ func Test_Publish(t *testing.T) { } } -func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error) error { +func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) error { logger := logging.FromContext(ctx) - blobStore := blob.FromContext(ctx) uuid, err := uuid.NewV4() if err != nil { return fmt.Errorf("failed to get uuid with the following error: %w", err) } uuidString := uuid.String() - psub, err := emitter.NewPubSub(ctx, uuidString, emitter.SubjectNameDocCollected, emitter.DurableProcessor, emitter.BackOffTimer) + sub, err := pubsub.Subscribe(ctx, uuidString) if err != nil { return err } - processFunc := func(d []byte) error { blobStoreKey, err := events.DecodeEventSubject(ctx, d) @@ -195,9 +192,11 @@ func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTre return nil } - err = psub.GetDataFromNats(ctx, processFunc) - if err != nil { - return err + if err := sub.GetDataFromSubscriber(ctx, processFunc); err != nil { + return fmt.Errorf("failed to get data from subscriber with error: %w", err) + } + if err := sub.CloseSubscriber(ctx); err != nil { + return fmt.Errorf("failed to close subscriber with error: %w", err) } return nil } diff --git a/pkg/handler/processor/process/process.go b/pkg/handler/processor/process/process.go index 9828ff9476..a700599dd5 100644 --- a/pkg/handler/processor/process/process.go +++ b/pkg/handler/processor/process/process.go @@ -74,20 +74,18 @@ func RegisterDocumentProcessor(p processor.DocumentProcessor, d processor.Docume // Subscribe receives the CD event and decodes the event to obtain the blob store key. // The key is used to retrieve the "document" from the blob store to be processed and ingested. -func Subscribe(ctx context.Context, em collector.Emitter) error { +func Subscribe(ctx context.Context, em collector.Emitter, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) error { logger := logging.FromContext(ctx) - blobStore := blob.FromContext(ctx) uuid, err := uuid.NewV4() if err != nil { return fmt.Errorf("failed to get uuid with the following error: %w", err) } uuidString := uuid.String() - psub, err := emitter.NewPubSub(ctx, uuidString, emitter.SubjectNameDocCollected, emitter.DurableProcessor, emitter.BackOffTimer) + sub, err := pubsub.Subscribe(ctx, uuidString) if err != nil { return fmt.Errorf("[processor: %s] failed to create new pubsub: %w", uuidString, err) } - // should still continue if there are errors since problem is with individual documents processFunc := func(d []byte) error { @@ -116,10 +114,15 @@ func Subscribe(ctx context.Context, em collector.Emitter) error { return nil } - err = psub.GetDataFromNats(ctx, processFunc) + err = sub.GetDataFromSubscriber(ctx, processFunc) if err != nil { - return fmt.Errorf("[processor: %s] failed to get data from nats: %w", uuidString, err) + return fmt.Errorf("[processor: %s] failed to get data from %s: %w", uuidString, pubsub.ServiceURL, err) + } + + if err := sub.CloseSubscriber(ctx); err != nil { + return fmt.Errorf("[processor: %s] failed to close subscriber: %s, with error: %w", uuidString, pubsub.ServiceURL, err) } + return nil } diff --git a/pkg/handler/processor/process/process_test.go b/pkg/handler/processor/process/process_test.go index 99f03deb75..b5cb6048a5 100644 --- a/pkg/handler/processor/process/process_test.go +++ b/pkg/handler/processor/process/process_test.go @@ -17,12 +17,18 @@ package process import ( "context" + "fmt" "strings" "testing" + "time" "github.com/guacsec/guac/internal/testing/dochelper" + nats_test "github.com/guacsec/guac/internal/testing/nats" "github.com/guacsec/guac/internal/testing/simpledoc" "github.com/guacsec/guac/internal/testing/testdata" + "github.com/guacsec/guac/pkg/blob" + "github.com/guacsec/guac/pkg/emitter" + "github.com/guacsec/guac/pkg/events" "github.com/guacsec/guac/pkg/handler/processor" "github.com/guacsec/guac/pkg/handler/processor/guesser" "github.com/guacsec/guac/pkg/logging" @@ -600,9 +606,6 @@ func Test_validateFormat(t *testing.T) { } } -/* -// TODO: Fix tests to check for logger messages instead of err text -// https://github.com/guacsec/guac/issues/765 func Test_ProcessSubscribe(t *testing.T) { natsTest := nats_test.NewNatsTestServer() url, err := natsTest.EnableJetStreamForTest() @@ -628,7 +631,7 @@ func Test_ProcessSubscribe(t *testing.T) { Format: processor.FormatJSON, SourceInformation: processor.SourceInformation{}, }, - wantErr: true, + wantErr: false, expected: dochelper.DocNode(&processor.Document{ Blob: []byte(`{ "issuer": "google.com", @@ -638,7 +641,6 @@ func Test_ProcessSubscribe(t *testing.T) { Format: processor.FormatJSON, SourceInformation: processor.SourceInformation{}, }), - errMessage: "context deadline exceeded", }, { name: "unpack test", doc: processor.Document{ @@ -657,7 +659,7 @@ func Test_ProcessSubscribe(t *testing.T) { Format: processor.FormatJSON, SourceInformation: processor.SourceInformation{}, }, - wantErr: true, + wantErr: false, expected: dochelper.DocNode( &processor.Document{ //root Blob: []byte(`{ @@ -693,7 +695,6 @@ func Test_ProcessSubscribe(t *testing.T) { Format: processor.FormatJSON, SourceInformation: processor.SourceInformation{}, })), - errMessage: "context deadline exceeded", }, { name: "bad format", doc: processor.Document{ @@ -706,7 +707,7 @@ func Test_ProcessSubscribe(t *testing.T) { SourceInformation: processor.SourceInformation{}, }, wantErr: true, - errMessage: "failed process document: invalid JSON document", + errMessage: "invalid JSON document", }} // Register @@ -726,8 +727,7 @@ func Test_ProcessSubscribe(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctx := context.Background() jetStream := emitter.NewJetStream(url, "", "") - ctx, err = jetStream.JetStreamInit(ctx) - if err != nil { + if err := jetStream.JetStreamInit(ctx); err != nil { t.Fatalf("unexpected error initializing jetstream: %v", err) } err = jetStream.RecreateStream(ctx) @@ -736,8 +736,14 @@ func Test_ProcessSubscribe(t *testing.T) { } defer jetStream.Close() - err := testPublish(ctx, &tt.doc) + blobStore, err := blob.NewBlobStore(ctx, "mem://") if err != nil { + t.Fatalf("unable to connect to blog store: %v", err) + } + + pubsub := emitter.NewEmitterPubSub(ctx, url) + + if err := testPublish(ctx, &tt.doc, blobStore, pubsub); err != nil { t.Fatalf("unexpected error on emit: %v", err) } var cancel context.CancelFunc @@ -745,19 +751,26 @@ func Test_ProcessSubscribe(t *testing.T) { ctx, cancel = context.WithTimeout(ctx, 1*time.Second) defer cancel() - transportFunc := func(d processor.DocumentTree) error { - if !dochelper.DocTreeEqual(d, tt.expected) { - t.Errorf("doc tree did not match up, got\n%s, \nexpected\n%s", dochelper.StringTree(d), dochelper.StringTree(tt.expected)) + emit := func(d *processor.Document) error { + processedTree, err := Process(ctx, d) + if (err != nil) != tt.wantErr { + t.Errorf("nats emitter Subscribe test errored = %v, want %v", err, tt.wantErr) + } + if err != nil { + if !strings.Contains(err.Error(), tt.errMessage) { + t.Errorf("nats emitter Subscribe test errored = %v, want %v", err, tt.errMessage) + } + } else { + if !dochelper.DocTreeEqual(processedTree, tt.expected) { + t.Errorf("doc tree did not match up, got\n%s, \nexpected\n%s", dochelper.StringTree(processedTree), dochelper.StringTree(tt.expected)) + } } return nil } - err = Subscribe(ctx, transportFunc) - if (err != nil) != tt.wantErr { - t.Errorf("nats emitter Subscribe test errored = %v, want %v", err, tt.wantErr) - } + err = Subscribe(ctx, emit, blobStore, pubsub) if err != nil { - if !strings.Contains(err.Error(), tt.errMessage) { + if !strings.Contains(err.Error(), "context deadline exceeded") { t.Errorf("nats emitter Subscribe test errored = %v, want %v", err, tt.errMessage) } } @@ -765,15 +778,36 @@ func Test_ProcessSubscribe(t *testing.T) { } } -func testPublish(ctx context.Context, d *processor.Document) error { +func testPublish(ctx context.Context, d *processor.Document, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) error { + logger := logging.FromContext(ctx) + docByte, err := json.Marshal(d) if err != nil { return fmt.Errorf("failed marshal of document: %w", err) } - err = emitter.Publish(ctx, emitter.SubjectNameDocCollected, docByte) + + key := events.GetKey(d.Blob) + + if err = blobStore.Write(ctx, key, docByte); err != nil { + return fmt.Errorf("failed write document to blob store: %w", err) + } + + cdEvent, err := events.CreateArtifactPubEvent(ctx, key) + if err != nil { + return fmt.Errorf("failed create an event: %w", err) + } + + keyByte, err := json.Marshal(cdEvent) if err != nil { - return fmt.Errorf("failed to publish document on stream: %w", err) + return fmt.Errorf("failed marshal of document key: %w", err) } + + if err := pubsub.Publish(ctx, keyByte); err != nil { + if err != nil { + return fmt.Errorf("failed to publish event with error: %w", err) + } + } + + logger.Debugf("doc published: %+v", d.SourceInformation.Source) return nil } -*/ diff --git a/pkg/ingestor/parser/parser.go b/pkg/ingestor/parser/parser.go index 0e2b93cb7d..f1160bd4e4 100644 --- a/pkg/ingestor/parser/parser.go +++ b/pkg/ingestor/parser/parser.go @@ -19,11 +19,7 @@ import ( "context" "fmt" - "github.com/gofrs/uuid" - jsoniter "github.com/json-iterator/go" - "github.com/guacsec/guac/pkg/assembler" - "github.com/guacsec/guac/pkg/emitter" "github.com/guacsec/guac/pkg/handler/processor" "github.com/guacsec/guac/pkg/ingestor/parser/common" "github.com/guacsec/guac/pkg/ingestor/parser/csaf" @@ -38,8 +34,6 @@ import ( "github.com/guacsec/guac/pkg/logging" ) -var json = jsoniter.ConfigCompatibleWithStandardLibrary - func init() { _ = RegisterDocumentParser(dsse.NewDSSEParser, processor.DocumentDSSE) _ = RegisterDocumentParser(slsa.NewSLSAParser, processor.DocumentITE6SLSA) @@ -77,53 +71,6 @@ func RegisterDocumentParser(p func() common.DocumentParser, d processor.Document return nil } -// Subscribe is used by NATS JetStream to stream the documents received from the processor -// and parse them via ParseDocumentTree -// The context contains the jetstream. -func Subscribe(ctx context.Context, transportFunc func([]assembler.IngestPredicates, []*common.IdentifierStrings) error) error { - logger := logging.FromContext(ctx) - - uuid, err := uuid.NewV4() - if err != nil { - return fmt.Errorf("failed to get uuid with the following error: %w", err) - } - uuidString := uuid.String() - psub, err := emitter.NewPubSub(ctx, uuidString, emitter.SubjectNameDocProcessed, emitter.DurableIngestor, emitter.BackOffTimer) - if err != nil { - return err - } - - // should still continue if there are errors since problem is with individual documents - parserFunc := func(d []byte) error { - docNode := processor.DocumentNode{} - err = json.Unmarshal(d, &docNode) - if err != nil { - logger.Error("[ingestor: %s] failed unmarshal the document tree bytes: %v", uuidString, err) - return nil - } - assemblerInputs, idStrings, err := ParseDocumentTree(ctx, &docNode) - if err != nil { - logger.Error("[ingestor: %s] failed parse document: %v", uuidString, err) - return nil - } - - err = transportFunc(assemblerInputs, idStrings) - if err != nil { - logger.Error("[ingestor: %s] failed transportFunc: %v", uuidString, err) - return nil - } - - logger.Infof("[ingestor: %s] ingested docTree: %+v", uuidString, processor.DocumentTree(&docNode).Document.SourceInformation) - return nil - } - - err = psub.GetDataFromNats(ctx, parserFunc) - if err != nil { - return err - } - return nil -} - // ParseDocumentTree takes the DocumentTree and create graph inputs (nodes and edges) per document node. func ParseDocumentTree(ctx context.Context, docTree processor.DocumentTree) ([]assembler.IngestPredicates, []*common.IdentifierStrings, error) { assemblerInputs := []assembler.IngestPredicates{} diff --git a/pkg/ingestor/parser/parser_test.go b/pkg/ingestor/parser/parser_test.go index 786636d915..3326690069 100644 --- a/pkg/ingestor/parser/parser_test.go +++ b/pkg/ingestor/parser/parser_test.go @@ -19,16 +19,9 @@ import ( "context" "errors" "reflect" - "strings" "testing" - "time" - "github.com/guacsec/guac/internal/testing/mockverifier" - nats_test "github.com/guacsec/guac/internal/testing/nats" "github.com/guacsec/guac/pkg/assembler" - "github.com/guacsec/guac/pkg/emitter" - "github.com/guacsec/guac/pkg/ingestor/verifier" - "github.com/guacsec/guac/pkg/logging" "github.com/guacsec/guac/internal/testing/mocks" @@ -308,116 +301,3 @@ func TestParseDocumentTree(t *testing.T) { }) } } - -func TestSubscribe(t *testing.T) { - ctx := logging.WithLogger(context.Background()) - err := verifier.RegisterVerifier(mockverifier.NewMockSigstoreVerifier(), "sigstore") - if err != nil { - if !strings.Contains(err.Error(), "the verification provider is being overwritten") { - t.Errorf("unexpected error: %v", err) - } - } - - natsTest := nats_test.NewNatsTestServer() - url, err := natsTest.EnableJetStreamForTest() - if err != nil { - t.Fatal(err) - } - defer natsTest.Shutdown() - - tests := []struct { - name string - registerDocType processor.DocumentType - // The registerDocType is used to register the document parser, it is different from the roots own - // processor.DocumentType so that we can test the error case - tree processor.DocumentTree - want []assembler.AssemblerInput - parseWant []assembler.IngestPredicates - parseWant1 []*common.IdentifierStrings - wantErr bool - }{ - { - name: "default", - registerDocType: "test", - tree: &processor.DocumentNode{ - Document: &processor.Document{ - Type: "test", - }, - }, - parseWant: []assembler.IngestPredicates{{}}, - parseWant1: []*common.IdentifierStrings{{}}, - wantErr: true, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - jetStream := emitter.NewJetStream(url, "", "") - ctx, err = jetStream.JetStreamInit(ctx) - if err != nil { - t.Fatalf("unexpected error initializing jetstream: %v", err) - } - err = jetStream.RecreateStream(ctx) - if err != nil { - t.Fatalf("unexpected error recreating jetstream: %v", err) - } - defer jetStream.Close() - err := testPublish(ctx, test.tree) - if err != nil { - t.Fatalf("unexpected error on emit: %v", err) - } - var cancel context.CancelFunc - - ctx, cancel = context.WithTimeout(ctx, time.Second) - defer cancel() - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - mockDocumentParser := mocks.NewMockDocumentParser(ctrl) - - parser := common.DocumentParser(mockDocumentParser) - - f := func() common.DocumentParser { - return parser - } - - mockDocumentParser.EXPECT().Parse(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, doc *processor.Document) error { - return nil - }).AnyTimes() - mockDocumentParser.EXPECT().GetIdentities(ctx).Return([]common.TrustInformation{}).AnyTimes() - mockDocumentParser.EXPECT().GetPredicates(gomock.Any()).Return(&assembler.IngestPredicates{}).AnyTimes() - mockDocumentParser.EXPECT().GetIdentifiers(gomock.Any()).DoAndReturn(func(ctx context.Context) (*common.IdentifierStrings, error) { - return &common.IdentifierStrings{}, nil - }).AnyTimes() - - _ = RegisterDocumentParser(f, test.registerDocType) // Ignoring error because it is mutating a global variable - - transportFunc := func(x []assembler.IngestPredicates, y []*common.IdentifierStrings) error { - if !reflect.DeepEqual(x, test.parseWant) { - t.Errorf("Subscribe() got = %v, want %v", x, test.parseWant) - } - if !reflect.DeepEqual(y, test.parseWant1) { - t.Errorf("Subscribe() got1 = %v, want %v", y, test.parseWant1) - } - return nil - } - - if err := Subscribe(ctx, transportFunc); (err != nil) != test.wantErr { - t.Errorf("Subscribe() error = %v, wantErr %v", err, test.wantErr) - } - - delete(documentParser, test.registerDocType) - }) - } -} - -func testPublish(ctx context.Context, documentTree processor.DocumentTree) error { - docTreeJSON, err := json.Marshal(documentTree) - if err != nil { - return err - } - err = emitter.Publish(ctx, emitter.SubjectNameDocProcessed, docTreeJSON) - if err != nil { - return err - } - return nil -}