diff --git a/control-plane/agents/src/bin/core/tests/pool/mod.rs b/control-plane/agents/src/bin/core/tests/pool/mod.rs index 649246558..de6b1c10a 100644 --- a/control-plane/agents/src/bin/core/tests/pool/mod.rs +++ b/control-plane/agents/src/bin/core/tests/pool/mod.rs @@ -21,7 +21,10 @@ use stor_port::{ VolumePolicy, }, }, - store::replica::{ReplicaSpec, ReplicaSpecKey}, + store::{ + pool::PoolLabel, + replica::{ReplicaSpec, ReplicaSpecKey}, + }, transport::{ CreatePool, CreateReplica, DestroyPool, DestroyReplica, Filter, GetSpecs, NexusId, NodeId, Protocol, Replica, ReplicaId, ReplicaName, ReplicaOwners, ReplicaShareProtocol, @@ -1027,3 +1030,86 @@ async fn destroy_after_restart() { assert_eq!(pool.state().unwrap().id, create.id); } + +#[tokio::test] +async fn slow_create() { + const POOL_SIZE_BYTES: u64 = 128 * 1024 * 1024; + + let vg = deployer_cluster::lvm::VolGroup::new("slow-pool-1", POOL_SIZE_BYTES).unwrap(); + let lvol = vg.create_lvol("lvol0", POOL_SIZE_BYTES / 2).unwrap(); + lvol.suspend().unwrap(); + { + let cluster = ClusterBuilder::builder() + .with_io_engines(1) + .with_reconcile_period(Duration::from_secs(2), Duration::from_secs(2)) + .with_cache_period("1s") + .with_options(|o| o.with_io_engine_devices(vec![lvol.path()])) + // .with_req_timeouts(Duration::from_secs(2), Duration::from_secs(2)) + .compose_build(|b| b.with_clean(true)) + .await + .unwrap(); + + let client = cluster.grpc_client(); + + let create = CreatePool { + node: cluster.node(0), + id: "bob".into(), + disks: vec![lvol.path().into()], + labels: Some(PoolLabel::from([("a".into(), "b".into())])), + }; + + let error = client + .pool() + .create(&create, None) + .await + .expect_err("device suspended"); + // TODO: check if the errors are being mapped correctly! + assert_eq!(error.kind, ReplyErrorKind::Cancelled); + + lvol.resume().unwrap(); + + let start = std::time::Instant::now(); + let timeout = std::time::Duration::from_secs(30); + loop { + if std::time::Instant::now() > (start + timeout) { + panic!("Timeout waiting for the pool"); + } + tokio::time::sleep(Duration::from_millis(100)).await; + + let pools = client + .pool() + .get(Filter::Pool(create.id.clone()), None) + .await + .unwrap(); + + let Some(pool) = pools.0.first() else { + continue; + }; + let Some(pool_spec) = pool.spec() else { + continue; + }; + if !pool_spec.status.created() { + continue; + } + break; + } + let destroy = DestroyPool::from(create.clone()); + client.pool().destroy(&destroy, None).await.unwrap(); + + // Now we try to recreate using an API call, rather than using the reconciler + lvol.suspend().unwrap(); + + let error = client + .pool() + .create(&create, None) + .await + .expect_err("device suspended"); + // TODO: check if the errors are being mapped correctly! + assert_eq!(error.kind, ReplyErrorKind::Cancelled); + + lvol.resume().unwrap(); + + let pool = client.pool().create(&create, None).await.unwrap(); + assert!(pool.spec().unwrap().status.created()); + } +} diff --git a/control-plane/stor-port/src/types/v0/transport/pool.rs b/control-plane/stor-port/src/types/v0/transport/pool.rs index be5d211c9..3567cc653 100644 --- a/control-plane/stor-port/src/types/v0/transport/pool.rs +++ b/control-plane/stor-port/src/types/v0/transport/pool.rs @@ -346,6 +346,11 @@ impl DestroyPool { Self { node, id } } } +impl From for DestroyPool { + fn from(value: CreatePool) -> Self { + Self::new(value.node, value.id) + } +} /// Label Pool Request. #[derive(Serialize, Deserialize, Default, Debug, Clone, Eq, PartialEq)] diff --git a/utils/deployer-cluster/src/lib.rs b/utils/deployer-cluster/src/lib.rs index 99f6885d1..45c397c7a 100644 --- a/utils/deployer-cluster/src/lib.rs +++ b/utils/deployer-cluster/src/lib.rs @@ -1,3 +1,4 @@ +pub mod lvm; pub mod rest_client; use composer::{Builder, ComposeTest}; @@ -629,6 +630,10 @@ impl TmpDiskFile { pub fn uri(&self) -> &str { self.inner.uri() } + /// Disk path on the host. + pub fn path(&self) -> &str { + &self.inner.path + } /// Get the inner disk if there are no other references to it. pub fn into_inner(self) -> Result> { @@ -652,7 +657,10 @@ impl TmpDiskFileInner { } } fn make_path(name: &str) -> String { - format!("/tmp/io-engine-disk-{name}") + // todo: use known path to facilitate cleanup. + // let root = std::env::var("WORKSPACE_ROOT").as_deref().unwrap_or("/tmp"); + let root = "/tmp"; + format!("{root}/io-engine-disk-{name}") } fn uri(&self) -> &str { &self.uri diff --git a/utils/deployer-cluster/src/lvm.rs b/utils/deployer-cluster/src/lvm.rs new file mode 100644 index 000000000..fb75c2bd9 --- /dev/null +++ b/utils/deployer-cluster/src/lvm.rs @@ -0,0 +1,100 @@ +//! LVM helper methods which are useful for setting up test block devices. + +use crate::TmpDiskFile; + +/// An LVM Logical Volume. +pub struct Lvol { + name: String, + path: String, +} +impl Lvol { + /// Get the host path for the lvol. + pub fn path(&self) -> &str { + &self.path + } + /// Suspends the device for IO. + pub fn suspend(&self) -> anyhow::Result<()> { + let _ = VolGroup::command(&["dmsetup", "suspend", self.path.as_str()])?; + Ok(()) + } + /// Resumes the device for IO. + pub fn resume(&self) -> anyhow::Result<()> { + let _ = VolGroup::command(&["dmsetup", "resume", self.path.as_str()])?; + Ok(()) + } +} +impl Drop for Lvol { + fn drop(&mut self) { + println!("Dropping Lvol {}", self.name); + self.resume().ok(); + } +} + +/// An LVM Volume Group. +pub struct VolGroup { + backing_file: TmpDiskFile, + dev_loop: String, + name: String, +} + +impl VolGroup { + /// Creates a new LVM Volume Group. + pub fn new(name: &str, size: u64) -> Result { + let backing_file = TmpDiskFile::new(name, size); + + let dev_loop = Self::command(&["losetup", "--show", "-f", backing_file.path()])?; + let dev_loop = dev_loop.trim_end().to_string(); + let _ = Self::command(&["pvcreate", dev_loop.as_str()])?; + let _ = Self::command(&["vgcreate", name, dev_loop.as_str()])?; + + Ok(Self { + backing_file, + dev_loop, + name: name.to_string(), + }) + } + /// Creates a new Lvol for the LVM Volume Group. + pub fn create_lvol(&self, name: &str, size: u64) -> Result { + let size = format!("{size}B"); + + let vg_name = self.name.as_str(); + let _ = Self::command(&["lvcreate", "-L", size.as_str(), "-n", name, vg_name])?; + + Ok(Lvol { + name: name.to_string(), + path: format!("/dev/{vg_name}/{name}"), + }) + } + /// Run a command with sudo, and the given args. + /// The output string is returned. + fn command(args: &[&str]) -> Result { + let cmd = args.first().unwrap(); + let output = std::process::Command::new("sudo") + .arg("-E") + .args(args) + .output()?; + if !output.status.success() { + return Err(anyhow::anyhow!( + "{cmd} Exit Code: {}\nstdout: {}, stderr: {}", + output.status, + String::from_utf8(output.stdout).unwrap_or_default(), + String::from_utf8(output.stderr).unwrap_or_default() + )); + } + let output = String::from_utf8(output.stdout)?; + Ok(output) + } +} + +impl Drop for VolGroup { + fn drop(&mut self) { + println!( + "Dropping VolGroup {} <== {}", + self.name, + self.backing_file.path() + ); + + let _ = Self::command(&["vgremove", self.name.as_str(), "-y"]); + let _ = Self::command(&["losetup", "-d", self.dev_loop.as_str()]); + } +}