Skip to content

Commit

Permalink
Add events ingested counter in stats (#253)
Browse files Browse the repository at this point in the history
  • Loading branch information
trueleo authored Jan 9, 2023
1 parent 4c537e4 commit 1787010
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
1 change: 1 addition & 0 deletions server/src/handlers/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
"stream": stream_name,
"time": time,
"ingestion": {
"count": stats.events,
"size": format!("{} {}", stats.ingestion, "Bytes"),
"format": "json"
},
Expand Down
1 change: 1 addition & 0 deletions server/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ impl STREAM_INFO {
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))?;

stream.stats.add_ingestion_size(size);
stream.stats.increase_event_by_one();

Ok(())
}
Expand Down
21 changes: 17 additions & 4 deletions server/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ use serde::{Deserialize, Serialize};

#[derive(Debug)]
pub struct StatsCounter {
events_ingested: AtomicU64,
ingestion_size: AtomicU64,
storage_size: AtomicU64,
}

impl Default for StatsCounter {
fn default() -> Self {
Self {
events_ingested: AtomicU64::new(0),
ingestion_size: AtomicU64::new(0),
storage_size: AtomicU64::new(0),
}
Expand All @@ -43,13 +45,18 @@ impl PartialEq for StatsCounter {
}

impl StatsCounter {
pub fn new(ingestion_size: u64, storage_size: u64) -> Self {
pub fn new(ingestion_size: u64, storage_size: u64, event_ingested: u64) -> Self {
Self {
ingestion_size: AtomicU64::new(ingestion_size),
storage_size: AtomicU64::new(storage_size),
ingestion_size: ingestion_size.into(),
storage_size: storage_size.into(),
events_ingested: event_ingested.into(),
}
}

pub fn events_ingested(&self) -> u64 {
self.events_ingested.load(Ordering::Relaxed)
}

pub fn ingestion_size(&self) -> u64 {
self.ingestion_size.load(Ordering::Relaxed)
}
Expand All @@ -65,18 +72,24 @@ impl StatsCounter {
pub fn add_storage_size(&self, size: u64) {
self.storage_size.fetch_add(size, Ordering::AcqRel);
}

pub fn increase_event_by_one(&self) {
self.events_ingested.fetch_add(1, Ordering::Relaxed);
}
}

/// Helper struct type created by copying stats values from metadata
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct Stats {
pub events: u64,
pub ingestion: u64,
pub storage: u64,
}

impl From<&StatsCounter> for Stats {
fn from(stats: &StatsCounter) -> Self {
Self {
events: stats.events_ingested(),
ingestion: stats.ingestion_size(),
storage: stats.storage_size(),
}
Expand All @@ -85,6 +98,6 @@ impl From<&StatsCounter> for Stats {

impl From<Stats> for StatsCounter {
fn from(stats: Stats) -> Self {
StatsCounter::new(stats.ingestion, stats.storage)
StatsCounter::new(stats.ingestion, stats.storage, stats.events)
}
}

0 comments on commit 1787010

Please sign in to comment.