From fff6b8c9516100c1c7cab365afd81a517d88dfe4 Mon Sep 17 00:00:00 2001 From: Tom Pollard Date: Thu, 29 Apr 2021 12:49:44 +0100 Subject: [PATCH] scheduled-feed: Extract poll & publish into funcs --- cmd/scheduled-feed/main.go | 66 +++++++++++++++++++++++--------------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/cmd/scheduled-feed/main.go b/cmd/scheduled-feed/main.go index bcab7fa4..8df32618 100644 --- a/cmd/scheduled-feed/main.go +++ b/cmd/scheduled-feed/main.go @@ -13,6 +13,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/ossf/package-feeds/config" + "github.com/ossf/package-feeds/feeds" "github.com/ossf/package-feeds/feeds/scheduler" "github.com/ossf/package-feeds/publisher" ) @@ -26,17 +27,50 @@ type FeedHandler struct { } func (handler *FeedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + var err error + pkgs, pollErrors := handler.pollFeeds() + processedPackages, err := handler.publishPackages(pkgs) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if pollErrors { + http.Error(w, "error polling for packages - see logs for more information", http.StatusInternalServerError) + return + } + _, err = w.Write([]byte(fmt.Sprintf("%d packages processed", processedPackages))) + if err != nil { + http.Error(w, "unexpected error during http server write: %w", http.StatusInternalServerError) + } +} + +func (handler FeedHandler) getCutoff() time.Time { + var cutoff time.Time + if handler.lastPoll.IsZero() { + cutoff = time.Now().UTC().Add(-handler.pollRate) + } else { + cutoff = handler.lastPoll + } + return cutoff +} + +func (handler *FeedHandler) pollFeeds() ([]*feeds.Package, bool) { cutoff := handler.getCutoff() handler.lastPoll = time.Now().UTC() pkgs, errs := handler.scheduler.Poll(cutoff) + errors := false if len(errs) > 0 { + errors = true for _, err := range errs { log.Errorf("error polling for new packages: %v", err) } } + return pkgs, errors +} + +func (handler FeedHandler) publishPackages(pkgs []*feeds.Package) (int, error) { processed := 0 for _, pkg := range pkgs { - processed++ log.WithFields(log.Fields{ "name": pkg.Name, "feed": pkg.Type, @@ -45,33 +79,15 @@ func (handler *FeedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { b, err := json.Marshal(pkg) if err != nil { log.Printf("error marshaling package: %#v", pkg) - http.Error(w, err.Error(), http.StatusInternalServerError) - return + return processed, err } if err := handler.pub.Send(context.Background(), b); err != nil { log.Printf("error sending package to upstream publisher %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return + return processed, err } + processed++ } - if len(errs) > 0 { - http.Error(w, "error polling for packages - see logs for more information", http.StatusInternalServerError) - return - } - _, err := w.Write([]byte(fmt.Sprintf("%d packages processed", processed))) - if err != nil { - http.Error(w, "unexpected error during http server write: %w", http.StatusInternalServerError) - } -} - -func (handler FeedHandler) getCutoff() time.Time { - var cutoff time.Time - if handler.lastPoll.IsZero() { - cutoff = time.Now().UTC().Add(-handler.pollRate) - } else { - cutoff = handler.lastPoll - } - return cutoff + return processed, nil } func main() { @@ -96,12 +112,12 @@ func main() { } log.Infof("using %q publisher", pub.Name()) - feeds, err := appConfig.GetScheduledFeeds() + scheduledFeeds, err := appConfig.GetScheduledFeeds() log.Infof("watching feeds: %v", strings.Join(appConfig.EnabledFeeds, ", ")) if err != nil { log.Fatal(err) } - sched := scheduler.New(feeds) + sched := scheduler.New(scheduledFeeds) log.Printf("listening on port %v", appConfig.HTTPPort) pollRate, err := time.ParseDuration(appConfig.PollRate)