Skip to content

Commit

Permalink
abstract pubsub service via gocloud (guacsec#1664)
Browse files Browse the repository at this point in the history
* add new file to emitter package

Signed-off-by: pxp928 <[email protected]>

* add gocloud nats plugin that supports jetstream

Signed-off-by: pxp928 <[email protected]>

* add publish and subscribe via gocloud

Signed-off-by: pxp928 <[email protected]>

* add support for jetstream and other pubsub services

Signed-off-by: pxp928 <[email protected]>

* add missing initalization for nats and defer sub close

Signed-off-by: pxp928 <[email protected]>

* nats jetstream functional with gocloud

Signed-off-by: pxp928 <[email protected]>

* remove context initalization for jetstream

Signed-off-by: pxp928 <[email protected]>

* udpate unit tests for emitter

Signed-off-by: pxp928 <[email protected]>

* remove unneeded stream information for publish and subscribe

Signed-off-by: pxp928 <[email protected]>

* change nats-addr to pubsub-addr and add check before nats initializes

Signed-off-by: pxp928 <[email protected]>

* remove storing in context

Signed-off-by: pxp928 <[email protected]>

* update helper comments for emitter

Signed-off-by: pxp928 <[email protected]>

* updated error message

Signed-off-by: pxp928 <[email protected]>

* fix err checking to shutdown topic and subscriber

Signed-off-by: pxp928 <[email protected]>

* remove commented out unused code

Signed-off-by: pxp928 <[email protected]>

* improve nats error messaage with server address

Signed-off-by: pxp928 <[email protected]>

* fix tilt and address comments

Signed-off-by: pxp928 <[email protected]>

* remove commented out unused code

Signed-off-by: pxp928 <[email protected]>

---------

Signed-off-by: pxp928 <[email protected]>
  • Loading branch information
pxp928 authored Jan 29, 2024
1 parent ede754a commit eee82ba
Show file tree
Hide file tree
Showing 38 changed files with 814 additions and 915 deletions.
2 changes: 1 addition & 1 deletion cmd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ not required.
therefore don't use camelCase anywhere.

- Descriptive on its own - The flag names are also used in the guac.yaml config
file. Therefore a name should be self descriptive. Good: `nats-addr`, Bad:
file. Therefore a name should be self descriptive. Good: `pubsub-addr`, Bad:
`type`. If it is something that has the same meaning everywhere, it is ok to
be short: ex: interval.

Expand Down
15 changes: 7 additions & 8 deletions cmd/guaccollect/cmd/deps_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/guacsec/guac/pkg/cli"
"github.com/guacsec/guac/pkg/collectsub/client"
csubclient "github.com/guacsec/guac/pkg/collectsub/client"
"github.com/guacsec/guac/pkg/collectsub/datasource"
"github.com/guacsec/guac/pkg/collectsub/datasource/csubsource"
Expand All @@ -38,8 +37,8 @@ import (
type depsDevOptions struct {
// datasource for the collector
dataSource datasource.CollectSource
// address for NATS connection
natsAddr string
// address for pubsub connection
pubsubAddr string
// address for blob store
blobAddr string
// run as poll collector
Expand Down Expand Up @@ -75,7 +74,7 @@ you have access to read and write to the respective blob store.`,
logger := logging.FromContext(ctx)

opts, err := validateDepsDevFlags(
viper.GetString("nats-addr"),
viper.GetString("pubsub-addr"),
viper.GetString("blob-addr"),
viper.GetString("csub-addr"),
viper.GetBool("csub-tls"),
Expand Down Expand Up @@ -111,22 +110,22 @@ you have access to read and write to the respective blob store.`,
}()
}

initializeNATsandCollector(ctx, opts.natsAddr, opts.blobAddr)
initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr)
},
}

