Skip to content

Commit

Permalink
test(pool): create on very large or very slow disks
Browse files Browse the repository at this point in the history
Uses LVM Lvols as backend devices for the pool.
We suspend these before pool creation, allowing us to simulate slow
pool creation.
This test ensures that the pool creation is completed by itself and also
that a client can also complete it by calling create again.

Signed-off-by: Tiago Castro <[email protected]>
  • Loading branch information
tiagolobocastro committed Nov 20, 2024
1 parent abfa7ec commit cc56335
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 2 deletions.
88 changes: 87 additions & 1 deletion control-plane/agents/src/bin/core/tests/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use stor_port::{
VolumePolicy,
},
},
store::replica::{ReplicaSpec, ReplicaSpecKey},
store::{
pool::PoolLabel,
replica::{ReplicaSpec, ReplicaSpecKey},
},
transport::{
CreatePool, CreateReplica, DestroyPool, DestroyReplica, Filter, GetSpecs, NexusId,
NodeId, Protocol, Replica, ReplicaId, ReplicaName, ReplicaOwners, ReplicaShareProtocol,
Expand Down Expand Up @@ -1027,3 +1030,86 @@ async fn destroy_after_restart() {

assert_eq!(pool.state().unwrap().id, create.id);
}

#[tokio::test]
async fn slow_create() {
const POOL_SIZE_BYTES: u64 = 128 * 1024 * 1024;

let vg = deployer_cluster::lvm::VolGroup::new("slow-pool-1", POOL_SIZE_BYTES).unwrap();
let lvol = vg.create_lvol("lvol0", POOL_SIZE_BYTES / 2).unwrap();
lvol.suspend().unwrap();
{
let cluster = ClusterBuilder::builder()
.with_io_engines(1)
.with_reconcile_period(Duration::from_secs(2), Duration::from_secs(2))
.with_cache_period("1s")
.with_options(|o| o.with_io_engine_devices(vec![lvol.path()]))
// .with_req_timeouts(Duration::from_secs(2), Duration::from_secs(2))
.compose_build(|b| b.with_clean(true))
.await
.unwrap();

let client = cluster.grpc_client();

let create = CreatePool {
node: cluster.node(0),
id: "bob".into(),
disks: vec![lvol.path().into()],
labels: Some(PoolLabel::from([("a".into(), "b".into())])),
};

let error = client
.pool()
.create(&create, None)
.await
.expect_err("device suspended");
// TODO: check if the errors are being mapped correctly!
assert_eq!(error.kind, ReplyErrorKind::Cancelled);

lvol.resume().unwrap();

let start = std::time::Instant::now();
let timeout = std::time::Duration::from_secs(30);
loop {
if std::time::Instant::now() > (start + timeout) {
panic!("Timeout waiting for the pool");
}
tokio::time::sleep(Duration::from_millis(100)).await;

let pools = client
.pool()
.get(Filter::Pool(create.id.clone()), None)
.await
.unwrap();

let Some(pool) = pools.0.first() else {
continue;
};
let Some(pool_spec) = pool.spec() else {
continue;
};
if !pool_spec.status.created() {
continue;
}
break;
}
let destroy = DestroyPool::from(create.clone());
client.pool().destroy(&destroy, None).await.unwrap();

// Now we try to recreate using an API call, rather than using the reconciler
lvol.suspend().unwrap();

let error = client
.pool()
.create(&create, None)
.await
.expect_err("device suspended");
// TODO: check if the errors are being mapped correctly!
assert_eq!(error.kind, ReplyErrorKind::Cancelled);

lvol.resume().unwrap();

let pool = client.pool().create(&create, None).await.unwrap();
assert!(pool.spec().unwrap().status.created());
}
}
5 changes: 5 additions & 0 deletions control-plane/stor-port/src/types/v0/transport/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,11 @@ impl DestroyPool {
Self { node, id }
}
}
impl From<CreatePool> for DestroyPool {
fn from(value: CreatePool) -> Self {
Self::new(value.node, value.id)
}
}

/// Label Pool Request.
#[derive(Serialize, Deserialize, Default, Debug, Clone, Eq, PartialEq)]
Expand Down
10 changes: 9 additions & 1 deletion utils/deployer-cluster/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod lvm;
pub mod rest_client;

