From 3bcd6f3e75abe875c4f48baaab879ca2f92bb8fb Mon Sep 17 00:00:00 2001
From: "Brian L. Troutwine"
Date: Mon, 16 Aug 2021 10:05:41 -0700
Subject: [PATCH] Allow user to set block sizes (#80)

* Allow user to set block sizes

This commit allows the user to set block sizes for generators. This does
not allow them to adjust individual message sizes -- unless a block is
equivalent to a message -- but it does allow indirect control: for some
formats, the larger the block, the larger the message.

This is mostly useful for systems that have a hard cap on the total
payload size -- say, UDP when we get to that -- or in situations where
we need to simulate consistent producers a little more carefully.

Signed-off-by: Brian L. Troutwine

* clippy ding

Signed-off-by: Brian L. Troutwine

* Allow block sizes to be defaulted

Signed-off-by: Brian L. Troutwine

* clippy dings

Signed-off-by: Brian L. Troutwine
---
 lading_generators/src/file_gen/config.rs  | 24 ++++++++++++++-
 lading_generators/src/file_gen/worker.rs  | 12 +-------
 lading_generators/src/http_gen/config.rs  |  2 ++
 lading_generators/src/http_gen/worker.rs  | 27 ++++++++++-------
 lading_generators/src/kafka_gen/config.rs |  2 ++
 lading_generators/src/kafka_gen/worker.rs | 35 ++++++++++++++--------
 lading_generators/src/tcp_gen/config.rs   |  2 ++
 lading_generators/src/tcp_gen/worker.rs   | 36 +++++++++++++----------
 8 files changed, 89 insertions(+), 51 deletions(-)

diff --git a/lading_generators/src/file_gen/config.rs b/lading_generators/src/file_gen/config.rs
index f1afef455..156bf0ce4 100644
--- a/lading_generators/src/file_gen/config.rs
+++ b/lading_generators/src/file_gen/config.rs
@@ -1,7 +1,7 @@
 //! This module controls configuration parsing from the end user, providing a
 //! convenience mechanism for the rest of the program. Crashes are most likely
 //! to originate from this code, intentionally.
-use byte_unit::Byte;
+use byte_unit::{Byte, ByteUnit};
 use serde::Deserialize;
 use std::collections::HashMap;
 use std::net::SocketAddr;
@@ -64,6 +64,8 @@ pub struct LogTargetTemplate {
     /// possible as the internal governor accumulates, up to
     /// `maximum_bytes_burst`.
     bytes_per_second: Byte,
+    /// The block sizes for messages to this target
+    pub block_sizes: Option<Vec<Byte>>,
     /// Defines the maximum internal cache of this log target. file_gen will
     /// pre-build its outputs up to the byte capacity specified here.
     maximum_prebuild_cache_size_bytes: Byte,
@@ -89,6 +91,8 @@ pub struct LogTarget {
     /// possible as the internal governor accumulates, up to
     /// `maximum_bytes_burst`.
     pub bytes_per_second: Byte,
+    /// The block sizes for messages to this target
+    pub block_sizes: Vec<usize>,
     /// The maximum size in bytes that the prebuild cache may be.
     pub maximum_prebuild_cache_size_bytes: Byte,
 }
@@ -114,13 +118,31 @@ impl LogTargetTemplate {
     /// Function will panic if user configuration contains values that can't be
     /// converted to u32 instances.
     #[must_use]
+    #[allow(clippy::cast_possible_truncation)]
     pub fn strike(&self, duplicate: u8) -> LogTarget {
         let duplicate = format!("{}", duplicate);
         let full_path = self.path_template.replace("%NNN%", &duplicate);
         let path = PathBuf::from(full_path);
+        let block_sizes: Vec<usize> = self
+            .block_sizes
+            .clone()
+            .unwrap_or_else(|| {
+                vec![
+                    Byte::from_unit(1_f64, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(2_f64, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(4_f64, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(8_f64, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(16_f64, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(32_f64, ByteUnit::MB).unwrap(),
+                ]
+            })
+            .iter()
+            .map(|sz| sz.get_bytes() as usize)
+            .collect();
 
         LogTarget {
             path,
+            block_sizes,
             variant: self.variant.clone(),
             maximum_bytes_per_file: self.maximum_bytes_per_file,
             bytes_per_second: self.bytes_per_second,
diff --git a/lading_generators/src/file_gen/worker.rs b/lading_generators/src/file_gen/worker.rs
index 59527a98b..a9cbd962a 100644
--- a/lading_generators/src/file_gen/worker.rs
+++ b/lading_generators/src/file_gen/worker.rs
@@ -34,16 +34,6 @@ impl From<::std::io::Error> for Error {
     }
 }
 
-const ONE_MEBIBYTE: usize = 1_000_000;
-const BLOCK_BYTE_SIZES: [usize; 6] = [
-    ONE_MEBIBYTE,
-    2_000_000,
-    4_000_000,
-    8_000_000,
-    16_000_000,
-    32_000_000,
-];
-
 /// The [`Log`] defines a task that emits variant lines to a file, managing
 /// rotation and controlling rate limits.
 #[derive(Debug)]
@@ -85,7 +75,7 @@ impl Log {
         let block_chunks = chunk_bytes(
             &mut rng,
             maximum_prebuild_cache_size_bytes.get() as usize,
-            &BLOCK_BYTE_SIZES,
+            &target.block_sizes,
         );
 
         let labels = vec![("target".to_string(), name.clone())];
diff --git a/lading_generators/src/http_gen/config.rs b/lading_generators/src/http_gen/config.rs
index e3b6be67c..3543ec7ab 100644
--- a/lading_generators/src/http_gen/config.rs
+++ b/lading_generators/src/http_gen/config.rs
@@ -61,6 +61,8 @@ pub struct Target {
     pub headers: HeaderMap,
     /// The bytes per second to send or receive from the target
     pub bytes_per_second: byte_unit::Byte,
+    /// The block sizes for messages to this target
+    pub block_sizes: Option<Vec<byte_unit::Byte>>,
     /// The total number of parallel connections to maintain
     pub parallel_connections: u16,
 }
diff --git a/lading_generators/src/http_gen/worker.rs b/lading_generators/src/http_gen/worker.rs
index 1c7f1f59c..a13c7bf09 100644
--- a/lading_generators/src/http_gen/worker.rs
+++ b/lading_generators/src/http_gen/worker.rs
@@ -1,4 +1,5 @@
 use crate::http_gen::config::{Method, Target, Variant};
+use byte_unit::{Byte, ByteUnit};
 use futures::stream::{self, StreamExt};
 use governor::state::direct::{self, InsufficientCapacity};
 use governor::{clock, state, Quota, RateLimiter};
@@ -51,15 +52,6 @@ impl From<::std::io::Error> for Error {
     }
 }
 
-const ONE_MEBIBYTE: usize = 1_000_000;
-const BLOCK_BYTE_SIZES: [usize; 5] = [
-    ONE_MEBIBYTE / 8,
-    ONE_MEBIBYTE / 4,
-    ONE_MEBIBYTE / 2,
-    ONE_MEBIBYTE,
-    ONE_MEBIBYTE * 2,
-];
-
 /// The [`Worker`] defines a task that emits variant lines to an HTTP server
 /// controlling throughput.
 #[derive(Debug)]
@@ -88,6 +80,21 @@ impl Worker {
     #[allow(clippy::cast_possible_truncation)]
     pub fn new(name: String, target: Target) -> Result<Self, Error> {
         let mut rng = rand::thread_rng();
+        let block_sizes: Vec<usize> = target
+            .block_sizes
+            .unwrap_or_else(|| {
+                vec![
+                    Byte::from_unit(1.0 / 8.0, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(1.0 / 4.0, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(1.0 / 2.0, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(1_f64, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(2_f64, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(4_f64, ByteUnit::MB).unwrap(),
+                ]
+            })
+            .iter()
+            .map(|sz| sz.get_bytes() as usize)
+            .collect();
         let bytes_per_second = NonZeroU32::new(target.bytes_per_second.get_bytes() as u32).unwrap();
         let rate_limiter = RateLimiter::direct(Quota::per_second(bytes_per_second));
         let labels = vec![
@@ -102,7 +109,7 @@ impl Worker {
         let block_chunks = chunk_bytes(
             &mut rng,
             maximum_prebuild_cache_size_bytes.get_bytes() as usize,
-            &BLOCK_BYTE_SIZES,
+            &block_sizes,
         );
         let block_cache = match variant {
             Variant::Ascii => {
diff --git a/lading_generators/src/kafka_gen/config.rs b/lading_generators/src/kafka_gen/config.rs
index 99030091a..17a23926e 100644
--- a/lading_generators/src/kafka_gen/config.rs
+++ b/lading_generators/src/kafka_gen/config.rs
@@ -60,6 +60,8 @@ pub struct Target {
     pub throughput: Throughput,
     /// The maximum size in bytes of the cache of prebuilt messages
    pub maximum_prebuild_cache_size_bytes: byte_unit::Byte,
+    /// The block sizes for messages to this target
+    pub block_sizes: Option<Vec<byte_unit::Byte>>,
     /// Map of rdkafka-specific overrides to apply to the producer
     pub producer_config: Option<HashMap<String, String>>,
 }
diff --git a/lading_generators/src/kafka_gen/worker.rs b/lading_generators/src/kafka_gen/worker.rs
index 34b06f768..f39f5f868 100644
--- a/lading_generators/src/kafka_gen/worker.rs
+++ b/lading_generators/src/kafka_gen/worker.rs
@@ -1,5 +1,6 @@
 use crate::kafka_gen::config::Throughput;
 use crate::kafka_gen::config::{Target, Variant};
+use byte_unit::{Byte, ByteUnit};
 use futures::stream::FuturesUnordered;
 use futures::{FutureExt, StreamExt};
 use governor::state::direct::{self, InsufficientCapacity};
@@ -48,18 +49,6 @@ impl From<::std::io::Error> for Error {
     }
 }
 
-const ONE_MEBIBYTE: usize = 1_000_000;
-const BLOCK_BYTE_SIZES: [usize; 8] = [
-    ONE_MEBIBYTE / 1024,
-    ONE_MEBIBYTE / 512,
-    ONE_MEBIBYTE / 256,
-    ONE_MEBIBYTE / 128,
-    ONE_MEBIBYTE / 64,
-    ONE_MEBIBYTE / 32,
-    ONE_MEBIBYTE / 16,
-    ONE_MEBIBYTE / 8,
-];
-
 /// The [`Worker`] defines a task that emits variant lines to an HTTP server
 /// controlling throughput.
 #[derive(Debug)]
@@ -88,9 +77,28 @@ impl Worker {
             ("server".to_string(), target.bootstrap_server.to_string()),
         ];
 
+        let block_sizes: Vec<usize> = target
+            .block_sizes
+            .clone()
+            .unwrap_or_else(|| {
+                vec![
+                    Byte::from_unit(1.0 / 8.0, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(1.0 / 16.0, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(1.0 / 32.0, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(1.0 / 64.0, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(1.0 / 128.0, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(1.0 / 256.0, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(1.0 / 512.0, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(1.0 / 1024.0, ByteUnit::MB).unwrap(),
+                ]
+            })
+            .iter()
+            .map(|sz| sz.get_bytes() as usize)
+            .collect();
         let block_cache = generate_block_cache(
             target.maximum_prebuild_cache_size_bytes,
             target.variant,
+            &block_sizes,
             &labels,
         );
 
@@ -182,12 +190,13 @@ impl Worker {
 fn generate_block_cache(
     cache_size: byte_unit::Byte,
     variant: Variant,
+    block_sizes: &[usize],
     #[allow(clippy::ptr_arg)] labels: &Vec<(String, String)>,
 ) -> Vec<Block> {
     let mut rng = rand::thread_rng();
     let total_size = cache_size.get_bytes().try_into().unwrap_or(usize::MAX);
 
-    let chunks = chunk_bytes(&mut rng, total_size, &BLOCK_BYTE_SIZES);
+    let chunks = chunk_bytes(&mut rng, total_size, block_sizes);
 
     match variant {
         Variant::Ascii => construct_block_cache(&payload::Ascii::default(), &chunks, labels),
diff --git a/lading_generators/src/tcp_gen/config.rs b/lading_generators/src/tcp_gen/config.rs
index 1a187ed0b..c348a157f 100644
--- a/lading_generators/src/tcp_gen/config.rs
+++ b/lading_generators/src/tcp_gen/config.rs
@@ -33,6 +33,8 @@ pub struct Target {
     pub variant: Variant,
     /// The bytes per second to send or receive from the target
     pub bytes_per_second: byte_unit::Byte,
+    /// The block sizes for messages to this target
+    pub block_sizes: Option<Vec<byte_unit::Byte>>,
     /// The maximum size in bytes of the cache of prebuilt messages
     pub maximum_prebuild_cache_size_bytes: byte_unit::Byte,
 }
diff --git a/lading_generators/src/tcp_gen/worker.rs b/lading_generators/src/tcp_gen/worker.rs
index 3b29757f9..a75028ccd 100644
--- a/lading_generators/src/tcp_gen/worker.rs
+++ b/lading_generators/src/tcp_gen/worker.rs
@@ -1,4 +1,5 @@
 use crate::tcp_gen::config::{Target, Variant};
+use byte_unit::{Byte, ByteUnit};
 use governor::state::direct::{self, InsufficientCapacity};
 use governor::{clock, state, Quota, RateLimiter};
 use lading_common::block::{chunk_bytes, construct_block_cache, Block};
@@ -9,17 +10,6 @@ use std::num::NonZeroU32;
 use tokio::io::AsyncWriteExt;
 use tokio::net::TcpStream;
 
-const ONE_MEBIBYTE: usize = 1_000_000;
-const BLOCK_BYTE_SIZES: [usize; 7] = [
-    ONE_MEBIBYTE / 32,
-    ONE_MEBIBYTE / 16,
-    ONE_MEBIBYTE / 8,
-    ONE_MEBIBYTE / 4,
-    ONE_MEBIBYTE / 2,
-    ONE_MEBIBYTE,
-    ONE_MEBIBYTE * 2,
-];
-
 /// The [`Worker`] defines a task that emits variant lines to an HTTP server
 /// controlling throughput.
 #[derive(Debug)]
@@ -34,10 +24,6 @@ pub struct Worker {
 #[derive(Debug)]
 pub enum Error {
     Governor(InsufficientCapacity),
-    // Io(::std::io::Error),
-    // Block(block::Error),
-    // Hyper(hyper::Error),
-    // Http(hyper::http::Error),
 }
 
 impl From<InsufficientCapacity> for Error {
@@ -60,6 +46,24 @@ impl Worker {
     #[allow(clippy::cast_possible_truncation)]
     pub fn new(name: String, target: &Target) -> Result<Self, Error> {
         let mut rng = rand::thread_rng();
+        let block_sizes: Vec<usize> = target
+            .block_sizes
+            .clone()
+            .unwrap_or_else(|| {
+                vec![
+                    Byte::from_unit(1.0 / 32.0, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(1.0 / 16.0, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(1.0 / 8.0, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(1.0 / 4.0, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(1.0 / 2.0, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(1_f64, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(2_f64, ByteUnit::MB).unwrap(),
+                    Byte::from_unit(4_f64, ByteUnit::MB).unwrap(),
+                ]
+            })
+            .iter()
+            .map(|sz| sz.get_bytes() as usize)
+            .collect();
         let bytes_per_second = NonZeroU32::new(target.bytes_per_second.get_bytes() as u32).unwrap();
         let rate_limiter = RateLimiter::direct(Quota::per_second(bytes_per_second));
         let labels = vec![
@@ -69,7 +73,7 @@ impl Worker {
         let block_chunks = chunk_bytes(
             &mut rng,
             target.maximum_prebuild_cache_size_bytes.get_bytes() as usize,
-            &BLOCK_BYTE_SIZES,
+            &block_sizes,
         );
         let block_cache = match target.variant {
             Variant::Syslog5424 => {
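
Each generator in this patch repeats the same resolve-then-convert pattern: a
user-supplied Option<Vec<Byte>> falls back to generator-specific defaults and
is then collapsed into the Vec<usize> that chunk_bytes consumes. The minimal
standalone sketch below condenses that pattern using only the byte_unit calls
the patch itself relies on; resolve_block_sizes and default_mb are names
invented for this sketch, not part of the patch.

    use byte_unit::{Byte, ByteUnit};

    /// Resolve user-configured block sizes, falling back to defaults given in
    /// megabytes, then convert to the raw byte counts `chunk_bytes` expects.
    #[allow(clippy::cast_possible_truncation)]
    fn resolve_block_sizes(user: Option<Vec<Byte>>, default_mb: &[f64]) -> Vec<usize> {
        user.unwrap_or_else(|| {
            default_mb
                .iter()
                .map(|mb| Byte::from_unit(*mb, ByteUnit::MB).unwrap())
                .collect()
        })
        .iter()
        .map(|sz| sz.get_bytes() as usize)
        .collect()
    }

    fn main() {
        // No user override: fall back to 1 MB and 2 MB blocks.
        // ByteUnit::MB is decimal, so 1 MB == 1_000_000 bytes.
        assert_eq!(
            resolve_block_sizes(None, &[1.0, 2.0]),
            vec![1_000_000, 2_000_000]
        );
    }

A shared helper like this is one possible refactor; the patch instead inlines
the fallback per generator, which lets file_gen, http_gen, kafka_gen, and
tcp_gen each keep their own default size ladder.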