Skip to content

Commit

Permalink
Broadcast server and client implementation (0xPolygonHermez#776)
Browse files Browse the repository at this point in the history
* Broadcast server and client implementation

* move tests to broadcast_test pkg
  • Loading branch information
fgimenez authored Jun 20, 2022
1 parent 7e65e63 commit 31512d0
Show file tree
Hide file tree
Showing 6 changed files with 479 additions and 0 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ generate-mocks: ## Generates mocks for the tests, using mockery tool
mockery --name=stateInterface --dir=jsonrpc --output=jsonrpc --outpkg=jsonrpc --inpackage --structname=stateMock --filename=mock_state_test.go
mockery --name=BatchProcessorInterface --dir=jsonrpc --output=jsonrpc --outpkg=jsonrpc --inpackage --structname=batchProcessorMock --filename=mock_batchProcessor_test.go
mockery --name=txManager --dir=sequencerv2 --output=sequencerv2 --outpkg=sequencerv2 --structname=txmanagerMock --filename=txmanager-mock_test.go
mockery --name=stateInterface --dir=sequencerv2/broadcast --output=sequencerv2/broadcast --outpkg=broadcast_test --structname=stateMock --filename=state-mock_test.go

.PHONY: generate-code-from-proto
generate-code-from-proto: ## Generates code from proto files
Expand Down
12 changes: 12 additions & 0 deletions sequencerv2/broadcast/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package broadcast

// ServerConfig represents the configuration of the broadcast server.
type ServerConfig struct {
Host string `mapstructure:"Host"`
Port int `mapstructure:"Port"`
}

// ClientConfig represents the configuration of the broadcast client.
type ClientConfig struct {
URI string `mapstructure:"URI"`
}
27 changes: 27 additions & 0 deletions sequencerv2/broadcast/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package broadcast

import (
"context"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgx/v4"
)

// Consumer interfaces required by the package.

type stateInterface interface {
GetLastBatch(ctx context.Context, tx pgx.Tx) (*Batch, error)
GetBatchByNumber(ctx context.Context, batchNumber uint64, tx pgx.Tx) (*Batch, error)
GetEncodedTransactionsByBatchNumber(ctx context.Context, batchNumber uint64, tx pgx.Tx) (encoded []string, err error)
}

// This should be moved into the state package

// Batch represents a Batch
type Batch struct {
BatchNumber uint64
GlobalExitRoot common.Hash
RawTxsData []byte
Timestamp time.Time
}
125 changes: 125 additions & 0 deletions sequencerv2/broadcast/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package broadcast

import (
"context"
"fmt"
"net"

"github.com/hermeznetwork/hermez-core/log"
"github.com/hermeznetwork/hermez-core/sequencerv2/broadcast/pb"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
)

// Server provides the functionality of the MerkleTree service.
type Server struct {
cfg *ServerConfig

srv *grpc.Server
pb.UnimplementedBroadcastServiceServer
state stateInterface
}

// NewServer is the MT server constructor.
func NewServer(cfg *ServerConfig, state stateInterface) *Server {
return &Server{
cfg: cfg,
state: state,
}
}

// SetState is the state setter.
func (s *Server) SetState(st stateInterface) {
s.state = st
}

// Start sets up the server to process requests.
func (s *Server) Start() {
address := fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.Port)
lis, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

s.srv = grpc.NewServer()
pb.RegisterBroadcastServiceServer(s.srv, s)

healthService := newHealthChecker()
grpc_health_v1.RegisterHealthServer(s.srv, healthService)

if err := s.srv.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

// Stop stops the server.
func (s *Server) Stop() {
s.srv.Stop()
}

// Implementation of pb.BroadcastServiceServer interface methods.

func (s *Server) GetBatch(ctx context.Context, in *pb.GetBatchRequest) (*pb.GetBatchResponse, error) {
batch, err := s.state.GetBatchByNumber(ctx, in.BatchNumber, nil)
if err != nil {
return nil, err
}
return s.genericGetBatch(ctx, batch)
}

func (s *Server) GetLastBatch(ctx context.Context, empty *pb.Empty) (*pb.GetBatchResponse, error) {
batch, err := s.state.GetLastBatch(ctx, nil)
if err != nil {
return nil, err
}
return s.genericGetBatch(ctx, batch)
}

func (s *Server) genericGetBatch(ctx context.Context, batch *Batch) (*pb.GetBatchResponse, error) {
txs, err := s.state.GetEncodedTransactionsByBatchNumber(ctx, batch.BatchNumber, nil)
if err != nil {
return nil, err
}
transactions := make([]*pb.Transaction, len(txs))
for i, tx := range txs {
transactions[i] = &pb.Transaction{
Encoded: tx,
}
}

return &pb.GetBatchResponse{
BatchNumber: batch.BatchNumber,
GlobalExitRoot: batch.GlobalExitRoot.String(),
Timestamp: uint64(batch.Timestamp.Unix()),
Transactions: transactions,
}, nil
}

// HealthChecker will provide an implementation of the HealthCheck interface.
type healthChecker struct{}

// NewHealthChecker returns a health checker according to standard package
// grpc.health.v1.
func newHealthChecker() *healthChecker {
return &healthChecker{}
}

// HealthCheck interface implementation.

// Check returns the current status of the server for unary gRPC health requests,
// for now if the server is up and able to respond we will always return SERVING.
func (s *healthChecker) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
log.Info("Serving the Check request for health check")
return &grpc_health_v1.HealthCheckResponse{
Status: grpc_health_v1.HealthCheckResponse_SERVING,
}, nil
}

