diff --git a/pkg/scheduler/feed_group.go b/pkg/scheduler/feed_group.go index c2eda8b6..efc89029 100644 --- a/pkg/scheduler/feed_group.go +++ b/pkg/scheduler/feed_group.go @@ -61,13 +61,9 @@ func (fg *FeedGroup) pollAndPublish() groupResult { return result } log.WithField("num_packages", len(pkgs)).Printf("Publishing packages...") - numPublished, pubErr := fg.publishPackages(pkgs) - result.numPublished = numPublished - if pubErr != nil { - log.Errorf("Failed to publish %v packages due to err: %v", len(pkgs)-numPublished, pubErr) - result.pubErr = errPub - } else { - log.WithField("num_packages", numPublished).Printf("Successfully published packages") + result.numPublished, result.pubErr = fg.publishPackages(pkgs) + if result.numPublished > 0 { + log.WithField("num_packages", result.numPublished).Printf("Successfully published packages") } return result } @@ -117,6 +113,7 @@ func (fg *FeedGroup) poll() ([]*feeds.Package, error) { func (fg *FeedGroup) publishPackages(pkgs []*feeds.Package) (int, error) { processed := 0 + errs := []error{} for _, pkg := range pkgs { log.WithFields(log.Fields{ "name": pkg.Name, @@ -125,14 +122,21 @@ func (fg *FeedGroup) publishPackages(pkgs []*feeds.Package) (int, error) { }).Print("Sending package upstream") b, err := json.Marshal(pkg) if err != nil { - log.Printf("Error marshaling package: %#v", pkg) - return processed, err + log.WithField("name", pkg.Name).WithError(err).Error("Error marshaling package") + errs = append(errs, err) } if err := (fg.publisher).Send(context.Background(), b); err != nil { - log.Printf("Error sending package to upstream publisher %v", err) - return processed, err + log.WithField("name", pkg.Name).WithError(err).Error("Error sending package to upstream publisher") + errs = append(errs, err) } processed++ } - return processed, nil + err := errPub + if len(errs) == 0 { + err = nil + } + if len(pkgs)-processed != 0 { + log.Errorf("Failed to publish %v packages", len(pkgs)-processed) + } + return processed, err } diff --git a/pkg/scheduler/feed_group_test.go b/pkg/scheduler/feed_group_test.go index cd15b9b1..836ed0bc 100644 --- a/pkg/scheduler/feed_group_test.go +++ b/pkg/scheduler/feed_group_test.go @@ -74,6 +74,9 @@ func TestFeedGroupPollWithErr(t *testing.T) { if err == nil { t.Fatalf("Expected error during polling") } + if !errors.Is(err, errPoll) { + t.Fatalf("Expected errPoll during polling") + } if len(pkgs) != 2 { t.Fatalf("Expected 2 packages alongside errors but found %v", len(pkgs)) } @@ -127,4 +130,7 @@ func TestFeedGroupPublishWithErr(t *testing.T) { if err == nil { t.Fatalf("publishPackages provided no error when publishing produced an error") } + if !errors.Is(err, errPub) { + t.Fatalf("Expected errPub during publishing") + } }