Skip to content

Commit

Permalink
[FAB-7473] Validate incoming Eventhub registrations
Browse files Browse the repository at this point in the history
This CR adds validation checks for the TLSCertHash (only if mutual TLS
is enabled) of incoming Eventhub registration requests.

Change-Id: I7969e99cb09606e2f1eba32e7534200a3da60c7c
Signed-off-by: Will Lahti <[email protected]>
  • Loading branch information
wlahti committed Jan 3, 2018
1 parent f6bb64b commit 81730bd
Show file tree
Hide file tree
Showing 11 changed files with 364 additions and 191 deletions.
42 changes: 24 additions & 18 deletions core/comm/testdata/grpc/test.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions events/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package consumer

import (
"crypto/x509"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -51,6 +52,7 @@ type EventsClient struct {
type RegistrationConfig struct {
InterestedEvents []*ehpb.Interest
Timestamp *timestamp.Timestamp
TlsCert *x509.Certificate
}

//NewEventsClient Returns a new grpc.ClientConn to the configured local PEER.
Expand Down Expand Up @@ -114,6 +116,9 @@ func (ec *EventsClient) RegisterAsync(config *RegistrationConfig) error {
}
emsg := &ehpb.Event{Event: &ehpb.Event_Register{Register: &ehpb.Register{Events: config.InterestedEvents}}, Creator: creator, Timestamp: config.Timestamp}

if config.TlsCert != nil {
emsg.TlsCertHash = util.ComputeSHA256(config.TlsCert.Raw)
}
if err = ec.send(emsg); err != nil {
consumerLogger.Errorf("error on Register send %s\n", err)
}
Expand Down
23 changes: 21 additions & 2 deletions events/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ SPDX-License-Identifier: Apache-2.0
package consumer

import (
"crypto/x509"
"fmt"
"net"
"os"
"sync"
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/comm"
coreutil "github.com/hyperledger/fabric/core/testutil"
"github.com/hyperledger/fabric/events/producer"
"github.com/hyperledger/fabric/msp/mgmt/testtools"
Expand All @@ -41,6 +44,9 @@ type BadAdapter struct {

var peerAddress = "0.0.0.0:7303"
var ies = []*ehpb.Interest{{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event1"}}}}
var testCert = &x509.Certificate{
Raw: []byte("test"),
}

var adapter *MockAdapter
var obcEHClient *EventsClient
Expand Down Expand Up @@ -160,7 +166,7 @@ func TestUnregisterAsync(t *testing.T) {
t.Fail()
}

regConfig := &RegistrationConfig{InterestedEvents: ies, Timestamp: util.CreateUtcTimestamp()}
regConfig := &RegistrationConfig{InterestedEvents: ies, Timestamp: util.CreateUtcTimestamp(), TlsCert: testCert}
obcEHClient.RegisterAsync(regConfig)
err = obcEHClient.UnregisterAsync(ies)
assert.NoError(t, err)
Expand Down Expand Up @@ -256,8 +262,21 @@ func TestMain(m *testing.M) {
return
}

ehConfig := &producer.EventsServerConfig{BufferSize: uint(viper.GetInt("peer.events.buffersize")), Timeout: viper.GetDuration("peer.events.timeout"), TimeWindow: viper.GetDuration("peer.events.timewindow")}
extract := func(msg proto.Message) []byte {
evt, isEvent := msg.(*ehpb.Event)
if !isEvent || evt == nil {
return nil
}
return evt.TlsCertHash
}

ehConfig := &producer.EventsServerConfig{
BufferSize: uint(viper.GetInt("peer.events.buffersize")),
Timeout: viper.GetDuration("peer.events.timeout"),
TimeWindow: viper.GetDuration("peer.events.timewindow"),
BindingInspector: comm.NewBindingInspector(false, extract)}
ehServer := producer.NewEventsServer(ehConfig)

ehpb.RegisterEventsServer(grpcServer, ehServer)

go grpcServer.Serve(lis)
Expand Down
19 changes: 5 additions & 14 deletions events/producer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,7 @@ type eventProcessor struct {
//we could generalize this with mutiple channels each with its own size
eventChannel chan *pb.Event

//timeout duration for producer to send an event.
//if < 0, if buffer full, unblocks immediately and not send
//if 0, if buffer full, will block and guarantee the event will be sent out
//if > 0, if buffer full, blocks till timeout
timeout time.Duration

//time difference from peer time where registration events can be considered
//valid
timeWindow time.Duration
*EventsServerConfig
}

//global eventProcessor singleton created by initializeEvents. Openchain producers
Expand Down Expand Up @@ -252,8 +244,7 @@ func initializeEvents(config *EventsServerConfig) {
panic("should not be called twice")
}

gEventProcessor = &eventProcessor{eventConsumers: make(map[pb.EventType]handlerList), eventChannel: make(chan *pb.Event, config.BufferSize), timeout: config.Timeout, timeWindow: config.TimeWindow}

gEventProcessor = &eventProcessor{eventConsumers: make(map[pb.EventType]handlerList), eventChannel: make(chan *pb.Event, config.BufferSize), EventsServerConfig: config}
addInternalEventTypes()

//start the event processor
Expand Down Expand Up @@ -327,21 +318,21 @@ func Send(e *pb.Event) error {
return nil
}

if gEventProcessor.timeout < 0 {
if gEventProcessor.Timeout < 0 {
logger.Debugf("Event processor timeout < 0")
select {
case gEventProcessor.eventChannel <- e:
default:
return fmt.Errorf("could not send the blocking event")
}
} else if gEventProcessor.timeout == 0 {
} else if gEventProcessor.Timeout == 0 {
logger.Debugf("Event processor timeout = 0")
gEventProcessor.eventChannel <- e
} else {
logger.Debugf("Event processor timeout > 0")
select {
case gEventProcessor.eventChannel <- e:
case <-time.After(gEventProcessor.timeout):
case <-time.After(gEventProcessor.Timeout):
return fmt.Errorf("could not send the blocking event")
}
}
Expand Down
20 changes: 11 additions & 9 deletions events/producer/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,18 @@ func TestEvents(t *testing.T) {
Send(&peer.Event{})
gEventProcessorBck := gEventProcessor
gEventProcessor = nil
e, err := createEvent()
e, err := createRegisterEvent(nil, nil)
assert.NoError(t, err)
Send(e)
gEventProcessor = gEventProcessorBck
Send(e)
}
prevTimeout := gEventProcessor.timeout
prevTimeout := gEventProcessor.Timeout
for _, timeout := range []time.Duration{0, -1, 1} {
gEventProcessor.timeout = timeout
gEventProcessor.Timeout = timeout
test(timeout)
}
gEventProcessor.timeout = prevTimeout
gEventProcessor.Timeout = prevTimeout
}

func TestDeRegister(t *testing.T) {
Expand All @@ -134,11 +134,13 @@ func TestRegisterHandler(t *testing.T) {
assert.Error(t, registerHandler(&peer.Interest{EventType: peer.EventType_CHAINCODE}, nil))

// attempt to register valid handler
recvChan := make(chan *streamEvent)
stream := &mockstream{c: recvChan}
handler, err := newEventHandler(stream)
assert.Nil(t, err, "error should have been nil")
m := newMockEventhub()
defer close(m.recvChan)
handler := newEventHandler(m)
assert.NoError(t, registerHandler(&peer.Interest{EventType: peer.EventType_BLOCK}, handler))

// clean up by deregistering handler
assert.NoError(t, deRegisterHandler(&peer.Interest{EventType: peer.EventType_BLOCK}, handler))
}

func TestProcessEvents(t *testing.T) {
Expand All @@ -149,7 +151,7 @@ func TestProcessEvents(t *testing.T) {
{EventType: peer.EventType_CHAINCODE, RegInfo: &peer.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &peer.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event2"}}},
}
cl.register(interests)
e, err := createEvent()
e, err := createRegisterEvent(nil, nil)
assert.NoError(t, err)
go Send(e)
time.Sleep(time.Second * 2)
Expand Down
25 changes: 15 additions & 10 deletions events/producer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ type handler struct {
interestedEvents map[string]*pb.Interest
}

func newEventHandler(stream pb.Events_ChatServer) (*handler, error) {
func newEventHandler(stream pb.Events_ChatServer) *handler {
d := &handler{
ChatStream: stream,
}
d.interestedEvents = make(map[string]*pb.Interest)
return d, nil
return d
}

// Stop stops this handler
Expand Down Expand Up @@ -103,7 +103,7 @@ func (d *handler) deregisterAll() {

// HandleMessage handles the Openchain messages for the Peer.
func (d *handler) HandleMessage(msg *pb.SignedEvent) error {
evt, err := validateEventMessage(msg)
evt, err := d.validateEventMessage(msg)
if err != nil {
return fmt.Errorf("event message validation failed: [%s]", err)
}
Expand Down Expand Up @@ -151,8 +151,8 @@ func (d *handler) SendMessage(msg *pb.Event) error {
// However, this is not being done for v1.0 due to complexity concerns and the need to complex a stable,
// minimally viable release. Eventually events will be made channel-specific, at which point this method
// should be revisited
func validateEventMessage(signedEvt *pb.SignedEvent) (*pb.Event, error) {
logger.Debugf("ValidateEventMessage starts for signed event %p", signedEvt)
func (d *handler) validateEventMessage(signedEvt *pb.SignedEvent) (*pb.Event, error) {
logger.Debugf("validating for signed event %p", signedEvt)

// messages from the client for registering and unregistering must be signed
// and accompanied by the signing certificate in the "Creator" field
Expand All @@ -163,15 +163,20 @@ func validateEventMessage(signedEvt *pb.SignedEvent) (*pb.Event, error) {
}

if evt.GetTimestamp() != nil {
evtTime := time.Unix(evt.GetTimestamp().Seconds, int64(evt.GetTimestamp().Nanos)).UTC().UnixNano()
peerTime := time.Now().UnixNano()
evtTime := time.Unix(evt.GetTimestamp().Seconds, int64(evt.GetTimestamp().Nanos)).UTC()
peerTime := time.Now()

if math.Abs(float64(peerTime-evtTime)) > float64(gEventProcessor.timeWindow.Nanoseconds()) {
logger.Warningf("event timestamp %s is more than the %s `peer.events.timewindow` difference above/below peer time %s. either the peer and client clocks are out of sync or a replay attack has been attempted", evtTime, gEventProcessor.timeWindow, peerTime)
return nil, fmt.Errorf("event timestamp out of acceptable range. must be within %s above/below peer time", gEventProcessor.timeWindow)
if math.Abs(float64(peerTime.UnixNano()-evtTime.UnixNano())) > float64(gEventProcessor.TimeWindow.Nanoseconds()) {
logger.Warningf("Message timestamp %s more than %s apart from current server time %s", evtTime, gEventProcessor.TimeWindow, peerTime)
return nil, fmt.Errorf("message timestamp out of acceptable range. must be within %s of current server time", gEventProcessor.TimeWindow)
}
}

err = gEventProcessor.BindingInspector(d.ChatStream.Context(), evt)
if err != nil {
return nil, err
}

localMSP := mgmt.GetLocalMSP()
principalGetter := mgmt.NewLocalMSPPrincipalGetter()

Expand Down
13 changes: 6 additions & 7 deletions events/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/comm"
pb "github.com/hyperledger/fabric/protos/peer"
)

Expand All @@ -33,9 +34,10 @@ type EventsServer struct {

// EventsServerConfig contains the setup config for the events server
type EventsServerConfig struct {
BufferSize uint
Timeout time.Duration
TimeWindow time.Duration
BufferSize uint
Timeout time.Duration
TimeWindow time.Duration
BindingInspector comm.BindingInspector
}

//singleton - if we want to create multiple servers, we need to subsume events.gEventConsumers into EventsServer
Expand All @@ -54,10 +56,7 @@ func NewEventsServer(config *EventsServerConfig) *EventsServer {

// Chat implementation of the Chat bidi streaming RPC function
func (p *EventsServer) Chat(stream pb.Events_ChatServer) error {
handler, err := newEventHandler(stream)
if err != nil {
return fmt.Errorf("error creating handler during handleChat initiation: %s", err)
}
handler := newEventHandler(stream)
defer handler.Stop()
for {
in, err := stream.Recv()
Expand Down
Loading

0 comments on commit 81730bd

Please sign in to comment.