Allow user to set block sizes (#80)
* Allow user to set block sizes

This commit allows the user to set block sizes for generators. It does not
let them adjust individual message sizes -- unless a block is equivalent to
a message -- but it does allow indirect control: for some formats, the larger
the block, the larger the message.

This is mostly useful for systems that have a hard cap on the total payload size
-- say, UDP when we get to that -- or in situations where we need to simulate
consistent producers a little more carefully.

Signed-off-by: Brian L. Troutwine <[email protected]>

* clippy ding

Signed-off-by: Brian L. Troutwine <[email protected]>

* Allow block sizes to be defaulted

Signed-off-by: Brian L. Troutwine <[email protected]>

* clippy dings

Signed-off-by: Brian L. Troutwine <[email protected]>
blt authored Aug 16, 2021
1 parent e86f05f commit 3bcd6f3
Showing 8 changed files with 89 additions and 51 deletions.
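Concretely, the knob this commit adds is an optional per-target `block_sizes` list; when it is omitted, each generator falls back to a built-in default table. A minimal sketch of the deserialization behavior -- not code from this commit: the YAML shape and `serde_yaml` are assumptions and `TargetSketch` is a hypothetical stand-in, but the `block_sizes: Option<Vec<byte_unit::Byte>>` field matches the diffs below (requires byte_unit's `serde` feature):

    use byte_unit::Byte;
    use serde::Deserialize;

    // Hypothetical stand-in for a generator's `Target` config; only the
    // `block_sizes` field is taken from this commit's diff.
    #[derive(Debug, Deserialize)]
    struct TargetSketch {
        bytes_per_second: Byte,
        // Omit this field entirely and the generator falls back to its
        // built-in default table.
        block_sizes: Option<Vec<Byte>>,
    }

    fn main() {
        let explicit: TargetSketch =
            serde_yaml::from_str("bytes_per_second: \"500 MB\"\nblock_sizes: [\"1 MB\", \"2 MB\"]")
                .unwrap();
        let defaulted: TargetSketch =
            serde_yaml::from_str("bytes_per_second: \"500 MB\"").unwrap();
        assert!(explicit.block_sizes.is_some());
        assert!(defaulted.block_sizes.is_none()); // defaults applied downstream
    }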
24 changes: 23 additions & 1 deletion lading_generators/src/file_gen/config.rs
@@ -1,7 +1,7 @@
//! This module controls configuration parsing from the end user, providing a
//! convenience mechanism for the rest of the program. Crashes are most likely
//! to originate from this code, intentionally.
use byte_unit::Byte;
use byte_unit::{Byte, ByteUnit};
use serde::Deserialize;
use std::collections::HashMap;
use std::net::SocketAddr;
@@ -64,6 +64,8 @@ pub struct LogTargetTemplate {
/// possible as the internal governor accumulates, up to
/// `maximum_bytes_burst`.
bytes_per_second: Byte,
/// The block sizes for messages to this target
pub block_sizes: Option<Vec<byte_unit::Byte>>,
/// Defines the maximum internal cache of this log target. file_gen will
/// pre-build its outputs up to the byte capacity specified here.
maximum_prebuild_cache_size_bytes: Byte,
@@ -89,6 +91,8 @@ pub struct LogTarget {
/// possible as the internal governor accumulates, up to
/// `maximum_bytes_burst`.
pub bytes_per_second: Byte,
/// The block sizes for messages to this target
pub block_sizes: Vec<usize>,
/// The maximum size in bytes that the prebuild cache may be.
pub maximum_prebuild_cache_size_bytes: Byte,
}
@@ -114,13 +118,31 @@ impl LogTargetTemplate {
/// Function will panic if user configuration contains values that can't be
/// converted to u32 instances.
#[must_use]
#[allow(clippy::cast_possible_truncation)]
pub fn strike(&self, duplicate: u8) -> LogTarget {
let duplicate = format!("{}", duplicate);
let full_path = self.path_template.replace("%NNN%", &duplicate);
let path = PathBuf::from(full_path);
let block_sizes: Vec<usize> = self
.block_sizes
.clone()
.unwrap_or_else(|| {
vec![
Byte::from_unit(1_f64, ByteUnit::MB).unwrap(),
Byte::from_unit(2_f64, ByteUnit::MB).unwrap(),
Byte::from_unit(4_f64, ByteUnit::MB).unwrap(),
Byte::from_unit(8_f64, ByteUnit::MB).unwrap(),
Byte::from_unit(16_f64, ByteUnit::MB).unwrap(),
Byte::from_unit(32_f64, ByteUnit::MB).unwrap(),
]
})
.iter()
.map(|sz| sz.get_bytes() as usize)
.collect();

LogTarget {
path,
block_sizes,
variant: self.variant.clone(),
maximum_bytes_per_file: self.maximum_bytes_per_file,
bytes_per_second: self.bytes_per_second,
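One detail worth noting about the defaults above: the old file_gen constant was named ONE_MEBIBYTE but set to 1_000_000 bytes, which is a decimal megabyte (a mebibyte is 1_048_576 bytes). ByteUnit::MB is also decimal, so the new defaults reproduce the old values exactly. A quick check -- my example, not code from this commit:

    use byte_unit::{Byte, ByteUnit};

    fn main() {
        // Decimal megabyte: same value as the old 1_000_000-byte constant.
        assert_eq!(Byte::from_unit(1.0, ByteUnit::MB).unwrap().get_bytes(), 1_000_000);
        // Binary mebibyte: what the old constant's name implied, not its value.
        assert_eq!(Byte::from_unit(1.0, ByteUnit::MiB).unwrap().get_bytes(), 1_048_576);
    }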
12 changes: 1 addition & 11 deletions lading_generators/src/file_gen/worker.rs
@@ -34,16 +34,6 @@ impl From<::std::io::Error> for Error {
}
}

const ONE_MEBIBYTE: usize = 1_000_000;
const BLOCK_BYTE_SIZES: [usize; 6] = [
ONE_MEBIBYTE,
2_000_000,
4_000_000,
8_000_000,
16_000_000,
32_000_000,
];

/// The [`Log`] defines a task that emits variant lines to a file, managing
/// rotation and controlling rate limits.
#[derive(Debug)]
@@ -85,7 +75,7 @@ impl Log {
let block_chunks = chunk_bytes(
&mut rng,
maximum_prebuild_cache_size_bytes.get() as usize,
&BLOCK_BYTE_SIZES,
&target.block_sizes,
);

let labels = vec![("target".to_string(), name.clone())];
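`chunk_bytes` itself is untouched by this commit and its body is not shown here. As a rough sketch of the contract -- assuming it randomly partitions the prebuild cache budget into chunks drawn from the allowed block sizes; names and logic are illustrative, not the `lading_common::block` implementation:

    use rand::seq::SliceRandom;
    use rand::Rng;

    // Illustrative only: draw random block sizes until none fits in the
    // remaining byte budget, returning the chunk sizes chosen.
    fn chunk_bytes<R: Rng>(rng: &mut R, total_bytes: usize, block_sizes: &[usize]) -> Vec<usize> {
        let mut chunks = Vec::new();
        let mut remaining = total_bytes;
        loop {
            let fitting: Vec<usize> = block_sizes
                .iter()
                .copied()
                .filter(|sz| *sz <= remaining)
                .collect();
            match fitting.choose(rng) {
                Some(&sz) => {
                    chunks.push(sz);
                    remaining -= sz;
                }
                None => return chunks, // no allowed size fits; budget exhausted
            }
        }
    }

    fn main() {
        let mut rng = rand::thread_rng();
        let chunks = chunk_bytes(&mut rng, 10_000_000, &[1_000_000, 2_000_000]);
        assert!(chunks.iter().sum::<usize>() <= 10_000_000);
    }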
2 changes: 2 additions & 0 deletions lading_generators/src/http_gen/config.rs
@@ -61,6 +61,8 @@ pub struct Target {
pub headers: HeaderMap,
/// The bytes per second to send or receive from the target
pub bytes_per_second: byte_unit::Byte,
/// The block sizes for messages to this target
pub block_sizes: Option<Vec<byte_unit::Byte>>,
/// The total number of parallel connections to maintain
pub parallel_connections: u16,
}
27 changes: 17 additions & 10 deletions lading_generators/src/http_gen/worker.rs
@@ -1,4 +1,5 @@
use crate::http_gen::config::{Method, Target, Variant};
use byte_unit::{Byte, ByteUnit};
use futures::stream::{self, StreamExt};
use governor::state::direct::{self, InsufficientCapacity};
use governor::{clock, state, Quota, RateLimiter};
@@ -51,15 +52,6 @@ impl From<::std::io::Error> for Error {
}
}

const ONE_MEBIBYTE: usize = 1_000_000;
const BLOCK_BYTE_SIZES: [usize; 5] = [
ONE_MEBIBYTE / 8,
ONE_MEBIBYTE / 4,
ONE_MEBIBYTE / 2,
ONE_MEBIBYTE,
ONE_MEBIBYTE * 2,
];

/// The [`Worker`] defines a task that emits variant lines to an HTTP server
/// controlling throughput.
#[derive(Debug)]
@@ -88,6 +80,21 @@ impl Worker {
#[allow(clippy::cast_possible_truncation)]
pub fn new(name: String, target: Target) -> Result<Self, Error> {
let mut rng = rand::thread_rng();
let block_sizes: Vec<usize> = target
.block_sizes
.unwrap_or_else(|| {
vec![
Byte::from_unit(1.0 / 8.0, ByteUnit::MB).unwrap(),
Byte::from_unit(1.0 / 4.0, ByteUnit::MB).unwrap(),
Byte::from_unit(1.0 / 2.0, ByteUnit::MB).unwrap(),
Byte::from_unit(1_f64, ByteUnit::MB).unwrap(),
Byte::from_unit(2_f64, ByteUnit::MB).unwrap(),
Byte::from_unit(4_f64, ByteUnit::MB).unwrap(),
]
})
.iter()
.map(|sz| sz.get_bytes() as usize)
.collect();
let bytes_per_second = NonZeroU32::new(target.bytes_per_second.get_bytes() as u32).unwrap();
let rate_limiter = RateLimiter::direct(Quota::per_second(bytes_per_second));
let labels = vec![
@@ -102,7 +109,7 @@ impl Worker {
let block_chunks = chunk_bytes(
&mut rng,
maximum_prebuild_cache_size_bytes.get_bytes() as usize,
&BLOCK_BYTE_SIZES,
&block_sizes,
);
let block_cache = match variant {
Variant::Ascii => {
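The `#[allow(clippy::cast_possible_truncation)]` on these constructors exists because `get_bytes()` returns an integer wider than `usize` (u128 in recent byte_unit versions -- an assumption on my part) and `as usize` truncates silently. kafka_gen's `generate_block_cache`, further down, saturates with `try_into().unwrap_or(usize::MAX)` instead. The difference, on a 64-bit target:

    use std::convert::TryInto;

    fn main() {
        let big: u128 = u128::from(u64::MAX) + 1;
        // `as` truncates silently -- the reason for the clippy allow.
        assert_eq!(big as usize, 0); // on a 64-bit target
        // Saturating instead, as kafka_gen's generate_block_cache does:
        let saturated: usize = big.try_into().unwrap_or(usize::MAX);
        assert_eq!(saturated, usize::MAX);
    }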
2 changes: 2 additions & 0 deletions lading_generators/src/kafka_gen/config.rs
@@ -60,6 +60,8 @@ pub struct Target {
pub throughput: Throughput,
/// The maximum size in bytes of the cache of prebuilt messages
pub maximum_prebuild_cache_size_bytes: byte_unit::Byte,
/// The block sizes for messages to this target
pub block_sizes: Option<Vec<byte_unit::Byte>>,
/// Map of rdkafka-specific overrides to apply to the producer
pub producer_config: Option<HashMap<String, String>>,
}
35 changes: 22 additions & 13 deletions lading_generators/src/kafka_gen/worker.rs
@@ -1,5 +1,6 @@
use crate::kafka_gen::config::Throughput;
use crate::kafka_gen::config::{Target, Variant};
use byte_unit::{Byte, ByteUnit};
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use governor::state::direct::{self, InsufficientCapacity};
@@ -48,18 +49,6 @@ impl From<::std::io::Error> for Error {
}
}

const ONE_MEBIBYTE: usize = 1_000_000;
const BLOCK_BYTE_SIZES: [usize; 8] = [
ONE_MEBIBYTE / 1024,
ONE_MEBIBYTE / 512,
ONE_MEBIBYTE / 256,
ONE_MEBIBYTE / 128,
ONE_MEBIBYTE / 64,
ONE_MEBIBYTE / 32,
ONE_MEBIBYTE / 16,
ONE_MEBIBYTE / 8,
];

/// The [`Worker`] defines a task that emits variant lines to a Kafka topic,
/// controlling throughput.
#[derive(Debug)]
@@ -88,9 +77,28 @@ impl Worker {
("server".to_string(), target.bootstrap_server.to_string()),
];

let block_sizes: Vec<usize> = target
.block_sizes
.clone()
.unwrap_or_else(|| {
vec![
Byte::from_unit(1.0 / 8.0, ByteUnit::MB).unwrap(),
Byte::from_unit(1.0 / 16.0, ByteUnit::MB).unwrap(),
Byte::from_unit(1.0 / 32.0, ByteUnit::MB).unwrap(),
Byte::from_unit(1.0 / 64.0, ByteUnit::MB).unwrap(),
Byte::from_unit(1.0 / 128.0, ByteUnit::MB).unwrap(),
Byte::from_unit(1.0 / 256.0, ByteUnit::MB).unwrap(),
Byte::from_unit(1.0 / 512.0, ByteUnit::MB).unwrap(),
Byte::from_unit(1.0 / 1024.0, ByteUnit::MB).unwrap(),
]
})
.iter()
.map(|sz| sz.get_bytes() as usize)
.collect();
let block_cache = generate_block_cache(
target.maximum_prebuild_cache_size_bytes,
target.variant,
&block_sizes,
&labels,
);

@@ -182,12 +190,13 @@ impl Worker {
fn generate_block_cache(
cache_size: byte_unit::Byte,
variant: Variant,
block_sizes: &[usize],
#[allow(clippy::ptr_arg)] labels: &Vec<(String, String)>,
) -> Vec<Block> {
let mut rng = rand::thread_rng();

let total_size = cache_size.get_bytes().try_into().unwrap_or(usize::MAX);
let chunks = chunk_bytes(&mut rng, total_size, &BLOCK_BYTE_SIZES);
let chunks = chunk_bytes(&mut rng, total_size, block_sizes);

match variant {
Variant::Ascii => construct_block_cache(&payload::Ascii::default(), &chunks, labels),
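The kafka defaults use fractional megabytes where the old table used integer division, yet the two agree: assuming `from_unit` truncates fractional bytes (as an f64-to-integer cast does), 1/1024 MB comes out to the same 976 bytes as the old ONE_MEBIBYTE / 1024. A check of that assumption:

    use byte_unit::{Byte, ByteUnit};

    fn main() {
        // Old constant: integer division.
        let old = 1_000_000usize / 1024; // 976
        // New default: fractional megabytes, truncated to whole bytes.
        let new = Byte::from_unit(1.0 / 1024.0, ByteUnit::MB).unwrap().get_bytes() as usize;
        assert_eq!(old, new);
    }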
2 changes: 2 additions & 0 deletions lading_generators/src/tcp_gen/config.rs
@@ -33,6 +33,8 @@ pub struct Target {
pub variant: Variant,
/// The bytes per second to send or receive from the target
pub bytes_per_second: byte_unit::Byte,
/// The block sizes for messages to this target
pub block_sizes: Option<Vec<byte_unit::Byte>>,
/// The maximum size in bytes of the cache of prebuilt messages
pub maximum_prebuild_cache_size_bytes: byte_unit::Byte,
}
36 changes: 20 additions & 16 deletions lading_generators/src/tcp_gen/worker.rs
@@ -1,4 +1,5 @@
use crate::tcp_gen::config::{Target, Variant};
use byte_unit::{Byte, ByteUnit};
use governor::state::direct::{self, InsufficientCapacity};
use governor::{clock, state, Quota, RateLimiter};
use lading_common::block::{chunk_bytes, construct_block_cache, Block};
@@ -9,17 +10,6 @@ use std::num::NonZeroU32;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

const ONE_MEBIBYTE: usize = 1_000_000;
const BLOCK_BYTE_SIZES: [usize; 7] = [
ONE_MEBIBYTE / 32,
ONE_MEBIBYTE / 16,
ONE_MEBIBYTE / 8,
ONE_MEBIBYTE / 4,
ONE_MEBIBYTE / 2,
ONE_MEBIBYTE,
ONE_MEBIBYTE * 2,
];

/// The [`Worker`] defines a task that emits variant lines to a TCP server,
/// controlling throughput.
#[derive(Debug)]
@@ -34,10 +24,6 @@ pub struct Worker {
#[derive(Debug)]
pub enum Error {
Governor(InsufficientCapacity),
// Io(::std::io::Error),
// Block(block::Error),
// Hyper(hyper::Error),
// Http(hyper::http::Error),
}

impl From<InsufficientCapacity> for Error {
@@ -60,6 +46,24 @@ impl Worker {
#[allow(clippy::cast_possible_truncation)]
pub fn new(name: String, target: &Target) -> Result<Self, Error> {
let mut rng = rand::thread_rng();
let block_sizes: Vec<usize> = target
.block_sizes
.clone()
.unwrap_or_else(|| {
vec![
Byte::from_unit(1.0 / 32.0, ByteUnit::MB).unwrap(),
Byte::from_unit(1.0 / 16.0, ByteUnit::MB).unwrap(),
Byte::from_unit(1.0 / 8.0, ByteUnit::MB).unwrap(),
Byte::from_unit(1.0 / 4.0, ByteUnit::MB).unwrap(),
Byte::from_unit(1.0 / 2.0, ByteUnit::MB).unwrap(),
Byte::from_unit(1_f64, ByteUnit::MB).unwrap(),
Byte::from_unit(2_f64, ByteUnit::MB).unwrap(),
Byte::from_unit(4_f64, ByteUnit::MB).unwrap(),
]
})
.iter()
.map(|sz| sz.get_bytes() as usize)
.collect();
let bytes_per_second = NonZeroU32::new(target.bytes_per_second.get_bytes() as u32).unwrap();
let rate_limiter = RateLimiter::direct(Quota::per_second(bytes_per_second));
let labels = vec![
@@ -69,7 +73,7 @@
let block_chunks = chunk_bytes(
&mut rng,
target.maximum_prebuild_cache_size_bytes.get_bytes() as usize,
&BLOCK_BYTE_SIZES,
&block_sizes,
);
let block_cache = match target.variant {
Variant::Syslog5424 => {
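The same unwrap_or_else-then-convert dance now appears in all four generators, differing only in the default table. A sketch of a shared helper that could factor it out -- not part of this commit, and the names are mine:

    use byte_unit::{Byte, ByteUnit};

    // Hypothetical helper: resolve configured block sizes, falling back to a
    // per-generator default table expressed in megabytes.
    fn resolve_block_sizes(configured: Option<Vec<Byte>>, default_mb: &[f64]) -> Vec<usize> {
        configured
            .unwrap_or_else(|| {
                default_mb
                    .iter()
                    .map(|mb| Byte::from_unit(*mb, ByteUnit::MB).unwrap())
                    .collect()
            })
            .iter()
            .map(|sz| sz.get_bytes() as usize)
            .collect()
    }

    fn main() {
        // tcp_gen's default table, expressed in megabytes:
        let defaults = [1.0 / 32.0, 1.0 / 16.0, 1.0 / 8.0, 1.0 / 4.0, 1.0 / 2.0, 1.0, 2.0, 4.0];
        let sizes = resolve_block_sizes(None, &defaults);
        assert_eq!(sizes.last(), Some(&4_000_000));
    }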
