Skip to content

Commit

Permalink
refactor(contracts,sqlite): improve revise sector performance, add ad…
Browse files Browse the repository at this point in the history
…ditional contract tests for sqlite
  • Loading branch information
n8maninger committed Nov 20, 2024
1 parent b5d883e commit f45da04
Show file tree
Hide file tree
Showing 5 changed files with 557 additions and 106 deletions.
2 changes: 1 addition & 1 deletion host/contracts/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type (
RenewV2Contract(renewal V2Contract, renewalSet rhp4.TransactionSet, renewedID types.FileContractID, clearing types.V2FileContract, roots []types.Hash256) error
// ReviseV2Contract atomically updates a contract and its associated
// sector roots.
ReviseV2Contract(types.FileContractID, types.V2FileContract, []types.Hash256, proto4.Usage) error
ReviseV2Contract(id types.FileContractID, revision types.V2FileContract, oldRoots, newRoots []types.Hash256, usage proto4.Usage) error
// ExpireV2ContractSectors removes sector roots for any v2 contracts that are
// rejected or past their proof window.
ExpireV2ContractSectors(height uint64) error
Expand Down
235 changes: 153 additions & 82 deletions persist/sqlite/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,54 @@ WHERE c.contract_id=$1;`
return
}

// V2Contracts returns a paginated list of v2 contracts.
func (s *Store) V2Contracts(filter contracts.V2ContractFilter) (contracts []contracts.V2Contract, count int, err error) {
if filter.Limit <= 0 || filter.Limit > 100 {
filter.Limit = 100
}

whereClause, whereParams, err := buildV2ContractFilter(filter)
if err != nil {
return nil, 0, fmt.Errorf("failed to build where clause: %w", err)
}

contractQuery := fmt.Sprintf(`SELECT c.contract_id, rt.contract_id AS renewed_to, rf.contract_id AS renewed_from, c.contract_status, c.negotiation_height, c.confirmation_index,
COALESCE(c.revision_number=cs.revision_number, false) AS revision_confirmed, c.resolution_index, c.rpc_revenue,
c.storage_revenue, c.ingress_revenue, c.egress_revenue, c.account_funding, c.risked_collateral, c.raw_revision
FROM contracts_v2 c
LEFT JOIN contract_v2_state_elements cs ON (c.id = cs.contract_id)
INNER JOIN contract_renters r ON (c.renter_id=r.id)
LEFT JOIN contracts_v2 rt ON (c.renewed_to=rt.id)
LEFT JOIN contracts_v2 rf ON (c.renewed_from=rf.id) %s %s LIMIT ? OFFSET ?`, whereClause, buildV2OrderBy(filter))

countQuery := fmt.Sprintf(`SELECT COUNT(*) FROM contracts_v2 c
INNER JOIN contract_renters r ON (c.renter_id=r.id)
LEFT JOIN contracts_v2 rt ON (c.renewed_to=rt.id)
LEFT JOIN contracts_v2 rf ON (c.renewed_from=rf.id) %s`, whereClause)

err = s.transaction(func(tx *txn) error {
if err := tx.QueryRow(countQuery, whereParams...).Scan(&count); err != nil {
return fmt.Errorf("failed to query contract count: %w", err)
}

rows, err := tx.Query(contractQuery, append(whereParams, filter.Limit, filter.Offset)...)
if err != nil {
return fmt.Errorf("failed to query contracts: %w", err)
}
defer rows.Close()

for rows.Next() {
contract, err := scanV2Contract(rows)
if err != nil {
return fmt.Errorf("failed to scan contract: %w", err)
}
contracts = append(contracts, contract)
}
return rows.Err()
})
return
}

// AddV2Contract adds a new contract to the database.
func (s *Store) AddV2Contract(contract contracts.V2Contract, formationSet rhp4.TransactionSet) error {
return s.transaction(func(tx *txn) error {
Expand Down Expand Up @@ -276,12 +324,12 @@ func incrementV2ContractUsage(tx *txn, dbID int64, usage proto4.Usage) error {
}

// ReviseV2Contract atomically updates a contract's revision and sectors
func (s *Store) ReviseV2Contract(id types.FileContractID, revision types.V2FileContract, roots []types.Hash256, usage proto4.Usage) error {
func (s *Store) ReviseV2Contract(id types.FileContractID, revision types.V2FileContract, oldRoots, newRoots []types.Hash256, usage proto4.Usage) error {
return s.transaction(func(tx *txn) error {
contractDBID, err := reviseV2Contract(tx, id, revision, usage)
if err != nil {
return fmt.Errorf("failed to revise contract: %w", err)
} else if err := updateV2ContractSectors(tx, contractDBID, roots, s.log.Named("ReviseV2Contract").With(zap.Stringer("contract", id))); err != nil {
} else if err := updateV2ContractSectors(tx, contractDBID, oldRoots, newRoots); err != nil {
return fmt.Errorf("failed to update contract sectors: %w", err)
}
return nil
Expand Down Expand Up @@ -670,8 +718,8 @@ func deleteExpiredV2ContractSectors(tx *txn, height uint64) (sectorIDs []int64,
const query = `DELETE FROM contract_v2_sector_roots
WHERE id IN (SELECT csr.id FROM contract_v2_sector_roots csr
INNER JOIN contracts_v2 c ON (csr.contract_id=c.id)
-- past proof window or not confirmed and past the rebroadcast height
WHERE c.window_end < $1 OR c.contract_status=$2 LIMIT $3)
-- past expiration or not confirmed and past the rebroadcast height
WHERE c.expiration_height < $1 OR c.contract_status=$2 LIMIT $3)
RETURNING sector_id;`
rows, err := tx.Query(query, height, contracts.ContractStatusRejected, sqlSectorBatchSize)
if err != nil {
Expand All @@ -690,7 +738,6 @@ RETURNING sector_id;`

