Skip to content

Commit

Permalink
[FAB-17463] Allow peer to override implicit collection dissemination …
Browse files Browse the repository at this point in the history
…properties

Added the following config properties to peer's core.yaml for implicit collection
dissemination policy. When a peer endorses a proposal that writes to its own
implicit collection, these values override the default values.
- peer.gossip.pvtData.ImplicitCollectionDisseminationPolicy.requiredPeerCount
- peer.gossip.pvtData.ImplicitCollectionDisseminationPolicy.maxPeerCount

The properties are applicable to all channels the peer has joined. The implication
is that requiredPeerCount has to be smaller than the number of peers in a channel
that has the lowest numbers of peers from the organization.

Signed-off-by: Wenjian Qiao <[email protected]>
  • Loading branch information
wenjianqiao authored and denyeart committed Feb 28, 2020
1 parent f086a3c commit 3d89ccf
Show file tree
Hide file tree
Showing 15 changed files with 175 additions and 42 deletions.
16 changes: 10 additions & 6 deletions core/chaincode/lifecycle/deployedcc_infoprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
validationState "github.com/hyperledger/fabric/core/handlers/validation/api/state"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/peer"
"github.com/hyperledger/fabric/gossip/privdata"
"github.com/hyperledger/fabric/protoutil"

"github.com/pkg/errors"
Expand All @@ -44,6 +45,7 @@ var (

type ValidatorCommitter struct {
CoreConfig *peer.Config
PrivdataConfig *privdata.PrivdataConfig
Resources *Resources
LegacyDeployedCCInfoProvider LegacyDeployedCCInfoProvider
}
Expand Down Expand Up @@ -207,11 +209,13 @@ func (vc *ValidatorCommitter) ChaincodeImplicitCollections(channelName string) (

// GenerateImplicitCollectionForOrg generates implicit collection for the org
func (vc *ValidatorCommitter) GenerateImplicitCollectionForOrg(mspid string) *pb.StaticCollectionConfig {
// set collection MaximumPeerCount to 0 when its mspid does not match the local mspid
// set collection MaximumPeerCount to 1 when its mspid matches the local mspid (i.e., peer's own org)
maxPeerCount := int32(0)
// set Required/MaxPeerCount to 0 if it is other org's implicit collection (mspid does not match peer's local mspid)
// set Required/MaxPeerCount to the config values if it is the peer org's implicit collection (mspid matches peer's local mspid)
requiredPeerCount := 0
maxPeerCount := 0
if mspid == vc.CoreConfig.LocalMSPID {
maxPeerCount = 1
requiredPeerCount = vc.PrivdataConfig.ImplicitCollDisseminationPolicy.RequiredPeerCount
maxPeerCount = vc.PrivdataConfig.ImplicitCollDisseminationPolicy.MaxPeerCount
}
return &pb.StaticCollectionConfig{
Name: ImplicitCollectionNameForOrg(mspid),
Expand All @@ -220,8 +224,8 @@ func (vc *ValidatorCommitter) GenerateImplicitCollectionForOrg(mspid string) *pb
SignaturePolicy: policydsl.SignedByMspMember(mspid),
},
},
RequiredPeerCount: 0,
MaximumPeerCount: maxPeerCount,
RequiredPeerCount: int32(requiredPeerCount),
MaximumPeerCount: int32(maxPeerCount),
}
}

Expand Down
17 changes: 13 additions & 4 deletions core/chaincode/lifecycle/deployedcc_infoprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/hyperledger/fabric/core/chaincode/lifecycle/mock"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/peer"
"github.com/hyperledger/fabric/gossip/privdata"
"github.com/hyperledger/fabric/protoutil"

