Add an adaptive limit for the upload chunk size
This commit adds an adaptive chunk limit that acts as a
mechanism for encoding as many decisions into each chunk as possible.
This change should help fill each chunk close to its allowed
limit and thereby reduce network and memory usage.

Signed-off-by: Ashutosh Narkar <[email protected]>
ashutosh-narkar committed Nov 3, 2021
1 parent cf5b8b4 commit 4a48b65
Showing 4 changed files with 270 additions and 48 deletions.
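At its core, the change keeps a "soft" flush threshold that starts at the user-configured upload limit and is re-tuned each time a chunk is closed: under-filled chunks push the threshold up exponentially, while chunks that overshoot the hard limit pull it back down. The standalone Go sketch below illustrates that feedback loop under simplified assumptions (the scale-down factor here is a plain halving, whereas the commit derives it from two tracked exponents); the names adaptiveLimit and adjust are illustrative and do not appear in the commit.

package main

import (
	"fmt"
	"math"
)

// adaptiveLimit tracks a soft flush threshold relative to a hard upload limit.
type adaptiveLimit struct {
	hard     int64   // user-configured upload limit
	soft     int64   // current flush threshold
	exponent float64 // grows with consecutive scale-ups
}

// adjust re-tunes the soft limit from the size of the last closed chunk.
func (a *adaptiveLimit) adjust(chunkSize int64) {
	threshold := int64(float64(a.hard) * 0.9)
	switch {
	case chunkSize < threshold: // under-filled: scale up exponentially
		a.soft *= int64(math.Pow(2, a.exponent+1))
		a.exponent += 0.2
	case chunkSize <= a.hard: // within 90-100% of the limit: hold steady
	default: // overshot the hard limit: scale back down (simplified)
		a.soft /= 2
		if a.exponent > 0 {
			a.exponent -= 0.2
		}
	}
}

func main() {
	a := adaptiveLimit{hard: 1000, soft: 1000}
	for _, size := range []int64{400, 700, 1200, 950} {
		a.adjust(size)
		fmt.Printf("chunk=%d bytes -> soft limit=%d bytes\n", size, a.soft)
	}
}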
132 changes: 116 additions & 16 deletions plugins/logs/encoder.go
@@ -9,28 +9,52 @@ import (
"compress/gzip"
"encoding/json"
"fmt"
"math"

"github.com/open-policy-agent/opa/metrics"
)

const (
encHardLimitThreshold = 0.9
softLimitBaseFactor = 2
softLimitExponentScaleFactor = 0.2
encSoftLimitScaleUpCounterName = "enc_soft_limit_scale_up"
encSoftLimitScaleDownCounterName = "enc_soft_limit_scale_down"
encSoftLimitStableCounterName = "enc_soft_limit_stable"
)

// chunkEncoder implements log buffer chunking and compression. Log events are
// written to the encoder, and the encoder outputs chunks that fit within the
// configured limit.
type chunkEncoder struct {
-limit int64
-bytesWritten int
-buf *bytes.Buffer
-w *gzip.Writer
+limit int64
+softLimit int64
+softLimitScaleUpExponent float64
+softLimitScaleDownExponent float64
+bytesWritten int
+buf *bytes.Buffer
+w *gzip.Writer
+metrics metrics.Metrics
}

func newChunkEncoder(limit int64) *chunkEncoder {
enc := &chunkEncoder{
-limit: limit,
+limit: limit,
+softLimit: limit,
+softLimitScaleUpExponent: 0,
+softLimitScaleDownExponent: 0,
}
-enc.reset()
+enc.update()

return enc
}

func (enc *chunkEncoder) WithMetrics(m metrics.Metrics) *chunkEncoder {
enc.metrics = m
return enc
}

-func (enc *chunkEncoder) Write(event EventV1) (result []byte, err error) {
+func (enc *chunkEncoder) Write(event EventV1) (result [][]byte, err error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(event); err != nil {
return nil, err
@@ -44,12 +68,15 @@ func (enc *chunkEncoder) Write(event EventV1) (result []byte, err error) {
return nil, fmt.Errorf("upload chunk size too small")
}

-if int64(len(bs)+enc.bytesWritten+1) > enc.limit {
+if int64(len(bs)+enc.bytesWritten+1) > enc.softLimit {
if err := enc.writeClose(); err != nil {
return nil, err
}

-result = enc.reset()
+result, err = enc.reset()
+if err != nil {
+return nil, err
+}
}

if enc.bytesWritten == 0 {
@@ -82,27 +109,100 @@ func (enc *chunkEncoder) writeClose() error {
return enc.w.Close()
}

-func (enc *chunkEncoder) Flush() ([]byte, error) {
+func (enc *chunkEncoder) Flush() ([][]byte, error) {
if enc.bytesWritten == 0 {
return nil, nil
}
if err := enc.writeClose(); err != nil {
return nil, err
}
-return enc.reset(), nil
+return enc.reset()
}

func (enc *chunkEncoder) reset() ([][]byte, error) {

// Adjust the encoder's soft limit based on the current amount of
// data written to the underlying buffer. The soft limit decides when to flush a chunk.
// The soft limit is modified based on the below algorithm:
// 1) Scale Up: If the current chunk size is within 90% of the user-configured limit, exponentially increase
// the soft limit. The exponential function is 2^x where x has a minimum value of 1
// 2) Scale Down: If the current chunk size exceeds the hard limit, decrease the soft limit and re-encode the
// decisions in the last chunk.
// 3) Equilibrium: If the chunk size is between 90% and 100% of the user-configured limit, maintain the current soft limit value.
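//
// Worked example (illustrative numbers, not part of this change): with a
// configured limit of 1000 bytes, the hard-limit threshold is 900 bytes.
// A chunk that closes at 400 bytes multiplies the soft limit by
// int64(2^(0+1)) = 2, giving 2000, and grows the scale-up exponent to 0.2;
// the next scale-up multiplies by int64(2^1.2) = 2, and so on. A chunk
// that overshoots the 1000-byte limit divides the soft limit back down
// and re-encodes the events from the oversized chunk.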

if enc.buf.Len() < int(float64(enc.limit)*encHardLimitThreshold) {
if enc.metrics != nil {
enc.metrics.Counter(encSoftLimitScaleUpCounterName).Incr()
}

mul := int64(math.Pow(float64(softLimitBaseFactor), float64(enc.softLimitScaleUpExponent+1)))
enc.softLimit *= mul
enc.softLimitScaleUpExponent += softLimitExponentScaleFactor
return enc.update(), nil
}

if int(enc.limit) > enc.buf.Len() && enc.buf.Len() >= int(float64(enc.limit)*encHardLimitThreshold) {
if enc.metrics != nil {
enc.metrics.Counter(encSoftLimitStableCounterName).Incr()
}

enc.softLimitScaleDownExponent = enc.softLimitScaleUpExponent
return enc.update(), nil
}

if enc.softLimit > enc.limit {
if enc.metrics != nil {
enc.metrics.Counter(encSoftLimitScaleDownCounterName).Incr()
}

if enc.softLimitScaleDownExponent < enc.softLimitScaleUpExponent {
enc.softLimitScaleDownExponent = enc.softLimitScaleUpExponent
}

den := int64(math.Pow(float64(softLimitBaseFactor), float64(enc.softLimitScaleDownExponent-enc.softLimitScaleUpExponent+1)))
enc.softLimit /= den

if enc.softLimitScaleUpExponent > 0 {
enc.softLimitScaleUpExponent -= softLimitExponentScaleFactor
}
}

events, decErr := newChunkDecoder(enc.buf.Bytes()).decode()
if decErr != nil {
return nil, decErr
}

enc.initialize()

var result [][]byte
for _, event := range events {
chunk, err := enc.Write(event)
if err != nil {
return nil, err
}

if chunk != nil {
result = append(result, chunk...)
}
}
return result, nil
}

-func (enc *chunkEncoder) reset() []byte {
+func (enc *chunkEncoder) update() [][]byte {
buf := enc.buf
-enc.buf = new(bytes.Buffer)
-enc.bytesWritten = 0
-enc.w = gzip.NewWriter(enc.buf)
+enc.initialize()
if buf != nil {
-return buf.Bytes()
+return [][]byte{buf.Bytes()}
}
return nil
}

func (enc *chunkEncoder) initialize() {
enc.buf = new(bytes.Buffer)
enc.bytesWritten = 0
enc.w = gzip.NewWriter(enc.buf)
}

// chunkDecoder decodes the encoded chunks and outputs the log events
type chunkDecoder struct {
raw []byte
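Because Write and Flush now return a slice of chunks ([][]byte) rather than a single chunk, callers are expected to iterate over whatever comes back. A hypothetical helper sketching that calling pattern, assuming the plugins/logs package scope (drain is not part of this commit):

func drain(enc *chunkEncoder, events []EventV1) ([][]byte, error) {
	var chunks [][]byte
	for _, e := range events {
		out, err := enc.Write(e) // out may carry zero or more closed chunks
		if err != nil {
			return nil, err
		}
		chunks = append(chunks, out...)
	}
	out, err := enc.Flush() // flush whatever is still buffered
	if err != nil {
		return nil, err
	}
	return append(chunks, out...), nil
}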
100 changes: 100 additions & 0 deletions plugins/logs/encoder_test.go
@@ -5,8 +5,11 @@
package logs

import (
"fmt"
"testing"
"time"

"github.com/open-policy-agent/opa/metrics"
)

func TestChunkEncoder(t *testing.T) {
@@ -46,6 +49,103 @@ func TestChunkEncoder(t *testing.T) {
bs, err = enc.Flush()
if bs != nil || err != nil {
t.Fatalf("Unexpected error chunk produced: err: %v", err)

}
}

func TestChunkEncoderAdaptive(t *testing.T) {

enc := newChunkEncoder(1000).WithMetrics(metrics.New())
var result interface{} = false
var expInput interface{} = map[string]interface{}{"method": "GET"}
ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z")
if err != nil {
panic(err)
}

var chunks [][]byte
numEvents := 400
for i := 0; i < numEvents; i++ {

bundles := map[string]BundleInfoV1{}
bundles["authz"] = BundleInfoV1{Revision: fmt.Sprint(i)}

event := EventV1{
Labels: map[string]string{
"id": "test-instance-id",
"app": "example-app",
},
Bundles: bundles,
DecisionID: fmt.Sprint(i),
Path: "foo/bar",
Input: &expInput,
Result: &result,
RequestedBy: "test",
Timestamp: ts,
}

chunk, err := enc.Write(event)
if err != nil {
t.Fatal(err)
}
if chunk != nil {
chunks = append(chunks, chunk...)
}
}

// decode the chunks and check that the number of decoded events equals the number encoded

numEventsActual := decodeChunks(t, chunks)

// flush the encoder
for {
bs, err := enc.Flush()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if len(bs) == 0 {
break
}

numEventsActual += decodeChunks(t, bs)
}

if numEvents != numEventsActual {
t.Fatalf("Expected %v events but got %v", numEvents, numEventsActual)
}

actualScaleUpEvents := enc.metrics.Counter(encSoftLimitScaleUpCounterName).Value().(uint64)
actualScaleDownEvents := enc.metrics.Counter(encSoftLimitScaleDownCounterName).Value().(uint64)
actualEquiEvents := enc.metrics.Counter(encSoftLimitStableCounterName).Value().(uint64)

expectedScaleUpEvents := uint64(8)
expectedScaleDownEvents := uint64(3)
expectedEquiEvents := uint64(0)

if actualScaleUpEvents != expectedScaleUpEvents {
t.Fatalf("Expected scale up events %v but got %v", expectedScaleUpEvents, actualScaleUpEvents)
}

if actualScaleDownEvents != expectedScaleDownEvents {
t.Fatalf("Expected scale down events %v but got %v", expectedScaleDownEvents, actualScaleDownEvents)
}

if actualEquiEvents != expectedEquiEvents {
t.Fatalf("Expected equilibrium events %v but got %v", expectedEquiEvents, actualEquiEvents)
}
}

func decodeChunks(t *testing.T, bs [][]byte) int {
t.Helper()

numEvents := 0
for _, chunk := range bs {
events, err := newChunkDecoder(chunk).decode()
if err != nil {
t.Fatal(err)
}
numEvents += len(events)
}
return numEvents
}
13 changes: 8 additions & 5 deletions plugins/logs/plugin.go
@@ -463,6 +463,7 @@ func New(parsedConfig *Config, manager *plugins.Manager) *Plugin {
// WithMetrics sets the global metrics provider to be used by the plugin.
func (p *Plugin) WithMetrics(m metrics.Metrics) *Plugin {
p.metrics = m
p.enc.WithMetrics(m)
return p
}

@@ -712,7 +713,7 @@ func (p *Plugin) oneShot(ctx context.Context) (ok bool, err error) {
oldChunkEnc := p.enc
oldBuffer := p.buffer
p.buffer = newLogBuffer(*p.config.Reporting.BufferSizeLimitBytes)
-p.enc = newChunkEncoder(*p.config.Reporting.UploadSizeLimitBytes)
+p.enc = newChunkEncoder(*p.config.Reporting.UploadSizeLimitBytes).WithMetrics(p.metrics)
p.mtx.Unlock()

// Along with uploading the compressed events in the buffer
@@ -721,8 +722,10 @@ func (p *Plugin) oneShot(ctx context.Context) (ok bool, err error) {
chunk, err := oldChunkEnc.Flush()
if err != nil {
return false, err
-} else if chunk != nil {
-p.bufferChunk(oldBuffer, chunk)
}

+for _, ch := range chunk {
+p.bufferChunk(oldBuffer, ch)
+}

if oldBuffer.Len() == 0 {
@@ -792,8 +795,8 @@ func (p *Plugin) encodeAndBufferEvent(event EventV1) {
return
}

-if result != nil {
-p.bufferChunk(p.buffer, result)
+for _, chunk := range result {
+p.bufferChunk(p.buffer, chunk)
}
}
