Skip to content

Commit

Permalink
feeds/scheduler/feed_group.go: Handle per package errs when publishing
Browse files Browse the repository at this point in the history
publishPackages() no longer immediately returns if a single package
fails to be marshalled or sent to the publisher. Instead the error
is logged and a generic `errPub` is returned, inline with poll().
  • Loading branch information
tom--pollard committed Jun 16, 2021
1 parent cc28d35 commit 4fb53ce
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
28 changes: 16 additions & 12 deletions pkg/scheduler/feed_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,9 @@ func (fg *FeedGroup) pollAndPublish() groupResult {
return result
}
log.WithField("num_packages", len(pkgs)).Printf("Publishing packages...")
numPublished, pubErr := fg.publishPackages(pkgs)
result.numPublished = numPublished
if pubErr != nil {
log.Errorf("Failed to publish %v packages due to err: %v", len(pkgs)-numPublished, pubErr)
result.pubErr = errPub
} else {
log.WithField("num_packages", numPublished).Printf("Successfully published packages")
result.numPublished, result.pubErr = fg.publishPackages(pkgs)
if result.numPublished > 0 {
log.WithField("num_packages", result.numPublished).Printf("Successfully published packages")
}
return result
}
Expand Down Expand Up @@ -117,6 +113,7 @@ func (fg *FeedGroup) poll() ([]*feeds.Package, error) {

func (fg *FeedGroup) publishPackages(pkgs []*feeds.Package) (int, error) {
processed := 0
errs := []error{}
for _, pkg := range pkgs {
log.WithFields(log.Fields{
"name": pkg.Name,
Expand All @@ -125,14 +122,21 @@ func (fg *FeedGroup) publishPackages(pkgs []*feeds.Package) (int, error) {
}).Print("Sending package upstream")
b, err := json.Marshal(pkg)
if err != nil {
log.Printf("Error marshaling package: %#v", pkg)
return processed, err
log.WithField("name", pkg.Name).WithError(err).Error("Error marshaling package")
errs = append(errs, err)
}
if err := (fg.publisher).Send(context.Background(), b); err != nil {
log.Printf("Error sending package to upstream publisher %v", err)
return processed, err
log.WithField("name", pkg.Name).WithError(err).Error("Error sending package to upstream publisher")
errs = append(errs, err)
}
processed++
}
return processed, nil
err := errPub
if len(errs) == 0 {
err = nil
}
if len(pkgs)-processed != 0 {
log.Errorf("Failed to publish %v packages", len(pkgs)-processed)
}
return processed, err
}
6 changes: 6 additions & 0 deletions pkg/scheduler/feed_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func TestFeedGroupPollWithErr(t *testing.T) {
if err == nil {
t.Fatalf("Expected error during polling")
}
if !errors.Is(err, errPoll) {
t.Fatalf("Expected errPoll during polling")
}
if len(pkgs) != 2 {
t.Fatalf("Expected 2 packages alongside errors but found %v", len(pkgs))
}
Expand Down Expand Up @@ -127,4 +130,7 @@ func TestFeedGroupPublishWithErr(t *testing.T) {
if err == nil {
t.Fatalf("publishPackages provided no error when publishing produced an error")
}
if !errors.Is(err, errPub) {
t.Fatalf("Expected errPub during publishing")
}
}

0 comments on commit 4fb53ce

Please sign in to comment.