Skip to content

Commit

Permalink
Add support for automatic stream creation (#211)
Browse files Browse the repository at this point in the history
Previously, when the user specified x-p-stream in the header and the stream
didn't exist, the server failed with a "stream doesn't exist" error.

In automated environments such as Kubernetes, however, it is
important to create the stream dynamically so that Parseable
can ingest log data quickly without requiring much user
involvement.
  • Loading branch information
nitisht authored Dec 12, 2022
1 parent 9c1c13d commit 33009bf
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 53 deletions.
3 changes: 1 addition & 2 deletions server/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ impl Event {
let inferred_schema = self.infer_schema()?;

let event = self.get_reader(inferred_schema.clone());

let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;

if let Some(existing_schema) = stream_schema {
Expand Down Expand Up @@ -211,7 +210,7 @@ impl Event {
// note for functions _schema_with_map and _set_schema_with_map,
// these are to be called while holding a write lock specifically.
// this guarantees two things
// - no other metadata operation can happen inbetween
// - no other metadata operation can happen in between
// - map always have an entry for this stream

let stream_name = &self.stream_name;
Expand Down
14 changes: 10 additions & 4 deletions server/src/handlers/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use self::error::{PostError, QueryError};

const PREFIX_TAGS: &str = "x-p-tag-";
const PREFIX_META: &str = "x-p-meta-";
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream-name";
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
const SEPARATOR: char = '^';

pub async fn query(_req: HttpRequest, json: web::Json<Value>) -> Result<HttpResponse, QueryError> {
Expand All @@ -49,6 +49,9 @@ pub async fn query(_req: HttpRequest, json: web::Json<Value>) -> Result<HttpResp
.map_err(|e| e.into())
}

// Handler for POST /api/v1/ingest
// ingests events into the specified logstream in the header
// if the logstream does not exist, it is created
pub async fn ingest(
req: HttpRequest,
body: web::Json<serde_json::Value>,
Expand All @@ -58,21 +61,24 @@ pub async fn ingest(
.iter()
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
{
push_logs(stream_name.to_str().unwrap().to_owned(), req, body).await?;

let str_name = stream_name.to_str().unwrap().to_owned();
super::logstream::create_stream_if_not_exists(str_name.clone()).await;
push_logs(str_name, req, body).await?;
Ok(HttpResponse::Ok().finish())
} else {
Err(PostError::Header(ParseHeaderError::MissingStreamName))
}
}

/// Handler for POST /api/v1/logstream/{logstream}.
///
/// Ingests the request body into the log stream named by the `logstream`
/// path segment. Unlike the header-based ingest handler, this one never
/// creates a stream: it only ingests events, and fails if the logstream
/// does not exist.
pub async fn post_event(
    req: HttpRequest,
    body: web::Json<serde_json::Value>,
) -> Result<HttpResponse, PostError> {
    // The route is registered with a `{logstream}` segment, so the lookup
    // is expected to succeed; parsing a `&str` into a `String` cannot fail.
    let target_stream = req
        .match_info()
        .get("logstream")
        .unwrap()
        .parse::<String>()
        .unwrap();

    push_logs(target_stream, req, body).await?;
    Ok(HttpResponse::Ok().finish())
}

Expand Down
89 changes: 46 additions & 43 deletions server/src/handlers/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,50 +164,9 @@ pub async fn get_alert(req: HttpRequest) -> HttpResponse {
.to_http()
}

pub async fn put(req: HttpRequest) -> HttpResponse {
pub async fn put_stream(req: HttpRequest) -> HttpResponse {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

// fail to proceed if there is an error in log stream name validation
if let Err(e) = validator::stream_name(&stream_name) {
return response::ServerResponse {
msg: format!("failed to create log stream due to err: {}", e),
code: StatusCode::BAD_REQUEST,
}
.to_http();
}

let s3 = S3::new();

// Proceed to create log stream if it doesn't exist
if s3.get_schema(&stream_name).await.is_err() {
// Fail if unable to create log stream on object store backend
if let Err(e) = s3.create_stream(&stream_name).await {
return response::ServerResponse {
msg: format!(
"failed to create log stream {} due to err: {}",
stream_name, e
),
code: StatusCode::INTERNAL_SERVER_ERROR,
}
.to_http();
}
metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Alerts::default());
return response::ServerResponse {
msg: format!("created log stream {}", stream_name),
code: StatusCode::OK,
}
.to_http();
}

// Error if the log stream already exists
response::ServerResponse {
msg: format!(
"log stream {} already exists, please create a new log stream with unique name",
stream_name
),
code: StatusCode::BAD_REQUEST,
}
.to_http()
create_stream_if_not_exists(stream_name).await
}

pub async fn put_alert(req: HttpRequest, body: web::Json<serde_json::Value>) -> HttpResponse {
Expand Down Expand Up @@ -349,3 +308,47 @@ fn remove_id_from_alerts(value: &mut Value) {
});
}
}

/// Creates the log stream `stream_name` unless it is already registered.
///
/// The returned response describes the outcome:
/// - `400 Bad Request` if the stream is already present in `metadata::STREAM_INFO`
/// - `400 Bad Request` if the name is rejected by `validator::stream_name`
/// - `500 Internal Server Error` if the object store failed to create the stream
/// - `200 OK` once the stream has been created and added to the in-memory metadata
///
/// Callers that only need the "ensure it exists" side effect may ignore the
/// returned response.
pub async fn create_stream_if_not_exists(stream_name: String) -> HttpResponse {
    // Every failure branch below must `return`. Merely building the response
    // and discarding it would let execution fall through, re-creating (and
    // re-registering with an empty schema and default alerts) a stream that
    // already exists, and reporting success for invalid names.
    if metadata::STREAM_INFO.stream_exists(stream_name.as_str()) {
        // Error if the log stream already exists
        return response::ServerResponse {
            msg: format!(
                "log stream {} already exists, please create a new log stream with unique name",
                stream_name
            ),
            code: StatusCode::BAD_REQUEST,
        }
        .to_http();
    }

    // fail to proceed if invalid stream name
    if let Err(e) = validator::stream_name(&stream_name) {
        return response::ServerResponse {
            msg: format!("failed to create log stream due to err: {}", e),
            code: StatusCode::BAD_REQUEST,
        }
        .to_http();
    }

    // Proceed to create log stream since it doesn't exist
    let s3 = S3::new();
    if let Err(e) = s3.create_stream(&stream_name).await {
        // Fail if unable to create log stream on object store backend;
        // the in-memory metadata must not be updated in that case.
        return response::ServerResponse {
            msg: format!(
                "failed to create log stream {} due to err: {}",
                stream_name, e
            ),
            code: StatusCode::INTERNAL_SERVER_ERROR,
        }
        .to_http();
    }

    // Register the freshly created stream with no schema and default alerts.
    metadata::STREAM_INFO.add_stream(stream_name.clone(), None, Alerts::default());
    response::ServerResponse {
        msg: format!("created log stream {}", stream_name),
        code: StatusCode::OK,
    }
    .to_http()
}
2 changes: 1 addition & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) {
// logstream API
web::resource(logstream_path("{logstream}"))
// PUT "/logstream/{logstream}" ==> Create log stream
.route(web::put().to(handlers::logstream::put))
.route(web::put().to(handlers::logstream::put_stream))
// POST "/logstream/{logstream}" ==> Post logs to given log stream
.route(web::post().to(handlers::event::post_event))
// DELETE "/logstream/{logstream}" ==> Delete log stream
Expand Down
7 changes: 6 additions & 1 deletion server/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ impl STREAM_INFO {
})
}

/// Reports whether `stream_name` has an entry in the stream metadata map.
///
/// Reads the map through `self.read()` and panics with `LOCK_EXPECT` if
/// that call returns an error.
pub fn stream_exists(&self, stream_name: &str) -> bool {
self.read().expect(LOCK_EXPECT).contains_key(stream_name)
}

pub fn schema(&self, stream_name: &str) -> Result<Option<Schema>, MetadataError> {
let map = self.read().expect(LOCK_EXPECT);
map.get(stream_name)
Expand Down Expand Up @@ -112,7 +117,7 @@ impl STREAM_INFO {
}

pub async fn load(&self, storage: &impl ObjectStorage) -> Result<(), LoadError> {
// When loading streams this funtion will assume list_streams only returns valid streams.
// When loading streams this function will assume list_streams only returns valid streams.
// a valid stream would have a .schema file.
// .schema file could be empty in that case it will be treated as an uninitialized stream.
// return error in case of an error from object storage itself.
Expand Down
18 changes: 18 additions & 0 deletions server/src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/*
* Parseable Server (C) 2022 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::sync::atomic::{AtomicU64, Ordering};

use serde::{Deserialize, Serialize};
Expand Down
4 changes: 2 additions & 2 deletions server/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub fn merge(value: Value, fields: HashMap<String, String>) -> Value {
for (k, v) in fields {
match m.get_mut(&k) {
Some(val) => {
let mut final_val = String::new();
let mut final_val = String::default();
final_val.push_str(val.as_str().unwrap());
final_val.push(',');
final_val.push_str(&v);
Expand Down Expand Up @@ -103,7 +103,7 @@ pub mod header_parsing {
SeperatorInKey(char),
#[error("A value passed in header contains reserved char {0}")]
SeperatorInValue(char),
#[error("Stream name not found in header")]
#[error("Stream name not found in header [x-p-stream]")]
MissingStreamName,
}

Expand Down

0 comments on commit 33009bf

Please sign in to comment.