Skip to content

Commit

Permalink
fix(ingestor): skip self when forwarding put stream request to querier
Browse files Browse the repository at this point in the history
  • Loading branch information
Anirudhxx committed Nov 11, 2024
1 parent 4190f96 commit 66d89f4
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 6 deletions.
17 changes: 14 additions & 3 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub async fn sync_streams_with_ingestors(
headers: HeaderMap,
body: Bytes,
stream_name: &str,
skip_ingestor: Option<String>,
) -> Result<(), StreamError> {
let mut reqwest_headers = http_header::HeaderMap::new();

Expand All @@ -77,7 +78,16 @@ pub async fn sync_streams_with_ingestors(
})?;

let client = reqwest::Client::new();
for ingestor in ingestor_infos.iter() {

let final_ingestor_infos = match skip_ingestor {
None => ingestor_infos,
Some(skip_ingestor) => ingestor_infos
.into_iter()
.filter(|ingestor| ingestor.domain_name != to_url_string(skip_ingestor.clone()))
.collect::<Vec<IngestorMetadata>>(),
};

for ingestor in final_ingestor_infos {
if !utils::check_liveness(&ingestor.domain_name).await {
log::warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
Expand Down Expand Up @@ -858,10 +868,11 @@ pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), Stre
}

let url = format!(
"{}{}/logstream/{}",
"{}{}/logstream/{}?skip_ingestors={}",
querier_endpoint,
base_path_without_preceding_slash(),
stream_name
stream_name,
CONFIG.parseable.ingestor_endpoint,
);

let response = client
Expand Down
2 changes: 1 addition & 1 deletion server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);
sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME, None).await?;
}
Ok(())
}
Expand Down
16 changes: 14 additions & 2 deletions server/src/handlers/http/modal/query/querier_logstream.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use core::str;
use std::fs;

use actix_web::{web, HttpRequest, Responder};
use bytes::Bytes;
use chrono::Utc;
use http::StatusCode;
use serde::Deserialize;
use tokio::sync::Mutex;

static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());
Expand Down Expand Up @@ -77,12 +79,22 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
}

pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
#[derive(Deserialize)]
pub struct PutStreamQuery {
skip_ingestors: Option<String>,
}

pub async fn put_stream(
req: HttpRequest,
body: Bytes,
info: web::Query<PutStreamQuery>,
) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

let _ = CREATE_STREAM_LOCK.lock().await;
let headers = create_update_stream(&req, &body, &stream_name).await?;
sync_streams_with_ingestors(headers, body, &stream_name).await?;

sync_streams_with_ingestors(headers, body, &stream_name, info.skip_ingestors.clone()).await?;

Ok(("Log stream created", StatusCode::OK))
}
Expand Down

0 comments on commit 66d89f4

Please sign in to comment.