Skip to content

Commit

Permalink
updated collector and ingestor cmd with NATS (guacsec#418)
Browse files Browse the repository at this point in the history
* updated collector and ingestor cmd with NATs

Signed-off-by: pxp928 <[email protected]>

* add nats addr flag and remove unused ones

Signed-off-by: pxp928 <[email protected]>

* remove unused flags in collector cmd

Signed-off-by: pxp928 <[email protected]>

---------

Signed-off-by: pxp928 <[email protected]>
  • Loading branch information
pxp928 authored Feb 8, 2023
1 parent 663b076 commit 4dde5cd
Show file tree
Hide file tree
Showing 11 changed files with 543 additions and 67 deletions.
4 changes: 4 additions & 0 deletions cmd/collector/cmd/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,7 @@ var exampleCmd = &cobra.Command{
}
},
}

func init() {
rootCmd.AddCommand(exampleCmd)
}
134 changes: 134 additions & 0 deletions cmd/collector/cmd/files.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//
// Copyright 2023 The GUAC Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
"context"
"fmt"
"os"
"time"

"github.com/guacsec/guac/pkg/emitter"
"github.com/guacsec/guac/pkg/handler/collector"
"github.com/guacsec/guac/pkg/handler/collector/file"
"github.com/guacsec/guac/pkg/handler/processor"
"github.com/guacsec/guac/pkg/logging"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

type options struct {
// path to folder with documents to collect
path string
// map of image repo and tags
repoTags map[string][]string
natsAddr string
}

var filesCmd = &cobra.Command{
Use: "files [flags] file_path",
Short: "take a folder of files and create a GUAC graph utilizing Nats pubsub",
Run: func(cmd *cobra.Command, args []string) {

opts, err := validateFlags(
viper.GetString("natsaddr"),
args)
if err != nil {
fmt.Printf("unable to validate flags: %v\n", err)
_ = cmd.Help()
os.Exit(1)
}

ctx := logging.WithLogger(context.Background())
logger := logging.FromContext(ctx)

// Register collector
fileCollector := file.NewFileCollector(ctx, opts.path, false, time.Second)
err = collector.RegisterDocumentCollector(fileCollector, file.FileCollector)
if err != nil {
logger.Errorf("unable to register file collector: %v", err)
}
initializeNATsandCollector(ctx, opts.natsAddr)
},
}

func validateFlags(natsAddr string, args []string) (options, error) {
var opts options

opts.natsAddr = natsAddr

if len(args) != 1 {
return opts, fmt.Errorf("expected positional argument for file_path")
}

opts.path = args[0]

return opts, nil
}

func getCollectorPublish(ctx context.Context) (func(*processor.Document) error, error) {
return func(d *processor.Document) error {
return collector.Publish(ctx, d)
}, nil
}

func initializeNATsandCollector(ctx context.Context, natsAddr string) {
logger := logging.FromContext(ctx)
// initialize jetstream
// TODO: pass in credentials file for NATS secure login
jetStream := emitter.NewJetStream(natsAddr, "", "")
ctx, err := jetStream.JetStreamInit(ctx)
if err != nil {
logger.Errorf("jetStream initialization failed with error: %v", err)
os.Exit(1)
}
defer jetStream.Close()

// Get pipeline of components
collectorPubFunc, err := getCollectorPublish(ctx)
if err != nil {
logger.Errorf("error: %v", err)
os.Exit(1)
}

// Set emit function to go through the entire pipeline
emit := func(d *processor.Document) error {
err = collectorPubFunc(d)
if err != nil {
logger.Errorf("collector ended with error: %v", err)
os.Exit(1)
}
return nil
}

// Collect
errHandler := func(err error) bool {
if err == nil {
logger.Info("collector ended gracefully")
return true
}
logger.Errorf("collector ended with error: %v", err)
return false
}

if err := collector.Collect(ctx, emit, errHandler); err != nil {
logger.Fatal(err)
}
}

func init() {
rootCmd.AddCommand(filesCmd)
}
83 changes: 83 additions & 0 deletions cmd/collector/cmd/oci.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//
// Copyright 2022 The GUAC Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/guacsec/guac/pkg/handler/collector"
"github.com/guacsec/guac/pkg/handler/collector/oci"
"github.com/guacsec/guac/pkg/logging"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

var ociCmd = &cobra.Command{
Use: "image [flags] image_path1 image_path2...",
Short: "takes images to download sbom and attestation stored in OCI to add to GUAC graph",
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
ctx := logging.WithLogger(context.Background())
logger := logging.FromContext(ctx)

opts, err := validateOCIFlags(
viper.GetString("natsaddr"),
args)
if err != nil {
fmt.Printf("unable to validate flags: %v\n", err)
_ = cmd.Help()
os.Exit(1)
}

// Register collector
ociCollector := oci.NewOCICollector(ctx, opts.repoTags, false, 10*time.Minute)
err = collector.RegisterDocumentCollector(ociCollector, oci.OCICollector)
if err != nil {
logger.Errorf("unable to register oci collector: %v", err)
}

initializeNATsandCollector(ctx, opts.natsAddr)
},
}

