Skip to content

Commit

Permalink
Merge pull request #369 from SiaFoundation/nate/distribute-sectors
Browse files Browse the repository at this point in the history
Remove volume fill
  • Loading branch information
n8maninger authored Apr 23, 2024
2 parents f27974a + b42e507 commit ff7f1e6
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 221 deletions.
24 changes: 17 additions & 7 deletions host/contracts/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ func TestContractLifecycle(t *testing.T) {
}
defer node.Close()

cm := node.ChainManager()
webhookReporter, err := webhooks.NewManager(node.Store(), log.Named("webhooks"))
if err != nil {
t.Fatal(err)
Expand All @@ -797,11 +798,19 @@ func TestContractLifecycle(t *testing.T) {
}
defer c.Close()

// waitForScan is a helper func to wait for the contract manager
// to catch up with chain manager
waitForScan := func() {
for cm.TipState().Index.Height != c.ScanHeight() {
time.Sleep(100 * time.Millisecond)
}
}

// note: many more blocks than necessary are mined to ensure all forks have activated
if err := node.MineBlocks(node.Address(), int(stypes.MaturityDelay*4)); err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond) // sync time
waitForScan()

renterFunds := types.Siacoins(500)
hostCollateral := types.Siacoins(1000)
Expand All @@ -828,7 +837,7 @@ func TestContractLifecycle(t *testing.T) {
if err := node.MineBlocks(types.VoidAddress, 1); err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond) // sync time
waitForScan()

