diff --git a/go.mod b/go.mod
index 13ffb6fb..4474f40a 100644
--- a/go.mod
+++ b/go.mod
@@ -14,6 +14,6 @@ require (
 )
 
-replace github.com/hyperledger/fabric => github.com/trustbloc/fabric-mod v0.0.0-20190508134351-4beae6ee306b
+replace github.com/hyperledger/fabric => github.com/trustbloc/fabric-mod v0.0.0-20190510235640-ff89b7580e81
 
-replace github.com/hyperledger/fabric/extensions => github.com/trustbloc/fabric-mod/extensions v0.0.0-20190508134351-4beae6ee306b
+replace github.com/hyperledger/fabric/extensions => github.com/trustbloc/fabric-mod/extensions v0.0.0-20190510235640-ff89b7580e81
diff --git a/go.sum b/go.sum
index 240164b6..4d987b13 100644
--- a/go.sum
+++ b/go.sum
@@ -182,8 +182,12 @@ github.com/tedsuo/ifrit v0.0.0-20180802180643-bea94bb476cc/go.mod h1:eyZnKCc955u
 github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
 github.com/trustbloc/fabric-mod v0.0.0-20190508134351-4beae6ee306b h1:xZCwe/MyQYmCGZglxiayosPeDkM49OrLj9QwWyz/Wfw=
 github.com/trustbloc/fabric-mod v0.0.0-20190508134351-4beae6ee306b/go.mod h1:y/yp/cF5M9+xddNmn6f24tz/PrDjgGCzOk5/0WCKLkU=
+github.com/trustbloc/fabric-mod v0.0.0-20190510235640-ff89b7580e81 h1:CWmIqCQjIOmc12d8/byYfAYJjwwlUXIVJlRANX+syUE=
+github.com/trustbloc/fabric-mod v0.0.0-20190510235640-ff89b7580e81/go.mod h1:y/yp/cF5M9+xddNmn6f24tz/PrDjgGCzOk5/0WCKLkU=
 github.com/trustbloc/fabric-mod/extensions v0.0.0-20190508134351-4beae6ee306b h1:40O0t+2qIGqVKE6SMkx5FmKCcS4rsDF7QS3QGAHYIVw=
 github.com/trustbloc/fabric-mod/extensions v0.0.0-20190508134351-4beae6ee306b/go.mod h1:SnMkFTM0DFEsFcZqZp4P0Gqm+ryD8iwx9cEGW8qrju0=
+github.com/trustbloc/fabric-mod/extensions v0.0.0-20190510235640-ff89b7580e81 h1:eD2FCLOj1PWiqRM9NadS+LeHMu5UbiqE9D/cPN8bOUA=
+github.com/trustbloc/fabric-mod/extensions v0.0.0-20190510235640-ff89b7580e81/go.mod h1:SnMkFTM0DFEsFcZqZp4P0Gqm+ryD8iwx9cEGW8qrju0=
 github.com/ugorji/go v1.1.1/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=
 github.com/urfave/cli v1.18.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
 github.com/vishvananda/netlink v1.0.0 h1:bqNY2lgheFIu1meHUFSH3d7vG93AFyqg3oGbJCOJgSM=
diff --git a/mod/peer/blkstorage/store.go b/mod/peer/blkstorage/store.go
index d4c0dcc7..e0b602ac 100644
--- a/mod/peer/blkstorage/store.go
+++ b/mod/peer/blkstorage/store.go
@@ -9,14 +9,19 @@ package blkstorage
 import (
 	"github.com/hyperledger/fabric/common/ledger/blkstorage"
 	"github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage"
+	"github.com/trustbloc/fabric-peer-ext/pkg/blkstorage/cdbblkstorage"
 )
 
-//NewProvider is redirect hook for fabric/fsblkstorage NewProvider()
+//NewProvider returns a block storage provider backed by CouchDB
 func NewProvider(conf *fsblkstorage.Conf, indexConfig *blkstorage.IndexConfig) blkstorage.BlockStoreProvider {
-	return fsblkstorage.NewProvider(conf, indexConfig)
+	pvdr, err := cdbblkstorage.NewProvider(indexConfig)
+	if err != nil {
+		panic(err)
+	}
+	return pvdr
 }
 
-//NewConf is redirect hook for fabric/fsblkstorage NewConf()
+//NewConf returns a file-system-based block storage conf
 func NewConf(blockStorageDir string, maxBlockfileSize int) *fsblkstorage.Conf {
 	return fsblkstorage.NewConf(blockStorageDir, maxBlockfileSize)
 }
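Editor's note: a minimal usage sketch, not part of this change. The extensions hook keeps fabric's fsblkstorage signature, which has no error return, so a failure to reach the CouchDB instance configured in core.yaml surfaces as a panic at peer startup. Mirroring the call in store_test.go below (the ledger ID here is hypothetical):

	provider := NewProvider(NewConf(ledgerconfig.GetBlockStorePath(), ledgerconfig.GetMaxBlockfileSize()), &blkstorage.IndexConfig{})
	store, err := provider.OpenBlockStore("mychannel") // hypothetical ledger ID
	_, _ = store, err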
"github.com/hyperledger/fabric/common/ledger/blkstorage" "github.com/hyperledger/fabric/core/ledger/ledgerconfig" - "github.com/spf13/viper" + "github.com/hyperledger/fabric/extensions/testutil" "github.com/stretchr/testify/require" ) func TestNewProvider(t *testing.T) { - cleanup := setupPath(t) - defer cleanup() + _, _, destroy := testutil.SetupExtTestEnv() + defer destroy() require.NotEmpty(t, NewProvider(NewConf(ledgerconfig.GetBlockStorePath(), ledgerconfig.GetMaxBlockfileSize()), &blkstorage.IndexConfig{})) } - -func setupPath(t *testing.T) (cleanup func()) { - tempDir, err := ioutil.TempDir("", "transientstore") - require.NoError(t, err) - - viper.Set("peer.fileSystemPath", tempDir) - return func() { os.RemoveAll(tempDir) } -} diff --git a/mod/peer/go.mod b/mod/peer/go.mod index 965fcbbd..20fc05e9 100644 --- a/mod/peer/go.mod +++ b/mod/peer/go.mod @@ -1,6 +1,6 @@ module github.com/trustbloc/fabric-peer-ext/mod/peer -replace github.com/hyperledger/fabric => github.com/trustbloc/fabric-mod v0.0.0-20190508134351-4beae6ee306b +replace github.com/hyperledger/fabric => github.com/trustbloc/fabric-mod v0.0.0-20190510235640-ff89b7580e81 replace github.com/hyperledger/fabric/extensions => ./ diff --git a/mod/peer/go.sum b/mod/peer/go.sum index d0d6e92e..c9e4be66 100644 --- a/mod/peer/go.sum +++ b/mod/peer/go.sum @@ -62,9 +62,7 @@ github.com/go-kit/kit v0.7.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.0.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= -github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= -github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -199,6 +197,7 @@ github.com/tedsuo/ifrit v0.0.0-20180802180643-bea94bb476cc/go.mod h1:eyZnKCc955u github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/trustbloc/fabric-mod v0.0.0-20190508134351-4beae6ee306b h1:xZCwe/MyQYmCGZglxiayosPeDkM49OrLj9QwWyz/Wfw= github.com/trustbloc/fabric-mod v0.0.0-20190508134351-4beae6ee306b/go.mod h1:y/yp/cF5M9+xddNmn6f24tz/PrDjgGCzOk5/0WCKLkU= +github.com/trustbloc/fabric-mod v0.0.0-20190510235640-ff89b7580e81/go.mod h1:y/yp/cF5M9+xddNmn6f24tz/PrDjgGCzOk5/0WCKLkU= github.com/ugorji/go v1.1.1/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ= github.com/urfave/cli v1.18.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/vishvananda/netlink v1.0.0 h1:bqNY2lgheFIu1meHUFSH3d7vG93AFyqg3oGbJCOJgSM= diff --git a/pkg/blkstorage/cdbblkstorage/block_serialization.go b/pkg/blkstorage/cdbblkstorage/block_serialization.go new file mode 100644 index 00000000..75c327d4 --- /dev/null +++ b/pkg/blkstorage/cdbblkstorage/block_serialization.go @@ -0,0 +1,67 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. 
diff --git a/pkg/blkstorage/cdbblkstorage/block_serialization.go b/pkg/blkstorage/cdbblkstorage/block_serialization.go
new file mode 100644
index 00000000..75c327d4
--- /dev/null
+++ b/pkg/blkstorage/cdbblkstorage/block_serialization.go
@@ -0,0 +1,62 @@
+/*
+Copyright SecureKey Technologies Inc. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package cdbblkstorage
+
+import (
+	"github.com/pkg/errors"
+
+	"github.com/hyperledger/fabric/protos/common"
+	"github.com/hyperledger/fabric/protoutil"
+)
+
+func extractTxIDFromEnvelope(txEnvelope *common.Envelope) (string, error) {
+	payload, err := protoutil.GetPayload(txEnvelope)
+	if err != nil {
+		return "", err
+	}
+
+	payloadHeader := payload.Header
+	channelHeader, err := protoutil.UnmarshalChannelHeader(payloadHeader.ChannelHeader)
+	if err != nil {
+		return "", err
+	}
+
+	return channelHeader.TxId, nil
+}
+
+func extractTxnEnvelopeFromBlock(block *common.Block, txID string) (*common.Envelope, error) {
+	blockData := block.GetData()
+	for _, txEnvelopeBytes := range blockData.GetData() {
+		envelope, err := protoutil.GetEnvelopeFromBlock(txEnvelopeBytes)
+		if err != nil {
+			return nil, err
+		}
+
+		id, err := extractTxIDFromEnvelope(envelope)
+		if err != nil {
+			return nil, err
+		}
+		if id != txID {
+			continue
+		}
+
+		return envelope, nil
+	}
+
+	return nil, errors.Errorf("transaction not found [%s]", txID)
+}
+
+func extractEnvelopeFromBlock(block *common.Block, tranNum uint64) (*common.Envelope, error) {
+	blockData := block.GetData()
+	envelopes := blockData.GetData()
+	envelopesLen := uint64(len(envelopes))
+	if envelopesLen-1 < tranNum {
+		blockNum := block.GetHeader().GetNumber()
+		return nil, errors.Errorf("transaction number is invalid [%d, %d, %d]", blockNum, envelopesLen, tranNum)
+	}
+	return protoutil.GetEnvelopeFromBlock(envelopes[tranNum])
+}
diff --git a/pkg/blkstorage/cdbblkstorage/blocks_itr.go b/pkg/blkstorage/cdbblkstorage/blocks_itr.go
new file mode 100644
index 00000000..db281826
--- /dev/null
+++ b/pkg/blkstorage/cdbblkstorage/blocks_itr.go
@@ -0,0 +1,87 @@
+/*
+Copyright SecureKey Technologies Inc. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package cdbblkstorage
+
+import (
+	"sync"
+
+	"github.com/hyperledger/fabric/common/ledger"
+)
+
+// blocksItr - an iterator for iterating over a sequence of blocks
+type blocksItr struct {
+	cdbBlockStore        *cdbBlockStore
+	maxBlockNumAvailable uint64
+	blockNumToRetrieve   uint64
+	closeMarker          bool
+	closeMarkerLock      *sync.Mutex
+}
+
+func newBlockItr(cdbBlockStore *cdbBlockStore, startBlockNum uint64) *blocksItr {
+	return &blocksItr{cdbBlockStore, cdbBlockStore.cpInfo.lastBlockNumber, startBlockNum, false, &sync.Mutex{}}
+}
+
+func (itr *blocksItr) waitForBlock(blockNum uint64) uint64 {
+	itr.cdbBlockStore.cpInfoCond.L.Lock()
+	defer itr.cdbBlockStore.cpInfoCond.L.Unlock()
+	for itr.cdbBlockStore.cpInfo.lastBlockNumber < blockNum && !itr.shouldClose() {
+		logger.Debugf("Going to wait for newer blocks. maxAvailableBlockNumber=[%d], waitForBlockNum=[%d]",
+			itr.cdbBlockStore.cpInfo.lastBlockNumber, blockNum)
+		itr.cdbBlockStore.cpInfoCond.Wait()
+		logger.Debugf("Came out of wait. maxAvailableBlockNumber=[%d]", itr.cdbBlockStore.cpInfo.lastBlockNumber)
+	}
+	return itr.cdbBlockStore.cpInfo.lastBlockNumber
+}
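+
+// Editor's note: an illustrative usage sketch, not part of this change.
+// Next blocks inside waitForBlock until the requested block is committed;
+// a concurrent Close is the only way to unblock it early:
+//
+//	itr, _ := store.RetrieveBlocks(0)
+//	defer itr.Close()
+//	for {
+//		res, err := itr.Next() // may block waiting for new blocks
+//		if err != nil || res == nil {
+//			break // a nil result means the iterator was closed
+//		}
+//		_ = res.(*common.Block)
+//	}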
+
+func (itr *blocksItr) shouldClose() bool {
+	itr.closeMarkerLock.Lock()
+	defer itr.closeMarkerLock.Unlock()
+	return itr.closeMarker
+}
+
+// Next moves the cursor to the next block and returns it; a nil result means the iterator was closed
+func (itr *blocksItr) Next() (ledger.QueryResult, error) {
+	if itr.maxBlockNumAvailable < itr.blockNumToRetrieve {
+		itr.maxBlockNumAvailable = itr.waitForBlock(itr.blockNumToRetrieve)
+	}
+	itr.closeMarkerLock.Lock()
+	defer itr.closeMarkerLock.Unlock()
+	if itr.closeMarker {
+		return nil, nil
+	}
+
+	nextBlock, err := itr.cdbBlockStore.RetrieveBlockByNumber(itr.blockNumToRetrieve)
+	if err != nil {
+		return nil, err
+	}
+	itr.blockNumToRetrieve++
+	return nextBlock, nil
+}
+
+// Close releases any resources held by the iterator
+func (itr *blocksItr) Close() {
+	itr.cdbBlockStore.cpInfoCond.L.Lock()
+	defer itr.cdbBlockStore.cpInfoCond.L.Unlock()
+	itr.closeMarkerLock.Lock()
+	defer itr.closeMarkerLock.Unlock()
+	itr.closeMarker = true
+	itr.cdbBlockStore.cpInfoCond.Broadcast()
+}
diff --git a/pkg/blkstorage/cdbblkstorage/blocks_itr_test.go b/pkg/blkstorage/cdbblkstorage/blocks_itr_test.go
new file mode 100644
index 00000000..f6f48c85
--- /dev/null
+++ b/pkg/blkstorage/cdbblkstorage/blocks_itr_test.go
@@ -0,0 +1,222 @@
+/*
+Copyright SecureKey Technologies Inc. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package cdbblkstorage
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/hyperledger/fabric/common/ledger"
+	"github.com/hyperledger/fabric/common/ledger/testutil"
+	"github.com/hyperledger/fabric/protos/common"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestBlocksItr(t *testing.T) {
+
+	env := newTestEnv(t)
+	defer env.Cleanup()
+
+	const numberOfBlks = 5
+	const startBlk = 3
+
+	provider := env.provider
+	store, _ := provider.OpenBlockStore("ledger-1")
+	defer store.Shutdown()
+	cdbstore := store.(*cdbBlockStore)
+
+	blocks := testutil.ConstructTestBlocks(t, numberOfBlks)
+	testAppendBlocks(cdbstore, blocks)
+
+	//get iterator
+	itr := newBlockItr(cdbstore, 0)
+	for i := 0; i < numberOfBlks; i++ {
+		blk, err := itr.Next()
+		assert.NoError(t, err)
+		assert.NotNil(t, blk)
+		assert.Equal(t, uint64(i), blk.(*common.Block).Header.Number)
+	}
+	itr.Close()
+
+	itr = newBlockItr(cdbstore, startBlk)
+	for i := startBlk; i < numberOfBlks; i++ {
+		blk, err := itr.Next()
+		assert.NoError(t, err)
+		assert.NotNil(t, blk)
+		assert.Equal(t, uint64(i), blk.(*common.Block).Header.Number)
+	}
+	itr.Close()
+
+	blk, err := itr.Next()
+	assert.NoError(t, err)
+	assert.Nil(t, blk)
+}
+
+func TestBlocksItrBlockingNext(t *testing.T) {
+	env := newTestEnv(t)
+	defer env.Cleanup()
+
+	const numberOfBlks = 10
+
+	provider := env.provider
+	store, _ := provider.OpenBlockStore("testLedger")
+	defer store.Shutdown()
+	cdbstore := store.(*cdbBlockStore)
+
+	blocks := testutil.ConstructTestBlocks(t, numberOfBlks)
+	testAppendBlocks(cdbstore, blocks[:5])
+
+	itr := newBlockItr(cdbstore, 1)
+	defer itr.Close()
+
+	readyChan := make(chan struct{})
+	doneChan := make(chan bool)
+
+	go testIterateAndVerify(t, itr, blocks[1:], 4, readyChan, doneChan)
+	<-readyChan
+	testAppendBlocks(cdbstore, blocks[5:7])
+	time.Sleep(time.Millisecond * 10)
+	testAppendBlocks(cdbstore, blocks[7:])
+	<-doneChan
+}
+
+func TestRaceToDeadlock(t *testing.T) {
+	env := newTestEnv(t)
+	defer env.Cleanup()
+
+	
const numberOfBlks = 5 + + provider := env.provider + store, _ := provider.OpenBlockStore("testLedger") + defer store.Shutdown() + + blocks := testutil.ConstructTestBlocks(t, numberOfBlks) + testAppendBlocks(store.(*cdbBlockStore), blocks) + + for i := 0; i < 1000; i++ { + itr, err := store.RetrieveBlocks(0) + if err != nil { + panic(err) + } + go func() { + itr.Next() + }() + itr.Close() + } + + for i := 0; i < 1000; i++ { + itr, err := store.RetrieveBlocks(0) + if err != nil { + panic(err) + } + go func() { + itr.Close() + }() + itr.Next() + } +} + +func TestBlockItrCloseWithoutRetrieve(t *testing.T) { + env := newTestEnv(t) + defer env.Cleanup() + + const numberOfBlks = 5 + + provider := env.provider + store, _ := provider.OpenBlockStore("testLedger") + defer store.Shutdown() + + blocks := testutil.ConstructTestBlocks(t, numberOfBlks) + testAppendBlocks(store.(*cdbBlockStore), blocks) + + itr, err := store.RetrieveBlocks(2) + assert.NoError(t, err) + itr.Close() + + assert.True(t, itr.(*blocksItr).closeMarker) +} + +func TestCloseMultipleItrsWaitForFutureBlock(t *testing.T) { + env := newTestEnv(t) + defer env.Cleanup() + + const numberOfBlks = 10 + + provider := env.provider + store, _ := provider.OpenBlockStore("testLedger-2") + defer store.Shutdown() + + blocks := testutil.ConstructTestBlocks(t, numberOfBlks) + + testAppendBlocks(store.(*cdbBlockStore), blocks[:5]) + + wg := &sync.WaitGroup{} + wg.Add(2) + itr1, err := store.RetrieveBlocks(7) + assert.NoError(t, err) + // itr1 does not retrieve any block because it closes before new blocks are added + go iterateInBackground(t, itr1, 9, wg, []uint64{}) + + itr2, err := store.RetrieveBlocks(8) + assert.NoError(t, err) + // itr2 retrieves two blocks 8 and 9. Because it started waiting for 8 and quits at 9 + go iterateInBackground(t, itr2, 9, wg, []uint64{8, 9}) + + // sleep for the background iterators to get started + time.Sleep(2 * time.Second) + itr1.Close() + testAppendBlocks(store.(*cdbBlockStore), blocks[5:]) + wg.Wait() +} + +func iterateInBackground(t *testing.T, itr ledger.ResultsIterator, quitAfterBlkNum uint64, wg *sync.WaitGroup, expectedBlockNums []uint64) { + defer wg.Done() + retrievedBlkNums := []uint64{} + defer func() { assert.Equal(t, expectedBlockNums, retrievedBlkNums) }() + + for { + blk, err := itr.Next() + assert.NoError(t, err) + if blk == nil { + return + } + blkNum := blk.(*common.Block).Header.Number + retrievedBlkNums = append(retrievedBlkNums, blkNum) + t.Logf("blk.Num=%d", blk.(*common.Block).Header.Number) + if blkNum == quitAfterBlkNum { + return + } + } +} + +func testIterateAndVerify(t *testing.T, itr *blocksItr, blocks []*common.Block, readyAt int, readyChan chan<- struct{}, doneChan chan bool) { + blocksIterated := 0 + for { + t.Logf("blocksIterated: %v", blocksIterated) + block, err := itr.Next() + assert.NoError(t, err) + assert.True(t, proto.Equal(blocks[blocksIterated], block.(*common.Block))) + blocksIterated++ + if blocksIterated == readyAt { + close(readyChan) + } + if blocksIterated == len(blocks) { + break + } + } + doneChan <- true +} + +func testAppendBlocks(store *cdbBlockStore, blocks []*common.Block) { + for _, b := range blocks { + store.AddBlock(b) + } +} diff --git a/pkg/blkstorage/cdbblkstorage/cdb_blkstorage.go b/pkg/blkstorage/cdbblkstorage/cdb_blkstorage.go new file mode 100644 index 00000000..d9b5a481 --- /dev/null +++ b/pkg/blkstorage/cdbblkstorage/cdb_blkstorage.go @@ -0,0 +1,361 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. 
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package cdbblkstorage
+
+import (
+	"bytes"
+	"encoding/hex"
+	"fmt"
+	"math"
+	"strconv"
+	"sync"
+
+	"github.com/hyperledger/fabric/protoutil"
+
+	"github.com/hyperledger/fabric/common/ledger"
+	"github.com/hyperledger/fabric/common/ledger/blkstorage"
+	"github.com/hyperledger/fabric/core/ledger/util/couchdb"
+	"github.com/hyperledger/fabric/protos/common"
+	"github.com/hyperledger/fabric/protos/peer"
+	"github.com/pkg/errors"
+)
+
+// cdbBlockStore is a block store backed by CouchDB
+type cdbBlockStore struct {
+	blockStore *couchdb.CouchDatabase
+	txnStore   *couchdb.CouchDatabase
+	ledgerID   string
+	cpInfo     *checkpointInfo
+	cpInfoCond *sync.Cond
+	cp         *checkpoint
+	attachTxn  bool
+}
+
+// newCDBBlockStore constructs a block store backed by CouchDB
+func newCDBBlockStore(blockStore *couchdb.CouchDatabase, txnStore *couchdb.CouchDatabase, ledgerID string) *cdbBlockStore {
+	cp := newCheckpoint(blockStore)
+
+	cdbBlockStore := &cdbBlockStore{
+		blockStore: blockStore,
+		txnStore:   txnStore,
+		ledgerID:   ledgerID,
+		cp:         cp,
+		attachTxn:  false,
+	}
+
+	// cp = checkpointInfo, retrieve from the database the last block number that was written to that db.
+	cpInfo := cdbBlockStore.cp.getCheckpointInfo()
+	err := cdbBlockStore.cp.saveCurrentInfo(cpInfo)
+	if err != nil {
+		panic(fmt.Sprintf("Could not save cpInfo to db: %s", err))
+	}
+
+	// Update the manager with the checkpoint info
+	cdbBlockStore.cpInfo = cpInfo
+
+	// Create a checkpoint condition (event) variable, for the goroutine waiting for
+	// or announcing the occurrence of an event.
+	cdbBlockStore.cpInfoCond = sync.NewCond(&sync.Mutex{})
+
+	return cdbBlockStore
+}
+
+// AddBlock adds a new block
+func (s *cdbBlockStore) AddBlock(block *common.Block) error {
+
+	err := s.validateBlock(block)
+	if err != nil {
+		return err
+	}
+
+	err = s.storeBlock(block)
+	if err != nil {
+		return err
+	}
+
+	err = s.storeTransactions(block)
+	if err != nil {
+		return err
+	}
+
+	return s.checkpointBlock(block)
+}
+
+//validateBlock validates the block before adding it to the store
+func (s *cdbBlockStore) validateBlock(block *common.Block) error {
+
+	if s.cpInfo.isChainEmpty {
+		//chain is empty, no need to validate, first block it is.
+		return nil
+	}
+	if block.Header.Number != s.cpInfo.lastBlockNumber+1 {
+		return errors.Errorf(
+			"block number should have been %d but was %d",
+			s.cpInfo.lastBlockNumber+1, block.Header.Number,
+		)
+	}
+
+	// Add the previous hash check. Though not essential, it may not be a bad idea to
+	// verify the field `block.Header.PreviousHash` present in the block.
+	// This check is a simple bytes comparison and hence does not cause any observable performance penalty,
+	// and may help in detecting a rare scenario if there is any bug in the ordering service.
+	if !bytes.Equal(block.Header.PreviousHash, s.cpInfo.currentHash) {
+		return errors.Errorf(
+			"unexpected previous block hash. Expected PreviousHash = [%x], PreviousHash referred in the latest block = [%x]",
+			s.cpInfo.currentHash, block.Header.PreviousHash,
+		)
+	}
+
+	return nil
+}
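+
+// Editor's note: a worked example of the validation rules above, not part of
+// this change. On a fresh store only the first block is accepted; afterwards
+// a block is accepted only if it extends the current chain tip:
+//
+//	AddBlock(block0)                  // ok: chain is empty
+//	AddBlock(block2)                  // rejected: block number should have been 1
+//	AddBlock(block1WithWrongPrevHash) // rejected: unexpected previous block hash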
+
+func (s *cdbBlockStore) storeBlock(block *common.Block) error {
+	doc, err := blockToCouchDoc(block)
+	if err != nil {
+		return errors.WithMessage(err, "converting block to couchDB document failed")
+	}
+
+	id := blockNumberToKey(block.GetHeader().GetNumber())
+
+	rev, err := s.blockStore.SaveDoc(id, "", doc)
+	if err != nil {
+		return errors.WithMessage(err, "adding block to couchDB failed")
+	}
+	logger.Debugf("block added to couchDB [%d, %s]", block.GetHeader().GetNumber(), rev)
+	return nil
+}
+
+func (s *cdbBlockStore) storeTransactions(block *common.Block) error {
+	docs, err := blockToTxnCouchDocs(block, s.attachTxn)
+	if err != nil {
+		return errors.WithMessage(err, "converting block to couchDB txn documents failed")
+	}
+
+	if len(docs) == 0 {
+		return nil
+	}
+
+	_, err = s.txnStore.BatchUpdateDocuments(docs)
+	if err != nil {
+		return errors.WithMessage(err, "adding block transactions to couchDB failed")
+	}
+	logger.Debugf("block transactions added to couchDB [%d]", block.GetHeader().GetNumber())
+	return nil
+}
+
+func (s *cdbBlockStore) checkpointBlock(block *common.Block) error {
+	//Update the checkpoint info with the results of adding the new block
+	newCPInfo := &checkpointInfo{
+		isChainEmpty:    false,
+		lastBlockNumber: block.Header.Number,
+		currentHash:     protoutil.BlockHeaderHash(block.Header),
+	}
+	//save the checkpoint information in the database
+	err := s.cp.saveCurrentInfo(newCPInfo)
+	if err != nil {
+		return errors.WithMessage(err, "adding cpInfo to couchDB failed")
+	}
+	//update the checkpoint info (for storage) and the blockchain info (for APIs) in the manager
+	s.updateCheckpoint(newCPInfo)
+	return nil
+}
+
+// GetBlockchainInfo returns the current info about the blockchain
+func (s *cdbBlockStore) GetBlockchainInfo() (*common.BlockchainInfo, error) {
+	cpInfo := s.cp.getCheckpointInfo()
+	s.cpInfo = cpInfo
+	bcInfo := &common.BlockchainInfo{
+		Height: 0,
+	}
+	if !cpInfo.isChainEmpty {
+		//If start up is a restart of an existing storage, update BlockchainInfo for external API's
+		lastBlock, err := s.RetrieveBlockByNumber(cpInfo.lastBlockNumber)
+		if err != nil {
+			return nil, fmt.Errorf("RetrieveBlockByNumber returned error: %s", err)
+		}
+
+		lastBlockHeader := lastBlock.GetHeader()
+		lastBlockHash := protoutil.BlockHeaderHash(lastBlockHeader)
+		previousBlockHash := lastBlockHeader.GetPreviousHash()
+		bcInfo = &common.BlockchainInfo{
+			Height:            lastBlockHeader.GetNumber() + 1,
+			CurrentBlockHash:  lastBlockHash,
+			PreviousBlockHash: previousBlockHash,
+		}
+	}
+	return bcInfo, nil
+}
+
+// RetrieveBlocks returns an iterator that can be used for iterating over a range of blocks
+func (s *cdbBlockStore) RetrieveBlocks(startNum uint64) (ledger.ResultsIterator, error) {
+	return newBlockItr(s, startNum), nil
+}
+
+// RetrieveBlockByHash returns the block for given block-hash
+func (s *cdbBlockStore) RetrieveBlockByHash(blockHash []byte) (*common.Block, error) {
+	blockHashHex := hex.EncodeToString(blockHash)
+	const queryFmt = `
+	{
+		"selector": {
+			"` + blockHeaderField + `.` + blockHashField + `": {
+				"$eq": "%s"
+			}
+		},
+		"use_index": ["_design/` + blockHashIndexDoc + `", "` + blockHashIndexName + `"]
+	}`
+
+	block, err := retrieveBlockQuery(s.blockStore, fmt.Sprintf(queryFmt, blockHashHex))
+	if err != nil {
+		// note: allow ErrNotFoundInIndex to pass through
+		return nil, err
+	}
+	return block, nil
+}
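+
+// Editor's note, not part of this change: with the constants defined in
+// couchdoc_conv.go below, the selector above renders to roughly the following
+// Mango query (hash abbreviated):
+//
+//	{
+//	  "selector": { "header.hash": { "$eq": "ab12..." } },
+//	  "use_index": ["_design/indexHash", "by_hash"]
+//	}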
+
+// RetrieveBlockByNumber returns the block at a given blockchain height
+func (s *cdbBlockStore) RetrieveBlockByNumber(blockNum uint64) (*common.Block, error) {
+	// interpret math.MaxUint64 as a request for last block
+	if blockNum == math.MaxUint64 {
+		bcinfo, err := s.GetBlockchainInfo()
+		if err != nil {
+			return nil, errors.WithMessage(err, "retrieval of blockchain info failed")
+		}
+		blockNum = bcinfo.Height - 1
+	}
+
+	id := blockNumberToKey(blockNum)
+
+	doc, _, err := s.blockStore.ReadDoc(id)
+	if err != nil {
+		return nil, errors.WithMessage(err, fmt.Sprintf("retrieval of block from couchDB failed [%d]", blockNum))
+	}
+	if doc == nil {
+		return nil, blkstorage.ErrNotFoundInIndex
+	}
+
+	block, err := couchDocToBlock(doc)
+	if err != nil {
+		return nil, errors.WithMessage(err, fmt.Sprintf("unmarshal of block from couchDB failed [%d]", blockNum))
+	}
+
+	return block, nil
+}
+
+// RetrieveTxByID returns a transaction for the given transaction ID
+func (s *cdbBlockStore) RetrieveTxByID(txID string) (*common.Envelope, error) {
+	doc, _, err := s.txnStore.ReadDoc(txID)
+	if err != nil {
+		// note: allow ErrNotFoundInIndex to pass through
+		return nil, err
+	}
+	if doc == nil {
+		return nil, blkstorage.ErrNotFoundInIndex
+	}
+
+	// If this transaction includes the envelope as a valid attachment then we can return immediately.
+	if len(doc.Attachments) > 0 {
+		attachedEnv, e := couchAttachmentsToTxnEnvelope(doc.Attachments)
+		if e == nil {
+			return attachedEnv, nil
+		}
+		logger.Debugf("transaction has attachment but failed to be extracted into envelope [%s]", e)
+	}
+
+	// Otherwise, we need to extract the transaction from the block document.
+	block, err := s.RetrieveBlockByTxID(txID)
+	if err != nil {
+		// note: allow ErrNotFoundInIndex to pass through
+		return nil, err
+	}
+
+	return extractTxnEnvelopeFromBlock(block, txID)
+}
+
+// RetrieveTxByBlockNumTranNum returns a transaction for the given block number and transaction number
+func (s *cdbBlockStore) RetrieveTxByBlockNumTranNum(blockNum uint64, tranNum uint64) (*common.Envelope, error) {
+	block, err := s.RetrieveBlockByNumber(blockNum)
+	if err != nil {
+		// note: allow ErrNotFoundInIndex to pass through
+		return nil, err
+	}
+	return extractEnvelopeFromBlock(block, tranNum)
+}
+
+// RetrieveBlockByTxID returns a block for a given transaction ID
+func (s *cdbBlockStore) RetrieveBlockByTxID(txID string) (*common.Block, error) {
+	blockHash, err := s.retrieveBlockHashByTxID(txID)
+	if err != nil {
+		return nil, err
+	}
+
+	return s.RetrieveBlockByHash(blockHash)
+}
+
+func (s *cdbBlockStore) retrieveBlockHashByTxID(txID string) ([]byte, error) {
+	jsonResult, err := retrieveJSONQuery(s.txnStore, txID)
+	if err != nil {
+		// note: allow ErrNotFoundInIndex to pass through
+		logger.Errorf("retrieving transaction document from DB failed : %s", err)
+		return nil, blkstorage.ErrNotFoundInIndex
+	}
+
+	blockHashStoredUT, ok := jsonResult[txnBlockHashField]
+	if !ok {
+		return nil, errors.Errorf("block hash was not found for transaction ID [%s]", txID)
+	}
+
+	blockHashStored, ok := blockHashStoredUT.(string)
+	if !ok {
+		return nil, errors.Errorf("block hash has invalid type for transaction ID [%s]", txID)
+	}
+
+	blockHash, err := hex.DecodeString(blockHashStored)
+	if err != nil {
+		return nil, errors.Wrapf(err, "block hash was invalid for transaction ID [%s]", txID)
+	}
+
+	return blockHash, nil
+}
+
+// RetrieveTxValidationCodeByTxID returns a TX validation code for a given transaction ID
+func (s *cdbBlockStore) RetrieveTxValidationCodeByTxID(txID 
string) (peer.TxValidationCode, error) { + jsonResult, err := retrieveJSONQuery(s.txnStore, txID) + if err != nil { + // note: allow ErrNotFoundInIndex to pass through + return peer.TxValidationCode(-1), err + } + + txnValidationCodeStoredUT, ok := jsonResult[txnValidationCode] + if !ok { + return peer.TxValidationCode_INVALID_OTHER_REASON, errors.Errorf("validation code was not found for transaction ID [%s]", txID) + } + + txnValidationCodeStored, ok := txnValidationCodeStoredUT.(string) + if !ok { + return peer.TxValidationCode_INVALID_OTHER_REASON, errors.Errorf("validation code has invalid type for transaction ID [%s]", txID) + } + + const sizeOfTxValidationCode = 32 + txnValidationCode, err := strconv.ParseInt(txnValidationCodeStored, txnValidationCodeBase, sizeOfTxValidationCode) + if err != nil { + return peer.TxValidationCode_INVALID_OTHER_REASON, errors.Wrapf(err, "validation code was invalid for transaction ID [%s]", txID) + } + + return peer.TxValidationCode(txnValidationCode), nil +} + +// Shutdown closes the storage instance +func (s *cdbBlockStore) Shutdown() { +} + +func (s *cdbBlockStore) updateCheckpoint(cpInfo *checkpointInfo) { + s.cpInfoCond.L.Lock() + defer s.cpInfoCond.L.Unlock() + s.cpInfo = cpInfo + logger.Debugf("Broadcasting about update checkpointInfo: %s", cpInfo) + s.cpInfoCond.Broadcast() +} diff --git a/pkg/blkstorage/cdbblkstorage/cdb_blkstorage_provider.go b/pkg/blkstorage/cdbblkstorage/cdb_blkstorage_provider.go new file mode 100644 index 00000000..31973e96 --- /dev/null +++ b/pkg/blkstorage/cdbblkstorage/cdb_blkstorage_provider.go @@ -0,0 +1,100 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package cdbblkstorage + +import ( + "strings" + + "github.com/hyperledger/fabric/common/flogging" + "github.com/hyperledger/fabric/common/ledger/blkstorage" + "github.com/hyperledger/fabric/common/metrics/disabled" + "github.com/hyperledger/fabric/core/ledger/util/couchdb" + "github.com/pkg/errors" +) + +var logger = flogging.MustGetLogger("cdbblkstorage") + +const ( + blockStoreName = "blocks" + txnStoreName = "transactions" +) + +// CDBBlockstoreProvider provides block storage in CouchDB +type CDBBlockstoreProvider struct { + couchInstance *couchdb.CouchInstance + indexConfig *blkstorage.IndexConfig +} + +// NewProvider creates a new CouchDB BlockStoreProvider +func NewProvider(indexConfig *blkstorage.IndexConfig) (blkstorage.BlockStoreProvider, error) { + logger.Debugf("constructing CouchDB block storage provider") + couchDBDef := couchdb.GetCouchDBDefinition() + couchInstance, err := couchdb.CreateCouchInstance(couchDBDef.URL, couchDBDef.Username, couchDBDef.Password, + couchDBDef.MaxRetries, couchDBDef.MaxRetriesOnStartup, couchDBDef.RequestTimeout, couchDBDef.CreateGlobalChangesDB, &disabled.Provider{}) + if err != nil { + return nil, errors.WithMessage(err, "obtaining CouchDB instance failed") + } + return &CDBBlockstoreProvider{couchInstance, indexConfig}, nil +} + +// CreateBlockStore creates a block store instance for the given ledger ID +func (p *CDBBlockstoreProvider) CreateBlockStore(ledgerid string) (blkstorage.BlockStore, error) { + return p.OpenBlockStore(ledgerid) +} + +// OpenBlockStore opens the block store for the given ledger ID +func (p *CDBBlockstoreProvider) OpenBlockStore(ledgerid string) (blkstorage.BlockStore, error) { + id := strings.ToLower(ledgerid) + blockStoreDBName := couchdb.ConstructBlockchainDBName(id, blockStoreName) + blockStoreDB, err := 
couchdb.CreateCouchDatabase(p.couchInstance, blockStoreDBName) + if err != nil { + return nil, err + } + + txnStoreDBName := couchdb.ConstructBlockchainDBName(id, txnStoreName) + txnStoreDB, err := couchdb.CreateCouchDatabase(p.couchInstance, txnStoreDBName) + if err != nil { + return nil, err + } + + err = p.createBlockStoreIndices(blockStoreDB) + if err != nil { + return nil, err + } + + return newCDBBlockStore(blockStoreDB, txnStoreDB, ledgerid), nil +} + +func (p *CDBBlockstoreProvider) createBlockStoreIndices(db *couchdb.CouchDatabase) error { + _, err := db.CreateIndex(blockHashIndexDef) + if err != nil { + return errors.WithMessage(err, "creation of block hash index failed") + } + + return nil +} + +// Exists returns whether or not the given ledger ID exists +func (p *CDBBlockstoreProvider) Exists(ledgerid string) (bool, error) { + id := strings.ToLower(ledgerid) + blockStoreDBName := couchdb.ConstructBlockchainDBName(id, blockStoreName) + blockStoreDB, err := couchdb.NewCouchDatabase(p.couchInstance, blockStoreDBName) + if err != nil { + return false, err + } + + return blockStoreDB.Exists() +} + +// List returns the available ledger IDs, not supported in couchdb block storage +func (p *CDBBlockstoreProvider) List() ([]string, error) { + panic("not supported") +} + +// Close cleans up the Provider +func (p *CDBBlockstoreProvider) Close() { +} diff --git a/pkg/blkstorage/cdbblkstorage/cdb_blkstorage_provider_test.go b/pkg/blkstorage/cdbblkstorage/cdb_blkstorage_provider_test.go new file mode 100644 index 00000000..f386a149 --- /dev/null +++ b/pkg/blkstorage/cdbblkstorage/cdb_blkstorage_provider_test.go @@ -0,0 +1,162 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package cdbblkstorage + +import ( + "fmt" + "os" + "testing" + + "github.com/golang/protobuf/proto" + + "github.com/hyperledger/fabric/common/ledger/blkstorage" + "github.com/hyperledger/fabric/common/ledger/testutil" + "github.com/hyperledger/fabric/core/ledger/util" + "github.com/hyperledger/fabric/protos/common" + "github.com/hyperledger/fabric/protos/peer" + "github.com/hyperledger/fabric/protoutil" + "github.com/stretchr/testify/assert" + xtestutil "github.com/trustbloc/fabric-peer-ext/pkg/testutil" +) + +func TestMain(m *testing.M) { + + //setup extension test environment + _, _, destroy := xtestutil.SetupExtTestEnv() + + code := m.Run() + + destroy() + + os.Exit(code) +} + +func TestMultipleBlockStores(t *testing.T) { + env := newTestEnv(t) + defer env.Cleanup() + + provider := env.provider + store1, _ := provider.OpenBlockStore("ledger1") + defer store1.Shutdown() + + store2, _ := provider.CreateBlockStore("ledger2") + defer store2.Shutdown() + + blocks1 := testutil.ConstructTestBlocks(t, 5) + for _, b := range blocks1 { + store1.AddBlock(b) + } + + blocks2 := testutil.ConstructTestBlocks(t, 10) + for _, b := range blocks2 { + store2.AddBlock(b) + } + + checkBlocks(t, blocks1, store1) + checkBlocks(t, blocks2, store2) + + checkWithWrongInputs(t, store1, 5) + checkWithWrongInputs(t, store2, 10) +} + +func checkBlocks(t *testing.T, expectedBlocks []*common.Block, store blkstorage.BlockStore) { + bcInfo, _ := store.GetBlockchainInfo() + assert.Equal(t, uint64(len(expectedBlocks)), bcInfo.Height) + assert.Equal(t, protoutil.BlockHeaderHash(expectedBlocks[len(expectedBlocks)-1].GetHeader()), bcInfo.CurrentBlockHash) + + itr, _ := store.RetrieveBlocks(0) + for i := 0; i < len(expectedBlocks); i++ { + block, _ := itr.Next() + assert.True(t, 
proto.Equal(expectedBlocks[i], block.(*common.Block)))
+	}
+
+	for blockNum := 0; blockNum < len(expectedBlocks); blockNum++ {
+		block := expectedBlocks[blockNum]
+		flags := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
+		retrievedBlock, _ := store.RetrieveBlockByNumber(uint64(blockNum))
+		assert.True(t, proto.Equal(block, retrievedBlock))
+
+		retrievedBlock, _ = store.RetrieveBlockByHash(protoutil.BlockHeaderHash(block.Header))
+		assert.True(t, proto.Equal(block, retrievedBlock))
+
+		for txNum := 0; txNum < len(block.Data.Data); txNum++ {
+			txEnvBytes := block.Data.Data[txNum]
+			txEnv, _ := protoutil.GetEnvelopeFromBlock(txEnvBytes)
+			txid, err := extractTxID(txEnvBytes)
+			assert.NoError(t, err)
+
+			retrievedBlock, _ := store.RetrieveBlockByTxID(txid)
+			assert.True(t, proto.Equal(block, retrievedBlock))
+
+			retrievedTxEnv, _ := store.RetrieveTxByID(txid)
+			assert.Equal(t, txEnv, retrievedTxEnv)
+
+			retrievedTxEnv, _ = store.RetrieveTxByBlockNumTranNum(uint64(blockNum), uint64(txNum))
+			assert.Equal(t, txEnv, retrievedTxEnv)
+
+			retrievedTxValCode, err := store.RetrieveTxValidationCodeByTxID(txid)
+			assert.NoError(t, err)
+			assert.Equal(t, flags.Flag(txNum), retrievedTxValCode)
+		}
+	}
+}
+
+func checkWithWrongInputs(t *testing.T, store blkstorage.BlockStore, numBlocks int) {
+	block, err := store.RetrieveBlockByHash([]byte("non-existent-hash"))
+	assert.Nil(t, block)
+	assert.Equal(t, blkstorage.ErrNotFoundInIndex, err)
+
+	block, err = store.RetrieveBlockByTxID("non-existent-txid")
+	assert.Nil(t, block)
+	assert.Equal(t, blkstorage.ErrNotFoundInIndex, err)
+
+	tx, err := store.RetrieveTxByID("non-existent-txid")
+	assert.Nil(t, tx)
+	assert.Equal(t, blkstorage.ErrNotFoundInIndex, err)
+
+	tx, err = store.RetrieveTxByBlockNumTranNum(uint64(numBlocks+1), uint64(0))
+	assert.Nil(t, tx)
+	assert.Equal(t, blkstorage.ErrNotFoundInIndex, err)
+
+	txCode, err := store.RetrieveTxValidationCodeByTxID("non-existent-txid")
+	assert.Equal(t, peer.TxValidationCode(-1), txCode)
+	assert.Equal(t, blkstorage.ErrNotFoundInIndex, err)
+}
+
+func TestBlockStoreProvider(t *testing.T) {
+	env := newTestEnv(t)
+	defer env.Cleanup()
+	const numStores = 10
+
+	provider := env.provider
+	var stores []blkstorage.BlockStore
+	var allLedgerIDs []string
+	for i := 0; i < numStores; i++ {
+		allLedgerIDs = append(allLedgerIDs, constructLedgerid(i))
+	}
+
+	for _, id := range allLedgerIDs {
+		store, _ := provider.OpenBlockStore(id)
+		defer store.Shutdown()
+		stores = append(stores, store)
+	}
+
+	for _, id := range allLedgerIDs {
+		exists, err := provider.Exists(id)
+		assert.NoError(t, err)
+		assert.Equal(t, true, exists)
+	}
+
+	exists, err := provider.Exists(constructLedgerid(numStores + 1))
+	assert.NoError(t, err)
+	assert.Equal(t, false, exists)
+
+}
+
+func constructLedgerid(id int) string {
+	return fmt.Sprintf("ledger_%d", id)
+}
diff --git a/pkg/blkstorage/cdbblkstorage/cdb_blockstorage_test.go b/pkg/blkstorage/cdbblkstorage/cdb_blockstorage_test.go
new file mode 100644
index 00000000..5f641013
--- /dev/null
+++ b/pkg/blkstorage/cdbblkstorage/cdb_blockstorage_test.go
@@ -0,0 +1,31 @@
+/*
+Copyright SecureKey Technologies Inc. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package cdbblkstorage
+
+import (
+	"testing"
+
+	"github.com/hyperledger/fabric/common/ledger/testutil"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestWrongBlockNumber(t *testing.T) {
+	env := newTestEnv(t)
+	defer env.Cleanup()
+
+	provider := env.provider
+	store, _ := provider.OpenBlockStore("testLedger-1")
+	defer store.Shutdown()
+
+	blocks := testutil.ConstructTestBlocks(t, 5)
+	for i := 0; i < 3; i++ {
+		err := store.AddBlock(blocks[i])
+		assert.NoError(t, err)
+	}
+	err := store.AddBlock(blocks[4])
+	assert.Error(t, err, "Error should have been thrown when adding block number 4 while block number 3 is expected")
+}
diff --git a/pkg/blkstorage/cdbblkstorage/cdb_checkpoint.go b/pkg/blkstorage/cdbblkstorage/cdb_checkpoint.go
new file mode 100644
index 00000000..813ffd02
--- /dev/null
+++ b/pkg/blkstorage/cdbblkstorage/cdb_checkpoint.go
@@ -0,0 +1,121 @@
+/*
+Copyright SecureKey Technologies Inc. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package cdbblkstorage
+
+import (
+	"fmt"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/hyperledger/fabric/core/ledger/util/couchdb"
+	"github.com/pkg/errors"
+)
+
+const blkMgrInfoKey = "blkMgrInfo"
+
+type checkpoint struct {
+	db *couchdb.CouchDatabase
+}
+
+// checkpointInfo keeps track of the chain tip that has been committed to the store
+type checkpointInfo struct {
+	isChainEmpty    bool
+	lastBlockNumber uint64
+	currentHash     []byte
+}
+
+func newCheckpoint(db *couchdb.CouchDatabase) *checkpoint {
+	return &checkpoint{db: db}
+}
+
+func (cp *checkpoint) getCheckpointInfo() *checkpointInfo {
+	cpInfo, err := cp.loadCurrentInfo()
+	if err != nil {
+		panic(fmt.Sprintf("Could not get checkpoint info from db: %s", err))
+	}
+	if cpInfo == nil {
+		cpInfo = &checkpointInfo{
+			isChainEmpty:    true,
+			lastBlockNumber: 0}
+	}
+	return cpInfo
+}
+
+//Get the current checkpoint information that is stored in the database
+func (cp *checkpoint) loadCurrentInfo() (*checkpointInfo, error) {
+	doc, _, err := cp.db.ReadDoc(blkMgrInfoKey)
+	if err != nil {
+		return nil, errors.WithMessage(err, fmt.Sprintf("retrieval of checkpointInfo from couchDB failed [%s]", blkMgrInfoKey))
+	}
+	if doc == nil {
+		return nil, nil
+	}
+	checkpointInfo, err := couchDocToCheckpointInfo(doc)
+	if err != nil {
+		return nil, errors.WithMessage(err, fmt.Sprintf("unmarshal of checkpointInfo from couchDB failed [%s]", blkMgrInfoKey))
+	}
+	logger.Debugf("loaded checkpointInfo: %+v", checkpointInfo)
+	return checkpointInfo, nil
+}
+
+func (cp *checkpoint) saveCurrentInfo(i *checkpointInfo) error {
+	doc, err := checkpointInfoToCouchDoc(i)
+	if err != nil {
+		return errors.WithMessage(err, "converting checkpointInfo to couchDB document failed")
+	}
+	_, err = cp.db.SaveDoc(blkMgrInfoKey, "", doc)
+	if err != nil {
+		return errors.WithMessage(err, "adding checkpointInfo to couchDB failed")
+	}
+	return nil
+}
+
+func (i *checkpointInfo) marshal() ([]byte, error) {
+	buffer := proto.NewBuffer([]byte{})
+	var err error
+	if err = buffer.EncodeVarint(i.lastBlockNumber); err != nil {
+		return nil, err
+	}
+
+	if err = buffer.EncodeRawBytes(i.currentHash); err != nil {
+		return nil, err
+	}
+
+	var chainEmptyMarker uint64
+	if i.isChainEmpty {
+		chainEmptyMarker = 1
+	}
+	if err = buffer.EncodeVarint(chainEmptyMarker); err != nil {
+		return nil, err
+	}
+
+	return buffer.Bytes(), nil
+}
+
+func (i *checkpointInfo) unmarshal(b []byte) error {
+	buffer := proto.NewBuffer(b)
+	var chainEmptyMarker uint64
+	var err error
+
+	if i.lastBlockNumber, err = buffer.DecodeVarint(); err != nil {
+		return err
+	}
+
+	if i.currentHash, err = buffer.DecodeRawBytes(false); err != nil {
+		return err
+	}
+
+	if len(i.currentHash) == 0 {
+		i.currentHash = nil
+	}
+
+	if chainEmptyMarker, err = buffer.DecodeVarint(); err != nil {
+		return err
+	}
+	i.isChainEmpty = chainEmptyMarker == 1
+
+	return nil
+}
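Editor's note: a worked example of the encoding above, not part of this change. marshal writes lastBlockNumber as a varint, then currentHash as length-prefixed bytes, then the empty-chain flag as a final varint, so a checkpoint of {lastBlockNumber: 300, currentHash: 0xAABB, isChainEmpty: false} serializes to:

	0xAC 0x02       // varint(300)
	0x02 0xAA 0xBB  // varint length 2, then the hash bytes
	0x00            // chainEmptyMarker = 0 (chain is not empty)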
diff --git a/pkg/blkstorage/cdbblkstorage/couchdoc_conv.go b/pkg/blkstorage/cdbblkstorage/couchdoc_conv.go
new file mode 100644
index 00000000..c70ae3a1
--- /dev/null
+++ b/pkg/blkstorage/cdbblkstorage/couchdoc_conv.go
@@ -0,0 +1,377 @@
+/*
+Copyright SecureKey Technologies Inc. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package cdbblkstorage
+
+import (
+	"bytes"
+	"encoding/hex"
+	"encoding/json"
+	"strconv"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/hyperledger/fabric/common/ledger/blkstorage"
+	ledgerUtil "github.com/hyperledger/fabric/core/ledger/util"
+	"github.com/hyperledger/fabric/core/ledger/util/couchdb"
+	"github.com/hyperledger/fabric/protos/common"
+	"github.com/hyperledger/fabric/protos/peer"
+	"github.com/hyperledger/fabric/protoutil"
+	"github.com/pkg/errors"
+)
+
+// block document
+const (
+	idField             = "_id"
+	blockHashField      = "hash"
+	blockTxnsField      = "transactions"
+	blockTxnIDField     = "id"
+	blockHashIndexName  = "by_hash"
+	blockHashIndexDoc   = "indexHash"
+	blockAttachmentName = "block"
+	blockKeyPrefix      = ""
+	blockHeaderField    = "header"
+)
+
+// txn document
+const (
+	txnBlockNumberField   = "block_number"
+	txnBlockHashField     = "block_hash"
+	txnAttachmentName     = "transaction"
+	txnValidationCode     = "validation_code"
+	txnValidationCodeBase = 16
+)
+
+// checkpoint document
+const (
+	cpiAttachmentName        = "checkpointinfo"
+	cpiAttachmentContentType = "application/octet-stream"
+)
+
+const blockHashIndexDef = `
+	{
+		"index": {
+			"fields": ["` + blockHeaderField + `.` + blockHashField + `"]
+		},
+		"name": "` + blockHashIndexName + `",
+		"ddoc": "` + blockHashIndexDoc + `",
+		"type": "json"
+	}`
+
+type jsonValue map[string]interface{}
+
+func (v jsonValue) toBytes() ([]byte, error) {
+	return json.Marshal(v)
+}
+
+func blockToCouchDoc(block *common.Block) (*couchdb.CouchDoc, error) {
+	jsonMap := make(jsonValue)
+
+	blockHeader := block.GetHeader()
+
+	key := blockNumberToKey(blockHeader.GetNumber())
+	blockHashHex := hex.EncodeToString(protoutil.BlockHeaderHash(blockHeader))
+	blockTxns, err := blockToTransactionsField(block)
+	if err != nil {
+		return nil, err
+	}
+
+	jsonMap[idField] = key
+	header := make(jsonValue)
+	header[blockHashField] = blockHashHex
+	jsonMap[blockHeaderField] = header
+	jsonMap[blockTxnsField] = blockTxns
+
+	jsonBytes, err := jsonMap.toBytes()
+	if err != nil {
+		return nil, err
+	}
+	couchDoc := &couchdb.CouchDoc{JSONValue: jsonBytes}
+
+	attachment, err := blockToAttachment(block)
+	if err != nil {
+		return nil, err
+	}
+
+	attachments := append([]*couchdb.AttachmentInfo{}, attachment)
+	couchDoc.Attachments = attachments
+	return couchDoc, nil
+}
+
+func blockToTxnCouchDocs(block *common.Block, attachTxn bool) ([]*couchdb.CouchDoc, error) {
+	blockHeader := block.GetHeader()
+	blockNumber := blockNumberToKey(blockHeader.GetNumber())
+	blockHash := hex.EncodeToString(protoutil.BlockHeaderHash(blockHeader))
+
+	blockData := block.GetData()
+
+	blockMetadata := block.GetMetadata()
+	txValidationFlags := ledgerUtil.TxValidationFlags(blockMetadata.GetMetadata()[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
+
+	txnDocs := make([]*couchdb.CouchDoc, 0)
+
+	for i, txEnvelopeBytes := range blockData.GetData() {
+		envelope, err := protoutil.GetEnvelopeFromBlock(txEnvelopeBytes)
+		if err != nil {
+			return nil, err
+		}
+
+		txnDoc, err := blockTxnToCouchDoc(blockNumber, blockHash, envelope, txValidationFlags.Flag(i), attachTxn)
+		if err == errorNoTxID {
+			continue
+		} else if err != nil {
+			return nil, err
+		}
+
+		txnDocs = append(txnDocs, txnDoc)
+	}
+
+	return txnDocs, nil
+}
+
+var errorNoTxID = errors.New("missing transaction ID")
+
+func blockTxnToCouchDoc(blockNumber string, blockHash string, txEnvelope *common.Envelope, validationCode peer.TxValidationCode, attachTxn bool) (*couchdb.CouchDoc, error) {
+	txID, err := extractTxIDFromEnvelope(txEnvelope)
+	if err != nil {
+		return nil, errors.WithMessage(err, "transaction ID could not be extracted")
+	}
+
+	// TODO: is the empty transaction queryable? If so, need to change this to a default transaction ID.
+	if txID == "" {
+		return nil, errorNoTxID
+	}
+
+	jsonMap := make(jsonValue)
+	jsonMap[idField] = txID
+	jsonMap[txnBlockHashField] = blockHash
+	jsonMap[txnBlockNumberField] = blockNumber
+	jsonMap[txnValidationCode] = strconv.FormatInt(int64(validationCode), txnValidationCodeBase)
+
+	jsonBytes, err := jsonMap.toBytes()
+	if err != nil {
+		return nil, err
+	}
+	couchDoc := &couchdb.CouchDoc{JSONValue: jsonBytes}
+
+	if attachTxn {
+		attachment, err := txnEnvelopeToAttachment(txEnvelope)
+		if err != nil {
+			return nil, err
+		}
+
+		attachments := append([]*couchdb.AttachmentInfo{}, attachment)
+		couchDoc.Attachments = attachments
+	}
+	return couchDoc, nil
+}
+
+func checkpointInfoToCouchDoc(i *checkpointInfo) (*couchdb.CouchDoc, error) {
+	jsonMap := make(jsonValue)
+
+	jsonMap[idField] = blkMgrInfoKey
+
+	jsonBytes, err := jsonMap.toBytes()
+	if err != nil {
+		return nil, err
+	}
+	couchDoc := &couchdb.CouchDoc{JSONValue: jsonBytes}
+
+	attachment, err := checkpointInfoToAttachment(i)
+	if err != nil {
+		return nil, err
+	}
+
+	attachments := append([]*couchdb.AttachmentInfo{}, attachment)
+	couchDoc.Attachments = attachments
+	return couchDoc, nil
+}
+
+func checkpointInfoToAttachment(i *checkpointInfo) (*couchdb.AttachmentInfo, error) {
+	checkpointInfoBytes, err := i.marshal()
+	if err != nil {
+		return nil, errors.Wrapf(err, "marshaling checkpointInfo failed")
+	}
+
+	attachment := &couchdb.AttachmentInfo{}
+	attachment.AttachmentBytes = checkpointInfoBytes
+	attachment.ContentType = cpiAttachmentContentType
+	attachment.Name = cpiAttachmentName
+
+	return attachment, nil
+}
+
+func blockToTransactionsField(block *common.Block) ([]jsonValue, error) {
+	blockData := block.GetData()
+
+	var txns []jsonValue
+
+	for _, txEnvelopeBytes := range blockData.GetData() {
+		envelope, err := protoutil.GetEnvelopeFromBlock(txEnvelopeBytes)
+		if err != nil {
+			return nil, err
+		}
+
+		txID, err := extractTxIDFromEnvelope(envelope)
+		if err != nil {
+			return nil, errors.WithMessage(err, "transaction ID could not be extracted")
+		}
+
+		txField := make(jsonValue)
+		txField[blockTxnIDField] = txID
+
+		txns = append(txns, txField)
+	}
+
+	return txns, nil
+}
+
+func txnEnvelopeToAttachment(txEnvelope *common.Envelope) (*couchdb.AttachmentInfo, error) {
+	txEnvelopeBytes, err := proto.Marshal(txEnvelope)
+	if err != nil {
+		return nil, errors.Wrapf(err, "marshaling transaction envelope failed")
+	}
+
+	attachment := &couchdb.AttachmentInfo{}
+	attachment.AttachmentBytes = txEnvelopeBytes
+	attachment.ContentType = cpiAttachmentContentType
+	attachment.Name = txnAttachmentName
+
+	return attachment, nil
+}
+
+func blockToAttachment(block *common.Block) (*couchdb.AttachmentInfo, error) {
+	blockBytes, err := proto.Marshal(block)
+	if err != nil {
+		return nil, errors.Wrapf(err, "marshaling block failed")
+	}
+
+	attachment := &couchdb.AttachmentInfo{}
+	attachment.AttachmentBytes = blockBytes
+	attachment.ContentType = cpiAttachmentContentType
+	attachment.Name = blockAttachmentName
+
+	return attachment, nil
+}
+
+func couchDocToBlock(doc *couchdb.CouchDoc) (*common.Block, error) {
+	return couchAttachmentsToBlock(doc.Attachments)
+}
+
+func couchAttachmentsToBlock(attachments []*couchdb.AttachmentInfo) (*common.Block, error) {
+	var blockBytes []byte
+	block := common.Block{}
+
+	// get binary data from attachment
+	for _, a := range attachments {
+		if a.Name == blockAttachmentName {
+			blockBytes = a.AttachmentBytes
+		}
+	}
+
+	if len(blockBytes) == 0 {
+		return nil, errors.New("block is not within couchDB document")
+	}
+
+	err := proto.Unmarshal(blockBytes, &block)
+	if err != nil {
+		return nil, errors.Wrapf(err, "block from couchDB document could not be unmarshaled")
+	}
+
+	return &block, nil
+}
+
+func couchAttachmentsToTxnEnvelope(attachments []*couchdb.AttachmentInfo) (*common.Envelope, error) {
+	var envelope common.Envelope
+	var txnBytes []byte
+
+	// get binary data from attachment
+	for _, a := range attachments {
+		if a.Name == txnAttachmentName {
+			txnBytes = a.AttachmentBytes
+		}
+	}
+
+	if len(txnBytes) == 0 {
+		return nil, errors.New("transaction envelope is not within couchDB document")
+	}
+
+	err := proto.Unmarshal(txnBytes, &envelope)
+	if err != nil {
+		return nil, errors.Wrapf(err, "transaction from couchDB document could not be unmarshaled")
+	}
+
+	return &envelope, nil
+}
+
+func couchDocToCheckpointInfo(doc *couchdb.CouchDoc) (*checkpointInfo, error) {
+	return couchAttachmentsToCheckpointInfo(doc.Attachments)
+}
+
+func couchAttachmentsToCheckpointInfo(attachments []*couchdb.AttachmentInfo) (*checkpointInfo, error) {
+	var checkpointInfoBytes []byte
+	cpInfo := checkpointInfo{}
+	// get binary data from attachment
+	for _, a := range attachments {
+		if a.Name == cpiAttachmentName {
+			checkpointInfoBytes = a.AttachmentBytes
+		}
+	}
+	if len(checkpointInfoBytes) == 0 {
+		return nil, errors.New("checkpointInfo is not within couchDB document")
+	}
+	err := cpInfo.unmarshal(checkpointInfoBytes)
+	if err != nil {
+		return nil, errors.Wrapf(err, "checkpointInfo from couchDB document could not be unmarshaled")
+	}
+	return &cpInfo, nil
+}
+
+func blockNumberToKey(blockNum uint64) string {
+	return blockKeyPrefix + strconv.FormatUint(blockNum, 10)
+}
+
+func retrieveBlockQuery(db *couchdb.CouchDatabase, query string) (*common.Block, error) {
+	results, _, err := db.QueryDocuments(query)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(results) == 0 {
+		return nil, blkstorage.ErrNotFoundInIndex
+	}
+
+	if len(results[0].Attachments) == 0 {
+		return nil, errors.New("block bytes not found")
+	}
+
+	return couchAttachmentsToBlock(results[0].Attachments)
+}
+
+func retrieveJSONQuery(db *couchdb.CouchDatabase, id string) (jsonValue, error) {
+	doc, _, err := db.ReadDoc(id)
+	if err != nil {
+		return nil, err
+	}
+	if doc == nil {
+		return nil, blkstorage.ErrNotFoundInIndex
+	}
+
+	return couchDocToJSON(doc)
+}
+
+func couchDocToJSON(doc *couchdb.CouchDoc) (jsonValue, error) {
+	// create a generic map to unmarshal the JSON
+	jsonResult := make(map[string]interface{})
+	decoder := json.NewDecoder(bytes.NewBuffer(doc.JSONValue))
+	decoder.UseNumber()
+
+	err := decoder.Decode(&jsonResult)
+	if err != nil {
+		return nil, errors.Wrapf(err, "result from DB is not JSON encoded")
+	}
+
+	return jsonResult, nil
+}
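Editor's note: an illustrative summary, not part of this change. Putting the constants and converters above together, the three CouchDB document shapes look roughly like this (hashes abbreviated, serialized protos carried as attachments rather than inline JSON):

	block document (blocks DB):
	  { "_id": "7", "header": { "hash": "ab12..." }, "transactions": [ { "id": "<txID>" } ] }
	  attachment "block": serialized common.Block

	txn document (transactions DB):
	  { "_id": "<txID>", "block_hash": "ab12...", "block_number": "7", "validation_code": "0" }
	  attachment "transaction": serialized common.Envelope (only when attachTxn is true)

	checkpoint document (blocks DB):
	  { "_id": "blkMgrInfo" }
	  attachment "checkpointinfo": marshaled checkpointInfo bytes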
diff --git a/pkg/blkstorage/cdbblkstorage/pkg_test.go b/pkg/blkstorage/cdbblkstorage/pkg_test.go
new file mode 100644
index 00000000..f88814d3
--- /dev/null
+++ b/pkg/blkstorage/cdbblkstorage/pkg_test.go
@@ -0,0 +1,64 @@
+/*
+Copyright SecureKey Technologies Inc. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package cdbblkstorage
+
+import (
+	"testing"
+
+	"github.com/hyperledger/fabric/protoutil"
+
+	"github.com/hyperledger/fabric/common/ledger/blkstorage"
+	"github.com/stretchr/testify/assert"
+)
+
+type testEnv struct {
+	t        testing.TB
+	provider *CDBBlockstoreProvider
+}
+
+func newTestEnv(t testing.TB) *testEnv {
+	attrsToIndex := []blkstorage.IndexableAttr{
+		blkstorage.IndexableAttrBlockHash,
+		blkstorage.IndexableAttrBlockNum,
+		blkstorage.IndexableAttrTxID,
+		blkstorage.IndexableAttrBlockNumTranNum,
+		blkstorage.IndexableAttrBlockTxID,
+		blkstorage.IndexableAttrTxValidationCode,
+	}
+	env, err := newTestEnvSelectiveIndexing(t, attrsToIndex)
+	assert.NoError(t, err)
+	return env
+}
+
+func newTestEnvSelectiveIndexing(t testing.TB, attrsToIndex []blkstorage.IndexableAttr) (*testEnv, error) {
+	indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}
+	provider, err := NewProvider(indexConfig)
+	if err != nil {
+		return nil, err
+	}
+	return &testEnv{t, provider.(*CDBBlockstoreProvider)}, nil
+}
+
+func (env *testEnv) Cleanup() {
+	env.provider.Close()
+}
+
+func extractTxID(txEnvelopeBytes []byte) (string, error) {
+	txEnvelope, err := protoutil.GetEnvelopeFromBlock(txEnvelopeBytes)
+	if err != nil {
+		return "", err
+	}
+	txPayload, err := protoutil.GetPayload(txEnvelope)
+	if err != nil {
+		return "", err
+	}
+	chdr, err := protoutil.UnmarshalChannelHeader(txPayload.Header.ChannelHeader)
+	if err != nil {
+		return "", err
+	}
+	return chdr.TxId, nil
+}
diff --git a/pkg/testutil/sampleconfig/configtx.yaml b/pkg/testutil/sampleconfig/configtx.yaml
new file mode 100644
index 00000000..bff89e85
--- /dev/null
+++ b/pkg/testutil/sampleconfig/configtx.yaml
@@ -0,0 +1,609 @@
+# Copyright IBM Corp. All Rights Reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+
+---
+################################################################################
+#
+#   ORGANIZATIONS
+#
+#   This section defines the organizational identities that can be referenced
+#   in the configuration profiles.
+#
+################################################################################
+Organizations:
+
+    # SampleOrg defines an MSP using the sampleconfig. It should never be used
+    # in production but may be used as a template for other definitions.
+    - &SampleOrg
+        # Name is the key by which this org will be referenced in channel
+        # configuration transactions.
+        # Name can include alphanumeric characters as well as dots and dashes.
+        Name: SampleOrg
+
+        # SkipAsForeign can be set to true for org definitions which are to be
+        # inherited from the orderer system channel during channel creation. This
+        # is especially useful when an admin of a single org without access to the
+        # MSP directories of the other orgs wishes to create a channel. Note
+        # this property must always be set to false for orgs included in block
+        # creation.
+        SkipAsForeign: false
+
+        # ID is the key by which this org's MSP definition will be referenced.
+        # ID can include alphanumeric characters as well as dots and dashes.
+        ID: SampleOrg
+
+        # MSPDir is the filesystem path which contains the MSP configuration.
+ MSPDir: msp + + # Policies defines the set of policies at this level of the config tree + # For organization policies, their canonical path is usually + # /Channel/// + Policies: &SampleOrgPolicies + Readers: + Type: Signature + Rule: "OR('SampleOrg.member')" + # If your MSP is configured with the new NodeOUs, you might + # want to use a more specific rule like the following: + # Rule: "OR('SampleOrg.admin', 'SampleOrg.peer', 'SampleOrg.client')" + Writers: + Type: Signature + Rule: "OR('SampleOrg.member')" + # If your MSP is configured with the new NodeOUs, you might + # want to use a more specific rule like the following: + # Rule: "OR('SampleOrg.admin', 'SampleOrg.client')" + Admins: + Type: Signature + Rule: "OR('SampleOrg.admin')" + Endorsement: + Type: Signature + Rule: "OR('SampleOrg.member')" + + # AnchorPeers defines the location of peers which can be used for + # cross-org gossip communication. Note, this value is only encoded in + # the genesis block in the Application section context. + AnchorPeers: + - Host: 127.0.0.1 + Port: 7051 + +################################################################################ +# +# CAPABILITIES +# +# This section defines the capabilities of fabric network. This is a new +# concept as of v1.1.0 and should not be utilized in mixed networks with +# v1.0.x peers and orderers. Capabilities define features which must be +# present in a fabric binary for that binary to safely participate in the +# fabric network. For instance, if a new MSP type is added, newer binaries +# might recognize and validate the signatures from this type, while older +# binaries without this support would be unable to validate those +# transactions. This could lead to different versions of the fabric binaries +# having different world states. Instead, defining a capability for a channel +# informs those binaries without this capability that they must cease +# processing transactions until they have been upgraded. For v1.0.x if any +# capabilities are defined (including a map with all capabilities turned off) +# then the v1.0.x peer will deliberately crash. +# +################################################################################ +Capabilities: + # Channel capabilities apply to both the orderers and the peers and must be + # supported by both. + # Set the value of the capability to true to require it. + Channel: &ChannelCapabilities + # V1.3 for Channel is a catchall flag for behavior which has been + # determined to be desired for all orderers and peers running at the v1.3.x + # level, but which would be incompatible with orderers and peers from + # prior releases. + # Prior to enabling V1.3 channel capabilities, ensure that all + # orderers and peers on a channel are at v1.3.0 or later. + V1_3: true + + # Orderer capabilities apply only to the orderers, and may be safely + # used with prior release peers. + # Set the value of the capability to true to require it. + Orderer: &OrdererCapabilities + # V1.1 for Orderer is a catchall flag for behavior which has been + # determined to be desired for all orderers running at the v1.1.x + # level, but which would be incompatible with orderers from prior releases. + # Prior to enabling V1.1 orderer capabilities, ensure that all + # orderers on a channel are at v1.1.0 or later. + V1_1: true + + # Application capabilities apply only to the peer network, and may be safely + # used with prior release orderers. + # Set the value of the capability to true to require it. 
+ Application: &ApplicationCapabilities + # V1.3 for Application enables the new non-backwards compatible + # features and fixes of fabric v1.3. + V1_3: true + # V1.2 for Application enables the new non-backwards compatible + # features and fixes of fabric v1.2 (note, this need not be set if + # later version capabilities are set) + V1_2: false + # V1.1 for Application enables the new non-backwards compatible + # features and fixes of fabric v1.1 (note, this need not be set if + # later version capabilities are set). + V1_1: false + +################################################################################ +# +# APPLICATION +# +# This section defines the values to encode into a config transaction or +# genesis block for application-related parameters. +# +################################################################################ +Application: &ApplicationDefaults + ACLs: &ACLsDefault + # This section provides defaults for policies for various resources + # in the system. These "resources" could be functions on system chaincodes + # (e.g., "GetBlockByNumber" on the "qscc" system chaincode) or other resources + # (e.g., who can receive Block events). This section does NOT specify the resource's + # definition or API, but just the ACL policy for it. + # + # Users can override these defaults with their own policy mapping by defining the + # mapping under ACLs in their channel definition + + #---New Lifecycle System Chaincode (_lifecycle) function to policy mapping for access control---# + + # ACL policy for _lifecycle's "CommitChaincodeDefinition" function + _lifecycle/CommitChaincodeDefinition: /Channel/Application/Writers + + # ACL policy for _lifecycle's "QueryChaincodeDefinition" function + _lifecycle/QueryChaincodeDefinition: /Channel/Application/Readers + + # ACL policy for _lifecycle's "QueryNamespaceDefinitions" function + _lifecycle/QueryNamespaceDefinitions: /Channel/Application/Readers + + #---Lifecycle System Chaincode (lscc) function to policy mapping for access control---# + + # ACL policy for lscc's "getid" function + lscc/ChaincodeExists: /Channel/Application/Readers + + # ACL policy for lscc's "getdepspec" function + lscc/GetDeploymentSpec: /Channel/Application/Readers + + # ACL policy for lscc's "getccdata" function + lscc/GetChaincodeData: /Channel/Application/Readers + + # ACL Policy for lscc's "getchaincodes" function + lscc/GetInstantiatedChaincodes: /Channel/Application/Readers + + #---Query System Chaincode (qscc) function to policy mapping for access control---# + + # ACL policy for qscc's "GetChainInfo" function + qscc/GetChainInfo: /Channel/Application/Readers + + # ACL policy for qscc's "GetBlockByNumber" function + qscc/GetBlockByNumber: /Channel/Application/Readers + + # ACL policy for qscc's "GetBlockByHash" function + qscc/GetBlockByHash: /Channel/Application/Readers + + # ACL policy for qscc's "GetTransactionByID" function + qscc/GetTransactionByID: /Channel/Application/Readers + + # ACL policy for qscc's "GetBlockByTxID" function + qscc/GetBlockByTxID: /Channel/Application/Readers + + #---Configuration System Chaincode (cscc) function to policy mapping for access control---# + + # ACL policy for cscc's "GetConfigBlock" function + cscc/GetConfigBlock: /Channel/Application/Readers + + # ACL policy for cscc's "GetConfigTree" function + cscc/GetConfigTree: /Channel/Application/Readers + + # ACL policy for cscc's "SimulateConfigTreeUpdate" function + cscc/SimulateConfigTreeUpdate: /Channel/Application/Readers + + #---Miscellaneous peer function to
policy mapping for access control---# + + # ACL policy for invoking chaincodes on peer + peer/Propose: /Channel/Application/Writers + + # ACL policy for chaincode to chaincode invocation + peer/ChaincodeToChaincode: /Channel/Application/Readers + + #---Events resource to policy mapping for access control---# + + # ACL policy for sending block events + event/Block: /Channel/Application/Readers + + # ACL policy for sending filtered block events + event/FilteredBlock: /Channel/Application/Readers + + # Organizations lists the orgs participating on the application side of the + # network. + Organizations: + + # Policies defines the set of policies at this level of the config tree + # For Application policies, their canonical path is + # /Channel/Application/<PolicyName> + Policies: &ApplicationDefaultPolicies + LifecycleEndorsement: + Type: ImplicitMeta + Rule: "MAJORITY Endorsement" + Endorsement: + Type: ImplicitMeta + Rule: "MAJORITY Endorsement" + Readers: + Type: ImplicitMeta + Rule: "ANY Readers" + Writers: + Type: ImplicitMeta + Rule: "ANY Writers" + Admins: + Type: ImplicitMeta + Rule: "MAJORITY Admins" + + # Capabilities describes the application level capabilities, see the + # dedicated Capabilities section elsewhere in this file for a full + # description + Capabilities: + <<: *ApplicationCapabilities + +################################################################################ +# +# ORDERER +# +# This section defines the values to encode into a config transaction or +# genesis block for orderer-related parameters. +# +################################################################################ +Orderer: &OrdererDefaults + + # Orderer Type: The orderer implementation to start. + # Available types are "solo", "kafka" and "etcdraft". + OrdererType: solo + + # Addresses here is a non-exhaustive list of orderers the peers and clients can + # connect to. Adding/removing nodes from this list has no impact on their + # participation in ordering. + # NOTE: In the solo case, this should be a one-item list. + Addresses: + - 127.0.0.1:7050 + + # Batch Timeout: The amount of time to wait before creating a batch. + BatchTimeout: 2s + + # Batch Size: Controls the number of messages batched into a block. + # The orderer views messages opaquely, but typically, messages may + # be considered to be Fabric transactions. The 'batch' is the group + # of messages in the 'data' field of the block. Blocks will be a few KB + # larger than the batch size, when signatures, hashes, and other metadata + # is applied. + BatchSize: + + # Max Message Count: The maximum number of messages to permit in a + # batch. No block will contain more than this number of messages. + MaxMessageCount: 500 + + # Absolute Max Bytes: The absolute maximum number of bytes allowed for + # the serialized messages in a batch. The maximum block size is this value + # plus the size of the associated metadata (usually a few KB depending + # upon the size of the signing identities). Any transaction larger than + # this value will be rejected by ordering. If the "kafka" OrdererType is + # selected, set 'message.max.bytes' and 'replica.fetch.max.bytes' on + # the Kafka brokers to a value that is larger than this one. + AbsoluteMaxBytes: 10 MB + + # Preferred Max Bytes: The preferred maximum number of bytes allowed + # for the serialized messages in a batch. Roughly, this field may be considered + # the best effort maximum size of a batch.
A batch will fill with messages + # until this size is reached (or the max message count, or batch timeout is + # exceeded). If adding a new message to the batch would cause the batch to + # exceed the preferred max bytes, then the current batch is closed and written + # to a block, and a new batch containing the new message is created. If a + # message larger than the preferred max bytes is received, then its batch + # will contain only that message. Because messages may be larger than + # preferred max bytes (up to AbsoluteMaxBytes), some batches may exceed + # the preferred max bytes, but will always contain exactly one transaction. + PreferredMaxBytes: 2 MB + + # Max Channels is the maximum number of channels to allow on the ordering + # network. When set to 0, this implies no maximum number of channels. + MaxChannels: 0 + + Kafka: + # Brokers: A list of Kafka brokers to which the orderer connects. Edit + # this list to identify the brokers of the ordering service. + # NOTE: Use IP:port notation. + Brokers: + - kafka0:9092 + - kafka1:9092 + - kafka2:9092 + + # EtcdRaft defines configuration which must be set when the "etcdraft" + # OrdererType is chosen. + EtcdRaft: + # The set of Raft replicas for this network. For the etcd/raft-based + # implementation, we expect every replica to also be an OSN. Therefore, + # a subset of the host:port items enumerated in this list should be + # replicated under the Orderer.Addresses key above. + Consenters: + - Host: raft0.example.com + Port: 7050 + ClientTLSCert: path/to/ClientTLSCert0 + ServerTLSCert: path/to/ServerTLSCert0 + - Host: raft1.example.com + Port: 7050 + ClientTLSCert: path/to/ClientTLSCert1 + ServerTLSCert: path/to/ServerTLSCert1 + - Host: raft2.example.com + Port: 7050 + ClientTLSCert: path/to/ClientTLSCert2 + ServerTLSCert: path/to/ServerTLSCert2 + + # Options to be specified for all the etcd/raft nodes. The values here + # are the defaults for all new channels and can be modified on a + # per-channel basis via configuration updates. + Options: + # TickInterval is the time interval between two Node.Tick invocations. + TickInterval: 500ms + + # ElectionTick is the number of Node.Tick invocations that must pass + # between elections. That is, if a follower does not receive any + # message from the leader of the current term before ElectionTick has + # elapsed, it will become candidate and start an election. + # ElectionTick must be greater than HeartbeatTick. + ElectionTick: 10 + + # HeartbeatTick is the number of Node.Tick invocations that must + # pass between heartbeats. That is, a leader sends heartbeat + # messages to maintain its leadership every HeartbeatTick ticks. + HeartbeatTick: 1 + + # MaxInflightMsgs limits the max number of in-flight append messages + # during the optimistic replication phase. + MaxInflightMsgs: 5 + + # MaxSizePerMsg limits the max size of each append message. Smaller + # value lowers the raft recovery cost (initial probing and messages + # lost during normal operation). On the other hand, it might affect + # the throughput during normal replication. + MaxSizePerMsg: 1048576 + + # SnapshotInterval defines the number of bytes after which a snapshot is taken + SnapshotInterval: 100 MB + + # Organizations lists the orgs participating on the orderer side of the + # network.
+ Organizations: + + # Policies defines the set of policies at this level of the config tree + # For Orderer policies, their canonical path is + # /Channel/Orderer/<PolicyName> + Policies: + Readers: + Type: ImplicitMeta + Rule: "ANY Readers" + Writers: + Type: ImplicitMeta + Rule: "ANY Writers" + Admins: + Type: ImplicitMeta + Rule: "MAJORITY Admins" + # BlockValidation specifies what signatures must be included in the block + # from the orderer for the peer to validate it. + BlockValidation: + Type: ImplicitMeta + Rule: "ANY Writers" + + # Capabilities describes the orderer level capabilities, see the + # dedicated Capabilities section elsewhere in this file for a full + # description + Capabilities: + <<: *OrdererCapabilities + +################################################################################ +# +# CHANNEL +# +# This section defines the values to encode into a config transaction or +# genesis block for channel-related parameters. +# +################################################################################ +Channel: &ChannelDefaults + # Policies defines the set of policies at this level of the config tree + # For Channel policies, their canonical path is + # /Channel/<PolicyName> + Policies: + # Who may invoke the 'Deliver' API + Readers: + Type: ImplicitMeta + Rule: "ANY Readers" + # Who may invoke the 'Broadcast' API + Writers: + Type: ImplicitMeta + Rule: "ANY Writers" + # By default, who may modify elements at this config level + Admins: + Type: ImplicitMeta + Rule: "MAJORITY Admins" + + + # Capabilities describes the channel level capabilities, see the + # dedicated Capabilities section elsewhere in this file for a full + # description + Capabilities: + <<: *ChannelCapabilities + +################################################################################ +# +# PROFILES +# +# Different configuration profiles may be encoded here to be specified as +# parameters to the configtxgen tool. The profiles which specify consortiums +# are to be used for generating the orderer genesis block. With the correct +# consortium members defined in the orderer genesis block, channel creation +# requests may be generated with only the org member names and a consortium +# name. +# +################################################################################ +Profiles: + + # SampleSingleMSPSolo defines a configuration which uses the Solo orderer, + # and contains a single MSP definition (the MSP sampleconfig). + # The Consortium SampleConsortium has only a single member, SampleOrg. + SampleSingleMSPSolo: + <<: *ChannelDefaults + Orderer: + <<: *OrdererDefaults + Organizations: + - *SampleOrg + Consortiums: + SampleConsortium: + Organizations: + - *SampleOrg + + # SampleSingleMSPKafka defines a configuration that differs from the + # SampleSingleMSPSolo one only in that it uses the Kafka-based orderer. + SampleSingleMSPKafka: + <<: *ChannelDefaults + Orderer: + <<: *OrdererDefaults + OrdererType: kafka + Organizations: + - *SampleOrg + Consortiums: + SampleConsortium: + Organizations: + - *SampleOrg + + # SampleInsecureSolo defines a configuration which uses the Solo orderer, + # contains no MSP definitions, and allows all transactions and channel + # creation requests for the consortium SampleConsortium. + SampleInsecureSolo: + <<: *ChannelDefaults + Orderer: + <<: *OrdererDefaults + Consortiums: + SampleConsortium: + Organizations: + + # SampleInsecureKafka defines a configuration that differs from the + # SampleInsecureSolo one only in that it uses the Kafka-based orderer.
+ SampleInsecureKafka: + <<: *ChannelDefaults + Orderer: + OrdererType: kafka + <<: *OrdererDefaults + Consortiums: + SampleConsortium: + Organizations: + + # SampleDevModeSolo defines a configuration which uses the Solo orderer, + # contains the sample MSP as both orderer and consortium member, and + # requires only basic membership for admin privileges. It also defines + # an Application on the ordering system channel, which should usually + # be avoided. + SampleDevModeSolo: + <<: *ChannelDefaults + Orderer: + <<: *OrdererDefaults + Organizations: + - <<: *SampleOrg + Policies: + <<: *SampleOrgPolicies + Admins: + Type: Signature + Rule: "OR('SampleOrg.member')" + Application: + <<: *ApplicationDefaults + Organizations: + - <<: *SampleOrg + Policies: + <<: *SampleOrgPolicies + Admins: + Type: Signature + Rule: "OR('SampleOrg.member')" + Consortiums: + SampleConsortium: + Organizations: + - <<: *SampleOrg + Policies: + <<: *SampleOrgPolicies + Admins: + Type: Signature + Rule: "OR('SampleOrg.member')" + + # SampleDevModeKafka defines a configuration that differs from the + # SampleDevModeSolo one only in that it uses the Kafka-based orderer. + SampleDevModeKafka: + <<: *ChannelDefaults + Orderer: + <<: *OrdererDefaults + OrdererType: kafka + Organizations: + - <<: *SampleOrg + Policies: + <<: *SampleOrgPolicies + Admins: + Type: Signature + Rule: "OR('SampleOrg.member')" + Application: + <<: *ApplicationDefaults + Organizations: + - <<: *SampleOrg + Policies: + <<: *SampleOrgPolicies + Admins: + Type: Signature + Rule: "OR('SampleOrg.member')" + Consortiums: + SampleConsortium: + Organizations: + - <<: *SampleOrg + Policies: + <<: *SampleOrgPolicies + Admins: + Type: Signature + Rule: "OR('SampleOrg.member')" + + # SampleSingleMSPChannel defines a channel with only the sample org as a + # member. It is designed to be used in conjunction with SampleSingleMSPSolo + # and SampleSingleMSPKafka orderer profiles. Note, for channel creation + # profiles, only the 'Application' section and consortium name are + # considered. + SampleSingleMSPChannel: + <<: *ChannelDefaults + Consortium: SampleConsortium + Application: + <<: *ApplicationDefaults + Organizations: + - <<: *SampleOrg + + # SampleDevModeEtcdRaft defines a configuration that differs from the + # SampleDevModeSolo one only in that it uses the etcd/raft-based orderer.
+ SampleDevModeEtcdRaft: + <<: *ChannelDefaults + Orderer: + <<: *OrdererDefaults + OrdererType: etcdraft + Organizations: + - <<: *SampleOrg + Policies: + <<: *SampleOrgPolicies + Admins: + Type: Signature + Rule: "OR('SampleOrg.member')" + Application: + <<: *ApplicationDefaults + Organizations: + - <<: *SampleOrg + Policies: + <<: *SampleOrgPolicies + Admins: + Type: Signature + Rule: "OR('SampleOrg.member')" + Consortiums: + SampleConsortium: + Organizations: + - <<: *SampleOrg + Policies: + <<: *SampleOrgPolicies + Admins: + Type: Signature + Rule: "OR('SampleOrg.member')" diff --git a/pkg/testutil/sampleconfig/msp/admincerts/admincert.pem b/pkg/testutil/sampleconfig/msp/admincerts/admincert.pem new file mode 100644 index 00000000..415d5617 --- /dev/null +++ b/pkg/testutil/sampleconfig/msp/admincerts/admincert.pem @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICNjCCAd2gAwIBAgIRAMnf9/dmV9RvCCVw9pZQUfUwCgYIKoZIzj0EAwIwgYEx +CzAJBgNVBAYTAlVTMRMwEQYDVQQIEwpDYWxpZm9ybmlhMRYwFAYDVQQHEw1TYW4g +RnJhbmNpc2NvMRkwFwYDVQQKExBvcmcxLmV4YW1wbGUuY29tMQwwCgYDVQQLEwND +T1AxHDAaBgNVBAMTE2NhLm9yZzEuZXhhbXBsZS5jb20wHhcNMTcxMTEyMTM0MTEx +WhcNMjcxMTEwMTM0MTExWjBpMQswCQYDVQQGEwJVUzETMBEGA1UECBMKQ2FsaWZv +cm5pYTEWMBQGA1UEBxMNU2FuIEZyYW5jaXNjbzEMMAoGA1UECxMDQ09QMR8wHQYD +VQQDExZwZWVyMC5vcmcxLmV4YW1wbGUuY29tMFkwEwYHKoZIzj0CAQYIKoZIzj0D +AQcDQgAEZ8S4V71OBJpyMIVZdwYdFXAckItrpvSrCf0HQg40WW9XSoOOO76I+Umf +EkmTlIJXP7/AyRRSRU38oI8Ivtu4M6NNMEswDgYDVR0PAQH/BAQDAgeAMAwGA1Ud +EwEB/wQCMAAwKwYDVR0jBCQwIoAginORIhnPEFZUhXm6eWBkm7K7Zc8R4/z7LW4H +ossDlCswCgYIKoZIzj0EAwIDRwAwRAIgVikIUZzgfuFsGLQHWJUVJCU7pDaETkaz +PzFgsCiLxUACICgzJYlW7nvZxP7b6tbeu3t8mrhMXQs956mD4+BoKuNI +-----END CERTIFICATE----- diff --git a/pkg/testutil/sampleconfig/msp/cacerts/cacert.pem b/pkg/testutil/sampleconfig/msp/cacerts/cacert.pem new file mode 100644 index 00000000..22dfce98 --- /dev/null +++ b/pkg/testutil/sampleconfig/msp/cacerts/cacert.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICYjCCAgigAwIBAgIRAL1fEAnz5zp4moJ8MdSb/lYwCgYIKoZIzj0EAwIwgYEx +CzAJBgNVBAYTAlVTMRMwEQYDVQQIEwpDYWxpZm9ybmlhMRYwFAYDVQQHEw1TYW4g +RnJhbmNpc2NvMRkwFwYDVQQKExBvcmcxLmV4YW1wbGUuY29tMQwwCgYDVQQLEwND +T1AxHDAaBgNVBAMTE2NhLm9yZzEuZXhhbXBsZS5jb20wHhcNMTcxMTEyMTM0MTEx +WhcNMjcxMTEwMTM0MTExWjCBgTELMAkGA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlm +b3JuaWExFjAUBgNVBAcTDVNhbiBGcmFuY2lzY28xGTAXBgNVBAoTEG9yZzEuZXhh +bXBsZS5jb20xDDAKBgNVBAsTA0NPUDEcMBoGA1UEAxMTY2Eub3JnMS5leGFtcGxl +LmNvbTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABGrsQ6oJpk6hDWf63HU3OSNd +bou9KNw/VIee1IngPDI4YJU7O+Xa/XLJuwnFv7BpR8Ytl3f+njC8i/RZP2/svO+j +XzBdMA4GA1UdDwEB/wQEAwIBpjAPBgNVHSUECDAGBgRVHSUAMA8GA1UdEwEB/wQF +MAMBAf8wKQYDVR0OBCIEIIpzkSIZzxBWVIV5unlgZJuyu2XPEeP8+y1uB6LLA5Qr +MAoGCCqGSM49BAMCA0gAMEUCIQDUh/+CC2dAICnYtACXspwUaaEbiyZxYIx+XDvW +o8VVcgIgGz5S4iC5+xkxgeaISPfxKTTVy6yzTdYGzCw1vPppjzo= +-----END CERTIFICATE----- diff --git a/pkg/testutil/sampleconfig/msp/config.yaml b/pkg/testutil/sampleconfig/msp/config.yaml new file mode 100644 index 00000000..690d6c3e --- /dev/null +++ b/pkg/testutil/sampleconfig/msp/config.yaml @@ -0,0 +1,18 @@ +# Copyright IBM Corp. All Rights Reserved. 
+# +# SPDX-License-Identifier: Apache-2.0 +# + +OrganizationalUnitIdentifiers: + - Certificate: "cacerts/cacert.pem" + OrganizationalUnitIdentifier: "COP" + +NodeOUs: + Enable: false + ClientOUIdentifier: + # if Certificate is empty, then the certifier identifier will not be enforced + Certificate: "cacerts/cacert.pem" + OrganizationalUnitIdentifier: "OU_client" + PeerOUIdentifier: + Certificate: "cacerts/cacert.pem" + OrganizationalUnitIdentifier: "OU_peer" diff --git a/pkg/testutil/sampleconfig/msp/keystore/key.pem b/pkg/testutil/sampleconfig/msp/keystore/key.pem new file mode 100644 index 00000000..797d567a --- /dev/null +++ b/pkg/testutil/sampleconfig/msp/keystore/key.pem @@ -0,0 +1,5 @@ +-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgXa3mln4anewXtqrM +hMw6mfZhslkRa/j9P790ToKjlsihRANCAARnxLhXvU4EmnIwhVl3Bh0VcByQi2um +9KsJ/QdCDjRZb1dKg447voj5SZ8SSZOUglc/v8DJFFJFTfygjwi+27gz +-----END PRIVATE KEY----- diff --git a/pkg/testutil/sampleconfig/msp/signcerts/peer.pem b/pkg/testutil/sampleconfig/msp/signcerts/peer.pem new file mode 100644 index 00000000..415d5617 --- /dev/null +++ b/pkg/testutil/sampleconfig/msp/signcerts/peer.pem @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICNjCCAd2gAwIBAgIRAMnf9/dmV9RvCCVw9pZQUfUwCgYIKoZIzj0EAwIwgYEx +CzAJBgNVBAYTAlVTMRMwEQYDVQQIEwpDYWxpZm9ybmlhMRYwFAYDVQQHEw1TYW4g +RnJhbmNpc2NvMRkwFwYDVQQKExBvcmcxLmV4YW1wbGUuY29tMQwwCgYDVQQLEwND +T1AxHDAaBgNVBAMTE2NhLm9yZzEuZXhhbXBsZS5jb20wHhcNMTcxMTEyMTM0MTEx +WhcNMjcxMTEwMTM0MTExWjBpMQswCQYDVQQGEwJVUzETMBEGA1UECBMKQ2FsaWZv +cm5pYTEWMBQGA1UEBxMNU2FuIEZyYW5jaXNjbzEMMAoGA1UECxMDQ09QMR8wHQYD +VQQDExZwZWVyMC5vcmcxLmV4YW1wbGUuY29tMFkwEwYHKoZIzj0CAQYIKoZIzj0D +AQcDQgAEZ8S4V71OBJpyMIVZdwYdFXAckItrpvSrCf0HQg40WW9XSoOOO76I+Umf +EkmTlIJXP7/AyRRSRU38oI8Ivtu4M6NNMEswDgYDVR0PAQH/BAQDAgeAMAwGA1Ud +EwEB/wQCMAAwKwYDVR0jBCQwIoAginORIhnPEFZUhXm6eWBkm7K7Zc8R4/z7LW4H +ossDlCswCgYIKoZIzj0EAwIDRwAwRAIgVikIUZzgfuFsGLQHWJUVJCU7pDaETkaz +PzFgsCiLxUACICgzJYlW7nvZxP7b6tbeu3t8mrhMXQs956mD4+BoKuNI +-----END CERTIFICATE----- diff --git a/pkg/testutil/sampleconfig/msp/tlscacerts/tlsroot.pem b/pkg/testutil/sampleconfig/msp/tlscacerts/tlsroot.pem new file mode 100644 index 00000000..10625226 --- /dev/null +++ b/pkg/testutil/sampleconfig/msp/tlscacerts/tlsroot.pem @@ -0,0 +1,13 @@ +-----BEGIN CERTIFICATE----- +MIIB8jCCAZigAwIBAgIRANxd4D3sY0656NqOh8Rha0AwCgYIKoZIzj0EAwIwWDEL +MAkGA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlmb3JuaWExFjAUBgNVBAcTDVNhbiBG +cmFuY2lzY28xDTALBgNVBAoTBE9yZzIxDTALBgNVBAMTBE9yZzIwHhcNMTcwNTA4 +MDkzMDM0WhcNMjcwNTA2MDkzMDM0WjBYMQswCQYDVQQGEwJVUzETMBEGA1UECBMK +Q2FsaWZvcm5pYTEWMBQGA1UEBxMNU2FuIEZyYW5jaXNjbzENMAsGA1UEChMET3Jn +MjENMAsGA1UEAxMET3JnMjBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABDYy+qzS +J/8CMfhpBFhUhhz+7up4+lwjBWDSS01koszNh8camHTA8vS4ZsN+DZ2DRsSmRZgs +tG2oogLLIdh6Z1CjQzBBMA4GA1UdDwEB/wQEAwIBpjAPBgNVHSUECDAGBgRVHSUA +MA8GA1UdEwEB/wQFMAMBAf8wDQYDVR0OBAYEBAECAwQwCgYIKoZIzj0EAwIDSAAw +RQIgWnMmH0yxAjub3qfzxQioHKQ8+WvUjAXm0ejId9Q+rDICIQDr30UCPj+SXzOb +Cu4psMMBfLujKoiBNdLE1KEpt8lN1g== +-----END CERTIFICATE----- diff --git a/pkg/testutil/sampleconfig/msp/tlsintermediatecerts/tlsintermediate.pem b/pkg/testutil/sampleconfig/msp/tlsintermediatecerts/tlsintermediate.pem new file mode 100644 index 00000000..0feba1a6 --- /dev/null +++ b/pkg/testutil/sampleconfig/msp/tlsintermediatecerts/tlsintermediate.pem @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICETCCAbagAwIBAgIQNpgoASE9fi0ooZVKcnwnZzAKBggqhkjOPQQDAjBYMQsw +CQYDVQQGEwJVUzETMBEGA1UECBMKQ2FsaWZvcm5pYTEWMBQGA1UEBxMNU2FuIEZy 
+YW5jaXNjbzENMAsGA1UEChMET3JnMjENMAsGA1UEAxMET3JnMjAeFw0xNzA1MDgw +OTMwMzRaFw0yNzA1MDYwOTMwMzRaMGYxCzAJBgNVBAYTAlVTMRMwEQYDVQQIEwpD +YWxpZm9ybmlhMRYwFAYDVQQHEw1TYW4gRnJhbmNpc2NvMRQwEgYDVQQKEwtPcmcy +LWNoaWxkMTEUMBIGA1UEAxMLT3JnMi1jaGlsZDEwWTATBgcqhkjOPQIBBggqhkjO +PQMBBwNCAARTBJ8/o1tpHPwuixYDgRwcrzAru0cWJJhE6KWHAa0vBCG4nl0zjjRS +og+iAuUcY4Z/gJoHol6dKSHk9h5jrqtEo1QwUjAOBgNVHQ8BAf8EBAMCAaYwDwYD +VR0lBAgwBgYEVR0lADAPBgNVHRMBAf8EBTADAQH/MA0GA1UdDgQGBAQBAgMEMA8G +A1UdIwQIMAaABAECAwQwCgYIKoZIzj0EAwIDSQAwRgIhAIkPzk7ORV/WhfG7QY/6 +/OJg4++ftz2SZc44NIuogMArAiEAqbnpnmmHnzo2Qc6gnliCegpGnJ18RUT/jZlj +1qXHcvg= +-----END CERTIFICATE----- diff --git a/scripts/pull_fabric.sh b/scripts/pull_fabric.sh index babfc66d..deb87c8a 100755 --- a/scripts/pull_fabric.sh +++ b/scripts/pull_fabric.sh @@ -11,8 +11,8 @@ git clone https://github.com/trustbloc/fabric-mod.git $GOPATH/src/github.com/hyp cp -r . $GOPATH/src/github.com/hyperledger/fabric/fabric-peer-ext cd $GOPATH/src/github.com/hyperledger/fabric git config advice.detachedHead false -# fabric-mod (May 7, 2019) -git checkout 4beae6ee306bba2668b7b051e90433d57df3f9cd +# fabric-mod (May 10, 2019) +git checkout ff89b7580e81f4f79d7cdf90c012fa2d2dcd4466 # Rewrite viper import to allow plugins to load different version of viper sed 's/\github.com\/spf13\/viper.*/github.com\/spf13\/oldviper v0.0.0/g' -i fabric-peer-ext/mod/peer/go.mod diff --git a/scripts/unit.sh b/scripts/unit.sh index 68167c6b..53f956ee 100755 --- a/scripts/unit.sh +++ b/scripts/unit.sh @@ -6,10 +6,12 @@ # set -e + + # Packages to exclude PKGS=`go list github.com/trustbloc/fabric-peer-ext/pkg/... 2> /dev/null | \ grep -v /mocks | \ grep -v /api | \ grep -v /protos` echo "Running pkg unit tests..." -go test -count=1 -tags "testing" -cover $PKGS -p 1 -timeout=10m \ No newline at end of file +FABRIC_SAMPLECONFIG_PATH="src/github.com/trustbloc/fabric-peer-ext/pkg/testutil/sampleconfig" go test -count=1 -tags "testing" -cover $PKGS -p 1 -timeout=10m \ No newline at end of file
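
Reviewer note on usage (editorial addition, not part of the patch): because cdbblkstorage.NewProvider returns fabric's standard blkstorage.BlockStoreProvider interface, the CouchDB-backed store is a drop-in replacement for fsblkstorage, and the ledger code above it is unchanged. The sketch below shows roughly how the new provider is exercised; it assumes a reachable CouchDB instance configured through the peer's ledger settings (the unit tests in this change obtain one via testutil.SetupExtTestEnv()), and the ledger ID "mychannel" is a placeholder.

```go
package main

import (
	"fmt"

	"github.com/hyperledger/fabric/common/ledger/blkstorage"

	"github.com/trustbloc/fabric-peer-ext/pkg/blkstorage/cdbblkstorage"
)

func main() {
	// Index the same kinds of attributes the new cdbblkstorage tests enable.
	indexConfig := &blkstorage.IndexConfig{
		AttrsToIndex: []blkstorage.IndexableAttr{
			blkstorage.IndexableAttrBlockHash,
			blkstorage.IndexableAttrBlockNum,
			blkstorage.IndexableAttrTxID,
		},
	}

	// NewProvider reads the peer's CouchDB configuration and returns an
	// error if CouchDB is unreachable; mod/peer/blkstorage wraps this call
	// and panics so that a misconfigured peer fails fast at startup.
	provider, err := cdbblkstorage.NewProvider(indexConfig)
	if err != nil {
		panic(err)
	}
	defer provider.Close()

	// Each ledger (channel) gets its own block store, backed by CouchDB
	// databases rather than block files on the local file system.
	store, err := provider.CreateBlockStore("mychannel")
	if err != nil {
		panic(err)
	}
	defer store.Shutdown()

	info, err := store.GetBlockchainInfo()
	if err != nil {
		panic(err)
	}
	fmt.Printf("ledger height: %d\n", info.Height)
}
```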