Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use defer to release the lock.(#401) #402

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 56 additions & 45 deletions shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,23 @@ type cacheShard struct {

func (s *cacheShard) getWithInfo(key string, hashedKey uint64) (entry []byte, resp Response, err error) {
currentTime := uint64(s.clock.Epoch())
entry, resp, err = s.getWithInfoByLock(key, hashedKey, currentTime)
if err != nil {
return entry, resp, err
}
s.hit(hashedKey)
return entry, resp, nil
}

func (s *cacheShard) getWithInfoByLock(key string, hashedKey uint64,
currentTime uint64) (entry []byte, resp Response, err error) {
s.lock.RLock()
defer s.lock.RUnlock()
wrappedEntry, err := s.getWrappedEntry(hashedKey)
if err != nil {
s.lock.RUnlock()
return nil, resp, err
}
if entryKey := readKeyFromEntry(wrappedEntry); key != entryKey {
s.lock.RUnlock()
s.collision()
if s.isVerbose {
s.logger.Printf("Collision detected. Both %q and %q have the same hash %x", key, entryKey, hashedKey)
Expand All @@ -54,30 +63,33 @@ func (s *cacheShard) getWithInfo(key string, hashedKey uint64) (entry []byte, re
if s.isExpired(wrappedEntry, currentTime) {
resp.EntryStatus = Expired
}
s.lock.RUnlock()
s.hit(hashedKey)
return entry, resp, nil
}

func (s *cacheShard) get(key string, hashedKey uint64) ([]byte, error) {
entry, err := s.getByLock(key, hashedKey)
if err != nil {
return entry, err
}
s.hit(hashedKey)
return entry, nil
}

func (s *cacheShard) getByLock(key string, hashedKey uint64) ([]byte, error) {
s.lock.RLock()
defer s.lock.RUnlock()
wrappedEntry, err := s.getWrappedEntry(hashedKey)
if err != nil {
s.lock.RUnlock()
return nil, err
}
if entryKey := readKeyFromEntry(wrappedEntry); key != entryKey {
s.lock.RUnlock()
s.collision()
if s.isVerbose {
s.logger.Printf("Collision detected. Both %q and %q have the same hash %x", key, entryKey, hashedKey)
}
return nil, ErrEntryNotFound
}
entry := readEntry(wrappedEntry)
s.lock.RUnlock()
s.hit(hashedKey)

return entry, nil
}

Expand Down Expand Up @@ -107,7 +119,8 @@ func (s *cacheShard) getValidWrapEntry(key string, hashedKey uint64) ([]byte, er
if !compareKeyFromEntry(wrappedEntry, key) {
s.collision()
if s.isVerbose {
s.logger.Printf("Collision detected. Both %q and %q have the same hash %x", key, readKeyFromEntry(wrappedEntry), hashedKey)
s.logger.Printf("Collision detected. Both %q and %q have the same hash %x", key,
readKeyFromEntry(wrappedEntry), hashedKey)
}

return nil, ErrEntryNotFound
Expand All @@ -121,7 +134,7 @@ func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error {
currentTimestamp := uint64(s.clock.Epoch())

s.lock.Lock()

defer s.lock.Unlock()
if previousIndex := s.hashmap[hashedKey]; previousIndex != 0 {
if previousEntry, err := s.entries.Get(int(previousIndex)); err == nil {
resetHashFromEntry(previousEntry)
Expand All @@ -141,11 +154,9 @@ func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error {
for {
if index, err := s.entries.Push(w); err == nil {
s.hashmap[hashedKey] = uint64(index)
s.lock.Unlock()
return nil
}
if s.removeOldestEntry(NoSpace) != nil {
s.lock.Unlock()
return errors.New("entry is bigger than max shard size")
}
}
Expand Down Expand Up @@ -199,15 +210,14 @@ func (s *cacheShard) setWrappedEntryWithoutLock(currentTimestamp uint64, w []byt

func (s *cacheShard) append(key string, hashedKey uint64, entry []byte) error {
s.lock.Lock()
defer s.lock.Unlock()
wrappedEntry, err := s.getValidWrapEntry(key, hashedKey)

if err == ErrEntryNotFound {
err = s.addNewWithoutLock(key, hashedKey, entry)
s.lock.Unlock()
return err
}
if err != nil {
s.lock.Unlock()
return err
}

Expand All @@ -216,46 +226,50 @@ func (s *cacheShard) append(key string, hashedKey uint64, entry []byte) error {
w := appendToWrappedEntry(currentTimestamp, wrappedEntry, entry, &s.entryBuffer)

err = s.setWrappedEntryWithoutLock(currentTimestamp, w, hashedKey)
s.lock.Unlock()

return err
}

func (s *cacheShard) del(hashedKey uint64) error {
// Optimistic pre-check using only readlock
func (s *cacheShard) preDel(hashedKey uint64) error {
s.lock.RLock()
{
itemIndex := s.hashmap[hashedKey]
defer s.lock.RUnlock()

if itemIndex == 0 {
s.lock.RUnlock()
s.delmiss()
return ErrEntryNotFound
}
itemIndex := s.hashmap[hashedKey]

if err := s.entries.CheckGet(int(itemIndex)); err != nil {
s.lock.RUnlock()
s.delmiss()
return err
}
if itemIndex == 0 {
s.delmiss()
return ErrEntryNotFound
}

if err := s.entries.CheckGet(int(itemIndex)); err != nil {
s.delmiss()
return err
}

return nil
}

func (s *cacheShard) del(hashedKey uint64) error {
// Optimistic pre-check using only readlock

if err := s.preDel(hashedKey); err != nil {
return err
}
s.lock.RUnlock()

s.lock.Lock()
defer s.lock.Unlock()
{
// After obtaining the writelock, we need to read the same again,
// since the data delivered earlier may be stale now
itemIndex := s.hashmap[hashedKey]

if itemIndex == 0 {
s.lock.Unlock()
s.delmiss()
return ErrEntryNotFound
}

wrappedEntry, err := s.entries.Get(int(itemIndex))
if err != nil {
s.lock.Unlock()
s.delmiss()
return err
}
Expand All @@ -267,7 +281,6 @@ func (s *cacheShard) del(hashedKey uint64) error {
}
resetHashFromEntry(wrappedEntry)
}
s.lock.Unlock()

s.delhit()
return nil
Expand All @@ -291,39 +304,37 @@ func (s *cacheShard) isExpired(oldestEntry []byte, currentTimestamp uint64) bool

func (s *cacheShard) cleanUp(currentTimestamp uint64) {
s.lock.Lock()
defer s.lock.Unlock()
for {
if oldestEntry, err := s.entries.Peek(); err != nil {
break
} else if evicted := s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry); !evicted {
break
}
}
s.lock.Unlock()
}

func (s *cacheShard) getEntry(hashedKey uint64) ([]byte, error) {
s.lock.RLock()

defer s.lock.RUnlock()
entry, err := s.getWrappedEntry(hashedKey)
// copy entry
newEntry := make([]byte, len(entry))
copy(newEntry, entry)

s.lock.RUnlock()

return newEntry, err
}

func (s *cacheShard) copyHashedKeys() (keys []uint64, next int) {
s.lock.RLock()
defer s.lock.RUnlock()
keys = make([]uint64, len(s.hashmap))

for key := range s.hashmap {
keys[next] = key
next++
}

s.lock.RUnlock()
return keys, next
}

Expand All @@ -347,29 +358,29 @@ func (s *cacheShard) removeOldestEntry(reason RemoveReason) error {

func (s *cacheShard) reset(config Config) {
s.lock.Lock()
defer s.lock.Unlock()
s.hashmap = make(map[uint64]uint64, config.initialShardSize())
s.entryBuffer = make([]byte, config.MaxEntrySize+headersSizeInBytes)
s.entries.Reset()
s.lock.Unlock()
}

func (s *cacheShard) resetStats() {
s.lock.Lock()
defer s.lock.Unlock()
s.stats = Stats{}
s.lock.Unlock()
}

func (s *cacheShard) len() int {
s.lock.RLock()
defer s.lock.RUnlock()
res := len(s.hashmap)
s.lock.RUnlock()
return res
}

func (s *cacheShard) capacity() int {
s.lock.RLock()
defer s.lock.RUnlock()
res := s.entries.Capacity()
s.lock.RUnlock()
return res
}

Expand All @@ -386,8 +397,8 @@ func (s *cacheShard) getStats() Stats {

func (s *cacheShard) getKeyMetadataWithLock(key uint64) Metadata {
s.lock.RLock()
defer s.lock.RUnlock()
c := s.hashmapStats[key]
s.lock.RUnlock()
return Metadata{
RequestCount: c,
}
Expand All @@ -403,8 +414,8 @@ func (s *cacheShard) hit(key uint64) {
atomic.AddInt64(&s.stats.Hits, 1)
if s.statsEnabled {
s.lock.Lock()
defer s.lock.Unlock()
s.hashmapStats[key]++
s.lock.Unlock()
}
}

Expand Down