From 1604100f45a7b6f9a4408033721f04bf982e2ec6 Mon Sep 17 00:00:00 2001 From: Evgeniy Firsov Date: Fri, 4 Jan 2019 12:47:26 -0800 Subject: [PATCH] Better fields filtering in encoders --- changelog/mysql.go | 2 +- encoder/avro.go | 65 ++++-------------------- encoder/encoder.go | 109 +++++++++++++++++++++++++++++++++++----- encoder/encoder_test.go | 11 ++++ encoder/json.go | 90 ++++++++++----------------------- encoder/msgpack.go | 18 ++++--- encoder/sql.go | 34 +++++++------ encoder/z.go | 2 +- main_test.go | 10 ++-- streamer/buffer.go | 22 ++++---- streamer/streamer.go | 4 +- 11 files changed, 196 insertions(+), 171 deletions(-) diff --git a/changelog/mysql.go b/changelog/mysql.go index 963921d..d08dfc1 100644 --- a/changelog/mysql.go +++ b/changelog/mysql.go @@ -211,7 +211,7 @@ func (b *mysqlReader) createProducer(tn string, t *state.Row) (pipe.Producer, er func (b *mysqlReader) addNewTable(t *state.Row) bool { b.log.Infof("Adding table to MySQL binlog reader (%v,%v,%v,%v,%v,%v)", t.Service, t.Db, t.Table, t.Output, t.Version, t.OutputFormat) - enc, err := encoder.Create(t.OutputFormat, t.Service, t.Db, t.Table, t.Input, t.Output, t.Version) + enc, err := encoder.Create(t.OutputFormat, t.Service, t.Db, t.Table, t.Input, t.Output, t.Version, true) if log.EL(b.log, err) { return false } diff --git a/encoder/avro.go b/encoder/avro.go index 1eb9a0c..295e1f7 100644 --- a/encoder/avro.go +++ b/encoder/avro.go @@ -40,21 +40,16 @@ func init() { //avroEncoder implements Encoder interface for Avro format type avroEncoder struct { - Service string - Db string - Table string - Input string - Output string - Version int + encoder + codec goavro.Codec setter *goavro.RecordSetter inSchema *types.TableSchema - filter []int outSchema *types.AvroSchema } -func initAvroEncoder(service, db, table, input string, output string, version int) (Encoder, error) { - return &avroEncoder{Service: service, Db: db, Table: table, Input: input, Output: output, Version: version}, nil +func 
initAvroEncoder(service, db, table, input string, output string, version int, filtering bool) (Encoder, error) { + return &avroEncoder{encoder: encoder{Service: service, Db: db, Table: table, Input: input, Output: output, Version: version, filterEnabled: filtering}}, nil } //Type returns type of the encoder interface (faster then type assertion?) @@ -125,8 +120,8 @@ func convertCommonFormatToAvroRecord(rs goavro.RecordSetter, cfEvent *types.Comm return nil } - for i, j := 0, 0; i < len(*cfEvent.Fields); i++ { - if filteredField(filter, i, &j) { + for i := 0; i < len(*cfEvent.Fields); i++ { + if filter[i] == -1 { continue } field := (*cfEvent.Fields)[i] @@ -181,18 +176,12 @@ func (e *avroEncoder) UpdateCodec() error { return err } - if len(e.inSchema.Columns)-(len(e.outSchema.Fields)-numMetadataFields) < 0 { - err = fmt.Errorf("input schema has less fields than output schema") - log.E(err) - return err - } - e.codec, e.setter, err = SchemaCodecHelper(e.outSchema) if log.E(err) { return err } - e.filter = prepareFilter(e.inSchema, e.outSchema, numMetadataFields) + e.filter = prepareFilter(e.inSchema, e.outSchema, nil, e.filterEnabled) log.Debugf("Schema codec (%v) updated", e.Type()) @@ -298,9 +287,9 @@ func fixAvroFieldType(i interface{}, dtype string, ftype string) (interface{}, e //TODO: Remove ability to encode schema, so as receiver should have schema to decode //the record, so no point in pushing schema into stream func fillAvroFields(r *goavro.Record, row *[]interface{}, s *types.TableSchema, filter []int) error { - for i, j := 0, 0; i < len(s.Columns); i++ { + for i := 0; i < len(s.Columns); i++ { //Skip fields which are not present in output schema - if filteredField(filter, i, &j) { + if filter[i] == -1 { continue } v, err := fixAvroFieldType((*row)[i], s.Columns[i].DataType, s.Columns[i].Type) @@ -359,38 +348,6 @@ func convertRowToAvroFormat(tp int, row *[]interface{}, s *types.TableSchema, se return nil } -func prepareFilter(in *types.TableSchema, out 
*types.AvroSchema, numMetaFields int) []int { - if out == nil { - return nil - } - - nfiltered := len(in.Columns) - if out.Fields != nil { - nfiltered = nfiltered - (len(out.Fields) - numMetaFields) - } - if nfiltered == 0 { - return nil - } - - f := out.Fields - filter := make([]int, 0) - var j int - for i := 0; i < len(in.Columns); i++ { - //Primary key cannot be filtered - if (i-j) >= len(f) || in.Columns[i].Name != f[i-j].Name { - if in.Columns[i].Key != "PRI" { - log.Debugf("Field %v will be filtered", in.Columns[i].Name) - filter = append(filter, i) - } - j++ - } - } - - log.Debugf("len=%v, filtered fields (%v)", len(filter), filter) - - return filter -} - func (e *avroEncoder) UnwrapEvent(data []byte, cfEvent *types.CommonFormatEvent) (payload []byte, err error) { return nil, fmt.Errorf("avro encoder doesn't support wrapping") } @@ -399,8 +356,8 @@ func (e *avroEncoder) decodeEventFields(c *types.CommonFormatEvent, r *goavro.Re hasNonNil := false c.Fields = new([]types.CommonFormatField) - for i, j := 0, 0; i < len(e.inSchema.Columns); i++ { - if filteredField(e.filter, i, &j) { + for i := 0; i < len(e.inSchema.Columns); i++ { + if e.filter[i] == -1 { continue } n := e.inSchema.Columns[i].Name diff --git a/encoder/encoder.go b/encoder/encoder.go index aeb6aac..f718079 100644 --- a/encoder/encoder.go +++ b/encoder/encoder.go @@ -26,11 +26,24 @@ import ( "strings" "time" + "github.com/uber/storagetapper/log" "github.com/uber/storagetapper/types" ) +type encoder struct { + Service string + Db string + Table string + Input string + Output string + Version int + + filter []int //filter[i] is the output position of input column i, or -1 if the column is not in the output schema + filterEnabled bool +} + //encoderConstructor initializes encoder plugin -type encoderConstructor func(service, db, table, input string, output string, version int) (Encoder, error) +type encoderConstructor func(service, db, table, input string, output string, version int, filtering bool) (Encoder, error) //plugins insert their
constructors into this map var encoders map[string]encoderConstructor @@ -71,14 +84,14 @@ func Encoders() []string { } //InitEncoder constructs encoder without updating schema -func InitEncoder(encType, svc, sdb, tbl, input string, output string, version int) (Encoder, error) { +func InitEncoder(encType, svc, sdb, tbl, input string, output string, version int, filtering bool) (Encoder, error) { init := encoders[strings.ToLower(encType)] if init == nil { return nil, fmt.Errorf("unsupported encoder: %s", strings.ToLower(encType)) } - enc, err := init(svc, sdb, tbl, input, output, version) + enc, err := init(svc, sdb, tbl, input, output, version, filtering) if err != nil { return nil, err } @@ -88,8 +101,8 @@ func InitEncoder(encType, svc, sdb, tbl, input string, output string, version in //Create is a factory which create encoder of given type for given service, db, //table, input, output, version -func Create(encType, svc, sdb, tbl, input string, output string, version int) (Encoder, error) { - enc, err := InitEncoder(encType, svc, sdb, tbl, input, output, version) +func Create(encType, svc, sdb, tbl, input string, output string, version int, filtering bool) (Encoder, error) { + enc, err := InitEncoder(encType, svc, sdb, tbl, input, output, version, filtering) if err != nil { return nil, err } @@ -125,14 +138,6 @@ func GetCommonFormatKey(cf *types.CommonFormatEvent) string { return key } -func filteredField(filter []int, i int, j *int) bool { - if *j < len(filter) && filter[*j] == i { - (*j)++ - return true - } - return false -} - //WrapEvent prepend provided payload with CommonFormat like event func WrapEvent(outputFormat string, key string, bd []byte, seqno uint64) ([]byte, error) { akey := make([]interface{}, 1) @@ -159,3 +164,81 @@ func WrapEvent(outputFormat string, key string, bd []byte, seqno uint64) ([]byte return buf.Bytes(), nil } + +type fmapItem struct { + inPos int + outPos int +} + +func prepareFilter(in *types.TableSchema, out *types.AvroSchema, cf 
*types.CommonFormatEvent, filterEnabled bool) []int { + log.Debugf("prepareFilter in_len=%v out=%v cf=%v", len(in.Columns), out, cf) + filter := make([]int, len(in.Columns)) + fmap := make(map[string]*fmapItem, len(in.Columns)) + + for i := 0; i < len(in.Columns); i++ { + fmap[in.Columns[i].Name] = &fmapItem{i, -1} + } + + if filterEnabled && out != nil && out.Fields != nil { + for i := 0; i < len(out.Fields); i++ { + t := fmap[out.Fields[i].Name] + if t != nil { + t.outPos = i + } + } + } else if filterEnabled && cf != nil && cf.Fields != nil { + for i := 0; i < len(*cf.Fields); i++ { + t := fmap[(*cf.Fields)[i].Name] + if t != nil { + t.outPos = i + } + } + } else { // If schema is not defined or filter disabled produce all the fields as is + for i := 0; i < len(in.Columns); i++ { + t := fmap[in.Columns[i].Name] + t.outPos = i + } + } + + for _, v := range fmap { + filter[v.inPos] = v.outPos + } + + log.Debugf("prepareFilter filter=%+v", filter) + + return filter +} + +/* +func prepareFilter(in *types.TableSchema, out *types.AvroSchema, numMetaFields int) []int { + if out == nil { + return nil + } + + nfiltered := len(in.Columns) + if out.Fields != nil { + nfiltered = nfiltered - (len(out.Fields) - numMetaFields) + } + if nfiltered == 0 { + return nil + } + + f := out.Fields + filter := make([]int, 0) + var j int + for i := 0; i < len(in.Columns); i++ { + //Primary key cannot be filtered + if (i-j) >= len(f) || in.Columns[i].Name != f[i-j].Name { + if in.Columns[i].Key != "PRI" { + log.Debugf("Field %v will be filtered", in.Columns[i].Name) + filter = append(filter, i) + } + j++ + } + } + + log.Debugf("len=%v, filtered fields (%v)", len(filter), filter) + + return filter +} +*/ diff --git a/encoder/encoder_test.go b/encoder/encoder_test.go index 7761ea0..01005de 100644 --- a/encoder/encoder_test.go +++ b/encoder/encoder_test.go @@ -970,6 +970,17 @@ func TestUnwrapBinaryKey(t *testing.T) { test.Assert(t, s == key, "keys should be equal") } +func 
TestPrepareFilter(t *testing.T) { + var tests = struct { + in []types.ColumnSchema + out []types.AvroFields + result []int + }{ + in: []types.ColumnSchema{{Name: "a"}, {Name: "b"}, {Name: "c"}}, + out: []types.AvroFields{{Name: "a"}, {Name: "b"}, {Name: "c"}}, + } +} + func ExecSQL(db *sql.DB, t test.Failer, query string) { test.CheckFail(util.ExecSQL(db, query), t) } diff --git a/encoder/json.go b/encoder/json.go index 5ab0d96..90f9c87 100644 --- a/encoder/json.go +++ b/encoder/json.go @@ -28,7 +28,6 @@ import ( "strings" "time" - "github.com/uber/storagetapper/log" "github.com/uber/storagetapper/state" "github.com/uber/storagetapper/types" ) @@ -39,14 +38,9 @@ func init() { //jsonEncoder implements Encoder interface into common format type jsonEncoder struct { - Service string - Db string - Table string - Input string - Output string - Version int + encoder + inSchema *types.TableSchema - filter []int //Contains indexes of fields which are not in output schema outSchema *types.CommonFormatEvent } @@ -65,8 +59,8 @@ var GenTime GenTimeFunc = genTime //ZeroTime in UTC var ZeroTime = time.Time{}.In(time.UTC) -func initJSONEncoder(service, db, table, input string, output string, version int) (Encoder, error) { - return &jsonEncoder{Service: service, Db: db, Table: table, Input: input, Output: output, Version: version}, nil +func initJSONEncoder(service, db, table, input string, output string, version int, filtering bool) (Encoder, error) { + return &jsonEncoder{encoder: encoder{Service: service, Db: db, Table: table, Input: input, Output: output, Version: version, filterEnabled: filtering}}, nil } //Type returns this encoder type @@ -90,20 +84,24 @@ func (e *jsonEncoder) Row(tp int, row *[]interface{}, seqno uint64, _ time.Time) return e.CommonFormatEncode(cf) } -func filterCommonFormat(filter []int, cf *types.CommonFormatEvent) *types.CommonFormatEvent { - if len(filter) == 0 || cf.Fields == nil || len(*cf.Fields) == 0 { - return cf +func filterCommonFormat(filter 
[]int, cf *types.CommonFormatEvent) (*types.CommonFormatEvent, error) { + if cf.Fields == nil || len(*cf.Fields) == 0 { + return cf, nil + } + + if len(*cf.Fields) != len(filter) { + return nil, fmt.Errorf("number of fields in input event(%v) should be equal to number of fields in the input schema(%v)", len(*cf.Fields), len(filter)) } c := &types.CommonFormatEvent{SeqNo: cf.SeqNo, Type: cf.Type, Timestamp: cf.Timestamp, Key: cf.Key, Fields: new([]types.CommonFormatField)} - for i, j := 0, 0; i < len(*cf.Fields); i++ { - if filteredField(filter, i, &j) { + for i := 0; i < len(*cf.Fields); i++ { + if filter[i] == -1 { continue } *c.Fields = append(*c.Fields, (*cf.Fields)[i]) } - return c + return c, nil } //CommonFormat encodes common format event into byte array @@ -114,7 +112,11 @@ func (e *jsonEncoder) CommonFormat(cf *types.CommonFormatEvent) ([]byte, error) return nil, err } } - cf = filterCommonFormat(e.filter, cf) + var err error + cf, err = filterCommonFormat(e.filter, cf) + if err != nil { + return nil, err + } return e.CommonFormatEncode(cf) } @@ -136,20 +138,14 @@ func (e *jsonEncoder) UpdateCodec() error { if err != nil { return err } - if c.Type != "schema" { + if c.Type != "schema" && c.Type != "record" { return nil //return fmt.Errorf("Broken schema in state for %v,%v,%v", e.Service, e.Db, e.Table) } e.outSchema = c - - if e.outSchema.Fields != nil && len(e.inSchema.Columns)-len(*e.outSchema.Fields) < 0 { - err = fmt.Errorf("input schema has less fields than output schema") - log.E(err) - return err - } } - e.prepareFilter() + e.filter = prepareFilter(e.inSchema, nil, e.outSchema, e.filterEnabled) return err } @@ -207,15 +203,14 @@ func (e *jsonEncoder) fixFieldTypes(res *types.CommonFormatEvent) (err error) { k := 0 //Restore field types according to schema - if e.inSchema != nil && res.Type != "schema" { - var j int + if e.inSchema != nil && res.Type != "schema" && res.Type != "record" { for i := 0; i < len(e.inSchema.Columns); i++ { - if 
filteredField(e.filter, i, &j) { + if e.filter[i] == -1 { continue } - if res.Fields != nil && i-j < len(*res.Fields) { - f := &(*res.Fields)[i-j] + if res.Fields != nil { + f := &(*res.Fields)[e.filter[i]] f.Value, err = fixFieldType(f.Value, e.inSchema.Columns[i].DataType) if err != nil { return err @@ -271,8 +266,8 @@ func fillCommonFormatKey(e *types.CommonFormatEvent, row *[]interface{}, s *type func fillCommonFormatFields(c *types.CommonFormatEvent, row *[]interface{}, schema *types.TableSchema, filter []int) { f := make([]types.CommonFormatField, 0, len(schema.Columns)) - for i, j := 0, 0; i < len(schema.Columns); i++ { - if filteredField(filter, i, &j) { + for i := 0; i < len(schema.Columns); i++ { + if filter[i] == -1 { continue } var v interface{} @@ -341,37 +336,6 @@ func (e *jsonEncoder) convertRowToCommonFormat(tp int, row *[]interface{}, schem return &c } -func (e *jsonEncoder) prepareFilter() { - if e.outSchema == nil { - return - } - - nfiltered := len(e.inSchema.Columns) - if e.outSchema.Fields != nil { - nfiltered = nfiltered - len(*e.outSchema.Fields) - } - - if nfiltered == 0 { - return - } - - f := e.outSchema.Fields - e.filter = make([]int, 0) - var j int - for i := 0; i < len(e.inSchema.Columns); i++ { - //Primary key cannot be filtered - if f == nil || len(*f) == 0 || (i-j) >= len(*f) || e.inSchema.Columns[i].Name != (*f)[i-j].Name { - if e.inSchema.Columns[i].Key != "PRI" { - log.Debugf("Field %v will be filtered", e.inSchema.Columns[i].Name) - e.filter = append(e.filter, i) - } - j++ - } - } - - log.Debugf("len=%v, filtered fields (%v)", len(e.filter), e.filter) -} - // UnwrapEvent splits the event header and payload // cfEvent is populated with the 'header' information aka the first decoding. 
// Data after the header returned in the payload parameter diff --git a/encoder/msgpack.go b/encoder/msgpack.go index a868549..80b1442 100644 --- a/encoder/msgpack.go +++ b/encoder/msgpack.go @@ -38,8 +38,8 @@ type msgPackEncoder struct { jsonEncoder } -func initMsgPackEncoder(service, db, table, input string, output string, version int) (Encoder, error) { - return &msgPackEncoder{jsonEncoder{Service: service, Db: db, Table: table, Input: input, Output: output, Version: version}}, nil +func initMsgPackEncoder(service, db, table, input string, output string, version int, filtering bool) (Encoder, error) { + return &msgPackEncoder{jsonEncoder{encoder: encoder{Service: service, Db: db, Table: table, Input: input, Output: output, Version: version, filterEnabled: filtering}}}, nil } //Row encodes row into CommonFormat @@ -61,7 +61,11 @@ func (e *msgPackEncoder) CommonFormat(cf *types.CommonFormatEvent) ([]byte, erro return nil, err } } - cf = filterCommonFormat(e.filter, cf) + var err error + cf, err = filterCommonFormat(e.filter, cf) + if err != nil { + return nil, err + } return cf.MarshalMsg(nil) } @@ -104,12 +108,12 @@ func (e *msgPackEncoder) fixFieldTypes(cf *types.CommonFormatEvent) (err error) //Restore field types according to schema //MsgPack doesn't preserve int type size, so fix it if e.inSchema != nil && cf.Type != "schema" { - for i, j := 0, 0; i < len(e.inSchema.Columns); i++ { - if filteredField(e.filter, i, &j) { + for i := 0; i < len(e.inSchema.Columns); i++ { + if e.filter[i] == -1 { continue } - if cf.Fields != nil && i-j < len(*cf.Fields) { - f := &(*cf.Fields)[i-j] + if cf.Fields != nil { + f := &(*cf.Fields)[e.filter[i]] f.Value = e.fixFieldType(e.inSchema.Columns[i].DataType, f.Value) } diff --git a/encoder/sql.go b/encoder/sql.go index 938ffdc..26f5250 100644 --- a/encoder/sql.go +++ b/encoder/sql.go @@ -39,6 +39,8 @@ func init() { } type sqlEncoder struct { + encoder + typ string c jsonEncoder insertPrefix string @@ -48,24 +50,24 @@ type 
sqlEncoder struct { idempotentInsert bool } -func initEncoder(tp, service, db, table, input string, output string, version int, quote string, ii bool) (Encoder, error) { - return &sqlEncoder{typ: tp, c: jsonEncoder{Service: service, Db: db, Table: table, Input: input, Output: output, Version: version}, identQuote: quote, idempotentInsert: ii}, nil +func initEncoder(tp, service, db, table, input string, output string, version int, quote string, ii bool, filtering bool) (Encoder, error) { + return &sqlEncoder{typ: tp, c: jsonEncoder{encoder: encoder{Service: service, Db: db, Table: table, Input: input, Output: output, Version: version, filterEnabled: filtering}}, identQuote: quote, idempotentInsert: ii}, nil } -func initMySQLEncoder(service, db, table, input string, output string, version int) (Encoder, error) { - return initEncoder("mysql", service, db, table, input, output, version, "`", false) +func initMySQLEncoder(service, db, table, input string, output string, version int, filtering bool) (Encoder, error) { + return initEncoder("mysql", service, db, table, input, output, version, "`", false, filtering) } -func initAnsiSQLEncoder(service, db, table, input string, output string, version int) (Encoder, error) { - return initEncoder("ansisql", service, db, table, input, output, version, "\"", false) +func initAnsiSQLEncoder(service, db, table, input string, output string, version int, filtering bool) (Encoder, error) { + return initEncoder("ansisql", service, db, table, input, output, version, "\"", false, filtering) } -func initMySQLIdempotentEncoder(service, db, table, input string, output string, version int) (Encoder, error) { - return initEncoder("mysql_idempotent", service, db, table, input, output, version, "`", true) +func initMySQLIdempotentEncoder(service, db, table, input string, output string, version int, filtering bool) (Encoder, error) { + return initEncoder("mysql_idempotent", service, db, table, input, output, version, "`", true, filtering) } 
-func initAnsiSQLIdempotentEncoder(service, db, table, input string, output string, version int) (Encoder, error) { - return initEncoder("ansisql_idempotent", service, db, table, input, output, version, "\"", true) +func initAnsiSQLIdempotentEncoder(service, db, table, input string, output string, version int, filtering bool) (Encoder, error) { + return initEncoder("ansisql_idempotent", service, db, table, input, output, version, "\"", true, filtering) } //Schema returns table schema @@ -179,11 +181,11 @@ func (e *sqlEncoder) appendFieldNames(schema *types.TableSchema, filter []int, p if !pk { fieldNames += e.quotedIdent("seqno") } - for i, j := 0, 0; i < len(schema.Columns); i++ { + for i := 0; i < len(schema.Columns); i++ { if pk && schema.Columns[i].Key != "PRI" { continue } - if filteredField(filter, i, &j) { + if filter[i] == -1 { continue } if len(fieldNames) != 0 { @@ -196,8 +198,8 @@ func (e *sqlEncoder) appendFieldNames(schema *types.TableSchema, filter []int, p func (e *sqlEncoder) appendSchemaFields(schema *types.TableSchema, filter []int) string { fieldNames := e.quotedIdent("seqno") + " BIGINT NOT NULL, " - for i, j := 0, 0; i < len(schema.Columns); i++ { - if filteredField(filter, i, &j) { + for i := 0; i < len(schema.Columns); i++ { + if filter[i] == -1 { continue } fieldNames += e.quotedIdent(schema.Columns[i].Name) + " " + schema.Columns[i].Type @@ -215,8 +217,8 @@ func (e *sqlEncoder) appendSchema(schema *types.TableSchema, filter []int) strin func (e *sqlEncoder) appendFields(b *bytes.Buffer, row []interface{}, cf *types.CommonFormatEvent, seqno uint64) { bufWrite(b, strconv.FormatUint(seqno, 10)) - for i, j := 0, 0; i < len(e.c.inSchema.Columns); i++ { - if filteredField(e.c.filter, i, &j) { + for i := 0; i < len(e.c.inSchema.Columns); i++ { + if e.c.filter[i] == -1 { continue } _ = b.WriteByte(',') diff --git a/encoder/z.go b/encoder/z.go index 3d473bc..33caa3e 100644 --- a/encoder/z.go +++ b/encoder/z.go @@ -36,7 +36,7 @@ import ( func
init() { var err error - Internal, err = InitEncoder(config.Get().InternalEncoding, "", "", "", "", "", 0) + Internal, err = InitEncoder(config.Get().InternalEncoding, "", "", "", "", "", 0, true) if err != nil { panic(fmt.Sprintf("Set InternalEncoding to json. Error: %s", err.Error())) } diff --git a/main_test.go b/main_test.go index b5487cf..e40e761 100644 --- a/main_test.go +++ b/main_test.go @@ -193,6 +193,8 @@ func consumeEvents(c pipe.Consumer, format string, result []string, outEncoder e log.Errorf("Received : %+v %v", conv, len(b)) log.Errorf("Reference: %+v %v", v, len(v)) t.FailNow() + } else { + log.Debugf("Successfully matched : %+v %v", conv, len(b)) } log.Debugf("Successfully matched: %+v %v", conv, len(b)) } @@ -214,6 +216,8 @@ func waitAllEventsStreamed(format string, c pipe.Consumer, sseqno int, seqno int _ = c.FetchNext() } + log.Debugf("Events streamed upto seqno=%v", sseqno) + return sseqno } @@ -364,7 +368,7 @@ func testStep(inPipeType string, bufferFormat string, outPipeType string, outPip if outPipeFormat == "json" || outPipeFormat == "msgpack" { cfg.InternalEncoding = outPipeFormat var err error - encoder.Internal, err = encoder.InitEncoder(cfg.InternalEncoding, "", "", "", "", "", 0) + encoder.Internal, err = encoder.InitEncoder(cfg.InternalEncoding, "", "", "", "", "", 0, true) require.NoError(t, err) } @@ -424,7 +428,7 @@ func testStep(inPipeType string, bufferFormat string, outPipeType string, outPip addTable(outPipeFormat, "1", outPipeType, t) // Encoder/decoder for the table added above - outEncoder, err := encoder.Create(outPipeFormat, "e2e_test_svc1", "e2e_test_db1", "e2e_test_table1", "mysql", outPipeType, 0) + outEncoder, err := encoder.Create(outPipeFormat, "e2e_test_svc1", "e2e_test_db1", "e2e_test_table1", "mysql", outPipeType, 0, true) require.NoError(t, err) // Wait snapshot to finish before sending more data otherwise everything even following events will be read @@ -464,7 +468,7 @@ func testStep(inPipeType string, 
bufferFormat string, outPipeType string, outPip addTable(outPipeFormat, "2", outPipeType, t) // Encoder/decoder for the table added above - outEncoder2, err := encoder.Create(outPipeFormat, "e2e_test_svc1", "e2e_test_db1", "e2e_test_table2", "mysql", outPipeType, 0) + outEncoder2, err := encoder.Create(outPipeFormat, "e2e_test_svc1", "e2e_test_db1", "e2e_test_table2", "mysql", outPipeType, 0, true) require.NoError(t, err) // Wait snapshot to finish before sending more data otherwise everything even following events will be read diff --git a/streamer/buffer.go b/streamer/buffer.go index 2d018f6..08366e6 100644 --- a/streamer/buffer.go +++ b/streamer/buffer.go @@ -59,7 +59,17 @@ func (s *Streamer) encodeCommonFormat(outProducer pipe.Producer, data []byte) (k // log.Debugf("commont format received %v %v", cfEvent, cfEvent.Fields) - if cfEvent.Type == "insert" || cfEvent.Type == "delete" || cfEvent.Type == "schema" { + if cfEvent.Type == s.row.OutputFormat { + outMsg = payload + key = cfEvent.Key[0].(string) + if key == "" { + err = s.outEncoder.UpdateCodec() + if log.EL(s.log, err) { + return + } + } + // log.Debugf("Data in final format already. Forwarding. Key=%v, SeqNo=%v", key, cfEvent.SeqNo) + } else if cfEvent.Type == "insert" || cfEvent.Type == "delete" || cfEvent.Type == "schema" { outMsg, err = s.outEncoder.CommonFormat(cfEvent) if log.EL(s.log, err) { return @@ -76,16 +86,6 @@ func (s *Streamer) encodeCommonFormat(outProducer pipe.Producer, data []byte) (k outMsg = nil return } - } else if cfEvent.Type == s.row.OutputFormat { - outMsg = payload - key = cfEvent.Key[0].(string) - if key == "" { - err = s.outEncoder.UpdateCodec() - if log.EL(s.log, err) { - return - } - } - // log.Debugf("Data in final format already. Forwarding. 
Key=%v, SeqNo=%v", key, cfEvent.SeqNo) } else if cfEvent.Type == s.envEncoder.Type() { var ev *types.CommonFormatEvent ev, err = s.envEncoder.DecodeEvent(payload) diff --git a/streamer/streamer.go b/streamer/streamer.go index 79aae77..da7d34f 100644 --- a/streamer/streamer.go +++ b/streamer/streamer.go @@ -273,7 +273,7 @@ func (s *Streamer) start(cfg *config.AppConfig) bool { return false } - s.outEncoder, err = encoder.Create(s.row.OutputFormat, s.row.Service, s.row.Db, s.row.Table, s.row.Input, s.row.Output, s.row.Version) + s.outEncoder, err = encoder.Create(s.row.OutputFormat, s.row.Service, s.row.Db, s.row.Table, s.row.Input, s.row.Output, s.row.Version, !cfg.ChangelogBuffer) if log.EL(s.log, err) { if consumer != nil { log.E(consumer.CloseOnFailure()) @@ -293,7 +293,7 @@ func (s *Streamer) start(cfg *config.AppConfig) bool { if cfg.ChangelogBuffer { //Transit format encoder, aka envelope encoder //It must be per table to be able to decode schematized events - s.envEncoder, err = encoder.Create(encoder.Internal.Type(), s.row.Service, s.row.Db, s.row.Table, s.row.Input, s.row.Output, s.row.Version) + s.envEncoder, err = encoder.Create(encoder.Internal.Type(), s.row.Service, s.row.Db, s.row.Table, s.row.Input, s.row.Output, s.row.Version, true) if log.EL(s.log, err) { log.E(consumer.CloseOnFailure()) return false