func validateDepsDevFlags(natsAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, retrieveDependencies bool, args []string,
func validateDepsDevFlags(pubsubAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, retrieveDependencies bool, args []string,
enablePrometheus bool, prometheusPort int,
) (depsDevOptions, error) {
var opts depsDevOptions
opts.natsAddr = natsAddr
opts.pubsubAddr = pubsubAddr
opts.blobAddr = blobAddr
opts.poll = poll
opts.retrieveDependencies = retrieveDependencies
opts.enablePrometheus = enablePrometheus
opts.prometheusPort = prometheusPort
if useCsub {
csubOpts, err := client.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
csubOpts, err := csubclient.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
if err != nil {
return opts, fmt.Errorf("unable to validate csub client flags: %w", err)
}
Expand Down
43 changes: 24 additions & 19 deletions cmd/guaccollect/cmd/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
Expand All @@ -37,8 +38,8 @@ import (
type filesOptions struct {
// path to folder with documents to collect
path string
// address for NATS connection
natsAddr string
// address for pubsub connection
pubsubAddr string
// address for blob store
blobAddr string
// poll location
Expand Down Expand Up @@ -66,7 +67,7 @@ you have access to read and write to the respective blob store.`,
Run: func(cmd *cobra.Command, args []string) {

opts, err := validateFilesFlags(
viper.GetString("nats-addr"),
viper.GetString("pubsub-addr"),
viper.GetString("blob-addr"),
viper.GetBool("service-poll"),
args)
Expand All @@ -86,14 +87,14 @@ you have access to read and write to the respective blob store.`,
logger.Errorf("unable to register file collector: %v", err)
}

initializeNATsandCollector(ctx, opts.natsAddr, opts.blobAddr)
initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr)
},
}

func validateFilesFlags(natsAddr string, blobAddr string, poll bool, args []string) (filesOptions, error) {
func validateFilesFlags(pubsubAddr string, blobAddr string, poll bool, args []string) (filesOptions, error) {
var opts filesOptions

opts.natsAddr = natsAddr
opts.pubsubAddr = pubsubAddr
opts.blobAddr = blobAddr
opts.poll = poll

Expand All @@ -106,33 +107,37 @@ func validateFilesFlags(natsAddr string, blobAddr string, poll bool, args []stri
return opts, nil
}

func getCollectorPublish(ctx context.Context) (func(*processor.Document) error, error) {
func getCollectorPublish(ctx context.Context, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) (func(*processor.Document) error, error) {
return func(d *processor.Document) error {
return collector.Publish(ctx, d)
return collector.Publish(ctx, d, blobStore, pubsub)
}, nil
}

func initializeNATsandCollector(ctx context.Context, natsAddr string, blobAddr string) {
func initializeNATsandCollector(ctx context.Context, pubsubAddr string, blobAddr string) {
logger := logging.FromContext(ctx)
// initialize jetstream
// TODO: pass in credentials file for NATS secure login
jetStream := emitter.NewJetStream(natsAddr, "", "")
ctx, err := jetStream.JetStreamInit(ctx)
if err != nil {
logger.Errorf("jetStream initialization failed with error: %v", err)
os.Exit(1)

if strings.HasPrefix(pubsubAddr, "nats://") {
// initialize jetstream
// TODO: pass in credentials file for NATS secure login
jetStream := emitter.NewJetStream(pubsubAddr, "", "")
if err := jetStream.JetStreamInit(ctx); err != nil {
logger.Errorf("jetStream initialization failed with error: %v", err)
os.Exit(1)
}
defer jetStream.Close()
}
defer jetStream.Close()

// initialize blob store
blobStore, err := blob.NewBlobStore(ctx, blobAddr)
if err != nil {
logger.Errorf("unable to connect to blog store: %v", err)
}

ctx = blob.WithBlobStore(ctx, blobStore)
// initialize pubsub
pubsub := emitter.NewEmitterPubSub(ctx, pubsubAddr)

// Get pipeline of components
collectorPubFunc, err := getCollectorPublish(ctx)
collectorPubFunc, err := getCollectorPublish(ctx, blobStore, pubsub)
if err != nil {
logger.Errorf("error: %v", err)
os.Exit(1)
Expand Down
15 changes: 7 additions & 8 deletions cmd/guaccollect/cmd/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/guacsec/guac/internal/client/githubclient"
"github.com/guacsec/guac/pkg/collectsub/client"
csubclient "github.com/guacsec/guac/pkg/collectsub/client"
"github.com/guacsec/guac/pkg/collectsub/datasource"
"github.com/guacsec/guac/pkg/collectsub/datasource/csubsource"
Expand All @@ -37,8 +36,8 @@ import (
type githubOptions struct {
// datasource for the collector
dataSource datasource.CollectSource
// address for NATS connection
natsAddr string
// address for pubsub connection
pubsubAddr string
// address for blob store
blobAddr string
// run as poll collector
Expand Down Expand Up @@ -68,7 +67,7 @@ you have access to read and write to the respective blob store.`,
logger := logging.FromContext(ctx)

opts, err := validateGithubFlags(
viper.GetString("nats-addr"),
viper.GetString("pubsub-addr"),
viper.GetString("blob-addr"),
viper.GetString("csub-addr"),
viper.GetBool("csub-tls"),
Expand Down Expand Up @@ -109,18 +108,18 @@ you have access to read and write to the respective blob store.`,
logger.Errorf("unable to register Github collector: %v", err)
}

initializeNATsandCollector(ctx, opts.natsAddr, opts.blobAddr)
initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr)
},
}

func validateGithubFlags(natsAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, args []string) (githubOptions, error) {
func validateGithubFlags(pubsubAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, args []string) (githubOptions, error) {
var opts githubOptions
opts.natsAddr = natsAddr
opts.pubsubAddr = pubsubAddr
opts.blobAddr = blobAddr
opts.poll = poll

if useCsub {
csubOpts, err := client.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
csubOpts, err := csubclient.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
if err != nil {
return opts, fmt.Errorf("unable to validate csub client flags: %w", err)
}
Expand Down
15 changes: 7 additions & 8 deletions cmd/guaccollect/cmd/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"os"
"time"

"github.com/guacsec/guac/pkg/collectsub/client"
csubclient "github.com/guacsec/guac/pkg/collectsub/client"
"github.com/guacsec/guac/pkg/collectsub/datasource"
"github.com/guacsec/guac/pkg/collectsub/datasource/csubsource"
Expand All @@ -37,8 +36,8 @@ import (
type ociOptions struct {
// datasource for the collector
dataSource datasource.CollectSource
// address for NATS connection
natsAddr string
// address for pubsub connection
pubsubAddr string
// address for blob store
blobAddr string
// run as poll collector
Expand Down Expand Up @@ -68,7 +67,7 @@ you have access to read and write to the respective blob store.`,
logger := logging.FromContext(ctx)

opts, err := validateOCIFlags(
viper.GetString("nats-addr"),
viper.GetString("pubsub-addr"),
viper.GetString("blob-addr"),
viper.GetString("csub-addr"),
viper.GetBool("csub-tls"),
Expand All @@ -92,18 +91,18 @@ you have access to read and write to the respective blob store.`,
logger.Errorf("unable to register oci collector: %v", err)
}

initializeNATsandCollector(ctx, opts.natsAddr, opts.blobAddr)
initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr)
},
}

func validateOCIFlags(natsAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, args []string) (ociOptions, error) {
func validateOCIFlags(pubsubAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, args []string) (ociOptions, error) {
var opts ociOptions
opts.natsAddr = natsAddr
opts.pubsubAddr = pubsubAddr
opts.blobAddr = blobAddr
opts.poll = poll

if useCsub {
csubOpts, err := client.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
csubOpts, err := csubclient.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
if err != nil {
return opts, fmt.Errorf("unable to validate csub client flags: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/guaccollect/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
func init() {
cobra.OnInitialize(cli.InitConfig)

set, err := cli.BuildFlags([]string{"nats-addr", "blob-addr", "csub-addr", "use-csub", "service-poll", "enable-prometheus", "prometheus-addr"})
set, err := cli.BuildFlags([]string{"pubsub-addr", "blob-addr", "csub-addr", "use-csub", "service-poll", "enable-prometheus", "prometheus-addr"})
if err != nil {
fmt.Fprintf(os.Stderr, "failed to setup flag: %v", err)
os.Exit(1)
Expand Down
32 changes: 18 additions & 14 deletions cmd/guacingest/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"os"
"os/signal"
"strings"
"sync"
"syscall"

Expand All @@ -36,7 +37,7 @@ import (
)

type options struct {
natsAddr string
pubsubAddr string
blobAddr string
csubClientOptions client.CsubClientOptions
graphqlEndpoint string
Expand All @@ -45,7 +46,7 @@ type options struct {
func ingest(cmd *cobra.Command, args []string) {

opts, err := validateFlags(
viper.GetString("nats-addr"),
viper.GetString("pubsub-addr"),
viper.GetString("blob-addr"),
viper.GetString("csub-addr"),
viper.GetBool("csub-tls"),
Expand All @@ -61,22 +62,25 @@ func ingest(cmd *cobra.Command, args []string) {
ctx, cf := context.WithCancel(logging.WithLogger(context.Background()))
logger := logging.FromContext(ctx)

// initialize jetstream
// TODO: pass in credentials file for NATS secure login
jetStream := emitter.NewJetStream(opts.natsAddr, "", "")
ctx, err = jetStream.JetStreamInit(ctx)
if err != nil {
logger.Errorf("jetStream initialization failed with error: %v", err)
os.Exit(1)
if strings.HasPrefix(opts.pubsubAddr, "nats://") {
// initialize jetstream
// TODO: pass in credentials file for NATS secure login
jetStream := emitter.NewJetStream(opts.pubsubAddr, "", "")
if err := jetStream.JetStreamInit(ctx); err != nil {
logger.Errorf("jetStream initialization failed with error: %v", err)
os.Exit(1)
}
defer jetStream.Close()
}
defer jetStream.Close()

// initialize blob store
blobStore, err := blob.NewBlobStore(ctx, opts.blobAddr)
if err != nil {
logger.Errorf("unable to connect to blog store: %v", err)
}

ctx = blob.WithBlobStore(ctx, blobStore)
// initialize pubsub
pubsub := emitter.NewEmitterPubSub(ctx, opts.pubsubAddr)

// initialize collectsub client
csubClient, err := csub_client.NewClient(opts.csubClientOptions)
Expand All @@ -95,7 +99,7 @@ func ingest(cmd *cobra.Command, args []string) {
wg.Add(1)
go func() {
defer wg.Done()
if err := process.Subscribe(ctx, emit); err != nil {
if err := process.Subscribe(ctx, emit, blobStore, pubsub); err != nil {
logger.Errorf("processor ended with error: %v", err)
}
}()
Expand All @@ -110,9 +114,9 @@ func ingest(cmd *cobra.Command, args []string) {
wg.Wait()
}

func validateFlags(natsAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, graphqlEndpoint string, args []string) (options, error) {
func validateFlags(pubsubAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, graphqlEndpoint string, args []string) (options, error) {
var opts options
opts.natsAddr = natsAddr
opts.pubsubAddr = pubsubAddr
opts.blobAddr = blobAddr
csubOpts, err := client.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/guacingest/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
func init() {
cobra.OnInitialize(cli.InitConfig)

set, err := cli.BuildFlags([]string{"nats-addr", "blob-addr", "csub-addr", "gql-addr"})
set, err := cli.BuildFlags([]string{"pubsub-addr", "blob-addr", "csub-addr", "gql-addr"})
if err != nil {
fmt.Fprintf(os.Stderr, "failed to setup flag: %v", err)
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions container_files/arango/guac.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# nats
nats-addr: nats://nats:4222
# pubsub - default nats
pubsub-addr: nats://nats:4222

# blob store setup. Setup with blob store of choice via https://gocloud.dev/howto/blob/
blob-addr: file:///path/to/dir
Expand Down
4 changes: 2 additions & 2 deletions container_files/ent/guac.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# nats
nats-addr: nats://nats:4222
# pubsub - default nats
pubsub-addr: nats://nats:4222

# blob store setup. Setup with blob store of choice via https://gocloud.dev/howto/blob/
blob-addr: file:///path/to/dir
Expand Down
4 changes: 2 additions & 2 deletions container_files/guac/guac.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# nats
nats-addr: nats://nats:4222
# pubsub - default nats
pubsub-addr: nats://nats:4222

# blob store setup. Setup with blob store of choice via https://gocloud.dev/howto/blob/
blob-addr: file:///path/to/dir
Expand Down
2 changes: 1 addition & 1 deletion container_files/nats/js.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// change max payload from default 1MB to 64MB
max_payload: 64MB
max_payload: 1MB
// enables jetstream, an empty block will enable and use defaults
jetstream {}
Loading

0 comments on commit eee82ba

Please sign in to comment.