Skip to content

Commit

Permalink
add zstandard encoder/decoder pipeline modifier and simplify setPipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
dcentelles committed Jul 12, 2023
1 parent 6c8203d commit 2b41237
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 94 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.17

require (
github.com/jaracil/ei v0.0.0-20170808175009-4f519a480ebd
github.com/klauspost/compress v1.16.7
github.com/nayarsystems/buffer v0.1.1
github.com/stretchr/testify v1.8.2
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jaracil/ei v0.0.0-20170808175009-4f519a480ebd h1:4MeyWzia2q0jIePfPv281gT5+xX7s2Fdf7fgebwfXbQ=
github.com/jaracil/ei v0.0.0-20170808175009-4f519a480ebd/go.mod h1:vY3hztyrLc9fGs8e9rvAhkYsSPhdryJUDlwnVDpYIfI=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/nayarsystems/buffer v0.1.1 h1:ioRQ9aza2bEvPnSJKNjPcjXHhvjtDJEPOqHZQ58vtI0=
github.com/nayarsystems/buffer v0.1.1/go.mod h1:O/MPQ7Ls2Feb/78IQ1qEdPYeUXX76I0YMfghSAUcprI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
10 changes: 10 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ func (s *StateQueue) Encode() (dataOut []byte, err error) {
if err != nil {
return
}
case MOD_ZSTD:
inputBuf, err = ZstdEnc(inputBuf)
if err != nil {
return
}
case MOD_BITTRANS:
stateBitSize := s.StateSchema.GetByteSize() * 8
inputBuf, err = shuffling.TransposeBits(inputBuf, stateBitSize)
Expand All @@ -129,6 +134,11 @@ func (s *StateQueue) Decode(data []byte) (err error) {
if err != nil {
return
}
case MOD_ZSTD:
inputBuf, err = ZstdDec(inputBuf)
if err != nil {
return
}
case MOD_BITTRANS:
numStates := inputBuf.GetByteSize() / s.StateSchema.GetByteSize()
inputBuf, err = shuffling.TransposeBits(inputBuf, numStates)
Expand Down
124 changes: 49 additions & 75 deletions queue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bstates

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -91,103 +92,76 @@ func Test_Compression(t *testing.T) {
}

func Test_PipelineComparative(t *testing.T) {
// Create states
fillStates := func(schema *StateSchema) (states []*State) {
numStates := 16383
for i := 0; i < numStates; i++ {
state, err := schema.CreateState()
require.NoError(t, err)
err = state.Set("F_COUNTER", i)
require.NoError(t, err)
err = state.Set("F_ZEROS", 0)
require.NoError(t, err)
states = append(states, state)
}
return states
}

schema := testPipelineComparativeCreateSchema(t, "")
zschema := testPipelineComparativeCreateSchema(t, "z")
zstdschema := testPipelineComparativeCreateSchema(t, "zstd")
tzschema := testPipelineComparativeCreateSchema(t, "t:z")

numStates := 2048 // 2^11 - 1
states := []*State{}
zstates := []*State{}
tzstates := []*State{}

// Create states

for i := 0; i < numStates; i++ {
state, err := schema.CreateState()
tzstdschema := testPipelineComparativeCreateSchema(t, "t:zstd")

states := fillStates(schema)
zstates := fillStates(zschema)
zstdstates := fillStates(zstdschema)
tzstates := fillStates(tzschema)
tzstdstates := fillStates(tzstdschema)

checkPipeline := func(states []*State, schema *StateSchema) (edata []byte) {
// Encode
queue := CreateStateQueue(schema)
err := queue.PushAll(states)
require.NoError(t, err)
err = state.Set("F_COUNTER", i)
edata, err = queue.Encode()
require.NoError(t, err)
err = state.Set("F_ZEROS", 0)
require.NoError(t, err)
states = append(states, state)

zstate, err := zschema.CreateState()
// Decode
err = queue.Decode(edata)
require.NoError(t, err)
err = zstate.Set("F_COUNTER", i)
dstates, err := queue.GetStates()
require.NoError(t, err)
err = zstate.Set("F_ZEROS", 0)
require.NoError(t, err)
zstates = append(zstates, zstate)

tzstate, err := tzschema.CreateState()
require.NoError(t, err)
err = tzstate.Set("F_COUNTER", i)
require.NoError(t, err)
err = tzstate.Set("F_ZEROS", 0)
require.NoError(t, err)
tzstates = append(tzstates, tzstate)
// Check states == decoded
testEqualStates(t, states, dstates)
return
}

// -----------------

// Encode
queue := CreateStateQueue(schema)
err := queue.PushAll(states)
require.NoError(t, err)
data, err := queue.Encode()
require.NoError(t, err)

// Decode
err = queue.Decode(data)
require.NoError(t, err)
dstates, err := queue.GetStates()
require.NoError(t, err)

// Check states == decoded
testEqualStates(t, states, dstates)

// -----------------
// No compression
data := checkPipeline(states, schema)

// Encode with z
zqueue := CreateStateQueue(zschema)
err = zqueue.PushAll(zstates)
require.NoError(t, err)
zdata, err := zqueue.Encode()
require.NoError(t, err)
zdata := checkPipeline(zstates, zschema)

// Decode with z
err = zqueue.Decode(zdata)
require.NoError(t, err)
zstates, err = zqueue.GetStates()
require.NoError(t, err)

// Check states == decoded states with z
testEqualStates(t, states, zstates)

// -----------------
// Encode with zstd
zstddata := checkPipeline(zstdstates, zstdschema)

// Encode with t:z
tzqueue := CreateStateQueue(tzschema)
err = tzqueue.PushAll(tzstates)
require.NoError(t, err)
tzdata, err := tzqueue.Encode()
require.NoError(t, err)
tzdata := checkPipeline(tzstates, tzschema)

// Decode with t:z
err = tzqueue.Decode(tzdata)
require.NoError(t, err)
tzstates, err = tzqueue.GetStates()
require.NoError(t, err)
// Encode with t:zstd
tzstddata := checkPipeline(tzstdstates, tzstdschema)

// Check states == decoded states with t:z
testEqualStates(t, states, tzstates)
// -----------------

// ----------------
fmt.Printf("PipelineComparative: %d states, %d bytes, %d bytes (z), %d bytes (zstd), %d bytes (t:z), %d bytes (t:zstd)\n",
len(states), len(data), len(zdata), len(zstddata), len(tzdata), len(tzstddata))

// Check sizes of encoded data with no compression, "z", "zstd", "t:z" and "t:zstd"
require.Less(t, len(zdata), len(data))
require.Less(t, len(zstddata), len(data))
require.Less(t, len(tzdata), len(zdata))
require.Less(t, len(tzstddata), len(zstddata))

}

Expand Down Expand Up @@ -324,7 +298,7 @@ func testPipelineComparativeCreateSchema(t *testing.T, encoderPipeline string) *
{
Name: "F_COUNTER",
Type: T_UINT,
Size: 11,
Size: 14,
},
{
Name: "F_ZEROS",
Expand Down
35 changes: 16 additions & 19 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

// Modifier identifiers accepted in an encoder pipeline string. Several
// modifiers can be chained with ":" (for example "t:z" or "t:zstd").
const (
// MOD_GZIP is the "z" modifier (presumably gzip compression, going by the
// name; its encode/decode stage is not shown here).
MOD_GZIP = "z"
// MOD_ZSTD is the "zstd" modifier: the queue runs ZstdEnc when encoding and
// ZstdDec when decoding.
MOD_ZSTD = "zstd"
// MOD_BITTRANS is the "t" modifier: the queue applies
// shuffling.TransposeBits to the buffer.
MOD_BITTRANS = "t"
)

Expand Down Expand Up @@ -278,27 +279,23 @@ func (s *StateSchema) updateByteSize() {
}

func (e *StateSchema) setPipelines(pipelineRaw string) error {
pipelineRegex, err := regexp.Compile(`^([^:]+)(:[^:]+)*$`)
if err != nil {
return err
}
indexes := pipelineRegex.FindStringSubmatchIndex(pipelineRaw)
modifiers := []string{}
for indexes != nil {
modifier := string(pipelineRaw[indexes[2]:indexes[3]])
modifiers = append(modifiers, modifier)
fromIdx := indexes[3]
if indexes[3] < len(pipelineRaw) {
fromIdx++
if pipelineRaw != "" {
pipelineRegex, err := regexp.Compile(`^([^:]+)(:[^:]+)*$`)
if err != nil {
return err
}
pipelineRaw = pipelineRaw[fromIdx:]
indexes = pipelineRegex.FindStringSubmatchIndex(pipelineRaw)
}
for _, mod := range modifiers {
switch mod {
case MOD_GZIP, MOD_BITTRANS:
default:
return fmt.Errorf("\"%s\" is not a modifier", mod)
ok := pipelineRegex.MatchString(pipelineRaw)
if !ok {
return fmt.Errorf("wrong pipeline format")
}
modifiers = strings.Split(pipelineRaw, ":")
for _, mod := range modifiers {
switch mod {
case MOD_GZIP, MOD_ZSTD, MOD_BITTRANS:
default:
return fmt.Errorf("\"%s\" is not a modifier", mod)
}
}
}
e.encoderPipeline = modifiers
Expand Down
48 changes: 48 additions & 0 deletions zstd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package bstates

import (
"bytes"
"github.com/klauspost/compress/zstd"
"io"

"github.com/nayarsystems/buffer/buffer"
)

// ZstdEnc compresses the first b.GetByteSize() bytes of b's raw buffer with
// Zstandard and returns the compressed bytes wrapped in a new buffer.
//
// The encoder is created with concurrency 1. An error is returned if the
// encoder cannot be created, if writing the payload fails, or if closing the
// encoder (which flushes the remaining compressed data) fails.
func ZstdEnc(b *buffer.Buffer) (*buffer.Buffer, error) {
	buf := new(bytes.Buffer)
	wr, err := zstd.NewWriter(buf, zstd.WithEncoderConcurrency(1))
	if err != nil {
		return nil, err
	}
	if _, err = wr.Write(b.GetRawBuffer()[:b.GetByteSize()]); err != nil {
		// Still release the encoder; the write error is the one worth reporting.
		wr.Close()
		return nil, err
	}
	// Close flushes pending data and finishes the frame, so its error must
	// not be dropped: ignoring it could hand back truncated output.
	if err = wr.Close(); err != nil {
		return nil, err
	}
	var out []byte
	out, err = io.ReadAll(buf)
	if err != nil {
		return nil, err
	}
	outb := &buffer.Buffer{}
	outb.InitFromRawBuffer(out)
	return outb, nil
}

// ZstdDec decompresses the Zstandard data held in the first b.GetByteSize()
// bytes of b's raw buffer and returns the decoded bytes wrapped in a new
// buffer.
//
// The decoder is created with concurrency 1. An error is returned if the
// decoder cannot be created or if reading the decompressed stream fails.
func ZstdDec(b *buffer.Buffer) (*buffer.Buffer, error) {
	compressed := b.GetRawBuffer()[:b.GetByteSize()]
	dec, err := zstd.NewReader(bytes.NewReader(compressed), zstd.WithDecoderConcurrency(1))
	if err != nil {
		return nil, err
	}
	// Release the decoder's resources on every return path.
	defer dec.Close()

	plain, err := io.ReadAll(dec)
	if err != nil {
		return nil, err
	}
	result := &buffer.Buffer{}
	result.InitFromRawBuffer(plain)
	return result, nil
}
36 changes: 36 additions & 0 deletions zstd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package bstates

import (
"testing"

"github.com/nayarsystems/buffer/buffer"
"github.com/nayarsystems/buffer/shuffling"
"github.com/stretchr/testify/require"
)

// Test_ZstdBufferTransposeCompression checks that ZstdEnc/ZstdDec round-trip
// both a plain buffer and its bit-transposed form, and that the transposed
// form compresses to fewer bytes than the plain one.
func Test_ZstdBufferTransposeCompression(t *testing.T) {
	// Source buffer: every byte is set to its own index truncated to uint8.
	plain := &buffer.Buffer{}
	plain.InitFromRawBuffer(make([]byte, 10000))
	size := plain.GetByteSize()
	raw := plain.GetRawBuffer()
	for i := 0; i < size; i++ {
		raw[i] = uint8(i)
	}

	transposed, err := shuffling.TransposeBits(plain, 8)
	require.Nil(t, err)
	require.Equal(t, plain.GetByteSize(), transposed.GetByteSize())

	// roundTrip compresses in, verifies that decompressing yields in again,
	// and returns the compressed buffer for size comparison.
	roundTrip := func(in *buffer.Buffer) *buffer.Buffer {
		encoded, err := ZstdEnc(in)
		require.Nil(t, err)
		decoded, err := ZstdDec(encoded)
		require.Nil(t, err)
		require.Equal(t, in, decoded)
		return encoded
	}

	plainEncoded := roundTrip(plain)
	transposedEncoded := roundTrip(transposed)

	require.Less(t, transposedEncoded.GetByteSize(), plainEncoded.GetByteSize())
}

0 comments on commit 2b41237

Please sign in to comment.