diff --git a/cmd/scheduled-feed/main.go b/cmd/scheduled-feed/main.go index bcab7fa4..87782f50 100644 --- a/cmd/scheduled-feed/main.go +++ b/cmd/scheduled-feed/main.go @@ -13,6 +13,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/ossf/package-feeds/config" + "github.com/ossf/package-feeds/feeds" "github.com/ossf/package-feeds/feeds/scheduler" "github.com/ossf/package-feeds/publisher" ) @@ -26,17 +27,50 @@ type FeedHandler struct { } func (handler *FeedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + var err error + pkgs, pollErrors := handler.pollFeeds() + processedPackages, err := handler.publishPackages(pkgs) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if pollErrors { + http.Error(w, "error polling for packages - see logs for more information", http.StatusInternalServerError) + return + } + _, err = w.Write([]byte(fmt.Sprintf("%d packages processed", processedPackages))) + if err != nil { + http.Error(w, "unexpected error during http server write: %w", http.StatusInternalServerError) + } +} + +func (handler FeedHandler) getCutoff() time.Time { + var cutoff time.Time + if handler.lastPoll.IsZero() { + cutoff = time.Now().UTC().Add(-handler.pollRate) + } else { + cutoff = handler.lastPoll + } + return cutoff +} + +func (handler *FeedHandler) pollFeeds() ([]*feeds.Package, bool) { cutoff := handler.getCutoff() handler.lastPoll = time.Now().UTC() pkgs, errs := handler.scheduler.Poll(cutoff) + errors := false if len(errs) > 0 { + errors = true for _, err := range errs { log.Errorf("error polling for new packages: %v", err) } } + return pkgs, errors +} + +func (handler FeedHandler) publishPackages(pkgs []*feeds.Package) (int, error) { processed := 0 for _, pkg := range pkgs { - processed++ log.WithFields(log.Fields{ "name": pkg.Name, "feed": pkg.Type, @@ -45,33 +79,15 @@ func (handler *FeedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { b, err := json.Marshal(pkg) if err != nil { log.Printf("error marshaling package: %#v", pkg) - http.Error(w, err.Error(), http.StatusInternalServerError) - return + return processed, err } if err := handler.pub.Send(context.Background(), b); err != nil { log.Printf("error sending package to upstream publisher %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return + return processed, err } + processed++ } - if len(errs) > 0 { - http.Error(w, "error polling for packages - see logs for more information", http.StatusInternalServerError) - return - } - _, err := w.Write([]byte(fmt.Sprintf("%d packages processed", processed))) - if err != nil { - http.Error(w, "unexpected error during http server write: %w", http.StatusInternalServerError) - } -} - -func (handler FeedHandler) getCutoff() time.Time { - var cutoff time.Time - if handler.lastPoll.IsZero() { - cutoff = time.Now().UTC().Add(-handler.pollRate) - } else { - cutoff = handler.lastPoll - } - return cutoff + return processed, nil } func main() { @@ -96,12 +112,12 @@ func main() { } log.Infof("using %q publisher", pub.Name()) - feeds, err := appConfig.GetScheduledFeeds() + scheduledFeeds, err := appConfig.GetScheduledFeeds() log.Infof("watching feeds: %v", strings.Join(appConfig.EnabledFeeds, ", ")) if err != nil { log.Fatal(err) } - sched := scheduler.New(feeds) + sched := scheduler.New(scheduledFeeds) log.Printf("listening on port %v", appConfig.HTTPPort) pollRate, err := time.ParseDuration(appConfig.PollRate) @@ -118,7 +134,7 @@ func main() { cronjob := cron.New() crontab := fmt.Sprintf("@every %s", pollRate.String()) log.Printf("Running a timer %s", crontab) - err := cronjob.AddFunc(crontab, func() { cronRequest(appConfig.HTTPPort) }) + err := cronjob.AddFunc(crontab, func() { cronPoll(handler) }) if err != nil { log.Fatal(err) } @@ -131,14 +147,16 @@ func main() { } } -func cronRequest(port int) { - client := &http.Client{ - Timeout: 10 * time.Second, - } - resp, err := client.Get(fmt.Sprintf("http://localhost:%v", port)) +func cronPoll(handler *FeedHandler) { + pkgs, pollErrors := handler.pollFeeds() + processedPackages, err := handler.publishPackages(pkgs) if err != nil { - log.Printf("http request failed: %v", err) + log.Errorf(err.Error()) + return + } + if pollErrors { + // pollFeeds already logs with ErrorF. return } - resp.Body.Close() + log.Printf("%d packages processed", processedPackages) }