Skip to content

Commit

Permalink
Merge "[FAB-2052] Finish ledger API GetHistoryForKey()"
Browse files Browse the repository at this point in the history
  • Loading branch information
binhn authored and Gerrit Code Review committed Feb 9, 2017
2 parents 95911f2 + 9d90fdf commit 8927e13
Show file tree
Hide file tree
Showing 10 changed files with 276 additions and 105 deletions.
11 changes: 10 additions & 1 deletion common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ At start up a new manager:
*) Updates blockchain info used by the APIs
*/
func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig, indexStore *leveldbhelper.DBHandle) *blockfileMgr {
logger.Debugf("newBlockfileMgr() initializing file-based block storage for ledger: %s ", id)
//Determine the root directory for the blockfile storage, if it does not exist create it
rootDir := conf.getLedgerBlockDir(id)
_, err := util.CreateDirIfMissing(rootDir)
Expand Down Expand Up @@ -355,6 +356,7 @@ func (mgr *blockfileMgr) syncIndex() error {

//Should be at the last block, but go ahead and loop looking for next blockBytes
//If there is another block, add it to the index
//TODO Currently this re-indexes the lastBlockIndexed every time. May be better to skip it.
for {
if blockBytes, blockPlacementInfo, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
return err
Expand All @@ -366,16 +368,22 @@ func (mgr *blockfileMgr) syncIndex() error {
if err != nil {
return err
}

//The blockStartOffset will get applied to the txOffsets prior to indexing within indexBlock(),
//therefore just shift by the difference between blockBytesOffset and blockStartOffset
numBytesToShift := int(blockPlacementInfo.blockBytesOffset - blockPlacementInfo.blockStartOffset)
for _, offset := range info.txOffsets {
offset.loc.offset += int(blockPlacementInfo.blockBytesOffset)
offset.loc.offset += numBytesToShift
}

//Update the blockIndexInfo with what was actually stored in file system
blockIdxInfo := &blockIdxInfo{}
blockIdxInfo.blockHash = info.blockHeader.Hash()
blockIdxInfo.blockNum = info.blockHeader.Number
blockIdxInfo.flp = &fileLocPointer{fileSuffixNum: blockPlacementInfo.fileNum,
locPointer: locPointer{offset: int(blockPlacementInfo.blockStartOffset)}}
blockIdxInfo.txOffsets = info.txOffsets
logger.Debugf("syncIndex() indexing block [%d]", blockIdxInfo.blockNum)
if err = mgr.index.indexBlock(blockIdxInfo); err != nil {
return err
}
Expand Down Expand Up @@ -482,6 +490,7 @@ func (mgr *blockfileMgr) fetchBlock(lp *fileLocPointer) (*common.Block, error) {
}

func (mgr *blockfileMgr) fetchTransactionEnvelope(lp *fileLocPointer) (*common.Envelope, error) {
logger.Debugf("Entering fetchTransactionEnvelope() %v\n", lp)
var err error
var txEnvelopeBytes []byte
if txEnvelopeBytes, err = mgr.fetchRawBytes(lp); err != nil {
Expand Down
14 changes: 13 additions & 1 deletion common/ledger/blkstorage/fsblkstorage/blockindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package fsblkstorage

import (
"bytes"
"fmt"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -292,5 +293,16 @@ func (flp *fileLocPointer) String() string {
}

func (blockIdxInfo *blockIdxInfo) String() string {
return fmt.Sprintf("blockNum=%d, blockHash=%#v", blockIdxInfo.blockNum, blockIdxInfo.blockHash)

var buffer bytes.Buffer
for _, txOffset := range blockIdxInfo.txOffsets {
buffer.WriteString("txId=")
buffer.WriteString(txOffset.txID)
buffer.WriteString(" locPointer=")
buffer.WriteString(txOffset.loc.String())
buffer.WriteString("\n")
}
txOffsetsString := buffer.String()

return fmt.Sprintf("blockNum=%d, blockHash=%#v txOffsets=\n%s", blockIdxInfo.blockNum, blockIdxInfo.blockHash, txOffsetsString)
}
9 changes: 6 additions & 3 deletions core/ledger/kvledger/history/historydb/historydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ limitations under the License.

package historydb

import "github.com/hyperledger/fabric/protos/common"
import "github.com/hyperledger/fabric/core/ledger"
import (
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/protos/common"
)

// HistoryDBProvider provides an instance of a history DB
type HistoryDBProvider interface {
Expand All @@ -29,7 +32,7 @@ type HistoryDBProvider interface {

// HistoryDB - an interface that a history database should implement
type HistoryDB interface {
NewHistoryQueryExecutor() (ledger.HistoryQueryExecutor, error)
NewHistoryQueryExecutor(blockStore blkstorage.BlockStore) (ledger.HistoryQueryExecutor, error)
Commit(block *common.Block) error
GetBlockNumFromSavepoint() (uint64, error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package historyleveldb

import (
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/history/historydb"
Expand Down Expand Up @@ -82,16 +83,13 @@ func (historyDB *historyDB) Close() {
// Commit implements method in HistoryDB interface
func (historyDB *historyDB) Commit(block *common.Block) error {

logger.Debugf("Entering HistoryLevelDB.Commit()")

//Get the blocknumber off of the header
blockNo := block.Header.Number
//Set the starting tranNo to 0
var tranNo uint64

dbBatch := leveldbhelper.NewUpdateBatch()

logger.Debugf("Updating history for blockNo: %v with [%d] transactions",
logger.Debugf("Updating history database for blockNo [%v] with [%d] transactions",
blockNo, len(block.Data.Data))

//TODO add check for invalid trans in bit array
Expand All @@ -110,7 +108,6 @@ func (historyDB *historyDB) Commit(block *common.Block) error {

if common.HeaderType(payload.Header.ChainHeader.Type) == common.HeaderType_ENDORSER_TRANSACTION {

logger.Debugf("Updating history for tranNo: %d", tranNo)
// extract actions from the envelope message
respPayload, err := putils.GetActionFromEnvelope(envBytes)
if err != nil {
Expand All @@ -133,9 +130,6 @@ func (historyDB *historyDB) Commit(block *common.Block) error {
for _, kvWrite := range nsRWSet.Writes {
writeKey := kvWrite.Key

logger.Debugf("Writing history record for: ns=%s, key=%s, blockNo=%d tranNo=%d",
ns, writeKey, blockNo, tranNo)

//composite key for history records is in the form ns~key~blockNo~tranNo
compositeHistoryKey := historydb.ConstructCompositeHistoryKey(ns, writeKey, blockNo, tranNo)

Expand All @@ -145,9 +139,8 @@ func (historyDB *historyDB) Commit(block *common.Block) error {
}

} else {
logger.Debugf("Skipping transaction %d since it is not an endorsement transaction\n", tranNo)
logger.Debugf("Skipping transaction [%d] since it is not an endorsement transaction\n", tranNo)
}

}

// add savepoint for recovery purpose
Expand All @@ -159,12 +152,13 @@ func (historyDB *historyDB) Commit(block *common.Block) error {
return err
}

logger.Debugf("Updates committed to history database for blockNo [%v]", blockNo)
return nil
}

// NewHistoryQueryExecutor implements method in HistoryDB interface
func (historyDB *historyDB) NewHistoryQueryExecutor() (ledger.HistoryQueryExecutor, error) {
return &LevelHistoryDBQueryExecutor{historyDB}, nil
func (historyDB *historyDB) NewHistoryQueryExecutor(blockStore blkstorage.BlockStore) (ledger.HistoryQueryExecutor, error) {
return &LevelHistoryDBQueryExecutor{historyDB, blockStore}, nil
}

// GetBlockNumFromSavepoint implements method in HistoryDB interface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,23 @@ package historyleveldb

import (
"errors"
"fmt"

commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/util"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/history/historydb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwset"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/protos/common"
putils "github.com/hyperledger/fabric/protos/utils"
"github.com/syndtr/goleveldb/leveldb/iterator"
)

// LevelHistoryDBQueryExecutor is a query executor against the LevelDB history DB
type LevelHistoryDBQueryExecutor struct {
historyDB *historyDB
historyDB *historyDB
blockStore blkstorage.BlockStore
}

// GetHistoryForKey implements method in interface `ledger.HistoryQueryExecutor`
Expand All @@ -47,17 +51,21 @@ func (q *LevelHistoryDBQueryExecutor) GetHistoryForKey(namespace string, key str

// range scan to find any history records starting with namespace~key
dbItr := q.historyDB.db.GetIterator(compositeStartKey, compositeEndKey)
return newHistoryScanner(compositeStartKey, dbItr), nil
return newHistoryScanner(compositeStartKey, namespace, key, dbItr, q.blockStore), nil
}

//historyScanner implements ResultsIterator for iterating through history results
type historyScanner struct {
compositePartialKey []byte //compositePartialKey includes namespace~key
namespace string
key string
dbItr iterator.Iterator
blockStore blkstorage.BlockStore
}

func newHistoryScanner(compositePartialKey []byte, dbItr iterator.Iterator) *historyScanner {
return &historyScanner{compositePartialKey, dbItr}
func newHistoryScanner(compositePartialKey []byte, namespace string, key string,
dbItr iterator.Iterator, blockStore blkstorage.BlockStore) *historyScanner {
return &historyScanner{compositePartialKey, namespace, key, dbItr, blockStore}
}

func (scanner *historyScanner) Next() (commonledger.QueryResult, error) {
Expand All @@ -70,15 +78,72 @@ func (scanner *historyScanner) Next() (commonledger.QueryResult, error) {
_, blockNumTranNumBytes := historydb.SplitCompositeHistoryKey(historyKey, scanner.compositePartialKey)
blockNum, bytesConsumed := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[0:])
tranNum, _ := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[bytesConsumed:])
logger.Debugf("Found history record for namespace:%s key:%s at blockNumTranNum %v:%v\n",
scanner.namespace, scanner.key, blockNum, tranNum)

blockNumTranNum := fmt.Sprintf("%v:%v", blockNum, tranNum)
logger.Debugf("Got history record for key %s: %s\n", scanner.compositePartialKey, blockNumTranNum)
// Get the transaction from block storage that is associated with this history record
tranEnvelope, err := scanner.blockStore.RetrieveTxByBlockNumTranNum(blockNum, tranNum)
if err != nil {
return nil, err
}

// For initial test return the blockNumTranNum as TxID.
// TODO query block storage to get and return the TxID and value
return &ledger.KeyModification{TxID: blockNumTranNum}, nil
// Get the txid and key write value associated with this transaction
txID, keyValue, err := getTxIDandKeyWriteValueFromTran(tranEnvelope, scanner.namespace, scanner.key)
if err != nil {
return nil, err
}
logger.Debugf("Found historic key value for namespace:%s key:%s from transaction %s\n",
scanner.namespace, scanner.key, txID)
return &ledger.KeyModification{TxID: txID, Value: keyValue}, nil
}

func (scanner *historyScanner) Close() {
scanner.dbItr.Release()
}

// getTxIDandKeyWriteValueFromTran inspects a transaction for writes to a given key
func getTxIDandKeyWriteValueFromTran(
tranEnvelope *common.Envelope, namespace string, key string) (string, []byte, error) {
logger.Debugf("Entering getTxIDandKeyWriteValueFromTran()\n", namespace, key)

// extract action from the envelope
payload, err := putils.GetPayload(tranEnvelope)
if err != nil {
return "", nil, err
}

tx, err := putils.GetTransaction(payload.Data)
if err != nil {
return "", nil, err
}

_, respPayload, err := putils.GetPayloads(tx.Actions[0])
if err != nil {
return "", nil, err
}

txID := payload.Header.ChainHeader.TxID

txRWSet := &rwset.TxReadWriteSet{}

// Get the Result from the Action and then Unmarshal
// it into a TxReadWriteSet using custom unmarshalling
if err = txRWSet.Unmarshal(respPayload.Results); err != nil {
return txID, nil, err
}

// look for the namespace and key by looping through the transaction's ReadWriteSets
for _, nsRWSet := range txRWSet.NsRWs {
if nsRWSet.NameSpace == namespace {
// got the correct namespace, now find the key write
for _, kvWrite := range nsRWSet.Writes {
if kvWrite.Key == key {
return txID, kvWrite.Value, nil
}
} // end keys loop
return txID, nil, errors.New("Key not found in namespace's writeset")
} // end if
} //end namespaces loop
return txID, nil, errors.New("Namespace not found in transaction's ReadWriteSets")

}
Loading

0 comments on commit 8927e13

Please sign in to comment.