Skip to content

Commit

Permalink
feeds/: Allow for per package/feed error handling when polling
Browse files Browse the repository at this point in the history
When errors occur but package data can still be processed, the
system should continue where possible. This is particularly
beneficial in feeds where data is fetched on a per-package basis,
as a single failure shouldn't immediately lead to data loss
for the whole session. The error handling in `pollAndPublish()` and
its callers has been modified to support this.

This also stops treating `errUnpublished` as a 'hard' error when polling
npm for critical packages; it is now only logged.
  • Loading branch information
tom--pollard committed Jun 10, 2021
1 parent 940c020 commit d79dc2a
Show file tree
Hide file tree
Showing 20 changed files with 395 additions and 182 deletions.
6 changes: 3 additions & 3 deletions feeds/crates/crates.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er
}, nil
}

func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
packages, err := fetchPackages(feed.baseURL)
if err != nil {
return pkgs, err
return pkgs, []error{err}
}
for _, pkg := range packages {
pkg := feeds.NewPackage(pkg.UpdatedAt, pkg.Name, pkg.NewestVersion, FeedName)
Expand All @@ -93,7 +93,7 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
feed.lossyFeedAlerter.ProcessPackages(FeedName, pkgs)

pkgs = feeds.ApplyCutoff(pkgs, cutoff)
return pkgs, nil
return pkgs, []error{}
}

