Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Better fields filtering in encoders #27

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion changelog/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (b *mysqlReader) createProducer(tn string, t *state.Row) (pipe.Producer, er

func (b *mysqlReader) addNewTable(t *state.Row) bool {
b.log.Infof("Adding table to MySQL binlog reader (%v,%v,%v,%v,%v,%v)", t.Service, t.Db, t.Table, t.Output, t.Version, t.OutputFormat)
enc, err := encoder.Create(t.OutputFormat, t.Service, t.Db, t.Table, t.Input, t.Output, t.Version)
enc, err := encoder.Create(t.OutputFormat, t.Service, t.Db, t.Table, t.Input, t.Output, t.Version, true)
if log.EL(b.log, err) {
return false
}
Expand Down
65 changes: 11 additions & 54 deletions encoder/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,16 @@ func init() {

//avroEncoder implements Encoder interface for Avro format
type avroEncoder struct {
Service string
Db string
Table string
Input string
Output string
Version int
encoder

codec goavro.Codec
setter *goavro.RecordSetter
inSchema *types.TableSchema
filter []int
outSchema *types.AvroSchema
}

func initAvroEncoder(service, db, table, input string, output string, version int) (Encoder, error) {
return &avroEncoder{Service: service, Db: db, Table: table, Input: input, Output: output, Version: version}, nil
//initAvroEncoder creates a new Avro encoder for the given table identity.
//The schema codec is not initialized here; callers must invoke UpdateCodec
//before encoding.
func initAvroEncoder(service, db, table, input string, output string, version int, filtering bool) (Encoder, error) {
	common := encoder{
		Service:       service,
		Db:            db,
		Table:         table,
		Input:         input,
		Output:        output,
		Version:       version,
		filterEnabled: filtering,
	}
	return &avroEncoder{encoder: common}, nil
}

//Type returns type of the encoder interface (faster than type assertion?)
Expand Down Expand Up @@ -125,8 +120,8 @@ func convertCommonFormatToAvroRecord(rs goavro.RecordSetter, cfEvent *types.Comm
return nil
}

for i, j := 0, 0; i < len(*cfEvent.Fields); i++ {
if filteredField(filter, i, &j) {
for i := 0; i < len(*cfEvent.Fields); i++ {
if filter[i] == -1 {
continue
}
field := (*cfEvent.Fields)[i]
Expand Down Expand Up @@ -181,18 +176,12 @@ func (e *avroEncoder) UpdateCodec() error {
return err
}

if len(e.inSchema.Columns)-(len(e.outSchema.Fields)-numMetadataFields) < 0 {
err = fmt.Errorf("input schema has less fields than output schema")
log.E(err)
return err
}

e.codec, e.setter, err = SchemaCodecHelper(e.outSchema)
if log.E(err) {
return err
}

e.filter = prepareFilter(e.inSchema, e.outSchema, numMetadataFields)
e.filter = prepareFilter(e.inSchema, e.outSchema, nil, e.filterEnabled)

log.Debugf("Schema codec (%v) updated", e.Type())

Expand Down Expand Up @@ -298,9 +287,9 @@ func fixAvroFieldType(i interface{}, dtype string, ftype string) (interface{}, e
//TODO: Remove ability to encode schema, so as receiver should have schema to decode
//the record, so no point in pushing schema into stream
func fillAvroFields(r *goavro.Record, row *[]interface{}, s *types.TableSchema, filter []int) error {
for i, j := 0, 0; i < len(s.Columns); i++ {
for i := 0; i < len(s.Columns); i++ {
//Skip fields which are not present in output schema
if filteredField(filter, i, &j) {
if filter[i] == -1 {
continue
}
v, err := fixAvroFieldType((*row)[i], s.Columns[i].DataType, s.Columns[i].Type)
Expand Down Expand Up @@ -359,38 +348,6 @@ func convertRowToAvroFormat(tp int, row *[]interface{}, s *types.TableSchema, se
return nil
}

//prepareFilter returns the list of input-schema column indexes that must be
//skipped when encoding: columns present in the input schema but absent from
//the Avro output schema. numMetaFields is the number of metadata fields in
//the output schema that have no input-schema counterpart. A nil result means
//no field is filtered.
func prepareFilter(in *types.TableSchema, out *types.AvroSchema, numMetaFields int) []int {
	//No output schema to compare against: nothing to filter
	if out == nil {
		return nil
	}

	//Number of input columns with no matching output field
	nfiltered := len(in.Columns)
	if out.Fields != nil {
		nfiltered = nfiltered - (len(out.Fields) - numMetaFields)
	}
	if nfiltered == 0 {
		return nil
	}

	f := out.Fields
	filter := make([]int, 0)
	var j int //number of input columns skipped (not found in output) so far
	for i := 0; i < len(in.Columns); i++ {
		//Input column i is expected at output position (i-j). A name mismatch
		//means the column is absent from the output schema.
		//NOTE(review): this assumes output fields preserve input column order;
		//a reordered output schema would mis-filter here — confirm upstream.
		//Primary key cannot be filtered
		if (i-j) >= len(f) || in.Columns[i].Name != f[i-j].Name {
			if in.Columns[i].Key != "PRI" {
				log.Debugf("Field %v will be filtered", in.Columns[i].Name)
				filter = append(filter, i)
			}
			j++
		}
	}

	log.Debugf("len=%v, filtered fields (%v)", len(filter), filter)

	return filter
}

//UnwrapEvent always returns an error: the Avro encoder does not support
//event wrapping, so there is never an envelope to strip.
func (e *avroEncoder) UnwrapEvent(data []byte, cfEvent *types.CommonFormatEvent) (payload []byte, err error) {
	err = fmt.Errorf("avro encoder doesn't support wrapping")
	return nil, err
}
Expand All @@ -399,8 +356,8 @@ func (e *avroEncoder) decodeEventFields(c *types.CommonFormatEvent, r *goavro.Re
hasNonNil := false
c.Fields = new([]types.CommonFormatField)

for i, j := 0, 0; i < len(e.inSchema.Columns); i++ {
if filteredField(e.filter, i, &j) {
for i := 0; i < len(e.inSchema.Columns); i++ {
if e.filter[i] == -1 {
continue
}
n := e.inSchema.Columns[i].Name
Expand Down
109 changes: 96 additions & 13 deletions encoder/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,24 @@ import (
"strings"
"time"

"github.com/uber/storagetapper/log"
"github.com/uber/storagetapper/types"
)

//encoder holds the identity and field-filtering state common to all encoder
//implementations; concrete encoders (e.g. avroEncoder) embed it.
type encoder struct {
	Service string //service the table belongs to
	Db      string //database name
	Table   string //table name
	Input   string //input stream type
	Output  string //output destination
	Version int    //table schema version

	//filter maps each input-schema column index to its output position;
	//-1 marks fields absent from the output schema (skipped by encoders).
	//Built by prepareFilter.
	filter        []int //Contains indexes of fields which are not in output schema
	filterEnabled bool  //when false all input fields are passed through unfiltered
}

//encoderConstructor initializes encoder plugin
type encoderConstructor func(service, db, table, input string, output string, version int) (Encoder, error)
type encoderConstructor func(service, db, table, input string, output string, version int, filtering bool) (Encoder, error)

//plugins insert their constructors into this map
var encoders map[string]encoderConstructor
Expand Down Expand Up @@ -71,14 +84,14 @@ func Encoders() []string {
}

//InitEncoder constructs encoder without updating schema
func InitEncoder(encType, svc, sdb, tbl, input string, output string, version int) (Encoder, error) {
func InitEncoder(encType, svc, sdb, tbl, input string, output string, version int, filtering bool) (Encoder, error) {
init := encoders[strings.ToLower(encType)]

if init == nil {
return nil, fmt.Errorf("unsupported encoder: %s", strings.ToLower(encType))
}

enc, err := init(svc, sdb, tbl, input, output, version)
enc, err := init(svc, sdb, tbl, input, output, version, filtering)
if err != nil {
return nil, err
}
Expand All @@ -88,8 +101,8 @@ func InitEncoder(encType, svc, sdb, tbl, input string, output string, version in

//Create is a factory which create encoder of given type for given service, db,
//table, input, output, version
func Create(encType, svc, sdb, tbl, input string, output string, version int) (Encoder, error) {
enc, err := InitEncoder(encType, svc, sdb, tbl, input, output, version)
func Create(encType, svc, sdb, tbl, input string, output string, version int, filtering bool) (Encoder, error) {
enc, err := InitEncoder(encType, svc, sdb, tbl, input, output, version, filtering)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -125,14 +138,6 @@ func GetCommonFormatKey(cf *types.CommonFormatEvent) string {
return key
}

//filteredField reports whether column index i is the next entry in the
//ascending filter list, advancing the cursor *j past it when it matches.
//Callers walk i monotonically so each filter entry is consumed once.
func filteredField(filter []int, i int, j *int) bool {
	if *j >= len(filter) || filter[*j] != i {
		return false
	}
	*j++
	return true
}

//WrapEvent prepend provided payload with CommonFormat like event
func WrapEvent(outputFormat string, key string, bd []byte, seqno uint64) ([]byte, error) {
akey := make([]interface{}, 1)
Expand All @@ -159,3 +164,81 @@ func WrapEvent(outputFormat string, key string, bd []byte, seqno uint64) ([]byte

return buf.Bytes(), nil
}

//fmapItem links a column's position in the input schema to its position in
//the output schema while prepareFilter builds its positional map.
type fmapItem struct {
	inPos  int //index of the column in the input table schema
	outPos int //index in the output schema; -1 if absent (filtered out)
}

//prepareFilter builds a positional map from input-schema columns to output
//fields: entry filter[i] holds the output position of input column i, or -1
//when the column is absent from the output and must be skipped by encoders.
//Output positions come from the Avro schema when given, otherwise from the
//common-format event; with neither (or filtering disabled) every column maps
//to its own index.
//NOTE(review): unlike the previous implementation this no longer exempts
//primary-key columns from being filtered — confirm that is intended.
func prepareFilter(in *types.TableSchema, out *types.AvroSchema, cf *types.CommonFormatEvent, filterEnabled bool) []int {
	log.Debugf("prepareFilter in_len=%v out=%v cf=%v", len(in.Columns), out, cf)

	fmap := make(map[string]*fmapItem, len(in.Columns))
	for pos := range in.Columns {
		fmap[in.Columns[pos].Name] = &fmapItem{inPos: pos, outPos: -1}
	}

	switch {
	case filterEnabled && out != nil && out.Fields != nil:
		for pos := range out.Fields {
			if item, ok := fmap[out.Fields[pos].Name]; ok {
				item.outPos = pos
			}
		}
	case filterEnabled && cf != nil && cf.Fields != nil:
		for pos := range *cf.Fields {
			if item, ok := fmap[(*cf.Fields)[pos].Name]; ok {
				item.outPos = pos
			}
		}
	default: //If schema is not defined or filter disabled produce all the fields as is
		for pos := range in.Columns {
			fmap[in.Columns[pos].Name].outPos = pos
		}
	}

	filter := make([]int, len(in.Columns))
	for _, item := range fmap {
		filter[item.inPos] = item.outPos
	}

	log.Debugf("prepareFilter filter=%+v", filter)

	return filter
}

/*
func prepareFilter(in *types.TableSchema, out *types.AvroSchema, numMetaFields int) []int {
if out == nil {
return nil
}

nfiltered := len(in.Columns)
if out.Fields != nil {
nfiltered = nfiltered - (len(out.Fields) - numMetaFields)
}
if nfiltered == 0 {
return nil
}

f := out.Fields
filter := make([]int, 0)
var j int
for i := 0; i < len(in.Columns); i++ {
//Primary key cannot be filtered
if (i-j) >= len(f) || in.Columns[i].Name != f[i-j].Name {
if in.Columns[i].Key != "PRI" {
log.Debugf("Field %v will be filtered", in.Columns[i].Name)
filter = append(filter, i)
}
j++
}
}

log.Debugf("len=%v, filtered fields (%v)", len(filter), filter)

return filter
}
*/
11 changes: 11 additions & 0 deletions encoder/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,17 @@ func TestUnwrapBinaryKey(t *testing.T) {
test.Assert(t, s == key, "keys should be equal")
}

//TestPrepareFilter verifies that prepareFilter maps every input column to
//its position in the output schema when input and output fields match.
//(The original stub declared its fixture but never used it — an unused
//variable is a compile error in Go — and asserted nothing.)
func TestPrepareFilter(t *testing.T) {
	tt := struct {
		in     []types.ColumnSchema
		out    []types.AvroFields
		result []int
	}{
		in:     []types.ColumnSchema{{Name: "a"}, {Name: "b"}, {Name: "c"}},
		out:    []types.AvroFields{{Name: "a"}, {Name: "b"}, {Name: "c"}},
		result: []int{0, 1, 2},
	}

	filter := prepareFilter(&types.TableSchema{Columns: tt.in}, &types.AvroSchema{Fields: tt.out}, nil, true)

	if len(filter) != len(tt.result) {
		t.Fatalf("filter length %v, want %v", len(filter), len(tt.result))
	}
	for i, want := range tt.result {
		if filter[i] != want {
			t.Errorf("filter[%v] = %v, want %v", i, filter[i], want)
		}
	}
}

//ExecSQL runs the given query against db and fails the test on error.
func ExecSQL(db *sql.DB, t test.Failer, query string) {
	err := util.ExecSQL(db, query)
	test.CheckFail(err, t)
}
Expand Down
Loading