func validateOCIFlags(natsAddr string, args []string) (options, error) {
var opts options

opts.natsAddr = natsAddr
opts.repoTags = map[string][]string{}

if len(args) < 1 {
return opts, fmt.Errorf("expected positional argument for image_path")
}
for _, arg := range args {
stringSplit := strings.Split(arg, ":")
if len(stringSplit) == 2 {
opts.repoTags[stringSplit[0]] = append(opts.repoTags[stringSplit[0]], stringSplit[1])
} else {
return opts, fmt.Errorf("image_path parsing error. require format repo:tag")
}
}

return opts, nil
}

func init() {
rootCmd.AddCommand(ociCmd)
}
64 changes: 63 additions & 1 deletion cmd/collector/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,76 @@
package cmd

import (
"context"
"fmt"
"os"
"strings"

"github.com/guacsec/guac/pkg/logging"

homedir "github.com/mitchellh/go-homedir"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

var flags = struct {
// collect-sub flags
collectSubAddr string
collectSubListenPort int

// nats
natsAddr string
}{}

var cfgFile string

func init() {
rootCmd.AddCommand(exampleCmd)
cobra.OnInitialize(initConfig)
persistentFlags := rootCmd.PersistentFlags()
persistentFlags.StringVar(&flags.natsAddr, "natsaddr", "nats://127.0.0.1:4222", "address to connect to NATs Server")
persistentFlags.StringVar(&flags.collectSubAddr, "csub-addr", "localhost:2782", "address to connect to collect-sub service")
persistentFlags.IntVar(&flags.collectSubListenPort, "csub-listen-port", 2782, "port to listen to on collect-sub service")

flagNames := []string{"natsaddr", "csub-addr", "csub-listen-port"}
for _, name := range flagNames {
if flag := persistentFlags.Lookup(name); flag != nil {
if err := viper.BindPFlag(name, flag); err != nil {
fmt.Fprintf(os.Stderr, "failed to bind flag: %v", err)
os.Exit(1)
}
}
}
}

func initConfig() {
ctx := logging.WithLogger(context.Background())
logger := logging.FromContext(ctx)

if cfgFile != "" {
viper.SetConfigFile(cfgFile)
} else {
home, err := homedir.Dir()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get user home directory: %v\n", err)
os.Exit(1)
}

viper.AddConfigPath(home)
viper.AddConfigPath(".")
viper.SetConfigName("guac")
viper.SetConfigType("yaml")
}

viper.AutomaticEnv()
viper.SetEnvPrefix("guac")
// The following line is needed to replace - with _ in env variables
// e.g. GUAC_DB_ADDR will be read as GUAC_gdbaddr
// The POSIX standard does not allow - in env variables
viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))

if err := viper.ReadInConfig(); err == nil {
logger.Infof("Using config file: %s", viper.ConfigFileUsed())
}
}

var rootCmd = &cobra.Command{
Expand Down
31 changes: 5 additions & 26 deletions cmd/ingest/cmd/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cmd

import (
"context"
"fmt"
"os"

"github.com/guacsec/guac/pkg/assembler"
Expand All @@ -28,20 +29,6 @@ import (
"github.com/spf13/viper"
)

var flags = struct {
dbAddr string
gdbuser string
gdbpass string
realm string
}{}

type options struct {
dbAddr string
user string
pass string
realm string
}

var docs []processor.DocumentTree

var exampleCmd = &cobra.Command{
Expand All @@ -56,9 +43,11 @@ var exampleCmd = &cobra.Command{
viper.GetString("gdbpass"),
viper.GetString("gdbaddr"),
viper.GetString("realm"),
)
viper.GetString("natsaddr"),
args)
if err != nil {
logger.Errorf("unable to validate flags: %v", err)
fmt.Printf("unable to validate flags: %v\n", err)
_ = cmd.Help()
os.Exit(1)
}

Expand Down Expand Up @@ -94,16 +83,6 @@ var exampleCmd = &cobra.Command{
},
}

func validateFlags(user string, pass string, dbAddr string, realm string) (options, error) {
var opts options

opts.user = user
opts.pass = pass
opts.dbAddr = dbAddr

return opts, nil
}

func init() {
rootCmd.AddCommand(exampleCmd)
}
Loading

0 comments on commit 4dde5cd

Please sign in to comment.