Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: clean up parts of the codebase #981

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub struct Report {
memory_total_bytes: u64,
platform: String,
storage_mode: String,
server_mode: String,
server_mode: Mode,
version: String,
commit_hash: String,
active_ingestors: u64,
Expand Down Expand Up @@ -111,7 +111,7 @@ impl Report {
memory_total_bytes: mem_total,
platform: platform().to_string(),
storage_mode: CONFIG.get_storage_mode_string().to_string(),
server_mode: CONFIG.parseable.mode.to_string(),
server_mode: CONFIG.parseable.mode,
version: current().released_version.to_string(),
commit_hash: current().commit_hash,
active_ingestors: ingestor_metrics.0,
Expand Down
4 changes: 2 additions & 2 deletions server/src/catalog.rs → server/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ async fn create_manifest(
}

pub async fn remove_manifest_from_snapshot(
storage: Arc<dyn ObjectStorage + Send>,
storage: Arc<dyn ObjectStorage>,
stream_name: &str,
dates: Vec<String>,
) -> Result<Option<String>, ObjectStorageError> {
Expand All @@ -343,7 +343,7 @@ pub async fn remove_manifest_from_snapshot(
}

pub async fn get_first_event(
storage: Arc<dyn ObjectStorage + Send>,
storage: Arc<dyn ObjectStorage>,
stream_name: &str,
dates: Vec<String>,
) -> Result<Option<String>, ObjectStorageError> {
Expand Down
File renamed without changes.
291 changes: 87 additions & 204 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
use super::ingest::ingester_logstream;
use super::ingest::ingester_rbac;
use super::ingest::ingester_role;
use super::server::Server;
use super::IngestorMetadata;
use super::OpenIdClient;
use super::ParseableServer;
use crate::analytics;
use crate::banner;
use crate::handlers::airplane;
use crate::handlers::http::health_check;
use crate::handlers::http::ingest;
use crate::handlers::http::logstream;
use crate::handlers::http::middleware::DisAllowRootUser;
Expand All @@ -28,180 +33,35 @@ use crate::localcache::LocalCacheManager;
use crate::metrics;
use crate::migration;
use crate::migration::metadata_migration::migrate_ingester_metadata;
use crate::rbac;
use crate::rbac::role::Action;
use crate::storage;
use crate::storage::object_storage::ingestor_metadata_path;
use crate::storage::object_storage::parseable_json_path;
use crate::storage::staging;
use crate::storage::ObjectStorageError;
use crate::storage::PARSEABLE_ROOT_DIRECTORY;
use crate::sync;

use std::sync::Arc;

use super::ingest::ingester_logstream;
use super::ingest::ingester_rbac;
use super::ingest::ingester_role;
use super::server::Server;
use super::ssl_acceptor::get_ssl_acceptor;
use super::IngestorMetadata;
use super::OpenIdClient;
use super::ParseableServer;

use crate::{
handlers::http::{base_path, cross_origin_config},
option::CONFIG,
};
use crate::{handlers::http::base_path, option::CONFIG};
use actix_web::body::MessageBody;
use actix_web::middleware::from_fn;
use actix_web::web;
use actix_web::web::resource;
use actix_web::Scope;
use actix_web::{web, App, HttpServer};
use actix_web_prometheus::PrometheusMetrics;
use anyhow::anyhow;
use async_trait::async_trait;
use base64::Engine;
use bytes::Bytes;
use once_cell::sync::Lazy;
use relative_path::RelativePathBuf;
use serde_json::Value;
use tokio::sync::{oneshot, Mutex};

/// Metadata describing this ingestor node, loaded lazily from the staging area.
///
/// NOTE(review): first dereference panics (`expect`) if the staging metadata is
/// missing or not valid JSON — ensure ingestor metadata has been written (see
/// `set_ingestor_metadata`) before this static is first used.
pub static INGESTOR_META: Lazy<IngestorMetadata> =
    Lazy::new(|| staging::get_ingestor_info().expect("Should Be valid Json"));

#[derive(Default)]
pub struct IngestServer;

#[async_trait(?Send)]
#[async_trait]
impl ParseableServer for IngestServer {
// We don't need an OIDC client here; the parameter only exists to satisfy the trait.
/// Start the HTTP(S) ingest server and block until it exits.
///
/// Persists ingestor metadata, builds the actix app (with Prometheus,
/// health-check shutdown middleware, logging, compression and CORS), binds
/// with TLS when certs are configured, and wires graceful shutdown: an OS
/// signal triggers a final object-store sync before the server is stopped.
async fn start(
    &self,
    prometheus: PrometheusMetrics,
    _oidc_client: Option<crate::oidc::OpenidConfig>,
) -> anyhow::Result<()> {
    // set the ingestor metadata
    self.set_ingestor_metadata().await?;

    // get the ssl stuff
    let ssl = get_ssl_acceptor(
        &CONFIG.parseable.tls_cert_path,
        &CONFIG.parseable.tls_key_path,
        &CONFIG.parseable.trusted_ca_certs_path,
    )?;

    // fn that creates the app
    let create_app_fn = move || {
        App::new()
            .wrap(prometheus.clone())
            // ingestors never get an OIDC client, hence `None`
            .configure(|config| IngestServer::configure_routes(config, None))
            .wrap(from_fn(health_check::check_shutdown_middleware))
            .wrap(actix_web::middleware::Logger::default())
            .wrap(actix_web::middleware::Compress::default())
            .wrap(cross_origin_config())
    };

    // Create a channel to trigger server shutdown
    let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
    let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger)));

    // Clone the shutdown signal for the signal handler
    let shutdown_signal = server_shutdown_signal.clone();

    // Spawn the signal handler task
    let signal_task = tokio::spawn(async move {
        health_check::handle_signals(shutdown_signal).await;
        log::info!("Received shutdown signal, notifying server to shut down...");
    });

    // Create the HTTP server
    let http_server = HttpServer::new(create_app_fn)
        .workers(num_cpus::get())
        .shutdown_timeout(60);
// NOTE(review): the two lines below are GitHub review-thread residue from the
// page scrape, not source code — remove them when restoring the real file.
de-sh marked this conversation as resolved.
Show resolved Hide resolved

    // Start the server with or without TLS
    let srv = if let Some(config) = ssl {
        http_server
            .bind_rustls_0_22(&CONFIG.parseable.address, config)?
            .run()
    } else {
        http_server.bind(&CONFIG.parseable.address)?.run()
    };

    // Graceful shutdown handling
    let srv_handle = srv.handle();

    let sync_task = tokio::spawn(async move {
        // Wait for the shutdown signal
        let _ = shutdown_rx.await;

        // Perform S3 sync and wait for completion
        log::info!("Starting data sync to S3...");
        if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
            log::warn!("Failed to sync local data with object store. {:?}", e);
        } else {
            log::info!("Successfully synced all data to S3.");
        }

        // Initiate graceful shutdown
        log::info!("Graceful shutdown of HTTP server triggered");
        srv_handle.stop(true).await;
    });

    // Await the HTTP server to run
    let server_result = srv.await;

    // Await the signal handler to ensure proper cleanup
    if let Err(e) = signal_task.await {
        log::error!("Error in signal handler: {:?}", e);
    }

    // Wait for the sync task to complete before exiting
    if let Err(e) = sync_task.await {
        log::error!("Error in sync task: {:?}", e);
    } else {
        log::info!("Sync task completed successfully.");
    }

    // Return the result of the server
    server_result?;

    Ok(())
}

/// Validate configuration and bootstrap global state, then delegate the
/// long-running work to [`Self::initialize`].
///
/// Order matters here: storage-mode validation first, then querier state /
/// credential checks, then metadata resolution, and only after the banner,
/// RBAC map and global metadata are set does the server actually start.
async fn init(&self) -> anyhow::Result<()> {
    // refuse to start an ingestor against local-drive storage
    self.validate()?;

    // check for querier state. Is it there, or was it there in the past
    let parseable_json = self.check_querier_state().await?;
    // to get the .parseable.json file in staging
    self.validate_credentials().await?;
    // reconcile remote/staging metadata into the effective server metadata
    let metadata = storage::resolve_parseable_metadata(&parseable_json).await?;

    banner::print(&CONFIG, &metadata).await;
    rbac::map::init(&metadata);
    // set the info in the global metadata
    metadata.set_global();
    self.initialize().await
}

fn validate(&self) -> anyhow::Result<()> {
if CONFIG.get_storage_mode_string() == "Local drive" {
return Err(anyhow::Error::msg(
// Error Message can be better
"Ingest Server cannot be started in local storage mode. Please start the server in a supported storage mode.",
));
}

Ok(())
}
}

