From 863aeff528aefb173cea5c27d771fc992aa888dc Mon Sep 17 00:00:00 2001 From: Tom Pollard Date: Fri, 21 May 2021 11:45:14 +0100 Subject: [PATCH] feeds/: Allow for per package/feed error handling when polling When errors occur but package data can still be processed then the system should continue where possible. This is particularly beneficial in feeds where data is fetched on a per package basis, as a singular failure shouldn't immediately lead to data loss for the whole session. `pollAndPublish()` error handling and subsequent callers have been modified to handle this. This also removes `errUnpublished` as a 'hard' error when polling npm for critical packages as such it is only logged. --- feeds/crates/crates.go | 6 +- feeds/crates/crates_test.go | 10 +- feeds/feed.go | 14 ++- feeds/goproxy/goproxy.go | 6 +- feeds/goproxy/goproxy_test.go | 10 +- feeds/npm/npm.go | 60 +++++++----- feeds/npm/npm_test.go | 121 ++++++++++++++++++++----- feeds/nuget/nuget.go | 18 ++-- feeds/nuget/nuget_test.go | 6 +- feeds/packagist/packagist.go | 10 +- feeds/packagist/packagist_test.go | 14 +-- feeds/pypi/pypi.go | 40 +++++--- feeds/pypi/pypi_test.go | 59 +++++++++--- feeds/rubygems/rubygems.go | 31 ++++--- feeds/rubygems/rubygems_test.go | 42 +++++++-- feeds/scheduler/feed_group.go | 67 +++++++++----- feeds/scheduler/feed_group_test.go | 14 +-- feeds/scheduler/feed_groups_handler.go | 37 ++++---- feeds/scheduler/mocks.go | 6 +- feeds/scheduler/scheduler.go | 2 +- 20 files changed, 391 insertions(+), 182 deletions(-) diff --git a/feeds/crates/crates.go b/feeds/crates/crates.go index d2ffefb9..b9519bcc 100644 --- a/feeds/crates/crates.go +++ b/feeds/crates/crates.go @@ -80,11 +80,11 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er }, nil } -func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { +func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} packages, err := fetchPackages(feed.baseURL) if err != nil { - return pkgs, err + return pkgs, []error{err} } for _, pkg := range packages { pkg := feeds.NewPackage(pkg.UpdatedAt, pkg.Name, pkg.NewestVersion, FeedName) @@ -93,7 +93,7 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { feed.lossyFeedAlerter.ProcessPackages(FeedName, pkgs) pkgs = feeds.ApplyCutoff(pkgs, cutoff) - return pkgs, nil + return pkgs, []error{} } func (feed Feed) GetName() string { diff --git a/feeds/crates/crates_test.go b/feeds/crates/crates_test.go index 99af8ea1..c17c785d 100644 --- a/feeds/crates/crates_test.go +++ b/feeds/crates/crates_test.go @@ -27,8 +27,8 @@ func TestCratesLatest(t *testing.T) { } cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - pkgs, err := feed.Latest(cutoff) - if err != nil { + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 0 { t.Fatalf("feed.Latest returned error: %v", err) } @@ -67,11 +67,11 @@ func TestCratesNotFound(t *testing.T) { } cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - _, err = feed.Latest(cutoff) - if err == nil { + _, errs := feed.Latest(cutoff) + if len(errs) == 0 { t.Fatalf("feed.Latest() was successful when an error was expected") } - if !errors.Is(err, utils.ErrUnsuccessfulRequest) { + if !errors.Is(errs[len(errs)-1], utils.ErrUnsuccessfulRequest) { t.Fatalf("feed.Latest() returned an error which did not match the expected error") } } diff --git a/feeds/feed.go b/feeds/feed.go index 78c5661f..3719b421 100644 --- a/feeds/feed.go +++ b/feeds/feed.go @@ -1,19 +1,22 @@ package feeds import ( + "errors" "fmt" "time" ) const schemaVer = "1.0" +var ErrNoPackagesPolled = errors.New("no packages were successfully polled") + type UnsupportedOptionError struct { Option string Feed string } type ScheduledFeed interface { - Latest(cutoff time.Time) ([]*Package, error) + Latest(cutoff time.Time) ([]*Package, []error) GetFeedOptions() FeedOptions GetName() string } @@ -37,6 +40,15 @@ type Package struct { SchemaVer string `json:"schema_ver"` } +type PackagePollError struct { + Err error + Name string +} + +func (err PackagePollError) Error() string { + return fmt.Sprintf("Polling for package %s returned error: %v", err.Name, err.Err) +} + func NewPackage(created time.Time, name, version, feed string) *Package { return &Package{ Name: name, diff --git a/feeds/goproxy/goproxy.go b/feeds/goproxy/goproxy.go index 0751e46a..67046e30 100644 --- a/feeds/goproxy/goproxy.go +++ b/feeds/goproxy/goproxy.go @@ -100,18 +100,18 @@ func New(feedOptions feeds.FeedOptions) (*Feed, error) { }, nil } -func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { +func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} packages, err := fetchPackages(feed.baseURL, cutoff) if err != nil { - return pkgs, err + return pkgs, []error{err} } for _, pkg := range packages { pkg := feeds.NewPackage(pkg.ModifiedDate, pkg.Title, pkg.Version, FeedName) pkgs = append(pkgs, pkg) } pkgs = feeds.ApplyCutoff(pkgs, cutoff) - return pkgs, nil + return pkgs, []error{} } func (feed Feed) GetName() string { diff --git a/feeds/goproxy/goproxy_test.go b/feeds/goproxy/goproxy_test.go index a5647e10..304fb721 100644 --- a/feeds/goproxy/goproxy_test.go +++ b/feeds/goproxy/goproxy_test.go @@ -26,8 +26,8 @@ func TestGoProxyLatest(t *testing.T) { feed.baseURL = srv.URL cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - pkgs, err := feed.Latest(cutoff) - if err != nil { + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 0 { t.Fatalf("feed.Latest returned error: %v", err) } @@ -66,11 +66,11 @@ func TestGoproxyNotFound(t *testing.T) { feed.baseURL = srv.URL cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - _, err = feed.Latest(cutoff) - if err == nil { + _, errs := feed.Latest(cutoff) + if len(errs) == 0 { t.Fatalf("feed.Latest() was successful when an error was expected") } - if !errors.Is(err, utils.ErrUnsuccessfulRequest) { + if !errors.Is(errs[len(errs)-1], utils.ErrUnsuccessfulRequest) { t.Fatalf("feed.Latest() returned an error which did not match the expected error") } } diff --git a/feeds/npm/npm.go b/feeds/npm/npm.go index e8b47cff..3050dbdd 100644 --- a/feeds/npm/npm.go +++ b/feeds/npm/npm.go @@ -138,13 +138,15 @@ func fetchPackage(baseURL, pkgTitle string) ([]*Package, error) { return versionSlice, nil } -func fetchAllPackages(url string) ([]*feeds.Package, error) { +func fetchAllPackages(url string) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} + errs := []error{} packageChannel := make(chan []*Package) - errs := make(chan error) + errChannel := make(chan error) packageEvents, err := fetchPackageEvents(url) if err != nil { - return pkgs, err + // If we can't generate package events then return early. + return pkgs, append(errs, err) } // Handle the possibility of multiple releases of the same package // within the polled `packages` slice. @@ -157,7 +159,10 @@ func fetchAllPackages(url string) ([]*feeds.Package, error) { go func(pkgTitle string, count int) { pkgs, err := fetchPackage(url, pkgTitle) if err != nil { - errs <- err + if !errors.Is(err, errUnpublished) { + err = feeds.PackagePollError{Name: pkgTitle, Err: err} + } + errChannel <- err return } // Apply count slice @@ -173,27 +178,31 @@ func fetchAllPackages(url string) ([]*feeds.Package, error) { pkg.Version, FeedName) pkgs = append(pkgs, feedPkg) } - case err := <-errs: - // TODO: Add an event for unpublished package? This shouldn't - // be a hard error for the 'firehose'. + case err := <-errChannel: + // When polling the 'firehose' unpublished packages + // don't need to be logged as an error. if !errors.Is(err, errUnpublished) { - return pkgs, fmt.Errorf("error in fetching version information: %w", err) + errs = append(errs, err) } } } - return pkgs, nil + return pkgs, errs } -func fetchCriticalPackages(url string, packages []string) ([]*feeds.Package, error) { +func fetchCriticalPackages(url string, packages []string) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} + errs := []error{} packageChannel := make(chan []*Package) - errs := make(chan error) + errChannel := make(chan error) for _, pkgTitle := range packages { go func(pkgTitle string) { pkgs, err := fetchPackage(url, pkgTitle) if err != nil { - errs <- err + if !errors.Is(err, errUnpublished) { + err = feeds.PackagePollError{Name: pkgTitle, Err: err} + } + errChannel <- err return } packageChannel <- pkgs @@ -208,13 +217,15 @@ func fetchCriticalPackages(url string, packages []string) ([]*feeds.Package, err pkg.Version, FeedName) pkgs = append(pkgs, feedPkg) } - case err := <-errs: - // Assume if a package has been unpublished that it is a valid hard - // error when polling for 'critical' packages. - return pkgs, fmt.Errorf("error in fetching version information: %w", err) + case err := <-errChannel: + // Assume if a package has been unpublished that it is a valid reason + // to log the error when polling for 'critical' packages. This could + // be changed for a 'lossy' type event instead. Further packages should + // be proccessed. + errs = append(errs, err) } } - return pkgs, nil + return pkgs, errs } type Feed struct { @@ -233,18 +244,19 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er }, nil } -func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { +func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} - var err error + var errs []error if feed.packages == nil { - pkgs, err = fetchAllPackages(feed.baseURL) + pkgs, errs = fetchAllPackages(feed.baseURL) } else { - pkgs, err = fetchCriticalPackages(feed.baseURL, *feed.packages) + pkgs, errs = fetchCriticalPackages(feed.baseURL, *feed.packages) } - if err != nil { - return nil, err + if len(pkgs) == 0 { + // If none of the packages were successfully polled for, return early. + return nil, append(errs, feeds.ErrNoPackagesPolled) } // Ensure packages are sorted by CreatedDate in order of most recent, as goroutine @@ -261,7 +273,7 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { } pkgs = feeds.ApplyCutoff(pkgs, cutoff) - return pkgs, nil + return pkgs, errs } func (feed Feed) GetName() string { diff --git a/feeds/npm/npm_test.go b/feeds/npm/npm_test.go index 5caf2600..6ddf82ec 100644 --- a/feeds/npm/npm_test.go +++ b/feeds/npm/npm_test.go @@ -10,7 +10,6 @@ import ( "github.com/ossf/package-feeds/events" "github.com/ossf/package-feeds/feeds" - "github.com/ossf/package-feeds/utils" testutils "github.com/ossf/package-feeds/utils/test" ) @@ -34,9 +33,9 @@ func TestNpmLatest(t *testing.T) { } cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - pkgs, err := feed.Latest(cutoff) - if err != nil { - t.Fatalf("feed.Latest returned error: %v", err) + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 0 { + t.Fatalf("feed.Latest returned error: %v", errs[len(errs)-1]) } if pkgs[0].Name != "FooPackage" { @@ -121,9 +120,9 @@ func TestNpmCritical(t *testing.T) { } cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - pkgs, err := feed.Latest(cutoff) - if err != nil { - t.Fatalf("Failed to call Latest() with err: %v", err) + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 0 { + t.Fatalf("Failed to call Latest() with err: %v", errs[len(errs)-1]) } if len(pkgs) != 5 { @@ -177,22 +176,24 @@ func TestNpmCriticalUnpublished(t *testing.T) { } cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - pkgs, err := feed.Latest(cutoff) + pkgs, errs := feed.Latest(cutoff) - if err == nil { - t.Fatalf("Expected unpublish error did not occur") + if len(errs) != 1 { + t.Fatalf("feed.Latest() returned %v errors when 1 was expected", len(errs)) } - if !errors.Is(err, errUnpublished) { + if !errors.Is(errs[len(errs)-1], errUnpublished) { t.Fatalf("Failed to return unpublished error when polling for an unpublished package, instead: %v", err) } - if !strings.Contains(err.Error(), "QuxPackage") { - t.Fatalf("Failed to correctly include the package name in unpublished error, instead: % v", err) + if !strings.Contains(errs[len(errs)-1].Error(), "QuxPackage") { + t.Fatalf("Failed to correctly include the package name in unpublished error, instead: %v", errs[len(errs)-1]) } - if len(pkgs) > 0 { - t.Fatalf("Latest() produced %v packages instead of the expected 0", len(pkgs)) + // Even though QuxPackage is unpublished, the error should be + // logged and FooPackage should still be processed. + if len(pkgs) != 3 { + t.Fatalf("Latest() produced %v packages instead of the expected 3", len(pkgs)) } } @@ -218,13 +219,11 @@ func TestNpmNonUtf8Response(t *testing.T) { } } -func TestNPMNotFound(t *testing.T) { +func TestNpmNotFound(t *testing.T) { t.Parallel() handlers := map[string]testutils.HTTPHandlerFunc{ - "/-/rss/": testutils.NotFoundHandlerFunc, - "/FooPackage": testutils.NotFoundHandlerFunc, - "/BarPackage": testutils.NotFoundHandlerFunc, + "/-/rss/": testutils.NotFoundHandlerFunc, } srv := testutils.HTTPServerMock(handlers) @@ -236,15 +235,91 @@ func TestNPMNotFound(t *testing.T) { } cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - _, err = feed.Latest(cutoff) - if err == nil { - t.Fatalf("feed.Latest() was successful when an error was expected") + _, errs := feed.Latest(cutoff) + if len(errs) != 2 { + t.Fatalf("feed.Latest() returned %v errors when 2 were expected", len(errs)) } - if !errors.Is(err, utils.ErrUnsuccessfulRequest) { + if !errors.Is(errs[len(errs)-1], feeds.ErrNoPackagesPolled) { t.Fatalf("feed.Latest() returned an error which did not match the expected error") } } +func TestNpmPartialNotFound(t *testing.T) { + t.Parallel() + + handlers := map[string]testutils.HTTPHandlerFunc{ + "/-/rss/": npmLatestPackagesResponse, + "/FooPackage": fooVersionInfoResponse, + "/BarPackage": barVersionInfoResponse, + "/BazPackage": bazVersionInfoResponse, + "/QuxPackage": testutils.NotFoundHandlerFunc, + } + srv := testutils.HTTPServerMock(handlers) + + feed, err := New(feeds.FeedOptions{}, events.NewNullHandler()) + feed.baseURL = srv.URL + + if err != nil { + t.Fatalf("Failed to create new npm feed: %v", err) + } + + cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 1 { + t.Fatalf("feed.Latest() returned %v errors when 1 was expected", len(errs)) + } + if !strings.Contains(errs[len(errs)-1].Error(), "QuxPackage") { + t.Fatalf("Failed to correctly include the package name in feeds.PackagePollError, instead: %v", errs[len(errs)-1]) + } + if !strings.Contains(errs[len(errs)-1].Error(), "404") { + t.Fatalf("Failed to wrapped expected 404 error in feeds.PackagePollError, instead: %v", errs[len(errs)-1]) + } + // Even though QuxPackage returns a 404, the error should be + // logged and the rest of the packages should still be processed. + if len(pkgs) != 4 { + t.Fatalf("Latest() produced %v packages instead of the expected 3", len(pkgs)) + } +} + +func TestNpmCriticalPartialNotFound(t *testing.T) { + t.Parallel() + + handlers := map[string]testutils.HTTPHandlerFunc{ + "/FooPackage": fooVersionInfoResponse, + "/BarPackage": testutils.NotFoundHandlerFunc, + } + srv := testutils.HTTPServerMock(handlers) + + packages := []string{ + "FooPackage", + "BarPackage", + } + + feed, err := New(feeds.FeedOptions{Packages: &packages}, events.NewNullHandler()) + feed.baseURL = srv.URL + + if err != nil { + t.Fatalf("Failed to create new npm feed: %v", err) + } + + cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 1 { + t.Fatalf("feed.Latest() returned %v errors when 1 was expected", len(errs)) + } + if !strings.Contains(errs[len(errs)-1].Error(), "BarPackage") { + t.Fatalf("Failed to correctly include the package name in feeds.PackagePollError, instead: %v", errs[len(errs)-1]) + } + if !strings.Contains(errs[len(errs)-1].Error(), "404") { + t.Fatalf("Failed to wrapped expected 404 error in feeds.PackagePollError, instead: %v", errs[len(errs)-1]) + } + // Even though BarPackage returns a 404, the error should be + // logged and FooPackage should still be processed. + if len(pkgs) != 3 { + t.Fatalf("Latest() produced %v packages instead of the expected 3", len(pkgs)) + } +} + func npmLatestPackagesResponse(w http.ResponseWriter, r *http.Request) { _, err := w.Write([]byte(` diff --git a/feeds/nuget/nuget.go b/feeds/nuget/nuget.go index cf8fd516..ecf238fa 100644 --- a/feeds/nuget/nuget.go +++ b/feeds/nuget/nuget.go @@ -175,17 +175,18 @@ func New(feedOptions feeds.FeedOptions) (*Feed, error) { // Latest will parse all creation events for packages in the nuget.org catalog feed // for packages that have been published since the cutoff // https://docs.microsoft.com/en-us/nuget/api/catalog-resource -func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { +func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} + var errs []error catalogService, err := fetchCatalogService(feed.baseURL) if err != nil { - return nil, err + return nil, append(errs, err) } catalogPages, err := fetchCatalogPages(catalogService.URI) if err != nil { - return nil, err + return nil, append(errs, err) } for _, catalogPage := range catalogPages { @@ -195,7 +196,8 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { page, err := fetchCatalogPage(catalogPage.URI) if err != nil { - return nil, err + errs = append(errs, err) + continue } for _, catalogLeafNode := range page { @@ -209,19 +211,17 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { pkgInfo, err := fetchPackageInfo(catalogLeafNode.URI) if err != nil { - return nil, err + errs = append(errs, err) + continue } pkg := feeds.NewPackage(pkgInfo.Created, pkgInfo.PackageID, pkgInfo.Version, FeedName) - if err != nil { - continue - } pkgs = append(pkgs, pkg) } } pkgs = feeds.ApplyCutoff(pkgs, cutoff) - return pkgs, nil + return pkgs, errs } func (feed Feed) GetName() string { diff --git a/feeds/nuget/nuget_test.go b/feeds/nuget/nuget_test.go index 57381ad4..2b039a73 100644 --- a/feeds/nuget/nuget_test.go +++ b/feeds/nuget/nuget_test.go @@ -39,9 +39,9 @@ func TestCanParseFeed(t *testing.T) { cutoff := time.Now().Add(-5 * time.Minute) - results, err := sut.Latest(cutoff) - if err != nil { - t.Fatal(err) + results, errs := sut.Latest(cutoff) + if len(errs) != 0 { + t.Fatal(errs[len(errs)-1]) } if len(results) != 1 { diff --git a/feeds/packagist/packagist.go b/feeds/packagist/packagist.go index 78321ab9..ba579fec 100644 --- a/feeds/packagist/packagist.go +++ b/feeds/packagist/packagist.go @@ -114,11 +114,12 @@ func fetchVersionInformation(versionHost string, action actions) ([]*feeds.Packa } // Latest returns all package updates of packagist packages since cutoff. -func (f Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { +func (f Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} + var errs []error packages, err := fetchPackages(f.updateHost, cutoff) if err != nil { - return nil, err + return nil, append(errs, err) } for _, pkg := range packages { if time.Unix(pkg.Time, 0).Before(cutoff) { @@ -129,12 +130,13 @@ func (f Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { } updates, err := fetchVersionInformation(f.versionHost, pkg) if err != nil { - return nil, fmt.Errorf("error in fetching version information: %w", err) + errs = append(errs, fmt.Errorf("error in fetching version information: %w", err)) + continue } pkgs = append(pkgs, updates...) } pkgs = feeds.ApplyCutoff(pkgs, cutoff) - return pkgs, nil + return pkgs, errs } func (f Feed) GetName() string { diff --git a/feeds/packagist/packagist_test.go b/feeds/packagist/packagist_test.go index a595c065..c7583ea8 100644 --- a/feeds/packagist/packagist_test.go +++ b/feeds/packagist/packagist_test.go @@ -29,9 +29,9 @@ func TestFetch(t *testing.T) { feed.versionHost = srv.URL cutoff := time.Unix(1614513658, 0) - latest, err := feed.Latest(cutoff) - if err != nil { - t.Fatalf("got error: %s", err) + latest, errs := feed.Latest(cutoff) + if len(errs) != 0 { + t.Fatalf("got error: %v", errs[len(errs)-1]) } if len(latest) == 0 { t.Fatalf("did not get any updates") @@ -64,11 +64,11 @@ func TestPackagistNotFound(t *testing.T) { feed.versionHost = srv.URL cutoff := time.Unix(1614513658, 0) - _, err = feed.Latest(cutoff) - if err == nil { - t.Fatalf("feed.Latest() was successful when an error was expected") + _, errs := feed.Latest(cutoff) + if len(errs) != 1 { + t.Fatalf("feed.Latest() returned %v errors when 1 was expected", len(errs)) } - if !errors.Is(err, utils.ErrUnsuccessfulRequest) { + if !errors.Is(errs[len(errs)-1], utils.ErrUnsuccessfulRequest) { t.Fatalf("feed.Latest() returned an error which did not match the expected error") } } diff --git a/feeds/pypi/pypi.go b/feeds/pypi/pypi.go index 4ea62cfd..b43ad55f 100644 --- a/feeds/pypi/pypi.go +++ b/feeds/pypi/pypi.go @@ -97,7 +97,7 @@ func fetchPackages(baseURL string) ([]*Package, error) { return rssResponse.Packages, nil } -func fetchCriticalPackages(baseURL string, packageList []string) ([]*Package, error) { +func fetchCriticalPackages(baseURL string, packageList []string) ([]*Package, []error) { responseChannel := make(chan *Response) errChannel := make(chan error) @@ -106,19 +106,19 @@ func fetchCriticalPackages(baseURL string, packageList []string) ([]*Package, er packageDataPath := fmt.Sprintf(packagePathFormat, pkgName) pkgURL, err := utils.URLPathJoin(baseURL, packageDataPath) if err != nil { - errChannel <- err + errChannel <- feeds.PackagePollError{Name: pkgName, Err: err} return } resp, err := httpClient.Get(pkgURL) if err != nil { - errChannel <- err + errChannel <- feeds.PackagePollError{Name: pkgName, Err: err} return } defer resp.Body.Close() err = utils.CheckResponseStatus(resp) if err != nil { - errChannel <- fmt.Errorf("failed to fetch pypi package data: %w", err) + errChannel <- feeds.PackagePollError{Name: pkgName, Err: fmt.Errorf("failed to fetch pypi package data: %w", err)} return } @@ -126,7 +126,7 @@ func fetchCriticalPackages(baseURL string, packageList []string) ([]*Package, er reader := utils.NewUTF8OnlyReader(resp.Body) err = xml.NewDecoder(reader).Decode(rssResponse) if err != nil { - errChannel <- err + errChannel <- feeds.PackagePollError{Name: pkgName, Err: err} return } @@ -135,15 +135,16 @@ func fetchCriticalPackages(baseURL string, packageList []string) ([]*Package, er } pkgs := []*Package{} + errs := []error{} for i := 0; i < len(packageList); i++ { select { case response := <-responseChannel: pkgs = append(pkgs, response.Packages...) case err := <-errChannel: - return nil, err + errs = append(errs, err) } } - return pkgs, nil + return pkgs, errs } type Feed struct { @@ -164,30 +165,39 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er }, nil } -func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { +func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} var pypiPackages []*Package + var errs []error var err error if feed.packages == nil { // Firehose fetch all packages. + // If this fails then we need to return, as it's the only source of + // data. pypiPackages, err = fetchPackages(feed.baseURL) + if err != nil { + return nil, append(errs, err) + } } else { // Fetch specific packages individually from configured packages list. - pypiPackages, err = fetchCriticalPackages(feed.baseURL, *feed.packages) + pypiPackages, errs = fetchCriticalPackages(feed.baseURL, *feed.packages) + if len(pypiPackages) == 0 { + // If none of the packages were successfully polled for, return early. + return nil, append(errs, feeds.ErrNoPackagesPolled) + } } - if err != nil { - return nil, err - } for _, pkg := range pypiPackages { pkgName, err := pkg.Name() if err != nil { - return nil, err + errs = append(errs, err) + continue } pkgVersion, err := pkg.Version() if err != nil { - return nil, err + errs = append(errs, err) + continue } pkg := feeds.NewPackage(pkg.CreatedDate.Time, pkgName, pkgVersion, FeedName) pkgs = append(pkgs, pkg) @@ -199,7 +209,7 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { } pkgs = feeds.ApplyCutoff(pkgs, cutoff) - return pkgs, nil + return pkgs, errs } func (feed Feed) GetPackageList() *[]string { diff --git a/feeds/pypi/pypi_test.go b/feeds/pypi/pypi_test.go index c062f7c3..954305ff 100644 --- a/feeds/pypi/pypi_test.go +++ b/feeds/pypi/pypi_test.go @@ -3,12 +3,12 @@ package pypi import ( "errors" "net/http" + "strings" "testing" "time" "github.com/ossf/package-feeds/events" "github.com/ossf/package-feeds/feeds" - "github.com/ossf/package-feeds/utils" testutils "github.com/ossf/package-feeds/utils/test" ) @@ -27,8 +27,8 @@ func TestPypiLatest(t *testing.T) { feed.baseURL = srv.URL cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - pkgs, err := feed.Latest(cutoff) - if err != nil { + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 0 { t.Fatalf("feed.Latest returned error: %v", err) } @@ -74,9 +74,9 @@ func TestPypiCriticalLatest(t *testing.T) { feed.baseURL = srv.URL cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - pkgs, err := feed.Latest(cutoff) - if err != nil { - t.Fatalf("Failed to call Latest() with err: %v", err) + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 0 { + t.Fatalf("Failed to call Latest() with err: %v", errs[len(errs)-1]) } const expectedNumPackages = 4 @@ -105,7 +105,7 @@ func TestPypiCriticalLatest(t *testing.T) { } } -func TestPypiNotFound(t *testing.T) { +func TestPypiAllNotFound(t *testing.T) { t.Parallel() handlers := map[string]testutils.HTTPHandlerFunc{ @@ -127,15 +127,52 @@ func TestPypiNotFound(t *testing.T) { feed.baseURL = srv.URL cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - _, err = feed.Latest(cutoff) - if err == nil { - t.Fatalf("feed.Latest() was successful when an error was expected") + _, errs := feed.Latest(cutoff) + if len(errs) != 3 { + t.Fatalf("feed.Latest() returned %v errors when 3 were expected", len(errs)) } - if !errors.Is(err, utils.ErrUnsuccessfulRequest) { + if !errors.Is(errs[len(errs)-1], feeds.ErrNoPackagesPolled) { t.Fatalf("feed.Latest() returned an error which did not match the expected error") } } +func TestPypiCriticalPartialNotFound(t *testing.T) { + t.Parallel() + + handlers := map[string]testutils.HTTPHandlerFunc{ + "/rss/project/foopy/releases.xml": foopyReleasesResponse, + "/rss/project/barpy/releases.xml": testutils.NotFoundHandlerFunc, + } + packages := []string{ + "foopy", + "barpy", + } + srv := testutils.HTTPServerMock(handlers) + + feed, err := New(feeds.FeedOptions{ + Packages: &packages, + }, events.NewNullHandler()) + if err != nil { + t.Fatalf("Failed to create pypi feed: %v", err) + } + feed.baseURL = srv.URL + + cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 1 { + t.Fatalf("feed.Latest() returned %v errors when 1 was expected", len(errs)) + } + if !strings.Contains(errs[len(errs)-1].Error(), "barpy") { + t.Fatalf("Failed to correctly include the package name in feeds.PackagePollError, instead: %v", errs[len(errs)-1]) + } + if !strings.Contains(errs[len(errs)-1].Error(), "404") { + t.Fatalf("Failed to wrapped expected 404 error in feeds.PackagePollError, instead: %v", errs[len(errs)-1]) + } + if len(pkgs) != 2 { + t.Fatalf("Latest() produced %v packages instead of the expected %v", len(pkgs), 2) + } +} + // Mock data for pypi firehose with all packages. func updatesXMLHandle(w http.ResponseWriter, r *http.Request) { _, err := w.Write([]byte(` diff --git a/feeds/rubygems/rubygems.go b/feeds/rubygems/rubygems.go index 9cc48320..b14e2234 100644 --- a/feeds/rubygems/rubygems.go +++ b/feeds/rubygems/rubygems.go @@ -63,31 +63,38 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er }, nil } -func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { +func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} packages := make(map[string]*Package) + var errs []error newPackagesURL, err := utils.URLPathJoin(feed.baseURL, activityPath, "latest.json") if err != nil { - return nil, err + // Failure to construct a url should lead to a hard failure. + return nil, append(errs, err) } newPackages, err := fetchPackages(newPackagesURL) if err != nil { - return pkgs, err - } - for _, pkg := range newPackages { - packages[pkg.Name] = pkg + // Updated Packages could still be processed. + errs = append(errs, err) + } else { + for _, pkg := range newPackages { + packages[pkg.Name] = pkg + } } updatedPackagesURL, err := utils.URLPathJoin(feed.baseURL, activityPath, "just_updated.json") if err != nil { - return nil, err + // Failure to construct a url should lead to a hard failure. + return nil, append(errs, err) } updatedPackages, err := fetchPackages(updatedPackagesURL) if err != nil { - return pkgs, err - } - for _, pkg := range updatedPackages { - packages[pkg.Name] = pkg + // New Packages could still be processed. + errs = append(errs, err) + } else { + for _, pkg := range updatedPackages { + packages[pkg.Name] = pkg + } } for _, pkg := range packages { @@ -97,7 +104,7 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { feed.lossyFeedAlerter.ProcessPackages(FeedName, pkgs) pkgs = feeds.ApplyCutoff(pkgs, cutoff) - return pkgs, nil + return pkgs, errs } func (feed Feed) GetName() string { diff --git a/feeds/rubygems/rubygems_test.go b/feeds/rubygems/rubygems_test.go index e6014320..4dbcfac1 100644 --- a/feeds/rubygems/rubygems_test.go +++ b/feeds/rubygems/rubygems_test.go @@ -28,9 +28,9 @@ func TestRubyLatest(t *testing.T) { } cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - pkgs, err := feed.Latest(cutoff) - if err != nil { - t.Fatalf("feed.Latest returned error: %v", err) + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 0 { + t.Fatalf("feed.Latest returned error: %v", errs[len(errs)-1]) } var fooPkg *feeds.Package @@ -78,13 +78,43 @@ func TestRubyGemsNotFound(t *testing.T) { } cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - _, err = feed.Latest(cutoff) - if err == nil { + _, errs := feed.Latest(cutoff) + if len(errs) == 0 { t.Fatalf("feed.Latest() was successful when an error was expected") } - if !errors.Is(err, utils.ErrUnsuccessfulRequest) { + if !errors.Is(errs[len(errs)-1], utils.ErrUnsuccessfulRequest) { + t.Fatalf("feed.Latest() returned an error which did not match the expected error") + } +} + +func TestRubyGemsPartialNotFound(t *testing.T) { + t.Parallel() + + handlers := map[string]testutils.HTTPHandlerFunc{ + "/api/v1/activity/latest.json": rubyGemsPackagesResponse, + "/api/v1/activity/just_updated.json": testutils.NotFoundHandlerFunc, + } + srv := testutils.HTTPServerMock(handlers) + + feed, err := New(feeds.FeedOptions{}, events.NewNullHandler()) + feed.baseURL = srv.URL + if err != nil { + t.Fatalf("failed to create new ruby feed: %v", err) + } + + cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 1 { + t.Fatalf("feed.Latest() returned %v errors when 1 was expected", len(errs)) + } + if !errors.Is(errs[len(errs)-1], utils.ErrUnsuccessfulRequest) { t.Fatalf("feed.Latest() returned an error which did not match the expected error") } + // Although the just_updated (updatedPackages) endpoint failed, the two latest (newPackages) + // should be processed. + if len(pkgs) != 2 { + t.Fatalf("Latest() produced %v packages instead of the expected %v", len(pkgs), 2) + } } func rubyGemsPackagesResponse(w http.ResponseWriter, r *http.Request) { diff --git a/feeds/scheduler/feed_group.go b/feeds/scheduler/feed_group.go index 1d817f9a..82befa82 100644 --- a/feeds/scheduler/feed_group.go +++ b/feeds/scheduler/feed_group.go @@ -3,6 +3,7 @@ package scheduler import ( "context" "encoding/json" + "errors" "time" log "github.com/sirupsen/logrus" @@ -11,12 +12,23 @@ import ( "github.com/ossf/package-feeds/publisher" ) +var ( + errPoll = errors.New("error when polling for packages") + errPub = errors.New("error when publishing packages") +) + type FeedGroup struct { feeds []feeds.ScheduledFeed publisher publisher.Publisher lastPoll time.Time } +type groupResult struct { + numPublished int + pollErr error + pubErr error +} + func NewFeedGroup(scheduledFeeds []feeds.ScheduledFeed, pub publisher.Publisher, initialCutoff time.Duration) *FeedGroup { return &FeedGroup{ @@ -31,32 +43,37 @@ func (fg *FeedGroup) AddFeed(feed feeds.ScheduledFeed) { } func (fg *FeedGroup) Run() { - _, err := fg.PollAndPublish() - if err != nil { - log.Error(err) + result := fg.pollAndPublish() + if result.pollErr != nil { + log.Error(result.pollErr) + } + if result.pubErr != nil { + log.Error(result.pubErr) } } -func (fg *FeedGroup) PollAndPublish() (int, error) { - pkgs, errs := fg.poll() - if len(errs) > 0 { - return 0, errs[0] - } else if len(pkgs) == 0 { - return 0, nil +func (fg *FeedGroup) pollAndPublish() groupResult { + result := groupResult{} + pkgs, err := fg.poll() + result.pollErr = err + // Return early if no packages to process + if len(pkgs) == 0 { + return result } - log.WithField("num_packages", len(pkgs)).Printf("Publishing packages...") - numPublished, err := fg.publishPackages(pkgs) - if err != nil { - log.Errorf("Failed to publish %v packages due to err: %v", len(pkgs)-numPublished, err) - return numPublished, err + numPublished, pubErr := fg.publishPackages(pkgs) + result.numPublished = numPublished + if pubErr != nil { + log.Errorf("Failed to publish %v packages due to err: %v", len(pkgs)-numPublished, pubErr) + result.pubErr = errPub + } else { + log.WithField("num_packages", numPublished).Printf("Successfully published packages") } - log.WithField("num_packages", numPublished).Printf("Successfully published packages") - return numPublished, nil + return result } // Poll fetches the latest packages from each registered feed. -func (fg *FeedGroup) poll() ([]*feeds.Package, []error) { +func (fg *FeedGroup) poll() ([]*feeds.Package, error) { results := make(chan pollResult, len(fg.feeds)) for _, feed := range fg.feeds { go func(feed feeds.ScheduledFeed) { @@ -64,7 +81,7 @@ func (fg *FeedGroup) poll() ([]*feeds.Package, []error) { name: feed.GetName(), feed: feed, } - result.packages, result.err = feed.Latest(fg.lastPoll) + result.packages, result.errs = feed.Latest(fg.lastPoll) results <- result }(feed) } @@ -74,10 +91,9 @@ func (fg *FeedGroup) poll() ([]*feeds.Package, []error) { result := <-results logger := log.WithField("feed", result.name) - if result.err != nil { - logger.WithError(result.err).Error("Error fetching packages") - errs = append(errs, result.err) - continue + for _, err := range result.errs { + logger.WithError(err).Error("Error fetching packages") + errs = append(errs, err) } for _, pkg := range result.packages { log.WithFields(log.Fields{ @@ -89,11 +105,14 @@ func (fg *FeedGroup) poll() ([]*feeds.Package, []error) { packages = append(packages, result.packages...) logger.WithField("num_processed", len(result.packages)).Print("Packages successfully processed") } - + err := errPoll + if len(errs) == 0 { + err = nil + } fg.lastPoll = time.Now().UTC() log.Printf("%d packages processed", len(packages)) - return packages, errs + return packages, err } func (fg *FeedGroup) publishPackages(pkgs []*feeds.Package) (int, error) { diff --git a/feeds/scheduler/feed_group_test.go b/feeds/scheduler/feed_group_test.go index b863cabd..e6913b29 100644 --- a/feeds/scheduler/feed_group_test.go +++ b/feeds/scheduler/feed_group_test.go @@ -37,9 +37,9 @@ func TestFeedGroupPoll(t *testing.T) { feedGroup := NewFeedGroup(mockFeeds, pub, time.Minute) startLastPollValue := feedGroup.lastPoll - pkgs, errs := feedGroup.poll() - if len(errs) != 0 { - t.Fatalf("Unexpected errors arose during polling: %v", errs) + pkgs, err := feedGroup.poll() + if err != nil { + t.Fatalf("Unexpected error arose during polling: %v", err) } if len(pkgs) != 4 { t.Fatalf("poll() returned %v packages when 4 were expected", len(pkgs)) @@ -54,7 +54,7 @@ func TestFeedGroupPollWithErr(t *testing.T) { mockFeeds := []feeds.ScheduledFeed{ mockFeed{ - err: errPackage, + errs: []error{errPackage}, }, mockFeed{ packages: []*feeds.Package{ @@ -70,9 +70,9 @@ func TestFeedGroupPollWithErr(t *testing.T) { feedGroup := NewFeedGroup(mockFeeds, pub, time.Minute) startLastPollValue := feedGroup.lastPoll - pkgs, errs := feedGroup.poll() - if len(errs) != 1 { - t.Fatalf("Expected error during polling but error list had %v entries", len(errs)) + pkgs, err := feedGroup.poll() + if err == nil { + t.Fatalf("Expected error during polling") } if len(pkgs) != 2 { t.Fatalf("Expected 2 packages alongside errors but found %v", len(pkgs)) diff --git a/feeds/scheduler/feed_groups_handler.go b/feeds/scheduler/feed_groups_handler.go index f0bd7a29..fe9b91b6 100644 --- a/feeds/scheduler/feed_groups_handler.go +++ b/feeds/scheduler/feed_groups_handler.go @@ -3,45 +3,50 @@ package scheduler import ( "fmt" "net/http" + "strings" ) type FeedGroupsHandler struct { feedGroups []*FeedGroup } -type publishResult struct { - numPublished int - err error -} - func NewFeedGroupsHandler(feeds []*FeedGroup) *FeedGroupsHandler { return &FeedGroupsHandler{feedGroups: feeds} } func (srv *FeedGroupsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - var err error - resultChannel := make(chan publishResult, len(srv.feedGroups)) + resultChannel := make(chan groupResult, len(srv.feedGroups)) numPublished := 0 + var pollErr, pubErr error + var errStrings []string for _, group := range srv.feedGroups { go func(group *FeedGroup) { - numPublished, err := group.PollAndPublish() - resultChannel <- publishResult{numPublished, err} + result := group.pollAndPublish() + resultChannel <- result }(group) } for range srv.feedGroups { result := <-resultChannel - numPublished += result.numPublished - if result.err != nil { - http.Error(w, result.err.Error(), http.StatusInternalServerError) - return + if result.pollErr != nil { + pollErr = result.pollErr + } + if result.pubErr != nil { + pubErr = result.pubErr } } - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + for _, err := range []error{pollErr, pubErr} { + if err != nil { + errStrings = append(errStrings, err.Error()) + } + } + if len(errStrings) > 0 { + http.Error(w, strings.Join(errStrings, "\n")+fmt.Sprintf("\n%d packages successfully processed, see log for details", + numPublished), + http.StatusInternalServerError) return } - _, err = w.Write([]byte(fmt.Sprintf("%d packages processed", numPublished))) + _, err := w.Write([]byte(fmt.Sprintf("%d packages processed", numPublished))) if err != nil { http.Error(w, "unexpected error during http server write: %w", http.StatusInternalServerError) } diff --git a/feeds/scheduler/mocks.go b/feeds/scheduler/mocks.go index 0b05eb30..aada2bb1 100644 --- a/feeds/scheduler/mocks.go +++ b/feeds/scheduler/mocks.go @@ -9,7 +9,7 @@ import ( type mockFeed struct { packages []*feeds.Package - err error + errs []error options feeds.FeedOptions } @@ -21,8 +21,8 @@ func (feed mockFeed) GetFeedOptions() feeds.FeedOptions { return feed.options } -func (feed mockFeed) Latest(cutoff time.Time) ([]*feeds.Package, error) { - return feed.packages, feed.err +func (feed mockFeed) Latest(cutoff time.Time) ([]*feeds.Package, []error) { + return feed.packages, feed.errs } type mockPublisher struct { diff --git a/feeds/scheduler/scheduler.go b/feeds/scheduler/scheduler.go index 51454829..993734ce 100644 --- a/feeds/scheduler/scheduler.go +++ b/feeds/scheduler/scheduler.go @@ -33,7 +33,7 @@ type pollResult struct { name string feed feeds.ScheduledFeed packages []*feeds.Package - err error + errs []error } // Runs several services for the operation of scheduler, this call is blocking until application exit