Skip to content

Commit

Permalink
scheduled-feed: Extract poll & publish into funcs
Browse files Browse the repository at this point in the history
  • Loading branch information
tom--pollard committed May 6, 2021
1 parent 7a5988a commit d1bfd8c
Showing 1 changed file with 41 additions and 25 deletions.
66 changes: 41 additions & 25 deletions cmd/scheduled-feed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/ossf/package-feeds/config"
"github.com/ossf/package-feeds/feeds"
"github.com/ossf/package-feeds/feeds/scheduler"
"github.com/ossf/package-feeds/publisher"
)
Expand All @@ -26,17 +27,50 @@ type FeedHandler struct {
}

func (handler *FeedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var err error
pkgs, pollErrors := handler.pollFeeds()
processedPackages, err := handler.publishPackages(pkgs)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if pollErrors {
http.Error(w, "error polling for packages - see logs for more information", http.StatusInternalServerError)
return
}
_, err = w.Write([]byte(fmt.Sprintf("%d packages processed", processedPackages)))
if err != nil {
http.Error(w, "unexpected error during http server write: %w", http.StatusInternalServerError)
}
}

func (handler FeedHandler) getCutoff() time.Time {
var cutoff time.Time
if handler.lastPoll.IsZero() {
cutoff = time.Now().UTC().Add(-handler.pollRate)
} else {
cutoff = handler.lastPoll
}
return cutoff
}

func (handler *FeedHandler) pollFeeds() ([]*feeds.Package, bool) {
cutoff := handler.getCutoff()
handler.lastPoll = time.Now().UTC()
pkgs, errs := handler.scheduler.Poll(cutoff)
errors := false
if len(errs) > 0 {
errors = true
for _, err := range errs {
log.Errorf("error polling for new packages: %v", err)
}
}
return pkgs, errors
}

func (handler FeedHandler) publishPackages(pkgs []*feeds.Package) (int, error) {
processed := 0
for _, pkg := range pkgs {
processed++
log.WithFields(log.Fields{
"name": pkg.Name,
"feed": pkg.Type,
Expand All @@ -45,33 +79,15 @@ func (handler *FeedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
b, err := json.Marshal(pkg)
if err != nil {
log.Printf("error marshaling package: %#v", pkg)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
return processed, err
}
if err := handler.pub.Send(context.Background(), b); err != nil {
log.Printf("error sending package to upstream publisher %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
return processed, err
}
processed++
}
if len(errs) > 0 {
http.Error(w, "error polling for packages - see logs for more information", http.StatusInternalServerError)
return
}
_, err := w.Write([]byte(fmt.Sprintf("%d packages processed", processed)))
if err != nil {
http.Error(w, "unexpected error during http server write: %w", http.StatusInternalServerError)
}
}

func (handler FeedHandler) getCutoff() time.Time {
var cutoff time.Time
if handler.lastPoll.IsZero() {
cutoff = time.Now().UTC().Add(-handler.pollRate)
} else {
cutoff = handler.lastPoll
}
return cutoff
return processed, nil
}

func main() {
Expand All @@ -96,12 +112,12 @@ func main() {
}
log.Infof("using %q publisher", pub.Name())

feeds, err := appConfig.GetScheduledFeeds()
scheduledFeeds, err := appConfig.GetScheduledFeeds()
log.Infof("watching feeds: %v", strings.Join(appConfig.EnabledFeeds, ", "))
if err != nil {
log.Fatal(err)
}
sched := scheduler.New(feeds)
sched := scheduler.New(scheduledFeeds)

log.Printf("listening on port %v", appConfig.HTTPPort)
pollRate, err := time.ParseDuration(appConfig.PollRate)
Expand Down

0 comments on commit d1bfd8c

Please sign in to comment.