Skip to content

Commit

Permalink
refactor: use serde
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed Nov 20, 2024
1 parent a794eb8 commit ea95a4b
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 65 deletions.
4 changes: 2 additions & 2 deletions server/src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub struct Report {
memory_total_bytes: u64,
platform: String,
storage_mode: String,
server_mode: String,
server_mode: Mode,
version: String,
commit_hash: String,
active_ingestors: u64,
Expand Down Expand Up @@ -111,7 +111,7 @@ impl Report {
memory_total_bytes: mem_total,
platform: platform().to_string(),
storage_mode: CONFIG.get_storage_mode_string().to_string(),
server_mode: CONFIG.parseable.mode.to_string(),
server_mode: CONFIG.parseable.mode,
version: current().released_version.to_string(),
commit_hash: current().commit_hash,
active_ingestors: ingestor_metrics.0,
Expand Down
27 changes: 6 additions & 21 deletions server/src/migration/metadata_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use base64::Engine;
use rand::distributions::DistString;
use serde_json::{Map, Value as JsonValue};
use serde_json::{json, Map, Value as JsonValue};

use crate::{
handlers::http::modal::IngestorMetadata,
Expand Down Expand Up @@ -48,10 +48,7 @@ pub fn v1_v3(mut storage_metadata: JsonValue) -> JsonValue {
metadata.insert("users".to_string(), JsonValue::Array(vec![]));
metadata.insert("streams".to_string(), JsonValue::Array(vec![]));
metadata.insert("roles".to_string(), JsonValue::Array(vec![]));
metadata.insert(
"server_mode".to_string(),
JsonValue::String(CONFIG.parseable.mode.to_string()),
);
metadata.insert("server_mode".to_string(), json!(CONFIG.parseable.mode));
storage_metadata
}

Expand Down Expand Up @@ -112,10 +109,7 @@ pub fn v2_v3(mut storage_metadata: JsonValue) -> JsonValue {
"roles".to_string(),
JsonValue::Object(Map::from_iter(privileges_map)),
);
metadata.insert(
"server_mode".to_string(),
JsonValue::String(CONFIG.parseable.mode.to_string()),
);
metadata.insert("server_mode".to_string(), json!(CONFIG.parseable.mode));
storage_metadata
}

Expand All @@ -126,10 +120,7 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue {
let sm = metadata.get("server_mode");

if sm.is_none() || sm.unwrap().as_str().unwrap() == "All" {
metadata.insert(
"server_mode".to_string(),
JsonValue::String(CONFIG.parseable.mode.to_string()),
);
metadata.insert("server_mode".to_string(), json!(CONFIG.parseable.mode));
}

let roles = metadata.get_mut("roles").unwrap().as_object_mut().unwrap();
Expand Down Expand Up @@ -157,10 +148,7 @@ pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue {

match metadata.get("server_mode") {
None => {
metadata.insert(
"server_mode".to_string(),
JsonValue::String(CONFIG.parseable.mode.to_string()),
);
metadata.insert("server_mode".to_string(), json!(CONFIG.parseable.mode));
}
Some(JsonValue::String(mode)) => match mode.as_str() {
"Query" => {
Expand All @@ -170,10 +158,7 @@ pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue {
);
}
"All" => {
metadata.insert(
"server_mode".to_string(),
JsonValue::String(CONFIG.parseable.mode.to_string()),
);
metadata.insert("server_mode".to_string(), json!(CONFIG.parseable.mode));
metadata.insert(
"querier_endpoint".to_string(),
JsonValue::String(CONFIG.parseable.address.clone()),
Expand Down
29 changes: 2 additions & 27 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use crate::storage::{
use bytes::Bytes;
use clap::error::ErrorKind;
use clap::{command, Args, Command, FromArgMatches};
use core::fmt;
use once_cell::sync::Lazy;
use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
use serde::{Deserialize, Serialize};
use std::env;
use std::path::PathBuf;
use std::sync::Arc;
Expand Down Expand Up @@ -245,39 +245,14 @@ Join the community at https://logg.ing/community.
.subcommands([local, s3, azureblob])
}

#[derive(Debug, Default, Eq, PartialEq)]
#[derive(Debug, Default, Eq, PartialEq, Clone, Copy, Serialize, Deserialize)]
pub enum Mode {
Query,
Ingest,
#[default]
All,
}

impl Mode {
pub fn to_str(&self) -> &str {
match self {
Mode::Query => "Query",
Mode::Ingest => "Ingest",
Mode::All => "All",
}
}

pub fn from_string(mode: &str) -> Result<Self, String> {
match mode {
"Query" => Ok(Mode::Query),
"Ingest" => Ok(Mode::Ingest),
"All" => Ok(Mode::All),
x => Err(format!("Trying to Parse Invalid mode: {}", x)),
}
}
}

impl fmt::Display for Mode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.to_str())
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[allow(non_camel_case_types, clippy::upper_case_acronyms)]
pub enum Compression {
Expand Down
25 changes: 10 additions & 15 deletions server/src/storage/store_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct StorageMetadata {
pub deployment_id: uid::Uid,
pub users: Vec<User>,
pub streams: Vec<String>,
pub server_mode: String,
pub server_mode: Mode,
#[serde(default)]
pub roles: HashMap<String, Vec<DefaultPrivilege>>,
#[serde(default)]
Expand Down Expand Up @@ -93,7 +93,7 @@ impl StorageMetadata {
staging: CONFIG.staging_dir().to_path_buf(),
storage: CONFIG.storage().get_endpoint(),
deployment_id: uid::gen(),
server_mode: CONFIG.parseable.mode.to_string(),
server_mode: CONFIG.parseable.mode,
users: Vec::new(),
streams: Vec::new(),
roles: HashMap::default(),
Expand Down Expand Up @@ -140,8 +140,7 @@ pub async fn resolve_parseable_metadata(
// overwrite staging anyways so that it matches remote in case of any divergence
overwrite_staging = true;
if CONFIG.parseable.mode == Mode::All {
standalone_after_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid here"))
?;
standalone_after_distributed(metadata.server_mode)?;
}
Ok(metadata)
},
Expand All @@ -151,10 +150,7 @@ pub async fn resolve_parseable_metadata(
EnvChange::NewStaging(mut metadata) => {
// if server is started in ingest mode,we need to make sure that query mode has been started
// i.e the metadata is updated to reflect the server mode = Query
if Mode::from_string(&metadata.server_mode)
.map_err(ObjectStorageError::Custom)
?
== Mode::All && CONFIG.parseable.mode == Mode::Ingest {
if metadata.server_mode== Mode::All && CONFIG.parseable.mode == Mode::Ingest {
Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet")
} else {
create_dir_all(CONFIG.staging_dir())?;
Expand All @@ -165,21 +161,21 @@ pub async fn resolve_parseable_metadata(
// because staging dir has changed.
match CONFIG.parseable.mode {
Mode::All => {
standalone_after_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here"))
standalone_after_distributed(metadata.server_mode)
.map_err(|err| {
ObjectStorageError::Custom(err.to_string())
})?;
overwrite_remote = true;
},
Mode::Query => {
overwrite_remote = true;
metadata.server_mode = CONFIG.parseable.mode.to_string();
metadata.server_mode = CONFIG.parseable.mode;
metadata.staging = CONFIG.staging_dir().to_path_buf();
},
Mode::Ingest => {
// if ingest server is started fetch the metadata from remote
// update the server mode for local metadata
metadata.server_mode = CONFIG.parseable.mode.to_string();
metadata.server_mode = CONFIG.parseable.mode;
metadata.staging = CONFIG.staging_dir().to_path_buf();
},
}
Expand Down Expand Up @@ -207,7 +203,7 @@ pub async fn resolve_parseable_metadata(
ObjectStorageError::UnhandledError(err)
})?;

metadata.server_mode = CONFIG.parseable.mode.to_string();
metadata.server_mode = CONFIG.parseable.mode;
if overwrite_remote {
put_remote_metadata(&metadata).await?;
}
Expand All @@ -227,8 +223,7 @@ fn determine_environment(
(Some(staging), Some(remote)) => {
// if both staging and remote have same deployment id but different server modes
if staging.deployment_id == remote.deployment_id
&& Mode::from_string(&remote.server_mode).expect("server mode is valid here")
== Mode::All
&& remote.server_mode == Mode::All
&& (CONFIG.parseable.mode == Mode::Query || CONFIG.parseable.mode == Mode::Ingest)
{
EnvChange::NewStaging(remote)
Expand Down Expand Up @@ -292,7 +287,7 @@ pub async fn put_remote_metadata(metadata: &StorageMetadata) -> Result<(), Objec

pub fn put_staging_metadata(meta: &StorageMetadata) -> io::Result<()> {
let mut staging_metadata = meta.clone();
staging_metadata.server_mode = CONFIG.parseable.mode.to_string();
staging_metadata.server_mode = CONFIG.parseable.mode;
staging_metadata.staging = CONFIG.staging_dir().to_path_buf();
let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
let mut file = OpenOptions::new()
Expand Down

0 comments on commit ea95a4b

Please sign in to comment.