// updateResolvedV2Contract clears a contract and returns its ID
func updateResolvedV2Contract(tx *txn, contractID types.FileContractID, clearing types.V2FileContract, renewedDBID int64) (dbID int64, err error) {
// add the final usage to the contract revenue
const clearQuery = `UPDATE contracts_v2 SET (renewed_to, revision_number, raw_revision) = ($1, $2, $3) WHERE contract_id=$4 RETURNING id;`
err = tx.QueryRow(clearQuery,
renewedDBID,
Expand Down Expand Up @@ -857,7 +904,7 @@ func broadcastV2Revision(tx *txn, index types.ChainIndex, revisionBroadcastHeigh
const query = `SELECT c.raw_revision, c.contract_id, cs.leaf_index, cs.merkle_proof, cs.raw_contract
FROM contracts_v2 c
INNER JOIN contract_v2_state_elements cs ON (c.id = cs.contract_id)
WHERE c.confirmation_index IS NOT NULL AND c.resolution_index IS NULL AND cs.revision_number != c.revision_number AND c.window_start BETWEEN ? AND ?`
WHERE c.confirmation_index IS NOT NULL AND c.resolution_index IS NULL AND cs.revision_number != c.revision_number AND c.proof_height BETWEEN ? AND ?`

rows, err := tx.Query(query, index.Height, revisionBroadcastHeight)
if err != nil {
Expand Down Expand Up @@ -888,7 +935,7 @@ func proofV2Contracts(tx *txn, index types.ChainIndex) (elements []types.V2FileC
const query = `SELECT c.contract_id, cs.raw_contract, cs.leaf_index, cs.merkle_proof
FROM contracts_v2 c
INNER JOIN contract_v2_state_elements cs ON (c.id = cs.contract_id)
WHERE c.confirmation_index IS NOT NULL AND c.resolution_index IS NULL AND c.window_start <= $1 AND c.window_end > $1`
WHERE c.confirmation_index IS NOT NULL AND c.resolution_index IS NULL AND c.proof_height <= $1 AND c.expiration_height > $1`

rows, err := tx.Query(query, index.Height)
if err != nil {
Expand All @@ -913,7 +960,7 @@ func expireV2Contracts(tx *txn, index types.ChainIndex) (elements []types.V2File
const query = `SELECT c.contract_id, cs.raw_contract, cs.leaf_index, cs.merkle_proof
FROM contracts_v2 c
INNER JOIN contract_v2_state_elements cs ON (c.id = cs.contract_id)
WHERE c.resolution_index IS NULL AND c.window_end <= $1`
WHERE c.resolution_index IS NULL AND c.expiration_height <= $1`

rows, err := tx.Query(query, index.Height)
if err != nil {
Expand Down Expand Up @@ -1015,7 +1062,7 @@ raw_revision, host_sig, renter_sig, confirmed_revision_number, contract_status,

func insertV2Contract(tx *txn, contract contracts.V2Contract, formationSet rhp4.TransactionSet) (dbID int64, err error) {
const query = `INSERT INTO contracts_v2 (contract_id, renter_id, locked_collateral, rpc_revenue, storage_revenue, ingress_revenue,
egress_revenue, account_funding, risked_collateral, revision_number, negotiation_height, window_start, window_end, formation_txn_set,
egress_revenue, account_funding, risked_collateral, revision_number, negotiation_height, proof_height, expiration_height, formation_txn_set,
formation_txn_set_basis, raw_revision, contract_status) VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) RETURNING id;`
renterID, err := renterDBID(tx, contract.RenterPublicKey)
Expand Down Expand Up @@ -1091,36 +1138,7 @@ func reviseV2Contract(tx *txn, id types.FileContractID, revision types.V2FileCon
return contractDBID, nil
}

func cleanupDanglingRoots(tx *txn, contractID int64, length int64) (deleted []int64, err error) {
rows, err := tx.Query(`DELETE FROM contract_sector_roots WHERE contract_id=$1 AND root_index >= $2 RETURNING sector_id`, contractID, length)
if err != nil {
return nil, fmt.Errorf("failed to cleanup dangling roots: %w", err)
}
defer rows.Close()

used := make(map[int64]bool)
for rows.Next() {
var sectorID int64
if err := rows.Scan(&sectorID); err != nil {
return nil, fmt.Errorf("failed to scan sector ID: %w", err)
}

if used[sectorID] {
continue
}
deleted = append(deleted, sectorID)
used[sectorID] = true
}
return deleted, nil
}

func updateV2ContractSectors(tx *txn, contractDBID int64, roots []types.Hash256, log *zap.Logger) error {
selectOldSectorStmt, err := tx.Prepare(`SELECT sector_id FROM contract_v2_sector_roots WHERE contract_id=? AND root_index=?`)
if err != nil {
return fmt.Errorf("failed to prepare select old sector statement: %w", err)
}
defer selectOldSectorStmt.Close()

func updateV2ContractSectors(tx *txn, contractDBID int64, oldRoots, newRoots []types.Hash256) error {
selectRootIDStmt, err := tx.Prepare(`SELECT id FROM stored_sectors WHERE sector_root=?`)
if err != nil {
return fmt.Errorf("failed to prepare select root ID statement: %w", err)
Expand All @@ -1133,62 +1151,30 @@ func updateV2ContractSectors(tx *txn, contractDBID int64, roots []types.Hash256,
}
defer updateRootStmt.Close()

var appended int
var deleted []int64
seen := make(map[int64]bool)
for i, root := range roots {
// TODO: benchmark this against an exceptionally large contract.
// This is less efficient than the v1 implementation, but it leaves
// less room for update edge-cases now that all sectors are loaded
// into memory.
var newSectorID int64
if err := selectRootIDStmt.QueryRow(encode(root)).Scan(&newSectorID); err != nil {
return fmt.Errorf("failed to get sector ID: %w", err)
}

var oldSectorID int64
err := selectOldSectorStmt.QueryRow(contractDBID, i).Scan(&oldSectorID)
if errors.Is(err, sql.ErrNoRows) {
// new sector
appended++
} else if err != nil {
// db error
return fmt.Errorf("failed to get sector ID: %w", err)
} else if newSectorID == oldSectorID {
// no change
for i, root := range newRoots {
if i < len(oldRoots) && oldRoots[i] == root {
continue
} else if !seen[oldSectorID] {
// updated root
deleted = append(deleted, oldSectorID) // mark for pruning
seen[oldSectorID] = true
}

if _, err := updateRootStmt.Exec(contractDBID, newSectorID, i); err != nil {
var newSectorID int64
if err := selectRootIDStmt.QueryRow(encode(root)).Scan(&newSectorID); err != nil {
return fmt.Errorf("failed to get sector ID: %w", err)
} else if _, err := updateRootStmt.Exec(contractDBID, newSectorID, i); err != nil {
return fmt.Errorf("failed to update sector root: %w", err)
}
}

cleaned, err := cleanupDanglingRoots(tx, contractDBID, int64(len(roots)))
if err != nil {
return fmt.Errorf("failed to cleanup dangling roots: %w", err)
}
for _, sectorID := range cleaned {
if seen[sectorID] {
continue
if len(newRoots) < len(oldRoots) {
_, err := tx.Exec(`DELETE FROM contract_v2_sector_roots WHERE contract_id=$1 AND root_index >= $2`, contractDBID, len(newRoots))
if err != nil {
return fmt.Errorf("failed to remove old roots: %w", err)
}
deleted = append(deleted, sectorID)
}

delta := appended - len(deleted)
delta := len(newRoots) - len(oldRoots)
if err := incrementNumericStat(tx, metricContractSectors, delta, time.Now()); err != nil {
return fmt.Errorf("failed to update contract sectors: %w", err)
}

if pruned, err := pruneSectors(tx, deleted); err != nil {
return fmt.Errorf("failed to prune sectors: %w", err)
} else if len(pruned) > 0 {
log.Debug("pruned sectors", zap.Int("count", len(pruned)), zap.Stringers("sectors", pruned))
}
return nil
}

Expand Down Expand Up @@ -1291,6 +1277,91 @@ func buildOrderBy(filter contracts.ContractFilter) string {
}
}

func buildV2ContractFilter(filter contracts.V2ContractFilter) (string, []any, error) {
var whereClause []string
var queryParams []any

if len(filter.Statuses) != 0 {
whereClause = append(whereClause, `c.contract_status IN (`+queryPlaceHolders(len(filter.Statuses))+`)`)
queryParams = append(queryParams, queryArgs(filter.Statuses)...)
}

if len(filter.ContractIDs) != 0 {
whereClause = append(whereClause, `c.contract_id IN (`+queryPlaceHolders(len(filter.ContractIDs))+`)`)
for _, value := range filter.ContractIDs {
queryParams = append(queryParams, encode(value))
}
}

if len(filter.RenewedFrom) != 0 {
whereClause = append(whereClause, `rf.contract_id IN (`+queryPlaceHolders(len(filter.RenewedFrom))+`)`)
for _, value := range filter.RenewedFrom {
queryParams = append(queryParams, encode(value))
}
}

if len(filter.RenewedTo) != 0 {
whereClause = append(whereClause, `rt.contract_id IN (`+queryPlaceHolders(len(filter.RenewedTo))+`)`)
for _, value := range filter.RenewedTo {
queryParams = append(queryParams, encode(value))
}
}

if len(filter.RenterKey) != 0 {
whereClause = append(whereClause, `r.public_key IN (`+queryPlaceHolders(len(filter.RenterKey))+`)`)
for _, value := range filter.RenterKey {
queryParams = append(queryParams, encode(value))
}
}

if filter.MinNegotiationHeight > 0 && filter.MaxNegotiationHeight > 0 {
if filter.MinNegotiationHeight < filter.MaxNegotiationHeight {
return "", nil, errors.New("min negotiation height must be less than max negotiation height")
}
whereClause = append(whereClause, `c.negotiation_height BETWEEN ? AND ?`)
queryParams = append(queryParams, filter.MinNegotiationHeight, filter.MaxNegotiationHeight)
} else if filter.MinNegotiationHeight > 0 {
whereClause = append(whereClause, `c.negotiation_height >= ?`)
queryParams = append(queryParams, filter.MinNegotiationHeight)
} else if filter.MaxNegotiationHeight > 0 {
whereClause = append(whereClause, `c.negotiation_height <= ?`)
queryParams = append(queryParams, filter.MaxNegotiationHeight)
}

if filter.MinExpirationHeight > 0 && filter.MaxExpirationHeight > 0 {
if filter.MinExpirationHeight < filter.MaxExpirationHeight {
return "", nil, errors.New("min expiration height must be less than max expiration height")
}
whereClause = append(whereClause, `c.expiration_height BETWEEN ? AND ?`)
queryParams = append(queryParams, filter.MinExpirationHeight, filter.MaxExpirationHeight)
} else if filter.MinExpirationHeight > 0 {
whereClause = append(whereClause, `c.expiration_hieght >= ?`)

Check failure on line 1338 in persist/sqlite/contracts.go

View workflow job for this annotation

GitHub Actions / test / test (1.23, ubuntu-latest)

`hieght` is a misspelling of `height` (misspell)
queryParams = append(queryParams, filter.MinExpirationHeight)
} else if filter.MaxExpirationHeight > 0 {
whereClause = append(whereClause, `c.expiration_hieght <= ?`)

Check failure on line 1341 in persist/sqlite/contracts.go

View workflow job for this annotation

GitHub Actions / test / test (1.23, ubuntu-latest)

`hieght` is a misspelling of `height` (misspell)
queryParams = append(queryParams, filter.MaxExpirationHeight)
}
if len(whereClause) == 0 {
return "", nil, nil
}
return "WHERE " + strings.Join(whereClause, " AND "), queryParams, nil
}

func buildV2OrderBy(filter contracts.V2ContractFilter) string {
dir := "ASC"
if filter.SortDesc {
dir = "DESC"
}
switch filter.SortField {
case contracts.ContractSortStatus:
return `ORDER BY c.contract_status ` + dir
case contracts.ContractSortNegotiationHeight:
return `ORDER BY c.negotiation_height ` + dir
default:
return `ORDER BY c.expiration_height ` + dir
}
}

func scanContract(row scanner) (c contracts.Contract, err error) {
var contractID types.FileContractID
err = row.Scan(decode(&contractID),
Expand Down
Loading

0 comments on commit f45da04

Please sign in to comment.