Skip to content

Commit

Permalink
Add support for bundle downloading
Browse files Browse the repository at this point in the history
Previously, policies and data had to be pushed into OPA via the REST API
or loaded via command line arguments at startup.

With these changes, OPA can now be configured to pull down bundles of
policy and data from remote HTTP servers. When a bundle is downloaded
successfully, the policies and data are loaded out of the bundle file
and inserted into storage.
  • Loading branch information
tsandall committed Mar 16, 2018
1 parent 966eaac commit f131cfc
Show file tree
Hide file tree
Showing 15 changed files with 1,717 additions and 8 deletions.
205 changes: 205 additions & 0 deletions bundle/bundle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright 2018 The OPA Authors. All rights reserved.
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

// Package bundle implements bundle loading.
package bundle

import (
"archive/tar"
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"reflect"
"strings"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/util"
"github.com/pkg/errors"
)

// Common file extensions and file names.
const (
RegoExt = ".rego"
JSONExt = ".json"
DataFileExt = "/data.json"
)

// Bundle represents a loaded bundle. The bundle can contain data and policies.
type Bundle struct {
Data map[string]interface{}
Modules []ModuleFile
}

// ModuleFile represents a single module contained a bundle.
type ModuleFile struct {
Path string
Raw []byte
Parsed *ast.Module
}

// Write serializes the Bundle and writes it to w.
func Write(w io.Writer, bundle Bundle) error {
gw := gzip.NewWriter(w)
tw := tar.NewWriter(gw)

var buf bytes.Buffer

if err := json.NewEncoder(&buf).Encode(bundle.Data); err != nil {
return err
}

if err := writeFile(tw, "data.json", buf.Bytes()); err != nil {
return err
}

for _, module := range bundle.Modules {
if err := writeFile(tw, module.Path, module.Raw); err != nil {
return err
}
}

if err := tw.Close(); err != nil {
return err
}

return gw.Close()
}

// Read returns a new Bundle loaded from the reader.
func Read(r io.Reader) (Bundle, error) {

var bundle Bundle

bundle.Data = map[string]interface{}{}

gr, err := gzip.NewReader(r)
if err != nil {
return bundle, errors.Wrap(err, "bundle read failed")
}

tr := tar.NewReader(gr)

for {
header, err := tr.Next()
if err == io.EOF {
break
} else if err != nil {
return bundle, errors.Wrap(err, "bundle read failed")
}

if header.Typeflag != tar.TypeReg {
continue
}

var buf bytes.Buffer
io.Copy(&buf, tr)
path := header.Name

if strings.HasSuffix(path, RegoExt) {
module, err := ast.ParseModule(path, buf.String())
if err != nil {
return bundle, errors.Wrap(err, "bundle load failed")
}
file := ModuleFile{
Path: path,
Raw: buf.Bytes(),
Parsed: module,
}
bundle.Modules = append(bundle.Modules, file)

} else if strings.HasSuffix(path, DataFileExt) {
var value interface{}
if err := util.NewJSONDecoder(&buf).Decode(&value); err != nil {
return bundle, errors.Wrapf(err, "bundle load failed on %v", path)
}
dirpath := strings.Trim(strings.TrimSuffix(path, DataFileExt), "/")
var key []string
if dirpath != "" {
key = strings.Split(dirpath, "/")
}
if err := bundle.insert(key, value); err != nil {
return bundle, errors.Wrapf(err, "bundle load failed on %v", path)
}
}
}

return bundle, nil
}

// Equal returns true if this bundle's contents equal the other bundle's
// contents.
func (b Bundle) Equal(other Bundle) bool {
if !reflect.DeepEqual(b.Data, other.Data) {
return false
}
if len(b.Modules) != len(other.Modules) {
return false
}
for i := range b.Modules {
if b.Modules[i].Path != other.Modules[i].Path {
return false
}
if !b.Modules[i].Parsed.Equal(other.Modules[i].Parsed) {
return false
}
if !bytes.Equal(b.Modules[i].Raw, other.Modules[i].Raw) {
return false
}
}
return true
}

func (b *Bundle) insert(key []string, value interface{}) error {
if len(key) == 0 {
obj, ok := value.(map[string]interface{})
if !ok {
return fmt.Errorf("root value must be object")
}
b.Data = obj
return nil
}

obj, err := b.mkdir(key[:len(key)-1])
if err != nil {
return err
}

obj[key[len(key)-1]] = value
return nil
}

func (b *Bundle) mkdir(key []string) (map[string]interface{}, error) {
obj := b.Data
for i := 0; i < len(key); i++ {
node, ok := obj[key[i]]
if !ok {
node = map[string]interface{}{}
obj[key[i]] = node
}
obj, ok = node.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("non-leaf value must be object")
}
}
return obj, nil
}

