Skip to content

Commit

Permalink
Move processor builders into internal service (open-telemetry#10782)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

This moves the processor builder out of the `processor` package, and
into `service/internal/builders`.
There's no real reason for this struct to be public (folks shouldn't
call it), and making it private will allow us to add profiling support
to it.

<!-- Issue number if applicable -->
#### Link to tracking issue

open-telemetry#10375 (review)
  • Loading branch information
dmathieu authored Aug 22, 2024
1 parent 7cd1579 commit 18d5c02
Show file tree
Hide file tree
Showing 16 changed files with 394 additions and 26 deletions.
25 changes: 25 additions & 0 deletions .chloggen/private-processor-builder.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate processor.Builder, and move it into an internal package of the service module

# One or more tracking issues or pull requests related to the change
issues: [10782]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
1 change: 1 addition & 0 deletions cmd/otelcorecol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ require (
go.opentelemetry.io/collector/internal/globalgates v0.107.0 // indirect
go.opentelemetry.io/collector/pdata v1.13.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.107.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.107.0 // indirect
go.opentelemetry.io/collector/semconv v0.107.0 // indirect
go.opentelemetry.io/collector/service v0.107.0 // indirect
go.opentelemetry.io/contrib/config v0.8.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion internal/e2e/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ require (
go.opentelemetry.io/collector/extension v0.107.0
go.opentelemetry.io/collector/pdata v1.13.0
go.opentelemetry.io/collector/pdata/testdata v0.107.0
go.opentelemetry.io/collector/processor v0.107.0
go.opentelemetry.io/collector/receiver v0.107.0
go.opentelemetry.io/collector/receiver/otlpreceiver v0.107.0
go.opentelemetry.io/collector/service v0.107.0
Expand Down Expand Up @@ -82,6 +81,7 @@ require (
go.opentelemetry.io/collector/featuregate v1.13.0 // indirect
go.opentelemetry.io/collector/internal/globalgates v0.107.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.107.0 // indirect
go.opentelemetry.io/collector/processor v0.107.0 // indirect
go.opentelemetry.io/collector/semconv v0.107.0 // indirect
go.opentelemetry.io/contrib/config v0.8.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 // indirect
Expand Down
2 changes: 0 additions & 2 deletions internal/e2e/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/internal/sharedcomponent"
"go.opentelemetry.io/collector/processor/processortest"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service"
"go.opentelemetry.io/collector/service/extensions"
Expand Down Expand Up @@ -52,7 +51,6 @@ func Test_ComponentStatusReporting_SharedInstance(t *testing.T) {
ReceiversFactories: map[component.Type]receiver.Factory{
component.MustNewType("test"): newReceiverFactory(),
},
Processors: processortest.NewNopBuilder(),
ExportersConfigs: map[component.ID]component.Config{
component.NewID(nopType): exporterFactory.CreateDefaultConfig(),
},
Expand Down
4 changes: 2 additions & 2 deletions otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/otelcol/internal/grpclog"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/service"
)

Expand Down Expand Up @@ -187,7 +186,8 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {

ReceiversConfigs: cfg.Receivers,
ReceiversFactories: factories.Receivers,
Processors: processor.NewBuilder(cfg.Processors, factories.Processors),
ProcessorsConfigs: cfg.Processors,
ProcessorsFactories: factories.Processors,
ExportersConfigs: cfg.Exporters,
ExportersFactories: factories.Exporters,
ConnectorsConfigs: cfg.Connectors,
Expand Down
9 changes: 9 additions & 0 deletions processor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package processor // import "go.opentelemetry.io/collector/processor"

import (
"context"
"errors"
"fmt"

"go.uber.org/zap"
Expand All @@ -13,13 +14,21 @@ import (
"go.opentelemetry.io/collector/consumer"
)

var errNilNextConsumer = errors.New("nil next Consumer")

// Builder processor is a helper struct that given a set of Configs and Factories helps with creating processors.
//
// Deprecated: [v0.108.0] this builder is being internalized within the service module,
// and will be removed soon.
type Builder struct {
cfgs map[component.ID]component.Config
factories map[component.Type]Factory
}

// NewBuilder creates a new processor.Builder to help with creating components form a set of configs and factories.
//
// Deprecated: [v0.108.0] this builder is being internalized within the service module,
// and will be removed soon.
func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder {
return &Builder{cfgs: cfgs, factories: factories}
}
Expand Down
5 changes: 0 additions & 5 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@
package processor // import "go.opentelemetry.io/collector/processor"

import (
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/processor/internal"
)

var (
errNilNextConsumer = errors.New("nil next Consumer")
)

// Traces is a processor that can consume traces.
type Traces = internal.Traces

Expand Down
3 changes: 3 additions & 0 deletions processor/processortest/nop_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type nopProcessor struct {
}

// NewNopBuilder returns a processor.Builder that constructs nop processors.
//
// Deprecated: [v0.108.0] this builder is being internalized within the service module,
// and will be removed soon.
func NewNopBuilder() *processor.Builder {
nopFactory := NewNopFactory()
return processor.NewBuilder(
Expand Down
109 changes: 109 additions & 0 deletions service/internal/builders/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package builders // import "go.opentelemetry.io/collector/service/internal/builders"

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processortest"
)

// Processor is an interface that allows using implementations of the builder
// from different packages.
type Processor interface {
CreateTraces(context.Context, processor.Settings, consumer.Traces) (processor.Traces, error)
CreateMetrics(context.Context, processor.Settings, consumer.Metrics) (processor.Metrics, error)
CreateLogs(context.Context, processor.Settings, consumer.Logs) (processor.Logs, error)
Factory(component.Type) component.Factory
}

// ProcessorBuilder processor is a helper struct that given a set of Configs
// and Factories helps with creating processors.
type ProcessorBuilder struct {
cfgs map[component.ID]component.Config
factories map[component.Type]processor.Factory
}

// NewProcessor creates a new ProcessorBuilder to help with creating components form a set of configs and factories.
func NewProcessor(cfgs map[component.ID]component.Config, factories map[component.Type]processor.Factory) *ProcessorBuilder {
return &ProcessorBuilder{cfgs: cfgs, factories: factories}
}

// CreateTraces creates a Traces processor based on the settings and config.
func (b *ProcessorBuilder) CreateTraces(ctx context.Context, set processor.Settings, next consumer.Traces) (processor.Traces, error) {
if next == nil {
return nil, errNilNextConsumer
}
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("processor %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("processor factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.TracesProcessorStability())
return f.CreateTracesProcessor(ctx, set, cfg, next)
}

// CreateMetrics creates a Metrics processor based on the settings and config.
func (b *ProcessorBuilder) CreateMetrics(ctx context.Context, set processor.Settings, next consumer.Metrics) (processor.Metrics, error) {
if next == nil {
return nil, errNilNextConsumer
}
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("processor %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("processor factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.MetricsProcessorStability())
return f.CreateMetricsProcessor(ctx, set, cfg, next)
}

// CreateLogs creates a Logs processor based on the settings and config.
func (b *ProcessorBuilder) CreateLogs(ctx context.Context, set processor.Settings, next consumer.Logs) (processor.Logs, error) {
if next == nil {
return nil, errNilNextConsumer
}
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("processor %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("processor factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.LogsProcessorStability())
return f.CreateLogsProcessor(ctx, set, cfg, next)
}

func (b *ProcessorBuilder) Factory(componentType component.Type) component.Factory {
return b.factories[componentType]
}

// NewNopProcessorConfigsAndFactories returns a configuration and factories that allows building a new nop processor.
func NewNopProcessorConfigsAndFactories() (map[component.ID]component.Config, map[component.Type]processor.Factory) {
nopFactory := processortest.NewNopFactory()
configs := map[component.ID]component.Config{
component.NewID(nopType): nopFactory.CreateDefaultConfig(),
}
factories := map[component.Type]processor.Factory{
nopType: nopFactory,
}

return configs, factories
}
Loading

0 comments on commit 18d5c02

Please sign in to comment.