Skip to content

Commit

Permalink
feeds/: Allow for per package/feed error handling when polling
Browse files Browse the repository at this point in the history
When errors occur but package data can still be processed, the
system should continue where possible. This is particularly
beneficial in feeds where data is fetched on a per package basis,
as a single failure shouldn't immediately lead to data loss
for the whole session.
  • Loading branch information
tom--pollard committed Jun 8, 2021
1 parent 940c020 commit a829d21
Show file tree
Hide file tree
Showing 19 changed files with 215 additions and 139 deletions.
6 changes: 3 additions & 3 deletions feeds/crates/crates.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er
}, nil
}

func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
packages, err := fetchPackages(feed.baseURL)
if err != nil {
return pkgs, err
return pkgs, []error{err}
}
for _, pkg := range packages {
pkg := feeds.NewPackage(pkg.UpdatedAt, pkg.Name, pkg.NewestVersion, FeedName)
Expand All @@ -93,7 +93,7 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
feed.lossyFeedAlerter.ProcessPackages(FeedName, pkgs)

pkgs = feeds.ApplyCutoff(pkgs, cutoff)
return pkgs, nil
return pkgs, []error{}
}

func (feed Feed) GetName() string {
Expand Down
10 changes: 5 additions & 5 deletions feeds/crates/crates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func TestCratesLatest(t *testing.T) {
}

cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
pkgs, err := feed.Latest(cutoff)
if err != nil {
pkgs, errs := feed.Latest(cutoff)
if len(errs) != 0 {
t.Fatalf("feed.Latest returned error: %v", err)
}

Expand Down Expand Up @@ -67,11 +67,11 @@ func TestCratesNotFound(t *testing.T) {
}

cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
_, err = feed.Latest(cutoff)
if err == nil {
_, errs := feed.Latest(cutoff)
if len(errs) == 0 {
t.Fatalf("feed.Latest() was successful when an error was expected")
}
if !errors.Is(err, utils.ErrUnsuccessfulRequest) {
if !errors.Is(errs[len(errs)-1], utils.ErrUnsuccessfulRequest) {
t.Fatalf("feed.Latest() returned an error which did not match the expected error")
}
}
Expand Down
14 changes: 13 additions & 1 deletion feeds/feed.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package feeds

import (
"errors"
"fmt"
"time"
)

const schemaVer = "1.0"

var ErrNoPackagesPolled = errors.New("no packages were successfully polled")

// UnsupportedOptionError pairs the name of an option with the name of a feed.
// NOTE(review): presumably reported when a feed is configured with an option
// it does not support; its Error method and call sites are not visible in
// this part of the file, so confirm against usage.
type UnsupportedOptionError struct {
// Option is the name of the option concerned.
Option string
// Feed is the name of the feed concerned.
Feed string
}

type ScheduledFeed interface {
Latest(cutoff time.Time) ([]*Package, error)
Latest(cutoff time.Time) ([]*Package, []error)
GetFeedOptions() FeedOptions
GetName() string
}
Expand All @@ -37,6 +40,15 @@ type Package struct {
SchemaVer string `json:"schema_ver"`
}

// PackagePollError records a failure to poll a single package, pairing the
// underlying error with the name of the package that was being polled.
type PackagePollError struct {
	// Err is the underlying error returned while polling the package.
	Err error
	// Name is the name of the package that was being polled.
	Name string
}

// Error implements the error interface, reporting the package name alongside
// the underlying error.
func (err PackagePollError) Error() string {
	return fmt.Sprintf("Polling for package %s returned error: %v", err.Name, err.Err)
}

// Unwrap exposes the underlying error so that errors.Is and errors.As can
// match against it through the PackagePollError wrapper. Without it, callers
// checking for a sentinel error would never see a match once the error has
// been wrapped with the package name.
func (err PackagePollError) Unwrap() error {
	return err.Err
}

func NewPackage(created time.Time, name, version, feed string) *Package {
return &Package{
Name: name,
Expand Down
6 changes: 3 additions & 3 deletions feeds/goproxy/goproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,18 @@ func New(feedOptions feeds.FeedOptions) (*Feed, error) {
}, nil
}

func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
packages, err := fetchPackages(feed.baseURL, cutoff)
if err != nil {
return pkgs, err
return pkgs, []error{err}
}
for _, pkg := range packages {
pkg := feeds.NewPackage(pkg.ModifiedDate, pkg.Title, pkg.Version, FeedName)
pkgs = append(pkgs, pkg)
}
pkgs = feeds.ApplyCutoff(pkgs, cutoff)
return pkgs, nil
return pkgs, []error{}
}

func (feed Feed) GetName() string {
Expand Down
10 changes: 5 additions & 5 deletions feeds/goproxy/goproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func TestGoProxyLatest(t *testing.T) {
feed.baseURL = srv.URL

cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
pkgs, err := feed.Latest(cutoff)
if err != nil {
pkgs, errs := feed.Latest(cutoff)
if len(errs) != 0 {
t.Fatalf("feed.Latest returned error: %v", err)
}

Expand Down Expand Up @@ -66,11 +66,11 @@ func TestGoproxyNotFound(t *testing.T) {
feed.baseURL = srv.URL

cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
_, err = feed.Latest(cutoff)
if err == nil {
_, errs := feed.Latest(cutoff)
if len(errs) == 0 {
t.Fatalf("feed.Latest() was successful when an error was expected")
}
if !errors.Is(err, utils.ErrUnsuccessfulRequest) {
if !errors.Is(errs[len(errs)-1], utils.ErrUnsuccessfulRequest) {
t.Fatalf("feed.Latest() returned an error which did not match the expected error")
}
}
Expand Down
60 changes: 36 additions & 24 deletions feeds/npm/npm.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,15 @@ func fetchPackage(baseURL, pkgTitle string) ([]*Package, error) {
return versionSlice, nil
}

func fetchAllPackages(url string) ([]*feeds.Package, error) {
func fetchAllPackages(url string) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
errs := []error{}
packageChannel := make(chan []*Package)
errs := make(chan error)
errChannel := make(chan error)
packageEvents, err := fetchPackageEvents(url)
if err != nil {
return pkgs, err
// If we can't generate package events then return early.
return pkgs, append(errs, err)
}
// Handle the possibility of multiple releases of the same package
// within the polled `packages` slice.
Expand All @@ -157,7 +159,10 @@ func fetchAllPackages(url string) ([]*feeds.Package, error) {
go func(pkgTitle string, count int) {
pkgs, err := fetchPackage(url, pkgTitle)
if err != nil {
errs <- err
if !errors.Is(err, errUnpublished) {
err = feeds.PackagePollError{Name: pkgTitle, Err: err}
}
errChannel <- err
return
}
// Apply count slice
Expand All @@ -173,27 +178,31 @@ func fetchAllPackages(url string) ([]*feeds.Package, error) {
pkg.Version, FeedName)
pkgs = append(pkgs, feedPkg)
}
case err := <-errs:
// TODO: Add an event for unpublished package? This shouldn't
// be a hard error for the 'firehose'.
case err := <-errChannel:
// When polling the 'firehose', unpublished packages
// don't need to be logged as an error.
if !errors.Is(err, errUnpublished) {
return pkgs, fmt.Errorf("error in fetching version information: %w", err)
errs = append(errs, err)
}
}
}
return pkgs, nil
return pkgs, errs
}

func fetchCriticalPackages(url string, packages []string) ([]*feeds.Package, error) {
func fetchCriticalPackages(url string, packages []string) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
errs := []error{}
packageChannel := make(chan []*Package)
errs := make(chan error)
errChannel := make(chan error)

for _, pkgTitle := range packages {
go func(pkgTitle string) {
pkgs, err := fetchPackage(url, pkgTitle)
if err != nil {
errs <- err
if !errors.Is(err, errUnpublished) {
err = feeds.PackagePollError{Name: pkgTitle, Err: err}
}
errChannel <- err
return
}
packageChannel <- pkgs
Expand All @@ -208,13 +217,15 @@ func fetchCriticalPackages(url string, packages []string) ([]*feeds.Package, err
pkg.Version, FeedName)
pkgs = append(pkgs, feedPkg)
}
case err := <-errs:
// Assume if a package has been unpublished that it is a valid hard
// error when polling for 'critical' packages.
return pkgs, fmt.Errorf("error in fetching version information: %w", err)
case err := <-errChannel:
// Assume if a package has been unpublished that it is a valid reason
// to log the error when polling for 'critical' packages. This could
// be changed for a 'lossy' type event instead. Further packages should
// be processed.
errs = append(errs, err)
}
}
return pkgs, nil
return pkgs, errs
}

type Feed struct {
Expand All @@ -233,18 +244,19 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er
}, nil
}

func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
var err error
var errs []error

if feed.packages == nil {
pkgs, err = fetchAllPackages(feed.baseURL)
pkgs, errs = fetchAllPackages(feed.baseURL)
} else {
pkgs, err = fetchCriticalPackages(feed.baseURL, *feed.packages)
pkgs, errs = fetchCriticalPackages(feed.baseURL, *feed.packages)
}

if err != nil {
return nil, err
if len(pkgs) == 0 {
// If none of the packages were successfully polled for, return early.
return nil, append(errs, feeds.ErrNoPackagesPolled)
}

// Ensure packages are sorted by CreatedDate in order of most recent, as goroutine
Expand All @@ -261,7 +273,7 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
}

pkgs = feeds.ApplyCutoff(pkgs, cutoff)
return pkgs, nil
return pkgs, errs
}

func (feed Feed) GetName() string {
Expand Down
37 changes: 19 additions & 18 deletions feeds/npm/npm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/ossf/package-feeds/events"
"github.com/ossf/package-feeds/feeds"
"github.com/ossf/package-feeds/utils"
testutils "github.com/ossf/package-feeds/utils/test"
)

Expand All @@ -34,9 +33,9 @@ func TestNpmLatest(t *testing.T) {
}

cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
pkgs, err := feed.Latest(cutoff)
if err != nil {
t.Fatalf("feed.Latest returned error: %v", err)
pkgs, errs := feed.Latest(cutoff)
if len(errs) != 0 {
t.Fatalf("feed.Latest returned error: %v", errs[len(errs)-1])
}

if pkgs[0].Name != "FooPackage" {
Expand Down Expand Up @@ -121,9 +120,9 @@ func TestNpmCritical(t *testing.T) {
}

cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
pkgs, err := feed.Latest(cutoff)
if err != nil {
t.Fatalf("Failed to call Latest() with err: %v", err)
pkgs, errs := feed.Latest(cutoff)
if len(errs) != 0 {
t.Fatalf("Failed to call Latest() with err: %v", errs[len(errs)-1])
}

if len(pkgs) != 5 {
Expand Down Expand Up @@ -177,21 +176,23 @@ func TestNpmCriticalUnpublished(t *testing.T) {
}

cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
pkgs, err := feed.Latest(cutoff)
pkgs, errs := feed.Latest(cutoff)

if err == nil {
t.Fatalf("Expected unpublish error did not occur")
if len(errs) != 1 {
t.Fatalf("feed.Latest() returned %v errors when 1 was expected", len(errs))
}

if !errors.Is(err, errUnpublished) {
if !errors.Is(errs[len(errs)-1], errUnpublished) {
t.Fatalf("Failed to return unpublished error when polling for an unpublished package, instead: %v", err)
}

if !strings.Contains(err.Error(), "QuxPackage") {
t.Fatalf("Failed to correctly include the package name in unpublished error, instead: % v", err)
if !strings.Contains(errs[len(errs)-1].Error(), "QuxPackage") {
t.Fatalf("Failed to correctly include the package name in unpublished error, instead: %v", errs[len(errs)-1])
}

if len(pkgs) > 0 {
// Even though QuxPackage is unpublished, the error should be
// logged and FooPackage should still be processed.
if len(pkgs) > 3 {
t.Fatalf("Latest() produced %v packages instead of the expected 0", len(pkgs))
}
}
Expand Down Expand Up @@ -236,11 +237,11 @@ func TestNPMNotFound(t *testing.T) {
}

cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
_, err = feed.Latest(cutoff)
if err == nil {
t.Fatalf("feed.Latest() was successful when an error was expected")
_, errs := feed.Latest(cutoff)
if len(errs) != 2 {
t.Fatalf("feed.Latest() returned %v errors when 2 were expected", len(errs))
}
if !errors.Is(err, utils.ErrUnsuccessfulRequest) {
if !errors.Is(errs[len(errs)-1], feeds.ErrNoPackagesPolled) {
t.Fatalf("feed.Latest() returned an error which did not match the expected error")
}
}
Expand Down
18 changes: 9 additions & 9 deletions feeds/nuget/nuget.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,18 @@ func New(feedOptions feeds.FeedOptions) (*Feed, error) {
// Latest will parse all creation events for packages in the nuget.org catalog feed
// for packages that have been published since the cutoff
// https://docs.microsoft.com/en-us/nuget/api/catalog-resource
func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
var errs []error

catalogService, err := fetchCatalogService(feed.baseURL)
if err != nil {
return nil, err
return nil, append(errs, err)
}

catalogPages, err := fetchCatalogPages(catalogService.URI)
if err != nil {
return nil, err
return nil, append(errs, err)
}

for _, catalogPage := range catalogPages {
Expand All @@ -195,7 +196,8 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {

page, err := fetchCatalogPage(catalogPage.URI)
if err != nil {
return nil, err
errs = append(errs, err)
continue
}

for _, catalogLeafNode := range page {
Expand All @@ -209,19 +211,17 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {

pkgInfo, err := fetchPackageInfo(catalogLeafNode.URI)
if err != nil {
return nil, err
errs = append(errs, err)
continue
}

pkg := feeds.NewPackage(pkgInfo.Created, pkgInfo.PackageID, pkgInfo.Version, FeedName)
if err != nil {
continue
}
pkgs = append(pkgs, pkg)
}
}
pkgs = feeds.ApplyCutoff(pkgs, cutoff)

return pkgs, nil
return pkgs, errs
}

func (feed Feed) GetName() string {
Expand Down
Loading

0 comments on commit a829d21

Please sign in to comment.