diff --git a/feeds/feed.go b/feeds/feed.go index 78c5661f..5cb6407d 100644 --- a/feeds/feed.go +++ b/feeds/feed.go @@ -1,12 +1,15 @@ package feeds import ( + "errors" "fmt" "time" ) const schemaVer = "1.0" +var ErrNoPackagesPolled = errors.New("no packages were successfully polled") + type UnsupportedOptionError struct { Option string Feed string @@ -37,6 +40,11 @@ type Package struct { SchemaVer string `json:"schema_ver"` } +type FailedPackagePoll struct { + Name string + Err error +} + func NewPackage(created time.Time, name, version, feed string) *Package { return &Package{ Name: name, diff --git a/feeds/pypi/pypi.go b/feeds/pypi/pypi.go index 4ea62cfd..80c18f56 100644 --- a/feeds/pypi/pypi.go +++ b/feeds/pypi/pypi.go @@ -8,6 +8,8 @@ import ( "strings" "time" + log "github.com/sirupsen/logrus" + "github.com/ossf/package-feeds/events" "github.com/ossf/package-feeds/feeds" "github.com/ossf/package-feeds/utils" @@ -97,28 +99,28 @@ func fetchPackages(baseURL string) ([]*Package, error) { return rssResponse.Packages, nil } -func fetchCriticalPackages(baseURL string, packageList []string) ([]*Package, error) { +func fetchCriticalPackages(baseURL string, packageList []string) ([]*Package, []feeds.FailedPackagePoll) { responseChannel := make(chan *Response) - errChannel := make(chan error) + errChannel := make(chan feeds.FailedPackagePoll) for _, pkgName := range packageList { go func(pkgName string) { packageDataPath := fmt.Sprintf(packagePathFormat, pkgName) pkgURL, err := utils.URLPathJoin(baseURL, packageDataPath) if err != nil { - errChannel <- err + errChannel <- feeds.FailedPackagePoll{Name: pkgName, Err: err} return } resp, err := httpClient.Get(pkgURL) if err != nil { - errChannel <- err + errChannel <- feeds.FailedPackagePoll{Name: pkgName, Err: err} return } defer resp.Body.Close() err = utils.CheckResponseStatus(resp) if err != nil { - errChannel <- fmt.Errorf("failed to fetch pypi package data: %w", err) + errChannel <- feeds.FailedPackagePoll{Name: pkgName, Err: fmt.Errorf("failed to fetch pypi package data: %w", err)} return } @@ -126,7 +128,7 @@ func fetchCriticalPackages(baseURL string, packageList []string) ([]*Package, er reader := utils.NewUTF8OnlyReader(resp.Body) err = xml.NewDecoder(reader).Decode(rssResponse) if err != nil { - errChannel <- err + errChannel <- feeds.FailedPackagePoll{Name: pkgName, Err: err} return } @@ -135,15 +137,16 @@ func fetchCriticalPackages(baseURL string, packageList []string) ([]*Package, er } pkgs := []*Package{} + failedpkgs := []feeds.FailedPackagePoll{} for i := 0; i < len(packageList); i++ { select { case response := <-responseChannel: pkgs = append(pkgs, response.Packages...) case err := <-errChannel: - return nil, err + failedpkgs = append(failedpkgs, err) } } - return pkgs, nil + return pkgs, failedpkgs } type Feed struct { @@ -167,19 +170,30 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) { pkgs := []*feeds.Package{} var pypiPackages []*Package + var failedPackages []feeds.FailedPackagePoll var err error if feed.packages == nil { // Firehose fetch all packages. + // If this fails then we need to return, as it's the only source of + // data. pypiPackages, err = fetchPackages(feed.baseURL) + if err != nil { + return nil, err + } } else { // Fetch specific packages individually from configured packages list. - pypiPackages, err = fetchCriticalPackages(feed.baseURL, *feed.packages) + pypiPackages, failedPackages = fetchCriticalPackages(feed.baseURL, *feed.packages) + for _, pkg := range failedPackages { + // This could be an entry point for a 'lossy' type of event. + log.Errorf("Failed to poll for %s package due to err: %v", pkg.Name, pkg.Err) + } + if len(pypiPackages) == 0 { + // If none of the packages were successfully polled for, return early. + return nil, feeds.ErrNoPackagesPolled + } } - if err != nil { - return nil, err - } for _, pkg := range pypiPackages { pkgName, err := pkg.Name() if err != nil { diff --git a/feeds/pypi/pypi_test.go b/feeds/pypi/pypi_test.go index c062f7c3..5b393a10 100644 --- a/feeds/pypi/pypi_test.go +++ b/feeds/pypi/pypi_test.go @@ -8,7 +8,6 @@ import ( "github.com/ossf/package-feeds/events" "github.com/ossf/package-feeds/feeds" - "github.com/ossf/package-feeds/utils" testutils "github.com/ossf/package-feeds/utils/test" ) @@ -105,7 +104,7 @@ func TestPypiCriticalLatest(t *testing.T) { } } -func TestPypiNotFound(t *testing.T) { +func TestPypiAllNotFound(t *testing.T) { t.Parallel() handlers := map[string]testutils.HTTPHandlerFunc{ @@ -131,11 +130,42 @@ func TestPypiNotFound(t *testing.T) { if err == nil { t.Fatalf("feed.Latest() was successful when an error was expected") } - if !errors.Is(err, utils.ErrUnsuccessfulRequest) { + if !errors.Is(err, feeds.ErrNoPackagesPolled) { t.Fatalf("feed.Latest() returned an error which did not match the expected error") } } +func TestPypiPartialNotFound(t *testing.T) { + t.Parallel() + + handlers := map[string]testutils.HTTPHandlerFunc{ + "/rss/project/foopy/releases.xml": foopyReleasesResponse, + "/rss/project/barpy/releases.xml": testutils.NotFoundHandlerFunc, + } + packages := []string{ + "foopy", + "barpy", + } + srv := testutils.HTTPServerMock(handlers) + + feed, err := New(feeds.FeedOptions{ + Packages: &packages, + }, events.NewNullHandler()) + if err != nil { + t.Fatalf("Failed to create pypi feed: %v", err) + } + feed.baseURL = srv.URL + + cutoff := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + pkgs, err := feed.Latest(cutoff) + if err != nil { + t.Fatalf("feed.Latest() was unsuccessful when an error wasn't expected") + } + if len(pkgs) != 2 { + t.Fatalf("Latest() produced %v packages instead of the expected %v", len(pkgs), 2) + } +} + // Mock data for pypi firehose with all packages. func updatesXMLHandle(w http.ResponseWriter, r *http.Request) { _, err := w.Write([]byte(`