Skip to content

Commit

Permalink
feat: audit logs
Browse files Browse the repository at this point in the history
Co-authored-by: Akshat Agarwal <[email protected]>
  • Loading branch information
Anirudhxx and akshatagarwl committed Nov 19, 2024
1 parent e105eeb commit c455912
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 5 deletions.
93 changes: 89 additions & 4 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*
*/

use clap::{value_parser, Arg, ArgGroup, Command, FromArgMatches};
use std::path::PathBuf;
use clap::{builder::ValueParser, value_parser, Arg, ArgGroup, Command, FromArgMatches};
use std::{collections::HashMap, path::PathBuf};

use url::Url;

Expand Down Expand Up @@ -119,8 +119,28 @@ pub struct Cli {
pub trino_auth: Option<String>,
pub trino_schema: Option<String>,
pub trino_catalog: Option<String>,

// audit log vars
pub audit_log_target: Option<String>,
pub audit_log_target_username: Option<String>,
pub audit_log_target_password: Option<String>,
pub audit_log_target_tls_verify: bool,
pub audit_log_target_headers: HashMap<String, String>,
}

fn parse_header(header: &str) -> Result<HashMap<String, String>, String> {
let mut map = HashMap::new();

for pair in header.split(',') {
if let Some((key, value)) = pair.split_once(':') {
map.insert(key.trim().to_string(), value.trim().to_string());
} else {
return Err(format!("Invalid header format: {}", pair));
}
}

Ok(map)
}
impl Cli {
// identifiers for arguments
pub const TLS_CERT: &'static str = "tls-cert-path";
Expand Down Expand Up @@ -164,6 +184,13 @@ impl Cli {
pub const TRINO_AUTHORIZATION: &'static str = "p-trino-authorization";
pub const TRINO_SCHEMA: &'static str = "p-trino-schema";

// audit log env vars
pub const P_AUDIT_LOG_TARGET: &'static str = "p-audit-log-target";
pub const P_AUDIT_LOG_TARGET_USERNAME: &'static str = "p-audit-log-target-username";
pub const P_AUDIT_LOG_TARGET_PASSWORD: &'static str = "p-audit-log-target-password";
pub const P_AUDIT_LOG_TARGET_TLS_VERIFY: &'static str = "p-audit-log-target-tls-verify";
pub const P_AUDIT_LOG_TARGET_HEADERS: &'static str = "p-audit-log-target-headers";

pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
self.local_staging_path.join(stream_name)
}
Expand Down Expand Up @@ -501,8 +528,50 @@ impl Cli {
ArgGroup::new("oidc")
.args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER])
.requires_all([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER])
.multiple(true)
)
.multiple(true))
.arg(
Arg::new(Self::P_AUDIT_LOG_TARGET)
.long(Self::P_AUDIT_LOG_TARGET)
.env("P_AUDIT_LOG_TARGET")
.value_name("STRING")
.required(false)
.help("Full endpoint for the audit log target"),
)
.arg(
Arg::new(Self::P_AUDIT_LOG_TARGET_USERNAME)
.long(Self::P_AUDIT_LOG_TARGET_USERNAME)
.env("P_AUDIT_LOG_TARGET_USERNAME")
.value_name("STRING")
.required(false)
.help("Username for the audit log target"),
)
.arg(
Arg::new(Self::P_AUDIT_LOG_TARGET_PASSWORD)
.long(Self::P_AUDIT_LOG_TARGET_PASSWORD)
.env("P_AUDIT_LOG_TARGET_PASSWORD")
.value_name("STRING")
.required(false)
.help("Password for the audit log target"),
)
.arg(
Arg::new(Self::P_AUDIT_LOG_TARGET_TLS_VERIFY)
.long(Self::P_AUDIT_LOG_TARGET_TLS_VERIFY)
.env("P_AUDIT_LOG_TARGET_TLS_VERIFY")
.value_name("BOOL")
.required(false)
.default_value("false")
.value_parser(clap::value_parser!(bool))
.help("Enable/Disable TLS verification for the audit log target"),
)
.arg(
Arg::new(Self::P_AUDIT_LOG_TARGET_HEADERS)
.long(Self::P_AUDIT_LOG_TARGET_HEADERS)
.env("P_AUDIT_LOG_TARGET_HEADERS")
.value_name("HEADER")
.required(false)
.value_parser(ValueParser::new(parse_header))
.help("Comma-separated list of headers for the audit log target"),
)
}
}

