Skip to content

Commit

Permalink
cgrouprate: refactor base registration code
Browse files Browse the repository at this point in the history
Instead of using base objects directly, it's cleaner to operate on the
contents of the sensor object passed at registration time. Add some
helper functions to identify these objects and use them instead.

Also, refactor the global state so that everything is under a single
object for easier access.

Move everything into a single file for clarity.

Signed-off-by: Kornilios Kourtis <[email protected]>
  • Loading branch information
kkourt committed Nov 20, 2024
1 parent 5d50928 commit a511135
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 87 deletions.
4 changes: 3 additions & 1 deletion cmd/tetragon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,9 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu
return err
}

cgrouprate.NewCgroupRate(ctx, pm, &option.Config.CgroupRate)
if err := cgrouprate.NewCgroupRate(ctx, pm, &option.Config.CgroupRate); err != nil {
return err
}
cgrouprate.Config()

err = loadTpFromDir(ctx, option.Config.TracingPolicyDir)
Expand Down
27 changes: 0 additions & 27 deletions pkg/cgrouprate/bpf.go

This file was deleted.

111 changes: 89 additions & 22 deletions pkg/cgrouprate/cgrouprate.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package cgrouprate

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -36,6 +37,8 @@ import (
"github.com/cilium/tetragon/pkg/observer"
"github.com/cilium/tetragon/pkg/option"
"github.com/cilium/tetragon/pkg/reader/notify"
"github.com/cilium/tetragon/pkg/sensors"
"github.com/cilium/tetragon/pkg/sensors/base"
"github.com/cilium/tetragon/pkg/sensors/program"
"github.com/sirupsen/logrus"
)
Expand All @@ -44,12 +47,18 @@ const (
aliveCnt = 5
cleanupInterval = time.Minute
cleanupInactiveTime = time.Minute
cgRateMaxEntries = 32768 // this value could be fine tuned
)

var (
handle *CgroupRate
handleLock sync.RWMutex
)
type globalState struct {
cgRateMap *program.Map
cgRateOptionsMap *program.Map

handle *CgroupRate
mu sync.RWMutex
}

var glSt globalState