impl IngestServer {
// configure the api routes
fn configure_routes(config: &mut web::ServiceConfig, _oidc_client: Option<OpenIdClient>) {
config
Expand All @@ -221,6 +81,83 @@ impl IngestServer {
.service(Server::get_ingest_otel_factory());
}

/// Fetch the server metadata this ingestor should boot from.
///
/// Fails fast when the server is configured with local-drive storage, which
/// a distributed setup does not support. Otherwise checks for querier state
/// and validates credentials, returning the querier-written metadata bytes
/// if present (`None` when no querier state exists).
/// NOTE(review): exact semantics of the returned bytes come from
/// `check_querier_state` — presumably the `.parseable.json` contents; confirm there.
async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>> {
    // parseable can't use local storage for persistence when running a distributed setup
    if CONFIG.get_storage_mode_string() == "Local drive" {
        return Err(anyhow::Error::msg(
            "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.",
        ));
    }

    // check for querier state. Is it there, or was it there in the past
    let parseable_json = self.check_querier_state().await?;
    // to get the .parseable.json file in staging
    self.validate_credentials().await?;

    Ok(parseable_json)
}

/// Configure the server and start an instance to ingest data.
///
/// Boot order: validate the local cache (if any), register store metrics,
/// run migrations, spawn the local/remote sync threads and the Arrow Flight
/// ("airplane") server, persist ingestor metadata, then run the HTTP server
/// until it exits or local sync fails. A failed remote sync is recoverable
/// and simply restarted; a failed local sync aborts the server.
async fn init(&self) -> anyhow::Result<()> {
    // ! Undefined and Untested behaviour
    if let Some(cache_manager) = LocalCacheManager::global() {
        cache_manager
            .validate(CONFIG.parseable.local_cache_size)
            .await?;
    };

    let prometheus = metrics::build_metrics_handler();
    CONFIG.storage().register_store_metrics(&prometheus);

    migration::run_migration(&CONFIG).await?;

    let (localsync_handler, mut localsync_outbox, localsync_inbox) =
        sync::run_local_sync().await;
    let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
        sync::object_store_sync().await;

    tokio::spawn(airplane::server());

    // set the ingestor metadata
    self.set_ingestor_metadata().await?;

    // Ingestors shouldn't have to deal with OpenId auth flow
    let app = self.start(prometheus, None);

    tokio::pin!(app);
    loop {
        tokio::select! {
            e = &mut app => {
                // actix server finished .. stop other threads and stop the server
                remote_sync_inbox.send(()).unwrap_or(());
                localsync_inbox.send(()).unwrap_or(());
                if let Err(e) = localsync_handler.await {
                    // fix: this message previously said "remote_sync_handler",
                    // masking which join actually failed
                    log::error!("Error joining localsync_handler: {:?}", e);
                }
                if let Err(e) = remote_sync_handler.await {
                    log::error!("Error joining remote_sync_handler: {:?}", e);
                }
                return e
            },
            _ = &mut localsync_outbox => {
                // crash the server if localsync fails for any reason
                return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
            },
            _ = &mut remote_sync_outbox => {
                // remote_sync failed, this is recoverable by just starting remote_sync thread again
                if let Err(e) = remote_sync_handler.await {
                    log::error!("Error joining remote_sync_handler: {:?}", e);
                }
                (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
            }

        };
    }
}
}

impl IngestServer {
fn analytics_factory() -> Scope {
web::scope("/analytics").service(
// GET "/analytics" ==> Get analytics data
Expand Down Expand Up @@ -459,58 +396,4 @@ impl IngestServer {

Ok(())
}

/// Run migrations, start the sync threads and the Arrow Flight ("airplane")
/// server, then run the HTTP server (with the configured OpenID client)
/// until it exits or local sync fails. A failed remote sync is recoverable
/// and restarted in place; a failed local sync aborts the server.
async fn initialize(&self) -> anyhow::Result<()> {
    // ! Undefined and Untested behaviour
    if let Some(cache_manager) = LocalCacheManager::global() {
        cache_manager
            .validate(CONFIG.parseable.local_cache_size)
            .await?;
    };

    let prometheus = metrics::build_metrics_handler();
    CONFIG.storage().register_store_metrics(&prometheus);

    migration::run_migration(&CONFIG).await?;

    let (localsync_handler, mut localsync_outbox, localsync_inbox) =
        sync::run_local_sync().await;
    let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
        sync::object_store_sync().await;

    tokio::spawn(airplane::server());

    let app = self.start(prometheus, CONFIG.parseable.openid.clone());

    tokio::pin!(app);
    loop {
        tokio::select! {
            e = &mut app => {
                // actix server finished .. stop other threads and stop the server
                remote_sync_inbox.send(()).unwrap_or(());
                localsync_inbox.send(()).unwrap_or(());
                if let Err(e) = localsync_handler.await {
                    // fix: this message previously said "remote_sync_handler",
                    // masking which join actually failed
                    log::error!("Error joining localsync_handler: {:?}", e);
                }
                if let Err(e) = remote_sync_handler.await {
                    log::error!("Error joining remote_sync_handler: {:?}", e);
                }
                return e
            },
            _ = &mut localsync_outbox => {
                // crash the server if localsync fails for any reason
                return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
            },
            _ = &mut remote_sync_outbox => {
                // remote_sync failed, this is recoverable by just starting remote_sync thread again
                if let Err(e) = remote_sync_handler.await {
                    log::error!("Error joining remote_sync_handler: {:?}", e);
                }
                (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
            }

        };
    }
}
}
Loading
Loading