// Watch returns the current status of the server for stream gRPC health requests,
// for now if the server is up and able to respond we will always return SERVING.
func (s *healthChecker) Watch(req *grpc_health_v1.HealthCheckRequest, server grpc_health_v1.Health_WatchServer) error {
log.Info("Serving the Watch request for health check")
return server.Send(&grpc_health_v1.HealthCheckResponse{
Status: grpc_health_v1.HealthCheckResponse_SERVING,
})
}
212 changes: 212 additions & 0 deletions sequencerv2/broadcast/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package broadcast_test

import (
"context"
"errors"
"fmt"
"os"
"path"
"runtime"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
broadcast "github.com/hermeznetwork/hermez-core/sequencerv2/broadcast"
"github.com/hermeznetwork/hermez-core/sequencerv2/broadcast/pb"
"github.com/hermeznetwork/hermez-core/test/operations"
"github.com/hermeznetwork/hermez-core/test/testutils"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
host = "0.0.0.0"
port = 61091
)

var (
address = fmt.Sprintf("%s:%d", host, port)
broadcastSrv *broadcast.Server
conn *grpc.ClientConn
cancel context.CancelFunc
err error
ctx = context.Background()
)

func init() {
// Change dir to project root
// This is important because we have relative paths to files containing test vectors
_, filename, _, _ := runtime.Caller(0)
dir := path.Join(path.Dir(filename), "../../")
err := os.Chdir(dir)
if err != nil {
panic(err)
}
}

func TestMain(m *testing.M) {
initialize()
defer teardown()

os.Exit(m.Run())
}

func initialize() {
broadcastSrv = initBroadcastServer()
go broadcastSrv.Start()

conn, cancel, err = initConn()
if err != nil {
panic(err)
}

err = operations.WaitGRPCHealthy(address)
if err != nil {
panic(err)
}
}

func teardown() {
cancel()
broadcastSrv.Stop()
}

func initConn() (*grpc.ClientConn, context.CancelFunc, error) {
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
conn, err := grpc.DialContext(ctx, address, opts...)
return conn, cancel, err
}

func initBroadcastServer() *broadcast.Server {
s := grpc.NewServer()
st := new(stateMock)
cfg := &broadcast.ServerConfig{
Host: host,
Port: port,
}

broadcastSrv = broadcast.NewServer(cfg, st)
pb.RegisterBroadcastServiceServer(s, broadcastSrv)

return broadcastSrv
}

