Skip to content

Commit

Permalink
Add ingestion API (#210)
Browse files Browse the repository at this point in the history
Add another API to ingest log data. This will push logs 
to stream name present in header `x-p-stream-name`
  • Loading branch information
123vivekr authored Dec 11, 2022
1 parent 77cf001 commit 9c1c13d
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 2 deletions.
31 changes: 29 additions & 2 deletions server/src/handlers/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ use crate::event;
use crate::query::Query;
use crate::response::QueryResponse;
use crate::s3::S3;
use crate::utils::header_parsing::collect_labelled_headers;
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
use crate::utils::{self, flatten_json_body, merge};

use self::error::{PostError, QueryError};

const PREFIX_TAGS: &str = "x-p-tag-";
const PREFIX_META: &str = "x-p-meta-";
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream-name";
const SEPARATOR: char = '^';

pub async fn query(_req: HttpRequest, json: web::Json<Value>) -> Result<HttpResponse, QueryError> {
Expand All @@ -48,12 +49,38 @@ pub async fn query(_req: HttpRequest, json: web::Json<Value>) -> Result<HttpResp
.map_err(|e| e.into())
}

pub async fn ingest(
req: HttpRequest,
body: web::Json<serde_json::Value>,
) -> Result<HttpResponse, PostError> {
if let Some((_, stream_name)) = req
.headers()
.iter()
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
{
push_logs(stream_name.to_str().unwrap().to_owned(), req, body).await?;

Ok(HttpResponse::Ok().finish())
} else {
Err(PostError::Header(ParseHeaderError::MissingStreamName))
}
}

pub async fn post_event(
req: HttpRequest,
body: web::Json<serde_json::Value>,
) -> Result<HttpResponse, PostError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
push_logs(stream_name, req, body).await?;

Ok(HttpResponse::Ok().finish())
}

async fn push_logs(
stream_name: String,
req: HttpRequest,
body: web::Json<serde_json::Value>,
) -> Result<(), PostError> {
let tags = HashMap::from([(
"p_tags".to_string(),
collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?,
Expand Down Expand Up @@ -89,7 +116,7 @@ pub async fn post_event(
event.process().await?;
}

Ok(HttpResponse::Ok().finish())
Ok(())
}

pub mod error {
Expand Down
6 changes: 6 additions & 0 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) {
web::scope(&base_path())
// POST "/query" ==> Get results of the SQL query passed in request body
.service(web::resource(query_path()).route(web::post().to(handlers::event::query)))
// POST "/ingest" ==> Post logs to given log stream based on header
.service(web::resource(ingest_path()).route(web::post().to(handlers::event::ingest)))
.service(
// logstream API
web::resource(logstream_path("{logstream}"))
Expand Down Expand Up @@ -341,6 +343,10 @@ fn query_path() -> String {
"/query".to_string()
}

fn ingest_path() -> String {
"/ingest".to_string()
}

fn alert_path(stream_name: &str) -> String {
format!("{}/alert", logstream_path(stream_name))
}
Expand Down
2 changes: 2 additions & 0 deletions server/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ pub mod header_parsing {
SeperatorInKey(char),
#[error("A value passed in header contains reserved char {0}")]
SeperatorInValue(char),
#[error("Stream name not found in header")]
MissingStreamName,
}

impl ResponseError for ParseHeaderError {
Expand Down

0 comments on commit 9c1c13d

Please sign in to comment.