Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feeds/: Allow for per package/feed error handling when polling #123

Merged
merged 1 commit into from
Jun 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions feeds/crates/crates.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er
}, nil
}

func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
packages, err := fetchPackages(feed.baseURL)
if err != nil {
return pkgs, err
return pkgs, []error{err}
}
for _, pkg := range packages {
pkg := feeds.NewPackage(pkg.UpdatedAt, pkg.Name, pkg.NewestVersion, FeedName)
Expand All @@ -93,7 +93,7 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
feed.lossyFeedAlerter.ProcessPackages(FeedName, pkgs)

pkgs = feeds.ApplyCutoff(pkgs, cutoff)
return pkgs, nil
return pkgs, []error{}
}

func (feed Feed) GetName() string {
Expand Down
10 changes: 5 additions & 5 deletions feeds/crates/crates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func TestCratesLatest(t *testing.T) {
}

cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
pkgs, err := feed.Latest(cutoff)
if err != nil {
pkgs, errs := feed.Latest(cutoff)
if len(errs) != 0 {
t.Fatalf("feed.Latest returned error: %v", err)
}

Expand Down Expand Up @@ -67,11 +67,11 @@ func TestCratesNotFound(t *testing.T) {
}

cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
_, err = feed.Latest(cutoff)
if err == nil {
_, errs := feed.Latest(cutoff)
if len(errs) == 0 {
t.Fatalf("feed.Latest() was successful when an error was expected")
}
if !errors.Is(err, utils.ErrUnsuccessfulRequest) {
if !errors.Is(errs[len(errs)-1], utils.ErrUnsuccessfulRequest) {
t.Fatalf("feed.Latest() returned an error which did not match the expected error")
}
}
Expand Down
14 changes: 13 additions & 1 deletion feeds/feed.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package feeds

import (
"errors"
"fmt"
"time"
)

const schemaVer = "1.0"

var ErrNoPackagesPolled = errors.New("no packages were successfully polled")

type UnsupportedOptionError struct {
Option string
Feed string
}

type ScheduledFeed interface {
Latest(cutoff time.Time) ([]*Package, error)
Latest(cutoff time.Time) ([]*Package, []error)
GetFeedOptions() FeedOptions
GetName() string
}
Expand All @@ -37,6 +40,15 @@ type Package struct {
SchemaVer string `json:"schema_ver"`
}

type PackagePollError struct {
Err error
Name string
}

func (err PackagePollError) Error() string {
return fmt.Sprintf("Polling for package %s returned error: %v", err.Name, err.Err)
}

func NewPackage(created time.Time, name, version, feed string) *Package {
return &Package{
Name: name,
Expand Down
6 changes: 3 additions & 3 deletions feeds/goproxy/goproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,18 @@ func New(feedOptions feeds.FeedOptions) (*Feed, error) {
}, nil
}

func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
packages, err := fetchPackages(feed.baseURL, cutoff)
if err != nil {
return pkgs, err
return pkgs, []error{err}
}
for _, pkg := range packages {
pkg := feeds.NewPackage(pkg.ModifiedDate, pkg.Title, pkg.Version, FeedName)
pkgs = append(pkgs, pkg)
}
pkgs = feeds.ApplyCutoff(pkgs, cutoff)
return pkgs, nil
return pkgs, []error{}
}

func (feed Feed) GetName() string {
Expand Down
10 changes: 5 additions & 5 deletions feeds/goproxy/goproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func TestGoProxyLatest(t *testing.T) {
feed.baseURL = srv.URL

cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
pkgs, err := feed.Latest(cutoff)
if err != nil {
pkgs, errs := feed.Latest(cutoff)
if len(errs) != 0 {
t.Fatalf("feed.Latest returned error: %v", err)
}

Expand Down Expand Up @@ -66,11 +66,11 @@ func TestGoproxyNotFound(t *testing.T) {
feed.baseURL = srv.URL

cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
_, err = feed.Latest(cutoff)
if err == nil {
_, errs := feed.Latest(cutoff)
if len(errs) == 0 {
t.Fatalf("feed.Latest() was successful when an error was expected")
}
if !errors.Is(err, utils.ErrUnsuccessfulRequest) {
if !errors.Is(errs[len(errs)-1], utils.ErrUnsuccessfulRequest) {
t.Fatalf("feed.Latest() returned an error which did not match the expected error")
}
}
Expand Down
60 changes: 36 additions & 24 deletions feeds/npm/npm.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,15 @@ func fetchPackage(baseURL, pkgTitle string) ([]*Package, error) {
return versionSlice, nil
}

func fetchAllPackages(url string) ([]*feeds.Package, error) {
func fetchAllPackages(url string) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
errs := []error{}
packageChannel := make(chan []*Package)
errs := make(chan error)
errChannel := make(chan error)
packageEvents, err := fetchPackageEvents(url)
if err != nil {
return pkgs, err
// If we can't generate package events then return early.
return pkgs, append(errs, err)
}
// Handle the possibility of multiple releases of the same package
// within the polled `packages` slice.
Expand All @@ -157,7 +159,10 @@ func fetchAllPackages(url string) ([]*feeds.Package, error) {
go func(pkgTitle string, count int) {
pkgs, err := fetchPackage(url, pkgTitle)
if err != nil {
errs <- err
if !errors.Is(err, errUnpublished) {
err = feeds.PackagePollError{Name: pkgTitle, Err: err}
}
errChannel <- err
return
}
// Apply count slice
Expand All @@ -173,27 +178,31 @@ func fetchAllPackages(url string) ([]*feeds.Package, error) {
pkg.Version, FeedName)
pkgs = append(pkgs, feedPkg)
}
case err := <-errs:
// TODO: Add an event for unpublished package? This shouldn't
// be a hard error for the 'firehose'.
case err := <-errChannel:
// When polling the 'firehose' unpublished packages
// don't need to be logged as an error.
if !errors.Is(err, errUnpublished) {
return pkgs, fmt.Errorf("error in fetching version information: %w", err)
errs = append(errs, err)
}
}
}
return pkgs, nil
return pkgs, errs
}

func fetchCriticalPackages(url string, packages []string) ([]*feeds.Package, error) {
func fetchCriticalPackages(url string, packages []string) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
errs := []error{}
packageChannel := make(chan []*Package)
errs := make(chan error)
errChannel := make(chan error)

for _, pkgTitle := range packages {
go func(pkgTitle string) {
pkgs, err := fetchPackage(url, pkgTitle)
if err != nil {
errs <- err
if !errors.Is(err, errUnpublished) {
err = feeds.PackagePollError{Name: pkgTitle, Err: err}
}
errChannel <- err
return
}
packageChannel <- pkgs
Expand All @@ -208,13 +217,15 @@ func fetchCriticalPackages(url string, packages []string) ([]*feeds.Package, err
pkg.Version, FeedName)
pkgs = append(pkgs, feedPkg)
}
case err := <-errs:
// Assume if a package has been unpublished that it is a valid hard
// error when polling for 'critical' packages.
return pkgs, fmt.Errorf("error in fetching version information: %w", err)
case err := <-errChannel:
// Assume if a package has been unpublished that it is a valid reason
// to log the error when polling for 'critical' packages. This could
// be changed for a 'lossy' type event instead. Further packages should
// be proccessed.
errs = append(errs, err)
}
}
return pkgs, nil
return pkgs, errs
}

type Feed struct {
Expand All @@ -233,18 +244,19 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er
}, nil
}

func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
var err error
var errs []error

if feed.packages == nil {
pkgs, err = fetchAllPackages(feed.baseURL)
pkgs, errs = fetchAllPackages(feed.baseURL)
} else {
pkgs, err = fetchCriticalPackages(feed.baseURL, *feed.packages)
pkgs, errs = fetchCriticalPackages(feed.baseURL, *feed.packages)
}

if err != nil {
return nil, err
if len(pkgs) == 0 {
// If none of the packages were successfully polled for, return early.
return nil, append(errs, feeds.ErrNoPackagesPolled)
}

// Ensure packages are sorted by CreatedDate in order of most recent, as goroutine
Expand All @@ -261,7 +273,7 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
}

pkgs = feeds.ApplyCutoff(pkgs, cutoff)
return pkgs, nil
return pkgs, errs
}

func (feed Feed) GetName() string {
Expand Down
Loading