func TestBroadcastServerGetBatch(t *testing.T) {
tcs := []struct {
description string
inputBatchNumber uint64
expectedBatch *broadcast.Batch
expectedEncodedTxs []string
expectedErr bool
expectedErrMsg string
}{
{
description: "happy path",
inputBatchNumber: 14,
expectedBatch: &broadcast.Batch{
BatchNumber: 14,
GlobalExitRoot: common.Hash{},
Timestamp: time.Now(),
},
expectedEncodedTxs: []string{"tx1", "tx2", "tx3"},
},
{
description: "query errors are returned",
inputBatchNumber: 14,
expectedErr: true,
expectedErrMsg: "query error",
},
}

for _, tc := range tcs {
tc := tc
t.Run(tc.description, func(t *testing.T) {
st := new(stateMock)
var err error
if tc.expectedErr {
err = errors.New(tc.expectedErrMsg)
}
st.On("GetBatchByNumber", mock.AnythingOfType("*context.valueCtx"), tc.inputBatchNumber, nil).Return(tc.expectedBatch, err)
st.On("GetEncodedTransactionsByBatchNumber", mock.AnythingOfType("*context.valueCtx"), tc.inputBatchNumber, nil).Return(tc.expectedEncodedTxs, err)

broadcastSrv.SetState(st)

client := pb.NewBroadcastServiceClient(conn)
actualBatch, err := client.GetBatch(ctx, &pb.GetBatchRequest{
BatchNumber: tc.inputBatchNumber,
})
require.NoError(t, testutils.CheckError(err, tc.expectedErr, fmt.Sprintf("rpc error: code = Unknown desc = %s", tc.expectedErrMsg)))

if err == nil {
require.Equal(t, tc.expectedBatch.BatchNumber, actualBatch.BatchNumber)
require.Equal(t, tc.expectedBatch.GlobalExitRoot.String(), actualBatch.GlobalExitRoot)
require.Equal(t, uint64(tc.expectedBatch.Timestamp.Unix()), actualBatch.Timestamp)
for i, encoded := range tc.expectedEncodedTxs {
require.Equal(t, encoded, actualBatch.Transactions[i].Encoded)
}
require.True(t, st.AssertExpectations(t))
}
})
}
}

func TestBroadcastServerGetLastBatch(t *testing.T) {
tcs := []struct {
description string
expectedBatch *broadcast.Batch
expectedEncodedTxs []string
expectedErr bool
expectedErrMsg string
}{
{
description: "happy path",
expectedBatch: &broadcast.Batch{
BatchNumber: 14,
GlobalExitRoot: common.Hash{},
Timestamp: time.Now(),
},
expectedEncodedTxs: []string{"tx1", "tx2", "tx3"},
},
{
description: "query errors are returned",
expectedErr: true,
expectedErrMsg: "query error",
},
}

for _, tc := range tcs {
tc := tc
t.Run(tc.description, func(t *testing.T) {
st := new(stateMock)
var err error
if tc.expectedErr {
err = errors.New(tc.expectedErrMsg)
}
st.On("GetLastBatch", mock.AnythingOfType("*context.valueCtx"), nil).Return(tc.expectedBatch, err)
if tc.expectedBatch != nil {
st.On("GetEncodedTransactionsByBatchNumber", mock.AnythingOfType("*context.valueCtx"), tc.expectedBatch.BatchNumber, nil).Return(tc.expectedEncodedTxs, err)
}

broadcastSrv.SetState(st)

client := pb.NewBroadcastServiceClient(conn)
actualBatch, err := client.GetLastBatch(ctx, &pb.Empty{})
require.NoError(t, testutils.CheckError(err, tc.expectedErr, fmt.Sprintf("rpc error: code = Unknown desc = %s", tc.expectedErrMsg)))

if err == nil {
require.Equal(t, tc.expectedBatch.BatchNumber, actualBatch.BatchNumber)
require.Equal(t, tc.expectedBatch.GlobalExitRoot.String(), actualBatch.GlobalExitRoot)
require.Equal(t, uint64(tc.expectedBatch.Timestamp.Unix()), actualBatch.Timestamp)
for i, encoded := range tc.expectedEncodedTxs {
require.Equal(t, encoded, actualBatch.Transactions[i].Encoded)
}
require.True(t, st.AssertExpectations(t))
}
})
}
}
Loading

0 comments on commit 31512d0

Please sign in to comment.