From 6dddc713e38b2eb68648545fcb4bfdd72626df99 Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Tue, 30 Apr 2024 15:31:14 -0700 Subject: [PATCH 1/5] cmd, sqlite, wallet: implement soft delete for siacoin elements --- .golangci.yml | 76 +++- api/api_test.go | 30 +- cmd/walletd/node.go | 5 +- persist/sqlite/addresses.go | 7 +- persist/sqlite/consensus.go | 735 ++++++++++++++++++++++++------- persist/sqlite/consensus_test.go | 309 +++++++++++++ persist/sqlite/consts_default.go | 2 + persist/sqlite/consts_testing.go | 2 + persist/sqlite/init.sql | 28 +- persist/sqlite/wallet.go | 7 +- wallet/manager.go | 31 +- wallet/update.go | 109 ++--- wallet/wallet_test.go | 131 ++++-- 13 files changed, 1105 insertions(+), 367 deletions(-) create mode 100644 persist/sqlite/consensus_test.go diff --git a/.golangci.yml b/.golangci.yml index d439ef1..050db11 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -17,16 +17,6 @@ run: # list of build tags, all linters use it. Default is empty list. build-tags: [] - # default is true. Enables skipping of directories: - # vendor$, third_party$, testdata$, examples$, Godeps$, builtin$ - skip-dirs-use-default: true - - # which files to skip: they will be analyzed, but issues from them - # won't be reported. Default value is empty list, but there is - # no need to include all autogenerated files, we confidently recognize - # autogenerated files. If it's not please let us know. - skip-files: [] - # output configuration options output: # print lines of code with issue, default is true @@ -40,11 +30,14 @@ linters-settings: ## Enabled linters: govet: # report about shadowed variables - check-shadowing: false disable-all: false - golint: - min-confidence: 1.0 + tagliatelle: + case: + rules: + json: goCamel + yaml: goCamel + gocritic: # Which checks should be enabled; can't be combined with 'disabled-checks'; @@ -56,13 +49,16 @@ linters-settings: - style disabled-checks: # diagnostic + - appendAssign - commentedOutCode - uncheckedInlineErr - # style + - httpNoBody - exitAfterDefer - ifElseChain - importShadow + - initClause + - nestingReduce - octalLiteral - paramTypeCombine - ptrToRefParam @@ -76,19 +72,56 @@ linters-settings: revive: ignore-generated-header: true rules: + - name: blank-imports + disabled: false + - name: bool-literal-in-expr + disabled: false + - name: confusing-naming + disabled: false + - name: confusing-results + disabled: false + - name: constant-logical-expr + disabled: false + - name: context-as-argument + disabled: false + - name: exported + disabled: false + - name: errorf + disabled: false + - name: if-return + disabled: false + - name: indent-error-flow + disabled: true + - name: increment-decrement + disabled: false + - name: modifies-value-receiver + disabled: true + - name: optimize-operands-order + disabled: false + - name: range-val-in-closure + disabled: false + - name: struct-tag + disabled: false + - name: superfluous-else + disabled: false + - name: time-equal + disabled: false + - name: unexported-naming + disabled: false + - name: unexported-return + disabled: false + - name: unnecessary-stmt + disabled: false + - name: unreachable-code + disabled: false - name: package-comments disabled: true - tagliatelle: - case: - rules: - json: goCamel - yaml: goCamel - linters: disable-all: true fast: false enable: + - tagliatelle - gocritic - gofmt - revive @@ -96,9 +129,6 @@ linters: - misspell - typecheck - whitespace - - tagliatelle - - unused - - unparam issues: # Maximum issues count per one linter. Set to 0 to disable. Default is 50. diff --git a/api/api_test.go b/api/api_test.go index f254366..d6e1de0 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -88,10 +88,7 @@ func TestWalletAdd(t *testing.T) { } defer ws.Close() - wm, err := wallet.NewManager(cm, ws, log.Named("wallet")) - if err != nil { - t.Fatal(err) - } + wm := wallet.NewManager(cm, ws, log.Named("wallet")) defer wm.Close() c, shutdown := runServer(cm, nil, wm) @@ -276,10 +273,7 @@ func TestWallet(t *testing.T) { }) // create the wallet manager - wm, err := wallet.NewManager(cm, ws, log.Named("wallet")) - if err != nil { - t.Fatal(err) - } + wm := wallet.NewManager(cm, ws, log.Named("wallet")) defer wm.Close() // create seed address vault @@ -498,10 +492,7 @@ func TestAddresses(t *testing.T) { } defer ws.Close() - wm, err := wallet.NewManager(cm, ws, log.Named("wallet")) - if err != nil { - t.Fatal(err) - } + wm := wallet.NewManager(cm, ws, log.Named("wallet")) defer wm.Close() sav := wallet.NewSeedAddressVault(wallet.NewSeed(), 0, 20) @@ -695,10 +686,7 @@ func TestV2(t *testing.T) { t.Fatal(err) } defer ws.Close() - wm, err := wallet.NewManager(cm, ws, log.Named("wallet")) - if err != nil { - t.Fatal(err) - } + wm := wallet.NewManager(cm, ws, log.Named("wallet")) defer wm.Close() c, shutdown := runServer(cm, nil, wm) @@ -921,10 +909,7 @@ func TestP2P(t *testing.T) { t.Fatal(err) } - wm1, err := wallet.NewManager(cm1, store1, log1.Named("wallet")) - if err != nil { - t.Fatal(err) - } + wm1 := wallet.NewManager(cm1, store1, log1.Named("wallet")) defer wm1.Close() l1, err := net.Listen("tcp", ":0") @@ -964,10 +949,7 @@ func TestP2P(t *testing.T) { t.Fatal(err) } defer store2.Close() - wm2, err := wallet.NewManager(cm2, store2, log2.Named("wallet")) - if err != nil { - t.Fatal(err) - } + wm2 := wallet.NewManager(cm2, store2, log2.Named("wallet")) defer wm2.Close() l2, err := net.Listen("tcp", ":0") diff --git a/cmd/walletd/node.go b/cmd/walletd/node.go index 1900d77..9eb9c7f 100644 --- a/cmd/walletd/node.go +++ b/cmd/walletd/node.go @@ -187,10 +187,7 @@ func newNode(addr, dir string, chainNetwork string, useUPNP, useBootstrap bool, } s := syncer.New(l, cm, ps, header, syncer.WithLogger(log.Named("syncer"))) - wm, err := wallet.NewManager(cm, store, log.Named("wallet")) - if err != nil { - return nil, fmt.Errorf("failed to create wallet manager: %w", err) - } + wm := wallet.NewManager(cm, store, log.Named("wallet")) return &node{ chainStore: bdb, diff --git a/persist/sqlite/addresses.go b/persist/sqlite/addresses.go index 1891cea..b3fadd3 100644 --- a/persist/sqlite/addresses.go +++ b/persist/sqlite/addresses.go @@ -19,10 +19,11 @@ func (s *Store) AddressBalance(address types.Address) (balance wallet.Balance, e // AddressEvents returns the events of a single address. func (s *Store) AddressEvents(address types.Address, offset, limit int) (events []wallet.Event, err error) { err = s.transaction(func(tx *txn) error { - const query = `SELECT ev.id, ev.event_id, ev.maturity_height, ev.date_created, ev.height, ev.block_id, ev.event_type, ev.event_data + const query = `SELECT ev.id, ev.event_id, ev.maturity_height, ev.date_created, ci.height, ci.block_id, ev.event_type, ev.event_data FROM events ev INNER JOIN event_addresses ea ON (ev.id = ea.event_id) INNER JOIN sia_addresses sa ON (ea.address_id = sa.id) + INNER JOIN chain_indices ci ON (ev.chain_index_id = ci.id) WHERE sa.sia_address = $1 ORDER BY ev.maturity_height DESC, ev.id DESC LIMIT $2 OFFSET $3` @@ -52,7 +53,7 @@ func (s *Store) AddressSiacoinOutputs(address types.Address, offset, limit int) const query = `SELECT se.id, se.siacoin_value, se.merkle_proof, se.leaf_index, se.maturity_height, sa.sia_address FROM siacoin_elements se INNER JOIN sia_addresses sa ON (se.address_id = sa.id) - WHERE sa.sia_address=$1 + WHERE sa.sia_address=$1 AND se.spent_index_id IS NULL LIMIT $2 OFFSET $3` rows, err := tx.Query(query, encode(address), limit, offset) @@ -80,7 +81,7 @@ func (s *Store) AddressSiafundOutputs(address types.Address, offset, limit int) const query = `SELECT se.id, se.leaf_index, se.merkle_proof, se.siafund_value, se.claim_start, sa.sia_address FROM siafund_elements se INNER JOIN sia_addresses sa ON (se.address_id = sa.id) - WHERE sa.sia_address = $1 + WHERE sa.sia_address = $1 AND se.spent_index_id IS NULL LIMIT $2 OFFSET $3` rows, err := tx.Query(query, encode(address), limit, offset) diff --git a/persist/sqlite/consensus.go b/persist/sqlite/consensus.go index f34e1d0..362c1bc 100644 --- a/persist/sqlite/consensus.go +++ b/persist/sqlite/consensus.go @@ -23,18 +23,8 @@ type addressRef struct { Balance wallet.Balance } -func scanStateElement(s scanner) (se types.StateElement, err error) { - err = s.Scan(decode(&se.ID), &se.LeafIndex, decodeSlice(&se.MerkleProof)) - return -} - -func scanAddress(s scanner) (ab addressRef, err error) { - err = s.Scan(&ab.ID, decode(&ab.Balance.Siacoins), decode(&ab.Balance.ImmatureSiacoins), &ab.Balance.Siafunds) - return -} - func (ut *updateTx) SiacoinStateElements() ([]types.StateElement, error) { - const query = `SELECT id, leaf_index, merkle_proof FROM siacoin_elements` + const query = `SELECT id, leaf_index, merkle_proof FROM siacoin_elements WHERE spent_index_id IS NULL` rows, err := ut.tx.Query(query) if err != nil { return nil, fmt.Errorf("failed to query siacoin elements: %w", err) @@ -53,6 +43,9 @@ func (ut *updateTx) SiacoinStateElements() ([]types.StateElement, error) { } func (ut *updateTx) UpdateSiacoinStateElements(elements []types.StateElement) error { + log := ut.tx.log.Named("UpdateSiacoinStateElements") + log.Debug("updating siacoin state elements", zap.Int("count", len(elements))) + const query = `UPDATE siacoin_elements SET merkle_proof=$1, leaf_index=$2 WHERE id=$3 RETURNING id` stmt, err := ut.tx.Prepare(query) if err != nil { @@ -66,12 +59,13 @@ func (ut *updateTx) UpdateSiacoinStateElements(elements []types.StateElement) er if err != nil { return fmt.Errorf("failed to execute statement: %w", err) } + log.Debug("updated element proof", zap.Stringer("id", se.ID), zap.Uint64("leafIndex", se.LeafIndex)) } return nil } func (ut *updateTx) SiafundStateElements() ([]types.StateElement, error) { - const query = `SELECT id, leaf_index, merkle_proof FROM siafund_elements` + const query = `SELECT id, leaf_index, merkle_proof FROM siafund_elements WHERE spent_index_id IS NULL` rows, err := ut.tx.Query(query) if err != nil { return nil, fmt.Errorf("failed to query siacoin elements: %w", err) @@ -129,11 +123,131 @@ func (ut *updateTx) AddressBalance(addr types.Address) (balance wallet.Balance, return } -func (ut *updateTx) ApplyMatureSiacoinBalance(index types.ChainIndex) error { - const query = `SELECT id, se.address_id, se.siacoin_value -FROM siacoin_elements se -WHERE maturity_height=$1 AND matured=false` - rows, err := ut.tx.Query(query, index.Height) +func (ut *updateTx) ApplyIndex(index types.ChainIndex, state wallet.AppliedState) error { + tx := ut.tx + log := tx.log.Named("ApplyIndex").With(zap.Stringer("blockID", index.ID), zap.Uint64("height", index.Height)) + + if err := revertOrphans(tx, index, log.Named("revertOrphans")); err != nil { + return fmt.Errorf("failed to revert orphans: %w", err) + } + + if err := applyMatureSiacoinBalance(tx, index, log.Named("applyMatureSiacoinBalance")); err != nil { + return fmt.Errorf("failed to apply mature siacoin balance: %w", err) + } + + var indexID int64 + if err := tx.QueryRow(`INSERT INTO chain_indices (block_id, height) VALUES ($1, $2) ON CONFLICT (block_id) DO UPDATE SET height=height RETURNING id`, encode(index.ID), index.Height).Scan(&indexID); err != nil { + return fmt.Errorf("failed to insert chain index: %w", err) + } + + if err := spendSiacoinElements(tx, state.SpentSiacoinElements, indexID); err != nil { + return fmt.Errorf("failed to spend siacoin elements: %w", err) + } else if err := addSiacoinElements(tx, state.CreatedSiacoinElements, indexID, log.Named("addSiacoinElements")); err != nil { + return fmt.Errorf("failed to add siacoin elements: %w", err) + } + + if err := spendSiafundElements(tx, state.SpentSiafundElements, indexID); err != nil { + return fmt.Errorf("failed to spend siafund elements: %w", err) + } else if err := addSiafundElements(tx, state.CreatedSiafundElements, indexID); err != nil { + return fmt.Errorf("failed to add siafund elements: %w", err) + } + + if err := addEvents(tx, state.Events, indexID); err != nil { + return fmt.Errorf("failed to add events: %w", err) + } + return nil +} + +func (ut *updateTx) RevertIndex(index types.ChainIndex, state wallet.RevertedState) error { + tx := ut.tx + + if err := revertSpentSiacoinElements(tx, state.UnspentSiacoinElements); err != nil { + return fmt.Errorf("failed to revert spent siacoin elements: %w", err) + } else if err := removeSiacoinElements(tx, state.DeletedSiacoinElements); err != nil { + return fmt.Errorf("failed to remove siacoin elements: %w", err) + } + + if err := revertSpentSiafundElements(tx, state.UnspentSiafundElements); err != nil { + return fmt.Errorf("failed to revert spent siafund elements: %w", err) + } else if err := removeSiafundElements(tx, state.DeletedSiafundElements); err != nil { + return fmt.Errorf("failed to remove siafund elements: %w", err) + } + + if err := revertEvents(tx, index); err != nil { + return fmt.Errorf("failed to revert events: %w", err) + } else if err := revertMatureSiacoinBalance(tx, index); err != nil { + return fmt.Errorf("failed to revert mature siacoin balance: %w", err) + } + return nil +} + +// ProcessChainApplyUpdate implements chain.Subscriber +func (s *Store) UpdateChainState(reverted []chain.RevertUpdate, applied []chain.ApplyUpdate) error { + log := s.log.Named("UpdateChainState").With(zap.Int("reverted", len(reverted)), zap.Int("applied", len(applied))) + return s.transaction(func(tx *txn) error { + utx := &updateTx{ + tx: tx, + relevantAddresses: make(map[types.Address]bool), + } + + if err := wallet.UpdateChainState(utx, reverted, applied, log); err != nil { + return fmt.Errorf("failed to update chain state: %w", err) + } else if err := setLastCommittedIndex(tx, applied[len(applied)-1].State.Index); err != nil { + return fmt.Errorf("failed to set last committed index: %w", err) + } + + height := applied[len(applied)-1].State.Index.Height + + if height > spentElementRetentionBlocks { + pruneHeight := height - spentElementRetentionBlocks + + siacoins, err := pruneSpentSiacoinElements(tx, pruneHeight) + if err != nil { + return fmt.Errorf("failed to cleanup siacoin elements: %w", err) + } + + siafunds, err := pruneSpentSiafundElements(tx, pruneHeight) + if err != nil { + return fmt.Errorf("failed to cleanup siafund elements: %w", err) + } + + if len(siacoins) > 0 || len(siafunds) > 0 { + log.Debug("pruned elements", zap.Stringers("siacoins", siacoins), zap.Stringers("siafunds", siafunds), zap.Uint64("pruneHeight", pruneHeight)) + } + } + return nil + }) +} + +// LastCommittedIndex returns the last chain index that was committed. +func (s *Store) LastCommittedIndex() (index types.ChainIndex, err error) { + err = s.db.QueryRow(`SELECT last_indexed_tip FROM global_settings`).Scan(decode(&index)) + return +} + +// ResetLastIndex resets the last indexed tip to trigger a full rescan. +func (s *Store) ResetLastIndex() error { + _, err := s.db.Exec(`UPDATE global_settings SET last_indexed_tip=$1`, encode(types.ChainIndex{})) + return err +} + +func scanStateElement(s scanner) (se types.StateElement, err error) { + err = s.Scan(decode(&se.ID), &se.LeafIndex, decodeSlice(&se.MerkleProof)) + return +} + +func scanAddress(s scanner) (ab addressRef, err error) { + err = s.Scan(&ab.ID, decode(&ab.Balance.Siacoins), decode(&ab.Balance.ImmatureSiacoins), &ab.Balance.Siafunds) + return +} + +func applyMatureSiacoinBalance(tx *txn, index types.ChainIndex, log *zap.Logger) error { + log = log.With(zap.Uint64("maturityHeight", index.Height)) + log.Debug("applying mature siacoin balance") + const query = `SELECT id, address_id, siacoin_value +FROM siacoin_elements +WHERE maturity_height=$1 AND matured=false AND spent_index_id IS NULL` + rows, err := tx.Query(query, index.Height) if err != nil { return fmt.Errorf("failed to query siacoin elements: %w", err) } @@ -151,25 +265,26 @@ WHERE maturity_height=$1 AND matured=false` } balanceDelta[addressID] = balanceDelta[addressID].Add(value) matured = append(matured, outputID) + log.Debug("matured siacoin output", zap.Stringer("outputID", outputID), zap.Int64("addressID", addressID), zap.Stringer("value", value)) } if err := rows.Err(); err != nil { return fmt.Errorf("failed to scan siacoin elements: %w", err) } - updateMaturedStmt, err := ut.tx.Prepare(`UPDATE siacoin_elements SET matured=true WHERE id=$1`) + updateMaturedStmt, err := tx.Prepare(`UPDATE siacoin_elements SET matured=true WHERE id=$1`) if err != nil { return fmt.Errorf("failed to prepare statement: %w", err) } defer updateMaturedStmt.Close() - getAddressBalanceStmt, err := ut.tx.Prepare(`SELECT siacoin_balance, immature_siacoin_balance FROM sia_addresses WHERE id=$1`) + getAddressBalanceStmt, err := tx.Prepare(`SELECT siacoin_balance, immature_siacoin_balance FROM sia_addresses WHERE id=$1`) if err != nil { return fmt.Errorf("failed to prepare statement: %w", err) } defer getAddressBalanceStmt.Close() - updateAddressBalanceStmt, err := ut.tx.Prepare(`UPDATE sia_addresses SET siacoin_balance=$1, immature_siacoin_balance=$2 WHERE id=$3`) + updateAddressBalanceStmt, err := tx.Prepare(`UPDATE sia_addresses SET siacoin_balance=$1, immature_siacoin_balance=$2 WHERE id=$3`) if err != nil { return fmt.Errorf("failed to prepare statement: %w", err) } @@ -207,11 +322,11 @@ WHERE maturity_height=$1 AND matured=false` return nil } -func (ut *updateTx) RevertMatureSiacoinBalance(index types.ChainIndex) error { +func revertMatureSiacoinBalance(tx *txn, index types.ChainIndex) error { const query = `SELECT se.id, se.address_id, se.siacoin_value FROM siacoin_elements se - WHERE maturity_height=$1 AND matured=true` - rows, err := ut.tx.Query(query, index.Height) + WHERE maturity_height=$1 AND matured=true AND spent_index_id IS NULL` + rows, err := tx.Query(query, index.Height) if err != nil { return fmt.Errorf("failed to query siacoin elements: %w", err) } @@ -235,19 +350,19 @@ func (ut *updateTx) RevertMatureSiacoinBalance(index types.ChainIndex) error { return fmt.Errorf("failed to scan siacoin elements: %w", err) } - updateMaturedStmt, err := ut.tx.Prepare(`UPDATE siacoin_elements SET matured=false WHERE id=$1`) + updateMaturedStmt, err := tx.Prepare(`UPDATE siacoin_elements SET matured=false WHERE id=$1`) if err != nil { return fmt.Errorf("failed to prepare statement: %w", err) } defer updateMaturedStmt.Close() - getAddressBalanceStmt, err := ut.tx.Prepare(`SELECT siacoin_balance, immature_siacoin_balance FROM sia_addresses WHERE id=$1`) + getAddressBalanceStmt, err := tx.Prepare(`SELECT siacoin_balance, immature_siacoin_balance FROM sia_addresses WHERE id=$1`) if err != nil { return fmt.Errorf("failed to prepare statement: %w", err) } defer getAddressBalanceStmt.Close() - updateAddressBalanceStmt, err := ut.tx.Prepare(`UPDATE sia_addresses SET siacoin_balance=$1, immature_siacoin_balance=$2 WHERE id=$3`) + updateAddressBalanceStmt, err := tx.Prepare(`UPDATE sia_addresses SET siacoin_balance=$1, immature_siacoin_balance=$2 WHERE id=$3`) if err != nil { return fmt.Errorf("failed to prepare statement: %w", err) } @@ -286,19 +401,19 @@ func (ut *updateTx) RevertMatureSiacoinBalance(index types.ChainIndex) error { return nil } -func (ut *updateTx) AddSiacoinElements(elements []types.SiacoinElement, index types.ChainIndex) error { +func addSiacoinElements(tx *txn, elements []types.SiacoinElement, indexID int64, log *zap.Logger) error { if len(elements) == 0 { return nil } - addrStmt, err := insertAddressStatement(ut.tx) + addrStmt, err := insertAddressStatement(tx) if err != nil { return fmt.Errorf("failed to prepare address statement: %w", err) } defer addrStmt.Close() // ignore elements already in the database. - insertStmt, err := ut.tx.Prepare(`INSERT INTO siacoin_elements (id, siacoin_value, merkle_proof, leaf_index, maturity_height, address_id, matured, block_id, height) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (id) DO NOTHING RETURNING id`) + insertStmt, err := tx.Prepare(`INSERT INTO siacoin_elements (id, siacoin_value, merkle_proof, leaf_index, maturity_height, address_id, matured, chain_index_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (id) DO NOTHING RETURNING id`) if err != nil { return fmt.Errorf("failed to prepare insert statement: %w", err) } @@ -314,8 +429,9 @@ func (ut *updateTx) AddSiacoinElements(elements []types.SiacoinElement, index ty } var dummyID types.Hash256 - err = insertStmt.QueryRow(encode(se.ID), encode(se.SiacoinOutput.Value), encodeSlice(se.MerkleProof), se.LeafIndex, se.MaturityHeight, addrRef.ID, se.MaturityHeight == 0, encode(index.ID), index.Height).Scan(decode(&dummyID)) + err = insertStmt.QueryRow(encode(se.ID), encode(se.SiacoinOutput.Value), encodeSlice(se.MerkleProof), se.LeafIndex, se.MaturityHeight, addrRef.ID, se.MaturityHeight == 0, indexID).Scan(decode(&dummyID)) if errors.Is(err, sql.ErrNoRows) { + log.Debug("siacoin element already exists", zap.Stringer("id", se.ID), zap.Stringer("address", se.SiacoinOutput.Address)) continue // skip if the element already exists } else if err != nil { return fmt.Errorf("failed to execute statement: %w", err) @@ -323,10 +439,12 @@ func (ut *updateTx) AddSiacoinElements(elements []types.SiacoinElement, index ty // update the balance if the element does not exist balance := balanceChanges[addrRef.ID] - if se.MaturityHeight <= index.Height { + if se.MaturityHeight == 0 { balance.Siacoins = balance.Siacoins.Add(se.SiacoinOutput.Value) + log.Debug("added siacoin output", zap.Stringer("id", se.ID), zap.Stringer("address", se.SiacoinOutput.Address), zap.Stringer("value", se.SiacoinOutput.Value)) } else { balance.ImmatureSiacoins = balance.ImmatureSiacoins.Add(se.SiacoinOutput.Value) + log.Debug("added immature siacoin output", zap.Stringer("id", se.ID), zap.Stringer("address", se.SiacoinOutput.Address), zap.Stringer("value", se.SiacoinOutput.Value), zap.Uint64("maturityHeight", se.MaturityHeight)) } balanceChanges[addrRef.ID] = balance } @@ -335,7 +453,7 @@ func (ut *updateTx) AddSiacoinElements(elements []types.SiacoinElement, index ty return nil } - updateAddressBalanceStmt, err := ut.tx.Prepare(`UPDATE sia_addresses SET siacoin_balance=$1, immature_siacoin_balance=$2 WHERE id=$3`) + updateAddressBalanceStmt, err := tx.Prepare(`UPDATE sia_addresses SET siacoin_balance=$1, immature_siacoin_balance=$2 WHERE id=$3`) if err != nil { return fmt.Errorf("failed to prepare update balance statement: %w", err) } @@ -354,18 +472,18 @@ func (ut *updateTx) AddSiacoinElements(elements []types.SiacoinElement, index ty return nil } -func (ut *updateTx) RemoveSiacoinElements(elements []types.SiacoinElement, index types.ChainIndex) error { +func removeSiacoinElements(tx *txn, elements []types.SiacoinElement) error { if len(elements) == 0 { return nil } - addrStmt, err := insertAddressStatement(ut.tx) + addrStmt, err := insertAddressStatement(tx) if err != nil { return fmt.Errorf("failed to prepare address statement: %w", err) } defer addrStmt.Close() - stmt, err := ut.tx.Prepare(`DELETE FROM siacoin_elements WHERE id=$1 RETURNING id`) + stmt, err := tx.Prepare(`DELETE FROM siacoin_elements WHERE id=$1 RETURNING id, matured`) if err != nil { return fmt.Errorf("failed to prepare statement: %w", err) } @@ -381,13 +499,14 @@ func (ut *updateTx) RemoveSiacoinElements(elements []types.SiacoinElement, index } var dummy types.Hash256 - err = stmt.QueryRow(encode(se.ID)).Scan(decode(&dummy)) + var matured bool + err = stmt.QueryRow(encode(se.ID)).Scan(decode(&dummy), &matured) if err != nil { return fmt.Errorf("failed to delete element %q: %w", se.ID, err) } balance := balanceChanges[addrRef.ID] - if se.MaturityHeight < index.Height { + if matured { balance.Siacoins = balance.Siacoins.Sub(se.SiacoinOutput.Value) } else { balance.ImmatureSiacoins = balance.ImmatureSiacoins.Sub(se.SiacoinOutput.Value) @@ -399,7 +518,7 @@ func (ut *updateTx) RemoveSiacoinElements(elements []types.SiacoinElement, index return nil } - updateAddressBalanceStmt, err := ut.tx.Prepare(`UPDATE sia_addresses SET siacoin_balance=$1, immature_siacoin_balance=$2 WHERE id=$3`) + updateAddressBalanceStmt, err := tx.Prepare(`UPDATE sia_addresses SET siacoin_balance=$1, immature_siacoin_balance=$2 WHERE id=$3`) if err != nil { return fmt.Errorf("failed to prepare update balance statement: %w", err) } @@ -418,18 +537,140 @@ func (ut *updateTx) RemoveSiacoinElements(elements []types.SiacoinElement, index return nil } -func (ut *updateTx) AddSiafundElements(elements []types.SiafundElement, index types.ChainIndex) error { +func revertSpentSiacoinElements(tx *txn, elements []types.SiacoinElement) error { + if len(elements) == 0 { + return nil + } + + addrStmt, err := insertAddressStatement(tx) + if err != nil { + return fmt.Errorf("failed to prepare address statement: %w", err) + } + defer addrStmt.Close() + + stmt, err := tx.Prepare(`UPDATE siacoin_elements SET spent_index_id=NULL WHERE id=$1 AND spent_index_id IS NOT NULL RETURNING id`) + if err != nil { + return fmt.Errorf("failed to prepare statement: %w", err) + } + defer stmt.Close() + + balanceChanges := make(map[int64]wallet.Balance) + for _, se := range elements { + addrRef, err := scanAddress(addrStmt.QueryRow(encode(se.SiacoinOutput.Address), encode(types.ZeroCurrency), 0)) + if err != nil { + return fmt.Errorf("failed to query address: %w", err) + } else if _, ok := balanceChanges[addrRef.ID]; !ok { + balanceChanges[addrRef.ID] = addrRef.Balance + } + + var dummy types.Hash256 + if err := stmt.QueryRow(encode(se.ID)).Scan(decode(&dummy)); err != nil && !errors.Is(err, sql.ErrNoRows) { + return err + } else if errors.Is(err, sql.ErrNoRows) { + continue // skip if the element does not exist + } + + balance := balanceChanges[addrRef.ID] + balance.Siacoins = balance.Siacoins.Add(se.SiacoinOutput.Value) + balanceChanges[addrRef.ID] = balance + } + + if len(balanceChanges) == 0 { + return nil + } + + updateAddressBalanceStmt, err := tx.Prepare(`UPDATE sia_addresses SET siacoin_balance=$1 WHERE id=$2`) + if err != nil { + return fmt.Errorf("failed to prepare update balance statement: %w", err) + } + defer updateAddressBalanceStmt.Close() + + for addrID, balance := range balanceChanges { + res, err := updateAddressBalanceStmt.Exec(encode(balance.Siacoins), addrID) + if err != nil { + return fmt.Errorf("failed to update balance: %w", err) + } else if n, err := res.RowsAffected(); err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } else if n != 1 { + return fmt.Errorf("expected 1 row affected, got %v", n) + } + } + return nil +} + +func spendSiacoinElements(tx *txn, elements []types.SiacoinElement, indexID int64) error { + if len(elements) == 0 { + return nil + } + + addrStmt, err := insertAddressStatement(tx) + if err != nil { + return fmt.Errorf("failed to prepare address statement: %w", err) + } + defer addrStmt.Close() + + stmt, err := tx.Prepare(`UPDATE siacoin_elements SET spent_index_id=$1 WHERE id=$2 AND spent_index_id IS NULL RETURNING id`) + if err != nil { + return fmt.Errorf("failed to prepare statement: %w", err) + } + defer stmt.Close() + + balanceChanges := make(map[int64]wallet.Balance) + for _, se := range elements { + addrRef, err := scanAddress(addrStmt.QueryRow(encode(se.SiacoinOutput.Address), encode(types.ZeroCurrency), 0)) + if err != nil { + return fmt.Errorf("failed to query address: %w", err) + } else if _, ok := balanceChanges[addrRef.ID]; !ok { + balanceChanges[addrRef.ID] = addrRef.Balance + } + + var dummy types.Hash256 + if err := stmt.QueryRow(indexID, encode(se.ID)).Scan(decode(&dummy)); err != nil && !errors.Is(err, sql.ErrNoRows) { + return err + } else if errors.Is(err, sql.ErrNoRows) { + continue // skip if the element does not exist + } + + balance := balanceChanges[addrRef.ID] + balance.Siacoins = balance.Siacoins.Sub(se.SiacoinOutput.Value) + balanceChanges[addrRef.ID] = balance + } + + if len(balanceChanges) == 0 { + return nil + } + + updateAddressBalanceStmt, err := tx.Prepare(`UPDATE sia_addresses SET siacoin_balance=$1 WHERE id=$2`) + if err != nil { + return fmt.Errorf("failed to prepare update balance statement: %w", err) + } + defer updateAddressBalanceStmt.Close() + + for addrID, balance := range balanceChanges { + res, err := updateAddressBalanceStmt.Exec(encode(balance.Siacoins), addrID) + if err != nil { + return fmt.Errorf("failed to update balance: %w", err) + } else if n, err := res.RowsAffected(); err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } else if n != 1 { + return fmt.Errorf("expected 1 row affected, got %v", n) + } + } + return nil +} + +func addSiafundElements(tx *txn, elements []types.SiafundElement, indexID int64) error { if len(elements) == 0 { return nil } - addrStmt, err := insertAddressStatement(ut.tx) + addrStmt, err := insertAddressStatement(tx) if err != nil { return fmt.Errorf("failed to prepare address statement: %w", err) } defer addrStmt.Close() - insertStmt, err := ut.tx.Prepare(`INSERT INTO siafund_elements (id, siafund_value, merkle_proof, leaf_index, claim_start, address_id, block_id, height) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (id) DO NOTHING RETURNING id`) + insertStmt, err := tx.Prepare(`INSERT INTO siafund_elements (id, siafund_value, merkle_proof, leaf_index, claim_start, address_id, chain_index_id) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (id) DO NOTHING RETURNING id`) if err != nil { return fmt.Errorf("failed to prepare statement: %w", err) } @@ -445,7 +686,7 @@ func (ut *updateTx) AddSiafundElements(elements []types.SiafundElement, index ty } var dummy types.Hash256 - err = insertStmt.QueryRow(encode(se.ID), se.SiafundOutput.Value, encodeSlice(se.MerkleProof), se.LeafIndex, encode(se.ClaimStart), addrRef.ID, encode(index.ID), index.Height).Scan(decode(&dummy)) + err = insertStmt.QueryRow(encode(se.ID), se.SiafundOutput.Value, encodeSlice(se.MerkleProof), se.LeafIndex, encode(se.ClaimStart), addrRef.ID, indexID).Scan(decode(&dummy)) if errors.Is(err, sql.ErrNoRows) { continue // skip if the element already exists } else if err != nil { @@ -458,7 +699,7 @@ func (ut *updateTx) AddSiafundElements(elements []types.SiafundElement, index ty return nil } - updateAddressBalanceStmt, err := ut.tx.Prepare(`UPDATE sia_addresses SET siafund_balance=$1 WHERE id=$2`) + updateAddressBalanceStmt, err := tx.Prepare(`UPDATE sia_addresses SET siafund_balance=$1 WHERE id=$2`) if err != nil { return fmt.Errorf("failed to prepare update balance statement: %w", err) } @@ -477,18 +718,18 @@ func (ut *updateTx) AddSiafundElements(elements []types.SiafundElement, index ty return nil } -func (ut *updateTx) RemoveSiafundElements(elements []types.SiafundElement, index types.ChainIndex) error { +func removeSiafundElements(tx *txn, elements []types.SiafundElement) error { if len(elements) == 0 { return nil } - addrStmt, err := insertAddressStatement(ut.tx) + addrStmt, err := insertAddressStatement(tx) if err != nil { return fmt.Errorf("failed to prepare address statement: %w", err) } defer addrStmt.Close() - stmt, err := ut.tx.Prepare(`DELETE FROM siafund_elements WHERE id=$1 RETURNING id`) + stmt, err := tx.Prepare(`DELETE FROM siafund_elements WHERE id=$1 RETURNING id`) if err != nil { return fmt.Errorf("failed to prepare statement: %w", err) } @@ -519,7 +760,7 @@ func (ut *updateTx) RemoveSiafundElements(elements []types.SiafundElement, index return nil } - updateAddressBalanceStmt, err := ut.tx.Prepare(`UPDATE sia_addresses SET siafund_balance=$1 WHERE id=$2`) + updateAddressBalanceStmt, err := tx.Prepare(`UPDATE sia_addresses SET siafund_balance=$1 WHERE id=$2`) if err != nil { return fmt.Errorf("failed to prepare update balance statement: %w", err) } @@ -538,24 +779,150 @@ func (ut *updateTx) RemoveSiafundElements(elements []types.SiafundElement, index return nil } -func (ut *updateTx) AddEvents(events []wallet.Event) error { +func spendSiafundElements(tx *txn, elements []types.SiafundElement, indexID int64) error { + if len(elements) == 0 { + return nil + } + + addrStmt, err := insertAddressStatement(tx) + if err != nil { + return fmt.Errorf("failed to prepare address statement: %w", err) + } + defer addrStmt.Close() + + stmt, err := tx.Prepare(`UPDATE siafund_elements SET spent_index_id=$1 WHERE id=$2 AND spent_index_id IS NULL RETURNING id`) + if err != nil { + return fmt.Errorf("failed to prepare statement: %w", err) + } + defer stmt.Close() + + balanceChanges := make(map[int64]wallet.Balance) + for _, se := range elements { + addrRef, err := scanAddress(addrStmt.QueryRow(encode(se.SiafundOutput.Address), encode(types.ZeroCurrency), 0)) + if err != nil { + return fmt.Errorf("failed to query address: %w", err) + } else if _, ok := balanceChanges[addrRef.ID]; !ok { + balanceChanges[addrRef.ID] = addrRef.Balance + } + + var dummy types.Hash256 + if err := stmt.QueryRow(indexID, encode(se.ID)).Scan(decode(&dummy)); err != nil && !errors.Is(err, sql.ErrNoRows) { + return err + } else if errors.Is(err, sql.ErrNoRows) { + continue // skip if the element does not exist + } + + balance := balanceChanges[addrRef.ID] + if balance.Siafunds < se.SiafundOutput.Value { + panic("siafund balance cannot be negative") + } + balance.Siafunds -= se.SiafundOutput.Value + + balanceChanges[addrRef.ID] = balance + } + + if len(balanceChanges) == 0 { + return nil + } + + updateAddressBalanceStmt, err := tx.Prepare(`UPDATE sia_addresses SET siafund_balance=$1 WHERE id=$3`) + if err != nil { + return fmt.Errorf("failed to prepare update balance statement: %w", err) + } + defer updateAddressBalanceStmt.Close() + + for addrID, balance := range balanceChanges { + res, err := updateAddressBalanceStmt.Exec(balance.Siafunds, addrID) + if err != nil { + return fmt.Errorf("failed to update balance: %w", err) + } else if n, err := res.RowsAffected(); err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } else if n != 1 { + return fmt.Errorf("expected 1 row affected, got %v", n) + } + } + return nil +} + +func revertSpentSiafundElements(tx *txn, elements []types.SiafundElement) error { + if len(elements) == 0 { + return nil + } + + addrStmt, err := insertAddressStatement(tx) + if err != nil { + return fmt.Errorf("failed to prepare address statement: %w", err) + } + defer addrStmt.Close() + + stmt, err := tx.Prepare(`UPDATE siafund_elements SET spent_index_id=NULL WHERE id=$1 AND spent_index_id IS NOT NULL RETURNING id`) + if err != nil { + return fmt.Errorf("failed to prepare statement: %w", err) + } + defer stmt.Close() + + balanceChanges := make(map[int64]wallet.Balance) + for _, se := range elements { + addrRef, err := scanAddress(addrStmt.QueryRow(encode(se.SiafundOutput.Address), encode(types.ZeroCurrency), 0)) + if err != nil { + return fmt.Errorf("failed to query address: %w", err) + } else if _, ok := balanceChanges[addrRef.ID]; !ok { + balanceChanges[addrRef.ID] = addrRef.Balance + } + + var dummy types.Hash256 + if err := stmt.QueryRow(encode(se.ID)).Scan(decode(&dummy)); err != nil && !errors.Is(err, sql.ErrNoRows) { + return err + } else if errors.Is(err, sql.ErrNoRows) { + continue // skip if the element does not exist + } + + balance := balanceChanges[addrRef.ID] + balance.Siafunds += se.SiafundOutput.Value + balanceChanges[addrRef.ID] = balance + } + + if len(balanceChanges) == 0 { + return nil + } + + updateAddressBalanceStmt, err := tx.Prepare(`UPDATE sia_addresses SET siafund_balance=$1 WHERE id=$3`) + if err != nil { + return fmt.Errorf("failed to prepare update balance statement: %w", err) + } + defer updateAddressBalanceStmt.Close() + + for addrID, balance := range balanceChanges { + res, err := updateAddressBalanceStmt.Exec(balance.Siafunds, addrID) + if err != nil { + return fmt.Errorf("failed to update balance: %w", err) + } else if n, err := res.RowsAffected(); err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } else if n != 1 { + return fmt.Errorf("expected 1 row affected, got %v", n) + } + } + return nil +} + +func addEvents(tx *txn, events []wallet.Event, indexID int64) error { if len(events) == 0 { return nil } - insertEventStmt, err := ut.tx.Prepare(`INSERT INTO events (event_id, maturity_height, date_created, event_type, event_data, block_id, height) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (event_id) DO NOTHING RETURNING id`) + insertEventStmt, err := tx.Prepare(`INSERT INTO events (event_id, maturity_height, date_created, event_type, event_data, chain_index_id) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (event_id) DO NOTHING RETURNING id`) if err != nil { return fmt.Errorf("failed to prepare event statement: %w", err) } defer insertEventStmt.Close() - addrStmt, err := ut.tx.Prepare(`INSERT INTO sia_addresses (sia_address, siacoin_balance, immature_siacoin_balance, siafund_balance) VALUES ($1, $2, $3, 0) ON CONFLICT (sia_address) DO UPDATE SET sia_address=EXCLUDED.sia_address RETURNING id`) + addrStmt, err := tx.Prepare(`INSERT INTO sia_addresses (sia_address, siacoin_balance, immature_siacoin_balance, siafund_balance) VALUES ($1, $2, $3, 0) ON CONFLICT (sia_address) DO UPDATE SET sia_address=EXCLUDED.sia_address RETURNING id`) if err != nil { return fmt.Errorf("failed to prepare address statement: %w", err) } defer addrStmt.Close() - relevantAddrStmt, err := ut.tx.Prepare(`INSERT INTO event_addresses (event_id, address_id) VALUES ($1, $2) ON CONFLICT (event_id, address_id) DO NOTHING`) + relevantAddrStmt, err := tx.Prepare(`INSERT INTO event_addresses (event_id, address_id) VALUES ($1, $2) ON CONFLICT (event_id, address_id) DO NOTHING`) if err != nil { return fmt.Errorf("failed to prepare relevant address statement: %w", err) } @@ -570,7 +937,7 @@ func (ut *updateTx) AddEvents(events []wallet.Event) error { } var eventID int64 - err = insertEventStmt.QueryRow(encode(event.ID), event.MaturityHeight, encode(event.Timestamp), event.Data.EventType(), buf.String(), encode(event.Index.ID), event.Index.Height).Scan(&eventID) + err = insertEventStmt.QueryRow(encode(event.ID), event.MaturityHeight, encode(event.Timestamp), event.Data.EventType(), buf.String(), indexID).Scan(&eventID) if errors.Is(err, sql.ErrNoRows) { continue // skip if the event already exists } else if err != nil { @@ -600,20 +967,45 @@ func (ut *updateTx) AddEvents(events []wallet.Event) error { return nil } -// RevertIndex reverts all siacoin_elements, siafund_elements or events that were added by the index -func (ut *updateTx) RevertIndex(index types.ChainIndex) error { - if _, err := ut.tx.Exec(`DELETE FROM siacoin_elements WHERE block_id=$1 AND height=$2`, encode(index.ID), index.Height); err != nil { - return fmt.Errorf("failed to delete siacoin elements: %w", err) - } else if _, err := ut.tx.Exec(`DELETE FROM siafund_elements WHERE block_id=$1 AND height=$2`, encode(index.ID), index.Height); err != nil { - return fmt.Errorf("failed to delete siafund elements: %w", err) - } else if _, err := ut.tx.Exec(`DELETE FROM events WHERE block_id=$1 AND height=$2`, encode(index.ID), index.Height); err != nil { - return fmt.Errorf("failed to delete events: %w", err) +// RevertEvents reverts any events that were added by the index +func revertEvents(tx *txn, index types.ChainIndex) error { + const query = `DELETE FROM events WHERE chain_index_id IN (SELECT id FROM chain_indices WHERE block_id=$1 AND height=$2)` + _, err := tx.Exec(query, encode(index.ID), index.Height) + return err +} + +func revertSpentOrphanedSiacoinElements(tx *txn, index types.ChainIndex, log *zap.Logger) (map[int64]wallet.Balance, error) { + rows, err := tx.Query(`UPDATE siacoin_elements SET spent_index_id=NULL WHERE id IN (SELECT se.id FROM siacoin_elements se +INNER JOIN chain_indices ci ON (ci.id=se.spent_index_id) +WHERE ci.height=$1 AND ci.block_id<>$2) +RETURNING address_id, siacoin_value`, index.Height, encode(index.ID)) + if err != nil { + return nil, fmt.Errorf("failed to query siacoin elements: %w", err) } - return nil + defer rows.Close() + + balances := make(map[int64]wallet.Balance) + for rows.Next() { + var addrID int64 + var value types.Currency + + if err := rows.Scan(&addrID, decode(&value)); err != nil { + return nil, fmt.Errorf("failed to scan siacoin element: %w", err) + } + + balance := balances[addrID] + balance.Siacoins = balance.Siacoins.Add(value) + balances[addrID] = balance + log.Debug("reverting spent orphaned siacoin element", zap.Stringer("value", value)) + } + return balances, rows.Err() } -func (ut *updateTx) orphanedSiacoinBalances(index types.ChainIndex) (map[int64]wallet.Balance, error) { - rows, err := ut.tx.Query(`SELECT address_id, siacoin_value, matured FROM siacoin_elements WHERE height=$1 AND block_id<>$2`, index.Height, encode(index.ID)) +func deleteOrphanedSiacoinElements(tx *txn, index types.ChainIndex, log *zap.Logger) (map[int64]wallet.Balance, error) { + rows, err := tx.Query(`DELETE FROM siacoin_elements WHERE id IN (SELECT se.id FROM siacoin_elements se +INNER JOIN chain_indices ci ON (ci.id=se.chain_index_id) +WHERE ci.height=$1 AND ci.block_id<>$2) +RETURNING id, address_id, siacoin_value, matured, spent_index_id IS NOT NULL`, index.Height, encode(index.ID)) if err != nil { return nil, fmt.Errorf("failed to query siacoin elements: %w", err) } @@ -621,27 +1013,33 @@ func (ut *updateTx) orphanedSiacoinBalances(index types.ChainIndex) (map[int64]w balances := make(map[int64]wallet.Balance) for rows.Next() { + var outputID types.SiacoinOutputID var addrID int64 var value types.Currency var matured bool + var spent bool - if err := rows.Scan(&addrID, decode(&value), &matured); err != nil { + if err := rows.Scan(decode(&outputID), &addrID, decode(&value), &matured, &spent); err != nil { return nil, fmt.Errorf("failed to scan siacoin element: %w", err) } balance := balances[addrID] - if matured { - balance.Siacoins = balance.Siacoins.Add(value) - } else { + if !matured { balance.ImmatureSiacoins = balance.ImmatureSiacoins.Add(value) + } else if !spent { + balance.Siacoins = balance.Siacoins.Add(value) } balances[addrID] = balance + log.Debug("deleting orphaned siacoin element", zap.Stringer("id", outputID), zap.Stringer("value", value), zap.Bool("matured", matured), zap.Bool("spent", spent)) } return balances, rows.Err() } -func (ut *updateTx) orphanedSiafundBalances(index types.ChainIndex) (map[int64]uint64, error) { - rows, err := ut.tx.Query(`SELECT address_id, siafund_value FROM siafund_elements WHERE height=$1 AND block_id<>$2`, index.Height, encode(index.ID)) +func revertSpentOrphanedSiafundElements(tx *txn, index types.ChainIndex, log *zap.Logger) (map[int64]uint64, error) { + rows, err := tx.Query(`UPDATE siafund_elements SET spent_index_id=NULL WHERE id IN (SELECT se.id FROM siafund_elements se +INNER JOIN chain_indices ci ON (ci.id=se.spent_index_id) +WHERE ci.height=$1 AND ci.block_id<>$2) +RETURNING id, address_id, siafund_value`, index.Height, encode(index.ID)) if err != nil { return nil, fmt.Errorf("failed to query siafund elements: %w", err) } @@ -649,168 +1047,175 @@ func (ut *updateTx) orphanedSiafundBalances(index types.ChainIndex) (map[int64]u balances := make(map[int64]uint64) for rows.Next() { + var outputID types.SiafundOutputID var addrID int64 var value uint64 - if err := rows.Scan(&addrID, &value); err != nil { + if err := rows.Scan(decode(&outputID), &addrID, value); err != nil { return nil, fmt.Errorf("failed to scan siafund element: %w", err) } - balances[addrID] += value + + balance := balances[addrID] + balance += value + balances[addrID] = balance + log.Debug("reverting spent orphaned siafund element", zap.Stringer("id", outputID), zap.Uint64("value", value)) } return balances, rows.Err() } -func (ut *updateTx) deleteOrphanedSiacoinElements(index types.ChainIndex) (reverted []types.BlockID, _ error) { - rows, err := ut.tx.Query(`DELETE FROM siacoin_elements WHERE height=$1 AND block_id<>$2 RETURNING block_id`, index.Height, encode(index.ID)) +func deleteOrphanedSiafundElements(tx *txn, index types.ChainIndex, log *zap.Logger) (map[int64]uint64, error) { + rows, err := tx.Query(`DELETE FROM siafund_elements WHERE id IN (SELECT se.id FROM siafund_elements se +INNER JOIN chain_indices ci ON (ci.id=se.chain_index_id) +WHERE ci.height=$1 AND ci.block_id<>$2) +RETURNING id, address_id, siafund_value, spent_index_id IS NOT NULL`, index.Height, encode(index.ID)) if err != nil { - return nil, fmt.Errorf("failed to query orphans: %w", err) + return nil, fmt.Errorf("failed to query siafund elements: %w", err) } defer rows.Close() + balances := make(map[int64]uint64) for rows.Next() { - var orphan types.BlockID - if err := rows.Scan(decode(&orphan)); err != nil { - return nil, fmt.Errorf("failed to scan orphan: %w", err) + var outputID types.SiafundOutputID + var addrID int64 + var value uint64 + var spent bool + + if err := rows.Scan(decode(&outputID), &addrID, &value, &spent); err != nil { + return nil, fmt.Errorf("failed to scan siafund element: %w", err) } - reverted = append(reverted, orphan) + balances[addrID] += value + log.Debug("deleting orphaned siafund element", zap.Stringer("id", outputID), zap.Uint64("value", value), zap.Bool("spent", spent)) } - return reverted, rows.Err() + return balances, rows.Err() } -func (ut *updateTx) deleteOrphanedSiafundElements(index types.ChainIndex) (reverted []types.BlockID, _ error) { - rows, err := ut.tx.Query(`DELETE FROM siafund_elements WHERE height=$1 AND block_id<>$2 RETURNING block_id`, index.Height, encode(index.ID)) +func deleteOrphanedEvents(tx *txn, index types.ChainIndex) error { + _, err := tx.Exec(`DELETE FROM events WHERE id IN (SELECT ev.id FROM events ev +INNER JOIN chain_indices ci ON (ev.chain_index_id=ci.id) +WHERE ci.height=$1 AND ci.block_id<>$2);`, index.Height, encode(index.ID)) + return err +} + +// revertOrphans reverts any chain indices that were orphaned by the given index +func revertOrphans(tx *txn, index types.ChainIndex, log *zap.Logger) error { + // fetch orphaned siacoin balances + deletedSiacoins, err := deleteOrphanedSiacoinElements(tx, index, log.Named("deleteOrphanedSiacoinElements")) if err != nil { - return nil, fmt.Errorf("failed to query orphans: %w", err) + return fmt.Errorf("failed to get orphaned siacoin elements: %w", err) } - defer rows.Close() - for rows.Next() { - var orphan types.BlockID - if err := rows.Scan(decode(&orphan)); err != nil { - return nil, fmt.Errorf("failed to scan orphan: %w", err) - } - reverted = append(reverted, orphan) + // fetch orphaned siafund balances + deletedSiafunds, err := deleteOrphanedSiafundElements(tx, index, log.Named("deleteOrphanedSiafundElements")) + if err != nil { + return fmt.Errorf("failed to get orphaned siafund elements: %w", err) } - return reverted, rows.Err() -} - -// RevertOrphans reverts any chain indices that were orphaned by the given index -func (ut *updateTx) RevertOrphans(index types.ChainIndex) (reverted []types.BlockID, err error) { - log := ut.tx.log.Named("RevertOrphans").With(zap.Uint64("height", index.Height), zap.Stringer("applied", index.ID)) - // fetch orphaned siacoin balances - siacoins, err := ut.orphanedSiacoinBalances(index) + unspentSiacoins, err := revertSpentOrphanedSiacoinElements(tx, index, log.Named("revertSpentOrphanedSiacoinElements")) if err != nil { - return nil, fmt.Errorf("failed to get orphaned siacoin elements: %w", err) + return fmt.Errorf("failed to revert spent orphaned siacoin elements: %w", err) } - // fetch orphaned siafund balances - siafunds, err := ut.orphanedSiafundBalances(index) + unspentSiafunds, err := revertSpentOrphanedSiafundElements(tx, index, log.Named("revertSpentOrphanedSiafundElements")) if err != nil { - return nil, fmt.Errorf("failed to get orphaned siafund elements: %w", err) + return fmt.Errorf("failed to revert spent orphaned siafund elements: %w", err) } - // merge the balances - revertedBalance := siacoins - for addr, balance := range siafunds { - b := revertedBalance[addr] - b.Siafunds = balance - revertedBalance[addr] = b + // get the addrIDs of all affected addresses + addrIDs := make(map[int64]bool) + for id := range deletedSiacoins { + addrIDs[id] = true + } + for id := range deletedSiafunds { + addrIDs[id] = true + } + for id := range unspentSiacoins { + addrIDs[id] = true + } + for id := range unspentSiafunds { + addrIDs[id] = true } - getBalanceStmt, err := ut.tx.Prepare(`SELECT siacoin_balance, immature_siacoin_balance, siafund_balance FROM sia_addresses WHERE id=$1`) + getBalanceStmt, err := tx.Prepare(`SELECT siacoin_balance, immature_siacoin_balance, siafund_balance FROM sia_addresses WHERE id=$1`) if err != nil { - return nil, fmt.Errorf("failed to prepare balance statement: %w", err) + return fmt.Errorf("failed to prepare balance statement: %w", err) } defer getBalanceStmt.Close() - updateBalanceStmt, err := ut.tx.Prepare(`UPDATE sia_addresses SET siacoin_balance=$1, immature_siacoin_balance=$2, siafund_balance=$3 WHERE id=$4`) + updateBalanceStmt, err := tx.Prepare(`UPDATE sia_addresses SET siacoin_balance=$1, immature_siacoin_balance=$2, siafund_balance=$3 WHERE id=$4`) if err != nil { - return nil, fmt.Errorf("failed to prepare update statement: %w", err) + return fmt.Errorf("failed to prepare update statement: %w", err) } defer updateBalanceStmt.Close() - for addrID, balance := range revertedBalance { + for addrID := range addrIDs { var existing wallet.Balance err := getBalanceStmt.QueryRow(addrID).Scan(decode(&existing.Siacoins), decode(&existing.ImmatureSiacoins), &existing.Siafunds) if err != nil { - return nil, fmt.Errorf("failed to get balance: %w", err) + return fmt.Errorf("failed to get balance: %w", err) } - existing.Siacoins = existing.Siacoins.Sub(balance.Siacoins) - existing.ImmatureSiacoins = existing.ImmatureSiacoins.Sub(balance.ImmatureSiacoins) - if existing.Siafunds < balance.Siafunds { + existing.Siacoins = existing.Siacoins.Sub(deletedSiacoins[addrID].Siacoins) + existing.ImmatureSiacoins = existing.ImmatureSiacoins.Sub(deletedSiacoins[addrID].ImmatureSiacoins) + if existing.Siafunds < deletedSiafunds[addrID] { panic("siafund balance cannot be negative") } - existing.Siafunds -= balance.Siafunds + existing.Siafunds -= deletedSiafunds[addrID] + + existing.Siacoins = existing.Siacoins.Add(unspentSiacoins[addrID].Siacoins) + existing.Siafunds += unspentSiafunds[addrID] res, err := updateBalanceStmt.Exec(encode(existing.Siacoins), encode(existing.ImmatureSiacoins), existing.Siafunds, addrID) if err != nil { - return nil, fmt.Errorf("failed to update balance: %w", err) + return fmt.Errorf("failed to update balance: %w", err) } else if n, err := res.RowsAffected(); err != nil { - return nil, fmt.Errorf("failed to get rows affected: %w", err) + return fmt.Errorf("failed to get rows affected: %w", err) } else if n != 1 { - return nil, fmt.Errorf("expected 1 row affected, got %v", n) + return fmt.Errorf("expected 1 row affected, got %v", n) } } - // delete events - if _, err := ut.tx.Exec(`DELETE FROM events WHERE height=$1 AND block_id<>$2`, index.Height, encode(index.ID)); err != nil { - return nil, err + if err := deleteOrphanedEvents(tx, index); err != nil { + return fmt.Errorf("failed to delete orphaned events: %w", err) } - // delete orphaned siacoin elements - orphanedSCEs, err := ut.deleteOrphanedSiacoinElements(index) - if err != nil { - return nil, err - } + _, err = tx.Exec(`DELETE FROM chain_indices WHERE height=$1 AND block_id<>$2`, index.Height, encode(index.ID)) + return err +} - // delete orphaned siafund elements - orphanedSFEs, err := ut.deleteOrphanedSiafundElements(index) +func pruneSpentSiacoinElements(tx *txn, height uint64) (removed []types.SiacoinOutputID, err error) { + const query = `DELETE FROM siacoin_elements WHERE spent_index_id IN (SELECT id FROM chain_indices WHERE height <= $1) RETURNING id` + rows, err := tx.Query(query, height) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to query siacoin elements: %w", err) } + defer rows.Close() - // dedupe orphans - seen := make(map[types.BlockID]struct{}) - for _, orphan := range append(orphanedSCEs, orphanedSFEs...) { - if _, ok := seen[orphan]; !ok { - seen[orphan] = struct{}{} - reverted = append(reverted, orphan) - log.Debug("reverted orphan", zap.Stringer("orphan", orphan)) + for rows.Next() { + var id types.SiacoinOutputID + if err := rows.Scan(decode(&id)); err != nil { + return nil, fmt.Errorf("failed to scan siacoin element: %w", err) } + removed = append(removed, id) } - - return reverted, nil + return removed, rows.Err() } -// ProcessChainApplyUpdate implements chain.Subscriber -func (s *Store) UpdateChainState(reverted []chain.RevertUpdate, applied []chain.ApplyUpdate) error { - return s.transaction(func(tx *txn) error { - utx := &updateTx{ - tx: tx, - relevantAddresses: make(map[types.Address]bool), - } +func pruneSpentSiafundElements(tx *txn, height uint64) (removed []types.SiafundOutputID, err error) { + const query = `DELETE FROM siafund_elements WHERE spent_index_id IN (SELECT id FROM chain_indices WHERE height <= $1) RETURNING id` + rows, err := tx.Query(query, height) + if err != nil { + return nil, fmt.Errorf("failed to query siafund elements: %w", err) + } + defer rows.Close() - if err := wallet.UpdateChainState(utx, reverted, applied); err != nil { - return fmt.Errorf("failed to update chain state: %w", err) - } else if err := setLastCommittedIndex(tx, applied[len(applied)-1].State.Index); err != nil { - return fmt.Errorf("failed to set last committed index: %w", err) + for rows.Next() { + var id types.SiafundOutputID + if err := rows.Scan(decode(&id)); err != nil { + return nil, fmt.Errorf("failed to scan siafund element: %w", err) } - return nil - }) -} - -// LastCommittedIndex returns the last chain index that was committed. -func (s *Store) LastCommittedIndex() (index types.ChainIndex, err error) { - err = s.db.QueryRow(`SELECT last_indexed_tip FROM global_settings`).Scan(decode(&index)) - return -} - -// ResetLastIndex resets the last indexed tip to trigger a full rescan. -func (s *Store) ResetLastIndex() error { - _, err := s.db.Exec(`UPDATE global_settings SET last_indexed_tip=$1`, encode(types.ChainIndex{})) - return err + removed = append(removed, id) + } + return removed, rows.Err() } func setLastCommittedIndex(tx *txn, index types.ChainIndex) error { diff --git a/persist/sqlite/consensus_test.go b/persist/sqlite/consensus_test.go new file mode 100644 index 0000000..af40db9 --- /dev/null +++ b/persist/sqlite/consensus_test.go @@ -0,0 +1,309 @@ +package sqlite + +import ( + "path/filepath" + "testing" + + "go.sia.tech/core/consensus" + "go.sia.tech/core/types" + "go.sia.tech/coreutils" + "go.sia.tech/coreutils/chain" + "go.sia.tech/coreutils/testutil" + "go.sia.tech/walletd/wallet" + "go.uber.org/zap/zaptest" +) + +func mineBlock(state consensus.State, txns []types.Transaction, minerAddr types.Address) types.Block { + b := types.Block{ + ParentID: state.Index.ID, + Timestamp: types.CurrentTimestamp(), + Transactions: txns, + MinerPayouts: []types.SiacoinOutput{{Address: minerAddr, Value: state.BlockReward()}}, + } + for b.ID().CmpWork(state.ChildTarget) < 0 { + b.Nonce += state.NonceFactor() + } + return b +} + +func syncDB(tb testing.TB, store *Store, cm *chain.Manager) { + index, err := store.LastCommittedIndex() + if err != nil { + tb.Fatalf("failed to get last committed index: %v", err) + } + for index != cm.Tip() { + crus, caus, err := cm.UpdatesSince(index, 1000) + if err != nil { + tb.Fatalf("failed to subscribe to chain manager: %v", err) + } else if err := store.UpdateChainState(crus, caus); err != nil { + tb.Fatalf("failed to update chain state: %v", err) + } + index = caus[len(caus)-1].State.Index + } +} + +func TestPruneSiacoins(t *testing.T) { + log := zaptest.NewLogger(t) + dir := t.TempDir() + db, err := OpenDatabase(filepath.Join(dir, "walletd.sqlite3"), log.Named("sqlite3")) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + bdb, err := coreutils.OpenBoltChainDB(filepath.Join(dir, "consensus.db")) + if err != nil { + t.Fatal(err) + } + defer bdb.Close() + + // mine a single payout to the wallet + pk := types.GeneratePrivateKey() + addr := types.StandardUnlockHash(pk.PublicKey()) + + network, genesisBlock := testutil.Network() + store, genesisState, err := chain.NewDBStore(bdb, network, genesisBlock) + if err != nil { + t.Fatal(err) + } + + cm := chain.NewManager(store, genesisState) + + // create a wallet + w, err := db.AddWallet(wallet.Wallet{Name: "test"}) + if err != nil { + t.Fatal(err) + } else if err := db.AddWalletAddress(w.ID, wallet.Address{Address: addr}); err != nil { + t.Fatal(err) + } + + // mine a block to the wallet + expectedPayout := cm.TipState().BlockReward() + maturityHeight := cm.TipState().MaturityHeight() + if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, addr)}); err != nil { + t.Fatal(err) + } + syncDB(t, db, cm) + + assertBalance := func(siacoin, immature types.Currency) { + t.Helper() + + b, err := db.WalletBalance(w.ID) + if err != nil { + t.Fatalf("failed to get wallet balance: %v", err) + } else if !b.ImmatureSiacoins.Equals(immature) { + t.Fatalf("expected immature siacoin balance %v, got %v", immature, b.ImmatureSiacoins) + } else if !b.Siacoins.Equals(siacoin) { + t.Fatalf("expected siacoin balance %v, got %v", siacoin, b.Siacoins) + } + } + + assertUTXOs := func(spent int, unspent int) { + t.Helper() + + var n int + err := db.db.QueryRow(`SELECT COUNT(*) FROM siacoin_elements WHERE spent_index_id IS NOT NULL`).Scan(&n) + if err != nil { + t.Fatalf("failed to count spent siacoin elements: %v", err) + } else if n != spent { + t.Fatalf("expected %v spent siacoin elements, got %v", spent, n) + } + + err = db.db.QueryRow(`SELECT COUNT(*) FROM siacoin_elements WHERE spent_index_id IS NULL`).Scan(&n) + if err != nil { + t.Fatalf("failed to count unspent siacoin elements: %v", err) + } else if n != unspent { + t.Fatalf("expected %v unspent siacoin elements, got %v", unspent, n) + } + } + + assertBalance(types.ZeroCurrency, expectedPayout) + assertUTXOs(0, 1) + + // mine until the payout matures + for i := 0; i < int(maturityHeight); i++ { + if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + } + syncDB(t, db, cm) + assertBalance(expectedPayout, types.ZeroCurrency) + assertUTXOs(0, 1) + + // spend the utxo + utxos, err := db.WalletSiacoinOutputs(w.ID, 0, 100) + if err != nil { + t.Fatalf("failed to get wallet siacoin outputs: %v", err) + } + + txn := types.Transaction{ + SiacoinInputs: []types.SiacoinInput{{ + ParentID: types.SiacoinOutputID(utxos[0].ID), + UnlockConditions: types.StandardUnlockConditions(pk.PublicKey()), + }}, + SiacoinOutputs: []types.SiacoinOutput{ + {Value: utxos[0].SiacoinOutput.Value, Address: types.VoidAddress}, + }, + } + + sigHash := cm.TipState().WholeSigHash(txn, utxos[0].ID, 0, 0, nil) + sig := pk.SignHash(sigHash) + txn.Signatures = append(txn.Signatures, types.TransactionSignature{ + ParentID: utxos[0].ID, + CoveredFields: types.CoveredFields{WholeTransaction: true}, + PublicKeyIndex: 0, + Timelock: 0, + Signature: sig[:], + }) + + // mine a block with the transaction + if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), []types.Transaction{txn}, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + syncDB(t, db, cm) + + // the utxo should now have 0 balance and 1 spent element + assertBalance(types.ZeroCurrency, types.ZeroCurrency) + assertUTXOs(1, 0) + + // mine until the element is pruned + for i := 0; i < spentElementRetentionBlocks-1; i++ { + if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + syncDB(t, db, cm) + assertUTXOs(1, 0) // check that the element is not pruned early + } + + // trigger the pruning + if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + syncDB(t, db, cm) + assertUTXOs(0, 0) +} + +func TestPruneSiafunds(t *testing.T) { + log := zaptest.NewLogger(t) + dir := t.TempDir() + db, err := OpenDatabase(filepath.Join(dir, "walletd.sqlite3"), log.Named("sqlite3")) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + bdb, err := coreutils.OpenBoltChainDB(filepath.Join(dir, "consensus.db")) + if err != nil { + t.Fatal(err) + } + defer bdb.Close() + + // mine a single payout to the wallet + pk := types.GeneratePrivateKey() + addr := types.StandardUnlockHash(pk.PublicKey()) + + network, genesisBlock := testutil.Network() + // send the siafund airdrop to the wallet + genesisBlock.Transactions[0].SiafundOutputs[0].Address = addr + store, genesisState, err := chain.NewDBStore(bdb, network, genesisBlock) + if err != nil { + t.Fatal(err) + } + + cm := chain.NewManager(store, genesisState) + + // create a wallet + w, err := db.AddWallet(wallet.Wallet{Name: "test"}) + if err != nil { + t.Fatal(err) + } else if err := db.AddWalletAddress(w.ID, wallet.Address{Address: addr}); err != nil { + t.Fatal(err) + } + + syncDB(t, db, cm) + + assertBalance := func(siafunds uint64) { + t.Helper() + + b, err := db.WalletBalance(w.ID) + if err != nil { + t.Fatalf("failed to get wallet balance: %v", err) + } else if b.Siafunds != siafunds { + t.Fatalf("expected siafund balance %v, got %v", siafunds, b.ImmatureSiacoins) + } + } + + assertUTXOs := func(spent int, unspent int) { + t.Helper() + + var n int + err := db.db.QueryRow(`SELECT COUNT(*) FROM siafund_elements WHERE spent_index_id IS NOT NULL`).Scan(&n) + if err != nil { + t.Fatalf("failed to count spent siacoin elements: %v", err) + } else if n != spent { + t.Fatalf("expected %v spent siacoin elements, got %v", spent, n) + } + + err = db.db.QueryRow(`SELECT COUNT(*) FROM siafund_elements WHERE spent_index_id IS NULL`).Scan(&n) + if err != nil { + t.Fatalf("failed to count unspent siacoin elements: %v", err) + } else if n != unspent { + t.Fatalf("expected %v unspent siacoin elements, got %v", unspent, n) + } + } + + assertBalance(cm.TipState().SiafundCount()) + assertUTXOs(0, 1) + + // spend the utxo + utxos, err := db.WalletSiafundOutputs(w.ID, 0, 100) + if err != nil { + t.Fatalf("failed to get wallet siacoin outputs: %v", err) + } + + txn := types.Transaction{ + SiafundInputs: []types.SiafundInput{{ + ParentID: types.SiafundOutputID(utxos[0].ID), + UnlockConditions: types.StandardUnlockConditions(pk.PublicKey()), + }}, + SiafundOutputs: []types.SiafundOutput{ + {Value: utxos[0].SiafundOutput.Value, Address: types.VoidAddress}, + }, + } + + sigHash := cm.TipState().WholeSigHash(txn, utxos[0].ID, 0, 0, nil) + sig := pk.SignHash(sigHash) + txn.Signatures = append(txn.Signatures, types.TransactionSignature{ + ParentID: utxos[0].ID, + CoveredFields: types.CoveredFields{WholeTransaction: true}, + PublicKeyIndex: 0, + Timelock: 0, + Signature: sig[:], + }) + + // mine a block with the transaction + if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), []types.Transaction{txn}, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + syncDB(t, db, cm) + + // the utxo should now have 0 balance and 1 spent element + assertBalance(0) + assertUTXOs(1, 0) + + // mine until the element is pruned + for i := 0; i < spentElementRetentionBlocks-1; i++ { + if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + syncDB(t, db, cm) // check that the element is not pruned early + assertUTXOs(1, 0) + } + + // the spent element should now be pruned + if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + syncDB(t, db, cm) + assertUTXOs(0, 0) +} diff --git a/persist/sqlite/consts_default.go b/persist/sqlite/consts_default.go index 50b7330..19f84e5 100644 --- a/persist/sqlite/consts_default.go +++ b/persist/sqlite/consts_default.go @@ -9,4 +9,6 @@ const ( maxRetryAttempts = 30 // 30 attempts factor = 1.8 // factor ^ retryAttempts = backoff time in milliseconds maxBackoff = 15 * time.Second + + spentElementRetentionBlocks = 144 // 1 day ) diff --git a/persist/sqlite/consts_testing.go b/persist/sqlite/consts_testing.go index f4911e3..e4ade1e 100644 --- a/persist/sqlite/consts_testing.go +++ b/persist/sqlite/consts_testing.go @@ -9,4 +9,6 @@ const ( maxRetryAttempts = 10 // 10 attempts factor = 2.0 // factor ^ retryAttempts = backoff time in milliseconds maxBackoff = 15 * time.Second + + spentElementRetentionBlocks = 36 ) diff --git a/persist/sqlite/init.sql b/persist/sqlite/init.sql index e316dfa..c4c3537 100644 --- a/persist/sqlite/init.sql +++ b/persist/sqlite/init.sql @@ -1,3 +1,10 @@ +CREATE TABLE chain_indices ( + id INTEGER PRIMARY KEY, + block_id BLOB UNIQUE NOT NULL, + height INTEGER UNIQUE NOT NULL +); +CREATE INDEX chain_indices_height ON chain_indices (block_id, height); + CREATE TABLE sia_addresses ( id INTEGER PRIMARY KEY, sia_address BLOB UNIQUE NOT NULL, @@ -14,12 +21,13 @@ CREATE TABLE siacoin_elements ( maturity_height INTEGER NOT NULL, /* stored as int64 for easier querying */ address_id INTEGER NOT NULL REFERENCES sia_addresses (id), matured BOOLEAN NOT NULL, /* tracks whether the value has been added to the address balance */ - block_id BLOB NOT NULL, - height INTEGER NOT NULL + chain_index_id INTEGER NOT NULL REFERENCES chain_indices (id), + spent_index_id INTEGER REFERENCES chain_indices (id) /* soft delete */ ); CREATE INDEX siacoin_elements_address_id ON siacoin_elements (address_id); CREATE INDEX siacoin_elements_maturity_height_matured ON siacoin_elements (maturity_height, matured); -CREATE INDEX siacoin_elements_block_id_height ON siacoin_elements (block_id, height); +CREATE INDEX siacoin_elements_chain_index_id ON siacoin_elements (chain_index_id); +CREATE INDEX siacoin_elements_spent_index_id ON siacoin_elements (spent_index_id); CREATE TABLE siafund_elements ( id BLOB PRIMARY KEY, @@ -28,23 +36,23 @@ CREATE TABLE siafund_elements ( leaf_index INTEGER NOT NULL, siafund_value INTEGER NOT NULL, address_id INTEGER NOT NULL REFERENCES sia_addresses (id), - block_id BLOB NOT NULL, - height INTEGER NOT NULL + chain_index_id INTEGER NOT NULL REFERENCES chain_indices (id), + spent_index_id INTEGER REFERENCES chain_indices (id) /* soft delete */ ); CREATE INDEX siafund_elements_address_id ON siafund_elements (address_id); -CREATE INDEX siafund_elements_block_id_height ON siafund_elements (block_id, height); +CREATE INDEX siafund_elements_chain_index_id ON siafund_elements (chain_index_id); +CREATE INDEX siafund_elements_spent_index_id ON siafund_elements (spent_index_id); CREATE TABLE events ( id INTEGER PRIMARY KEY, + chain_index_id INTEGER NOT NULL REFERENCES chain_indices (id), event_id BLOB UNIQUE NOT NULL, maturity_height INTEGER NOT NULL, date_created INTEGER NOT NULL, event_type TEXT NOT NULL, - event_data BLOB NOT NULL, - block_id BLOB NOT NULL, - height INTEGER NOT NULL + event_data BLOB NOT NULL ); -CREATE INDEX events_block_id_height ON events (block_id, height); +CREATE INDEX events_chain_index_id ON events (chain_index_id); CREATE TABLE event_addresses ( event_id INTEGER NOT NULL REFERENCES events (id) ON DELETE CASCADE, diff --git a/persist/sqlite/wallet.go b/persist/sqlite/wallet.go index 7b0e8eb..2608687 100644 --- a/persist/sqlite/wallet.go +++ b/persist/sqlite/wallet.go @@ -71,8 +71,9 @@ func scanEvent(s scanner) (ev wallet.Event, eventID int64, err error) { } func getWalletEvents(tx *txn, id wallet.ID, offset, limit int) (events []wallet.Event, eventIDs []int64, err error) { - const query = `SELECT ev.id, ev.event_id, ev.maturity_height, ev.date_created, ev.height, ev.block_id, ev.event_type, ev.event_data + const query = `SELECT ev.id, ev.event_id, ev.maturity_height, ev.date_created, ci.height, ci.block_id, ev.event_type, ev.event_data FROM events ev + INNER JOIN chain_indices ci ON (ev.chain_index_id = ci.id) WHERE ev.id IN (SELECT event_id FROM event_addresses WHERE address_id IN (SELECT address_id FROM wallet_addresses WHERE wallet_id=$1)) ORDER BY ev.maturity_height DESC, ev.id DESC LIMIT $2 OFFSET $3` @@ -299,7 +300,7 @@ func (s *Store) WalletSiacoinOutputs(id wallet.ID, offset, limit int) (siacoins const query = `SELECT se.id, se.siacoin_value, se.merkle_proof, se.leaf_index, se.maturity_height, sa.sia_address FROM siacoin_elements se INNER JOIN sia_addresses sa ON (se.address_id = sa.id) - WHERE se.address_id IN (SELECT address_id FROM wallet_addresses WHERE wallet_id=$1) + WHERE se.spent_index_id IS NULL AND se.address_id IN (SELECT address_id FROM wallet_addresses WHERE wallet_id=$1) LIMIT $2 OFFSET $3` rows, err := tx.Query(query, id, limit, offset) @@ -331,7 +332,7 @@ func (s *Store) WalletSiafundOutputs(id wallet.ID, offset, limit int) (siafunds const query = `SELECT se.id, se.leaf_index, se.merkle_proof, se.siafund_value, se.claim_start, sa.sia_address FROM siafund_elements se INNER JOIN sia_addresses sa ON (se.address_id = sa.id) - WHERE se.address_id IN (SELECT address_id FROM wallet_addresses WHERE wallet_id=$1) + WHERE se.spent_index_id IS NULL AND se.address_id IN (SELECT address_id FROM wallet_addresses WHERE wallet_id=$1) LIMIT $2 OFFSET $3` rows, err := tx.Query(query, id, limit, offset) diff --git a/wallet/manager.go b/wallet/manager.go index a57ca6f..c75aefd 100644 --- a/wallet/manager.go +++ b/wallet/manager.go @@ -195,7 +195,7 @@ func syncStore(ctx context.Context, store Store, cm ChainManager, index types.Ch } // NewManager creates a new wallet manager. -func NewManager(cm ChainManager, store Store, log *zap.Logger) (*Manager, error) { +func NewManager(cm ChainManager, store Store, log *zap.Logger) *Manager { m := &Manager{ chain: cm, store: store, @@ -203,31 +203,24 @@ func NewManager(cm ChainManager, store Store, log *zap.Logger) (*Manager, error) tg: threadgroup.New(), } - lastTip, err := store.LastCommittedIndex() - if err != nil { - return nil, fmt.Errorf("failed to get last committed index: %w", err) - } + reorgChan := make(chan struct{}, 1) + reorgChan <- struct{}{} + unsubscribe := cm.OnReorg(func(index types.ChainIndex) { + select { + case reorgChan <- struct{}{}: + default: + } + }) go func() { + defer unsubscribe() + ctx, cancel, err := m.tg.AddWithContext(context.Background()) if err != nil { log.Panic("failed to add to threadgroup", zap.Error(err)) } defer cancel() - if err := syncStore(ctx, store, cm, lastTip); err != nil { - log.Fatal("failed to subscribe to chain manager", zap.Error(err)) - } - - reorgChan := make(chan types.ChainIndex, 1) - unsubscribe := cm.OnReorg(func(index types.ChainIndex) { - select { - case reorgChan <- index: - default: - } - }) - defer unsubscribe() - for { select { case <-ctx.Done(): @@ -246,5 +239,5 @@ func NewManager(cm ChainManager, store Store, log *zap.Logger) (*Manager, error) m.mu.Unlock() } }() - return m, nil + return m } diff --git a/wallet/update.go b/wallet/update.go index f0f1c12..81990e4 100644 --- a/wallet/update.go +++ b/wallet/update.go @@ -5,6 +5,7 @@ import ( "go.sia.tech/core/types" "go.sia.tech/coreutils/chain" + "go.uber.org/zap" ) type ( @@ -14,6 +15,21 @@ type ( Balance } + AppliedState struct { + Events []Event + CreatedSiacoinElements []types.SiacoinElement + SpentSiacoinElements []types.SiacoinElement + CreatedSiafundElements []types.SiafundElement + SpentSiafundElements []types.SiafundElement + } + + RevertedState struct { + UnspentSiacoinElements []types.SiacoinElement + DeletedSiacoinElements []types.SiacoinElement + UnspentSiafundElements []types.SiafundElement + DeletedSiafundElements []types.SiafundElement + } + // An UpdateTx atomically updates the state of a store. UpdateTx interface { SiacoinStateElements() ([]types.StateElement, error) @@ -22,34 +38,16 @@ type ( SiafundStateElements() ([]types.StateElement, error) UpdateSiafundStateElements([]types.StateElement) error - AddSiacoinElements([]types.SiacoinElement, types.ChainIndex) error - RemoveSiacoinElements([]types.SiacoinElement, types.ChainIndex) error - - AddSiafundElements([]types.SiafundElement, types.ChainIndex) error - RemoveSiafundElements([]types.SiafundElement, types.ChainIndex) error - AddressRelevant(types.Address) (bool, error) - ApplyMatureSiacoinBalance(types.ChainIndex) error - AddEvents([]Event) error - - RevertIndex(index types.ChainIndex) error - RevertMatureSiacoinBalance(types.ChainIndex) error - RevertOrphans(types.ChainIndex) (reverted []types.BlockID, err error) + ApplyIndex(types.ChainIndex, AppliedState) error + RevertIndex(types.ChainIndex, RevertedState) error } ) // applyChainUpdate atomically applies a chain update to a store func applyChainUpdate(tx UpdateTx, cau chain.ApplyUpdate) error { - // revert any orphaned chain indices - if _, err := tx.RevertOrphans(cau.State.Index); err != nil { - return fmt.Errorf("failed to revert orphans: %w", err) - } - - // update the immature balance of each relevant address - if err := tx.ApplyMatureSiacoinBalance(cau.State.Index); err != nil { - return fmt.Errorf("failed to get matured siacoin elements: %w", err) - } + var applied AppliedState // determine which siacoin and siafund elements are ephemeral // @@ -73,7 +71,6 @@ func applyChainUpdate(tx UpdateTx, cau chain.ApplyUpdate) error { } // add new siacoin elements to the store - var newSiacoinElements, spentSiacoinElements []types.SiacoinElement cau.ForEachSiacoinElement(func(se types.SiacoinElement, spent bool) { if ephemeral[se.ID] { return @@ -87,19 +84,12 @@ func applyChainUpdate(tx UpdateTx, cau chain.ApplyUpdate) error { } if spent { - spentSiacoinElements = append(spentSiacoinElements, se) + applied.SpentSiacoinElements = append(applied.SpentSiacoinElements, se) } else { - newSiacoinElements = append(newSiacoinElements, se) + applied.CreatedSiacoinElements = append(applied.CreatedSiacoinElements, se) } }) - if err := tx.AddSiacoinElements(newSiacoinElements, cau.State.Index); err != nil { - return fmt.Errorf("failed to add siacoin elements: %w", err) - } else if err := tx.RemoveSiacoinElements(spentSiacoinElements, cau.State.Index); err != nil { - return fmt.Errorf("failed to remove siacoin elements: %w", err) - } - - var newSiafundElements, spentSiafundElements []types.SiafundElement cau.ForEachSiafundElement(func(se types.SiafundElement, spent bool) { if ephemeral[se.ID] { return @@ -113,18 +103,12 @@ func applyChainUpdate(tx UpdateTx, cau chain.ApplyUpdate) error { } if spent { - spentSiafundElements = append(spentSiafundElements, se) + applied.SpentSiafundElements = append(applied.SpentSiafundElements, se) } else { - newSiafundElements = append(newSiafundElements, se) + applied.CreatedSiafundElements = append(applied.CreatedSiafundElements, se) } }) - if err := tx.AddSiafundElements(newSiafundElements, cau.State.Index); err != nil { - return fmt.Errorf("failed to add siafund elements: %w", err) - } else if err := tx.RemoveSiafundElements(spentSiafundElements, cau.State.Index); err != nil { - return fmt.Errorf("failed to remove siafund elements: %w", err) - } - // add events relevant := func(addr types.Address) bool { relevant, err := tx.AddressRelevant(addr) @@ -133,9 +117,7 @@ func applyChainUpdate(tx UpdateTx, cau chain.ApplyUpdate) error { } return relevant } - if err := tx.AddEvents(AppliedEvents(cau.State, cau.Block, cau, relevant)); err != nil { - return fmt.Errorf("failed to add events: %w", err) - } + applied.Events = AppliedEvents(cau.State, cau.Block, cau, relevant) // fetch all siacoin and siafund state elements siacoinStateElements, err := tx.SiacoinStateElements() @@ -165,11 +147,17 @@ func applyChainUpdate(tx UpdateTx, cau chain.ApplyUpdate) error { if err := tx.UpdateSiafundStateElements(siafundStateElements); err != nil { return fmt.Errorf("failed to update siacoin state elements: %w", err) } + + if err := tx.ApplyIndex(cau.State.Index, applied); err != nil { + return fmt.Errorf("failed to apply chain update %q: %w", cau.State.Index, err) + } return nil } // revertChainUpdate atomically reverts a chain update from a store func revertChainUpdate(tx UpdateTx, cru chain.RevertUpdate, revertedIndex types.ChainIndex) error { + var reverted RevertedState + // determine which siacoin and siafund elements are ephemeral // // note: I thought we could use LeafIndex == EphemeralLeafIndex, but @@ -191,7 +179,6 @@ func revertChainUpdate(tx UpdateTx, cru chain.RevertUpdate, revertedIndex types. } } - var removedSiacoinElements, addedSiacoinElements []types.SiacoinElement cru.ForEachSiacoinElement(func(se types.SiacoinElement, spent bool) { if ephemeral[se.ID] { return @@ -206,20 +193,13 @@ func revertChainUpdate(tx UpdateTx, cru chain.RevertUpdate, revertedIndex types. if spent { // re-add any spent siacoin elements - addedSiacoinElements = append(addedSiacoinElements, se) + reverted.UnspentSiacoinElements = append(reverted.UnspentSiacoinElements, se) } else { // delete any created siacoin elements - removedSiacoinElements = append(removedSiacoinElements, se) + reverted.DeletedSiacoinElements = append(reverted.DeletedSiacoinElements, se) } }) - if err := tx.AddSiacoinElements(addedSiacoinElements, revertedIndex); err != nil { - return fmt.Errorf("failed to add siacoin elements: %w", err) - } else if err := tx.RemoveSiacoinElements(removedSiacoinElements, revertedIndex); err != nil { - return fmt.Errorf("failed to remove siacoin elements: %w", err) - } - - var removedSiafundElements, addedSiafundElements []types.SiafundElement cru.ForEachSiafundElement(func(se types.SiafundElement, spent bool) { if ephemeral[se.ID] { return @@ -234,23 +214,15 @@ func revertChainUpdate(tx UpdateTx, cru chain.RevertUpdate, revertedIndex types. if spent { // re-add any spent siafund elements - addedSiafundElements = append(addedSiafundElements, se) + reverted.UnspentSiafundElements = append(reverted.UnspentSiafundElements, se) } else { // delete any created siafund elements - removedSiafundElements = append(removedSiafundElements, se) + reverted.DeletedSiafundElements = append(reverted.DeletedSiafundElements, se) } }) - // revert siafund element changes - if err := tx.AddSiafundElements(addedSiafundElements, revertedIndex); err != nil { - return fmt.Errorf("failed to add siafund elements: %w", err) - } else if err := tx.RemoveSiafundElements(removedSiafundElements, revertedIndex); err != nil { - return fmt.Errorf("failed to remove siafund elements: %w", err) - } - - // revert mature siacoin balance for each relevant address - if err := tx.RevertMatureSiacoinBalance(revertedIndex); err != nil { - return fmt.Errorf("failed to get matured siacoin elements: %w", err) + if err := tx.RevertIndex(revertedIndex, reverted); err != nil { + return fmt.Errorf("failed to revert index %q: %w", revertedIndex, err) } siacoinElements, err := tx.SiacoinStateElements() @@ -275,12 +247,10 @@ func revertChainUpdate(tx UpdateTx, cru chain.RevertUpdate, revertedIndex types. if err := tx.UpdateSiafundStateElements(siafundElements); err != nil { return fmt.Errorf("failed to update siafund state elements: %w", err) } - - // revert index - return tx.RevertIndex(revertedIndex) + return nil } -func UpdateChainState(tx UpdateTx, reverted []chain.RevertUpdate, applied []chain.ApplyUpdate) error { +func UpdateChainState(tx UpdateTx, reverted []chain.RevertUpdate, applied []chain.ApplyUpdate, log *zap.Logger) error { for _, cru := range reverted { revertedIndex := types.ChainIndex{ ID: cru.Block.ID(), @@ -289,12 +259,15 @@ func UpdateChainState(tx UpdateTx, reverted []chain.RevertUpdate, applied []chai if err := revertChainUpdate(tx, cru, revertedIndex); err != nil { return fmt.Errorf("failed to revert chain update %q: %w", revertedIndex, err) } + log.Debug("reverted chain update", zap.Stringer("blockID", revertedIndex.ID), zap.Uint64("height", revertedIndex.Height)) } for _, cau := range applied { + // apply the chain update if err := applyChainUpdate(tx, cau); err != nil { return fmt.Errorf("failed to apply chain update %q: %w", cau.State.Index, err) } + log.Debug("applied chain update", zap.Stringer("blockID", cau.State.Index.ID), zap.Uint64("height", cau.State.Index.Height)) } return nil } diff --git a/wallet/wallet_test.go b/wallet/wallet_test.go index df505c6..0283abb 100644 --- a/wallet/wallet_test.go +++ b/wallet/wallet_test.go @@ -20,6 +20,7 @@ import ( ) func waitForBlock(tb testing.TB, cm *chain.Manager, ws wallet.Store) { + tb.Helper() for i := 0; i < 1000; i++ { time.Sleep(10 * time.Millisecond) tip, _ := ws.LastCommittedIndex() @@ -119,10 +120,7 @@ func TestReorg(t *testing.T) { } cm := chain.NewManager(store, genesisState) - wm, err := wallet.NewManager(cm, db, log.Named("wallet")) - if err != nil { - t.Fatal(err) - } + wm := wallet.NewManager(cm, db, log.Named("wallet")) defer wm.Close() w, err := wm.AddWallet(wallet.Wallet{Name: "test"}) @@ -181,9 +179,10 @@ func TestReorg(t *testing.T) { // mine to trigger a reorg var blocks []types.Block state := genesisState - for i := 0; i < 5; i++ { - blocks = append(blocks, mineBlock(state, nil, types.VoidAddress)) - state.Index.ID = blocks[len(blocks)-1].ID() + for i := 0; i < 10; i++ { + block := mineBlock(state, nil, types.VoidAddress) + blocks = append(blocks, block) + state.Index.ID = block.ID() state.Index.Height++ } if err := cm.AddBlocks(blocks); err != nil { @@ -321,10 +320,7 @@ func TestEphemeralBalance(t *testing.T) { } cm := chain.NewManager(store, genesisState) - wm, err := wallet.NewManager(cm, db, log.Named("wallet")) - if err != nil { - t.Fatal(err) - } + wm := wallet.NewManager(cm, db, log.Named("wallet")) defer wm.Close() w, err := wm.AddWallet(wallet.Wallet{Name: "test"}) @@ -517,10 +513,7 @@ func TestWalletAddresses(t *testing.T) { } cm := chain.NewManager(store, genesisState) - wm, err := wallet.NewManager(cm, db, log.Named("wallet")) - if err != nil { - t.Fatal(err) - } + wm := wallet.NewManager(cm, db, log.Named("wallet")) defer wm.Close() // Add a wallet @@ -647,10 +640,7 @@ func TestV2(t *testing.T) { } cm := chain.NewManager(store, genesisState) - wm, err := wallet.NewManager(cm, db, log.Named("wallet")) - if err != nil { - t.Fatal(err) - } + wm := wallet.NewManager(cm, db, log.Named("wallet")) defer wm.Close() w, err := wm.AddWallet(wallet.Wallet{Name: "test"}) @@ -742,7 +732,7 @@ func TestV2(t *testing.T) { } } -func TestResubscribe(t *testing.T) { +func TestScan(t *testing.T) { log := zaptest.NewLogger(t) dir := t.TempDir() db, err := sqlite.OpenDatabase(filepath.Join(dir, "walletd.sqlite3"), log.Named("sqlite3")) @@ -772,10 +762,7 @@ func TestResubscribe(t *testing.T) { cm := chain.NewManager(store, genesisState) - wm, err := wallet.NewManager(cm, db, log.Named("wallet")) - if err != nil { - t.Fatal(err) - } + wm := wallet.NewManager(cm, db, log.Named("wallet")) defer wm.Close() pk2 := types.GeneratePrivateKey() @@ -926,10 +913,7 @@ func TestSiafunds(t *testing.T) { cm := chain.NewManager(store, genesisState) - wm, err := wallet.NewManager(cm, db, log.Named("wallet")) - if err != nil { - t.Fatal(err) - } + wm := wallet.NewManager(cm, db, log.Named("wallet")) defer wm.Close() pk2 := types.GeneratePrivateKey() @@ -1056,6 +1040,8 @@ func TestOrphans(t *testing.T) { defer bdb.Close() network, genesisBlock := testV1Network(types.VoidAddress) // don't care about siafunds + network.HardforkV2.AllowHeight = 200 + network.HardforkV2.RequireHeight = 201 store, genesisState, err := chain.NewDBStore(bdb, network, genesisBlock) if err != nil { @@ -1063,10 +1049,7 @@ func TestOrphans(t *testing.T) { } cm := chain.NewManager(store, genesisState) - wm, err := wallet.NewManager(cm, db, log.Named("wallet")) - if err != nil { - t.Fatal(err) - } + wm := wallet.NewManager(cm, db, log.Named("wallet")) defer wm.Close() w, err := wm.AddWallet(wallet.Wallet{Name: "test"}) @@ -1082,21 +1065,28 @@ func TestOrphans(t *testing.T) { if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, addr)}); err != nil { t.Fatal(err) } + + // mine until the maturity height + for i := cm.TipState().Index.Height; i < maturityHeight+1; i++ { + if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + } waitForBlock(t, cm, db) assertBalance := func(siacoin, immature types.Currency) error { b, err := wm.WalletBalance(w.ID) if err != nil { return fmt.Errorf("failed to check balance: %w", err) - } else if !b.Siacoins.Equals(siacoin) { - return fmt.Errorf("expected siacoin balance %v, got %v", siacoin, b.Siacoins) } else if !b.ImmatureSiacoins.Equals(immature) { return fmt.Errorf("expected immature siacoin balance %v, got %v", immature, b.ImmatureSiacoins) + } else if !b.Siacoins.Equals(siacoin) { + return fmt.Errorf("expected siacoin balance %v, got %v", siacoin, b.Siacoins) } return nil } - if err := assertBalance(types.ZeroCurrency, expectedPayout); err != nil { + if err := assertBalance(expectedPayout, types.ZeroCurrency); err != nil { t.Fatal(err) } @@ -1122,8 +1112,54 @@ func TestOrphans(t *testing.T) { t.Fatalf("expected %v, got %v", maturityHeight, utxos[0].MaturityHeight) } + resetState := cm.TipState() + + // send a transaction that will be orphaned + txn := types.Transaction{ + SiacoinInputs: []types.SiacoinInput{ + { + ParentID: types.SiacoinOutputID(utxos[0].ID), + UnlockConditions: types.StandardUnlockConditions(pk.PublicKey()), + }, + }, + SiacoinOutputs: []types.SiacoinOutput{ + {Address: types.VoidAddress, Value: expectedPayout.Div64(2)}, // send the other half to the void + {Address: addr, Value: expectedPayout.Div64(2)}, // send half the payout back to the wallet + }, + Signatures: []types.TransactionSignature{ + { + ParentID: utxos[0].ID, + PublicKeyIndex: 0, + CoveredFields: types.CoveredFields{WholeTransaction: true}, + }, + }, + } + sigHash := cm.TipState().WholeSigHash(txn, utxos[0].ID, 0, 0, nil) + sig := pk.SignHash(sigHash) + txn.Signatures[0].Signature = sig[:] + + // broadcast the transaction + if _, err := cm.AddPoolTransactions([]types.Transaction{txn}); err != nil { + t.Fatal(err) + } else if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), []types.Transaction{txn}, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + waitForBlock(t, cm, db) + + if err := assertBalance(expectedPayout.Div64(2), types.ZeroCurrency); err != nil { + t.Fatal(err) + } + + // check that the transaction event was recorded + events, err = wm.Events(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(events) != 2 { + t.Fatalf("expected 2 events, got %v", len(events)) + } + // simulate an interrupted rescan by closing the wallet manager, resetting the - // last rescan index, and initializing a new wallet manager. + // last scan index, and initializing a new wallet manager. if err := wm.Close(); err != nil { t.Fatal(err) } else if err := db.ResetLastIndex(); err != nil { @@ -1134,7 +1170,7 @@ func TestOrphans(t *testing.T) { // orphaned blocks that will not be cleanly reverted since the rescan was // interrupted. var blocks []types.Block - state := genesisState + state := resetState for i := 0; i < 5; i++ { blocks = append(blocks, mineBlock(state, nil, types.VoidAddress)) state.Index.ID = blocks[len(blocks)-1].ID() @@ -1144,32 +1180,31 @@ func TestOrphans(t *testing.T) { t.Fatal(err) } - wm, err = wallet.NewManager(cm, db, log.Named("wallet")) - if err != nil { - t.Fatal(err) - } + wm = wallet.NewManager(cm, db, log.Named("wallet")) defer wm.Close() waitForBlock(t, cm, db) - // check that the balance was reverted - if err := assertBalance(types.ZeroCurrency, types.ZeroCurrency); err != nil { + // check that the transaction was reverted + if err := assertBalance(expectedPayout, types.ZeroCurrency); err != nil { t.Fatal(err) } - // check that the payout event was reverted + // check that the transaction event was reverted events, err = wm.Events(w.ID, 0, 100) if err != nil { t.Fatal(err) - } else if len(events) != 0 { - t.Fatalf("expected 0 events, got %v", len(events)) + } else if len(events) != 1 { + t.Fatalf("expected 1 event, got %v", len(events)) } // check that the utxo was reverted utxos, err = wm.UnspentSiacoinOutputs(w.ID, 0, 100) if err != nil { t.Fatal(err) - } else if len(utxos) != 0 { - t.Fatalf("expected 0 output, got %v", len(utxos)) + } else if len(utxos) != 1 { + t.Fatalf("expected 1 output, got %v", len(utxos)) + } else if !utxos[0].SiacoinOutput.Value.Equals(expectedPayout) { + t.Fatalf("expected %v, got %v", expectedPayout, utxos[0].SiacoinOutput.Value) } } From 9b0446d650ffe63d4194b142ada69f5350e9b9ab Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Tue, 30 Apr 2024 15:36:24 -0700 Subject: [PATCH 2/5] ci: fix lint errors --- api/api.go | 1 + persist/sqlite/consensus.go | 2 +- wallet/update.go | 6 ++++++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/api/api.go b/api/api.go index 9972477..b059350 100644 --- a/api/api.go +++ b/api/api.go @@ -94,6 +94,7 @@ type SeedSignRequest struct { Keys []uint64 `json:"keys"` } +// RescanResponse contains information about the state of a chain rescan. type RescanResponse struct { StartIndex types.ChainIndex `json:"startIndex"` Index types.ChainIndex `json:"index"` diff --git a/persist/sqlite/consensus.go b/persist/sqlite/consensus.go index 362c1bc..c98bd56 100644 --- a/persist/sqlite/consensus.go +++ b/persist/sqlite/consensus.go @@ -181,7 +181,7 @@ func (ut *updateTx) RevertIndex(index types.ChainIndex, state wallet.RevertedSta return nil } -// ProcessChainApplyUpdate implements chain.Subscriber +// UpdateChainState implements chain.Subscriber func (s *Store) UpdateChainState(reverted []chain.RevertUpdate, applied []chain.ApplyUpdate) error { log := s.log.Named("UpdateChainState").With(zap.Int("reverted", len(reverted)), zap.Int("applied", len(applied))) return s.transaction(func(tx *txn) error { diff --git a/wallet/update.go b/wallet/update.go index 81990e4..8a42c6a 100644 --- a/wallet/update.go +++ b/wallet/update.go @@ -15,6 +15,8 @@ type ( Balance } + // AppliedState contains all state changes made to a store after applying a chain + // update. AppliedState struct { Events []Event CreatedSiacoinElements []types.SiacoinElement @@ -23,6 +25,8 @@ type ( SpentSiafundElements []types.SiafundElement } + // RevertedState contains all state changes made to a store after reverting + // a chain update. RevertedState struct { UnspentSiacoinElements []types.SiacoinElement DeletedSiacoinElements []types.SiacoinElement @@ -250,6 +254,8 @@ func revertChainUpdate(tx UpdateTx, cru chain.RevertUpdate, revertedIndex types. return nil } +// UpdateChainState atomically updates the state of a store with a set of +// updates from the chain manager. func UpdateChainState(tx UpdateTx, reverted []chain.RevertUpdate, applied []chain.ApplyUpdate, log *zap.Logger) error { for _, cru := range reverted { revertedIndex := types.ChainIndex{ From b5444ab0ebe2b22d84d10af7d84ba82ca7049efc Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Wed, 1 May 2024 13:31:49 -0700 Subject: [PATCH 3/5] sqlite: overwrite element proof during rescan --- persist/sqlite/consensus.go | 56 ++++++++++++++++++++++++++----------- 1 file changed, 40 insertions(+), 16 deletions(-) diff --git a/persist/sqlite/consensus.go b/persist/sqlite/consensus.go index c98bd56..547a675 100644 --- a/persist/sqlite/consensus.go +++ b/persist/sqlite/consensus.go @@ -148,7 +148,7 @@ func (ut *updateTx) ApplyIndex(index types.ChainIndex, state wallet.AppliedState if err := spendSiafundElements(tx, state.SpentSiafundElements, indexID); err != nil { return fmt.Errorf("failed to spend siafund elements: %w", err) - } else if err := addSiafundElements(tx, state.CreatedSiafundElements, indexID); err != nil { + } else if err := addSiafundElements(tx, state.CreatedSiafundElements, indexID, log.Named("addSiafundElements")); err != nil { return fmt.Errorf("failed to add siafund elements: %w", err) } @@ -412,8 +412,14 @@ func addSiacoinElements(tx *txn, elements []types.SiacoinElement, indexID int64, } defer addrStmt.Close() + existsStmt, err := tx.Prepare(`SELECT EXISTS(SELECT 1 FROM siacoin_elements WHERE id=$1)`) + if err != nil { + return fmt.Errorf("failed to prepare exists statement: %w", err) + } + defer existsStmt.Close() + // ignore elements already in the database. - insertStmt, err := tx.Prepare(`INSERT INTO siacoin_elements (id, siacoin_value, merkle_proof, leaf_index, maturity_height, address_id, matured, chain_index_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (id) DO NOTHING RETURNING id`) + insertStmt, err := tx.Prepare(`INSERT INTO siacoin_elements (id, siacoin_value, merkle_proof, leaf_index, maturity_height, address_id, matured, chain_index_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (id) DO UPDATE SET leaf_index=EXCLUDED.leaf_index, merkle_proof=EXCLUDED.merkle_proof`) if err != nil { return fmt.Errorf("failed to prepare insert statement: %w", err) } @@ -428,16 +434,22 @@ func addSiacoinElements(tx *txn, elements []types.SiacoinElement, indexID int64, balanceChanges[addrRef.ID] = addrRef.Balance } - var dummyID types.Hash256 - err = insertStmt.QueryRow(encode(se.ID), encode(se.SiacoinOutput.Value), encodeSlice(se.MerkleProof), se.LeafIndex, se.MaturityHeight, addrRef.ID, se.MaturityHeight == 0, indexID).Scan(decode(&dummyID)) - if errors.Is(err, sql.ErrNoRows) { - log.Debug("siacoin element already exists", zap.Stringer("id", se.ID), zap.Stringer("address", se.SiacoinOutput.Address)) - continue // skip if the element already exists - } else if err != nil { + var exists bool + err = existsStmt.QueryRow(encode(se.ID)).Scan(&exists) + if err != nil { + return fmt.Errorf("failed to check if siacoin element exists: %w", err) + } + + _, err = insertStmt.Exec(encode(se.ID), encode(se.SiacoinOutput.Value), encodeSlice(se.MerkleProof), se.LeafIndex, se.MaturityHeight, addrRef.ID, se.MaturityHeight == 0, indexID) + if err != nil { return fmt.Errorf("failed to execute statement: %w", err) } + // skip balance update if the element already exists + if exists { + log.Debug("updated siacoin element", zap.Stringer("id", se.ID), zap.Stringer("address", se.SiacoinOutput.Address), zap.Stringer("value", se.SiacoinOutput.Value)) + continue + } - // update the balance if the element does not exist balance := balanceChanges[addrRef.ID] if se.MaturityHeight == 0 { balance.Siacoins = balance.Siacoins.Add(se.SiacoinOutput.Value) @@ -659,7 +671,7 @@ func spendSiacoinElements(tx *txn, elements []types.SiacoinElement, indexID int6 return nil } -func addSiafundElements(tx *txn, elements []types.SiafundElement, indexID int64) error { +func addSiafundElements(tx *txn, elements []types.SiafundElement, indexID int64, log *zap.Logger) error { if len(elements) == 0 { return nil } @@ -670,7 +682,13 @@ func addSiafundElements(tx *txn, elements []types.SiafundElement, indexID int64) } defer addrStmt.Close() - insertStmt, err := tx.Prepare(`INSERT INTO siafund_elements (id, siafund_value, merkle_proof, leaf_index, claim_start, address_id, chain_index_id) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (id) DO NOTHING RETURNING id`) + existsStmt, err := tx.Prepare(`SELECT EXISTS(SELECT 1 FROM siafund_elements WHERE id=$1)`) + if err != nil { + return fmt.Errorf("failed to prepare exists statement: %w", err) + } + defer existsStmt.Close() + + insertStmt, err := tx.Prepare(`INSERT INTO siafund_elements (id, siafund_value, merkle_proof, leaf_index, claim_start, address_id, chain_index_id) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (id) DO UPDATE SET leaf_index=EXCLUDED.leaf_index, merkle_proof=EXCLUDED.merkle_proof`) if err != nil { return fmt.Errorf("failed to prepare statement: %w", err) } @@ -685,12 +703,18 @@ func addSiafundElements(tx *txn, elements []types.SiafundElement, indexID int64) balanceChanges[addrRef.ID] = addrRef.Balance.Siafunds } - var dummy types.Hash256 - err = insertStmt.QueryRow(encode(se.ID), se.SiafundOutput.Value, encodeSlice(se.MerkleProof), se.LeafIndex, encode(se.ClaimStart), addrRef.ID, indexID).Scan(decode(&dummy)) - if errors.Is(err, sql.ErrNoRows) { - continue // skip if the element already exists - } else if err != nil { + var exists bool + if err := existsStmt.QueryRow(encode(se.ID)).Scan(&exists); err != nil { + return fmt.Errorf("failed to check if siafund element exists: %w", err) + } + + _, err = insertStmt.Exec(encode(se.ID), se.SiafundOutput.Value, encodeSlice(se.MerkleProof), se.LeafIndex, encode(se.ClaimStart), addrRef.ID, indexID) + if err != nil { return fmt.Errorf("failed to execute statement: %w", err) + } else if exists { + // skip balance update if the element already exists + log.Debug("updated siafund element", zap.Stringer("id", se.ID), zap.Stringer("address", se.SiafundOutput.Address), zap.Uint64("value", se.SiafundOutput.Value)) + continue } balanceChanges[addrRef.ID] += se.SiafundOutput.Value } From 40374b2827d1771c56545b89e260138e8057d32a Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Wed, 1 May 2024 13:34:01 -0700 Subject: [PATCH 4/5] sqlite: update all existing state elements --- persist/sqlite/consensus.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/persist/sqlite/consensus.go b/persist/sqlite/consensus.go index 547a675..c529dd7 100644 --- a/persist/sqlite/consensus.go +++ b/persist/sqlite/consensus.go @@ -24,7 +24,7 @@ type addressRef struct { } func (ut *updateTx) SiacoinStateElements() ([]types.StateElement, error) { - const query = `SELECT id, leaf_index, merkle_proof FROM siacoin_elements WHERE spent_index_id IS NULL` + const query = `SELECT id, leaf_index, merkle_proof FROM siacoin_elements` rows, err := ut.tx.Query(query) if err != nil { return nil, fmt.Errorf("failed to query siacoin elements: %w", err) @@ -65,7 +65,7 @@ func (ut *updateTx) UpdateSiacoinStateElements(elements []types.StateElement) er } func (ut *updateTx) SiafundStateElements() ([]types.StateElement, error) { - const query = `SELECT id, leaf_index, merkle_proof FROM siafund_elements WHERE spent_index_id IS NULL` + const query = `SELECT id, leaf_index, merkle_proof FROM siafund_elements` rows, err := ut.tx.Query(query) if err != nil { return nil, fmt.Errorf("failed to query siacoin elements: %w", err) From ece0273fcf3034345b14fb20b83e2cd2bd42f2ae Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Wed, 1 May 2024 13:53:48 -0700 Subject: [PATCH 5/5] wallet: additional v2 tests --- wallet/wallet_test.go | 857 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 739 insertions(+), 118 deletions(-) diff --git a/wallet/wallet_test.go b/wallet/wallet_test.go index 0283abb..ceee944 100644 --- a/wallet/wallet_test.go +++ b/wallet/wallet_test.go @@ -614,124 +614,6 @@ func TestWalletAddresses(t *testing.T) { } } -func TestV2(t *testing.T) { - pk := types.GeneratePrivateKey() - addr := types.StandardUnlockHash(pk.PublicKey()) - - log := zaptest.NewLogger(t) - dir := t.TempDir() - db, err := sqlite.OpenDatabase(filepath.Join(dir, "walletd.sqlite3"), log.Named("sqlite3")) - if err != nil { - t.Fatal(err) - } - defer db.Close() - - bdb, err := coreutils.OpenBoltChainDB(filepath.Join(dir, "consensus.db")) - if err != nil { - t.Fatal(err) - } - defer bdb.Close() - - network, genesisBlock := testV2Network(types.VoidAddress) // don't care about siafunds - - store, genesisState, err := chain.NewDBStore(bdb, network, genesisBlock) - if err != nil { - t.Fatal(err) - } - - cm := chain.NewManager(store, genesisState) - wm := wallet.NewManager(cm, db, log.Named("wallet")) - defer wm.Close() - - w, err := wm.AddWallet(wallet.Wallet{Name: "test"}) - if err != nil { - t.Fatal(err) - } else if err := wm.AddAddress(w.ID, wallet.Address{Address: addr}); err != nil { - t.Fatal(err) - } - - expectedPayout := cm.TipState().BlockReward() - // mine a block sending the payout to the wallet - if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, addr)}); err != nil { - t.Fatal(err) - } - waitForBlock(t, cm, db) - - // check that the payout was received - balance, err := db.AddressBalance(addr) - if err != nil { - t.Fatal(err) - } else if !balance.ImmatureSiacoins.Equals(expectedPayout) { - t.Fatalf("expected %v, got %v", expectedPayout, balance.ImmatureSiacoins) - } - - // check that a payout event was recorded - events, err := wm.Events(w.ID, 0, 100) - if err != nil { - t.Fatal(err) - } else if len(events) != 1 { - t.Fatalf("expected 1 event, got %v", len(events)) - } else if events[0].Data.EventType() != wallet.EventTypeMinerPayout { - t.Fatalf("expected payout event, got %v", events[0].Data.EventType()) - } - - // mine until the payout matures - maturityHeight := cm.TipState().MaturityHeight() + 1 - for i := cm.TipState().Index.Height; i < maturityHeight; i++ { - if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, types.VoidAddress)}); err != nil { - t.Fatal(err) - } - } - waitForBlock(t, cm, db) - - // create a v2 transaction that spends the matured payout - utxos, err := wm.UnspentSiacoinOutputs(w.ID, 0, 100) - if err != nil { - t.Fatal(err) - } - - sce := utxos[0] - policy := types.PolicyTypeUnlockConditions(types.StandardUnlockConditions(pk.PublicKey())) - txn := types.V2Transaction{ - SiacoinInputs: []types.V2SiacoinInput{{ - Parent: sce, - SatisfiedPolicy: types.SatisfiedPolicy{ - Policy: types.SpendPolicy{Type: policy}, - }, - }}, - SiacoinOutputs: []types.SiacoinOutput{ - {Address: types.VoidAddress, Value: sce.SiacoinOutput.Value.Sub(types.Siacoins(100))}, - {Address: addr, Value: types.Siacoins(100)}, - }, - } - txn.SiacoinInputs[0].SatisfiedPolicy.Signatures = []types.Signature{pk.SignHash(cm.TipState().InputSigHash(txn))} - - if err := cm.AddBlocks([]types.Block{mineV2Block(cm.TipState(), []types.V2Transaction{txn}, types.VoidAddress)}); err != nil { - t.Fatal(err) - } - waitForBlock(t, cm, db) - - // check that the change was received - balance, err = wm.AddressBalance(addr) - if err != nil { - t.Fatal(err) - } else if !balance.Siacoins.Equals(types.Siacoins(100)) { - t.Fatalf("expected %v, got %v", expectedPayout, balance.ImmatureSiacoins) - } - - // check that a transaction event was recorded - events, err = wm.Events(w.ID, 0, 100) - if err != nil { - t.Fatal(err) - } else if len(events) != 2 { - t.Fatalf("expected 2 events, got %v", len(events)) - } else if events[0].Data.EventType() != wallet.EventTypeTransaction { - t.Fatalf("expected transaction event, got %v", events[0].Data.EventType()) - } else if events[0].Relevant[0] != addr { - t.Fatalf("expected address %v, got %v", addr, events[0].Relevant[0]) - } -} - func TestScan(t *testing.T) { log := zaptest.NewLogger(t) dir := t.TempDir() @@ -1208,3 +1090,742 @@ func TestOrphans(t *testing.T) { t.Fatalf("expected %v, got %v", expectedPayout, utxos[0].SiacoinOutput.Value) } } + +func TestV2(t *testing.T) { + pk := types.GeneratePrivateKey() + addr := types.StandardUnlockHash(pk.PublicKey()) + + log := zaptest.NewLogger(t) + dir := t.TempDir() + db, err := sqlite.OpenDatabase(filepath.Join(dir, "walletd.sqlite3"), log.Named("sqlite3")) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + bdb, err := coreutils.OpenBoltChainDB(filepath.Join(dir, "consensus.db")) + if err != nil { + t.Fatal(err) + } + defer bdb.Close() + + network, genesisBlock := testV2Network(types.VoidAddress) // don't care about siafunds + + store, genesisState, err := chain.NewDBStore(bdb, network, genesisBlock) + if err != nil { + t.Fatal(err) + } + + cm := chain.NewManager(store, genesisState) + wm := wallet.NewManager(cm, db, log.Named("wallet")) + defer wm.Close() + + w, err := wm.AddWallet(wallet.Wallet{Name: "test"}) + if err != nil { + t.Fatal(err) + } else if err := wm.AddAddress(w.ID, wallet.Address{Address: addr}); err != nil { + t.Fatal(err) + } + + expectedPayout := cm.TipState().BlockReward() + // mine a block sending the payout to the wallet + if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, addr)}); err != nil { + t.Fatal(err) + } + waitForBlock(t, cm, db) + + // check that the payout was received + balance, err := db.AddressBalance(addr) + if err != nil { + t.Fatal(err) + } else if !balance.ImmatureSiacoins.Equals(expectedPayout) { + t.Fatalf("expected %v, got %v", expectedPayout, balance.ImmatureSiacoins) + } + + // check that a payout event was recorded + events, err := wm.Events(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(events) != 1 { + t.Fatalf("expected 1 event, got %v", len(events)) + } else if events[0].Data.EventType() != wallet.EventTypeMinerPayout { + t.Fatalf("expected payout event, got %v", events[0].Data.EventType()) + } + + // mine until the payout matures + maturityHeight := cm.TipState().MaturityHeight() + 1 + for i := cm.TipState().Index.Height; i < maturityHeight; i++ { + if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + } + waitForBlock(t, cm, db) + + // create a v2 transaction that spends the matured payout + utxos, err := wm.UnspentSiacoinOutputs(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } + + sce := utxos[0] + policy := types.PolicyTypeUnlockConditions(types.StandardUnlockConditions(pk.PublicKey())) + txn := types.V2Transaction{ + SiacoinInputs: []types.V2SiacoinInput{{ + Parent: sce, + SatisfiedPolicy: types.SatisfiedPolicy{ + Policy: types.SpendPolicy{Type: policy}, + }, + }}, + SiacoinOutputs: []types.SiacoinOutput{ + {Address: types.VoidAddress, Value: sce.SiacoinOutput.Value.Sub(types.Siacoins(100))}, + {Address: addr, Value: types.Siacoins(100)}, + }, + } + txn.SiacoinInputs[0].SatisfiedPolicy.Signatures = []types.Signature{pk.SignHash(cm.TipState().InputSigHash(txn))} + + if err := cm.AddBlocks([]types.Block{mineV2Block(cm.TipState(), []types.V2Transaction{txn}, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + waitForBlock(t, cm, db) + + // check that the change was received + balance, err = wm.AddressBalance(addr) + if err != nil { + t.Fatal(err) + } else if !balance.Siacoins.Equals(types.Siacoins(100)) { + t.Fatalf("expected %v, got %v", expectedPayout, balance.ImmatureSiacoins) + } + + // check that a transaction event was recorded + events, err = wm.Events(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(events) != 2 { + t.Fatalf("expected 2 events, got %v", len(events)) + } else if events[0].Data.EventType() != wallet.EventTypeTransaction { + t.Fatalf("expected transaction event, got %v", events[0].Data.EventType()) + } else if events[0].Relevant[0] != addr { + t.Fatalf("expected address %v, got %v", addr, events[0].Relevant[0]) + } +} + +func TestScanV2(t *testing.T) { + log := zaptest.NewLogger(t) + dir := t.TempDir() + db, err := sqlite.OpenDatabase(filepath.Join(dir, "walletd.sqlite3"), log.Named("sqlite3")) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + bdb, err := coreutils.OpenBoltChainDB(filepath.Join(dir, "consensus.db")) + if err != nil { + t.Fatal(err) + } + defer bdb.Close() + + // mine a single payout to the wallet + pk := types.GeneratePrivateKey() + addr := types.StandardUnlockHash(pk.PublicKey()) + + network, genesisBlock := testV2Network(addr) + store, genesisState, err := chain.NewDBStore(bdb, network, genesisBlock) + if err != nil { + t.Fatal(err) + } + + cm := chain.NewManager(store, genesisState) + + wm := wallet.NewManager(cm, db, log.Named("wallet")) + defer wm.Close() + + pk2 := types.GeneratePrivateKey() + addr2 := types.StandardUnlockHash(pk2.PublicKey()) + + // create a wallet with no addresses + w, err := wm.AddWallet(wallet.Wallet{Name: "test"}) + if err != nil { + t.Fatal(err) + } + + // add the address to the wallet + if err := wm.AddAddress(w.ID, wallet.Address{Address: addr}); err != nil { + t.Fatal(err) + } + // rescan to get the genesis Siafund state + if err := wm.Scan(context.Background(), types.ChainIndex{}); err != nil { + t.Fatal(err) + } + + checkBalance := func(siacoin, immature types.Currency) error { + waitForBlock(t, cm, db) + + // note: the siafund balance is currently hardcoded to the number of + // siafunds in genesis. If we ever modify this test to also spend + // siafunds, this will need to be updated. + b, err := wm.WalletBalance(w.ID) + if err != nil { + return fmt.Errorf("failed to check balance: %w", err) + } else if !b.Siacoins.Equals(siacoin) { + return fmt.Errorf("expected siacoin balance %v, got %v", siacoin, b.Siacoins) + } else if !b.ImmatureSiacoins.Equals(immature) { + return fmt.Errorf("expected immature siacoin balance %v, got %v", immature, b.ImmatureSiacoins) + } else if b.Siafunds != network.GenesisState().SiafundCount() { + return fmt.Errorf("expected siafund balance %v, got %v", network.GenesisState().SiafundCount(), b.Siafunds) + } + return nil + } + + // check that the wallet has no balance + if err := checkBalance(types.ZeroCurrency, types.ZeroCurrency); err != nil { + t.Fatal(err) + } + + expectedBalance1 := cm.TipState().BlockReward() + // mine a block to fund the first address + if err := cm.AddBlocks([]types.Block{testutil.MineBlock(cm, addr)}); err != nil { + t.Fatal(err) + } + + // mine a block to fund the second address + expectedBalance2 := cm.TipState().BlockReward() + if err := cm.AddBlocks([]types.Block{testutil.MineBlock(cm, addr2)}); err != nil { + t.Fatal(err) + } + + // check that the wallet has one immature payout + if err := checkBalance(types.ZeroCurrency, expectedBalance1); err != nil { + t.Fatal(err) + } + + // mine until the first payout matures + for i := cm.Tip().Height; i < genesisState.MaturityHeight(); i++ { + if err := cm.AddBlocks([]types.Block{testutil.MineBlock(cm, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + } + + // check that the wallet balance has matured + if err := checkBalance(expectedBalance1, types.ZeroCurrency); err != nil { + t.Fatal(err) + } + + // scan for changes + if err := wm.Scan(context.Background(), types.ChainIndex{}); err != nil { + t.Fatal(err) + } + + // check that the wallet balance did not change + if err := checkBalance(expectedBalance1, types.ZeroCurrency); err != nil { + t.Fatal(err) + } + + // add the second address to the wallet + if err := wm.AddAddress(w.ID, wallet.Address{Address: addr2}); err != nil { + t.Fatal(err) + } else if err := checkBalance(expectedBalance1, types.ZeroCurrency); err != nil { + t.Fatal(err) + } + + // scan for changes + if err := wm.Scan(context.Background(), types.ChainIndex{}); err != nil { + t.Fatal(err) + } + + if err := checkBalance(expectedBalance1, expectedBalance2); err != nil { + t.Fatal(err) + } + + // mine a block to mature the second payout + if err := cm.AddBlocks([]types.Block{testutil.MineBlock(cm, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + + // check that the wallet balance has matured + if err := checkBalance(expectedBalance1.Add(expectedBalance2), types.ZeroCurrency); err != nil { + t.Fatal(err) + } + + // sanity check + if err := wm.Scan(context.Background(), types.ChainIndex{}); err != nil { + t.Fatal(err) + } + + // check that the wallet balance has matured + if err := checkBalance(expectedBalance1.Add(expectedBalance2), types.ZeroCurrency); err != nil { + t.Fatal(err) + } + + utxos, err := wm.AddressSiacoinOutputs(addr, 0, 100) + if err != nil { + t.Fatal(err) + } + + // spend the payout + sce := utxos[0] + policy := types.PolicyTypeUnlockConditions(types.StandardUnlockConditions(pk.PublicKey())) + txn := types.V2Transaction{ + SiacoinInputs: []types.V2SiacoinInput{{ + Parent: sce, + SatisfiedPolicy: types.SatisfiedPolicy{ + Policy: types.SpendPolicy{Type: policy}, + }, + }}, + SiacoinOutputs: []types.SiacoinOutput{ + {Address: types.VoidAddress, Value: sce.SiacoinOutput.Value}, + }, + } + txn.SiacoinInputs[0].SatisfiedPolicy.Signatures = []types.Signature{pk.SignHash(cm.TipState().InputSigHash(txn))} + + if err := cm.AddBlocks([]types.Block{mineV2Block(cm.TipState(), []types.V2Transaction{txn}, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + waitForBlock(t, cm, db) + + // check that the first address has a balance of zero + if err := checkBalance(expectedBalance2, types.ZeroCurrency); err != nil { + t.Fatal(err) + } +} + +func TestReorgV2(t *testing.T) { + pk := types.GeneratePrivateKey() + addr := types.StandardUnlockHash(pk.PublicKey()) + + log := zaptest.NewLogger(t) + dir := t.TempDir() + db, err := sqlite.OpenDatabase(filepath.Join(dir, "walletd.sqlite3"), log.Named("sqlite3")) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + bdb, err := coreutils.OpenBoltChainDB(filepath.Join(dir, "consensus.db")) + if err != nil { + t.Fatal(err) + } + defer bdb.Close() + + network, genesisBlock := testV2Network(types.VoidAddress) // don't care about siafunds + + store, genesisState, err := chain.NewDBStore(bdb, network, genesisBlock) + if err != nil { + t.Fatal(err) + } + cm := chain.NewManager(store, genesisState) + + wm := wallet.NewManager(cm, db, log.Named("wallet")) + defer wm.Close() + + w, err := wm.AddWallet(wallet.Wallet{Name: "test"}) + if err != nil { + t.Fatal(err) + } else if err := wm.AddAddress(w.ID, wallet.Address{Address: addr}); err != nil { + t.Fatal(err) + } + + expectedPayout := cm.TipState().BlockReward() + maturityHeight := cm.TipState().MaturityHeight() + // mine a block sending the payout to the wallet + if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, addr)}); err != nil { + t.Fatal(err) + } + waitForBlock(t, cm, db) + + assertBalance := func(siacoin, immature types.Currency) error { + b, err := wm.WalletBalance(w.ID) + if err != nil { + return fmt.Errorf("failed to check balance: %w", err) + } else if !b.Siacoins.Equals(siacoin) { + return fmt.Errorf("expected siacoin balance %v, got %v", siacoin, b.Siacoins) + } else if !b.ImmatureSiacoins.Equals(immature) { + return fmt.Errorf("expected immature siacoin balance %v, got %v", immature, b.ImmatureSiacoins) + } + return nil + } + + if err := assertBalance(types.ZeroCurrency, expectedPayout); err != nil { + t.Fatal(err) + } + + // check that a payout event was recorded + events, err := wm.Events(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(events) != 1 { + t.Fatalf("expected 1 event, got %v", len(events)) + } else if events[0].Data.EventType() != wallet.EventTypeMinerPayout { + t.Fatalf("expected payout event, got %v", events[0].Data.EventType()) + } + + // check that the utxo was created + utxos, err := wm.UnspentSiacoinOutputs(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(utxos) != 1 { + t.Fatalf("expected 1 output, got %v", len(utxos)) + } else if utxos[0].SiacoinOutput.Value.Cmp(expectedPayout) != 0 { + t.Fatalf("expected %v, got %v", expectedPayout, utxos[0].SiacoinOutput.Value) + } else if utxos[0].MaturityHeight != maturityHeight { + t.Fatalf("expected %v, got %v", maturityHeight, utxos[0].MaturityHeight) + } + + // mine to trigger a reorg + var blocks []types.Block + state := genesisState + for i := 0; i < 10; i++ { + block := mineBlock(state, nil, types.VoidAddress) + blocks = append(blocks, block) + state.Index.ID = block.ID() + state.Index.Height++ + } + if err := cm.AddBlocks(blocks); err != nil { + t.Fatal(err) + } + waitForBlock(t, cm, db) + + // check that the balance was reverted + if err := assertBalance(types.ZeroCurrency, types.ZeroCurrency); err != nil { + t.Fatal(err) + } + + // check that the payout event was reverted + events, err = wm.Events(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(events) != 0 { + t.Fatalf("expected 0 events, got %v", len(events)) + } + + // check that the utxo was removed + utxos, err = wm.UnspentSiacoinOutputs(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(utxos) != 0 { + t.Fatalf("expected 0 outputs, got %v", len(utxos)) + } + + // mine a new payout + expectedPayout = cm.TipState().BlockReward() + maturityHeight = cm.TipState().MaturityHeight() + if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, addr)}); err != nil { + t.Fatal(err) + } + waitForBlock(t, cm, db) + + // check that the payout was received + if err := assertBalance(types.ZeroCurrency, expectedPayout); err != nil { + t.Fatal(err) + } + + // check that a payout event was recorded + events, err = wm.Events(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(events) != 1 { + t.Fatalf("expected 1 event, got %v", len(events)) + } else if events[0].Data.EventType() != wallet.EventTypeMinerPayout { + t.Fatalf("expected payout event, got %v", events[0].Data.EventType()) + } + + // check that the utxo was created + utxos, err = wm.UnspentSiacoinOutputs(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(utxos) != 1 { + t.Fatalf("expected 1 output, got %v", len(utxos)) + } else if utxos[0].SiacoinOutput.Value.Cmp(expectedPayout) != 0 { + t.Fatalf("expected %v, got %v", expectedPayout, utxos[0].SiacoinOutput.Value) + } else if utxos[0].MaturityHeight != maturityHeight { + t.Fatalf("expected %v, got %v", maturityHeight, utxos[0].MaturityHeight) + } + + // mine until the payout matures + var prevState consensus.State + for i := cm.TipState().Index.Height; i < maturityHeight+1; i++ { + if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + if i == maturityHeight-5 { + prevState = cm.TipState() + } + } + waitForBlock(t, cm, db) + + // check that the balance was updated + if err := assertBalance(expectedPayout, types.ZeroCurrency); err != nil { + t.Fatal(err) + } + + // reorg the last few blocks to re-mature the payout + blocks = nil + state = prevState + for i := 0; i < 10; i++ { + blocks = append(blocks, mineBlock(state, nil, types.VoidAddress)) + state.Index.ID = blocks[len(blocks)-1].ID() + state.Index.Height++ + } + if err := cm.AddBlocks(blocks); err != nil { + t.Fatal(err) + } + waitForBlock(t, cm, db) + + // check that the balance is correct + if err := assertBalance(expectedPayout, types.ZeroCurrency); err != nil { + t.Fatal(err) + } + + // check that only the single utxo still exists + utxos, err = wm.UnspentSiacoinOutputs(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(utxos) != 1 { + t.Fatalf("expected 1 output, got %v", len(utxos)) + } else if utxos[0].SiacoinOutput.Value.Cmp(expectedPayout) != 0 { + t.Fatalf("expected %v, got %v", expectedPayout, utxos[0].SiacoinOutput.Value) + } else if utxos[0].MaturityHeight != maturityHeight { + t.Fatalf("expected %v, got %v", maturityHeight, utxos[0].MaturityHeight) + } + + // spend the payout + sce := utxos[0] + policy := types.PolicyTypeUnlockConditions(types.StandardUnlockConditions(pk.PublicKey())) + txn := types.V2Transaction{ + SiacoinInputs: []types.V2SiacoinInput{{ + Parent: sce, + SatisfiedPolicy: types.SatisfiedPolicy{ + Policy: types.SpendPolicy{Type: policy}, + }, + }}, + SiacoinOutputs: []types.SiacoinOutput{ + {Address: types.VoidAddress, Value: sce.SiacoinOutput.Value}, + }, + } + txn.SiacoinInputs[0].SatisfiedPolicy.Signatures = []types.Signature{pk.SignHash(cm.TipState().InputSigHash(txn))} + + if err := cm.AddBlocks([]types.Block{mineV2Block(cm.TipState(), []types.V2Transaction{txn}, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + waitForBlock(t, cm, db) + + // check that the balance is correct + if err := assertBalance(types.ZeroCurrency, types.ZeroCurrency); err != nil { + t.Fatal(err) + } + + // check that all UTXOs have been spent + utxos, err = wm.UnspentSiacoinOutputs(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(utxos) != 0 { + t.Fatalf("expected 0 output, got %v", len(utxos)) + } +} + +func TestOrphansV2(t *testing.T) { + pk := types.GeneratePrivateKey() + addr := types.StandardUnlockHash(pk.PublicKey()) + + log := zaptest.NewLogger(t) + dir := t.TempDir() + db, err := sqlite.OpenDatabase(filepath.Join(dir, "walletd.sqlite3"), log.Named("sqlite3")) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + bdb, err := coreutils.OpenBoltChainDB(filepath.Join(dir, "consensus.db")) + if err != nil { + t.Fatal(err) + } + defer bdb.Close() + + network, genesisBlock := testV2Network(types.VoidAddress) // don't care about siafunds + store, genesisState, err := chain.NewDBStore(bdb, network, genesisBlock) + if err != nil { + t.Fatal(err) + } + cm := chain.NewManager(store, genesisState) + + wm := wallet.NewManager(cm, db, log.Named("wallet")) + defer wm.Close() + + w, err := wm.AddWallet(wallet.Wallet{Name: "test"}) + if err != nil { + t.Fatal(err) + } else if err := wm.AddAddress(w.ID, wallet.Address{Address: addr}); err != nil { + t.Fatal(err) + } + + expectedPayout := cm.TipState().BlockReward() + maturityHeight := cm.TipState().MaturityHeight() + // mine a block sending the payout to the wallet + if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, addr)}); err != nil { + t.Fatal(err) + } + + // mine until the maturity height + for i := cm.TipState().Index.Height; i < maturityHeight+1; i++ { + if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + } + waitForBlock(t, cm, db) + + assertBalance := func(siacoin, immature types.Currency) error { + b, err := wm.WalletBalance(w.ID) + if err != nil { + return fmt.Errorf("failed to check balance: %w", err) + } else if !b.ImmatureSiacoins.Equals(immature) { + return fmt.Errorf("expected immature siacoin balance %v, got %v", immature, b.ImmatureSiacoins) + } else if !b.Siacoins.Equals(siacoin) { + return fmt.Errorf("expected siacoin balance %v, got %v", siacoin, b.Siacoins) + } + return nil + } + + if err := assertBalance(expectedPayout, types.ZeroCurrency); err != nil { + t.Fatal(err) + } + + // check that a payout event was recorded + events, err := wm.Events(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(events) != 1 { + t.Fatalf("expected 1 event, got %v", len(events)) + } else if events[0].Data.EventType() != wallet.EventTypeMinerPayout { + t.Fatalf("expected payout event, got %v", events[0].Data.EventType()) + } + + // check that the utxo was created + utxos, err := wm.UnspentSiacoinOutputs(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(utxos) != 1 { + t.Fatalf("expected 1 output, got %v", len(utxos)) + } else if utxos[0].SiacoinOutput.Value.Cmp(expectedPayout) != 0 { + t.Fatalf("expected %v, got %v", expectedPayout, utxos[0].SiacoinOutput.Value) + } else if utxos[0].MaturityHeight != maturityHeight { + t.Fatalf("expected %v, got %v", maturityHeight, utxos[0].MaturityHeight) + } + + resetState := cm.TipState() + + // send a transaction that will be orphaned + sce := utxos[0] + policy := types.PolicyTypeUnlockConditions(types.StandardUnlockConditions(pk.PublicKey())) + txn := types.V2Transaction{ + SiacoinInputs: []types.V2SiacoinInput{{ + Parent: sce, + SatisfiedPolicy: types.SatisfiedPolicy{ + Policy: types.SpendPolicy{Type: policy}, + }, + }}, + SiacoinOutputs: []types.SiacoinOutput{ + {Address: types.VoidAddress, Value: expectedPayout.Div64(2)}, // send the other half to the void + {Address: addr, Value: expectedPayout.Div64(2)}, // send half the payout back to the wallet + }, + } + txn.SiacoinInputs[0].SatisfiedPolicy.Signatures = []types.Signature{pk.SignHash(cm.TipState().InputSigHash(txn))} + + // broadcast the transaction + if err := cm.AddBlocks([]types.Block{mineV2Block(cm.TipState(), []types.V2Transaction{txn}, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + waitForBlock(t, cm, db) + + if err := assertBalance(expectedPayout.Div64(2), types.ZeroCurrency); err != nil { + t.Fatal(err) + } + + // check that the transaction event was recorded + events, err = wm.Events(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(events) != 2 { + t.Fatalf("expected 2 events, got %v", len(events)) + } + + // simulate an interrupted rescan by closing the wallet manager, resetting the + // last scan index, and initializing a new wallet manager. + if err := wm.Close(); err != nil { + t.Fatal(err) + } else if err := db.ResetLastIndex(); err != nil { + t.Fatal(err) + } + + // mine to trigger a reorg. The underlying store must properly revert the + // orphaned blocks that will not be cleanly reverted since the rescan was + // interrupted. + var blocks []types.Block + state := resetState + for i := 0; i < 5; i++ { + blocks = append(blocks, mineBlock(state, nil, types.VoidAddress)) + state.Index.ID = blocks[len(blocks)-1].ID() + state.Index.Height++ + } + if err := cm.AddBlocks(blocks); err != nil { + t.Fatal(err) + } + + wm = wallet.NewManager(cm, db, log.Named("wallet")) + defer wm.Close() + + waitForBlock(t, cm, db) + + // check that the transaction was reverted + if err := assertBalance(expectedPayout, types.ZeroCurrency); err != nil { + t.Fatal(err) + } + + // check that the transaction event was reverted + events, err = wm.Events(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(events) != 1 { + t.Fatalf("expected 1 event, got %v", len(events)) + } + + // check that the utxo was reverted + utxos, err = wm.UnspentSiacoinOutputs(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(utxos) != 1 { + t.Fatalf("expected 1 output, got %v", len(utxos)) + } else if !utxos[0].SiacoinOutput.Value.Equals(expectedPayout) { + t.Fatalf("expected %v, got %v", expectedPayout, utxos[0].SiacoinOutput.Value) + } + + // spend the payout + txn = types.V2Transaction{ + SiacoinInputs: []types.V2SiacoinInput{{ + Parent: sce, + SatisfiedPolicy: types.SatisfiedPolicy{ + Policy: types.SpendPolicy{Type: policy}, + }, + }}, + SiacoinOutputs: []types.SiacoinOutput{ + {Address: types.VoidAddress, Value: sce.SiacoinOutput.Value}, + }, + } + txn.SiacoinInputs[0].SatisfiedPolicy.Signatures = []types.Signature{pk.SignHash(cm.TipState().InputSigHash(txn))} + + if err := cm.AddBlocks([]types.Block{mineV2Block(cm.TipState(), []types.V2Transaction{txn}, types.VoidAddress)}); err != nil { + t.Fatal(err) + } + waitForBlock(t, cm, db) + + // check that the balance is correct + if err := assertBalance(types.ZeroCurrency, types.ZeroCurrency); err != nil { + t.Fatal(err) + } + + // check that all UTXOs have been spent + utxos, err = wm.UnspentSiacoinOutputs(w.ID, 0, 100) + if err != nil { + t.Fatal(err) + } else if len(utxos) != 0 { + t.Fatalf("expected 0 output, got %v", len(utxos)) + } +}