Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka integration #983

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open

Conversation

TomBebb
Copy link

@TomBebb TomBebb commented Nov 8, 2024

/claim #936

Fixes #936

Description

Integrates with Kakfa, using its crate.
This is toggled on/off via the KAFKA_TOPIC environment variable.
This uses a background polling iterator to loop through messages, and ingesting messages to the stream with the name of the KAFKA_TOPIC.
Covers all possible config options via environment variables.


This PR has:

  • [ y] been tested to ensure log ingestion and log query works.
  • [ y] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • [ y] added documentation for new or modified features or behaviors.

@TomBebb
Copy link
Author

TomBebb commented Nov 8, 2024

Kafka-console-producer:
Screenshot_20241108_120807

Data:
Screenshot_20241108_120822

@nitisht nitisht requested a review from de-sh November 15, 2024 16:36
Ok(())
}

pub async fn setup_integration() -> Result<JoinHandle<()>, KafkaError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be fine to drop the handle if I am not wrong, why are we returning it?

@@ -106,6 +106,8 @@ path-clean = "1.0.1"
prost = "0.13.3"
prometheus-parse = "0.2.5"
sha2 = "0.10.8"
kafka = "0.10.0"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using https://github.com/fede1024/rust-rdkafka instead of kafka-rust could allow the implementation to be async

@de-sh
Copy link
Contributor

de-sh commented Nov 19, 2024

@TomBebb can you please document the code a bit more, this will help us understand what we are going to maintain in the long run 😅

The code here is lacking comments, especially regarding the functions and what they are doing as well as the use of good const(The env variables should be named in const as the deepsource action points out) that describes what they are doing. Thank you in advance!

Comment on lines +218 to +223
loop {
while let Some(curr) = stream.next().await {
let msg = curr.unwrap();
ingest_message(&stream_name, msg).await.unwrap();
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while let inside a loop? Also, why the unwrap is not after stream.next() and instead on the returned value when ref says to do otherwise?

@@ -14,4 +14,5 @@ parseable
parseable_*
parseable-env-secret
cache
.idea
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove unrelated change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

Ok(())
}

pub async fn setup_integration() -> Result<JoinHandle<()>, KafkaError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub async fn setup_integration() -> Result<JoinHandle<()>, KafkaError> {
pub async fn setup_integration() -> Result<(), KafkaError> {

log::info!("Setup kafka integration for {stream_name}");
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;

let res = task::spawn(async move {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let res = task::spawn(async move {
task::spawn(async move {

}

pub async fn setup_integration() -> Result<JoinHandle<()>, KafkaError> {
let my_res = if let Ok(stream_name) = env::var("KAFKA_TOPIC") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let my_res = if let Ok(stream_name) = env::var("KAFKA_TOPIC") {
if let Ok(stream_name) = env::var("KAFKA_TOPIC") {

Comment on lines +227 to +229
} else {
task::spawn_blocking(|| {})
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} else {
task::spawn_blocking(|| {})
};
}

} else {
task::spawn_blocking(|| {})
};
Ok(my_res)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Ok(my_res)
Ok(())

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Feat: Add native Kafka integration for Parseable server
2 participants