"github.com/golang/protobuf/proto"
Expand All @@ -32,6 +33,7 @@ var _ = Describe("ValidatorCommitter", func() {
var (
vc *lifecycle.ValidatorCommitter
resources *lifecycle.Resources
privdataConfig *privdata.PrivdataConfig
fakeLegacyProvider *mock.LegacyDeployedCCInfoProvider
fakeQueryExecutor *mock.SimpleQueryExecutor
fakeChannelConfigSource *mock.ChannelConfigSource
Expand Down Expand Up @@ -67,8 +69,15 @@ var _ = Describe("ValidatorCommitter", func() {
Serializer: &lifecycle.Serializer{},
}

privdataConfig = &privdata.PrivdataConfig{
ImplicitCollDisseminationPolicy: privdata.ImplicitCollectionDisseminationPolicy{
RequiredPeerCount: 1,
MaxPeerCount: 2,
},
}
vc = &lifecycle.ValidatorCommitter{
CoreConfig: &peer.Config{LocalMSPID: "first-mspid"},
PrivdataConfig: privdataConfig,
Resources: resources,
LegacyDeployedCCInfoProvider: fakeLegacyProvider,
}
Expand Down Expand Up @@ -312,13 +321,13 @@ var _ = Describe("ValidatorCommitter", func() {
secondOrg = collection
}
}
// Required/MaxPeerCount should match privdataConfig when the implicit collection is for peer's own org
Expect(firstOrg).NotTo(BeNil())
Expect(firstOrg.RequiredPeerCount).To(Equal(int32(0)))
// implicit collection MaximumPeerCount is 1 when its mspid matches peer's local mspid
Expect(firstOrg.MaximumPeerCount).To(Equal(int32(1)))
Expect(firstOrg.RequiredPeerCount).To(Equal(int32(privdataConfig.ImplicitCollDisseminationPolicy.RequiredPeerCount)))
Expect(firstOrg.MaximumPeerCount).To(Equal(int32(privdataConfig.ImplicitCollDisseminationPolicy.MaxPeerCount)))
// Required/MaxPeerCount should be 0 when the implicit collection is for other org
Expect(secondOrg).NotTo(BeNil())
Expect(secondOrg.RequiredPeerCount).To(Equal(int32(0)))
// implicit collection MaximumPeerCount is 0 when its mspid doesn't match peer's local mspid
Expect(secondOrg.MaximumPeerCount).To(Equal(int32(0)))
})

Expand Down
4 changes: 2 additions & 2 deletions core/handlers/validation/builtin/v12/validation_logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ func validateNewCollectionConfigs(newCollectionConfigs []*pb.CollectionConfig) e

}
if requiredPeerCount < 0 {
return fmt.Errorf("collection-name: %s -- requiredPeerCount (%d) cannot be less than zero (%d)",
collectionName, maximumPeerCount, requiredPeerCount)
return fmt.Errorf("collection-name: %s -- requiredPeerCount (%d) cannot be less than zero",
collectionName, requiredPeerCount)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1798,7 +1798,7 @@ func TestValidateRWSetAndCollectionForDeploy(t *testing.T) {
blockToLive = 10000
coll3 = createCollectionConfig(collName3, policyEnvelope, requiredPeerCount, maximumPeerCount, blockToLive)
err = testValidateCollection(t, v, []*peer.CollectionConfig{coll1, coll2, coll3}, cdRWSet, lsccFunc, ac, chid)
assert.EqualError(t, err, "collection-name: mycollection3 -- requiredPeerCount (1) cannot be less than zero (-2)",
assert.EqualError(t, err, "collection-name: mycollection3 -- requiredPeerCount (-2) cannot be less than zero",
collName3, maximumPeerCount, requiredPeerCount)

// Test 11: requiredPeerCount > maximumPeerCount -> error
Expand Down
2 changes: 2 additions & 0 deletions core/peer/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/hyperledger/fabric/core/transientstore"
"github.com/hyperledger/fabric/gossip/gossip"
gossipmetrics "github.com/hyperledger/fabric/gossip/metrics"
"github.com/hyperledger/fabric/gossip/privdata"
"github.com/hyperledger/fabric/gossip/service"
gossipservice "github.com/hyperledger/fabric/gossip/service"
peergossip "github.com/hyperledger/fabric/internal/peer/gossip"
Expand Down Expand Up @@ -84,6 +85,7 @@ func NewTestPeer(t *testing.T) (*Peer, func()) {
nil,
gossipConfig,
&service.ServiceConfig{},
&privdata.PrivdataConfig{},
&deliverservice.DeliverServiceConfig{
ReConnectBackoffThreshold: deliverservice.DefaultReConnectBackoffThreshold,
ReconnectTotalTimeThreshold: deliverservice.DefaultReConnectTotalTimeThreshold,
Expand Down
2 changes: 2 additions & 0 deletions core/scc/cscc/configure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/hyperledger/fabric/core/transientstore"
"github.com/hyperledger/fabric/gossip/gossip"
gossipmetrics "github.com/hyperledger/fabric/gossip/metrics"
"github.com/hyperledger/fabric/gossip/privdata"
"github.com/hyperledger/fabric/gossip/service"
"github.com/hyperledger/fabric/internal/configtxgen/encoder"
"github.com/hyperledger/fabric/internal/configtxgen/genesisconfig"
Expand Down Expand Up @@ -309,6 +310,7 @@ func TestConfigerInvokeJoinChainCorrectParams(t *testing.T) {
nil,
gossipConfig,
&service.ServiceConfig{},
&privdata.PrivdataConfig{},
&deliverservice.DeliverServiceConfig{
ReConnectBackoffThreshold: deliverservice.DefaultReConnectBackoffThreshold,
ReconnectTotalTimeThreshold: deliverservice.DefaultReConnectTotalTimeThreshold,
Expand Down
39 changes: 37 additions & 2 deletions gossip/privdata/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ SPDX-License-Identifier: Apache-2.0
package privdata

import (
"fmt"
"time"

"github.com/spf13/viper"
)

const (
reconcileSleepIntervalDefault = time.Minute
reconcileBatchSizeDefault = 10
reconcileSleepIntervalDefault = time.Minute
reconcileBatchSizeDefault = 10
implicitCollectionMaxPeerCountDefault = 1
)

// PrivdataConfig is the struct that defines the Gossip Privdata configurations.
Expand All @@ -26,6 +28,19 @@ type PrivdataConfig struct {
ReconcileBatchSize int
// ReconciliationEnabled is a flag that indicates whether private data reconciliation is enabled or not.
ReconciliationEnabled bool
// ImplicitCollectionDisseminationPolicy specifies the dessemination policy for the peer's own implicit collection.
ImplicitCollDisseminationPolicy ImplicitCollectionDisseminationPolicy
}

// ImplicitCollectionDisseminationPolicy specifies the dessemination policy for the peer's own implicit collection.
// It is not applicable to private data for other organizations' implicit collections.
type ImplicitCollectionDisseminationPolicy struct {
// RequiredPeerCount defines the minimum number of eligible peers to which each endorsing peer must successfully
// disseminate private data for its own implicit collection. Default is 0.
RequiredPeerCount int
// MaxPeerCount defines the maximum number of eligible peers to which each endorsing peer will attempt to
// disseminate private data for its own implicit collection. Default is 1.
MaxPeerCount int
}

// GlobalConfig obtains a set of configuration from viper, build and returns the config struct.
Expand All @@ -50,4 +65,24 @@ func (c *PrivdataConfig) loadPrivDataConfig() {

c.ReconciliationEnabled = viper.GetBool("peer.gossip.pvtData.reconciliationEnabled")

requiredPeerCount := viper.GetInt("peer.gossip.pvtData.implicitCollectionDisseminationPolicy.requiredPeerCount")

maxPeerCount := implicitCollectionMaxPeerCountDefault
if viper.Get("peer.gossip.pvtData.implicitCollectionDisseminationPolicy.maxPeerCount") != nil {
// allow override maxPeerCount to 0 that will effectively disable dissemination on the peer
maxPeerCount = viper.GetInt("peer.gossip.pvtData.implicitCollectionDisseminationPolicy.maxPeerCount")
}

if requiredPeerCount < 0 {
panic(fmt.Sprintf("peer.gossip.pvtData.implicitCollectionDisseminationPolicy.requiredPeerCount (%d) cannot be less than zero",
requiredPeerCount))
}
if maxPeerCount < requiredPeerCount {
panic(fmt.Sprintf("peer.gossip.pvtData.implicitCollectionDisseminationPolicy.maxPeerCount (%d) cannot be less than requiredPeerCount (%d)",
maxPeerCount, requiredPeerCount))

}

c.ImplicitCollDisseminationPolicy.RequiredPeerCount = requiredPeerCount
c.ImplicitCollDisseminationPolicy.MaxPeerCount = maxPeerCount
}
34 changes: 34 additions & 0 deletions gossip/privdata/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,19 @@ func TestGlobalConfig(t *testing.T) {
viper.Set("peer.gossip.pvtData.reconcileSleepInterval", "10s")
viper.Set("peer.gossip.pvtData.reconcileBatchSize", 10)
viper.Set("peer.gossip.pvtData.reconciliationEnabled", true)
viper.Set("peer.gossip.pvtData.implicitCollectionDisseminationPolicy.requiredPeerCount", 2)
viper.Set("peer.gossip.pvtData.implicitCollectionDisseminationPolicy.maxPeerCount", 3)

coreConfig := privdata.GlobalConfig()

expectedConfig := &privdata.PrivdataConfig{
ReconcileSleepInterval: 10 * time.Second,
ReconcileBatchSize: 10,
ReconciliationEnabled: true,
ImplicitCollDisseminationPolicy: privdata.ImplicitCollectionDisseminationPolicy{
RequiredPeerCount: 2,
MaxPeerCount: 3,
},
}

assert.Equal(t, coreConfig, expectedConfig)
Expand All @@ -42,7 +48,35 @@ func TestGlobalConfigDefaults(t *testing.T) {
ReconcileSleepInterval: time.Minute,
ReconcileBatchSize: 10,
ReconciliationEnabled: false,
ImplicitCollDisseminationPolicy: privdata.ImplicitCollectionDisseminationPolicy{
RequiredPeerCount: 0,
MaxPeerCount: 1,
},
}

assert.Equal(t, coreConfig, expectedConfig)
}

func TestGlobalConfigPanic(t *testing.T) {
viper.Reset()
// Capture the configuration from viper
viper.Set("peer.gossip.pvtData.reconcileSleepInterval", "10s")
viper.Set("peer.gossip.pvtData.reconcileBatchSize", 10)
viper.Set("peer.gossip.pvtData.reconciliationEnabled", true)
viper.Set("peer.gossip.pvtData.implicitCollectionDisseminationPolicy.requiredPeerCount", 2)
viper.Set("peer.gossip.pvtData.implicitCollectionDisseminationPolicy.maxPeerCount", 1)
assert.PanicsWithValue(
t,
"peer.gossip.pvtData.implicitCollectionDisseminationPolicy.maxPeerCount (1) cannot be less than requiredPeerCount (2)",
func() { privdata.GlobalConfig() },
"A panic should occur because maxPeerCount is less than requiredPeerCount",
)

viper.Set("peer.gossip.pvtData.implicitCollectionDisseminationPolicy.requiredPeerCount", -1)
assert.PanicsWithValue(
t,
"peer.gossip.pvtData.implicitCollectionDisseminationPolicy.requiredPeerCount (-1) cannot be less than zero",
func() { privdata.GlobalConfig() },
"A panic should occur because requiredPeerCount is less than zero",
)
}
16 changes: 9 additions & 7 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ type GossipService struct {
secAdv api.SecurityAdvisor
metrics *gossipmetrics.GossipMetrics
serviceConfig *ServiceConfig
privdataConfig *gossipprivdata.PrivdataConfig
}

// This is an implementation of api.JoinChannelMessage.
Expand Down Expand Up @@ -212,6 +213,7 @@ func New(
deliverGRPCClient *corecomm.GRPCClient,
gossipConfig *gossip.Config,
serviceConfig *ServiceConfig,
privdataConfig *gossipprivdata.PrivdataConfig,
deliverServiceConfig *deliverservice.DeliverServiceConfig,
) (*GossipService, error) {
serializedIdentity, err := peerIdentity.Serialize()
Expand Down Expand Up @@ -244,10 +246,11 @@ func New(
deliverGRPCClient: deliverGRPCClient,
deliverServiceConfig: deliverServiceConfig,
},
peerIdentity: serializedIdentity,
secAdv: secAdv,
metrics: gossipMetrics,
serviceConfig: serviceConfig,
peerIdentity: serializedIdentity,
secAdv: secAdv,
metrics: gossipMetrics,
serviceConfig: serviceConfig,
privdataConfig: privdataConfig,
}, nil
}

Expand Down Expand Up @@ -320,12 +323,11 @@ func (g *GossipService) InitializeChannel(channelID string, ordererSource *order
}, store, selfSignedData, g.metrics.PrivdataMetrics, coordinatorConfig,
support.IdDeserializeFactory)

privdataConfig := gossipprivdata.GlobalConfig()
var reconciler gossipprivdata.PvtDataReconciler

if privdataConfig.ReconciliationEnabled {
if g.privdataConfig.ReconciliationEnabled {
reconciler = gossipprivdata.NewReconciler(channelID, g.metrics.PrivdataMetrics,
support.Committer, fetcher, privdataConfig)
support.Committer, fetcher, g.privdataConfig)
} else {
reconciler = &gossipprivdata.NoOpReconciler{}
}
Expand Down
12 changes: 8 additions & 4 deletions gossip/service/gossip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func TestInitGossipService(t *testing.T) {
grpcClient,
gossipConfig,
&ServiceConfig{},
&privdata.PrivdataConfig{},
&deliverservice.DeliverServiceConfig{
ReConnectBackoffThreshold: deliverservice.DefaultReConnectBackoffThreshold,
ReconnectTotalTimeThreshold: deliverservice.DefaultReConnectTotalTimeThreshold,
Expand Down Expand Up @@ -809,10 +810,11 @@ func newGossipInstance(serviceConfig *ServiceConfig, port int, id int, gRPCServe
deliveryFactory: &deliveryFactoryImpl{
credentialSupport: comm.NewCredentialSupport(),
},
peerIdentity: api.PeerIdentityType(conf.InternalEndpoint),
secAdv: secAdv,
metrics: metrics,
serviceConfig: serviceConfig,
peerIdentity: api.PeerIdentityType(conf.InternalEndpoint),
secAdv: secAdv,
metrics: metrics,
serviceConfig: serviceConfig,
privdataConfig: privdata.GlobalConfig(),
}

return &gossipGRPC{GossipService: gossipService, grpc: gRPCServer}
Expand Down Expand Up @@ -933,6 +935,7 @@ func TestInvalidInitialization(t *testing.T) {
grpcClient,
gossipConfig,
&ServiceConfig{},
&privdata.PrivdataConfig{},
&deliverservice.DeliverServiceConfig{
PeerTLSEnabled: false,
ReConnectBackoffThreshold: deliverservice.DefaultReConnectBackoffThreshold,
Expand Down Expand Up @@ -978,6 +981,7 @@ func TestChannelConfig(t *testing.T) {
grpcClient,
gossipConfig,
&ServiceConfig{},
&privdata.PrivdataConfig{},
&deliverservice.DeliverServiceConfig{
ReConnectBackoffThreshold: deliverservice.DefaultReConnectBackoffThreshold,
ReconnectTotalTimeThreshold: deliverservice.DefaultReConnectTotalTimeThreshold,
Expand Down
3 changes: 3 additions & 0 deletions integration/nwo/core_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ peer:
reconcileSleepInterval: 10s
reconciliationEnabled: true
skipPullingInvalidTransactionsDuringCommit: false
implicitCollectionDisseminationPolicy:
requiredPeerCount: 0
maxPeerCount: 1
state:
enabled: true
checkInterval: 10s
Expand Down
23 changes: 15 additions & 8 deletions integration/nwo/fabricconfig/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,21 @@ type GossipElection struct {
}

type GossipPvtData struct {
PullRetryThreshold time.Duration `yaml:"pullRetryThreshold,omitempty"`
TransientstoreMaxBlockRetention int `yaml:"transientstoreMaxBlockRetention,omitempty"`
PushAckTimeout time.Duration `yaml:"pushAckTimeout,omitempty"`
BtlPullMargin int `yaml:"btlPullMargin,omitempty"`
ReconcileBatchSize int `yaml:"reconcileBatchSize,omitempty"`
ReconcileSleepInterval time.Duration `yaml:"reconcileSleepInterval,omitempty"`
ReconciliationEnabled bool `yaml:"reconciliationEnabled"`
SkipPullingInvalidTransactionsDuringCommit bool `yaml:"skipPullingInvalidTransactionsDuringCommit"`
PullRetryThreshold time.Duration `yaml:"pullRetryThreshold,omitempty"`
TransientstoreMaxBlockRetention int `yaml:"transientstoreMaxBlockRetention,omitempty"`
PushAckTimeout time.Duration `yaml:"pushAckTimeout,omitempty"`
BtlPullMargin int `yaml:"btlPullMargin,omitempty"`
ReconcileBatchSize int `yaml:"reconcileBatchSize,omitempty"`
ReconcileSleepInterval time.Duration `yaml:"reconcileSleepInterval,omitempty"`
ReconciliationEnabled bool `yaml:"reconciliationEnabled"`
SkipPullingInvalidTransactionsDuringCommit bool `yaml:"skipPullingInvalidTransactionsDuringCommit"`
ImplicitCollDisseminationPolicy ImplicitCollDisseminationPolicy `yaml:"implicitCollectionDisseminationPolicy"`
}

type ImplicitCollDisseminationPolicy struct {
RequiredPeerCount int `yaml:"requiredPeerCount,omitempty"`
// do not tag omitempty in order to override MaxPeerCount default with 0
MaxPeerCount int `yaml:"maxPeerCount"`
}

type GossipState struct {
Expand Down
Loading

0 comments on commit 3d89ccf

Please sign in to comment.