Skip to content

Commit

Permalink
Switch to using a single worker
Browse files Browse the repository at this point in the history
Change "workers" cli option to be in pkg/config/operator and use
ALLSTAR_NUM_WORKERS envvar with same default at 5. Update staging and prod
config to use 1 worker to save concurrent memory usage.

Signed-off-by: Jeff Mendoza <[email protected]>
  • Loading branch information
jeffmendoza committed Mar 8, 2024
1 parent 9c5f410 commit 964a34c
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 13 deletions.
1 change: 1 addition & 0 deletions app-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ env_variables:
APP_ID: 119816
KEY_SECRET: "gcpsecretmanager://projects/allstar-ossf/secrets/allstar-private-key?decoder=bytes"
DO_NOTHING_ON_OPT_OUT: true
ALLSTAR_NUM_WORKERS: 1
1 change: 1 addition & 0 deletions app-staging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ resources:
env_variables:
APP_ID: 166485
KEY_SECRET: "gcpsecretmanager://projects/allstar-ossf/secrets/allstar-staging-private-key?decoder=bytes"
ALLSTAR_NUM_WORKERS: 1
6 changes: 2 additions & 4 deletions cmd/allstar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ func main() {
specificPolicyArg := flag.String("policy", "", fmt.Sprintf("Run a specific policy check. Supported policies: %s", supportedPoliciesMsg))
specificRepoArg := flag.String("repo", "", "Run on a specific \"owner/repo\". For example \"ossf/allstar\"")

numWorkersArg := flag.Int("workers", 5, "maximum number of active goroutines for Allstar scans")

flag.Parse()

if *specificPolicyArg != "" {
Expand All @@ -83,7 +81,7 @@ func main() {
}

if runOnce {
_, err := enforce.EnforceAll(ctx, ghc, *specificPolicyArg, *specificRepoArg, *numWorkersArg)
_, err := enforce.EnforceAll(ctx, ghc, *specificPolicyArg, *specificRepoArg)
if err != nil {
log.Fatal().
Err(err).
Expand All @@ -96,7 +94,7 @@ func main() {
go func() {
defer wg.Done()
log.Info().
Err(enforce.EnforceJob(ctx, ghc, (5 * time.Minute), *specificPolicyArg, *specificRepoArg, *numWorkersArg)).
Err(enforce.EnforceJob(ctx, ghc, (5 * time.Minute), *specificPolicyArg, *specificRepoArg)).
Msg("Enforce job shutting down.")
}()
sigs := make(chan os.Signal, 1)
Expand Down
14 changes: 14 additions & 0 deletions pkg/config/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ const setNoticePingDurationHrs = (24 * time.Hour)

var NoticePingDuration time.Duration

// NumWorkers is the number of concurrent orginazations/installations the
// Allstar binary will scan concurrently.
const setNumWorkers = 5

var NumWorkers int

var osGetenv func(string) string

func init() {
Expand Down Expand Up @@ -147,4 +153,12 @@ func setVars() {

allowedOrgs := osGetenv("GITHUB_ALLOWED_ORGS")
AllowedOrganizations = strings.Split(allowedOrgs, ",")

nws := osGetenv("ALLSTAR_NUM_WORKERS")
nw, err := strconv.Atoi(nws)
if err == nil {
NumWorkers = nw
} else {
NumWorkers = setNumWorkers
}
}
11 changes: 7 additions & 4 deletions pkg/enforce/enforce.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func init() {
//
// TBD: determine if this should remain exported, or if it will only be called
// from EnforceJob.
func EnforceAll(ctx context.Context, ghc ghclients.GhClientsInterface, specificPolicyArg string, specificRepoArg string, numWorkersArg int) (EnforceAllResults, error) {
func EnforceAll(ctx context.Context, ghc ghclients.GhClientsInterface, specificPolicyArg string, specificRepoArg string) (EnforceAllResults, error) {
var repoCount int
var enforceAllResults = make(EnforceAllResults)
ac, err := ghc.Get(0)
Expand All @@ -85,10 +85,13 @@ func EnforceAll(ctx context.Context, ghc ghclients.GhClientsInterface, specificP
Msg("Enforcing policies on installations.")

g, ctx := errgroup.WithContext(ctx)
g.SetLimit(numWorkersArg)
g.SetLimit(operator.NumWorkers)
var mu sync.Mutex

for _, i := range insts {
if ctx.Err() != nil {
break
}
if i.SuspendedAt != nil {
log.Info().
Str("area", "bot").
Expand Down Expand Up @@ -302,9 +305,9 @@ func getAppInstallationReposReal(ctx context.Context, ic *github.Client) ([]*git

// EnforceJob is a reconciliation job that enforces policies on all repos every
// d duration. It runs forever until the context is done.
func EnforceJob(ctx context.Context, ghc *ghclients.GHClients, d time.Duration, specificPolicyArg string, specificRepoArg string, numWorkersArg int) error {
func EnforceJob(ctx context.Context, ghc *ghclients.GHClients, d time.Duration, specificPolicyArg string, specificRepoArg string) error {
for {
_, err := EnforceAll(ctx, ghc, specificPolicyArg, specificRepoArg, numWorkersArg)
_, err := EnforceAll(ctx, ghc, specificPolicyArg, specificRepoArg)
if err != nil {
log.Error().
Err(err).
Expand Down
8 changes: 3 additions & 5 deletions pkg/enforce/enforce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,7 @@ func TestEnforceAll(t *testing.T) {
policy1Results = test.Policy1Results
policy2Results = test.Policy2Results

numWorkers := 1
enforceAllResults, err := EnforceAll(context.Background(), mockGhc, "", "", numWorkers)
enforceAllResults, err := EnforceAll(context.Background(), mockGhc, "", "")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -582,16 +581,15 @@ func TestSuspendedEnforce(t *testing.T) {
}
suspended = false
gaicalled = false
numWorkers := 1
if _, err := EnforceAll(context.Background(), &MockGhClients{}, "", "", numWorkers); err != nil {
if _, err := EnforceAll(context.Background(), &MockGhClients{}, "", ""); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !gaicalled {
t.Errorf("Expected getAppInstallationRepos() to be called, but wasn't")
}
suspended = true
gaicalled = false
if _, err := EnforceAll(context.Background(), &MockGhClients{}, "", "", numWorkers); err != nil {
if _, err := EnforceAll(context.Background(), &MockGhClients{}, "", ""); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if gaicalled {
Expand Down

0 comments on commit 964a34c

Please sign in to comment.