Skip to content

Commit

Permalink
proto: enable pooling for vreplication
Browse files Browse the repository at this point in the history
Signed-off-by: Vicent Marti <[email protected]>
  • Loading branch information
vmg committed Jun 7, 2021
1 parent f051584 commit 5b5ccd8
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 14 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ $(PROTO_GO_OUTS): minimaltools install_protoc-gen-go proto/*.proto
--go_out=. --plugin protoc-gen-go="${GOBIN}/protoc-gen-go" \
--go-grpc_out=. --plugin protoc-gen-go-grpc="${GOBIN}/protoc-gen-go-grpc" \
--go-vtproto_out=. --plugin protoc-gen-go-vtproto="${GOBIN}/protoc-gen-go-vtproto" \
--go-vtproto_opt=features=marshal+unmarshal+size \
--go-vtproto_opt=features=marshal+unmarshal+size+pool \
--go-vtproto_opt=pool=vitess.io/vitess/go/vt/proto/query.Row \
--go-vtproto_opt=pool=vitess.io/vitess/go/vt/proto/binlogdata.VStreamRowsResponse \
-I${PWD}/dist/vt-protoc-3.6.1/include:proto proto/$${name}.proto; \
done
cp -Rf vitess.io/vitess/go/vt/proto/* go/vt/proto
Expand Down
59 changes: 55 additions & 4 deletions go/vt/proto/binlogdata/binlogdata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 25 additions & 1 deletion go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions go/vt/vttablet/grpcqueryservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ limitations under the License.
package grpcqueryservice

import (
"google.golang.org/grpc"

"context"

"google.golang.org/grpc"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/callinfo"
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vttablet/grpctabletconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,18 +675,18 @@ func (conn *gRPCQueryClient) VStreamRows(ctx context.Context, target *querypb.Ta
return err
}
for {
r, err := stream.Recv()
r := binlogdatapb.VStreamRowsResponseFromVTPool()
err := stream.RecvMsg(r)
if err != nil {
return tabletconn.ErrorFromGRPC(err)
}
select {
case <-ctx.Done():
if ctx.Err() != nil {
return ctx.Err()
default:
}
if err := send(r); err != nil {
return err
}
r.ReturnToVTPool()
}
}

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,13 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
}
fieldEvent := &binlogdatapb.FieldEvent{
TableName: initialPlan.SendRule.Match,
Fields: rows.Fields,
}
fieldEvent.Fields = append(fieldEvent.Fields, rows.Fields...)
vc.tablePlan, err = plan.buildExecutionPlan(fieldEvent)
if err != nil {
return err
}
pkfields = rows.Pkfields
pkfields = append(pkfields, rows.Pkfields...)
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("update _vt.copy_state set lastpk=%a where vrepl_id=%s and table_name=%s", ":lastpk", strconv.Itoa(int(vc.vr.id)), encodeString(tableName))
updateCopyState = buf.ParsedQuery()
Expand Down

0 comments on commit 5b5ccd8

Please sign in to comment.