type cgroupQueue struct {
id uint64
Expand Down Expand Up @@ -84,26 +93,30 @@ func newCgroupRate(

func NewCgroupRate(ctx context.Context,
listener observer.Listener,
opts *option.CgroupRate) {
opts *option.CgroupRate) error {

if opts.Events == 0 || opts.Interval == 0 {
logger.GetLogger().Infof("Cgroup rate disabled (%d/%s)",
opts.Events, time.Duration(opts.Interval).String())
return
return nil
}

handleLock.Lock()
defer handleLock.Unlock()
glSt.mu.Lock()
defer glSt.mu.Unlock()
if glSt.cgRateMap == nil {
return fmt.Errorf("cgrouprate has not been registered to base sensor")
}

handle = newCgroupRate(listener, cgroupRateMap, opts)
go handle.process(ctx)
glSt.handle = newCgroupRate(listener, glSt.cgRateMap, opts)
go glSt.handle.process(ctx)
return nil
}

func NewTestCgroupRate(listener observer.Listener,
hash *program.Map,
opts *option.CgroupRate) {

handle = newCgroupRate(listener, hash, opts)
glSt.handle = newCgroupRate(listener, hash, opts)
}

func (r *CgroupRate) notify(msg notify.Message) {
Expand All @@ -117,6 +130,11 @@ func (r *CgroupRate) process(ctx context.Context) {
r.log.Infof("Cgroup rate started (%d/%s)",
r.opts.Events, time.Duration(r.opts.Interval).String())

defer func() {
// cleanup
glSt.handle = nil
}()

for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -263,14 +281,14 @@ func (r *CgroupRate) processCgroup(id uint64, cgroup string, last uint64) bool {
// Called from event handlers to kick off the cgroup rate
// periodical check for event's cgroup.
func Check(kube *processapi.MsgK8s, ktime uint64) {
if handle == nil {
if glSt.handle == nil {
return
}

handleLock.RLock()
defer handleLock.RUnlock()
glSt.mu.RLock()
defer glSt.mu.RUnlock()

if handle == nil {
if glSt.handle == nil {
return
}

Expand All @@ -280,27 +298,76 @@ func Check(kube *processapi.MsgK8s, ktime uint64) {
name: string(kube.Docker[:]),
}

handle.ch <- cq
glSt.handle.ch <- cq
cgroupratemetrics.CgroupRateTotalInc(cgroupratemetrics.Check)
}

func Config() {
if handle == nil {
if glSt.handle == nil {
return
}

if cgroupRateOptionsMap.MapHandle == nil {
handle.log.Warn("failed to update cgroup rate options map")
if glSt.cgRateOptionsMap.MapHandle == nil {
glSt.handle.log.Warn("failed to update cgroup rate options map")
return
}

key := uint32(0)
opts := processapi.CgroupRateOptions{
Events: handle.opts.Events,
Interval: handle.opts.Interval,
Events: glSt.handle.opts.Events,
Interval: glSt.handle.opts.Interval,
}

if err := cgroupRateOptionsMap.MapHandle.Put(key, opts); err != nil {
if err := glSt.cgRateOptionsMap.MapHandle.Put(key, opts); err != nil {
cgroupratemetrics.CgroupRateTotalInc(cgroupratemetrics.UpdateFail)
}
}

func RegisterCgroupRate(sensor *sensors.Sensor) (*sensors.Sensor, error) {
if !option.CgroupRateEnabled() {
return sensor, nil
}

glSt.mu.Lock()
defer glSt.mu.Unlock()

if glSt.handle != nil {
return nil, fmt.Errorf("cgrouprate: handle is already set, need to cleanup first")
}

var rateProgs []*program.Program
var optsProgs []*program.Program
for _, p := range sensor.Progs {
if base.IsExecve(p) || base.IsFork(p) || base.IsExit(p) {
rateProgs = append(rateProgs, p)
}
if base.IsExecve(p) {
optsProgs = append(optsProgs, p)
}
}

if len(optsProgs) == 0 || len(rateProgs) == 0 {
return nil, fmt.Errorf("failed to find base programs")
}

cgRmdirProg := program.Builder(
"bpf_cgroup.o",
"cgroup/cgroup_rmdir",
"raw_tracepoint/cgroup_rmdir",
"tg_cgroup_rmdir",
"raw_tracepoint",
).SetPolicy(optsProgs[0].Policy)
rateProgs = append(rateProgs, cgRmdirProg)

glSt.cgRateMap = program.MapBuilder("cgroup_rate_map", rateProgs...)
glSt.cgRateMap.SetMaxEntries(cgRateMaxEntries)
glSt.cgRateOptionsMap = program.MapBuilder("cgroup_rate_options_map", optsProgs...)

sensor.Progs = append(sensor.Progs, cgRmdirProg)
sensor.Maps = append(sensor.Maps, glSt.cgRateMap, glSt.cgRateOptionsMap)
return sensor, nil
}

func init() {
base.RegisterExtensionAtInit("cgroup_rate", RegisterCgroupRate)
}
6 changes: 3 additions & 3 deletions pkg/cgrouprate/cgrouprate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ func TestProcessCgroup(t *testing.T) {
NewTestCgroupRate(l, hash, &d.opts)

// setup cgrouprate cgroup
handle.cgroups[key.Id] = cgroup
assert.NotEqual(t, nil, handle)
glSt.handle.cgroups[key.Id] = cgroup
assert.NotEqual(t, nil, glSt.handle)

// store hash values
values[0] = d.values[0]
Expand All @@ -252,7 +252,7 @@ func TestProcessCgroup(t *testing.T) {
}

t.Logf("Test %d", idx)
ret := handle.processCgroup(key.Id, cgroup, d.last)
ret := glSt.handle.processCgroup(key.Id, cgroup, d.last)

assert.Equal(t, d.ret, ret)
assert.Equal(t, d.throttle, l.throttle)
Expand Down
30 changes: 0 additions & 30 deletions pkg/cgrouprate/sensor.go

This file was deleted.

3 changes: 1 addition & 2 deletions pkg/observer/observertesthelper/observer_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op
obs.RemoveListener(processManager)
})

cgrouprate.NewCgroupRate(ctx, processManager, &option.Config.CgroupRate)
return nil
return cgrouprate.NewCgroupRate(ctx, processManager, &option.Config.CgroupRate)
}

func loadObserver(tb testing.TB, ctx context.Context, base *sensors.Sensor,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sensors/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func initBaseSensor() *sensors.Sensor {
setupPrograms()
sensor.Progs = GetDefaultPrograms()
sensor.Maps = GetDefaultMaps()
return applyExtensions(&sensor)
return ApplyExtensions(&sensor)
}

func initBaseSensorFn() func(tb testing.TB) *sensors.Sensor {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sensors/base/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func RegisterExtensionAtInit(name string, fn ExtensionFn) {
})
}

func applyExtensions(s *sensors.Sensor) *sensors.Sensor {
func ApplyExtensions(s *sensors.Sensor) *sensors.Sensor {
for _, ext := range extensions {
newS, err := ext.fn(s)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions pkg/sensors/base/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Tetragon

package base

import "github.com/cilium/tetragon/pkg/sensors/program"

// IsExecve returns true if this is a base execve program
func IsExecve(p *program.Program) bool {
return p.PinName == "event_execve" && p.Policy == basePolicy
}

func IsFork(p *program.Program) bool {
return p.PinName == "kprobe_pid_clear" && p.Policy == basePolicy
}

func IsExit(p *program.Program) bool {
return p.PinName == "event_exit" && p.Policy == basePolicy
}

0 comments on commit a511135

Please sign in to comment.