Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(openobserve transform): Add OpenObserve as an officially supported sink #21531

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/actions/spelling/allow.txt
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ Nextbook
Nextcloud
OVH
Odys
openobserve
Openpeak
Oppo
Ovi
Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ sinks-logs = [
"sinks-new_relic",
"sinks-papertrail",
"sinks-pulsar",
"sinks-openobserve",
pront marked this conversation as resolved.
Show resolved Hide resolved
"sinks-redis",
"sinks-sematext",
"sinks-socket",
Expand Down Expand Up @@ -772,6 +773,7 @@ sinks-mqtt = ["dep:rumqttc"]
sinks-nats = ["dep:async-nats", "dep:nkeys"]
sinks-new_relic_logs = ["sinks-http"]
sinks-new_relic = []
sinks-openobserve = ["sinks-http"]
sinks-papertrail = ["dep:syslog"]
sinks-prometheus = ["dep:base64", "dep:prost", "vector-lib/prometheus"]
sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"]
Expand Down Expand Up @@ -821,6 +823,7 @@ all-integration-tests = [
"nats-integration-tests",
"nginx-integration-tests",
"opentelemetry-integration-tests",
"openobserve-integration-tests",
"postgresql_metrics-integration-tests",
"prometheus-integration-tests",
"pulsar-integration-tests",
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ test-integration: ## Runs all integration tests
test-integration: test-integration-amqp test-integration-appsignal test-integration-aws test-integration-axiom test-integration-azure test-integration-chronicle test-integration-clickhouse
test-integration: test-integration-databend test-integration-docker-logs test-integration-elasticsearch
test-integration: test-integration-eventstoredb test-integration-fluent test-integration-gcp test-integration-greptimedb test-integration-humio test-integration-http-client test-integration-influxdb
test-integration: test-integration-kafka test-integration-logstash test-integration-loki test-integration-mongodb test-integration-nats
test-integration: test-integration-kafka test-integration-logstash test-integration-loki test-integration-mongodb test-integration-nats
chaitanya-sistla marked this conversation as resolved.
Show resolved Hide resolved
test-integration: test-integration-nginx test-integration-opentelemetry test-integration-postgres test-integration-prometheus test-integration-pulsar
test-integration: test-integration-redis test-integration-splunk test-integration-dnstap test-integration-datadog-agent test-integration-datadog-logs test-integration-e2e-datadog-logs
test-integration: test-integration-datadog-traces test-integration-shutdown
Expand Down
2 changes: 2 additions & 0 deletions src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ pub mod nats;
pub mod new_relic;
#[cfg(feature = "sinks-webhdfs")]
pub mod opendal_common;
#[cfg(feature = "sinks-openobserve")]
pub mod openobserve;
#[cfg(feature = "sinks-papertrail")]
pub mod papertrail;
#[cfg(feature = "sinks-prometheus")]
Expand Down
133 changes: 133 additions & 0 deletions src/sinks/openobserve.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use http::Uri;
use vector_lib::codecs::encoding::{FramingConfig, JsonSerializerConfig, SerializerConfig};
use vector_lib::configurable::configurable_component;

use crate::{
codecs::{EncodingConfig, EncodingConfigWithFraming},
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
http::{Auth, MaybeAuth},
sinks::{
http::config::{HttpMethod, HttpSinkConfig},
util::{
http::RequestConfig, BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings,
UriSerde,
},
Healthcheck, VectorSink,
},
tls::TlsConfig,
};

/// Configuration for the `openobserve` sink.
#[configurable_component(sink("openobserve", "Deliver log events to OpenObserve."))]
#[derive(Clone, Debug)]
pub struct OpenObserveConfig {
/// The OpenObserve endpoint to send data to.
#[serde(default = "default_endpoint")]
#[configurable(metadata(docs::examples = "http://localhost:5080/api/default/default/_json"))]
uri: UriSerde,

/// The user and password to authenticate with OpenObserve endpoint.
prabhatsharma marked this conversation as resolved.
Show resolved Hide resolved
#[configurable(derived)]
auth: Option<Auth>,

#[configurable(derived)]
prabhatsharma marked this conversation as resolved.
Show resolved Hide resolved
#[serde(default)]
request: RequestConfig,

/// The compression algorithm to use.
#[configurable(derived)]
#[serde(default = "Compression::gzip_default")]
compression: Compression,

#[configurable(derived)]
prabhatsharma marked this conversation as resolved.
Show resolved Hide resolved
encoding: EncodingConfig,

/// The batch settings for the sink.
#[configurable(derived)]
#[serde(default)]
pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

/// Controls how acknowledgements are handled for this sink.
#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
acknowledgements: AcknowledgementsConfig,

/// The TLS settings for the connection.
///
/// Optional, constrains TLS settings for this sink.
#[configurable(derived)]
tls: Option<TlsConfig>,
}

impl GenerateConfig for OpenObserveConfig {
fn generate_config() -> toml::Value {
toml::from_str(
r#"
uri = "http://localhost:5080/api/default/default/_json"
Auth = "user: [email protected], password: your_ingestion_password"
encoding.codec = "json"
"#,
)
.unwrap()
}
}

fn default_endpoint() -> UriSerde {
UriSerde {
uri: Uri::from_static("http://localhost:5080/api/default/default/_json"),
auth: None,
}
}

/// This sink wraps the Vector HTTP sink to provide official support for OpenObserve's
/// native HTTP ingest endpoint. By doing so, it maintains a distinct configuration for
/// the OpenObserve sink, separate from the Vector HTTP sink. This approach ensures
/// that future changes to OpenObserve's interface can be accommodated without impacting
/// the underlying Vector HTTP sink.
#[async_trait::async_trait]
#[typetag::serde(name = "openobserve")]
impl SinkConfig for OpenObserveConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let request = self.request.clone();
let http_sink_config = HttpSinkConfig {
uri: self.uri.clone(),
compression: self.compression,
auth: self.auth.choose_one(&self.uri.auth)?,
method: HttpMethod::Post,
tls: self.tls.clone(),
request,
acknowledgements: self.acknowledgements,
batch: self.batch,
headers: None,
encoding: EncodingConfigWithFraming::new(
Some(FramingConfig::Bytes),
SerializerConfig::Json(JsonSerializerConfig::default()),
self.encoding.transformer(),
),
payload_prefix: "".into(), // Always newline delimited JSON
payload_suffix: "".into(), // Always newline delimited JSON
};

http_sink_config.build(cx).await
}

fn input(&self) -> Input {
Input::new(self.encoding.config().input_type() & DataType::Log)
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}

#[cfg(test)]
mod test {
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<super::OpenObserveConfig>();
}
}
Loading
Loading