diff --git a/host/contracts/manager_test.go b/host/contracts/manager_test.go
index acf41471..093eb5c8 100644
--- a/host/contracts/manager_test.go
+++ b/host/contracts/manager_test.go
@@ -772,6 +772,7 @@ func TestContractLifecycle(t *testing.T) {
 	}
 	defer node.Close()
+	cm := node.ChainManager()
 	webhookReporter, err := webhooks.NewManager(node.Store(), log.Named("webhooks"))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -797,11 +798,19 @@ func TestContractLifecycle(t *testing.T) {
 	}
 	defer c.Close()
 
+	// waitForScan is a helper func to wait for the contract manager
+	// to catch up with the chain manager
+	waitForScan := func() {
+		for cm.TipState().Index.Height != c.ScanHeight() {
+			time.Sleep(100 * time.Millisecond)
+		}
+	}
+
 	// note: many more blocks than necessary are mined to ensure all forks have activated
 	if err := node.MineBlocks(node.Address(), int(stypes.MaturityDelay*4)); err != nil {
 		t.Fatal(err)
 	}
-	time.Sleep(100 * time.Millisecond) // sync time
+	waitForScan()
 
 	renterFunds := types.Siacoins(500)
 	hostCollateral := types.Siacoins(1000)
@@ -828,7 +837,7 @@ func TestContractLifecycle(t *testing.T) {
 	if err := node.MineBlocks(types.VoidAddress, 1); err != nil {
 		t.Fatal(err)
 	}
-	time.Sleep(100 * time.Millisecond) // sync time
+	waitForScan()
 
 	contract, err = c.Contract(rev.Revision.ParentID)
 	if err != nil {
@@ -851,7 +860,7 @@ func TestContractLifecycle(t *testing.T) {
 	var roots []types.Hash256
 	for i := 0; i < 5; i++ {
 		var sector [rhp2.SectorSize]byte
-		frand.Read(sector[:256])
+		frand.Read(sector[:])
 		root := rhp2.SectorRoot(&sector)
 		release, err := s.Write(root, &sector)
 		if err != nil {
@@ -914,12 +923,13 @@ func TestContractLifecycle(t *testing.T) {
 	if err := node.MineBlocks(types.VoidAddress, int(remainingBlocks)); err != nil {
 		t.Fatal(err)
 	}
-	time.Sleep(100 * time.Millisecond) // sync time
+	waitForScan()
 
+	// confirm the revision
 	if err := node.MineBlocks(types.VoidAddress, 1); err != nil {
 		t.Fatal(err)
 	}
-	time.Sleep(100 * time.Millisecond) // sync time
+	waitForScan()
 
 	contract, err = c.Contract(rev.Revision.ParentID)
 	if err != nil {
@@ -935,7 +945,7 @@ func TestContractLifecycle(t *testing.T) {
 	if err := node.MineBlocks(types.VoidAddress, int(remainingBlocks)); err != nil {
 		t.Fatal(err)
 	}
-	time.Sleep(time.Second) // sync time
+	waitForScan()
 
 	// check that the contract is still active
 	contract, err = c.Contract(rev.Revision.ParentID)
@@ -971,7 +981,7 @@ func TestContractLifecycle(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	} else if contract.Status != contracts.ContractStatusFailed {
-		t.Fatal("expected contract to be successful")
+		t.Fatalf("expected contract to be failed, got %q", contract.Status)
 	} else if contract.ResolutionHeight != 0 {
 		t.Fatalf("expected resolution height %v, got %v", 0, contract.ResolutionHeight)
 	} else if m, err := node.Store().Metrics(time.Now()); err != nil {
diff --git a/host/storage/storage.go b/host/storage/storage.go
index 94bf8abf..e603810a 100644
--- a/host/storage/storage.go
+++ b/host/storage/storage.go
@@ -225,7 +225,7 @@ func (vm *VolumeManager) growVolume(ctx context.Context, id int64, volume *volum
 	// truncate the file and add the indices to the volume store. resize is
 	// done in chunks to prevent holding a lock for too long and to allow
 	// progress tracking.
-	if err := volume.Resize(current, target); err != nil {
+	if err := volume.Resize(target); err != nil {
 		return fmt.Errorf("failed to expand volume data: %w", err)
 	} else if err := vm.vs.GrowVolume(id, target); err != nil {
 		return fmt.Errorf("failed to expand volume metadata: %w", err)
@@ -305,7 +305,7 @@ func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *vol
 
 	if err := vm.vs.ShrinkVolume(id, target); err != nil {
 		return fmt.Errorf("failed to shrink volume metadata: %w", err)
-	} else if err := volume.Resize(current, target); err != nil {
+	} else if err := volume.Resize(target); err != nil {
 		return fmt.Errorf("failed to shrink volume data to %v sectors: %w", current, err)
 	}
 
diff --git a/host/storage/storage_test.go b/host/storage/storage_test.go
index 1d83ddb7..671a6a2d 100644
--- a/host/storage/storage_test.go
+++ b/host/storage/storage_test.go
@@ -757,146 +757,6 @@ func TestRemoveMissing(t *testing.T) {
 	}
 }
 
-func TestVolumeDistribution(t *testing.T) {
-	const initialSectors = 10
-	dir := t.TempDir()
-
-	// create the database
-	log := zaptest.NewLogger(t)
-	db, err := sqlite.OpenDatabase(filepath.Join(dir, "hostd.db"), log.Named("sqlite"))
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer db.Close()
-
-	g, err := gateway.New(":0", false, filepath.Join(dir, "gateway"))
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer g.Close()
-
-	cs, errCh := consensus.New(g, false, filepath.Join(dir, "consensus"))
-	select {
-	case err := <-errCh:
-		if err != nil {
-			t.Fatal(err)
-		}
-	default:
-	}
-	cm, err := chain.NewManager(cs)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer cm.Close()
-	defer cm.Close()
-
-	// initialize the storage manager
-	webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks"))
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	am := alerts.NewManager(webhookReporter, log.Named("alerts"))
-	vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), 0)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer vm.Close()
-
-	volumeIDs := make([]int64, 5)
-	volumeDir := t.TempDir()
-	for i := range volumeIDs {
-		result := make(chan error, 1)
-		// add a few volumes
-		vol, err := vm.AddVolume(context.Background(), filepath.Join(volumeDir, fmt.Sprintf("vol%d.dat", i)), initialSectors, result)
-		if err != nil {
-			t.Fatal(err)
-		} else if err := <-result; err != nil {
-			t.Fatal(err)
-		}
-		volumeIDs[i] = vol.ID
-	}
-
-	// helper func to check that both volumes have the correct number of sectors
-	checkSectorDistribution := func(vals ...uint64) error {
-		if len(vals) > len(volumeIDs) {
-			panic("not enough volumes")
-		}
-		for i, id := range volumeIDs {
-			stat, err := vm.Volume(id)
-			if err != nil {
-				return fmt.Errorf("failed to check volume %d: %w", volumeIDs[i], err)
-			}
-			var value uint64
-			if len(vals) > i {
-				value = vals[i]
-			}
-
-			if stat.UsedSectors != value {
-				return fmt.Errorf("volume %d: expected %d sectors, got %d", volumeIDs[i], value, stat.UsedSectors)
-			}
-		}
-		return nil
-	}
-
-	writeSector := func() error {
-		var sector [rhp2.SectorSize]byte
-		frand.Read(sector[:1024])
-		root := rhp2.SectorRoot(&sector)
-
-		_, err := vm.Write(root, &sector)
-		if err != nil {
-			return fmt.Errorf("failed to write sector: %w", err)
-		}
-		return nil
-	}
-
-	// write the first sector to the first volume
-	if err := writeSector(); err != nil {
-		t.Fatal(err)
-	} else if err := checkSectorDistribution(1); err != nil {
-		t.Fatal(err)
-	}
-
-	// write a sector to the sector volume
-	if err := writeSector(); err != nil {
-		t.Fatal(err)
-	} else if err := checkSectorDistribution(1, 1); err != nil {
-		t.Fatal(err)
-	}
-
-	expectedSectors := make([]uint64, len(volumeIDs))
-	// fill in the already written sectors
-	expectedSectors[0] = 1
-	expectedSectors[1] = 1
-	// fill the volumes
-	for i := 2; i < initialSectors*len(volumeIDs); i++ {
-		// write a random sector
-		if err := writeSector(); err != nil {
-			t.Fatal(err)
-		}
-		// increment the counter
-		expectedSectors[i%len(volumeIDs)]++
-		// check the distribution
-		if err := checkSectorDistribution(expectedSectors...); err != nil {
-			t.Fatal(err)
-		}
-	}
-
-	volumes, err := vm.Volumes()
-	if err != nil {
-		t.Fatal(err)
-	}
-	if len(volumes) != len(volumeIDs) {
-		t.Fatal("unexpected number of volumes")
-	}
-	for i, v := range volumes {
-		if v.TotalSectors != v.UsedSectors {
-			t.Fatalf("volume %d should be full", i)
-		}
-	}
-}
-
 func TestVolumeConcurrency(t *testing.T) {
 	t.Skip("This test is flaky and needs to be fixed")
 
diff --git a/host/storage/volume.go b/host/storage/volume.go
index 25cd8d31..ddea99d3 100644
--- a/host/storage/volume.go
+++ b/host/storage/volume.go
@@ -9,7 +9,6 @@ import (
 
 	rhp2 "go.sia.tech/core/rhp/v2"
 	"go.sia.tech/core/types"
-	"lukechampine.com/frand"
 )
 
 type (
@@ -209,32 +208,15 @@ func (v *volume) Sync() error {
 	return err
 }
 
-func (v *volume) Resize(oldSectors, newSectors uint64) error {
+// Resize resizes the volume to the new number of sectors
+func (v *volume) Resize(newSectors uint64) error {
+	v.mu.Lock()
+	defer v.mu.Unlock()
+
 	if v.data == nil {
 		return ErrVolumeNotAvailable
 	}
-
-	if newSectors > oldSectors {
-		size := (newSectors - oldSectors) * rhp2.SectorSize // should never be more than 256 MiB
-		buf := make([]byte, size)
-		_, _ = frand.Read(buf) // frand will never return an error
-
-		v.mu.Lock()
-		defer v.mu.Unlock()
-
-		// write the data to the end of the file
-		if _, err := v.data.WriteAt(buf, int64(oldSectors*rhp2.SectorSize)); err != nil {
-			return err
-		}
-	} else {
-		v.mu.Lock()
-		defer v.mu.Unlock()
-
-		if err := v.data.Truncate(int64(newSectors * rhp2.SectorSize)); err != nil {
-			return err
-		}
-	}
-	return nil
+	return v.data.Truncate(int64(newSectors * rhp2.SectorSize))
 }
 
 func (v *volume) Stats() VolumeStats {
diff --git a/persist/sqlite/init.sql b/persist/sqlite/init.sql
index 53e22486..f52e52d4 100644
--- a/persist/sqlite/init.sql
+++ b/persist/sqlite/init.sql
@@ -55,10 +55,10 @@ CREATE TABLE volume_sectors (
 	volume_id INTEGER NOT NULL REFERENCES storage_volumes (id), -- all sectors will need to be migrated first when deleting a volume
 	volume_index INTEGER NOT NULL,
 	sector_id INTEGER UNIQUE REFERENCES stored_sectors (id),
+	sector_writes INTEGER NOT NULL DEFAULT 0,
 	UNIQUE (volume_id, volume_index)
 );
-CREATE INDEX volume_sectors_volume_id_sector_id_volume_index_compound ON volume_sectors(volume_id, sector_id, volume_index) WHERE sector_id IS NULL;
-CREATE INDEX volume_sectors_volume_id_sector_id_volume_index_set_compound ON volume_sectors (volume_id, sector_id, volume_index) WHERE sector_id IS NOT NULL;
+CREATE INDEX volume_sectors_sector_writes_volume_id_sector_id_volume_index_compound ON volume_sectors(sector_writes ASC, volume_id, sector_id, volume_index) WHERE sector_id IS NULL;
 CREATE INDEX volume_sectors_volume_id_sector_id ON volume_sectors(volume_id, sector_id);
 CREATE INDEX volume_sectors_volume_id ON volume_sectors(volume_id);
 CREATE INDEX volume_sectors_volume_index ON volume_sectors(volume_index ASC);
diff --git a/persist/sqlite/migrations.go b/persist/sqlite/migrations.go
index 5869d5ef..69149e9e 100644
--- a/persist/sqlite/migrations.go
+++ b/persist/sqlite/migrations.go
@@ -10,6 +10,16 @@ import (
 	"go.uber.org/zap"
 )
 
+// migrateVersion27 adds the sector_writes column to the volume_sectors table to
+// more evenly distribute sector writes across disks.
+func migrateVersion27(tx txn, _ *zap.Logger) error {
+	_, err := tx.Exec(`ALTER TABLE volume_sectors ADD COLUMN sector_writes INTEGER NOT NULL DEFAULT 0;
+DROP INDEX volume_sectors_volume_id_sector_id_volume_index_compound;
+DROP INDEX volume_sectors_volume_id_sector_id_volume_index_set_compound;
+CREATE INDEX volume_sectors_sector_writes_volume_id_sector_id_volume_index_compound ON volume_sectors(sector_writes ASC, volume_id, sector_id, volume_index) WHERE sector_id IS NULL;`)
+	return err
+}
+
 // migrateVersion26 creates the host_pinned_settings table.
 func migrateVersion26(tx txn, _ *zap.Logger) error {
 	_, err := tx.Exec(`CREATE TABLE host_pinned_settings (
@@ -753,4 +763,5 @@ var migrations = []func(tx txn, log *zap.Logger) error{
 	migrateVersion24,
 	migrateVersion25,
 	migrateVersion26,
+	migrateVersion27,
 }
diff --git a/persist/sqlite/volumes.go b/persist/sqlite/volumes.go
index 1c6fbeb3..c0fbf63e 100644
--- a/persist/sqlite/volumes.go
+++ b/persist/sqlite/volumes.go
@@ -569,63 +569,46 @@ WHERE v.sector_id=$1`
 	return
 }
 
-// emptyLocationInVolume returns an empty location in the given volume. If
-// there is no space available, ErrNotEnoughStorage is returned.
-func emptyLocationInVolume(tx txn, volumeID int64) (loc storage.SectorLocation, err error) {
+// emptyLocation returns an empty location in a writable volume. If there is no
+// space available, ErrNotEnoughStorage is returned.
+func emptyLocation(tx txn) (loc storage.SectorLocation, err error) {
 	const query = `SELECT vs.id, vs.volume_id, vs.volume_index
-FROM volume_sectors vs INDEXED BY volume_sectors_volume_id_sector_id_volume_index_compound
-LEFT JOIN locked_volume_sectors lvs ON (lvs.volume_sector_id=vs.id)
-WHERE vs.sector_id IS NULL AND lvs.volume_sector_id IS NULL AND vs.volume_id=$1
-LIMIT 1;`
-	err = tx.QueryRow(query, volumeID).Scan(&loc.ID, &loc.Volume, &loc.Index)
+	FROM volume_sectors vs INDEXED BY volume_sectors_sector_writes_volume_id_sector_id_volume_index_compound
+	LEFT JOIN locked_volume_sectors lvs ON (lvs.volume_sector_id=vs.id)
+	INNER JOIN storage_volumes sv ON (sv.id=vs.volume_id)
+	WHERE vs.sector_id IS NULL AND lvs.volume_sector_id IS NULL AND sv.available=true AND sv.read_only=false
+	ORDER BY vs.sector_writes ASC
+	LIMIT 1;`
+	err = tx.QueryRow(query).Scan(&loc.ID, &loc.Volume, &loc.Index)
 	if errors.Is(err, sql.ErrNoRows) {
 		err = storage.ErrNotEnoughStorage
+		return
+	} else if err != nil {
+		return
 	}
+	_, err = tx.Exec(`UPDATE volume_sectors SET sector_writes=sector_writes+1 WHERE id=$1`, loc.ID)
 	return
 }
 
-// emptyLocation returns an empty location in a writable volume. If there is no
+// emptyLocationForMigration returns an empty location in a writable volume. If there is no
 // space available, ErrNotEnoughStorage is returned.
-func emptyLocation(tx txn) (storage.SectorLocation, error) {
-	const query = `SELECT id
-FROM storage_volumes
-WHERE available=true AND read_only=false AND total_sectors-used_sectors > 0
-ORDER BY used_sectors ASC LIMIT 1;`
-	var volumeID int64
-	err := tx.QueryRow(query).Scan(&volumeID)
-	if errors.Is(err, sql.ErrNoRows) {
-		return storage.SectorLocation{}, storage.ErrNotEnoughStorage
-	} else if err != nil {
-		return storage.SectorLocation{}, fmt.Errorf("failed to get empty location: %w", err)
-	}
-
-	// note: there is a slight race here where all sectors in a volume could be
-	// locked, but not committed. This is unlikely to happen in practice, and
-	// the worst case is that the host fails to store a sector. The performance
-	// benefits of choosing a volume first far outweigh the downsides.
-	return emptyLocationInVolume(tx, volumeID)
-}
-
-// emptyLocationForMigration returns an empty location in a writable volume
-// other than the given volumeID. If there is no space available,
-// ErrNotEnoughStorage is returned.
-func emptyLocationForMigration(tx txn, oldVolumeID int64) (loc storage.SectorLocation, err error) {
-	const query = `SELECT id FROM storage_volumes
-WHERE available=true AND read_only=false AND total_sectors-used_sectors > 0 AND id<>$1
-ORDER BY used_sectors ASC LIMIT 1;`
-	var newVolumeID int64
-	err = tx.QueryRow(query, oldVolumeID).Scan(&newVolumeID)
+func emptyLocationForMigration(tx txn, volumeID int64) (loc storage.SectorLocation, err error) {
+	const query = `SELECT vs.id, vs.volume_id, vs.volume_index
+	FROM volume_sectors vs INDEXED BY volume_sectors_sector_writes_volume_id_sector_id_volume_index_compound
+	LEFT JOIN locked_volume_sectors lvs ON (lvs.volume_sector_id=vs.id)
+	INNER JOIN storage_volumes sv ON (sv.id=vs.volume_id)
+	WHERE vs.sector_id IS NULL AND lvs.volume_sector_id IS NULL AND sv.available=true AND sv.read_only=false AND vs.volume_id <> $1
+	ORDER BY vs.sector_writes ASC
+	LIMIT 1;`
+	err = tx.QueryRow(query, volumeID).Scan(&loc.ID, &loc.Volume, &loc.Index)
 	if errors.Is(err, sql.ErrNoRows) {
-		return storage.SectorLocation{}, storage.ErrNotEnoughStorage
+		err = storage.ErrNotEnoughStorage
+		return
 	} else if err != nil {
-		return storage.SectorLocation{}, fmt.Errorf("failed to get empty location: %w", err)
+		return
 	}
-
-	// note: there is a slight race here where all sectors in a volume could be
-	// locked, but not committed. This is unlikely to happen in practice, and
-	// the worst case is that the host fails to store a sector. The performance
-	// benefits of choosing a volume first far outweigh the downsides.
-	return emptyLocationInVolume(tx, newVolumeID)
+	_, err = tx.Exec(`UPDATE volume_sectors SET sector_writes=sector_writes+1 WHERE id=$1`, loc.ID)
+	return
 }
 
 // sectorForMigration returns the location of the first occupied sector in the
diff --git a/persist/sqlite/volumes_test.go b/persist/sqlite/volumes_test.go
index d11ad35a..b0538fcd 100644
--- a/persist/sqlite/volumes_test.go
+++ b/persist/sqlite/volumes_test.go
@@ -549,6 +549,8 @@ func TestMigrateSectors(t *testing.T) {
 	volume2, err := addTestVolume(db, "test2", initialSectors/4)
 	if err != nil {
 		t.Fatal(err)
+	} else if err := db.SetReadOnly(volume.ID, true); err != nil { // set the volume to read-only
+		t.Fatal(err)
 	}
 
 	// migrate the remaining sectors from the first volume; should partially complete