Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not require agent polling to make workflows that should be skipped done #4271

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions cmd/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func runGrpcServer(ctx context.Context, c *cli.Command, _store store.Store) erro
)

woodpeckerServer := woodpeckerGrpcServer.NewWoodpeckerServer(
ctx,
server.Config.Services.Queue,
server.Config.Services.Logs,
server.Config.Services.Pubsub,
Expand Down
5 changes: 5 additions & 0 deletions server/grpc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (

func createFilterFunc(agentFilter rpc.Filter) queue.FilterFn {
return func(task *model.Task) (bool, int) {
// don't return tasks who are not ready jet
if !task.ShouldRun() {
return false, 0
}

score := 0
for taskLabel, taskLabelValue := range task.Labels {
// if a task label is empty it will be ignored
Expand Down
12 changes: 9 additions & 3 deletions server/grpc/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func TestCreateFilterFunc(t *testing.T) {
Labels: map[string]string{"org-id": "123", "platform": "windows"},
},
wantMatched: false,
wantScore: 0,
},
{
name: "No match",
Expand All @@ -73,7 +72,6 @@ func TestCreateFilterFunc(t *testing.T) {
Labels: map[string]string{"org-id": "123", "platform": "windows"},
},
wantMatched: false,
wantScore: 0,
},
{
name: "Missing label",
Expand All @@ -84,7 +82,6 @@ func TestCreateFilterFunc(t *testing.T) {
Labels: map[string]string{"needed": "some"},
},
wantMatched: false,
wantScore: 0,
},
{
name: "Empty task labels",
Expand Down Expand Up @@ -119,6 +116,15 @@ func TestCreateFilterFunc(t *testing.T) {
wantMatched: true,
wantScore: 2,
},
{
name: "dont match task not ready to run",
agentFilter: rpc.Filter{},
task: &model.Task{
Labels: map[string]string{"org-id": "123", "platform": "linux"},
RunOn: []string{"success"},
},
wantMatched: false,
},
}

for _, tt := range tests {
Expand Down
25 changes: 8 additions & 17 deletions server/grpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
const updateAgentLastWorkDelay = time.Minute

type RPC struct {
ctx context.Context
queue queue.Queue
pubsub *pubsub.Publisher
logger logging.Log
Expand Down Expand Up @@ -81,24 +82,14 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er

filterFn := createFilterFunc(agentFilter)

for {
// poll blocks until a task is available or the context is canceled / worker is kicked
task, err := s.queue.Poll(c, agent.ID, filterFn)
if err != nil || task == nil {
return nil, err
}

if task.ShouldRun() {
workflow := new(rpc.Workflow)
err = json.Unmarshal(task.Data, workflow)
return workflow, err
}

// task should not run, so mark it as done
if err := s.Done(c, task.ID, rpc.WorkflowState{}); err != nil {
log.Error().Err(err).Msgf("marking workflow task '%s' as done failed", task.ID)
}
// poll blocks until a task is available or the context is canceled / worker is kicked
task, err := s.queue.Poll(c, agent.ID, filterFn)
if err != nil || task == nil {
return nil, err
}

workflow := new(rpc.Workflow)
return workflow, json.Unmarshal(task.Data, workflow)
}

// Wait blocks until the workflow with the given ID is done.
Expand Down
31 changes: 29 additions & 2 deletions server/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/proto"
"go.woodpecker-ci.org/woodpecker/v2/server/logging"
"go.woodpecker-ci.org/woodpecker/v2/server/model"
"go.woodpecker-ci.org/woodpecker/v2/server/pubsub"
"go.woodpecker-ci.org/woodpecker/v2/server/queue"
"go.woodpecker-ci.org/woodpecker/v2/server/store"
Expand All @@ -37,7 +38,7 @@ type WoodpeckerServer struct {
peer RPC
}

func NewWoodpeckerServer(queue queue.Queue, logger logging.Log, pubsub *pubsub.Publisher, store store.Store) proto.WoodpeckerServer {
func NewWoodpeckerServer(ctx context.Context, queue queue.Queue, logger logging.Log, pubsub *pubsub.Publisher, store store.Store) proto.WoodpeckerServer {
pipelineTime := prometheus_auto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "pipeline_time",
Expand All @@ -49,14 +50,40 @@ func NewWoodpeckerServer(queue queue.Queue, logger logging.Log, pubsub *pubsub.P
Help: "Pipeline count.",
}, []string{"repo", "branch", "status", "pipeline"})
peer := RPC{
ctx: ctx,
store: store,
queue: queue,
pubsub: pubsub,
logger: logger,
pipelineTime: pipelineTime,
pipelineCount: pipelineCount,
}
return &WoodpeckerServer{peer: peer}
rpcServer := &WoodpeckerServer{peer: peer}
go rpcServer.markSkippedDone()
return rpcServer
}

// mark skipped tasks done, based on dependencies.
// TODO: find better place for this background service
func (s *WoodpeckerServer) markSkippedDone() {
for {
task, err := s.peer.queue.Poll(s.peer.ctx, queue.InternalWorkerID, func(t *model.Task) (bool, int) {
return !t.ShouldRun(), 0
})
if err != nil {
log.Error().Err(err).Msg("got error while polling for tasks that should be skipped")
continue
}
if task == nil {
log.Error().Msg("queue poll returned nil task")
continue
}

log.Trace().Msgf("mark skipped task '%s' as done", task.String())
if err := s.peer.Done(s.peer.ctx, task.ID, rpc.WorkflowState{}); err != nil {
log.Error().Err(err).Msgf("marking workflow task '%s' as done failed", task.ID)
}
}
}

func (s *WoodpeckerServer) Version(_ context.Context, _ *proto.Empty) (*proto.VersionResponse, error) {
Expand Down
27 changes: 19 additions & 8 deletions server/pipeline/stepbuilder/stepBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,13 @@ func (b *StepBuilder) environmentVariables(metadata metadata.Metadata, axis matr
return environ
}

func (b *StepBuilder) toInternalRepresentation(parsed *yaml_types.Workflow, environ map[string]string, metadata metadata.Metadata, workflowID int64) (*backend_types.Config, error) {
var secrets []compiler.Secret
for _, sec := range b.Secs {
var events []string
for _, event := range sec.Events {
events = append(events, string(event))
func toCompilerSecrets(in []*model.Secret) []compiler.Secret {
secrets := make([]compiler.Secret, 0, len(in))

for _, sec := range in {
events := make([]string, len(sec.Events))
for i, event := range sec.Events {
events[i] = string(event)
}

secrets = append(secrets, compiler.Secret{
Expand All @@ -256,14 +257,24 @@ func (b *StepBuilder) toInternalRepresentation(parsed *yaml_types.Workflow, envi
})
}

var registries []compiler.Registry
for _, reg := range b.Regs {
return secrets
}

func toCompilerRegistries(in []*model.Registry) []compiler.Registry {
registries := make([]compiler.Registry, 0, len(in))
for _, reg := range in {
registries = append(registries, compiler.Registry{
Hostname: reg.Address,
Username: reg.Username,
Password: reg.Password,
})
}
return registries
}

func (b *StepBuilder) toInternalRepresentation(parsed *yaml_types.Workflow, environ map[string]string, metadata metadata.Metadata, workflowID int64) (*backend_types.Config, error) {
secrets := toCompilerSecrets(b.Secs)
registries := toCompilerRegistries(b.Regs)

return compiler.New(
compiler.WithEnviron(environ),
Expand Down
15 changes: 14 additions & 1 deletion server/queue/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"go.woodpecker-ci.org/woodpecker/v2/shared/constant"
)

// InternalWorkerID is pseudo agent ID for internal routines using the queue.
const InternalWorkerID = -2
6543 marked this conversation as resolved.
Show resolved Hide resolved

type entry struct {
item *model.Task
done chan bool
Expand Down Expand Up @@ -213,7 +216,13 @@ func (q *fifo) Extend(_ context.Context, agentID int64, taskID string) error {
func (q *fifo) Info(_ context.Context) InfoT {
q.Lock()
stats := InfoT{}
stats.Stats.Workers = len(q.workers)
workerCount := 0
for w := range q.workers {
if w.agentID != InternalWorkerID { // ignore internal workers
workerCount++
}
}
stats.Stats.Workers = workerCount
stats.Stats.Pending = q.pending.Len()
stats.Stats.WaitingOnDeps = q.waitingOnDeps.Len()
stats.Stats.Running = len(q.running)
Expand Down Expand Up @@ -251,6 +260,10 @@ func (q *fifo) Resume() {

// KickAgentWorkers kicks all workers for a given agent.
func (q *fifo) KickAgentWorkers(agentID int64) {
if agentID == InternalWorkerID {
return
}

q.Lock()
defer q.Unlock()

Expand Down
1 change: 1 addition & 0 deletions server/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Queue interface {
PushAtOnce(c context.Context, tasks []*model.Task) error

// Poll retrieves and removes a task head of this queue.
// blocks until a task is available or the context is canceled
Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error)

// Extend extends the deadline for a task.
Expand Down