func (feed Feed) GetName() string {
Expand Down
10 changes: 5 additions & 5 deletions feeds/crates/crates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func TestCratesLatest(t *testing.T) {
}

cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
pkgs, err := feed.Latest(cutoff)
if err != nil {
pkgs, errs := feed.Latest(cutoff)
if len(errs) != 0 {
t.Fatalf("feed.Latest returned error: %v", err)
}

Expand Down Expand Up @@ -67,11 +67,11 @@ func TestCratesNotFound(t *testing.T) {
}

cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
_, err = feed.Latest(cutoff)
if err == nil {
_, errs := feed.Latest(cutoff)
if len(errs) == 0 {
t.Fatalf("feed.Latest() was successful when an error was expected")
}
if !errors.Is(err, utils.ErrUnsuccessfulRequest) {
if !errors.Is(errs[len(errs)-1], utils.ErrUnsuccessfulRequest) {
t.Fatalf("feed.Latest() returned an error which did not match the expected error")
}
}
Expand Down
14 changes: 13 additions & 1 deletion feeds/feed.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package feeds

import (
"errors"
"fmt"
"time"
)

const schemaVer = "1.0"

var ErrNoPackagesPolled = errors.New("no packages were successfully polled")

// UnsupportedOptionError pairs an option name with a feed name.
// NOTE(review): presumably reported when a feed is configured with an option
// it does not support; its methods are not visible in this excerpt — confirm.
type UnsupportedOptionError struct {
Option string
Feed string
}

type ScheduledFeed interface {
Latest(cutoff time.Time) ([]*Package, error)
Latest(cutoff time.Time) ([]*Package, []error)
GetFeedOptions() FeedOptions
GetName() string
}
Expand All @@ -37,6 +40,15 @@ type Package struct {
SchemaVer string `json:"schema_ver"`
}

// PackagePollError records a failure to poll a single package, pairing the
// underlying error with the name of the package that was being polled.
type PackagePollError struct {
	// Err is the underlying error returned while polling the package.
	Err error
	// Name is the name of the package that was being polled.
	Name string
}

// Error implements the error interface, reporting the package name together
// with the underlying error.
func (err PackagePollError) Error() string {
	return fmt.Sprintf("Polling for package %s returned error: %v", err.Name, err.Err)
}

// Unwrap returns the underlying error so that errors.Is and errors.As can
// match against the cause of a PackagePollError rather than only the wrapper.
func (err PackagePollError) Unwrap() error {
	return err.Err
}

func NewPackage(created time.Time, name, version, feed string) *Package {
return &Package{
Name: name,
Expand Down
6 changes: 3 additions & 3 deletions feeds/goproxy/goproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,18 @@ func New(feedOptions feeds.FeedOptions) (*Feed, error) {
}, nil
}

func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
packages, err := fetchPackages(feed.baseURL, cutoff)
if err != nil {
return pkgs, err
return pkgs, []error{err}
}
for _, pkg := range packages {
pkg := feeds.NewPackage(pkg.ModifiedDate, pkg.Title, pkg.Version, FeedName)
pkgs = append(pkgs, pkg)
}
pkgs = feeds.ApplyCutoff(pkgs, cutoff)
return pkgs, nil
return pkgs, []error{}
}

func (feed Feed) GetName() string {
Expand Down
10 changes: 5 additions & 5 deletions feeds/goproxy/goproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func TestGoProxyLatest(t *testing.T) {
feed.baseURL = srv.URL

cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
pkgs, err := feed.Latest(cutoff)
if err != nil {
pkgs, errs := feed.Latest(cutoff)
if len(errs) != 0 {
t.Fatalf("feed.Latest returned error: %v", err)
}

Expand Down Expand Up @@ -66,11 +66,11 @@ func TestGoproxyNotFound(t *testing.T) {
feed.baseURL = srv.URL

cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
_, err = feed.Latest(cutoff)
if err == nil {
_, errs := feed.Latest(cutoff)
if len(errs) == 0 {
t.Fatalf("feed.Latest() was successful when an error was expected")
}
if !errors.Is(err, utils.ErrUnsuccessfulRequest) {
if !errors.Is(errs[len(errs)-1], utils.ErrUnsuccessfulRequest) {
t.Fatalf("feed.Latest() returned an error which did not match the expected error")
}
}
Expand Down
60 changes: 36 additions & 24 deletions feeds/npm/npm.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,15 @@ func fetchPackage(baseURL, pkgTitle string) ([]*Package, error) {
return versionSlice, nil
}

func fetchAllPackages(url string) ([]*feeds.Package, error) {
func fetchAllPackages(url string) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
errs := []error{}
packageChannel := make(chan []*Package)
errs := make(chan error)
errChannel := make(chan error)
packageEvents, err := fetchPackageEvents(url)
if err != nil {
return pkgs, err
// If we can't generate package events then return early.
return pkgs, append(errs, err)
}
// Handle the possibility of multiple releases of the same package
// within the polled `packages` slice.
Expand All @@ -157,7 +159,10 @@ func fetchAllPackages(url string) ([]*feeds.Package, error) {
go func(pkgTitle string, count int) {
pkgs, err := fetchPackage(url, pkgTitle)
if err != nil {
errs <- err
if !errors.Is(err, errUnpublished) {
err = feeds.PackagePollError{Name: pkgTitle, Err: err}
}
errChannel <- err
return
}
// Apply count slice
Expand All @@ -173,27 +178,31 @@ func fetchAllPackages(url string) ([]*feeds.Package, error) {
pkg.Version, FeedName)
pkgs = append(pkgs, feedPkg)
}
case err := <-errs:
// TODO: Add an event for unpublished package? This shouldn't
// be a hard error for the 'firehose'.
case err := <-errChannel:
// When polling the 'firehose' unpublished packages
// don't need to be logged as an error.
if !errors.Is(err, errUnpublished) {
return pkgs, fmt.Errorf("error in fetching version information: %w", err)
errs = append(errs, err)
}
}
}
return pkgs, nil
return pkgs, errs
}

func fetchCriticalPackages(url string, packages []string) ([]*feeds.Package, error) {
func fetchCriticalPackages(url string, packages []string) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
errs := []error{}
packageChannel := make(chan []*Package)
errs := make(chan error)
errChannel := make(chan error)

for _, pkgTitle := range packages {
go func(pkgTitle string) {
pkgs, err := fetchPackage(url, pkgTitle)
if err != nil {
errs <- err
if !errors.Is(err, errUnpublished) {
err = feeds.PackagePollError{Name: pkgTitle, Err: err}
}
errChannel <- err
return
}
packageChannel <- pkgs
Expand All @@ -208,13 +217,15 @@ func fetchCriticalPackages(url string, packages []string) ([]*feeds.Package, err
pkg.Version, FeedName)
pkgs = append(pkgs, feedPkg)
}
case err := <-errs:
// Assume if a package has been unpublished that it is a valid hard
// error when polling for 'critical' packages.
return pkgs, fmt.Errorf("error in fetching version information: %w", err)
case err := <-errChannel:
// Assume if a package has been unpublished that it is a valid reason
// to log the error when polling for 'critical' packages. This could
// be changed for a 'lossy' type event instead. Further packages should
// be processed.
errs = append(errs, err)
}
}
return pkgs, nil
return pkgs, errs
}

type Feed struct {
Expand All @@ -233,18 +244,19 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er
}, nil
}

func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
var err error
var errs []error

if feed.packages == nil {
pkgs, err = fetchAllPackages(feed.baseURL)
pkgs, errs = fetchAllPackages(feed.baseURL)
} else {
pkgs, err = fetchCriticalPackages(feed.baseURL, *feed.packages)
pkgs, errs = fetchCriticalPackages(feed.baseURL, *feed.packages)
}

if err != nil {
return nil, err
if len(pkgs) == 0 {
// If none of the packages were successfully polled for, return early.
return nil, append(errs, feeds.ErrNoPackagesPolled)
}

// Ensure packages are sorted by CreatedDate in order of most recent, as goroutine
Expand All @@ -261,7 +273,7 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
}

pkgs = feeds.ApplyCutoff(pkgs, cutoff)
return pkgs, nil
return pkgs, errs
}

func (feed Feed) GetName() string {
Expand Down
Loading

0 comments on commit d79dc2a

Please sign in to comment.