Skip to content

Commit

Permalink
refactor: restructure as a library
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed Nov 25, 2024
1 parent bfca663 commit 6f154b4
Show file tree
Hide file tree
Showing 126 changed files with 1,332 additions and 168 deletions.
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
target
data*
staging*
staging/
limitcache
examples
cert.pem
Expand All @@ -14,4 +14,3 @@ parseable
parseable_*
parseable-env-secret
cache

132 changes: 129 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,129 @@
[workspace]
members = ["server"]
resolver = "2"
[package]
name = "parseable"
version = "1.6.2"
authors = ["Parseable Team <[email protected]>"]
edition = "2021"
rust-version = "1.77.1"
categories = ["logging", "observability", "log analytics"]
build = "build.rs"

[dependencies]
### apache arrow/datafusion dependencies
# arrow = "51.0.0"
arrow-schema = { version = "53.0.0", features = ["serde"] }
arrow-array = { version = "53.0.0" }
arrow-json = "53.0.0"
arrow-ipc = { version = "53.0.0", features = ["zstd"] }
arrow-select = "53.0.0"
datafusion = "42.0.0"
object_store = { version = "0.11.1", features = ["cloud", "aws", "azure"] }
parquet = "53.0.0"
arrow-flight = { version = "53.0.0", features = [ "tls" ] }
tonic = {version = "0.12.3", features = ["tls", "transport", "gzip", "zstd"] }
tonic-web = "0.12.3"
tower-http = { version = "0.6.1", features = ["cors"] }

### actix dependencies
actix-web-httpauth = "0.8"
actix-web = { version = "4.9.0", features = ["rustls-0_22"] }
actix-cors = "0.7.0"
actix-web-prometheus = { version = "0.1" }
actix-web-static-files = "4.0"
mime = "0.3.17"

### other dependencies
anyhow = { version = "1.0", features = ["backtrace"] }
argon2 = "0.5.0"
async-trait = "0.1.82"
base64 = "0.22.0"
lazy_static = "1.4"
bytes = "1.4"
byteorder = "1.4.3"
bzip2 = { version = "*", features = ["static"] }
cookie = "0.18.1"
chrono = "0.4"
chrono-humanize = "0.2"
clap = { version = "4.1", default-features = false, features = [
"std",
"color",
"help",
"derive",
"env",
"cargo",
"error-context",
] }
clokwerk = "0.4"
crossterm = "0.28.1"
derive_more = "0.99.18"
env_logger = "0.11.3"
fs_extra = "1.3"
futures = "0.3"
futures-util = "0.3.28"
hex = "0.4"
hostname = "0.4.0"
http = "0.2.7"
humantime-serde = "1.1"
itertools = "0.13.0"
log = "0.4"
num_cpus = "1.15"
once_cell = "1.17.1"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8.5"
regex = "1.7.3"
relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default-features = false, features = [
"rustls-tls",
"json",
] } # cannot update because rustls is not on the latest version (see `rustls` below)
rustls = "0.22.4" # cannot update to 0.23 because actix has not caught up yet
rustls-pemfile = "2.1.2"
semver = "1.0"
serde = { version = "1.0", features = ["rc", "derive"] }
serde_json = "1.0"
static-files = "0.2"
sysinfo = "0.31.4"
thiserror = "1.0.64"
thread-priority = "1.0.0"
tokio = { version = "1.28", default-features = false, features = [
"sync",
"macros",
"fs",
] }
tokio-stream = { version = "0.1", features = ["fs"] }
ulid = { version = "1.0", features = ["serde"] }
uptime_lib = "0.3.0"
xxhash-rust = { version = "0.8", features = ["xxh3"] }
xz2 = { version = "*", features = ["static"] }
nom = "7.1.3"
humantime = "2.1.0"
human-size = "0.4"
openid = { version = "0.15.0", default-features = false, features = ["rustls"] }
url = "2.4.0"
http-auth-basic = "0.3.3"
serde_repr = "0.1.17"
hashlru = { version = "0.11.0", features = ["serde"] }
path-clean = "1.0.1"
prost = "0.13.3"
prometheus-parse = "0.2.5"
sha2 = "0.10.8"

[build-dependencies]
cargo_toml = "0.20.1"
sha1_smol = { version = "1.0", features = ["std"] }
static-files = "0.2"
ureq = "2.6"
vergen = { version = "8.1", features = ["build", "git", "cargo", "gitcl"] }
zip = { version = "2.2.0", default-features = false, features = ["deflate"] }
url = "2.4.0"
prost-build = "0.13.3"

[dev-dependencies]
maplit = "1.0"
rstest = "0.23.0"

