From 94b337254c622114bbae9efcb8a917d7b8bbdab9 Mon Sep 17 00:00:00 2001 From: Nate Date: Mon, 18 Nov 2024 14:58:45 -0800 Subject: [PATCH 1/5] refactor(contracts): update contracts manager to support latest changes to RHP4 --- host/contracts/contracts.go | 44 +---- host/contracts/lock.go | 141 +++++++++++++ host/contracts/manager.go | 97 ++------- host/contracts/manager_test.go | 69 +++---- host/contracts/persist.go | 11 +- host/contracts/update.go | 13 +- persist/sqlite/consensus.go | 39 ++-- persist/sqlite/contracts.go | 352 ++++++++++++++++++--------------- 8 files changed, 418 insertions(+), 348 deletions(-) create mode 100644 host/contracts/lock.go diff --git a/host/contracts/contracts.go b/host/contracts/contracts.go index 1c0da26b..fc64d63e 100644 --- a/host/contracts/contracts.go +++ b/host/contracts/contracts.go @@ -8,6 +8,7 @@ import ( "time" rhp2 "go.sia.tech/core/rhp/v2" + proto4 "go.sia.tech/core/rhp/v4" "go.sia.tech/core/types" "go.uber.org/zap" ) @@ -106,23 +107,13 @@ type ( RiskedCollateral types.Currency `json:"riskedCollateral"` } - // V2Usage tracks the usage of a contract's funds. - V2Usage struct { - RPCRevenue types.Currency `json:"rpc"` - StorageRevenue types.Currency `json:"storage"` - EgressRevenue types.Currency `json:"egress"` - IngressRevenue types.Currency `json:"ingress"` - AccountFunding types.Currency `json:"accountFunding"` - RiskedCollateral types.Currency `json:"riskedCollateral"` - } - // A V2Contract contains metadata on the current state of a v2 file contract. V2Contract struct { types.V2FileContract ID types.FileContractID `json:"id"` Status V2ContractStatus `json:"status"` - Usage V2Usage `json:"usage"` + Usage proto4.Usage `json:"usage"` // NegotiationHeight is the height the contract was negotiated at. NegotiationHeight uint64 `json:"negotiationHeight"` @@ -144,13 +135,6 @@ type ( RenewedFrom types.FileContractID `json:"renewedFrom"` } - // A V2FormationTransactionSet contains the formation transaction set for a - // v2 contract. - V2FormationTransactionSet struct { - TransactionSet []types.V2Transaction - Basis types.ChainIndex - } - // A Contract contains metadata on the current state of a file contract. Contract struct { SignedRevision @@ -288,30 +272,6 @@ func (a Usage) Sub(b Usage) (c Usage) { } } -// Add returns u + b -func (a V2Usage) Add(b V2Usage) (c V2Usage) { - return V2Usage{ - RPCRevenue: a.RPCRevenue.Add(b.RPCRevenue), - StorageRevenue: a.StorageRevenue.Add(b.StorageRevenue), - EgressRevenue: a.EgressRevenue.Add(b.EgressRevenue), - IngressRevenue: a.IngressRevenue.Add(b.IngressRevenue), - AccountFunding: a.AccountFunding.Add(b.AccountFunding), - RiskedCollateral: a.RiskedCollateral.Add(b.RiskedCollateral), - } -} - -// Sub returns a - b -func (a V2Usage) Sub(b V2Usage) (c V2Usage) { - return V2Usage{ - RPCRevenue: a.RPCRevenue.Sub(b.RPCRevenue), - StorageRevenue: a.StorageRevenue.Sub(b.StorageRevenue), - EgressRevenue: a.EgressRevenue.Sub(b.EgressRevenue), - IngressRevenue: a.IngressRevenue.Sub(b.IngressRevenue), - AccountFunding: a.AccountFunding.Sub(b.AccountFunding), - RiskedCollateral: a.RiskedCollateral.Sub(b.RiskedCollateral), - } -} - // String returns the string representation of a ContractStatus. func (c ContractStatus) String() string { switch c { diff --git a/host/contracts/lock.go b/host/contracts/lock.go new file mode 100644 index 00000000..c6594f6a --- /dev/null +++ b/host/contracts/lock.go @@ -0,0 +1,141 @@ +package contracts + +import ( + "context" + "fmt" + "sync" + + "go.sia.tech/core/types" + rhp4 "go.sia.tech/coreutils/rhp/v4" +) + +type ( + lock struct { + ch chan struct{} + n int + } + + locker struct { + mu sync.Mutex + locks map[types.FileContractID]*lock + } +) + +func newLocker() *locker { + l := &locker{ + locks: make(map[types.FileContractID]*lock), + } + return l +} + +// Unlock releases a lock on the given contract ID. If the lock is not held, the +// function will panic. +func (lr *locker) Unlock(id types.FileContractID) { + lr.mu.Lock() + defer lr.mu.Unlock() + l, ok := lr.locks[id] + if !ok { + panic("unlocking unheld lock") // developer error + } + l.n-- + if l.n == 0 { + delete(lr.locks, id) + } else { + l.ch <- struct{}{} + } +} + +// Lock acquires a lock on the given contract ID. If the lock is already held, the +// function will block until the lock is released or the context is canceled. +// If the context is canceled, the function will return an error. +func (lr *locker) Lock(ctx context.Context, id types.FileContractID) error { + lr.mu.Lock() + l, ok := lr.locks[id] + if !ok { + // immediately acquire the lock + defer lr.mu.Unlock() + l = &lock{ + ch: make(chan struct{}, 1), + n: 1, + } + lr.locks[id] = l + return nil + } + l.n++ + lr.mu.Unlock() // unlock before waiting to avoid deadlock + select { + case <-ctx.Done(): + lr.mu.Lock() + l.n-- + if l.n == 0 { + delete(lr.locks, id) + } + lr.mu.Unlock() + return ctx.Err() + case <-l.ch: + return nil + } +} + +// Lock locks a contract for modification. +// +// Deprecated: Use LockV2Contract instead. +func (cm *Manager) Lock(ctx context.Context, id types.FileContractID) (SignedRevision, error) { + ctx, cancel, err := cm.tg.AddContext(ctx) + if err != nil { + return SignedRevision{}, err + } + defer cancel() + + if err := cm.locks.Lock(ctx, id); err != nil { + return SignedRevision{}, err + } + + contract, err := cm.store.Contract(id) + if err != nil { + cm.locks.Unlock(id) + return SignedRevision{}, fmt.Errorf("failed to get contract: %w", err) + } else if err := cm.isGoodForModification(contract); err != nil { + cm.locks.Unlock(id) + return SignedRevision{}, fmt.Errorf("contract is not good for modification: %w", err) + } + return contract.SignedRevision, nil +} + +// Unlock unlocks a locked contract. +// +// Deprecated: Use LockV2Contract instead. +func (cm *Manager) Unlock(id types.FileContractID) { + cm.locks.Unlock(id) +} + +// LockV2Contract locks a contract for modification. The returned unlock function +// must be called to release the lock. +func (cm *Manager) LockV2Contract(id types.FileContractID) (rev rhp4.RevisionState, unlock func(), _ error) { + done, err := cm.tg.Add() + if err != nil { + return rhp4.RevisionState{}, nil, err + } + defer done() + + // blocking is fine because locks are held for a short time + if err := cm.locks.Lock(context.Background(), id); err != nil { + return rhp4.RevisionState{}, nil, err + } + + contract, err := cm.store.V2Contract(id) + if err != nil { + cm.locks.Unlock(id) + return rhp4.RevisionState{}, nil, fmt.Errorf("failed to get contract: %w", err) + } + + var once sync.Once + return rhp4.RevisionState{ + Revision: contract.V2FileContract, + Roots: cm.getSectorRoots(id), + }, func() { + once.Do(func() { + cm.locks.Unlock(id) + }) + }, nil +} diff --git a/host/contracts/manager.go b/host/contracts/manager.go index 1053ff90..4c07a655 100644 --- a/host/contracts/manager.go +++ b/host/contracts/manager.go @@ -1,7 +1,6 @@ package contracts import ( - "context" "errors" "fmt" "math" @@ -10,7 +9,9 @@ import ( "go.sia.tech/core/consensus" rhp2 "go.sia.tech/core/rhp/v2" + proto4 "go.sia.tech/core/rhp/v4" "go.sia.tech/core/types" + rhp4 "go.sia.tech/coreutils/rhp/v4" "go.sia.tech/hostd/alerts" "go.sia.tech/hostd/internal/threadgroup" "go.uber.org/zap" @@ -59,11 +60,6 @@ type ( Dismiss(...types.Hash256) } - locker struct { - c chan struct{} - waiters int - } - // A Manager manages contracts' lifecycle Manager struct { rejectBuffer uint64 @@ -79,11 +75,12 @@ type ( syncer Syncer wallet Wallet + locks *locker // contracts must be locked while they are being modified + mu sync.Mutex // guards the following fields // caches the sector roots of all contracts to avoid long reads from // the store sectorRoots map[types.FileContractID][]types.Hash256 - locks map[types.FileContractID]*locker // contracts must be locked while they are being modified } ) @@ -106,68 +103,6 @@ func (cm *Manager) setSectorRoots(id types.FileContractID, roots []types.Hash256 cm.sectorRoots[id] = append([]types.Hash256(nil), roots...) } -// Lock locks a contract for modification. -func (cm *Manager) Lock(ctx context.Context, id types.FileContractID) (SignedRevision, error) { - ctx, cancel, err := cm.tg.AddContext(ctx) - if err != nil { - return SignedRevision{}, err - } - defer cancel() - - cm.mu.Lock() - contract, err := cm.store.Contract(id) - if err != nil { - cm.mu.Unlock() - return SignedRevision{}, fmt.Errorf("failed to get contract: %w", err) - } else if err := cm.isGoodForModification(contract); err != nil { - cm.mu.Unlock() - return SignedRevision{}, fmt.Errorf("contract is not good for modification: %w", err) - } - - // if the contract isn't already locked, create a new lock - if _, exists := cm.locks[id]; !exists { - cm.locks[id] = &locker{ - c: make(chan struct{}, 1), - waiters: 0, - } - cm.mu.Unlock() - return contract.SignedRevision, nil - } - cm.locks[id].waiters++ - c := cm.locks[id].c - // mutex must be unlocked before waiting on the channel to prevent deadlock. - cm.mu.Unlock() - select { - case <-c: - cm.mu.Lock() - defer cm.mu.Unlock() - contract, err := cm.store.Contract(id) - if err != nil { - return SignedRevision{}, fmt.Errorf("failed to get contract: %w", err) - } else if err := cm.isGoodForModification(contract); err != nil { - return SignedRevision{}, fmt.Errorf("contract is not good for modification: %w", err) - } - return contract.SignedRevision, nil - case <-ctx.Done(): - return SignedRevision{}, ctx.Err() - } -} - -// Unlock unlocks a locked contract. -func (cm *Manager) Unlock(id types.FileContractID) { - cm.mu.Lock() - defer cm.mu.Unlock() - lock, exists := cm.locks[id] - if !exists { - return - } else if lock.waiters <= 0 { - delete(cm.locks, id) - return - } - lock.waiters-- - lock.c <- struct{}{} -} - // Contracts returns a paginated list of contracts matching the filter and the // total number of contracts matching the filter. func (cm *Manager) Contracts(filter ContractFilter) ([]Contract, int, error) { @@ -184,8 +119,9 @@ func (cm *Manager) V2Contract(id types.FileContractID) (V2Contract, error) { return cm.store.V2Contract(id) } -// V2ContractElement returns the latest v2 state element with the given ID. -func (cm *Manager) V2ContractElement(id types.FileContractID) (types.V2FileContractElement, error) { +// V2FileContractElement returns the chain index and file contract element for the +// given contract ID. +func (cm *Manager) V2FileContractElement(id types.FileContractID) (types.ChainIndex, types.V2FileContractElement, error) { return cm.store.V2ContractElement(id) } @@ -236,7 +172,7 @@ func (cm *Manager) RenewContract(renewal SignedRevision, existing SignedRevision } // ReviseV2Contract atomically updates a contract and its associated sector roots. -func (cm *Manager) ReviseV2Contract(contractID types.FileContractID, revision types.V2FileContract, roots []types.Hash256, usage Usage) error { +func (cm *Manager) ReviseV2Contract(contractID types.FileContractID, revision types.V2FileContract, roots []types.Hash256, usage proto4.Usage) error { done, err := cm.tg.Add() if err != nil { return err @@ -279,7 +215,8 @@ func (cm *Manager) ReviseV2Contract(contractID types.FileContractID, revision ty } // revise the contract in the store - if err := cm.store.ReviseV2Contract(contractID, revision, roots, usage); err != nil { + err = cm.store.ReviseV2Contract(contractID, revision, roots, usage) + if err != nil { return err } // update the sector roots cache @@ -290,14 +227,14 @@ func (cm *Manager) ReviseV2Contract(contractID types.FileContractID, revision ty // AddV2Contract stores the provided contract, should error if the contract // already exists. -func (cm *Manager) AddV2Contract(formation V2FormationTransactionSet, usage V2Usage) error { +func (cm *Manager) AddV2Contract(formation rhp4.TransactionSet, usage proto4.Usage) error { done, err := cm.tg.Add() if err != nil { return err } defer done() - formationSet := formation.TransactionSet + formationSet := formation.Transactions if len(formationSet) == 0 { return errors.New("no formation transactions provided") } else if len(formationSet[len(formationSet)-1].FileContracts) != 1 { @@ -326,14 +263,14 @@ func (cm *Manager) AddV2Contract(formation V2FormationTransactionSet, usage V2Us // RenewV2Contract renews a contract. It is expected that the existing // contract will be cleared. -func (cm *Manager) RenewV2Contract(renewal V2FormationTransactionSet, usage V2Usage) error { +func (cm *Manager) RenewV2Contract(renewal rhp4.TransactionSet, usage proto4.Usage) error { done, err := cm.tg.Add() if err != nil { return err } defer done() - renewalSet := renewal.TransactionSet + renewalSet := renewal.Transactions if len(renewalSet) == 0 { return errors.New("no renewal transactions provided") } else if len(renewalSet[len(renewalSet)-1].FileContractResolutions) != 1 { @@ -356,7 +293,7 @@ func (cm *Manager) RenewV2Contract(renewal V2FormationTransactionSet, usage V2Us // sanity checks if finalRevision.RevisionNumber != types.MaxRevisionNumber { - return errors.New("existing contract must be cleared") + return errors.New("final revision must have max revision number") } else if fc.Filesize != existing.Filesize { return errors.New("renewal contract must have same file size as existing contract") } else if fc.Capacity != existing.Capacity { @@ -381,7 +318,7 @@ func (cm *Manager) RenewV2Contract(renewal V2FormationTransactionSet, usage V2Us Usage: usage, } - if err := cm.store.RenewV2Contract(contract, renewal, existingID, finalRevision); err != nil { + if err := cm.store.RenewV2Contract(contract, renewal, existingID, finalRevision, existingRoots); err != nil { return err } cm.setSectorRoots(contract.ID, existingRoots) @@ -448,7 +385,7 @@ func NewManager(store ContractStore, storage StorageManager, chain ChainManager, tg: threadgroup.New(), log: zap.NewNop(), - locks: make(map[types.FileContractID]*locker), + locks: newLocker(), } for _, opt := range opts { diff --git a/host/contracts/manager_test.go b/host/contracts/manager_test.go index 51d31b27..bbd740ac 100644 --- a/host/contracts/manager_test.go +++ b/host/contracts/manager_test.go @@ -10,9 +10,10 @@ import ( "time" rhp2 "go.sia.tech/core/rhp/v2" - rhp4 "go.sia.tech/core/rhp/v4" + proto4 "go.sia.tech/core/rhp/v4" "go.sia.tech/core/types" "go.sia.tech/coreutils/chain" + rhp4 "go.sia.tech/coreutils/rhp/v4" "go.sia.tech/coreutils/syncer" "go.sia.tech/coreutils/wallet" "go.sia.tech/hostd/host/contracts" @@ -67,19 +68,19 @@ func formV2Contract(t *testing.T, cm *chain.Manager, c *contracts.Manager, w *wa t.Fatal("failed to fund transaction:", err) } w.SignV2Inputs(&txn, toSign) - formationSet := contracts.V2FormationTransactionSet{ - TransactionSet: []types.V2Transaction{txn}, - Basis: basis, + formationSet := rhp4.TransactionSet{ + Transactions: []types.V2Transaction{txn}, + Basis: basis, } if broadcast { - if _, err := cm.AddV2PoolTransactions(formationSet.Basis, formationSet.TransactionSet); err != nil { + if _, err := cm.AddV2PoolTransactions(formationSet.Basis, formationSet.Transactions); err != nil { t.Fatal("failed to add formation set to pool:", err) } - s.BroadcastV2TransactionSet(formationSet.Basis, formationSet.TransactionSet) + s.BroadcastV2TransactionSet(formationSet.Basis, formationSet.Transactions) } - if err := c.AddV2Contract(formationSet, contracts.V2Usage{}); err != nil { + if err := c.AddV2Contract(formationSet, proto4.Usage{}); err != nil { t.Fatal("failed to add contract:", err) } return txn.V2FileContractID(txn.ID(), 0), fc @@ -853,9 +854,9 @@ func TestV2ContractLifecycle(t *testing.T) { } defer release() - fc.Filesize = rhp4.SectorSize - fc.Capacity = rhp4.SectorSize - fc.FileMerkleRoot = rhp4.MetaRoot(roots) + fc.Filesize = proto4.SectorSize + fc.Capacity = proto4.SectorSize + fc.FileMerkleRoot = proto4.MetaRoot(roots) fc.RevisionNumber++ // transfer some funds from the renter to the host cost, collateral := types.Siacoins(1), types.Siacoins(2) @@ -866,8 +867,8 @@ func TestV2ContractLifecycle(t *testing.T) { fc.HostSignature = hostKey.SignHash(sigHash) fc.RenterSignature = renterKey.SignHash(sigHash) - err = node.Contracts.ReviseV2Contract(contractID, fc, roots, contracts.Usage{ - StorageRevenue: cost, + err = node.Contracts.ReviseV2Contract(contractID, fc, roots, proto4.Usage{ + Storage: cost, RiskedCollateral: collateral, }) if err != nil { @@ -913,9 +914,9 @@ func TestV2ContractLifecycle(t *testing.T) { } defer release() - fc.Filesize = rhp4.SectorSize - fc.Capacity = rhp4.SectorSize - fc.FileMerkleRoot = rhp4.MetaRoot(roots) + fc.Filesize = proto4.SectorSize + fc.Capacity = proto4.SectorSize + fc.FileMerkleRoot = proto4.MetaRoot(roots) fc.RevisionNumber++ // transfer some funds from the renter to the host cost, collateral := types.Siacoins(1), types.Siacoins(2) @@ -926,8 +927,8 @@ func TestV2ContractLifecycle(t *testing.T) { fc.HostSignature = hostKey.SignHash(sigHash) fc.RenterSignature = renterKey.SignHash(sigHash) - err = node.Contracts.ReviseV2Contract(contractID, fc, roots, contracts.Usage{ - StorageRevenue: cost, + err = node.Contracts.ReviseV2Contract(contractID, fc, roots, proto4.Usage{ + Storage: cost, RiskedCollateral: collateral, }) if err != nil { @@ -973,9 +974,9 @@ func TestV2ContractLifecycle(t *testing.T) { } defer release() - fc.Filesize = rhp4.SectorSize - fc.Capacity = rhp4.SectorSize - fc.FileMerkleRoot = rhp4.MetaRoot(roots) + fc.Filesize = proto4.SectorSize + fc.Capacity = proto4.SectorSize + fc.FileMerkleRoot = proto4.MetaRoot(roots) fc.RevisionNumber++ // transfer some funds from the renter to the host cost, collateral := types.Siacoins(1), types.Siacoins(2) @@ -986,8 +987,8 @@ func TestV2ContractLifecycle(t *testing.T) { fc.HostSignature = hostKey.SignHash(sigHash) fc.RenterSignature = renterKey.SignHash(sigHash) - err = node.Contracts.ReviseV2Contract(contractID, fc, roots, contracts.Usage{ - StorageRevenue: cost, + err = node.Contracts.ReviseV2Contract(contractID, fc, roots, proto4.Usage{ + Storage: cost, RiskedCollateral: collateral, }) if err != nil { @@ -1042,7 +1043,7 @@ func TestV2ContractLifecycle(t *testing.T) { renewal.HostSignature = hostKey.SignHash(renewalSigHash) renewal.RenterSignature = renterKey.SignHash(renewalSigHash) - fce, err := com.V2ContractElement(contractID) + _, fce, err := com.V2FileContractElement(contractID) if err != nil { t.Fatal(err) } @@ -1073,16 +1074,16 @@ func TestV2ContractLifecycle(t *testing.T) { }, } node.Wallet.SignV2Inputs(&renewalTxn, []int{0}) - renewalTxnSet := contracts.V2FormationTransactionSet{ - Basis: basis, - TransactionSet: []types.V2Transaction{setupTxn, renewalTxn}, + renewalTxnSet := rhp4.TransactionSet{ + Basis: basis, + Transactions: []types.V2Transaction{setupTxn, renewalTxn}, } - if _, err := cm.AddV2PoolTransactions(renewalTxnSet.Basis, renewalTxnSet.TransactionSet); err != nil { + if _, err := cm.AddV2PoolTransactions(renewalTxnSet.Basis, renewalTxnSet.Transactions); err != nil { t.Fatal("failed to add renewal to pool:", err) } - node.Syncer.BroadcastV2TransactionSet(renewalTxnSet.Basis, renewalTxnSet.TransactionSet) + node.Syncer.BroadcastV2TransactionSet(renewalTxnSet.Basis, renewalTxnSet.Transactions) - err = com.RenewV2Contract(renewalTxnSet, contracts.V2Usage{ + err = com.RenewV2Contract(renewalTxnSet, proto4.Usage{ RiskedCollateral: renewal.NewContract.TotalCollateral.Sub(renewal.NewContract.MissedHostValue), }) if err != nil { @@ -1159,14 +1160,14 @@ func TestV2ContractLifecycle(t *testing.T) { t.Fatal("failed to fund transaction:", err) } w.SignV2Inputs(&txn, toSign) - formationSet := contracts.V2FormationTransactionSet{ - TransactionSet: []types.V2Transaction{txn}, - Basis: basis, + formationSet := rhp4.TransactionSet{ + Transactions: []types.V2Transaction{txn}, + Basis: basis, } contractID := txn.V2FileContractID(txn.ID(), 0) // corrupt the formation set to trigger a rejection - formationSet.TransactionSet[len(formationSet.TransactionSet)-1].SiacoinInputs[0].SatisfiedPolicy.Signatures[0] = types.Signature{} - if err := c.AddV2Contract(formationSet, contracts.V2Usage{}); err != nil { + formationSet.Transactions[len(formationSet.Transactions)-1].SiacoinInputs[0].SatisfiedPolicy.Signatures[0] = types.Signature{} + if err := c.AddV2Contract(formationSet, proto4.Usage{}); err != nil { t.Fatal("failed to add contract:", err) } diff --git a/host/contracts/persist.go b/host/contracts/persist.go index ba73ae18..9f5a5f67 100644 --- a/host/contracts/persist.go +++ b/host/contracts/persist.go @@ -1,7 +1,9 @@ package contracts import ( + proto4 "go.sia.tech/core/rhp/v4" "go.sia.tech/core/types" + rhp4 "go.sia.tech/coreutils/rhp/v4" ) type ( @@ -38,19 +40,18 @@ type ( ExpireContractSectors(height uint64) error // V2ContractElement returns the latest v2 state element with the given ID. - V2ContractElement(types.FileContractID) (types.V2FileContractElement, error) + V2ContractElement(types.FileContractID) (types.ChainIndex, types.V2FileContractElement, error) // V2Contract returns the v2 contract with the given ID. V2Contract(types.FileContractID) (V2Contract, error) // AddV2Contract stores the provided contract, should error if the contract // already exists in the store. - AddV2Contract(V2Contract, V2FormationTransactionSet) error + AddV2Contract(V2Contract, rhp4.TransactionSet) error // RenewV2Contract renews a contract. It is expected that the existing // contract will be cleared. - RenewV2Contract(renewal V2Contract, renewalSet V2FormationTransactionSet, renewedID types.FileContractID, finalRevision types.V2FileContract) error + RenewV2Contract(renewal V2Contract, renewalSet rhp4.TransactionSet, renewedID types.FileContractID, clearing types.V2FileContract, roots []types.Hash256) error // ReviseV2Contract atomically updates a contract and its associated // sector roots. - ReviseV2Contract(id types.FileContractID, revision types.V2FileContract, roots []types.Hash256, usage Usage) error - + ReviseV2Contract(types.FileContractID, types.V2FileContract, []types.Hash256, proto4.Usage) error // ExpireV2ContractSectors removes sector roots for any v2 contracts that are // rejected or past their proof window. ExpireV2ContractSectors(height uint64) error diff --git a/host/contracts/update.go b/host/contracts/update.go index 53246eb0..2cf4630f 100644 --- a/host/contracts/update.go +++ b/host/contracts/update.go @@ -7,6 +7,7 @@ import ( rhp2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" "go.sia.tech/coreutils/chain" + rhp4 "go.sia.tech/coreutils/rhp/v4" "go.sia.tech/coreutils/wallet" "go.uber.org/zap" ) @@ -33,7 +34,7 @@ type ( BroadcastProof []SignedRevision // V2 actions - RebroadcastV2Formation []V2FormationTransactionSet + RebroadcastV2Formation []rhp4.TransactionSet BroadcastV2Revision []types.V2FileContractRevision BroadcastV2Proof []types.V2FileContractElement BroadcastV2Expiration []types.V2FileContractElement @@ -295,10 +296,10 @@ func (cm *Manager) ProcessActions(index types.ChainIndex) error { } for _, formationSet := range actions.RebroadcastV2Formation { - if len(formationSet.TransactionSet) == 0 { + if len(formationSet.Transactions) == 0 { continue } - formationTxn := formationSet.TransactionSet[len(formationSet.TransactionSet)-1] + formationTxn := formationSet.Transactions[len(formationSet.Transactions)-1] if len(formationTxn.FileContracts) == 0 { continue } @@ -306,12 +307,12 @@ func (cm *Manager) ProcessActions(index types.ChainIndex) error { contractID := formationTxn.V2FileContractID(formationTxn.ID(), 0) log := log.Named("v2 formation").With(zap.Stringer("basis", formationSet.Basis), zap.Stringer("contractID", contractID)) - if _, err := cm.chain.AddV2PoolTransactions(formationSet.Basis, formationSet.TransactionSet); err != nil { + if _, err := cm.chain.AddV2PoolTransactions(formationSet.Basis, formationSet.Transactions); err != nil { log.Error("failed to add formation transaction to pool", zap.Error(err)) continue } - cm.syncer.BroadcastV2TransactionSet(formationSet.Basis, formationSet.TransactionSet) - log.Debug("broadcast transaction", zap.String("transactionID", formationSet.TransactionSet[len(formationSet.TransactionSet)-1].ID().String())) + cm.syncer.BroadcastV2TransactionSet(formationSet.Basis, formationSet.Transactions) + log.Debug("broadcast transaction", zap.String("transactionID", formationSet.Transactions[len(formationSet.Transactions)-1].ID().String())) } for _, fcr := range actions.BroadcastV2Revision { diff --git a/persist/sqlite/consensus.go b/persist/sqlite/consensus.go index 8feb7674..882480a3 100644 --- a/persist/sqlite/consensus.go +++ b/persist/sqlite/consensus.go @@ -7,6 +7,7 @@ import ( "fmt" "time" + proto4 "go.sia.tech/core/rhp/v4" "go.sia.tech/core/types" "go.sia.tech/coreutils/wallet" "go.sia.tech/hostd/host/contracts" @@ -32,7 +33,7 @@ type ( v2ContractState struct { ID int64 LockedCollateral types.Currency - Usage contracts.V2Usage + Usage proto4.Usage Status contracts.V2ContractStatus } @@ -721,9 +722,9 @@ func updateBalanceMetric(tx *txn, matureInflow, matureOutflow, immatureInflow, i // getContractStateStmt helper to get the current state of a contract. func getContractStateStmt(tx *txn) (func(contractID types.FileContractID) (contractState, error), func() error, error) { - stmt, err := tx.Prepare(`SELECT id, locked_collateral, risked_collateral, rpc_revenue, storage_revenue, -ingress_revenue, egress_revenue, registry_read, registry_write, contract_status -FROM contracts + stmt, err := tx.Prepare(`SELECT id, locked_collateral, risked_collateral, rpc_revenue, storage_revenue, +ingress_revenue, egress_revenue, registry_read, registry_write, contract_status +FROM contracts WHERE contract_id=?`) if err != nil { return nil, nil, fmt.Errorf("failed to prepare select statement: %w", err) @@ -740,9 +741,9 @@ WHERE contract_id=?`) // getV2ContractStateStmt helper to get the current state of a v2 contract. func getV2ContractStateStmt(tx *txn) (func(contractID types.FileContractID) (v2ContractState, error), func() error, error) { - stmt, err := tx.Prepare(`SELECT id, locked_collateral, risked_collateral, rpc_revenue, storage_revenue, -ingress_revenue, egress_revenue, contract_status -FROM contracts_v2 + stmt, err := tx.Prepare(`SELECT id, locked_collateral, risked_collateral, rpc_revenue, storage_revenue, +ingress_revenue, egress_revenue, contract_status +FROM contracts_v2 WHERE contract_id=?`) if err != nil { return nil, nil, fmt.Errorf("failed to prepare select statement: %w", err) @@ -750,8 +751,8 @@ WHERE contract_id=?`) return func(contractID types.FileContractID) (state v2ContractState, err error) { err = stmt.QueryRow(encode(contractID)).Scan(&state.ID, - decode(&state.LockedCollateral), decode(&state.Usage.RiskedCollateral), decode(&state.Usage.RPCRevenue), - decode(&state.Usage.StorageRevenue), decode(&state.Usage.IngressRevenue), decode(&state.Usage.EgressRevenue), + decode(&state.LockedCollateral), decode(&state.Usage.RiskedCollateral), decode(&state.Usage.RPC), + decode(&state.Usage.Storage), decode(&state.Usage.Ingress), decode(&state.Usage.Egress), &state.Status) return }, stmt.Close, nil @@ -794,28 +795,28 @@ func updatePotentialRevenueMetrics(usage contracts.Usage, negative bool, fn func } // updateV2EarnedRevenueMetrics helper to update the earned revenue metrics. -func updateV2EarnedRevenueMetrics(usage contracts.V2Usage, negative bool, fn func(stat string, delta types.Currency, negative bool, timestamp time.Time) error) error { - if err := fn(metricEarnedRPCRevenue, usage.RPCRevenue, negative, time.Now()); err != nil { +func updateV2EarnedRevenueMetrics(usage proto4.Usage, negative bool, fn func(stat string, delta types.Currency, negative bool, timestamp time.Time) error) error { + if err := fn(metricEarnedRPCRevenue, usage.RPC, negative, time.Now()); err != nil { return fmt.Errorf("failed to update metric %q: %w", metricEarnedRPCRevenue, err) - } else if err := fn(metricEarnedStorageRevenue, usage.StorageRevenue, negative, time.Now()); err != nil { + } else if err := fn(metricEarnedStorageRevenue, usage.Storage, negative, time.Now()); err != nil { return fmt.Errorf("failed to update metric %q: %w", metricEarnedStorageRevenue, err) - } else if err := fn(metricEarnedIngressRevenue, usage.IngressRevenue, negative, time.Now()); err != nil { + } else if err := fn(metricEarnedIngressRevenue, usage.Ingress, negative, time.Now()); err != nil { return fmt.Errorf("failed to update metric %q: %w", metricEarnedIngressRevenue, err) - } else if err := fn(metricEarnedEgressRevenue, usage.EgressRevenue, negative, time.Now()); err != nil { + } else if err := fn(metricEarnedEgressRevenue, usage.Egress, negative, time.Now()); err != nil { return fmt.Errorf("failed to update metric %q: %w", metricEarnedEgressRevenue, err) } return nil } // updateV2PotentialRevenueMetrics helper to update the potential revenue metrics. -func updateV2PotentialRevenueMetrics(usage contracts.V2Usage, negative bool, fn func(stat string, delta types.Currency, negative bool, timestamp time.Time) error) error { - if err := fn(metricPotentialRPCRevenue, usage.RPCRevenue, negative, time.Now()); err != nil { +func updateV2PotentialRevenueMetrics(usage proto4.Usage, negative bool, fn func(stat string, delta types.Currency, negative bool, timestamp time.Time) error) error { + if err := fn(metricPotentialRPCRevenue, usage.RPC, negative, time.Now()); err != nil { return fmt.Errorf("failed to update metric %q: %w", metricPotentialRPCRevenue, err) - } else if err := fn(metricPotentialStorageRevenue, usage.StorageRevenue, negative, time.Now()); err != nil { + } else if err := fn(metricPotentialStorageRevenue, usage.Storage, negative, time.Now()); err != nil { return fmt.Errorf("failed to update metric %q: %w", metricPotentialStorageRevenue, err) - } else if err := fn(metricPotentialIngressRevenue, usage.IngressRevenue, negative, time.Now()); err != nil { + } else if err := fn(metricPotentialIngressRevenue, usage.Ingress, negative, time.Now()); err != nil { return fmt.Errorf("failed to update metric %q: %w", metricPotentialIngressRevenue, err) - } else if err := fn(metricPotentialEgressRevenue, usage.EgressRevenue, negative, time.Now()); err != nil { + } else if err := fn(metricPotentialEgressRevenue, usage.Egress, negative, time.Now()); err != nil { return fmt.Errorf("failed to update metric %q: %w", metricPotentialEgressRevenue, err) } return nil diff --git a/persist/sqlite/contracts.go b/persist/sqlite/contracts.go index 90d3b785..791a5a88 100644 --- a/persist/sqlite/contracts.go +++ b/persist/sqlite/contracts.go @@ -8,7 +8,9 @@ import ( "strings" "time" + proto4 "go.sia.tech/core/rhp/v4" "go.sia.tech/core/types" + rhp4 "go.sia.tech/coreutils/rhp/v4" "go.sia.tech/hostd/host/contracts" "go.uber.org/zap" ) @@ -72,9 +74,9 @@ func (s *Store) Contracts(filter contracts.ContractFilter) (contracts []contract return nil, 0, fmt.Errorf("failed to build where clause: %w", err) } - contractQuery := fmt.Sprintf(`SELECT c.contract_id, rt.contract_id AS renewed_to, rf.contract_id AS renewed_from, c.contract_status, c.negotiation_height, c.formation_confirmed, + contractQuery := fmt.Sprintf(`SELECT c.contract_id, rt.contract_id AS renewed_to, rf.contract_id AS renewed_from, c.contract_status, c.negotiation_height, c.formation_confirmed, COALESCE(c.revision_number=c.confirmed_revision_number, false) AS revision_confirmed, c.resolution_height, c.locked_collateral, c.rpc_revenue, - c.storage_revenue, c.ingress_revenue, c.egress_revenue, c.account_funding, c.risked_collateral, c.raw_revision, c.host_sig, c.renter_sig + c.storage_revenue, c.ingress_revenue, c.egress_revenue, c.account_funding, c.risked_collateral, c.raw_revision, c.host_sig, c.renter_sig FROM contracts c INNER JOIN contract_renters r ON (c.renter_id=r.id) LEFT JOIN contracts rt ON (c.renewed_to=rt.id) @@ -126,14 +128,15 @@ func (s *Store) Contract(id types.FileContractID) (contract contracts.Contract, } // V2ContractElement returns the latest v2 state element with the given ID. -func (s *Store) V2ContractElement(contractID types.FileContractID) (ele types.V2FileContractElement, err error) { +func (s *Store) V2ContractElement(contractID types.FileContractID) (basis types.ChainIndex, ele types.V2FileContractElement, err error) { err = s.transaction(func(tx *txn) error { - const query = `SELECT cs.raw_contract, cs.leaf_index, cs.merkle_proof + const query = `SELECT cs.raw_contract, cs.leaf_index, cs.merkle_proof, g.last_scanned_index AS basis FROM contracts_v2 c INNER JOIN contract_v2_state_elements cs ON (c.id = cs.contract_id) +CROSS JOIN global_settings g WHERE c.contract_id=?` - err := tx.QueryRow(query, encode(contractID)).Scan(decode(&ele.V2FileContract), decode(&ele.StateElement.LeafIndex), decode(&ele.StateElement.MerkleProof)) + err := tx.QueryRow(query, encode(contractID)).Scan(decode(&ele.V2FileContract), decode(&ele.StateElement.LeafIndex), decode(&ele.StateElement.MerkleProof), decode(&basis)) if errors.Is(err, sql.ErrNoRows) { return contracts.ErrNotFound } @@ -146,7 +149,7 @@ WHERE c.contract_id=?` // V2Contract returns the contract with the given ID. func (s *Store) V2Contract(id types.FileContractID) (contract contracts.V2Contract, err error) { err = s.transaction(func(tx *txn) error { - const query = `SELECT c.contract_id, rt.contract_id AS renewed_to, rf.contract_id AS renewed_from, c.contract_status, c.negotiation_height, c.confirmation_index, + const query = `SELECT c.contract_id, rt.contract_id AS renewed_to, rf.contract_id AS renewed_from, c.contract_status, c.negotiation_height, c.confirmation_index, COALESCE(c.revision_number=cs.revision_number, false) AS revision_confirmed, c.resolution_index, c.rpc_revenue, c.storage_revenue, c.ingress_revenue, c.egress_revenue, c.account_funding, c.risked_collateral, c.raw_revision FROM contracts_v2 c @@ -161,7 +164,7 @@ WHERE c.contract_id=$1;` } // AddV2Contract adds a new contract to the database. -func (s *Store) AddV2Contract(contract contracts.V2Contract, formationSet contracts.V2FormationTransactionSet) error { +func (s *Store) AddV2Contract(contract contracts.V2Contract, formationSet rhp4.TransactionSet) error { return s.transaction(func(tx *txn) error { _, err := insertV2Contract(tx, contract, formationSet) return err @@ -172,7 +175,7 @@ func (s *Store) AddV2Contract(contract contracts.V2Contract, formationSet contra // contract's renewed_from field. The old contract's sector roots are // copied to the new contract. The status of the old contract should continue // to be active until the renewal is confirmed -func (s *Store) RenewV2Contract(renewal contracts.V2Contract, renewalSet contracts.V2FormationTransactionSet, renewedID types.FileContractID, clearing types.V2FileContract) error { +func (s *Store) RenewV2Contract(renewal contracts.V2Contract, renewalSet rhp4.TransactionSet, renewedID types.FileContractID, clearing types.V2FileContract, roots []types.Hash256) error { return s.transaction(func(tx *txn) error { // add the new contract renewedDBID, err := insertV2Contract(tx, renewal, renewalSet) @@ -238,14 +241,14 @@ func (s *Store) RenewContract(renewal contracts.SignedRevision, clearing contrac }) } -func incrementV2ContractUsage(tx *txn, dbID int64, usage contracts.Usage) error { +func incrementV2ContractUsage(tx *txn, dbID int64, usage proto4.Usage) error { const query = `SELECT rpc_revenue, storage_revenue, ingress_revenue, egress_revenue, account_funding, risked_collateral FROM contracts_v2 WHERE id=$1;` - var existing contracts.Usage + var existing proto4.Usage err := tx.QueryRow(query, dbID).Scan( - decode(&existing.RPCRevenue), - decode(&existing.StorageRevenue), - decode(&existing.IngressRevenue), - decode(&existing.EgressRevenue), + decode(&existing.RPC), + decode(&existing.Storage), + decode(&existing.Ingress), + decode(&existing.Egress), decode(&existing.AccountFunding), decode(&existing.RiskedCollateral)) if err != nil { @@ -259,10 +262,10 @@ func incrementV2ContractUsage(tx *txn, dbID int64, usage contracts.Usage) error var updatedID int64 err = tx.QueryRow(`UPDATE contracts_v2 SET (rpc_revenue, storage_revenue, ingress_revenue, egress_revenue, account_funding, risked_collateral) = ($1, $2, $3, $4, $5, $6) WHERE id=$7 RETURNING id;`, - encode(total.RPCRevenue), - encode(total.StorageRevenue), - encode(total.IngressRevenue), - encode(total.EgressRevenue), + encode(total.RPC), + encode(total.Storage), + encode(total.Ingress), + encode(total.Egress), encode(total.AccountFunding), encode(total.RiskedCollateral), dbID).Scan(&updatedID) @@ -272,134 +275,15 @@ func incrementV2ContractUsage(tx *txn, dbID int64, usage contracts.Usage) error return nil } -func cleanupDanglingRoots(tx *txn, contractID int64, length int64) (deleted []int64, err error) { - rows, err := tx.Query(`DELETE FROM contract_sector_roots WHERE contract_id=$1 AND root_index >= $2 RETURNING sector_id`, contractID, length) - if err != nil { - return nil, fmt.Errorf("failed to cleanup dangling roots: %w", err) - } - defer rows.Close() - - used := make(map[int64]bool) - for rows.Next() { - var sectorID int64 - if err := rows.Scan(§orID); err != nil { - return nil, fmt.Errorf("failed to scan sector ID: %w", err) - } - - if used[sectorID] { - continue - } - deleted = append(deleted, sectorID) - used[sectorID] = true - } - return deleted, nil -} - // ReviseV2Contract atomically updates a contract's revision and sectors -func (s *Store) ReviseV2Contract(id types.FileContractID, revision types.V2FileContract, roots []types.Hash256, usage contracts.Usage) error { +func (s *Store) ReviseV2Contract(id types.FileContractID, revision types.V2FileContract, roots []types.Hash256, usage proto4.Usage) error { return s.transaction(func(tx *txn) error { - incrementCurrencyStat, done, err := incrementCurrencyStatStmt(tx) - if err != nil { - return fmt.Errorf("failed to prepare increment currency stat statement: %w", err) - } - defer done() - - const updateQuery = `UPDATE contracts_v2 SET raw_revision=?, revision_number=? WHERE contract_id=? RETURNING id, contract_status` - - var contractDBID int64 - var status contracts.V2ContractStatus - err = tx.QueryRow(updateQuery, encode(revision), encode(revision.RevisionNumber), encode(id)).Scan(&contractDBID, &status) - if err != nil { - return fmt.Errorf("failed to update contract: %w", err) - } else if err := incrementV2ContractUsage(tx, contractDBID, usage); err != nil { - return fmt.Errorf("failed to update contract usage: %w", err) - } - - // only increment metrics if the contract is active. - // If the contract is pending or some variant of successful, the metrics - // will already be handled. - if status == contracts.V2ContractStatusActive { - if err := updatePotentialRevenueMetrics(usage, false, incrementCurrencyStat); err != nil { - return fmt.Errorf("failed to update potential revenue: %w", err) - } else if err := updateCollateralMetrics(types.ZeroCurrency, usage.RiskedCollateral, false, incrementCurrencyStat); err != nil { - return fmt.Errorf("failed to update collateral metrics: %w", err) - } - } - - selectOldSectorStmt, err := tx.Prepare(`SELECT sector_id FROM contract_v2_sector_roots WHERE contract_id=? AND root_index=?`) - if err != nil { - return fmt.Errorf("failed to prepare select old sector statement: %w", err) - } - defer selectOldSectorStmt.Close() - - selectRootIDStmt, err := tx.Prepare(`SELECT id FROM stored_sectors WHERE sector_root=?`) - if err != nil { - return fmt.Errorf("failed to prepare select root ID statement: %w", err) - } - defer selectRootIDStmt.Close() - - updateRootStmt, err := tx.Prepare(`INSERT INTO contract_v2_sector_roots (contract_id, sector_id, root_index) VALUES (?, ?, ?) ON CONFLICT (contract_id, root_index) DO UPDATE SET sector_id=excluded.sector_id`) - if err != nil { - return fmt.Errorf("failed to prepare update root statement: %w", err) - } - defer updateRootStmt.Close() - - var appended int - var deleted []int64 - seen := make(map[int64]bool) - for i, root := range roots { - // TODO: benchmark this against an exceptionally large contract. - // This is less efficient than the v1 implementation, but it leaves - // less room for update edge-cases now that all sectors are loaded - // into memory. - var newSectorID int64 - if err := selectRootIDStmt.QueryRow(encode(root)).Scan(&newSectorID); err != nil { - return fmt.Errorf("failed to get sector ID: %w", err) - } - - var oldSectorID int64 - err := selectOldSectorStmt.QueryRow(contractDBID, i).Scan(&oldSectorID) - if errors.Is(err, sql.ErrNoRows) { - // new sector - appended++ - } else if err != nil { - // db error - return fmt.Errorf("failed to get sector ID: %w", err) - } else if newSectorID == oldSectorID { - // no change - continue - } else if !seen[oldSectorID] { - // updated root - deleted = append(deleted, oldSectorID) // mark for pruning - seen[oldSectorID] = true - } - - if _, err := updateRootStmt.Exec(contractDBID, newSectorID, i); err != nil { - return fmt.Errorf("failed to update sector root: %w", err) - } - } - - cleaned, err := cleanupDanglingRoots(tx, contractDBID, int64(len(roots))) + contractDBID, err := reviseV2Contract(tx, id, revision, usage) if err != nil { - return fmt.Errorf("failed to cleanup dangling roots: %w", err) - } - for _, sectorID := range cleaned { - if seen[sectorID] { - continue - } - deleted = append(deleted, sectorID) - } - - delta := appended - len(deleted) - if err := incrementNumericStat(tx, metricContractSectors, delta, time.Now()); err != nil { + return fmt.Errorf("failed to revise contract: %w", err) + } else if err := updateV2ContractSectors(tx, contractDBID, roots, s.log.Named("ReviseV2Contract").With(zap.Stringer("contract", id))); err != nil { return fmt.Errorf("failed to update contract sectors: %w", err) } - - if pruned, err := pruneSectors(tx, deleted); err != nil { - return fmt.Errorf("failed to prune sectors: %w", err) - } else if len(pruned) > 0 { - s.log.Debug("pruned sectors", zap.Int("count", len(pruned)), zap.Stringers("sectors", pruned)) - } return nil }) } @@ -594,9 +478,9 @@ func (s *Store) ExpireV2ContractSectors(height uint64) error { } func getContract(tx *txn, contractID int64) (contracts.Contract, error) { - const query = `SELECT c.contract_id, rt.contract_id AS renewed_to, rf.contract_id AS renewed_from, c.contract_status, c.negotiation_height, c.formation_confirmed, + const query = `SELECT c.contract_id, rt.contract_id AS renewed_to, rf.contract_id AS renewed_from, c.contract_status, c.negotiation_height, c.formation_confirmed, COALESCE(c.revision_number=c.confirmed_revision_number, false) AS revision_confirmed, c.resolution_height, c.locked_collateral, c.rpc_revenue, - c.storage_revenue, c.ingress_revenue, c.egress_revenue, c.account_funding, c.risked_collateral, c.raw_revision, c.host_sig, c.renter_sig + c.storage_revenue, c.ingress_revenue, c.egress_revenue, c.account_funding, c.risked_collateral, c.raw_revision, c.host_sig, c.renter_sig FROM contracts c LEFT JOIN contracts rt ON (c.renewed_to = rt.id) LEFT JOIN contracts rf ON (c.renewed_from = rf.id) @@ -713,7 +597,7 @@ ORDER BY root_index ASC;`, contractID, i, j) func trimSectors(tx *txn, contractID int64, n uint64, log *zap.Logger) ([]types.Hash256, error) { selectStmt, err := tx.Prepare(`SELECT csr.id, csr.sector_id, ss.sector_root FROM contract_sector_roots csr INNER JOIN stored_sectors ss ON (csr.sector_id=ss.id) -WHERE csr.contract_id=$1 +WHERE csr.contract_id=$1 ORDER BY root_index DESC LIMIT 1`) if err != nil { @@ -920,7 +804,7 @@ func broadcastRevision(tx *txn, index types.ChainIndex, revisionBroadcastHeight } func proofContracts(tx *txn, index types.ChainIndex) (revisions []contracts.SignedRevision, err error) { - const query = `SELECT raw_revision, host_sig, renter_sig + const query = `SELECT raw_revision, host_sig, renter_sig FROM contracts WHERE formation_confirmed AND resolution_height IS NULL AND window_start <= $1 AND window_end > $1` @@ -943,7 +827,7 @@ func proofContracts(tx *txn, index types.ChainIndex) (revisions []contracts.Sign return } -func rebroadcastV2Contracts(tx *txn) (rebroadcast []contracts.V2FormationTransactionSet, err error) { +func rebroadcastV2Contracts(tx *txn) (rebroadcast []rhp4.TransactionSet, err error) { rows, err := tx.Query(`SELECT formation_txn_set, formation_txn_set_basis FROM contracts_v2 WHERE confirmation_index IS NULL AND contract_status <> ?`, contracts.ContractStatusRejected) if err != nil { return nil, err @@ -951,13 +835,13 @@ func rebroadcastV2Contracts(tx *txn) (rebroadcast []contracts.V2FormationTransac defer rows.Close() for rows.Next() { - var formationSet contracts.V2FormationTransactionSet + var formationSet rhp4.TransactionSet var buf []byte if err := rows.Scan(&buf, decode(&formationSet.Basis)); err != nil { return nil, fmt.Errorf("failed to scan contract id: %w", err) } dec := types.NewBufDecoder(buf) - types.DecodeSlice(dec, &formationSet.TransactionSet) + types.DecodeSlice(dec, &formationSet.Transactions) if err := dec.Err(); err != nil { return nil, fmt.Errorf("failed to decode formation txn set: %w", err) } @@ -1092,8 +976,8 @@ func renterDBID(tx *txn, renterKey types.PublicKey) (int64, error) { } func insertContract(tx *txn, revision contracts.SignedRevision, formationSet []types.Transaction, lockedCollateral types.Currency, initialUsage contracts.Usage, negotationHeight uint64) (dbID int64, err error) { - const query = `INSERT INTO contracts (contract_id, renter_id, locked_collateral, rpc_revenue, storage_revenue, ingress_revenue, -egress_revenue, registry_read, registry_write, account_funding, risked_collateral, revision_number, negotiation_height, window_start, window_end, formation_txn_set, + const query = `INSERT INTO contracts (contract_id, renter_id, locked_collateral, rpc_revenue, storage_revenue, ingress_revenue, +egress_revenue, registry_read, registry_write, account_funding, risked_collateral, revision_number, negotiation_height, window_start, window_end, formation_txn_set, raw_revision, host_sig, renter_sig, confirmed_revision_number, contract_status, formation_confirmed) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, false) RETURNING id;` renterID, err := renterDBID(tx, revision.RenterKey()) @@ -1129,9 +1013,9 @@ raw_revision, host_sig, renter_sig, confirmed_revision_number, contract_status, return } -func insertV2Contract(tx *txn, contract contracts.V2Contract, formationSet contracts.V2FormationTransactionSet) (dbID int64, err error) { - const query = `INSERT INTO contracts_v2 (contract_id, renter_id, locked_collateral, rpc_revenue, storage_revenue, ingress_revenue, -egress_revenue, account_funding, risked_collateral, revision_number, negotiation_height, window_start, window_end, formation_txn_set, +func insertV2Contract(tx *txn, contract contracts.V2Contract, formationSet rhp4.TransactionSet) (dbID int64, err error) { + const query = `INSERT INTO contracts_v2 (contract_id, renter_id, locked_collateral, rpc_revenue, storage_revenue, ingress_revenue, +egress_revenue, account_funding, risked_collateral, revision_number, negotiation_height, window_start, window_end, formation_txn_set, formation_txn_set_basis, raw_revision, contract_status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) RETURNING id;` renterID, err := renterDBID(tx, contract.RenterPublicKey) @@ -1143,17 +1027,17 @@ formation_txn_set_basis, raw_revision, contract_status) VALUES encode(contract.ID), renterID, encode(contract.V2FileContract.TotalCollateral), - encode(contract.Usage.RPCRevenue), - encode(contract.Usage.StorageRevenue), - encode(contract.Usage.IngressRevenue), - encode(contract.Usage.EgressRevenue), + encode(contract.Usage.RPC), + encode(contract.Usage.Storage), + encode(contract.Usage.Ingress), + encode(contract.Usage.Egress), encode(contract.Usage.AccountFunding), encode(contract.Usage.RiskedCollateral), encode(contract.RevisionNumber), contract.NegotiationHeight, // stored as int64 for queries, should never overflow contract.V2FileContract.ProofHeight, // stored as int64 for queries, should never overflow contract.ExpirationHeight, // stored as int64 for queries, should never overflow - encodeSlice(formationSet.TransactionSet), + encodeSlice(formationSet.Transactions), encode(formationSet.Basis), encode(contract.V2FileContract), contracts.V2ContractStatusPending, @@ -1164,6 +1048,150 @@ formation_txn_set_basis, raw_revision, contract_status) VALUES return } +func updateV2ContractUsage(tx *txn, contractDBID int64, usage proto4.Usage) error { + if err := incrementV2ContractUsage(tx, contractDBID, usage); err != nil { + return fmt.Errorf("failed to update contract usage: %w", err) + } + + var status contracts.V2ContractStatus + err := tx.QueryRow(`SELECT contract_status FROM contracts_v2 WHERE id=$1`, contractDBID).Scan(&status) + if err != nil { + return fmt.Errorf("failed to get contract status: %w", err) + } + + // only increment metrics if the contract is active. + // If the contract is pending or some variant of successful, the metrics + // will already be handled. + if status == contracts.V2ContractStatusActive { + incrementCurrencyStat, done, err := incrementCurrencyStatStmt(tx) + if err != nil { + return fmt.Errorf("failed to prepare increment currency stat statement: %w", err) + } + defer done() + + if err := updateV2PotentialRevenueMetrics(usage, false, incrementCurrencyStat); err != nil { + return fmt.Errorf("failed to update potential revenue: %w", err) + } else if err := updateCollateralMetrics(types.ZeroCurrency, usage.RiskedCollateral, false, incrementCurrencyStat); err != nil { + return fmt.Errorf("failed to update collateral metrics: %w", err) + } + } + return nil +} + +func reviseV2Contract(tx *txn, id types.FileContractID, revision types.V2FileContract, usage proto4.Usage) (int64, error) { + const updateQuery = `UPDATE contracts_v2 SET raw_revision=?, revision_number=? WHERE contract_id=? RETURNING id` + + var contractDBID int64 + err := tx.QueryRow(updateQuery, encode(revision), encode(revision.RevisionNumber), encode(id)).Scan(&contractDBID) + if err != nil { + return 0, fmt.Errorf("failed to update contract: %w", err) + } else if err := updateV2ContractUsage(tx, contractDBID, usage); err != nil { + return 0, fmt.Errorf("failed to update contract usage: %w", err) + } + return contractDBID, nil +} + +func cleanupDanglingRoots(tx *txn, contractID int64, length int64) (deleted []int64, err error) { + rows, err := tx.Query(`DELETE FROM contract_sector_roots WHERE contract_id=$1 AND root_index >= $2 RETURNING sector_id`, contractID, length) + if err != nil { + return nil, fmt.Errorf("failed to cleanup dangling roots: %w", err) + } + defer rows.Close() + + used := make(map[int64]bool) + for rows.Next() { + var sectorID int64 + if err := rows.Scan(§orID); err != nil { + return nil, fmt.Errorf("failed to scan sector ID: %w", err) + } + + if used[sectorID] { + continue + } + deleted = append(deleted, sectorID) + used[sectorID] = true + } + return deleted, nil +} + +func updateV2ContractSectors(tx *txn, contractDBID int64, roots []types.Hash256, log *zap.Logger) error { + selectOldSectorStmt, err := tx.Prepare(`SELECT sector_id FROM contract_v2_sector_roots WHERE contract_id=? AND root_index=?`) + if err != nil { + return fmt.Errorf("failed to prepare select old sector statement: %w", err) + } + defer selectOldSectorStmt.Close() + + selectRootIDStmt, err := tx.Prepare(`SELECT id FROM stored_sectors WHERE sector_root=?`) + if err != nil { + return fmt.Errorf("failed to prepare select root ID statement: %w", err) + } + defer selectRootIDStmt.Close() + + updateRootStmt, err := tx.Prepare(`INSERT INTO contract_v2_sector_roots (contract_id, sector_id, root_index) VALUES (?, ?, ?) ON CONFLICT (contract_id, root_index) DO UPDATE SET sector_id=excluded.sector_id`) + if err != nil { + return fmt.Errorf("failed to prepare update root statement: %w", err) + } + defer updateRootStmt.Close() + + var appended int + var deleted []int64 + seen := make(map[int64]bool) + for i, root := range roots { + // TODO: benchmark this against an exceptionally large contract. + // This is less efficient than the v1 implementation, but it leaves + // less room for update edge-cases now that all sectors are loaded + // into memory. + var newSectorID int64 + if err := selectRootIDStmt.QueryRow(encode(root)).Scan(&newSectorID); err != nil { + return fmt.Errorf("failed to get sector ID: %w", err) + } + + var oldSectorID int64 + err := selectOldSectorStmt.QueryRow(contractDBID, i).Scan(&oldSectorID) + if errors.Is(err, sql.ErrNoRows) { + // new sector + appended++ + } else if err != nil { + // db error + return fmt.Errorf("failed to get sector ID: %w", err) + } else if newSectorID == oldSectorID { + // no change + continue + } else if !seen[oldSectorID] { + // updated root + deleted = append(deleted, oldSectorID) // mark for pruning + seen[oldSectorID] = true + } + + if _, err := updateRootStmt.Exec(contractDBID, newSectorID, i); err != nil { + return fmt.Errorf("failed to update sector root: %w", err) + } + } + + cleaned, err := cleanupDanglingRoots(tx, contractDBID, int64(len(roots))) + if err != nil { + return fmt.Errorf("failed to cleanup dangling roots: %w", err) + } + for _, sectorID := range cleaned { + if seen[sectorID] { + continue + } + deleted = append(deleted, sectorID) + } + + delta := appended - len(deleted) + if err := incrementNumericStat(tx, metricContractSectors, delta, time.Now()); err != nil { + return fmt.Errorf("failed to update contract sectors: %w", err) + } + + if pruned, err := pruneSectors(tx, deleted); err != nil { + return fmt.Errorf("failed to prune sectors: %w", err) + } else if len(pruned) > 0 { + log.Debug("pruned sectors", zap.Int("count", len(pruned)), zap.Stringers("sectors", pruned)) + } + return nil +} + func encodeTxnSet(txns []types.Transaction) []byte { var buf bytes.Buffer e := types.NewEncoder(&buf) @@ -1301,10 +1329,10 @@ func scanV2Contract(row scanner) (c contracts.V2Contract, err error) { decodeNullable(&c.FormationIndex), &c.RevisionConfirmed, decodeNullable(&c.ResolutionIndex), - decode(&c.Usage.RPCRevenue), - decode(&c.Usage.StorageRevenue), - decode(&c.Usage.IngressRevenue), - decode(&c.Usage.EgressRevenue), + decode(&c.Usage.RPC), + decode(&c.Usage.Storage), + decode(&c.Usage.Ingress), + decode(&c.Usage.Egress), decode(&c.Usage.AccountFunding), decode(&c.Usage.RiskedCollateral), decode(&c.V2FileContract), From 1d1f17c3d26f99655688704e8c8c9f4fdcc6d9c3 Mon Sep 17 00:00:00 2001 From: Nate Date: Wed, 20 Nov 2024 09:05:16 -0800 Subject: [PATCH 2/5] fix(contracts): potentially panic on double unlock --- host/contracts/lock.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/host/contracts/lock.go b/host/contracts/lock.go index c6594f6a..86f9850b 100644 --- a/host/contracts/lock.go +++ b/host/contracts/lock.go @@ -129,13 +129,10 @@ func (cm *Manager) LockV2Contract(id types.FileContractID) (rev rhp4.RevisionSta return rhp4.RevisionState{}, nil, fmt.Errorf("failed to get contract: %w", err) } - var once sync.Once return rhp4.RevisionState{ Revision: contract.V2FileContract, Roots: cm.getSectorRoots(id), }, func() { - once.Do(func() { - cm.locks.Unlock(id) - }) + cm.locks.Unlock(id) }, nil } From b5d883e24fc322c36d99f87b84270c22b5716352 Mon Sep 17 00:00:00 2001 From: Nate Date: Wed, 20 Nov 2024 14:22:47 -0800 Subject: [PATCH 3/5] chore(contracts,metrics): remove finalization --- host/contracts/contracts.go | 2 -- host/contracts/manager.go | 16 +++++++++------- host/contracts/manager_test.go | 2 -- host/contracts/update.go | 4 ---- host/metrics/types.go | 1 - persist/sqlite/consensus.go | 6 ------ persist/sqlite/metrics.go | 5 +---- 7 files changed, 10 insertions(+), 26 deletions(-) diff --git a/host/contracts/contracts.go b/host/contracts/contracts.go index fc64d63e..d30f9a04 100644 --- a/host/contracts/contracts.go +++ b/host/contracts/contracts.go @@ -55,8 +55,6 @@ const ( // V2ContractStatusActive indicates that the contract has been confirmed on // the blockchain and is currently active. V2ContractStatusActive V2ContractStatus = "active" - // V2ContractStatusFinalized indicates that the contract has been finalized. - V2ContractStatusFinalized V2ContractStatus = "finalized" // V2ContractStatusRenewed indicates that the contract has been renewed. V2ContractStatusRenewed V2ContractStatus = "renewed" // V2ContractStatusSuccessful indicates that a storage proof has been diff --git a/host/contracts/manager.go b/host/contracts/manager.go index 4c07a655..25a35adb 100644 --- a/host/contracts/manager.go +++ b/host/contracts/manager.go @@ -172,7 +172,7 @@ func (cm *Manager) RenewContract(renewal SignedRevision, existing SignedRevision } // ReviseV2Contract atomically updates a contract and its associated sector roots. -func (cm *Manager) ReviseV2Contract(contractID types.FileContractID, revision types.V2FileContract, roots []types.Hash256, usage proto4.Usage) error { +func (cm *Manager) ReviseV2Contract(contractID types.FileContractID, revision types.V2FileContract, newRoots []types.Hash256, usage proto4.Usage) error { done, err := cm.tg.Add() if err != nil { return err @@ -184,6 +184,8 @@ func (cm *Manager) ReviseV2Contract(contractID types.FileContractID, revision ty return fmt.Errorf("failed to get existing contract: %w", err) } + oldRoots := cm.getSectorRoots(contractID) + // validate the contract revision fields switch { case existing.RenterPublicKey != revision.RenterPublicKey: @@ -194,8 +196,10 @@ func (cm *Manager) ReviseV2Contract(contractID types.FileContractID, revision ty return errors.New("proof height does not match") case existing.ExpirationHeight != revision.ExpirationHeight: return errors.New("expiration height does not match") - case revision.Filesize != uint64(rhp2.SectorSize*len(roots)): + case revision.Filesize != uint64(rhp2.SectorSize*len(newRoots)): return errors.New("revision has incorrect file size") + case revision.Capacity < revision.Filesize: + return errors.New("revision capacity must be greater than or equal to file size") } // validate signatures @@ -207,20 +211,18 @@ func (cm *Manager) ReviseV2Contract(contractID types.FileContractID, revision ty } // validate contract Merkle root - metaRoot := rhp2.MetaRoot(roots) + metaRoot := rhp2.MetaRoot(newRoots) if revision.FileMerkleRoot != metaRoot { return errors.New("revision root does not match") - } else if revision.Filesize != uint64(rhp2.SectorSize*len(roots)) { - return errors.New("revision has incorrect file size") } // revise the contract in the store - err = cm.store.ReviseV2Contract(contractID, revision, roots, usage) + err = cm.store.ReviseV2Contract(contractID, revision, oldRoots, newRoots, usage) if err != nil { return err } // update the sector roots cache - cm.setSectorRoots(contractID, roots) + cm.setSectorRoots(contractID, newRoots) cm.log.Debug("contract revised", zap.Stringer("contractID", contractID), zap.Uint64("revisionNumber", revision.RevisionNumber)) return nil } diff --git a/host/contracts/manager_test.go b/host/contracts/manager_test.go index bbd740ac..a992ee75 100644 --- a/host/contracts/manager_test.go +++ b/host/contracts/manager_test.go @@ -753,8 +753,6 @@ func TestV2ContractLifecycle(t *testing.T) { t.Fatalf("expected %v successful contracts, got %v", expectedStatuses[contracts.V2ContractStatusSuccessful], m.Contracts.Successful) } else if m.Contracts.Renewed != expectedStatuses[contracts.V2ContractStatusRenewed] { t.Fatalf("expected %v renewed contracts, got %v", expectedStatuses[contracts.V2ContractStatusRenewed], m.Contracts.Renewed) - } else if m.Contracts.Finalized != expectedStatuses[contracts.V2ContractStatusFinalized] { - t.Fatalf("expected %v finalized contracts, got %v", expectedStatuses[contracts.V2ContractStatusFinalized], m.Contracts.Finalized) } else if m.Contracts.Failed != expectedStatuses[contracts.V2ContractStatusFailed] { t.Fatalf("expected %v failed contracts, got %v", expectedStatuses[contracts.V2ContractStatusFailed], m.Contracts.Failed) } else if !m.Contracts.LockedCollateral.Equals(locked) { diff --git a/host/contracts/update.go b/host/contracts/update.go index 2cf4630f..cccb59ca 100644 --- a/host/contracts/update.go +++ b/host/contracts/update.go @@ -52,7 +52,6 @@ type ( ConfirmedV2 []types.V2FileContractElement RevisedV2 []types.V2FileContractElement SuccessfulV2 []types.FileContractID - FinalizedV2 []types.FileContractID RenewedV2 []types.FileContractID FailedV2 []types.FileContractID } @@ -500,9 +499,6 @@ func buildContractState(tx UpdateStateTx, u stateUpdater, revert bool, log *zap. } case res != nil: switch res := res.(type) { - case *types.V2FileContractFinalization: - state.FinalizedV2 = append(state.FinalizedV2, types.FileContractID(fce.ID)) - log.Debug("finalized v2 contract", zap.Stringer("contractID", fce.ID)) case *types.V2FileContractRenewal: state.RenewedV2 = append(state.RenewedV2, types.FileContractID(fce.ID)) log.Debug("renewed v2 contract", zap.Stringer("contractID", fce.ID)) diff --git a/host/metrics/types.go b/host/metrics/types.go index aec608bc..3df91cc9 100644 --- a/host/metrics/types.go +++ b/host/metrics/types.go @@ -48,7 +48,6 @@ type ( Rejected uint64 `json:"rejected"` Failed uint64 `json:"failed"` Renewed uint64 `json:"renewed"` - Finalized uint64 `json:"finalized"` Successful uint64 `json:"successful"` LockedCollateral types.Currency `json:"lockedCollateral"` diff --git a/persist/sqlite/consensus.go b/persist/sqlite/consensus.go index 882480a3..8fbba3c6 100644 --- a/persist/sqlite/consensus.go +++ b/persist/sqlite/consensus.go @@ -340,8 +340,6 @@ func (ux *updateTx) ApplyContracts(index types.ChainIndex, state contracts.State return fmt.Errorf("failed to apply v2 contract revisions: %w", err) } else if err := applySuccessfulV2Contracts(ux.tx, index, contracts.V2ContractStatusSuccessful, state.SuccessfulV2); err != nil { return fmt.Errorf("failed to apply successful v2 resolution: %w", err) - } else if err := applySuccessfulV2Contracts(ux.tx, index, contracts.V2ContractStatusFinalized, state.FinalizedV2); err != nil { - return fmt.Errorf("failed to apply v2 finalized v2 resolution: %w", err) } else if err := applySuccessfulV2Contracts(ux.tx, index, contracts.V2ContractStatusRenewed, state.RenewedV2); err != nil { return fmt.Errorf("failed to apply v2 renewed v2 resolution: %w", err) } else if err := applyFailedV2Contracts(ux.tx, index, state.FailedV2); err != nil { @@ -370,8 +368,6 @@ func (ux *updateTx) RevertContracts(index types.ChainIndex, state contracts.Stat return fmt.Errorf("failed to revert v2 contract revisions: %w", err) } else if err := revertSuccessfulV2Contracts(ux.tx, contracts.V2ContractStatusSuccessful, state.SuccessfulV2); err != nil { return fmt.Errorf("failed to revert v2 successful resolution: %w", err) - } else if err := revertSuccessfulV2Contracts(ux.tx, contracts.V2ContractStatusFinalized, state.FinalizedV2); err != nil { - return fmt.Errorf("failed to revert v2 finalized resolution: %w", err) } else if err := revertSuccessfulV2Contracts(ux.tx, contracts.V2ContractStatusRenewed, state.RenewedV2); err != nil { return fmt.Errorf("failed to revert v2 renewed resolution: %w", err) } else if err := revertFailedV2Contracts(ux.tx, state.FailedV2); err != nil { @@ -878,8 +874,6 @@ func v2ContractStatusMetric(status contracts.V2ContractStatus) string { return metricSuccessfulContracts case contracts.V2ContractStatusFailed: return metricFailedContracts - case contracts.V2ContractStatusFinalized: - return metricFinalizedContracts case contracts.V2ContractStatusRenewed: return metricRenewedContracts default: diff --git a/persist/sqlite/metrics.go b/persist/sqlite/metrics.go index b4404d3f..ef58152f 100644 --- a/persist/sqlite/metrics.go +++ b/persist/sqlite/metrics.go @@ -20,8 +20,7 @@ const ( metricFailedContracts = "failedContracts" // v2 - metricFinalizedContracts = "finalizedContracts" - metricRenewedContracts = "renewedContracts" + metricRenewedContracts = "renewedContracts" metricLockedCollateral = "lockedCollateral" metricRiskedCollateral = "riskedCollateral" @@ -354,8 +353,6 @@ func mustParseMetricValue(stat string, buf []byte, m *metrics.Metrics) { m.Contracts.Successful = mustScanUint64(buf) case metricFailedContracts: m.Contracts.Failed = mustScanUint64(buf) - case metricFinalizedContracts: - m.Contracts.Finalized = mustScanUint64(buf) case metricRenewedContracts: m.Contracts.Renewed = mustScanUint64(buf) case metricLockedCollateral: From f45da04ce3e1343a70f48a8ec4a8e72112e37026 Mon Sep 17 00:00:00 2001 From: Nate Date: Wed, 20 Nov 2024 14:23:55 -0800 Subject: [PATCH 4/5] refactor(contracts,sqlite): improve revise sector performance, add additional contract tests for sqlite --- host/contracts/persist.go | 2 +- persist/sqlite/contracts.go | 235 +++++++++++++------- persist/sqlite/contracts_test.go | 359 +++++++++++++++++++++++++++++++ persist/sqlite/init.sql | 14 +- persist/sqlite/migrations.go | 53 +++-- 5 files changed, 557 insertions(+), 106 deletions(-) diff --git a/host/contracts/persist.go b/host/contracts/persist.go index 9f5a5f67..13faacb7 100644 --- a/host/contracts/persist.go +++ b/host/contracts/persist.go @@ -51,7 +51,7 @@ type ( RenewV2Contract(renewal V2Contract, renewalSet rhp4.TransactionSet, renewedID types.FileContractID, clearing types.V2FileContract, roots []types.Hash256) error // ReviseV2Contract atomically updates a contract and its associated // sector roots. - ReviseV2Contract(types.FileContractID, types.V2FileContract, []types.Hash256, proto4.Usage) error + ReviseV2Contract(id types.FileContractID, revision types.V2FileContract, oldRoots, newRoots []types.Hash256, usage proto4.Usage) error // ExpireV2ContractSectors removes sector roots for any v2 contracts that are // rejected or past their proof window. ExpireV2ContractSectors(height uint64) error diff --git a/persist/sqlite/contracts.go b/persist/sqlite/contracts.go index 791a5a88..8161f9db 100644 --- a/persist/sqlite/contracts.go +++ b/persist/sqlite/contracts.go @@ -163,6 +163,54 @@ WHERE c.contract_id=$1;` return } +// V2Contracts returns a paginated list of v2 contracts. +func (s *Store) V2Contracts(filter contracts.V2ContractFilter) (contracts []contracts.V2Contract, count int, err error) { + if filter.Limit <= 0 || filter.Limit > 100 { + filter.Limit = 100 + } + + whereClause, whereParams, err := buildV2ContractFilter(filter) + if err != nil { + return nil, 0, fmt.Errorf("failed to build where clause: %w", err) + } + + contractQuery := fmt.Sprintf(`SELECT c.contract_id, rt.contract_id AS renewed_to, rf.contract_id AS renewed_from, c.contract_status, c.negotiation_height, c.confirmation_index, +COALESCE(c.revision_number=cs.revision_number, false) AS revision_confirmed, c.resolution_index, c.rpc_revenue, +c.storage_revenue, c.ingress_revenue, c.egress_revenue, c.account_funding, c.risked_collateral, c.raw_revision +FROM contracts_v2 c +LEFT JOIN contract_v2_state_elements cs ON (c.id = cs.contract_id) +INNER JOIN contract_renters r ON (c.renter_id=r.id) +LEFT JOIN contracts_v2 rt ON (c.renewed_to=rt.id) +LEFT JOIN contracts_v2 rf ON (c.renewed_from=rf.id) %s %s LIMIT ? OFFSET ?`, whereClause, buildV2OrderBy(filter)) + + countQuery := fmt.Sprintf(`SELECT COUNT(*) FROM contracts_v2 c +INNER JOIN contract_renters r ON (c.renter_id=r.id) +LEFT JOIN contracts_v2 rt ON (c.renewed_to=rt.id) +LEFT JOIN contracts_v2 rf ON (c.renewed_from=rf.id) %s`, whereClause) + + err = s.transaction(func(tx *txn) error { + if err := tx.QueryRow(countQuery, whereParams...).Scan(&count); err != nil { + return fmt.Errorf("failed to query contract count: %w", err) + } + + rows, err := tx.Query(contractQuery, append(whereParams, filter.Limit, filter.Offset)...) + if err != nil { + return fmt.Errorf("failed to query contracts: %w", err) + } + defer rows.Close() + + for rows.Next() { + contract, err := scanV2Contract(rows) + if err != nil { + return fmt.Errorf("failed to scan contract: %w", err) + } + contracts = append(contracts, contract) + } + return rows.Err() + }) + return +} + // AddV2Contract adds a new contract to the database. func (s *Store) AddV2Contract(contract contracts.V2Contract, formationSet rhp4.TransactionSet) error { return s.transaction(func(tx *txn) error { @@ -276,12 +324,12 @@ func incrementV2ContractUsage(tx *txn, dbID int64, usage proto4.Usage) error { } // ReviseV2Contract atomically updates a contract's revision and sectors -func (s *Store) ReviseV2Contract(id types.FileContractID, revision types.V2FileContract, roots []types.Hash256, usage proto4.Usage) error { +func (s *Store) ReviseV2Contract(id types.FileContractID, revision types.V2FileContract, oldRoots, newRoots []types.Hash256, usage proto4.Usage) error { return s.transaction(func(tx *txn) error { contractDBID, err := reviseV2Contract(tx, id, revision, usage) if err != nil { return fmt.Errorf("failed to revise contract: %w", err) - } else if err := updateV2ContractSectors(tx, contractDBID, roots, s.log.Named("ReviseV2Contract").With(zap.Stringer("contract", id))); err != nil { + } else if err := updateV2ContractSectors(tx, contractDBID, oldRoots, newRoots); err != nil { return fmt.Errorf("failed to update contract sectors: %w", err) } return nil @@ -670,8 +718,8 @@ func deleteExpiredV2ContractSectors(tx *txn, height uint64) (sectorIDs []int64, const query = `DELETE FROM contract_v2_sector_roots WHERE id IN (SELECT csr.id FROM contract_v2_sector_roots csr INNER JOIN contracts_v2 c ON (csr.contract_id=c.id) --- past proof window or not confirmed and past the rebroadcast height -WHERE c.window_end < $1 OR c.contract_status=$2 LIMIT $3) +-- past expiration or not confirmed and past the rebroadcast height +WHERE c.expiration_height < $1 OR c.contract_status=$2 LIMIT $3) RETURNING sector_id;` rows, err := tx.Query(query, height, contracts.ContractStatusRejected, sqlSectorBatchSize) if err != nil { @@ -690,7 +738,6 @@ RETURNING sector_id;` // updateResolvedV2Contract clears a contract and returns its ID func updateResolvedV2Contract(tx *txn, contractID types.FileContractID, clearing types.V2FileContract, renewedDBID int64) (dbID int64, err error) { - // add the final usage to the contract revenue const clearQuery = `UPDATE contracts_v2 SET (renewed_to, revision_number, raw_revision) = ($1, $2, $3) WHERE contract_id=$4 RETURNING id;` err = tx.QueryRow(clearQuery, renewedDBID, @@ -857,7 +904,7 @@ func broadcastV2Revision(tx *txn, index types.ChainIndex, revisionBroadcastHeigh const query = `SELECT c.raw_revision, c.contract_id, cs.leaf_index, cs.merkle_proof, cs.raw_contract FROM contracts_v2 c INNER JOIN contract_v2_state_elements cs ON (c.id = cs.contract_id) - WHERE c.confirmation_index IS NOT NULL AND c.resolution_index IS NULL AND cs.revision_number != c.revision_number AND c.window_start BETWEEN ? AND ?` + WHERE c.confirmation_index IS NOT NULL AND c.resolution_index IS NULL AND cs.revision_number != c.revision_number AND c.proof_height BETWEEN ? AND ?` rows, err := tx.Query(query, index.Height, revisionBroadcastHeight) if err != nil { @@ -888,7 +935,7 @@ func proofV2Contracts(tx *txn, index types.ChainIndex) (elements []types.V2FileC const query = `SELECT c.contract_id, cs.raw_contract, cs.leaf_index, cs.merkle_proof FROM contracts_v2 c INNER JOIN contract_v2_state_elements cs ON (c.id = cs.contract_id) - WHERE c.confirmation_index IS NOT NULL AND c.resolution_index IS NULL AND c.window_start <= $1 AND c.window_end > $1` + WHERE c.confirmation_index IS NOT NULL AND c.resolution_index IS NULL AND c.proof_height <= $1 AND c.expiration_height > $1` rows, err := tx.Query(query, index.Height) if err != nil { @@ -913,7 +960,7 @@ func expireV2Contracts(tx *txn, index types.ChainIndex) (elements []types.V2File const query = `SELECT c.contract_id, cs.raw_contract, cs.leaf_index, cs.merkle_proof FROM contracts_v2 c INNER JOIN contract_v2_state_elements cs ON (c.id = cs.contract_id) - WHERE c.resolution_index IS NULL AND c.window_end <= $1` + WHERE c.resolution_index IS NULL AND c.expiration_height <= $1` rows, err := tx.Query(query, index.Height) if err != nil { @@ -1015,7 +1062,7 @@ raw_revision, host_sig, renter_sig, confirmed_revision_number, contract_status, func insertV2Contract(tx *txn, contract contracts.V2Contract, formationSet rhp4.TransactionSet) (dbID int64, err error) { const query = `INSERT INTO contracts_v2 (contract_id, renter_id, locked_collateral, rpc_revenue, storage_revenue, ingress_revenue, -egress_revenue, account_funding, risked_collateral, revision_number, negotiation_height, window_start, window_end, formation_txn_set, +egress_revenue, account_funding, risked_collateral, revision_number, negotiation_height, proof_height, expiration_height, formation_txn_set, formation_txn_set_basis, raw_revision, contract_status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) RETURNING id;` renterID, err := renterDBID(tx, contract.RenterPublicKey) @@ -1091,36 +1138,7 @@ func reviseV2Contract(tx *txn, id types.FileContractID, revision types.V2FileCon return contractDBID, nil } -func cleanupDanglingRoots(tx *txn, contractID int64, length int64) (deleted []int64, err error) { - rows, err := tx.Query(`DELETE FROM contract_sector_roots WHERE contract_id=$1 AND root_index >= $2 RETURNING sector_id`, contractID, length) - if err != nil { - return nil, fmt.Errorf("failed to cleanup dangling roots: %w", err) - } - defer rows.Close() - - used := make(map[int64]bool) - for rows.Next() { - var sectorID int64 - if err := rows.Scan(§orID); err != nil { - return nil, fmt.Errorf("failed to scan sector ID: %w", err) - } - - if used[sectorID] { - continue - } - deleted = append(deleted, sectorID) - used[sectorID] = true - } - return deleted, nil -} - -func updateV2ContractSectors(tx *txn, contractDBID int64, roots []types.Hash256, log *zap.Logger) error { - selectOldSectorStmt, err := tx.Prepare(`SELECT sector_id FROM contract_v2_sector_roots WHERE contract_id=? AND root_index=?`) - if err != nil { - return fmt.Errorf("failed to prepare select old sector statement: %w", err) - } - defer selectOldSectorStmt.Close() - +func updateV2ContractSectors(tx *txn, contractDBID int64, oldRoots, newRoots []types.Hash256) error { selectRootIDStmt, err := tx.Prepare(`SELECT id FROM stored_sectors WHERE sector_root=?`) if err != nil { return fmt.Errorf("failed to prepare select root ID statement: %w", err) @@ -1133,62 +1151,30 @@ func updateV2ContractSectors(tx *txn, contractDBID int64, roots []types.Hash256, } defer updateRootStmt.Close() - var appended int - var deleted []int64 - seen := make(map[int64]bool) - for i, root := range roots { - // TODO: benchmark this against an exceptionally large contract. - // This is less efficient than the v1 implementation, but it leaves - // less room for update edge-cases now that all sectors are loaded - // into memory. - var newSectorID int64 - if err := selectRootIDStmt.QueryRow(encode(root)).Scan(&newSectorID); err != nil { - return fmt.Errorf("failed to get sector ID: %w", err) - } - - var oldSectorID int64 - err := selectOldSectorStmt.QueryRow(contractDBID, i).Scan(&oldSectorID) - if errors.Is(err, sql.ErrNoRows) { - // new sector - appended++ - } else if err != nil { - // db error - return fmt.Errorf("failed to get sector ID: %w", err) - } else if newSectorID == oldSectorID { - // no change + for i, root := range newRoots { + if i < len(oldRoots) && oldRoots[i] == root { continue - } else if !seen[oldSectorID] { - // updated root - deleted = append(deleted, oldSectorID) // mark for pruning - seen[oldSectorID] = true } - if _, err := updateRootStmt.Exec(contractDBID, newSectorID, i); err != nil { + var newSectorID int64 + if err := selectRootIDStmt.QueryRow(encode(root)).Scan(&newSectorID); err != nil { + return fmt.Errorf("failed to get sector ID: %w", err) + } else if _, err := updateRootStmt.Exec(contractDBID, newSectorID, i); err != nil { return fmt.Errorf("failed to update sector root: %w", err) } } - cleaned, err := cleanupDanglingRoots(tx, contractDBID, int64(len(roots))) - if err != nil { - return fmt.Errorf("failed to cleanup dangling roots: %w", err) - } - for _, sectorID := range cleaned { - if seen[sectorID] { - continue + if len(newRoots) < len(oldRoots) { + _, err := tx.Exec(`DELETE FROM contract_v2_sector_roots WHERE contract_id=$1 AND root_index >= $2`, contractDBID, len(newRoots)) + if err != nil { + return fmt.Errorf("failed to remove old roots: %w", err) } - deleted = append(deleted, sectorID) } - delta := appended - len(deleted) + delta := len(newRoots) - len(oldRoots) if err := incrementNumericStat(tx, metricContractSectors, delta, time.Now()); err != nil { return fmt.Errorf("failed to update contract sectors: %w", err) } - - if pruned, err := pruneSectors(tx, deleted); err != nil { - return fmt.Errorf("failed to prune sectors: %w", err) - } else if len(pruned) > 0 { - log.Debug("pruned sectors", zap.Int("count", len(pruned)), zap.Stringers("sectors", pruned)) - } return nil } @@ -1291,6 +1277,91 @@ func buildOrderBy(filter contracts.ContractFilter) string { } } +func buildV2ContractFilter(filter contracts.V2ContractFilter) (string, []any, error) { + var whereClause []string + var queryParams []any + + if len(filter.Statuses) != 0 { + whereClause = append(whereClause, `c.contract_status IN (`+queryPlaceHolders(len(filter.Statuses))+`)`) + queryParams = append(queryParams, queryArgs(filter.Statuses)...) + } + + if len(filter.ContractIDs) != 0 { + whereClause = append(whereClause, `c.contract_id IN (`+queryPlaceHolders(len(filter.ContractIDs))+`)`) + for _, value := range filter.ContractIDs { + queryParams = append(queryParams, encode(value)) + } + } + + if len(filter.RenewedFrom) != 0 { + whereClause = append(whereClause, `rf.contract_id IN (`+queryPlaceHolders(len(filter.RenewedFrom))+`)`) + for _, value := range filter.RenewedFrom { + queryParams = append(queryParams, encode(value)) + } + } + + if len(filter.RenewedTo) != 0 { + whereClause = append(whereClause, `rt.contract_id IN (`+queryPlaceHolders(len(filter.RenewedTo))+`)`) + for _, value := range filter.RenewedTo { + queryParams = append(queryParams, encode(value)) + } + } + + if len(filter.RenterKey) != 0 { + whereClause = append(whereClause, `r.public_key IN (`+queryPlaceHolders(len(filter.RenterKey))+`)`) + for _, value := range filter.RenterKey { + queryParams = append(queryParams, encode(value)) + } + } + + if filter.MinNegotiationHeight > 0 && filter.MaxNegotiationHeight > 0 { + if filter.MinNegotiationHeight < filter.MaxNegotiationHeight { + return "", nil, errors.New("min negotiation height must be less than max negotiation height") + } + whereClause = append(whereClause, `c.negotiation_height BETWEEN ? AND ?`) + queryParams = append(queryParams, filter.MinNegotiationHeight, filter.MaxNegotiationHeight) + } else if filter.MinNegotiationHeight > 0 { + whereClause = append(whereClause, `c.negotiation_height >= ?`) + queryParams = append(queryParams, filter.MinNegotiationHeight) + } else if filter.MaxNegotiationHeight > 0 { + whereClause = append(whereClause, `c.negotiation_height <= ?`) + queryParams = append(queryParams, filter.MaxNegotiationHeight) + } + + if filter.MinExpirationHeight > 0 && filter.MaxExpirationHeight > 0 { + if filter.MinExpirationHeight < filter.MaxExpirationHeight { + return "", nil, errors.New("min expiration height must be less than max expiration height") + } + whereClause = append(whereClause, `c.expiration_height BETWEEN ? AND ?`) + queryParams = append(queryParams, filter.MinExpirationHeight, filter.MaxExpirationHeight) + } else if filter.MinExpirationHeight > 0 { + whereClause = append(whereClause, `c.expiration_hieght >= ?`) + queryParams = append(queryParams, filter.MinExpirationHeight) + } else if filter.MaxExpirationHeight > 0 { + whereClause = append(whereClause, `c.expiration_hieght <= ?`) + queryParams = append(queryParams, filter.MaxExpirationHeight) + } + if len(whereClause) == 0 { + return "", nil, nil + } + return "WHERE " + strings.Join(whereClause, " AND "), queryParams, nil +} + +func buildV2OrderBy(filter contracts.V2ContractFilter) string { + dir := "ASC" + if filter.SortDesc { + dir = "DESC" + } + switch filter.SortField { + case contracts.ContractSortStatus: + return `ORDER BY c.contract_status ` + dir + case contracts.ContractSortNegotiationHeight: + return `ORDER BY c.negotiation_height ` + dir + default: + return `ORDER BY c.expiration_height ` + dir + } +} + func scanContract(row scanner) (c contracts.Contract, err error) { var contractID types.FileContractID err = row.Scan(decode(&contractID), diff --git a/persist/sqlite/contracts_test.go b/persist/sqlite/contracts_test.go index 46949253..5cd7d0f3 100644 --- a/persist/sqlite/contracts_test.go +++ b/persist/sqlite/contracts_test.go @@ -7,7 +7,9 @@ import ( "testing" "time" + proto4 "go.sia.tech/core/rhp/v4" "go.sia.tech/core/types" + rhp4 "go.sia.tech/coreutils/rhp/v4" "go.sia.tech/hostd/host/contracts" "go.sia.tech/hostd/host/storage" "go.uber.org/zap/zaptest" @@ -425,6 +427,251 @@ func TestContracts(t *testing.T) { } } +func TestV2Contracts(t *testing.T) { + log := zaptest.NewLogger(t) + db, err := OpenDatabase(filepath.Join(t.TempDir(), "test.db"), log) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + renterKey := types.NewPrivateKeyFromSeed(frand.Bytes(32)) + hostKey := types.NewPrivateKeyFromSeed(frand.Bytes(32)) + + c, count, err := db.V2Contracts(contracts.V2ContractFilter{}) + if err != nil { + t.Fatal(err) + } else if len(c) != 0 { + t.Fatal("expected no contracts") + } else if count != 0 { + t.Fatal("expected no contracts") + } + + // add a contract to the database + contract := contracts.V2Contract{ + ID: frand.Entropy256(), + V2FileContract: types.V2FileContract{ + RenterPublicKey: renterKey.PublicKey(), + HostPublicKey: hostKey.PublicKey(), + ProofHeight: 100, + ExpirationHeight: 200, + }, + } + + if err := db.AddV2Contract(contract, rhp4.TransactionSet{}); err != nil { + t.Fatal(err) + } + + volumeID, err := db.AddVolume("test.dat", false) + if err != nil { + t.Fatal(err) + } else if err := db.SetAvailable(volumeID, true); err != nil { + t.Fatal(err) + } else if err = db.GrowVolume(volumeID, 100); err != nil { + t.Fatal(err) + } + + c, count, err = db.V2Contracts(contracts.V2ContractFilter{}) + if err != nil { + t.Fatal(err) + } else if len(c) != 1 { + t.Fatal("expected one contract") + } else if count != 1 { + t.Fatal("expected one contract") + } + + filter := contracts.V2ContractFilter{ + Statuses: []contracts.V2ContractStatus{contracts.V2ContractStatusActive}, + } + c, count, err = db.V2Contracts(filter) + if err != nil { + t.Fatal(err) + } else if len(c) != 0 { + t.Fatal("expected no contracts") + } else if count != 0 { + t.Fatal("expected no contracts") + } +} + +func TestReviseV2ContractConsistency(t *testing.T) { + log := zaptest.NewLogger(t) + db, err := OpenDatabase(filepath.Join(t.TempDir(), "test.db"), log) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + renterKey := types.NewPrivateKeyFromSeed(frand.Bytes(32)) + hostKey := types.NewPrivateKeyFromSeed(frand.Bytes(32)) + + c, count, err := db.V2Contracts(contracts.V2ContractFilter{}) + if err != nil { + t.Fatal(err) + } else if len(c) != 0 { + t.Fatal("expected no contracts") + } else if count != 0 { + t.Fatal("expected no contracts") + } + + // add a contract to the database + contract := contracts.V2Contract{ + ID: frand.Entropy256(), + V2FileContract: types.V2FileContract{ + RenterPublicKey: renterKey.PublicKey(), + HostPublicKey: hostKey.PublicKey(), + ProofHeight: 100, + ExpirationHeight: 200, + }, + } + + if err := db.AddV2Contract(contract, rhp4.TransactionSet{}); err != nil { + t.Fatal(err) + } + + volumeID, err := db.AddVolume("test.dat", false) + if err != nil { + t.Fatal(err) + } else if err := db.SetAvailable(volumeID, true); err != nil { + t.Fatal(err) + } else if err = db.GrowVolume(volumeID, 1000); err != nil { + t.Fatal(err) + } + + checkRootConsistency := func(t *testing.T, expected []types.Hash256) { + t.Helper() + + err := db.transaction(func(*txn) error { + stmt, err := db.db.Prepare(`SELECT ss.sector_root FROM stored_sectors ss +INNER JOIN contract_v2_sector_roots csr ON (ss.id = csr.sector_id) +INNER JOIN contracts_v2 c ON (csr.contract_id = c.id) +WHERE c.contract_id=$1 AND csr.root_index= $2`) + if err != nil { + t.Fatal("failed to prepare statement:", err) + } + defer stmt.Close() + + for i, root := range expected { + var dbRoot types.Hash256 + if err := stmt.QueryRow(encode(contract.ID), i).Scan(decode(&dbRoot)); err != nil { + t.Fatalf("failed to scan root %d: %s", i, err) + } else if dbRoot != root { + t.Fatalf("expected root %q at index %d, got %q", root, i, dbRoot) + } + } + return nil + }) + if err != nil { + t.Fatal("failed to get db roots:", err) + } + } + + checkMetricConsistency := func(t *testing.T, expected uint64) { + t.Helper() + + m, err := db.Metrics(time.Now()) + if err != nil { + t.Fatal("failed to get metrics:", err) + } else if m.Storage.ContractSectors != expected { + t.Fatalf("expected %d contract sectors, got %d", expected, m.Storage.ContractSectors) + } + } + + var roots []types.Hash256 + appendSectors := func(t *testing.T, n int) { + t.Helper() + + var releaseFn []func() error + var appended []types.Hash256 + for i := 0; i < n; i++ { + root := frand.Entropy256() + release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil }) + if err != nil { + t.Fatal("failed to store sector:", err) + } + appended = append(appended, root) + releaseFn = append(releaseFn, release) + } + newRoots := append(append([]types.Hash256(nil), roots...), appended...) + if err := db.ReviseV2Contract(contract.ID, contract.V2FileContract, roots, newRoots, proto4.Usage{}); err != nil { + t.Fatal("failed to revise contract:", err) + } + + for _, fn := range releaseFn { + if err := fn(); err != nil { + t.Fatal("failed to release sector:", err) + } + } + + checkRootConsistency(t, newRoots) + checkMetricConsistency(t, uint64(len(newRoots))) + roots = newRoots + } + + swapSectors := func(t *testing.T) { + t.Helper() + + a, b := frand.Intn(len(roots)), frand.Intn(len(roots)) + + newRoots := append([]types.Hash256(nil), roots...) + newRoots[a], newRoots[b] = newRoots[b], newRoots[a] + + if err := db.ReviseV2Contract(contract.ID, contract.V2FileContract, roots, newRoots, proto4.Usage{}); err != nil { + t.Fatal("failed to revise contract:", err) + } + checkRootConsistency(t, newRoots) + checkMetricConsistency(t, uint64(len(newRoots))) + roots = newRoots + } + + deleteSectors := func(t *testing.T, n int) { + t.Helper() + + newRoots := append([]types.Hash256(nil), roots...) + for i := 0; i < n; i++ { + j := frand.Intn(len(newRoots)) + newRoots = append(newRoots[:j], newRoots[j+1:]...) + } + + if err := db.ReviseV2Contract(contract.ID, contract.V2FileContract, roots, newRoots, proto4.Usage{}); err != nil { + t.Fatal("failed to revise contract:", err) + } + + checkRootConsistency(t, newRoots) + checkMetricConsistency(t, uint64(len(newRoots))) + roots = newRoots + } + + trimSectors := func(t *testing.T, n int) { + t.Helper() + + newRoots := append([]types.Hash256(nil), roots...) + newRoots = newRoots[:len(newRoots)-n] + + if err := db.ReviseV2Contract(contract.ID, contract.V2FileContract, roots, newRoots, proto4.Usage{}); err != nil { + t.Fatal("failed to revise contract:", err) + } + + checkRootConsistency(t, newRoots) + checkMetricConsistency(t, uint64(len(newRoots))) + roots = newRoots + } + + appendSectors(t, 15) + swapSectors(t) + deleteSectors(t, 1) + trimSectors(t, 5) + + appendSectors(t, frand.Intn(49)+1) + for i := 0; i < 100; i++ { + swapSectors(t) + } + for i := 0; i < 10; i++ { + appendSectors(t, 1) + deleteSectors(t, frand.Intn(len(roots)/4)+1) + } + trimSectors(t, frand.Intn(len(roots)/4)+1) +} + func BenchmarkTrimSectors(b *testing.B) { log := zaptest.NewLogger(b) db, err := OpenDatabase(filepath.Join(b.TempDir(), "test.db"), log) @@ -504,3 +751,115 @@ func BenchmarkTrimSectors(b *testing.B) { b.Fatal(err) } } + +func BenchmarkV2AppendSectors(b *testing.B) { + log := zaptest.NewLogger(b) + db, err := OpenDatabase(filepath.Join(b.TempDir(), "test.db"), log) + if err != nil { + b.Fatal(err) + } + defer db.Close() + + // add a contract to the database + contract := contracts.V2Contract{ + ID: frand.Entropy256(), + V2FileContract: types.V2FileContract{ + RevisionNumber: 1, + }, + } + + if err := db.AddV2Contract(contract, rhp4.TransactionSet{}); err != nil { + b.Fatal(err) + } + + volumeID, err := db.AddVolume("test.dat", false) + if err != nil { + b.Fatal(err) + } else if err := db.SetAvailable(volumeID, true); err != nil { + b.Fatal(err) + } else if err = db.GrowVolume(volumeID, uint64(b.N)); err != nil { + b.Fatal(err) + } + + roots := make([]types.Hash256, 0, b.N) + + for i := 0; i < b.N; i++ { + root := types.Hash256(frand.Entropy256()) + roots = append(roots, root) + + release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil }) + if err != nil { + b.Fatal(err) + } else if err := db.AddTemporarySectors([]storage.TempSector{{Root: root, Expiration: 100}}); err != nil { + b.Fatal(err) + } else if err := release(); err != nil { + b.Fatal(err) + } + } + + b.ResetTimer() + b.ReportAllocs() + b.ReportMetric(float64(b.N), "sectors") + + if err := db.ReviseV2Contract(contract.ID, contract.V2FileContract, nil, roots, proto4.Usage{}); err != nil { + b.Fatal(err) + } +} + +func BenchmarkV2TrimSectors(b *testing.B) { + log := zaptest.NewLogger(b) + db, err := OpenDatabase(filepath.Join(b.TempDir(), "test.db"), log) + if err != nil { + b.Fatal(err) + } + defer db.Close() + + // add a contract to the database + contract := contracts.V2Contract{ + ID: frand.Entropy256(), + V2FileContract: types.V2FileContract{ + RevisionNumber: 1, + }, + } + + if err := db.AddV2Contract(contract, rhp4.TransactionSet{}); err != nil { + b.Fatal(err) + } + + volumeID, err := db.AddVolume("test.dat", false) + if err != nil { + b.Fatal(err) + } else if err := db.SetAvailable(volumeID, true); err != nil { + b.Fatal(err) + } else if err = db.GrowVolume(volumeID, uint64(b.N)); err != nil { + b.Fatal(err) + } + + roots := make([]types.Hash256, 0, b.N) + + for i := 0; i < b.N; i++ { + root := types.Hash256(frand.Entropy256()) + roots = append(roots, root) + + release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil }) + if err != nil { + b.Fatal(err) + } else if err := db.AddTemporarySectors([]storage.TempSector{{Root: root, Expiration: 100}}); err != nil { + b.Fatal(err) + } else if err := release(); err != nil { + b.Fatal(err) + } + } + + if err := db.ReviseV2Contract(contract.ID, contract.V2FileContract, nil, roots, proto4.Usage{}); err != nil { + b.Fatal(err) + } + + b.ResetTimer() + b.ReportAllocs() + b.ReportMetric(float64(b.N), "sectors") + + if err := db.ReviseV2Contract(contract.ID, contract.V2FileContract, roots, nil, proto4.Usage{}); err != nil { + b.Fatal(err) + } +} diff --git a/persist/sqlite/init.sql b/persist/sqlite/init.sql index fdaec9c6..946b13e7 100644 --- a/persist/sqlite/init.sql +++ b/persist/sqlite/init.sql @@ -159,8 +159,8 @@ CREATE TABLE contracts_v2 ( confirmation_index BLOB, -- null if the contract has not been confirmed on the blockchain, otherwise the chain index of the block containing the confirmation transaction resolution_index BLOB, -- null if the storage proof/resolution has not been confirmed on the blockchain, otherwise the chain index of the block containing the resolution transaction negotiation_height INTEGER NOT NULL, -- determines if the formation txn should be rebroadcast or if the contract should be deleted - window_start INTEGER NOT NULL, - window_end INTEGER NOT NULL, + proof_height INTEGER NOT NULL, + expiration_height INTEGER NOT NULL, contract_status TEXT NOT NULL ); CREATE INDEX contracts_v2_contract_id ON contracts_v2(contract_id); @@ -168,12 +168,12 @@ CREATE INDEX contracts_v2_renter_id ON contracts_v2(renter_id); CREATE INDEX contracts_v2_renewed_to ON contracts_v2(renewed_to); CREATE INDEX contracts_v2_renewed_from ON contracts_v2(renewed_from); CREATE INDEX contracts_v2_negotiation_height ON contracts_v2(negotiation_height); -CREATE INDEX contracts_v2_window_start ON contracts_v2(window_start); -CREATE INDEX contracts_v2_window_end ON contracts_v2(window_end); +CREATE INDEX contracts_v2_proof_height ON contracts_v2(proof_height); +CREATE INDEX contracts_v2_expiration_height ON contracts_v2(expiration_height); CREATE INDEX contracts_v2_contract_status ON contracts_v2(contract_status); -CREATE INDEX contracts_v2_confirmation_index_resolution_index_window_start ON contracts_v2(confirmation_index, resolution_index, window_start); -CREATE INDEX contracts_v2_confirmation_index_resolution_index_window_end ON contracts_v2(confirmation_index, resolution_index, window_end); -CREATE INDEX contracts_v2_confirmation_index_window_start ON contracts_v2(confirmation_index, window_start); +CREATE INDEX contracts_v2_confirmation_index_resolution_index_proof_height ON contracts_v2(confirmation_index, resolution_index, proof_height); +CREATE INDEX contracts_v2_confirmation_index_resolution_index_expiration_height ON contracts_v2(confirmation_index, resolution_index, expiration_height); +CREATE INDEX contracts_v2_confirmation_index_proof_height ON contracts_v2(confirmation_index, proof_height); CREATE INDEX contracts_v2_confirmation_index_negotiation_height ON contracts_v2(confirmation_index, negotiation_height); CREATE TABLE contract_v2_sector_roots ( diff --git a/persist/sqlite/migrations.go b/persist/sqlite/migrations.go index b59b05f7..9788c83d 100644 --- a/persist/sqlite/migrations.go +++ b/persist/sqlite/migrations.go @@ -10,6 +10,26 @@ import ( "go.uber.org/zap" ) +// migrateVersion32 adds the proof height and expiration_height columns to the contracts_v2 table. +func migrateVersion32(tx *txn, _ *zap.Logger) error { + _, err := tx.Exec(` +ALTER TABLE contracts_v2 ADD COLUMN proof_height INTEGER NOT NULL; +ALTER TABLE contracts_v2 ADD COLUMN expiration_height INTEGER NOT NULL; +ALTER TABLE contracts_v2 DROP COLUMN window_start; +ALTER TABLE contracts_v2 DROP COLUMN window_end; +DROP INDEX contracts_v2_window_start; +DROP INDEX contracts_v2_window_end; +DROP INDEX contracts_v2_confirmation_index_resolution_index_window_start; +DROP INDEX contracts_v2_confirmation_index_resolution_index_window_end; +DROP INDEX contracts_v2_confirmation_index_window_start; +CREATE INDEX contracts_v2_proof_height ON contracts_v2(proof_height); +CREATE INDEX contracts_v2_expiration_height ON contracts_v2(expiration_height); +CREATE INDEX contracts_v2_confirmation_index_resolution_index_proof_height ON contracts_v2(confirmation_index, resolution_index, proof_height); +CREATE INDEX contracts_v2_confirmation_index_resolution_index_expiration_height ON contracts_v2(confirmation_index, resolution_index, expiration_height); +CREATE INDEX contracts_v2_confirmation_index_proof_height ON contracts_v2(confirmation_index, proof_height);`) + return err +} + func migrateVersion31(tx *txn, _ *zap.Logger) error { _, err := tx.Exec(` ALTER TABLE global_settings ADD COLUMN last_v2_announce_hash BLOB; @@ -52,7 +72,7 @@ UPDATE global_settings SET last_scanned_index=NULL, last_announce_index=NULL, la // migrateVersion28 prepares the database for version 2 func migrateVersion28(tx *txn, log *zap.Logger) error { _, err := tx.Exec(` --- Drop v1 tables +-- Drop v1 tables DROP TABLE wallet_utxos; DROP TABLE wallet_transactions; @@ -457,8 +477,8 @@ func migrateVersion18(tx *txn, _ *zap.Logger) error { func migrateVersion17(tx *txn, _ *zap.Logger) error { const query = ` -- create a temp table that contains the new indices -CREATE TEMP TABLE temp_contract_sector_roots AS -SELECT * FROM (SELECT id, contract_id, root_index, ROW_NUMBER() OVER (PARTITION BY contract_id ORDER BY root_index ASC)-1 AS expected_root_index +CREATE TEMP TABLE temp_contract_sector_roots AS +SELECT * FROM (SELECT id, contract_id, root_index, ROW_NUMBER() OVER (PARTITION BY contract_id ORDER BY root_index ASC)-1 AS expected_root_index FROM contract_sector_roots) a WHERE root_index <> expected_root_index ORDER BY contract_id, root_index ASC; -- update the contract_sector_roots table with the new indices UPDATE contract_sector_roots @@ -563,7 +583,7 @@ CREATE TABLE contracts_new ( ); -- copy the data from the old contracts table to the new contracts table -INSERT INTO contracts_new (id, renter_id, renewed_to, renewed_from, contract_id, revision_number, formation_txn_set, locked_collateral, rpc_revenue, storage_revenue, ingress_revenue, egress_revenue, account_funding, registry_read, registry_write, risked_collateral, confirmed_revision_number, host_sig, renter_sig, raw_revision, formation_confirmed, resolution_height, negotiation_height, window_start, window_end, contract_status) +INSERT INTO contracts_new (id, renter_id, renewed_to, renewed_from, contract_id, revision_number, formation_txn_set, locked_collateral, rpc_revenue, storage_revenue, ingress_revenue, egress_revenue, account_funding, registry_read, registry_write, risked_collateral, confirmed_revision_number, host_sig, renter_sig, raw_revision, formation_confirmed, resolution_height, negotiation_height, window_start, window_end, contract_status) SELECT id, renter_id, renewed_to, renewed_from, contract_id, revision_number, formation_txn_set, locked_collateral, $1, $1, $1, $1, $1, $1, $1, risked_collateral, confirmed_revision_number, host_sig, renter_sig, raw_revision, formation_confirmed, resolution_height, negotiation_height, window_start, window_end, contract_status FROM contracts; -- drop the old contracts table and rename the new contracts table @@ -820,13 +840,13 @@ func migrateVersion4(tx *txn, _ *zap.Logger) error { ddns_opts BLOB, registry_limit INTEGER NOT NULL );` - migrateSettings = `INSERT INTO host_settings (id, settings_revision, accepting_contracts, net_address, -contract_price, base_rpc_price, sector_access_price, collateral_multiplier, max_collateral, storage_price, egress_price, -ingress_price, max_account_balance, max_account_age, price_table_validity, max_contract_duration, window_size, + migrateSettings = `INSERT INTO host_settings (id, settings_revision, accepting_contracts, net_address, +contract_price, base_rpc_price, sector_access_price, collateral_multiplier, max_collateral, storage_price, egress_price, +ingress_price, max_account_balance, max_account_age, price_table_validity, max_contract_duration, window_size, ingress_limit, egress_limit, ddns_provider, ddns_update_v4, ddns_update_v6, ddns_opts, registry_limit) -SELECT 0, settings_revision, accepting_contracts, net_address, contract_price, base_rpc_price, -sector_access_price, 2.0, max_collateral, storage_price, egress_price, ingress_price, -max_account_balance, max_account_age, price_table_validity, max_contract_duration, window_size, ingress_limit, +SELECT 0, settings_revision, accepting_contracts, net_address, contract_price, base_rpc_price, +sector_access_price, 2.0, max_collateral, storage_price, egress_price, ingress_price, +max_account_balance, max_account_age, price_table_validity, max_contract_duration, window_size, ingress_limit, egress_limit, ddns_provider, ddns_update_v4, ddns_update_v6, ddns_opts, registry_limit FROM host_settings_old;` ) @@ -882,13 +902,13 @@ func migrateVersion2(tx *txn, _ *zap.Logger) error { ddns_opts BLOB, registry_limit INTEGER NOT NULL );` - migrateSettings = `INSERT INTO host_settings (id, settings_revision, accepting_contracts, net_address, -contract_price, base_rpc_price, sector_access_price, collateral, max_collateral, storage_price, egress_price, -ingress_price, max_account_balance, max_account_age, price_table_validity, max_contract_duration, window_size, + migrateSettings = `INSERT INTO host_settings (id, settings_revision, accepting_contracts, net_address, +contract_price, base_rpc_price, sector_access_price, collateral, max_collateral, storage_price, egress_price, +ingress_price, max_account_balance, max_account_age, price_table_validity, max_contract_duration, window_size, ingress_limit, egress_limit, ddns_provider, ddns_update_v4, ddns_update_v6, ddns_opts, registry_limit) -SELECT 0, settings_revision, accepting_contracts, net_address, contract_price, base_rpc_price, -sector_access_price, collateral, max_collateral, min_storage_price, min_egress_price, min_ingress_price, -max_account_balance, max_account_age, price_table_validity, max_contract_duration, window_size, ingress_limit, +SELECT 0, settings_revision, accepting_contracts, net_address, contract_price, base_rpc_price, +sector_access_price, collateral, max_collateral, min_storage_price, min_egress_price, min_ingress_price, +max_account_balance, max_account_age, price_table_validity, max_contract_duration, window_size, ingress_limit, egress_limit, dyn_dns_provider, dns_update_v4, dns_update_v6, dyn_dns_opts, registry_limit FROM host_settings_old;` ) @@ -940,4 +960,5 @@ var migrations = []func(tx *txn, log *zap.Logger) error{ migrateVersion29, migrateVersion30, migrateVersion31, + migrateVersion32, } From 16de1172fb2bd7823bd3995a8f3fae53c85c45ec Mon Sep 17 00:00:00 2001 From: Nate Date: Wed, 20 Nov 2024 14:28:18 -0800 Subject: [PATCH 5/5] fix(sqlite): fix column spelling --- persist/sqlite/contracts.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/persist/sqlite/contracts.go b/persist/sqlite/contracts.go index 8161f9db..6bf6fc26 100644 --- a/persist/sqlite/contracts.go +++ b/persist/sqlite/contracts.go @@ -1335,10 +1335,10 @@ func buildV2ContractFilter(filter contracts.V2ContractFilter) (string, []any, er whereClause = append(whereClause, `c.expiration_height BETWEEN ? AND ?`) queryParams = append(queryParams, filter.MinExpirationHeight, filter.MaxExpirationHeight) } else if filter.MinExpirationHeight > 0 { - whereClause = append(whereClause, `c.expiration_hieght >= ?`) + whereClause = append(whereClause, `c.expiration_height >= ?`) queryParams = append(queryParams, filter.MinExpirationHeight) } else if filter.MaxExpirationHeight > 0 { - whereClause = append(whereClause, `c.expiration_hieght <= ?`) + whereClause = append(whereClause, `c.expiration_height <= ?`) queryParams = append(queryParams, filter.MaxExpirationHeight) } if len(whereClause) == 0 {