diff --git a/feeds/crates/crates.go b/feeds/crates/crates.go index d2ffefb9..b9519bcc 100644 --- a/feeds/crates/crates.go +++ b/feeds/crates/crates.go @@ -80,11 +80,11 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er }, nil } -func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { +func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} packages, err := fetchPackages(feed.baseURL) if err != nil { - return pkgs, err + return pkgs, []error{err} } for _, pkg := range packages { pkg := feeds.NewPackage(pkg.UpdatedAt, pkg.Name, pkg.NewestVersion, FeedName) @@ -93,7 +93,7 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { feed.lossyFeedAlerter.ProcessPackages(FeedName, pkgs) pkgs = feeds.ApplyCutoff(pkgs, cutoff) - return pkgs, nil + return pkgs, []error{} } func (feed Feed) GetName() string { diff --git a/feeds/crates/crates_test.go b/feeds/crates/crates_test.go index 99af8ea1..c17c785d 100644 --- a/feeds/crates/crates_test.go +++ b/feeds/crates/crates_test.go @@ -27,8 +27,8 @@ func TestCratesLatest(t *testing.T) { } cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - pkgs, err := feed.Latest(cutoff) - if err != nil { + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 0 { t.Fatalf("feed.Latest returned error: %v", err) } @@ -67,11 +67,11 @@ func TestCratesNotFound(t *testing.T) { } cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - _, err = feed.Latest(cutoff) - if err == nil { + _, errs := feed.Latest(cutoff) + if len(errs) == 0 { t.Fatalf("feed.Latest() was successful when an error was expected") } - if !errors.Is(err, utils.ErrUnsuccessfulRequest) { + if !errors.Is(errs[len(errs)-1], utils.ErrUnsuccessfulRequest) { t.Fatalf("feed.Latest() returned an error which did not match the expected error") } } diff --git a/feeds/feed.go b/feeds/feed.go index 78c5661f..3719b421 100644 --- a/feeds/feed.go +++ b/feeds/feed.go @@ -1,19 +1,22 @@ package 
feeds import ( + "errors" "fmt" "time" ) const schemaVer = "1.0" +var ErrNoPackagesPolled = errors.New("no packages were successfully polled") + type UnsupportedOptionError struct { Option string Feed string } type ScheduledFeed interface { - Latest(cutoff time.Time) ([]*Package, error) + Latest(cutoff time.Time) ([]*Package, []error) GetFeedOptions() FeedOptions GetName() string } @@ -37,6 +40,15 @@ type Package struct { SchemaVer string `json:"schema_ver"` } +type PackagePollError struct { + Err error + Name string +} + +func (err PackagePollError) Error() string { + return fmt.Sprintf("Polling for package %s returned error: %v", err.Name, err.Err) +} + func NewPackage(created time.Time, name, version, feed string) *Package { return &Package{ Name: name, diff --git a/feeds/goproxy/goproxy.go b/feeds/goproxy/goproxy.go index 0751e46a..67046e30 100644 --- a/feeds/goproxy/goproxy.go +++ b/feeds/goproxy/goproxy.go @@ -100,18 +100,18 @@ func New(feedOptions feeds.FeedOptions) (*Feed, error) { }, nil } -func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { +func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} packages, err := fetchPackages(feed.baseURL, cutoff) if err != nil { - return pkgs, err + return pkgs, []error{err} } for _, pkg := range packages { pkg := feeds.NewPackage(pkg.ModifiedDate, pkg.Title, pkg.Version, FeedName) pkgs = append(pkgs, pkg) } pkgs = feeds.ApplyCutoff(pkgs, cutoff) - return pkgs, nil + return pkgs, []error{} } func (feed Feed) GetName() string { diff --git a/feeds/goproxy/goproxy_test.go b/feeds/goproxy/goproxy_test.go index a5647e10..304fb721 100644 --- a/feeds/goproxy/goproxy_test.go +++ b/feeds/goproxy/goproxy_test.go @@ -26,8 +26,8 @@ func TestGoProxyLatest(t *testing.T) { feed.baseURL = srv.URL cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - pkgs, err := feed.Latest(cutoff) - if err != nil { + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 0 { 
t.Fatalf("feed.Latest returned error: %v", err) } @@ -66,11 +66,11 @@ func TestGoproxyNotFound(t *testing.T) { feed.baseURL = srv.URL cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - _, err = feed.Latest(cutoff) - if err == nil { + _, errs := feed.Latest(cutoff) + if len(errs) == 0 { t.Fatalf("feed.Latest() was successful when an error was expected") } - if !errors.Is(err, utils.ErrUnsuccessfulRequest) { + if !errors.Is(errs[len(errs)-1], utils.ErrUnsuccessfulRequest) { t.Fatalf("feed.Latest() returned an error which did not match the expected error") } } diff --git a/feeds/npm/npm.go b/feeds/npm/npm.go index e8b47cff..3050dbdd 100644 --- a/feeds/npm/npm.go +++ b/feeds/npm/npm.go @@ -138,13 +138,15 @@ func fetchPackage(baseURL, pkgTitle string) ([]*Package, error) { return versionSlice, nil } -func fetchAllPackages(url string) ([]*feeds.Package, error) { +func fetchAllPackages(url string) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} + errs := []error{} packageChannel := make(chan []*Package) - errs := make(chan error) + errChannel := make(chan error) packageEvents, err := fetchPackageEvents(url) if err != nil { - return pkgs, err + // If we can't generate package events then return early. + return pkgs, append(errs, err) } // Handle the possibility of multiple releases of the same package // within the polled `packages` slice. @@ -157,7 +159,10 @@ func fetchAllPackages(url string) ([]*feeds.Package, error) { go func(pkgTitle string, count int) { pkgs, err := fetchPackage(url, pkgTitle) if err != nil { - errs <- err + if !errors.Is(err, errUnpublished) { + err = feeds.PackagePollError{Name: pkgTitle, Err: err} + } + errChannel <- err return } // Apply count slice @@ -173,27 +178,31 @@ func fetchAllPackages(url string) ([]*feeds.Package, error) { pkg.Version, FeedName) pkgs = append(pkgs, feedPkg) } - case err := <-errs: - // TODO: Add an event for unpublished package? This shouldn't - // be a hard error for the 'firehose'. 
+ case err := <-errChannel: + // When polling the 'firehose' unpublished packages + // don't need to be logged as an error. if !errors.Is(err, errUnpublished) { - return pkgs, fmt.Errorf("error in fetching version information: %w", err) + errs = append(errs, err) } } } - return pkgs, nil + return pkgs, errs } -func fetchCriticalPackages(url string, packages []string) ([]*feeds.Package, error) { +func fetchCriticalPackages(url string, packages []string) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} + errs := []error{} packageChannel := make(chan []*Package) - errs := make(chan error) + errChannel := make(chan error) for _, pkgTitle := range packages { go func(pkgTitle string) { pkgs, err := fetchPackage(url, pkgTitle) if err != nil { - errs <- err + if !errors.Is(err, errUnpublished) { + err = feeds.PackagePollError{Name: pkgTitle, Err: err} + } + errChannel <- err return } packageChannel <- pkgs @@ -208,13 +217,15 @@ func fetchCriticalPackages(url string, packages []string) ([]*feeds.Package, err pkg.Version, FeedName) pkgs = append(pkgs, feedPkg) } - case err := <-errs: - // Assume if a package has been unpublished that it is a valid hard - // error when polling for 'critical' packages. - return pkgs, fmt.Errorf("error in fetching version information: %w", err) + case err := <-errChannel: + // Assume if a package has been unpublished that it is a valid reason + // to log the error when polling for 'critical' packages. This could + // be changed for a 'lossy' type event instead. Further packages should + // be processed. 
+ errs = append(errs, err) } } - return pkgs, nil + return pkgs, errs } type Feed struct { @@ -233,18 +244,19 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er }, nil } -func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { +func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} - var err error + var errs []error if feed.packages == nil { - pkgs, err = fetchAllPackages(feed.baseURL) + pkgs, errs = fetchAllPackages(feed.baseURL) } else { - pkgs, err = fetchCriticalPackages(feed.baseURL, *feed.packages) + pkgs, errs = fetchCriticalPackages(feed.baseURL, *feed.packages) } - if err != nil { - return nil, err + if len(pkgs) == 0 { + // If none of the packages were successfully polled for, return early. + return nil, append(errs, feeds.ErrNoPackagesPolled) } // Ensure packages are sorted by CreatedDate in order of most recent, as goroutine @@ -261,7 +273,7 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { } pkgs = feeds.ApplyCutoff(pkgs, cutoff) - return pkgs, nil + return pkgs, errs } func (feed Feed) GetName() string { diff --git a/feeds/npm/npm_test.go b/feeds/npm/npm_test.go index 5caf2600..6ddf82ec 100644 --- a/feeds/npm/npm_test.go +++ b/feeds/npm/npm_test.go @@ -10,7 +10,6 @@ import ( "github.com/ossf/package-feeds/events" "github.com/ossf/package-feeds/feeds" - "github.com/ossf/package-feeds/utils" testutils "github.com/ossf/package-feeds/utils/test" ) @@ -34,9 +33,9 @@ func TestNpmLatest(t *testing.T) { } cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - pkgs, err := feed.Latest(cutoff) - if err != nil { - t.Fatalf("feed.Latest returned error: %v", err) + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 0 { + t.Fatalf("feed.Latest returned error: %v", errs[len(errs)-1]) } if pkgs[0].Name != "FooPackage" { @@ -121,9 +120,9 @@ func TestNpmCritical(t *testing.T) { } cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - pkgs, err := 
feed.Latest(cutoff) - if err != nil { - t.Fatalf("Failed to call Latest() with err: %v", err) + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 0 { + t.Fatalf("Failed to call Latest() with err: %v", errs[len(errs)-1]) } if len(pkgs) != 5 { @@ -177,22 +176,24 @@ func TestNpmCriticalUnpublished(t *testing.T) { } cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - pkgs, err := feed.Latest(cutoff) + pkgs, errs := feed.Latest(cutoff) - if err == nil { - t.Fatalf("Expected unpublish error did not occur") + if len(errs) != 1 { + t.Fatalf("feed.Latest() returned %v errors when 1 was expected", len(errs)) } - if !errors.Is(err, errUnpublished) { + if !errors.Is(errs[len(errs)-1], errUnpublished) { t.Fatalf("Failed to return unpublished error when polling for an unpublished package, instead: %v", err) } - if !strings.Contains(err.Error(), "QuxPackage") { - t.Fatalf("Failed to correctly include the package name in unpublished error, instead: % v", err) + if !strings.Contains(errs[len(errs)-1].Error(), "QuxPackage") { + t.Fatalf("Failed to correctly include the package name in unpublished error, instead: %v", errs[len(errs)-1]) } - if len(pkgs) > 0 { - t.Fatalf("Latest() produced %v packages instead of the expected 0", len(pkgs)) + // Even though QuxPackage is unpublished, the error should be + // logged and FooPackage should still be processed. 
+ if len(pkgs) != 3 { + t.Fatalf("Latest() produced %v packages instead of the expected 3", len(pkgs)) } } @@ -218,13 +219,11 @@ func TestNpmNonUtf8Response(t *testing.T) { } } -func TestNPMNotFound(t *testing.T) { +func TestNpmNotFound(t *testing.T) { t.Parallel() handlers := map[string]testutils.HTTPHandlerFunc{ - "/-/rss/": testutils.NotFoundHandlerFunc, - "/FooPackage": testutils.NotFoundHandlerFunc, - "/BarPackage": testutils.NotFoundHandlerFunc, + "/-/rss/": testutils.NotFoundHandlerFunc, } srv := testutils.HTTPServerMock(handlers) @@ -236,15 +235,91 @@ func TestNPMNotFound(t *testing.T) { } cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - _, err = feed.Latest(cutoff) - if err == nil { - t.Fatalf("feed.Latest() was successful when an error was expected") + _, errs := feed.Latest(cutoff) + if len(errs) != 2 { + t.Fatalf("feed.Latest() returned %v errors when 2 were expected", len(errs)) } - if !errors.Is(err, utils.ErrUnsuccessfulRequest) { + if !errors.Is(errs[len(errs)-1], feeds.ErrNoPackagesPolled) { t.Fatalf("feed.Latest() returned an error which did not match the expected error") } } +func TestNpmPartialNotFound(t *testing.T) { + t.Parallel() + + handlers := map[string]testutils.HTTPHandlerFunc{ + "/-/rss/": npmLatestPackagesResponse, + "/FooPackage": fooVersionInfoResponse, + "/BarPackage": barVersionInfoResponse, + "/BazPackage": bazVersionInfoResponse, + "/QuxPackage": testutils.NotFoundHandlerFunc, + } + srv := testutils.HTTPServerMock(handlers) + + feed, err := New(feeds.FeedOptions{}, events.NewNullHandler()) + feed.baseURL = srv.URL + + if err != nil { + t.Fatalf("Failed to create new npm feed: %v", err) + } + + cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 1 { + t.Fatalf("feed.Latest() returned %v errors when 1 was expected", len(errs)) + } + if !strings.Contains(errs[len(errs)-1].Error(), "QuxPackage") { + t.Fatalf("Failed to correctly include the package name in 
feeds.PackagePollError, instead: %v", errs[len(errs)-1]) + } + if !strings.Contains(errs[len(errs)-1].Error(), "404") { + t.Fatalf("Failed to wrapped expected 404 error in feeds.PackagePollError, instead: %v", errs[len(errs)-1]) + } + // Even though QuxPackage returns a 404, the error should be + // logged and the rest of the packages should still be processed. + if len(pkgs) != 4 { + t.Fatalf("Latest() produced %v packages instead of the expected 3", len(pkgs)) + } +} + +func TestNpmCriticalPartialNotFound(t *testing.T) { + t.Parallel() + + handlers := map[string]testutils.HTTPHandlerFunc{ + "/FooPackage": fooVersionInfoResponse, + "/BarPackage": testutils.NotFoundHandlerFunc, + } + srv := testutils.HTTPServerMock(handlers) + + packages := []string{ + "FooPackage", + "BarPackage", + } + + feed, err := New(feeds.FeedOptions{Packages: &packages}, events.NewNullHandler()) + feed.baseURL = srv.URL + + if err != nil { + t.Fatalf("Failed to create new npm feed: %v", err) + } + + cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 1 { + t.Fatalf("feed.Latest() returned %v errors when 1 was expected", len(errs)) + } + if !strings.Contains(errs[len(errs)-1].Error(), "BarPackage") { + t.Fatalf("Failed to correctly include the package name in feeds.PackagePollError, instead: %v", errs[len(errs)-1]) + } + if !strings.Contains(errs[len(errs)-1].Error(), "404") { + t.Fatalf("Failed to wrapped expected 404 error in feeds.PackagePollError, instead: %v", errs[len(errs)-1]) + } + // Even though BarPackage returns a 404, the error should be + // logged and FooPackage should still be processed. 
+ if len(pkgs) != 3 { + t.Fatalf("Latest() produced %v packages instead of the expected 3", len(pkgs)) + } +} + func npmLatestPackagesResponse(w http.ResponseWriter, r *http.Request) { _, err := w.Write([]byte(` diff --git a/feeds/nuget/nuget.go b/feeds/nuget/nuget.go index cf8fd516..ecf238fa 100644 --- a/feeds/nuget/nuget.go +++ b/feeds/nuget/nuget.go @@ -175,17 +175,18 @@ func New(feedOptions feeds.FeedOptions) (*Feed, error) { // Latest will parse all creation events for packages in the nuget.org catalog feed // for packages that have been published since the cutoff // https://docs.microsoft.com/en-us/nuget/api/catalog-resource -func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { +func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} + var errs []error catalogService, err := fetchCatalogService(feed.baseURL) if err != nil { - return nil, err + return nil, append(errs, err) } catalogPages, err := fetchCatalogPages(catalogService.URI) if err != nil { - return nil, err + return nil, append(errs, err) } for _, catalogPage := range catalogPages { @@ -195,7 +196,8 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { page, err := fetchCatalogPage(catalogPage.URI) if err != nil { - return nil, err + errs = append(errs, err) + continue } for _, catalogLeafNode := range page { @@ -209,19 +211,17 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { pkgInfo, err := fetchPackageInfo(catalogLeafNode.URI) if err != nil { - return nil, err + errs = append(errs, err) + continue } pkg := feeds.NewPackage(pkgInfo.Created, pkgInfo.PackageID, pkgInfo.Version, FeedName) - if err != nil { - continue - } pkgs = append(pkgs, pkg) } } pkgs = feeds.ApplyCutoff(pkgs, cutoff) - return pkgs, nil + return pkgs, errs } func (feed Feed) GetName() string { diff --git a/feeds/nuget/nuget_test.go b/feeds/nuget/nuget_test.go index 57381ad4..2b039a73 100644 --- a/feeds/nuget/nuget_test.go +++ 
b/feeds/nuget/nuget_test.go @@ -39,9 +39,9 @@ func TestCanParseFeed(t *testing.T) { cutoff := time.Now().Add(-5 * time.Minute) - results, err := sut.Latest(cutoff) - if err != nil { - t.Fatal(err) + results, errs := sut.Latest(cutoff) + if len(errs) != 0 { + t.Fatal(errs[len(errs)-1]) } if len(results) != 1 { diff --git a/feeds/packagist/packagist.go b/feeds/packagist/packagist.go index 78321ab9..ba579fec 100644 --- a/feeds/packagist/packagist.go +++ b/feeds/packagist/packagist.go @@ -114,11 +114,12 @@ func fetchVersionInformation(versionHost string, action actions) ([]*feeds.Packa } // Latest returns all package updates of packagist packages since cutoff. -func (f Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { +func (f Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} + var errs []error packages, err := fetchPackages(f.updateHost, cutoff) if err != nil { - return nil, err + return nil, append(errs, err) } for _, pkg := range packages { if time.Unix(pkg.Time, 0).Before(cutoff) { @@ -129,12 +130,13 @@ func (f Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { } updates, err := fetchVersionInformation(f.versionHost, pkg) if err != nil { - return nil, fmt.Errorf("error in fetching version information: %w", err) + errs = append(errs, fmt.Errorf("error in fetching version information: %w", err)) + continue } pkgs = append(pkgs, updates...) 
} pkgs = feeds.ApplyCutoff(pkgs, cutoff) - return pkgs, nil + return pkgs, errs } func (f Feed) GetName() string { diff --git a/feeds/packagist/packagist_test.go b/feeds/packagist/packagist_test.go index a595c065..c7583ea8 100644 --- a/feeds/packagist/packagist_test.go +++ b/feeds/packagist/packagist_test.go @@ -29,9 +29,9 @@ func TestFetch(t *testing.T) { feed.versionHost = srv.URL cutoff := time.Unix(1614513658, 0) - latest, err := feed.Latest(cutoff) - if err != nil { - t.Fatalf("got error: %s", err) + latest, errs := feed.Latest(cutoff) + if len(errs) != 0 { + t.Fatalf("got error: %v", errs[len(errs)-1]) } if len(latest) == 0 { t.Fatalf("did not get any updates") @@ -64,11 +64,11 @@ func TestPackagistNotFound(t *testing.T) { feed.versionHost = srv.URL cutoff := time.Unix(1614513658, 0) - _, err = feed.Latest(cutoff) - if err == nil { - t.Fatalf("feed.Latest() was successful when an error was expected") + _, errs := feed.Latest(cutoff) + if len(errs) != 1 { + t.Fatalf("feed.Latest() returned %v errors when 1 was expected", len(errs)) } - if !errors.Is(err, utils.ErrUnsuccessfulRequest) { + if !errors.Is(errs[len(errs)-1], utils.ErrUnsuccessfulRequest) { t.Fatalf("feed.Latest() returned an error which did not match the expected error") } } diff --git a/feeds/pypi/pypi.go b/feeds/pypi/pypi.go index 4ea62cfd..b43ad55f 100644 --- a/feeds/pypi/pypi.go +++ b/feeds/pypi/pypi.go @@ -97,7 +97,7 @@ func fetchPackages(baseURL string) ([]*Package, error) { return rssResponse.Packages, nil } -func fetchCriticalPackages(baseURL string, packageList []string) ([]*Package, error) { +func fetchCriticalPackages(baseURL string, packageList []string) ([]*Package, []error) { responseChannel := make(chan *Response) errChannel := make(chan error) @@ -106,19 +106,19 @@ func fetchCriticalPackages(baseURL string, packageList []string) ([]*Package, er packageDataPath := fmt.Sprintf(packagePathFormat, pkgName) pkgURL, err := utils.URLPathJoin(baseURL, packageDataPath) if err != nil { - 
errChannel <- err + errChannel <- feeds.PackagePollError{Name: pkgName, Err: err} return } resp, err := httpClient.Get(pkgURL) if err != nil { - errChannel <- err + errChannel <- feeds.PackagePollError{Name: pkgName, Err: err} return } defer resp.Body.Close() err = utils.CheckResponseStatus(resp) if err != nil { - errChannel <- fmt.Errorf("failed to fetch pypi package data: %w", err) + errChannel <- feeds.PackagePollError{Name: pkgName, Err: fmt.Errorf("failed to fetch pypi package data: %w", err)} return } @@ -126,7 +126,7 @@ func fetchCriticalPackages(baseURL string, packageList []string) ([]*Package, er reader := utils.NewUTF8OnlyReader(resp.Body) err = xml.NewDecoder(reader).Decode(rssResponse) if err != nil { - errChannel <- err + errChannel <- feeds.PackagePollError{Name: pkgName, Err: err} return } @@ -135,15 +135,16 @@ func fetchCriticalPackages(baseURL string, packageList []string) ([]*Package, er } pkgs := []*Package{} + errs := []error{} for i := 0; i < len(packageList); i++ { select { case response := <-responseChannel: pkgs = append(pkgs, response.Packages...) case err := <-errChannel: - return nil, err + errs = append(errs, err) } } - return pkgs, nil + return pkgs, errs } type Feed struct { @@ -164,30 +165,39 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er }, nil } -func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { +func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} var pypiPackages []*Package + var errs []error var err error if feed.packages == nil { // Firehose fetch all packages. + // If this fails then we need to return, as it's the only source of + // data. pypiPackages, err = fetchPackages(feed.baseURL) + if err != nil { + return nil, append(errs, err) + } } else { // Fetch specific packages individually from configured packages list. 
- pypiPackages, err = fetchCriticalPackages(feed.baseURL, *feed.packages) + pypiPackages, errs = fetchCriticalPackages(feed.baseURL, *feed.packages) + if len(pypiPackages) == 0 { + // If none of the packages were successfully polled for, return early. + return nil, append(errs, feeds.ErrNoPackagesPolled) + } } - if err != nil { - return nil, err - } for _, pkg := range pypiPackages { pkgName, err := pkg.Name() if err != nil { - return nil, err + errs = append(errs, err) + continue } pkgVersion, err := pkg.Version() if err != nil { - return nil, err + errs = append(errs, err) + continue } pkg := feeds.NewPackage(pkg.CreatedDate.Time, pkgName, pkgVersion, FeedName) pkgs = append(pkgs, pkg) @@ -199,7 +209,7 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { } pkgs = feeds.ApplyCutoff(pkgs, cutoff) - return pkgs, nil + return pkgs, errs } func (feed Feed) GetPackageList() *[]string { diff --git a/feeds/pypi/pypi_test.go b/feeds/pypi/pypi_test.go index c062f7c3..954305ff 100644 --- a/feeds/pypi/pypi_test.go +++ b/feeds/pypi/pypi_test.go @@ -3,12 +3,12 @@ package pypi import ( "errors" "net/http" + "strings" "testing" "time" "github.com/ossf/package-feeds/events" "github.com/ossf/package-feeds/feeds" - "github.com/ossf/package-feeds/utils" testutils "github.com/ossf/package-feeds/utils/test" ) @@ -27,8 +27,8 @@ func TestPypiLatest(t *testing.T) { feed.baseURL = srv.URL cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - pkgs, err := feed.Latest(cutoff) - if err != nil { + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 0 { t.Fatalf("feed.Latest returned error: %v", err) } @@ -74,9 +74,9 @@ func TestPypiCriticalLatest(t *testing.T) { feed.baseURL = srv.URL cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - pkgs, err := feed.Latest(cutoff) - if err != nil { - t.Fatalf("Failed to call Latest() with err: %v", err) + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 0 { + t.Fatalf("Failed to call Latest() with err: %v", errs[len(errs)-1]) 
} const expectedNumPackages = 4 @@ -105,7 +105,7 @@ func TestPypiCriticalLatest(t *testing.T) { } } -func TestPypiNotFound(t *testing.T) { +func TestPypiAllNotFound(t *testing.T) { t.Parallel() handlers := map[string]testutils.HTTPHandlerFunc{ @@ -127,15 +127,52 @@ func TestPypiNotFound(t *testing.T) { feed.baseURL = srv.URL cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - _, err = feed.Latest(cutoff) - if err == nil { - t.Fatalf("feed.Latest() was successful when an error was expected") + _, errs := feed.Latest(cutoff) + if len(errs) != 3 { + t.Fatalf("feed.Latest() returned %v errors when 3 were expected", len(errs)) } - if !errors.Is(err, utils.ErrUnsuccessfulRequest) { + if !errors.Is(errs[len(errs)-1], feeds.ErrNoPackagesPolled) { t.Fatalf("feed.Latest() returned an error which did not match the expected error") } } +func TestPypiCriticalPartialNotFound(t *testing.T) { + t.Parallel() + + handlers := map[string]testutils.HTTPHandlerFunc{ + "/rss/project/foopy/releases.xml": foopyReleasesResponse, + "/rss/project/barpy/releases.xml": testutils.NotFoundHandlerFunc, + } + packages := []string{ + "foopy", + "barpy", + } + srv := testutils.HTTPServerMock(handlers) + + feed, err := New(feeds.FeedOptions{ + Packages: &packages, + }, events.NewNullHandler()) + if err != nil { + t.Fatalf("Failed to create pypi feed: %v", err) + } + feed.baseURL = srv.URL + + cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 1 { + t.Fatalf("feed.Latest() returned %v errors when 1 was expected", len(errs)) + } + if !strings.Contains(errs[len(errs)-1].Error(), "barpy") { + t.Fatalf("Failed to correctly include the package name in feeds.PackagePollError, instead: %v", errs[len(errs)-1]) + } + if !strings.Contains(errs[len(errs)-1].Error(), "404") { + t.Fatalf("Failed to wrapped expected 404 error in feeds.PackagePollError, instead: %v", errs[len(errs)-1]) + } + if len(pkgs) != 2 { + t.Fatalf("Latest() produced %v packages 
instead of the expected %v", len(pkgs), 2) + } +} + // Mock data for pypi firehose with all packages. func updatesXMLHandle(w http.ResponseWriter, r *http.Request) { _, err := w.Write([]byte(` diff --git a/feeds/rubygems/rubygems.go b/feeds/rubygems/rubygems.go index 9cc48320..b14e2234 100644 --- a/feeds/rubygems/rubygems.go +++ b/feeds/rubygems/rubygems.go @@ -63,31 +63,38 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er }, nil } -func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { +func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) { pkgs := []*feeds.Package{} packages := make(map[string]*Package) + var errs []error newPackagesURL, err := utils.URLPathJoin(feed.baseURL, activityPath, "latest.json") if err != nil { - return nil, err + // Failure to construct a url should lead to a hard failure. + return nil, append(errs, err) } newPackages, err := fetchPackages(newPackagesURL) if err != nil { - return pkgs, err - } - for _, pkg := range newPackages { - packages[pkg.Name] = pkg + // Updated Packages could still be processed. + errs = append(errs, err) + } else { + for _, pkg := range newPackages { + packages[pkg.Name] = pkg + } } updatedPackagesURL, err := utils.URLPathJoin(feed.baseURL, activityPath, "just_updated.json") if err != nil { - return nil, err + // Failure to construct a url should lead to a hard failure. + return nil, append(errs, err) } updatedPackages, err := fetchPackages(updatedPackagesURL) if err != nil { - return pkgs, err - } - for _, pkg := range updatedPackages { - packages[pkg.Name] = pkg + // New Packages could still be processed. 
+ errs = append(errs, err) + } else { + for _, pkg := range updatedPackages { + packages[pkg.Name] = pkg + } } for _, pkg := range packages { @@ -97,7 +104,7 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { feed.lossyFeedAlerter.ProcessPackages(FeedName, pkgs) pkgs = feeds.ApplyCutoff(pkgs, cutoff) - return pkgs, nil + return pkgs, errs } func (feed Feed) GetName() string { diff --git a/feeds/rubygems/rubygems_test.go b/feeds/rubygems/rubygems_test.go index e6014320..4dbcfac1 100644 --- a/feeds/rubygems/rubygems_test.go +++ b/feeds/rubygems/rubygems_test.go @@ -28,9 +28,9 @@ func TestRubyLatest(t *testing.T) { } cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - pkgs, err := feed.Latest(cutoff) - if err != nil { - t.Fatalf("feed.Latest returned error: %v", err) + pkgs, errs := feed.Latest(cutoff) + if len(errs) != 0 { + t.Fatalf("feed.Latest returned error: %v", errs[len(errs)-1]) } var fooPkg *feeds.Package @@ -78,13 +78,43 @@ func TestRubyGemsNotFound(t *testing.T) { } cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) - _, err = feed.Latest(cutoff) - if err == nil { + _, errs := feed.Latest(cutoff) + if len(errs) == 0 { t.Fatalf("feed.Latest() was successful when an error was expected") } - if !errors.Is(err, utils.ErrUnsuccessfulRequest) { + if !errors.Is(errs[len(errs)-1], utils.ErrUnsuccessfulRequest) { + t.Fatalf("feed.Latest() returned an error which did not match the expected error") + } +} + +func TestRubyGemsPartialNotFound(t *testing.T) { + t.Parallel() + + handlers := map[string]testutils.HTTPHandlerFunc{ + "/api/v1/activity/latest.json": rubyGemsPackagesResponse, + "/api/v1/activity/just_updated.json": testutils.NotFoundHandlerFunc, + } + srv := testutils.HTTPServerMock(handlers) + + feed, err := New(feeds.FeedOptions{}, events.NewNullHandler()) + feed.baseURL = srv.URL + if err != nil { + t.Fatalf("failed to create new ruby feed: %v", err) + } + + cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + pkgs, errs := 
feed.Latest(cutoff) + if len(errs) != 1 { + t.Fatalf("feed.Latest() returned %v errors when 1 was expected", len(errs)) + } + if !errors.Is(errs[len(errs)-1], utils.ErrUnsuccessfulRequest) { t.Fatalf("feed.Latest() returned an error which did not match the expected error") } + // Although the just_updated (updatedPackages) endpoint failed, the two latest (newPackages) + // should be processed. + if len(pkgs) != 2 { + t.Fatalf("Latest() produced %v packages instead of the expected %v", len(pkgs), 2) + } } func rubyGemsPackagesResponse(w http.ResponseWriter, r *http.Request) { diff --git a/feeds/scheduler/feed_group.go b/feeds/scheduler/feed_group.go index 1d817f9a..24e184f0 100644 --- a/feeds/scheduler/feed_group.go +++ b/feeds/scheduler/feed_group.go @@ -3,6 +3,7 @@ package scheduler import ( "context" "encoding/json" + "errors" "time" log "github.com/sirupsen/logrus" @@ -11,12 +12,23 @@ import ( "github.com/ossf/package-feeds/publisher" ) +var ( + errPoll = errors.New("error when polling for packages") + errPub = errors.New("error when publishing packages") +) + type FeedGroup struct { feeds []feeds.ScheduledFeed publisher publisher.Publisher lastPoll time.Time } +type groupResult struct { + numPublished int + pollErr error + pubErr error +} + func NewFeedGroup(scheduledFeeds []feeds.ScheduledFeed, pub publisher.Publisher, initialCutoff time.Duration) *FeedGroup { return &FeedGroup{ @@ -31,32 +43,40 @@ func (fg *FeedGroup) AddFeed(feed feeds.ScheduledFeed) { } func (fg *FeedGroup) Run() { - _, err := fg.PollAndPublish() - if err != nil { - log.Error(err) + result := fg.pollAndPublish() + if result.pollErr != nil { + log.Error(result.pollErr) + } + if result.pubErr != nil { + log.Error(result.pubErr) } } -func (fg *FeedGroup) PollAndPublish() (int, error) { - pkgs, errs := fg.poll() - if len(errs) > 0 { - return 0, errs[0] - } else if len(pkgs) == 0 { - return 0, nil - } +func (fg *FeedGroup) pollAndPublish() groupResult { + result := groupResult{} + pkgs, err := 
fg.poll() - log.WithField("num_packages", len(pkgs)).Printf("Publishing packages...") - numPublished, err := fg.publishPackages(pkgs) if err != nil { - log.Errorf("Failed to publish %v packages due to err: %v", len(pkgs)-numPublished, err) - return numPublished, err + result.pollErr = err + } + // Return early if no packages to process + if len(pkgs) == 0 { + return result + } + log.WithField("num_packages", len(pkgs)).Printf("Publishing packages...") + numPublished, pubErr := fg.publishPackages(pkgs) + result.numPublished = numPublished + if pubErr != nil { + log.Errorf("Failed to publish %v packages due to err: %v", len(pkgs)-numPublished, pubErr) + result.pubErr = errPub + } else { + log.WithField("num_packages", numPublished).Printf("Successfully published packages") } - log.WithField("num_packages", numPublished).Printf("Successfully published packages") - return numPublished, nil + return result } // Poll fetches the latest packages from each registered feed. -func (fg *FeedGroup) poll() ([]*feeds.Package, []error) { +func (fg *FeedGroup) poll() ([]*feeds.Package, error) { results := make(chan pollResult, len(fg.feeds)) for _, feed := range fg.feeds { go func(feed feeds.ScheduledFeed) { @@ -64,7 +84,7 @@ func (fg *FeedGroup) poll() ([]*feeds.Package, []error) { name: feed.GetName(), feed: feed, } - result.packages, result.err = feed.Latest(fg.lastPoll) + result.packages, result.errs = feed.Latest(fg.lastPoll) results <- result }(feed) } @@ -74,10 +94,9 @@ func (fg *FeedGroup) poll() ([]*feeds.Package, []error) { result := <-results logger := log.WithField("feed", result.name) - if result.err != nil { - logger.WithError(result.err).Error("Error fetching packages") - errs = append(errs, result.err) - continue + for _, err := range result.errs { + logger.WithError(err).Error("Error fetching packages") + errs = append(errs, err) } for _, pkg := range result.packages { log.WithFields(log.Fields{ @@ -89,11 +108,14 @@ func (fg *FeedGroup) poll() ([]*feeds.Package, 
[]error) { packages = append(packages, result.packages...) logger.WithField("num_processed", len(result.packages)).Print("Packages successfully processed") } - + err := errPoll + if len(errs) == 0 { + err = nil + } fg.lastPoll = time.Now().UTC() log.Printf("%d packages processed", len(packages)) - return packages, errs + return packages, err } func (fg *FeedGroup) publishPackages(pkgs []*feeds.Package) (int, error) { diff --git a/feeds/scheduler/feed_group_test.go b/feeds/scheduler/feed_group_test.go index b863cabd..039e071e 100644 --- a/feeds/scheduler/feed_group_test.go +++ b/feeds/scheduler/feed_group_test.go @@ -54,7 +54,7 @@ func TestFeedGroupPollWithErr(t *testing.T) { mockFeeds := []feeds.ScheduledFeed{ mockFeed{ - err: errPackage, + errs: []error{errPackage}, }, mockFeed{ packages: []*feeds.Package{ diff --git a/feeds/scheduler/feed_groups_handler.go b/feeds/scheduler/feed_groups_handler.go index f0bd7a29..707b44e7 100644 --- a/feeds/scheduler/feed_groups_handler.go +++ b/feeds/scheduler/feed_groups_handler.go @@ -3,45 +3,51 @@ package scheduler import ( "fmt" "net/http" + "strings" ) type FeedGroupsHandler struct { feedGroups []*FeedGroup } -type publishResult struct { - numPublished int - err error -} - func NewFeedGroupsHandler(feeds []*FeedGroup) *FeedGroupsHandler { return &FeedGroupsHandler{feedGroups: feeds} } func (srv *FeedGroupsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - var err error - resultChannel := make(chan publishResult, len(srv.feedGroups)) + resultChannel := make(chan groupResult, len(srv.feedGroups)) numPublished := 0 + var pollErr error + var pubErr error + var errStrings []string for _, group := range srv.feedGroups { go func(group *FeedGroup) { - numPublished, err := group.PollAndPublish() - resultChannel <- publishResult{numPublished, err} + result := group.pollAndPublish() + resultChannel <- result }(group) } for range srv.feedGroups { result := <-resultChannel - numPublished += result.numPublished - if result.err 
!= nil { - http.Error(w, result.err.Error(), http.StatusInternalServerError) - return + if result.pollErr != nil { + pollErr = result.pollErr + } + if result.pubErr != nil { + pubErr = result.pubErr } } - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + for _, err := range []error{pollErr, pubErr} { + if err != nil { + errStrings = append(errStrings, err.Error()) + } + } + if len(errStrings) > 0 { + http.Error(w, strings.Join(errStrings, "\n")+fmt.Sprintf("\n%d packages successfully processed, see log for details", + numPublished), + http.StatusInternalServerError) return } - _, err = w.Write([]byte(fmt.Sprintf("%d packages processed", numPublished))) + _, err := w.Write([]byte(fmt.Sprintf("%d packages processed", numPublished))) if err != nil { http.Error(w, "unexpected error during http server write: %w", http.StatusInternalServerError) } diff --git a/feeds/scheduler/mocks.go b/feeds/scheduler/mocks.go index 0b05eb30..aada2bb1 100644 --- a/feeds/scheduler/mocks.go +++ b/feeds/scheduler/mocks.go @@ -9,7 +9,7 @@ import ( type mockFeed struct { packages []*feeds.Package - err error + errs []error options feeds.FeedOptions } @@ -21,8 +21,8 @@ func (feed mockFeed) GetFeedOptions() feeds.FeedOptions { return feed.options } -func (feed mockFeed) Latest(cutoff time.Time) ([]*feeds.Package, error) { - return feed.packages, feed.err +func (feed mockFeed) Latest(cutoff time.Time) ([]*feeds.Package, []error) { + return feed.packages, feed.errs } type mockPublisher struct { diff --git a/feeds/scheduler/scheduler.go b/feeds/scheduler/scheduler.go index 51454829..993734ce 100644 --- a/feeds/scheduler/scheduler.go +++ b/feeds/scheduler/scheduler.go @@ -33,7 +33,7 @@ type pollResult struct { name string feed feeds.ScheduledFeed packages []*feeds.Package - err error + errs []error } // Runs several services for the operation of scheduler, this call is blocking until application exit