contract, err = c.Contract(rev.Revision.ParentID)
if err != nil {
Expand All @@ -851,7 +860,7 @@ func TestContractLifecycle(t *testing.T) {
var roots []types.Hash256
for i := 0; i < 5; i++ {
var sector [rhp2.SectorSize]byte
frand.Read(sector[:256])
frand.Read(sector[:])
root := rhp2.SectorRoot(&sector)
release, err := s.Write(root, &sector)
if err != nil {
Expand Down Expand Up @@ -914,12 +923,13 @@ func TestContractLifecycle(t *testing.T) {
if err := node.MineBlocks(types.VoidAddress, int(remainingBlocks)); err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond) // sync time
waitForScan()

// confirm the revision
if err := node.MineBlocks(types.VoidAddress, 1); err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond) // sync time
waitForScan()

contract, err = c.Contract(rev.Revision.ParentID)
if err != nil {
Expand All @@ -935,7 +945,7 @@ func TestContractLifecycle(t *testing.T) {
if err := node.MineBlocks(types.VoidAddress, int(remainingBlocks)); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second) // sync time
waitForScan()

// check that the contract is still active
contract, err = c.Contract(rev.Revision.ParentID)
Expand Down Expand Up @@ -971,7 +981,7 @@ func TestContractLifecycle(t *testing.T) {
if err != nil {
t.Fatal(err)
} else if contract.Status != contracts.ContractStatusFailed {
t.Fatal("expected contract to be successful")
t.Fatalf("expected contract to be failed, got %q", contract.Status)
} else if contract.ResolutionHeight != 0 {
t.Fatalf("expected resolution height %v, got %v", 0, contract.ResolutionHeight)
} else if m, err := node.Store().Metrics(time.Now()); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions host/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (vm *VolumeManager) growVolume(ctx context.Context, id int64, volume *volum
// truncate the file and add the indices to the volume store. resize is
// done in chunks to prevent holding a lock for too long and to allow
// progress tracking.
if err := volume.Resize(current, target); err != nil {
if err := volume.Resize(target); err != nil {
return fmt.Errorf("failed to expand volume data: %w", err)
} else if err := vm.vs.GrowVolume(id, target); err != nil {
return fmt.Errorf("failed to expand volume metadata: %w", err)
Expand Down Expand Up @@ -305,7 +305,7 @@ func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *vol

if err := vm.vs.ShrinkVolume(id, target); err != nil {
return fmt.Errorf("failed to shrink volume metadata: %w", err)
} else if err := volume.Resize(current, target); err != nil {
} else if err := volume.Resize(target); err != nil {
return fmt.Errorf("failed to shrink volume data to %v sectors: %w", current, err)
}

Expand Down
140 changes: 0 additions & 140 deletions host/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,146 +757,6 @@ func TestRemoveMissing(t *testing.T) {
}
}

// TestVolumeDistribution verifies that the volume manager spreads sector
// writes evenly across all registered volumes rather than filling one
// volume before moving to the next.
func TestVolumeDistribution(t *testing.T) {
	const initialSectors = 10
	dir := t.TempDir()

	// create the database
	log := zaptest.NewLogger(t)
	db, err := sqlite.OpenDatabase(filepath.Join(dir, "hostd.db"), log.Named("sqlite"))
	if err != nil {
		t.Fatal(err)
	}
	defer db.Close()

	g, err := gateway.New(":0", false, filepath.Join(dir, "gateway"))
	if err != nil {
		t.Fatal(err)
	}
	defer g.Close()

	cs, errCh := consensus.New(g, false, filepath.Join(dir, "consensus"))
	select {
	case err := <-errCh:
		if err != nil {
			t.Fatal(err)
		}
	default:
	}
	cm, err := chain.NewManager(cs)
	if err != nil {
		t.Fatal(err)
	}
	// fix: the original deferred cm.Close() twice; a single deferred close
	// is sufficient and avoids a redundant (and potentially erroring) call.
	defer cm.Close()

	// initialize the storage manager
	webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks"))
	if err != nil {
		t.Fatal(err)
	}

	am := alerts.NewManager(webhookReporter, log.Named("alerts"))
	vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), 0)
	if err != nil {
		t.Fatal(err)
	}
	defer vm.Close()

	volumeIDs := make([]int64, 5)
	volumeDir := t.TempDir()
	for i := range volumeIDs {
		result := make(chan error, 1)
		// add a few volumes
		vol, err := vm.AddVolume(context.Background(), filepath.Join(volumeDir, fmt.Sprintf("vol%d.dat", i)), initialSectors, result)
		if err != nil {
			t.Fatal(err)
		} else if err := <-result; err != nil {
			t.Fatal(err)
		}
		volumeIDs[i] = vol.ID
	}

	// checkSectorDistribution asserts that each volume holds the expected
	// number of used sectors; vals may be shorter than the volume list, in
	// which case the remaining volumes are expected to be empty.
	checkSectorDistribution := func(vals ...uint64) error {
		if len(vals) > len(volumeIDs) {
			panic("not enough volumes")
		}
		for i, id := range volumeIDs {
			stat, err := vm.Volume(id)
			if err != nil {
				return fmt.Errorf("failed to check volume %d: %w", volumeIDs[i], err)
			}
			var value uint64
			if len(vals) > i {
				value = vals[i]
			}

			if stat.UsedSectors != value {
				return fmt.Errorf("volume %d: expected %d sectors, got %d", volumeIDs[i], value, stat.UsedSectors)
			}
		}
		return nil
	}

	// writeSector writes a random sector through the volume manager,
	// letting the manager pick the destination volume.
	writeSector := func() error {
		var sector [rhp2.SectorSize]byte
		frand.Read(sector[:1024])
		root := rhp2.SectorRoot(&sector)

		_, err := vm.Write(root, &sector)
		if err != nil {
			return fmt.Errorf("failed to write sector: %w", err)
		}
		return nil
	}

	// write the first sector to the first volume
	if err := writeSector(); err != nil {
		t.Fatal(err)
	} else if err := checkSectorDistribution(1); err != nil {
		t.Fatal(err)
	}

	// write a sector to the second volume
	if err := writeSector(); err != nil {
		t.Fatal(err)
	} else if err := checkSectorDistribution(1, 1); err != nil {
		t.Fatal(err)
	}

	expectedSectors := make([]uint64, len(volumeIDs))
	// fill in the already written sectors
	expectedSectors[0] = 1
	expectedSectors[1] = 1
	// fill the volumes
	for i := 2; i < initialSectors*len(volumeIDs); i++ {
		// write a random sector
		if err := writeSector(); err != nil {
			t.Fatal(err)
		}
		// increment the counter
		expectedSectors[i%len(volumeIDs)]++
		// check the distribution
		if err := checkSectorDistribution(expectedSectors...); err != nil {
			t.Fatal(err)
		}
	}

	volumes, err := vm.Volumes()
	if err != nil {
		t.Fatal(err)
	}
	if len(volumes) != len(volumeIDs) {
		t.Fatal("unexpected number of volumes")
	}
	for i, v := range volumes {
		if v.TotalSectors != v.UsedSectors {
			t.Fatalf("volume %d should be full", i)
		}
	}
}

func TestVolumeConcurrency(t *testing.T) {
t.Skip("This test is flaky and needs to be fixed")

Expand Down
30 changes: 6 additions & 24 deletions host/storage/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

rhp2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"lukechampine.com/frand"
)

type (
Expand Down Expand Up @@ -209,32 +208,15 @@ func (v *volume) Sync() error {
return err
}

func (v *volume) Resize(oldSectors, newSectors uint64) error {
// Resize resizes the volume to the new number of sectors
func (v *volume) Resize(newSectors uint64) error {
v.mu.Lock()
defer v.mu.Unlock()

if v.data == nil {
return ErrVolumeNotAvailable
}

if newSectors > oldSectors {
size := (newSectors - oldSectors) * rhp2.SectorSize // should never be more than 256 MiB
buf := make([]byte, size)
_, _ = frand.Read(buf) // frand will never return an error

v.mu.Lock()
defer v.mu.Unlock()

// write the data to the end of the file
if _, err := v.data.WriteAt(buf, int64(oldSectors*rhp2.SectorSize)); err != nil {
return err
}
} else {
v.mu.Lock()
defer v.mu.Unlock()

if err := v.data.Truncate(int64(newSectors * rhp2.SectorSize)); err != nil {
return err
}
}
return nil
return v.data.Truncate(int64(newSectors * rhp2.SectorSize))
}

func (v *volume) Stats() VolumeStats {
Expand Down
4 changes: 2 additions & 2 deletions persist/sqlite/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ CREATE TABLE volume_sectors (
volume_id INTEGER NOT NULL REFERENCES storage_volumes (id), -- all sectors will need to be migrated first when deleting a volume
volume_index INTEGER NOT NULL,
sector_id INTEGER UNIQUE REFERENCES stored_sectors (id),
sector_writes INTEGER NOT NULL DEFAULT 0,
UNIQUE (volume_id, volume_index)
);
CREATE INDEX volume_sectors_volume_id_sector_id_volume_index_compound ON volume_sectors(volume_id, sector_id, volume_index) WHERE sector_id IS NULL;
CREATE INDEX volume_sectors_volume_id_sector_id_volume_index_set_compound ON volume_sectors (volume_id, sector_id, volume_index) WHERE sector_id IS NOT NULL;
CREATE INDEX volume_sectors_sector_writes_volume_id_sector_id_volume_index_compound ON volume_sectors(sector_writes ASC, volume_id, sector_id, volume_index) WHERE sector_id IS NULL;
CREATE INDEX volume_sectors_volume_id_sector_id ON volume_sectors(volume_id, sector_id);
CREATE INDEX volume_sectors_volume_id ON volume_sectors(volume_id);
CREATE INDEX volume_sectors_volume_index ON volume_sectors(volume_index ASC);
Expand Down
11 changes: 11 additions & 0 deletions persist/sqlite/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@ import (
"go.uber.org/zap"
)

// migrateVersion27 adds the sector_writes column to the volume_sectors table
// and rebuilds the empty-sector index so that writes can be distributed more
// evenly across disks.
func migrateVersion27(tx txn, _ *zap.Logger) error {
	const query = `ALTER TABLE volume_sectors ADD COLUMN sector_writes INTEGER NOT NULL DEFAULT 0;
DROP INDEX volume_sectors_volume_id_sector_id_volume_index_compound;
DROP INDEX volume_sectors_volume_id_sector_id_volume_index_set_compound;
CREATE INDEX volume_sectors_sector_writes_volume_id_sector_id_volume_index_compound ON volume_sectors(sector_writes ASC, volume_id, sector_id, volume_index) WHERE sector_id IS NULL;`
	if _, err := tx.Exec(query); err != nil {
		return err
	}
	return nil
}

// migrateVersion26 creates the host_pinned_settings table.
func migrateVersion26(tx txn, _ *zap.Logger) error {
_, err := tx.Exec(`CREATE TABLE host_pinned_settings (
Expand Down Expand Up @@ -753,4 +763,5 @@ var migrations = []func(tx txn, log *zap.Logger) error{
migrateVersion24,
migrateVersion25,
migrateVersion26,
migrateVersion27,
}
Loading

0 comments on commit ff7f1e6

Please sign in to comment.