Skip to content

Commit

Permalink
global rebalance will now delete (part two)
Browse files Browse the repository at this point in the history
* fix 7c6f318 regression
* refactor

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Nov 24, 2024
1 parent ac38c98 commit c1299b8
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 16 deletions.
2 changes: 1 addition & 1 deletion reb/globrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ func (rj *rebJogger) _lwalk(lom *core.LOM, fqn string) error {
// transmit (unlock via transport completion => roc.Close)
rj.m.addLomAck(lom)
if err := rj.doSend(lom, tsi, roc); err != nil {
rj.m.delLomAck(lom, 0, false /*free LOM*/)
rj.m.cleanupLomAck(lom)
return err
}

Expand Down
5 changes: 4 additions & 1 deletion reb/recv.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ func (reb *Reb) recvRegularAck(hdr *transport.ObjHdr, unpacker *cos.ByteUnpack)
if err := unpacker.ReadAny(ack); err != nil {
return fmt.Errorf("g[%d]: failed to unpack regular ACK: %v", reb.RebID(), err)
}
if ack.rebID == 0 {
return fmt.Errorf("g[%d]: invalid g[0] ACK from %s", reb.RebID(), meta.Tname(ack.daemonID))
}
if ack.rebID != reb.rebID.Load() {
nlog.Warningln("ACK from", ack.daemonID, "[", reb.warnID(ack.rebID, ack.daemonID), "]")
return nil
Expand All @@ -224,7 +227,7 @@ func (reb *Reb) recvRegularAck(hdr *transport.ObjHdr, unpacker *cos.ByteUnpack)
// [NOTE]
// - remove migrated object and copies (unless disallowed by feature flag)
// - free pending (original) transmitted LOM
reb.delLomAck(lom, ack.rebID, true)
reb.ackLomAck(lom)
core.FreeLOM(lom)

return nil
Expand Down
34 changes: 20 additions & 14 deletions reb/utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Package reb provides global cluster-wide rebalance upon adding/removing storage nodes.
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package reb

Expand Down Expand Up @@ -158,9 +158,11 @@ func (reb *Reb) isQuiescent() bool {
}

/////////////
// lomAcks TODO: lomAck.q[lom.Uname()] = lom.Bprops().BID and, subsequently, LIF => LOM reinit
// lomAcks //
/////////////

// transaction: addLomAck => (cleanupLomAck | ackLomAck)

func (reb *Reb) lomAcks() *[cos.MultiHashMapCount]*lomAcks { return &reb.lomacks }

func (reb *Reb) addLomAck(lom *core.LOM) {
Expand All @@ -170,31 +172,36 @@ func (reb *Reb) addLomAck(lom *core.LOM) {
lomAck.mu.Unlock()
}

func (reb *Reb) delLomAck(lom *core.LOM, rebID int64, freeLOM bool) {
debug.Assert(rebID != 0)
if rebID != reb.rebID.Load() {
return
}
// called upon failure to send
func (reb *Reb) cleanupLomAck(lom *core.LOM) {
lomAck := reb.lomAcks()[lom.CacheIdx()]

lomAck.mu.Lock()
delete(lomAck.q, lom.Uname())
lomAck.mu.Unlock()
}

// called by recvRegularAck
func (reb *Reb) ackLomAck(lom *core.LOM) {
lomAck := reb.lomAcks()[lom.CacheIdx()]

lomAck.mu.Lock()
uname := lom.Uname()
lomOrig, ok := lomAck.q[uname]
lomOrig, ok := lomAck.q[uname] // via addLomAck() above
if !ok {
lomAck.mu.Unlock()
return
}
debug.Assert(uname == lomOrig.Uname(), "uname ", uname, " vs ", lomOrig.Uname())
delete(lomAck.q, uname)
lomAck.mu.Unlock()

if !freeLOM {
return
}
debug.Assert(uname == lomOrig.Uname())
size := lomOrig.Lsize()
core.FreeLOM(lomOrig)

// counting acknowledged migrations (as initiator)
xreb := reb.xctn()
xreb.ObjsAdd(1, lomOrig.Lsize())
xreb.ObjsAdd(1, size)

// NOTE: rm migrated object (and local copies, if any) right away
// TODO [feature]: mark "deleted" instead
Expand All @@ -204,5 +211,4 @@ func (reb *Reb) delLomAck(lom *core.LOM, rebID int64, freeLOM bool) {
lom.Unlock(true)
debug.AssertNoErr(err)
}
core.FreeLOM(lomOrig)
}

0 comments on commit c1299b8

Please sign in to comment.