Expand Down Expand Up @@ -649,6 +718,22 @@ impl FromArgMatches for Cli {

self.ms_clarity_tag = m.get_one::<String>(Self::MS_CLARITY_TAG).cloned();

self.audit_log_target = m.get_one::<String>(Self::P_AUDIT_LOG_TARGET).cloned();
self.audit_log_target_username = m
.get_one::<String>(Self::P_AUDIT_LOG_TARGET_USERNAME)
.cloned();
self.audit_log_target_password = m
.get_one::<String>(Self::P_AUDIT_LOG_TARGET_PASSWORD)
.cloned();
self.audit_log_target_tls_verify = m
.get_one::<bool>(Self::P_AUDIT_LOG_TARGET_TLS_VERIFY)
.cloned()
.expect("default for P_AUDIT_LOG_TARGET_TLS_VERIFY");
self.audit_log_target_headers = m
.get_one::<HashMap<String, String>>(Self::P_AUDIT_LOG_TARGET_HEADERS)
.cloned()
.unwrap_or_default();

Ok(())
}
}
82 changes: 82 additions & 0 deletions server/src/handlers/http/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,15 @@ use actix_web::{
http::header::{self, HeaderName},
Error, Route,
};
use base64::Engine;
use chrono::Utc;
use futures_util::future::LocalBoxFuture;

use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::Client;
use serde_json::json;
use std::collections::HashMap;

use crate::{
handlers::{
AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, LOG_SOURCE_KEY, LOG_SOURCE_KINESIS,
Expand Down Expand Up @@ -164,6 +171,31 @@ where
/* ## Section end */

let auth_result: Result<_, Error> = (self.auth_method)(&mut req, self.action);
let body = json!([
{
"version": "1.0",
"user-agent":&req
.headers()
.get("user-agent")
.and_then(|value| value.to_str().ok())
.unwrap_or("unknown"),
"datetime": Utc::now(),
"action":self.action,
"Actor":{
"type": &req
.headers()
.get("user-agent")
.and_then(|value| value.to_str().ok())
.unwrap_or("unknown"),
"id": "user123"
},
"ip-address":&req
.headers()
.get("host")
.and_then(|value| value.to_str().ok())
.unwrap_or("unknown"),
}
]);
let fut = self.service.call(req);
Box::pin(async move {
match auth_result? {
Expand All @@ -175,10 +207,60 @@ where
),
_ => {}
}
if let Err(err) = send_post_request(body).await {
eprintln!("Error sending POST request: {}", err);
}
fut.await
})
}
}
fn to_header_map(headers: &HashMap<String, String>) -> Result<HeaderMap, String> {
let mut header_map = HeaderMap::new();
for (key, value) in headers {
let header_name = reqwest::header::HeaderName::from_bytes(key.as_bytes())
.map_err(|_| format!("Invalid header name: {}", key))?;
let header_value = HeaderValue::from_str(value)
.map_err(|_| format!("Invalid header value for {}: {}", key, value))?;
header_map.insert(header_name, header_value);
}
Ok(header_map)
}
async fn send_post_request(body: serde_json::Value) -> Result<(), reqwest::Error> {
let client = Client::new();
match CONFIG.parseable.audit_log_target.as_deref() {
Some(_target) => {
let audit_log_auth_token = format!(
"Basic {}",
base64::prelude::BASE64_STANDARD.encode(format!(
"{}:{}",
CONFIG
.parseable
.audit_log_target_username
.as_deref()
.unwrap(),
CONFIG
.parseable
.audit_log_target_password
.as_deref()
.unwrap()
))
);
let headers = to_header_map(&CONFIG.parseable.audit_log_target_headers)
.expect("Failed to convert audit_log_target_headers to HeaderMap");
let body = body;
let target_url = CONFIG.parseable.audit_log_target.as_ref().unwrap();
let _response = client
.post(target_url)
.headers(headers)
.header(reqwest::header::AUTHORIZATION, &audit_log_auth_token)
.json(&body)
.send()
.await?;
Ok(())
}
None => Ok(()),
}
}

pub fn auth_no_context(req: &mut ServiceRequest, action: Action) -> Result<rbac::Response, Error> {
let creds = extract_session_key(req);
Expand Down
4 changes: 3 additions & 1 deletion server/src/rbac/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
*
*/

use serde::Serialize;

// Represents actions that corresponds to an api
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum Action {
Ingest,
Query,
Expand Down

0 comments on commit c455912

Please sign in to comment.