[package.metadata.parseable_ui]
assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.11/build.zip"
assets-sha1 = "3f0c0f0e9fe23c6a01f0eb45115da4bfe29f9c3f"

[features]
debug = []
File renamed without changes.
129 changes: 0 additions & 129 deletions server/Cargo.toml

This file was deleted.

16 changes: 13 additions & 3 deletions server/src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ use crate::catalog::Snapshot as CatalogSnapshot;

// schema provider for stream based on global data
pub struct GlobalSchemaProvider {
pub storage: Arc<dyn ObjectStorage + Send>,
pub storage: Arc<dyn ObjectStorage>,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -317,17 +317,24 @@ impl TableProvider for StandardTableProvider {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
// only for staging in standalone
let mut memory_exec = None;
// TODO: remove
let mut cache_exec = None;
// only for data in the hot tier
let mut hot_tier_exec = None;
// for s3, because older data might not have manifest
let mut listing_exec = None;
// takes the object store registry and gets the store from the URL
let object_store = state
.runtime_env()
.object_store_registry
.get_store(&self.url)
.unwrap();
// gets the configured storage from the parseable setup
let glob_storage = CONFIG.storage().get_object_store();

// Figures out .stream.json
let object_store_format = glob_storage
.get_object_store_format(&self.stream)
.await
Expand All @@ -338,6 +345,7 @@ impl TableProvider for StandardTableProvider {
return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string()));
}

// Only query staging when provided time range includes now
if include_now(filters, time_partition.clone()) {
if let Some(records) =
event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema)
Expand All @@ -350,6 +358,7 @@ impl TableProvider for StandardTableProvider {
);
}
};
// Create a snapshot that contains all fields from different nodes
let mut merged_snapshot: snapshot::Snapshot = Snapshot::default();
if CONFIG.parseable.mode == Mode::Query {
let path = RelativePathBuf::from_iter([&self.stream, STREAM_ROOT_DIRECTORY]);
Expand Down Expand Up @@ -377,6 +386,7 @@ impl TableProvider for StandardTableProvider {

// Check whether the query time range overlaps with older data.
// If it does, get the listing table time filters and execution plan separately.
// BUG: unnecessary listings on dates where there is no data
if is_overlapping_query(&merged_snapshot.manifest_list, &time_filters) {
let listing_time_fiters =
return_listing_time_filters(&merged_snapshot.manifest_list, &mut time_filters);
Expand Down Expand Up @@ -614,7 +624,7 @@ async fn get_hottier_exectuion_plan(
#[allow(clippy::too_many_arguments)]
async fn legacy_listing_table(
stream: String,
glob_storage: Arc<dyn ObjectStorage + Send>,
glob_storage: Arc<dyn ObjectStorage>,
object_store: Arc<dyn ObjectStore>,
time_filters: &[PartialTimeFilter],
schema: Arc<Schema>,
Expand Down Expand Up @@ -1064,7 +1074,7 @@ mod tests {
fn datetime_max(year: i32, month: u32, day: u32) -> DateTime<Utc> {
NaiveDate::from_ymd_opt(year, month, day)
.unwrap()
.and_hms_milli_opt(23, 59, 59, 99)
.and_hms_milli_opt(23, 59, 59, 999)
.unwrap()
.and_utc()
}
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion server/src/alerts/mod.rs → src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ pub mod rule;
pub mod target;

use crate::metrics::ALERTS_STATES;
use crate::option::CONFIG;
use crate::utils::arrow::get_field;
use crate::utils::uid;
use crate::CONFIG;
use crate::{storage, utils};

pub use self::rule::Rule;
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
4 changes: 2 additions & 2 deletions server/src/catalog.rs → src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
use std::{io::ErrorKind, sync::Arc};

use self::{column::Column, snapshot::ManifestItem};
use crate::handlers;
use crate::handlers::http::base_path_without_preceding_slash;
use crate::metadata::STREAM_INFO;
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
use crate::option::CONFIG;
use crate::option::{Mode, CONFIG};
use crate::stats::{
event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats,
};
Expand All @@ -32,7 +33,6 @@ use crate::{
query::PartialTimeFilter,
storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError},
};
use crate::{handlers, Mode};
use bytes::Bytes;
use chrono::{DateTime, Local, NaiveTime, Utc};
use relative_path::RelativePathBuf;
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};

use crate::option::CONFIG;
use crate::{option::CONFIG, ParseableServer};

use super::query::{querier_ingest, querier_logstream, querier_rbac, querier_role};
use super::server::Server;
use super::ssl_acceptor::get_ssl_acceptor;
use super::{OpenIdClient, ParseableServer};
use super::OpenIdClient;

#[derive(Default, Debug)]
pub struct QueryServer;
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit 6f154b4

Please sign in to comment.