use composer::{Builder, ComposeTest};
Expand Down Expand Up @@ -629,6 +630,10 @@ impl TmpDiskFile {
pub fn uri(&self) -> &str {
self.inner.uri()
}
/// Disk path on the host.
pub fn path(&self) -> &str {
&self.inner.path
}

/// Get the inner disk if there are no other references to it.
pub fn into_inner(self) -> Result<TmpDiskFileInner, Arc<TmpDiskFileInner>> {
Expand All @@ -652,7 +657,10 @@ impl TmpDiskFileInner {
}
}
fn make_path(name: &str) -> String {
format!("/tmp/io-engine-disk-{name}")
// todo: use known path to facilitate cleanup.
// let root = std::env::var("WORKSPACE_ROOT").as_deref().unwrap_or("/tmp");
let root = "/tmp";
format!("{root}/io-engine-disk-{name}")
}
fn uri(&self) -> &str {
&self.uri
Expand Down
100 changes: 100 additions & 0 deletions utils/deployer-cluster/src/lvm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//! LVM helper methods which are useful for setting up test block devices.

use crate::TmpDiskFile;

/// An LVM Logical Volume.
pub struct Lvol {
name: String,
path: String,
}
impl Lvol {
/// Get the host path for the lvol.
pub fn path(&self) -> &str {
&self.path
}
/// Suspends the device for IO.
pub fn suspend(&self) -> anyhow::Result<()> {
let _ = VolGroup::command(&["dmsetup", "suspend", self.path.as_str()])?;
Ok(())
}
/// Resumes the device for IO.
pub fn resume(&self) -> anyhow::Result<()> {
let _ = VolGroup::command(&["dmsetup", "resume", self.path.as_str()])?;
Ok(())
}
}
impl Drop for Lvol {
fn drop(&mut self) {
println!("Dropping Lvol {}", self.name);
self.resume().ok();
}
}

/// An LVM Volume Group.
pub struct VolGroup {
backing_file: TmpDiskFile,
dev_loop: String,
name: String,
}

impl VolGroup {
/// Creates a new LVM Volume Group.
pub fn new(name: &str, size: u64) -> Result<Self, anyhow::Error> {
let backing_file = TmpDiskFile::new(name, size);

let dev_loop = Self::command(&["losetup", "--show", "-f", backing_file.path()])?;
let dev_loop = dev_loop.trim_end().to_string();
let _ = Self::command(&["pvcreate", dev_loop.as_str()])?;
let _ = Self::command(&["vgcreate", name, dev_loop.as_str()])?;

Ok(Self {
backing_file,
dev_loop,
name: name.to_string(),
})
}
/// Creates a new Lvol for the LVM Volume Group.
pub fn create_lvol(&self, name: &str, size: u64) -> Result<Lvol, anyhow::Error> {
let size = format!("{size}B");

let vg_name = self.name.as_str();
let _ = Self::command(&["lvcreate", "-L", size.as_str(), "-n", name, vg_name])?;

Ok(Lvol {
name: name.to_string(),
path: format!("/dev/{vg_name}/{name}"),
})
}
/// Run a command with sudo, and the given args.
/// The output string is returned.
fn command(args: &[&str]) -> Result<String, anyhow::Error> {
let cmd = args.first().unwrap();
let output = std::process::Command::new("sudo")
.arg("-E")
.args(args)
.output()?;
if !output.status.success() {
return Err(anyhow::anyhow!(
"{cmd} Exit Code: {}\nstdout: {}, stderr: {}",
output.status,
String::from_utf8(output.stdout).unwrap_or_default(),
String::from_utf8(output.stderr).unwrap_or_default()
));
}
let output = String::from_utf8(output.stdout)?;
Ok(output)
}
}

impl Drop for VolGroup {
fn drop(&mut self) {
println!(
"Dropping VolGroup {} <== {}",
self.name,
self.backing_file.path()
);

let _ = Self::command(&["vgremove", self.name.as_str(), "-y"]);
let _ = Self::command(&["losetup", "-d", self.dev_loop.as_str()]);
}
}

0 comments on commit cc56335

Please sign in to comment.