func writeFile(tw *tar.Writer, path string, bs []byte) error {

hdr := &tar.Header{
Name: "/" + strings.TrimLeft(path, "/"),
Mode: 0600,
Typeflag: tar.TypeReg,
Size: int64(len(bs)),
}

if err := tw.WriteHeader(hdr); err != nil {
return err
}

_, err := tw.Write(bs)
return err
}
145 changes: 145 additions & 0 deletions bundle/bundle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2018 The OPA Authors. All rights reserved.
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

package bundle

import (
"archive/tar"
"bytes"
"compress/gzip"
"encoding/json"
"testing"

"github.com/open-policy-agent/opa/ast"
)

func TestRead(t *testing.T) {

files := [][2]string{
{"/a/b/c/data.json", "[1,2,3]"},
{"/a/b/d/data.json", "true"},
{"/example/example.rego", `package example`},
}

buf := writeTarGz(files)
bundle, err := Read(buf)
if err != nil {
t.Fatal(err)
}

module := `package example`

exp := Bundle{
Data: map[string]interface{}{
"a": map[string]interface{}{
"b": map[string]interface{}{
"c": []interface{}{json.Number("1"), json.Number("2"), json.Number("3")},
"d": true,
},
},
},
Modules: []ModuleFile{
{
Path: "/example/example.rego",
Parsed: ast.MustParseModule(module),
Raw: []byte(module),
},
},
}

if !exp.Equal(bundle) {
t.Fatal("Exp:", exp, "\n\nGot:", bundle)
}
}

func TestReadErrorBadGzip(t *testing.T) {
buf := bytes.NewBufferString("bad gzip bytes")
_, err := Read(buf)
if err == nil {
t.Fatal("expected error")
}
}

func TestReadErrorBadTar(t *testing.T) {
var buf bytes.Buffer
gw := gzip.NewWriter(&buf)
gw.Write([]byte("bad tar bytes"))
gw.Close()
_, err := Read(&buf)
if err == nil {
t.Fatal("expected error")
}
}

func TestReadErrorBadContents(t *testing.T) {
tests := []struct {
files [][2]string
}{
{[][2]string{{"/test.rego", "lkafjasdkljf"}}},
{[][2]string{{"/data.json", "lskjafkljsdf"}}},
{[][2]string{{"/data.json", "[1,2,3]"}}},
{[][2]string{
{"/a/b/data.json", "[1,2,3]"},
{"a/b/c/data.json", "true"},
}},
}
for _, test := range tests {
buf := writeTarGz(test.files)
_, err := Read(buf)
if err == nil {
t.Fatal("expected error")
}
}

}

func TestRoundtrip(t *testing.T) {

bundle := Bundle{
Data: map[string]interface{}{
"foo": map[string]interface{}{
"bar": []interface{}{json.Number("1"), json.Number("2"), json.Number("3")},
"baz": true,
"qux": "hello",
},
},
Modules: []ModuleFile{
{
Path: "/foo/corge/corge.rego",
Parsed: ast.MustParseModule(`package foo.corge`),
Raw: []byte(`package foo.corge`),
},
},
}

var buf bytes.Buffer

if err := Write(&buf, bundle); err != nil {
t.Fatal("Unexpected error:", err)
}

bundle2, err := Read(&buf)
if err != nil {
t.Fatal("Unexpected error:", err)
}

if !bundle2.Equal(bundle) {
t.Fatal("Exp:", bundle, "\n\nGot:", bundle2)
}

}

func writeTarGz(files [][2]string) *bytes.Buffer {
var buf bytes.Buffer
gw := gzip.NewWriter(&buf)
defer gw.Close()
tw := tar.NewWriter(gw)
defer tw.Close()
for _, file := range files {
if err := writeFile(tw, file[0], []byte(file[1])); err != nil {
panic(err)
}
}
return &buf
}
1 change: 1 addition & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ the data document with the following syntax:
},
}

runCommand.Flags().StringVarP(&params.ConfigFile, "config-file", "c", "", "set path of configuration file")
runCommand.Flags().BoolVarP(&serverMode, "server", "s", false, "start the runtime in server mode")
runCommand.Flags().StringVarP(&params.HistoryPath, "history", "H", historyPath(), "set path of history file")
runCommand.Flags().StringVarP(&params.Addr, "addr", "a", defaultAddr, "set listening address of the server")
Expand Down
4 changes: 4 additions & 0 deletions docs/book/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@
* [Running with Docker](deployments.md#running-with-docker)
* [Kubernetes](deployments.md#kubernetes)
* [Kicking the Tires](deployments.md#kicking-the-tires)
* [Bundles](bundles.md)
* [Configuration](bundles.md#bundle-configuration)
* [Service API](bundles.md#bundle-service-api)
* [File Format](bundles.md#bundle-file-format)
* [Monitoring & Diagnostics](monitoring-diagnostics.md)
* [Prometheus](monitoring-diagnostics.md#prometheus)
* [Diagnostics](monitoring-diagnostics.md#prometheus)
Expand Down
Loading

0 comments on commit f131cfc

Please sign in to comment.