From c45591299763363d8e17fe817eb31d9f059cae2c Mon Sep 17 00:00:00 2001 From: Anirudh Chauhan Date: Tue, 19 Nov 2024 12:52:15 +0530 Subject: [PATCH] feat: audit logs Co-authored-by: Akshat Agarwal --- server/src/cli.rs | 93 ++++++++++++++++++++++++-- server/src/handlers/http/middleware.rs | 82 +++++++++++++++++++++++ server/src/rbac/role.rs | 4 +- 3 files changed, 174 insertions(+), 5 deletions(-) diff --git a/server/src/cli.rs b/server/src/cli.rs index 982a2a765..e38ada54d 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -16,8 +16,8 @@ * */ -use clap::{value_parser, Arg, ArgGroup, Command, FromArgMatches}; -use std::path::PathBuf; +use clap::{builder::ValueParser, value_parser, Arg, ArgGroup, Command, FromArgMatches}; +use std::{collections::HashMap, path::PathBuf}; use url::Url; @@ -119,8 +119,28 @@ pub struct Cli { pub trino_auth: Option, pub trino_schema: Option, pub trino_catalog: Option, + + // audit log vars + pub audit_log_target: Option, + pub audit_log_target_username: Option, + pub audit_log_target_password: Option, + pub audit_log_target_tls_verify: bool, + pub audit_log_target_headers: HashMap, } +fn parse_header(header: &str) -> Result, String> { + let mut map = HashMap::new(); + + for pair in header.split(',') { + if let Some((key, value)) = pair.split_once(':') { + map.insert(key.trim().to_string(), value.trim().to_string()); + } else { + return Err(format!("Invalid header format: {}", pair)); + } + } + + Ok(map) +} impl Cli { // identifiers for arguments pub const TLS_CERT: &'static str = "tls-cert-path"; @@ -164,6 +184,13 @@ impl Cli { pub const TRINO_AUTHORIZATION: &'static str = "p-trino-authorization"; pub const TRINO_SCHEMA: &'static str = "p-trino-schema"; + // audit log env vars + pub const P_AUDIT_LOG_TARGET: &'static str = "p-audit-log-target"; + pub const P_AUDIT_LOG_TARGET_USERNAME: &'static str = "p-audit-log-target-username"; + pub const P_AUDIT_LOG_TARGET_PASSWORD: &'static str = "p-audit-log-target-password"; + pub const P_AUDIT_LOG_TARGET_TLS_VERIFY: &'static str = "p-audit-log-target-tls-verify"; + pub const P_AUDIT_LOG_TARGET_HEADERS: &'static str = "p-audit-log-target-headers"; + pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { self.local_staging_path.join(stream_name) } @@ -501,8 +528,50 @@ impl Cli { ArgGroup::new("oidc") .args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) .requires_all([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) - .multiple(true) - ) + .multiple(true)) + .arg( + Arg::new(Self::P_AUDIT_LOG_TARGET) + .long(Self::P_AUDIT_LOG_TARGET) + .env("P_AUDIT_LOG_TARGET") + .value_name("STRING") + .required(false) + .help("Full endpoint for the audit log target"), + ) + .arg( + Arg::new(Self::P_AUDIT_LOG_TARGET_USERNAME) + .long(Self::P_AUDIT_LOG_TARGET_USERNAME) + .env("P_AUDIT_LOG_TARGET_USERNAME") + .value_name("STRING") + .required(false) + .help("Username for the audit log target"), + ) + .arg( + Arg::new(Self::P_AUDIT_LOG_TARGET_PASSWORD) + .long(Self::P_AUDIT_LOG_TARGET_PASSWORD) + .env("P_AUDIT_LOG_TARGET_PASSWORD") + .value_name("STRING") + .required(false) + .help("Password for the audit log target"), + ) + .arg( + Arg::new(Self::P_AUDIT_LOG_TARGET_TLS_VERIFY) + .long(Self::P_AUDIT_LOG_TARGET_TLS_VERIFY) + .env("P_AUDIT_LOG_TARGET_TLS_VERIFY") + .value_name("BOOL") + .required(false) + .default_value("false") + .value_parser(clap::value_parser!(bool)) + .help("Enable/Disable TLS verification for the audit log target"), + ) + .arg( + Arg::new(Self::P_AUDIT_LOG_TARGET_HEADERS) + .long(Self::P_AUDIT_LOG_TARGET_HEADERS) + .env("P_AUDIT_LOG_TARGET_HEADERS") + .value_name("HEADER") + .required(false) + .value_parser(ValueParser::new(parse_header)) + .help("Comma-separated list of headers for the audit log target"), + ) } } @@ -649,6 +718,22 @@ impl FromArgMatches for Cli { self.ms_clarity_tag = m.get_one::(Self::MS_CLARITY_TAG).cloned(); + self.audit_log_target = m.get_one::(Self::P_AUDIT_LOG_TARGET).cloned(); + self.audit_log_target_username = m + .get_one::(Self::P_AUDIT_LOG_TARGET_USERNAME) + .cloned(); + self.audit_log_target_password = m + .get_one::(Self::P_AUDIT_LOG_TARGET_PASSWORD) + .cloned(); + self.audit_log_target_tls_verify = m + .get_one::(Self::P_AUDIT_LOG_TARGET_TLS_VERIFY) + .cloned() + .expect("default for P_AUDIT_LOG_TARGET_TLS_VERIFY"); + self.audit_log_target_headers = m + .get_one::>(Self::P_AUDIT_LOG_TARGET_HEADERS) + .cloned() + .unwrap_or_default(); + Ok(()) } } diff --git a/server/src/handlers/http/middleware.rs b/server/src/handlers/http/middleware.rs index 2ee7f95e3..93f924447 100644 --- a/server/src/handlers/http/middleware.rs +++ b/server/src/handlers/http/middleware.rs @@ -25,8 +25,15 @@ use actix_web::{ http::header::{self, HeaderName}, Error, Route, }; +use base64::Engine; +use chrono::Utc; use futures_util::future::LocalBoxFuture; +use reqwest::header::{HeaderMap, HeaderValue}; +use reqwest::Client; +use serde_json::json; +use std::collections::HashMap; + use crate::{ handlers::{ AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, @@ -164,6 +171,31 @@ where /* ## Section end */ let auth_result: Result<_, Error> = (self.auth_method)(&mut req, self.action); + let body = json!([ + { + "version": "1.0", + "user-agent":&req + .headers() + .get("user-agent") + .and_then(|value| value.to_str().ok()) + .unwrap_or("unknown"), + "datetime": Utc::now(), + "action":self.action, + "Actor":{ + "type": &req + .headers() + .get("user-agent") + .and_then(|value| value.to_str().ok()) + .unwrap_or("unknown"), + "id": "user123" + }, + "ip-address":&req + .headers() + .get("host") + .and_then(|value| value.to_str().ok()) + .unwrap_or("unknown"), + } + ]); let fut = self.service.call(req); Box::pin(async move { match auth_result? { @@ -175,10 +207,60 @@ where ), _ => {} } + if let Err(err) = send_post_request(body).await { + eprintln!("Error sending POST request: {}", err); + } fut.await }) } } +fn to_header_map(headers: &HashMap) -> Result { + let mut header_map = HeaderMap::new(); + for (key, value) in headers { + let header_name = reqwest::header::HeaderName::from_bytes(key.as_bytes()) + .map_err(|_| format!("Invalid header name: {}", key))?; + let header_value = HeaderValue::from_str(value) + .map_err(|_| format!("Invalid header value for {}: {}", key, value))?; + header_map.insert(header_name, header_value); + } + Ok(header_map) +} +async fn send_post_request(body: serde_json::Value) -> Result<(), reqwest::Error> { + let client = Client::new(); + match CONFIG.parseable.audit_log_target.as_deref() { + Some(_target) => { + let audit_log_auth_token = format!( + "Basic {}", + base64::prelude::BASE64_STANDARD.encode(format!( + "{}:{}", + CONFIG + .parseable + .audit_log_target_username + .as_deref() + .unwrap(), + CONFIG + .parseable + .audit_log_target_password + .as_deref() + .unwrap() + )) + ); + let headers = to_header_map(&CONFIG.parseable.audit_log_target_headers) + .expect("Failed to convert audit_log_target_headers to HeaderMap"); + let body = body; + let target_url = CONFIG.parseable.audit_log_target.as_ref().unwrap(); + let _response = client + .post(target_url) + .headers(headers) + .header(reqwest::header::AUTHORIZATION, &audit_log_auth_token) + .json(&body) + .send() + .await?; + Ok(()) + } + None => Ok(()), + } +} pub fn auth_no_context(req: &mut ServiceRequest, action: Action) -> Result { let creds = extract_session_key(req); diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs index 0e8f1ab24..fedcd7b80 100644 --- a/server/src/rbac/role.rs +++ b/server/src/rbac/role.rs @@ -17,8 +17,10 @@ * */ +use serde::Serialize; + // Represents actions that corresponds to an api -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize)] pub enum Action { Ingest, Query,