Fix regression for pool creation timeout retry #887

Open
wants to merge 4 commits into base: develop
@@ -53,6 +53,7 @@ impl TaskPoller for PoolReconciler {
results.push(Self::squash_results(vec![
pool.garbage_collect(context).await,
pool.recreate_state(context).await,
retry_create_pool_reconciler(&mut pool, context).await,
]))
}
capacity::remove_larger_replicas(context.registry()).await;
Expand Down Expand Up @@ -214,3 +215,12 @@ async fn deleting_pool_spec_reconciler(
))
.await
}

#[tracing::instrument(skip(pool, context), level = "trace", fields(pool.id = %pool.id(), request.reconcile = true))]
async fn retry_create_pool_reconciler(
pool: &mut OperationGuardArc<PoolSpec>,
context: &PollContext,
) -> PollResult {
pool.retry_creating(context.registry()).await?;
Ok(PollerState::Idle)
}
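For context, each per-pool step contributes one `PollResult` and the poller squashes them, so every step runs before the results are combined. A toy sketch of that composition (the real `PollResult`/`PollerState`/`squash_results` live in the core agent; the versions below are simplified stand-ins, and the squash semantics shown are one plausible reading, not confirmed by the source):

```rust
// Simplified stand-ins for the agent's poller types, to show how the new
// retry step composes with garbage collection: every step has already run
// by the time the results are squashed, so a failed retry can't starve the others.
type PollResult = Result<PollerState, String>;

#[derive(Debug)]
enum PollerState {
    Idle,
}

// One plausible squash: surface the first error, otherwise report Idle.
fn squash_results(results: Vec<PollResult>) -> PollResult {
    results
        .into_iter()
        .collect::<Result<Vec<_>, _>>()
        .map(|_| PollerState::Idle)
}

fn main() {
    let results = vec![Ok(PollerState::Idle), Err("retry failed".to_string())];
    assert!(squash_results(results).is_err());
}
```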
11 changes: 11 additions & 0 deletions control-plane/agents/src/bin/core/controller/registry.rs
@@ -86,6 +86,11 @@ pub(crate) struct RegistryInner<S: Store> {
/// The duration for which the reconciler waits for the replica
/// to be healthy again before attempting to online the faulted child.
faulted_child_wait_period: Option<std::time::Duration>,
/// When the pool creation gRPC times out, the actual call in the io-engine
/// may still progress.
/// We wait up to this period before considering the operation a failure and
/// GC'ing the pool.
pool_async_creat_tmo: std::time::Duration,
/// Disable partial rebuild for volume targets.
disable_partial_rebuild: bool,
/// Disable nvmf target access control gates.
@@ -122,6 +127,7 @@ impl Registry {
reconcile_period: std::time::Duration,
reconcile_idle_period: std::time::Duration,
faulted_child_wait_period: Option<std::time::Duration>,
pool_async_creat_tmo: std::time::Duration,
disable_partial_rebuild: bool,
disable_target_acc: bool,
max_rebuilds: Option<NumRebuilds>,
@@ -165,6 +171,7 @@ impl Registry {
reconcile_period,
reconcile_idle_period,
faulted_child_wait_period,
pool_async_creat_tmo,
disable_partial_rebuild,
disable_target_acc,
reconciler: ReconcilerControl::new(),
@@ -298,6 +305,10 @@ impl Registry {
pub(crate) fn faulted_child_wait_period(&self) -> Option<std::time::Duration> {
self.faulted_child_wait_period
}
/// Allow this much time before assuming failure and allowing the pool to be deleted.
pub(crate) fn pool_async_creat_tmo(&self) -> std::time::Duration {
self.pool_async_creat_tmo
}
/// The maximum number of concurrent create volume requests.
pub(crate) fn create_volume_limit(&self) -> usize {
self.create_volume_limit
@@ -51,9 +51,9 @@ enum SpecError {
}

/// What to do when creation fails.
#[derive(Debug)]
pub(crate) enum OnCreateFail {
/// Leave object as `Creating`, could allow for frontend retries.
#[allow(unused)]
LeaveAsIs,
/// When frontend retries don't make sense, set it to deleting so we can clean-up.
SetDeleting,
@@ -68,7 +68,28 @@ impl OnCreateFail {
pub(crate) fn eeinval_delete<O>(result: &Result<O, SvcError>) -> Self {
match result {
Err(error) if error.tonic_code() == tonic::Code::InvalidArgument => Self::Delete,
Err(error) if error.tonic_code() == tonic::Code::NotFound => Self::Delete,
_ => Self::SetDeleting,
}
}
/// Map errors into `Self` for pool creation requests, specifically.
pub(crate) fn on_pool_create_err<O>(result: &Result<O, SvcError>) -> Self {
let Err(ref error) = result else {
// nonsensical but that's how the api is today...
return Self::SetDeleting;
};
match error.tonic_code() {
// 1. the disk is open by another pool or bdev
// 2. the disk contains a pool with another name
tonic::Code::InvalidArgument => Self::Delete,
// 1. the pool disk is not available (ie not found or broken somehow)
tonic::Code::NotFound => Self::Delete,
// In this case, it's the pool operator's job to attempt re-creation of the pool.
// 1. pre-2.6 dataplane, contention on the pool service
// 2. pool disk is very slow or extremely large
// 3. dataplane core is shared with other processes
// TODO: use higher timeout on larger pool sizes or potentially make this
// an async operation.
tonic::Code::Cancelled => Self::LeaveAsIs,
_ => Self::SetDeleting,
}
}
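To make the new policy easier to eyeball, here is a toy reproduction of the mapping above (a bare `tonic::Code` stands in for `SvcError`, and `Policy` for `OnCreateFail`; only the decision logic mirrors the source):

```rust
use tonic::Code;

#[derive(Debug, PartialEq)]
enum Policy {
    Delete,      // the disk is unusable for this pool: destroy the spec outright
    SetDeleting, // unknown failure: mark for clean-up by the GC
    LeaveAsIs,   // the create may still complete: keep `Creating` and retry later
}

fn map_pool_create_code(code: Code) -> Policy {
    match code {
        Code::InvalidArgument | Code::NotFound => Policy::Delete,
        Code::Cancelled => Policy::LeaveAsIs,
        _ => Policy::SetDeleting,
    }
}

fn main() {
    // A timed-out create surfaces as Cancelled, which is the case this PR retries.
    assert_eq!(map_pool_create_code(Code::Cancelled), Policy::LeaveAsIs);
    assert_eq!(map_pool_create_code(Code::NotFound), Policy::Delete);
}
```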
8 changes: 8 additions & 0 deletions control-plane/agents/src/bin/core/main.rs
@@ -47,6 +47,13 @@ pub(crate) struct CliArgs {
#[clap(long)]
pub(crate) faulted_child_wait_period: Option<humantime::Duration>,

/// When the pool creation gRPC times out, the actual call in the io-engine
/// may still progress.
/// We wait up to this period before considering the operation a failure and
/// GC'ing the pool.
#[clap(long, default_value = "15m")]
pub(crate) pool_async_creat_tmo: humantime::Duration,

/// Disable partial rebuild for volume targets.
#[clap(long, env = "DISABLE_PARTIAL_REBUILD")]
pub(crate) disable_partial_rebuild: bool,
@@ -194,6 +201,7 @@ async fn server(cli_args: CliArgs) -> anyhow::Result<()> {
cli_args.reconcile_period.into(),
cli_args.reconcile_idle_period.into(),
cli_args.faulted_child_wait_period.map(|t| t.into()),
cli_args.pool_async_creat_tmo.into(),
cli_args.disable_partial_rebuild,
cli_args.disable_target_acc,
cli_args.max_rebuilds,
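For reference, the flag's value goes through `humantime::Duration` before being handed to the registry, so values like `15m`, `90s`, or `1h 30m` all parse. A minimal sketch assuming only the `humantime` crate:

```rust
use std::str::FromStr;

fn main() {
    // "15m" mirrors the default above; the `.into()` is the same conversion
    // `cli_args.pool_async_creat_tmo.into()` performs when building the registry.
    let tmo = humantime::Duration::from_str("15m").expect("valid duration");
    let tmo: std::time::Duration = tmo.into();
    assert_eq!(tmo, std::time::Duration::from_secs(15 * 60));
}
```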
73 changes: 68 additions & 5 deletions control-plane/agents/src/bin/core/pool/operations_helper.rs
@@ -1,14 +1,77 @@
use crate::controller::{
registry::Registry,
resources::{operations::ResourceLifecycle, OperationGuardArc},
use crate::{
controller::{
io_engine::PoolApi,
registry::Registry,
resources::{
operations::ResourceLifecycle,
operations_helper::{GuardedOperationsHelper, OnCreateFail, SpecOperationsHelper},
OperationGuardArc,
},
},
node::wrapper::GetterOps,
};
use agents::{errors, errors::SvcError};
use snafu::OptionExt;
use std::ops::Deref;
use stor_port::types::v0::{
store::replica::{PoolRef, ReplicaSpec},
transport::{DestroyReplica, NodeId, ReplicaOwners},
store::{
pool::PoolSpec,
replica::{PoolRef, ReplicaSpec},
},
transport::{CreatePool, DestroyReplica, NodeId, ReplicaOwners},
};

impl OperationGuardArc<PoolSpec> {
/// Retries the creation of the pool which is being done in the background by the io-engine.
/// This may happen if the pool create gRPC times out, for very large pools.
/// We could increase the timeout but as things stand today that would block all gRPC
/// access to the node.
/// TODO: Since the data-plane now allows concurrent gRPC we should also modify the
/// control-plane to allow this, which would allow us to set large timeouts for some gRPCs.
pub(crate) async fn retry_creating(&mut self, registry: &Registry) -> Result<(), SvcError> {
let request = {
let spec = self.lock();
if on_create_fail(&spec, registry).is_some() {
return Ok(());
}
CreatePool::from(spec.deref())
};

let node = registry.node_wrapper(&request.node).await?;
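// Only complete the operation once the io-engine actually reports the pool;
// until then the background create may still be running, so skip this poll
// and let a later reconcile retry (or the timeout policy give up).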
if node.pool(&request.id).await.is_none() {
return Ok(());
}

let _ = self.start_create(registry, &request).await?;
let result = node.create_pool(&request).await;
let _state = self
.complete_create(result, registry, OnCreateFail::LeaveAsIs)
.await?;

Ok(())
}

/// Get the `OnCreateFail` policy.
/// For more information see [`Self::retry_creating`].
pub(crate) fn on_create_fail(&self, registry: &Registry) -> OnCreateFail {
let spec = self.lock();
on_create_fail(&spec, registry).unwrap_or(OnCreateFail::LeaveAsIs)
}
}

fn on_create_fail(pool: &PoolSpec, registry: &Registry) -> Option<OnCreateFail> {
if !pool.status().creating() {
return Some(OnCreateFail::LeaveAsIs);
}
let Some(last_mod_elapsed) = pool.creat_tsc.and_then(|t| t.elapsed().ok()) else {
return Some(OnCreateFail::SetDeleting);
};
if last_mod_elapsed > registry.pool_async_creat_tmo() {
return Some(OnCreateFail::SetDeleting);
}
None
}
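The timing rule above is easier to read with the pool spec reduced to plain values. A self-contained sketch (illustrative names: `creating`, `creat_tsc`, and `tmo` mirror `pool.status().creating()`, `pool.creat_tsc`, and `registry.pool_async_creat_tmo()`):

```rust
use std::time::{Duration, SystemTime};

#[derive(Debug, PartialEq)]
enum Policy {
    LeaveAsIs,
    SetDeleting,
}

/// `None` means "still within the grace period": keep the spec `Creating` and retry.
fn creation_policy(creating: bool, creat_tsc: Option<SystemTime>, tmo: Duration) -> Option<Policy> {
    if !creating {
        // Not mid-create: nothing to time out.
        return Some(Policy::LeaveAsIs);
    }
    let Some(elapsed) = creat_tsc.and_then(|t| t.elapsed().ok()) else {
        // No usable creation timestamp: fail safe and let GC clean up.
        return Some(Policy::SetDeleting);
    };
    if elapsed > tmo {
        // Waited past the timeout (15m by default): give up on the create.
        return Some(Policy::SetDeleting);
    }
    None
}

fn main() {
    let just_now = Some(SystemTime::now());
    assert_eq!(creation_policy(true, just_now, Duration::from_secs(900)), None);
    assert_eq!(
        creation_policy(true, None, Duration::from_secs(900)),
        Some(Policy::SetDeleting)
    );
}
```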

impl OperationGuardArc<ReplicaSpec> {
/// Destroy the replica from its volume
pub(crate) async fn destroy_volume_replica(
16 changes: 6 additions & 10 deletions control-plane/agents/src/bin/core/pool/pool_operations.rs
@@ -65,8 +65,7 @@ impl ResourceLifecycle for OperationGuardArc<PoolSpec> {
let _ = pool.start_create(registry, request).await?;

let result = node.create_pool(request).await;
let on_fail = OnCreateFail::eeinval_delete(&result);

let on_fail = OnCreateFail::on_pool_create_err(&result);
let state = pool.complete_create(result, registry, on_fail).await?;
let spec = pool.lock().clone();
Ok(Pool::new(spec, Some(CtrlPoolState::new(state))))
@@ -93,14 +92,11 @@ impl ResourceLifecycle for OperationGuardArc<PoolSpec> {
Err(SvcError::PoolNotFound { .. }) => {
match node.import_pool(&self.as_ref().into()).await {
Ok(_) => node.destroy_pool(request).await,
Err(error)
if allow_not_found
&& error.tonic_code() == tonic::Code::InvalidArgument =>
{
Ok(())
}
Err(error) if error.tonic_code() == tonic::Code::InvalidArgument => Ok(()),
Err(error) => Err(error),
Err(error) => match error.tonic_code() {
tonic::Code::NotFound if allow_not_found => Ok(()),
tonic::Code::InvalidArgument => Ok(()),
_other => Err(error),
},
}
}
Err(error) => Err(error),
4 changes: 3 additions & 1 deletion control-plane/agents/src/bin/core/pool/specs.rs
@@ -425,7 +425,9 @@ impl ResourceSpecsLocked {
let pools = self.pools_rsc();
for pool in pools {
if let Ok(mut guard) = pool.operation_guard() {
if !guard.handle_incomplete_ops(registry).await {
let on_fail = guard.on_create_fail(registry);

if !guard.handle_incomplete_ops_ext(registry, on_fail).await {
// Not all pending operations could be handled.
pending_ops = true;
}
1 change: 1 addition & 0 deletions control-plane/agents/src/bin/core/tests/deserializer.rs
@@ -152,6 +152,7 @@ fn test_deserialization_v1_to_v2() {
},
sequencer: Default::default(),
operation: None,
creat_tsc: None,
}),
},
TestEntry {
88 changes: 87 additions & 1 deletion control-plane/agents/src/bin/core/tests/pool/mod.rs
@@ -21,7 +21,10 @@ use stor_port::{
VolumePolicy,
},
},
store::replica::{ReplicaSpec, ReplicaSpecKey},
store::{
pool::PoolLabel,
replica::{ReplicaSpec, ReplicaSpecKey},
},
transport::{
CreatePool, CreateReplica, DestroyPool, DestroyReplica, Filter, GetSpecs, NexusId,
NodeId, Protocol, Replica, ReplicaId, ReplicaName, ReplicaOwners, ReplicaShareProtocol,
@@ -1027,3 +1030,86 @@ async fn destroy_after_restart() {

assert_eq!(pool.state().unwrap().id, create.id);
}

#[tokio::test]
async fn slow_create() {
const POOL_SIZE_BYTES: u64 = 128 * 1024 * 1024;

let vg = deployer_cluster::lvm::VolGroup::new("slow-pool-1", POOL_SIZE_BYTES).unwrap();
let lvol = vg.create_lvol("lvol0", POOL_SIZE_BYTES / 2).unwrap();
lvol.suspend().unwrap();
{
let cluster = ClusterBuilder::builder()
.with_io_engines(1)
.with_reconcile_period(Duration::from_secs(2), Duration::from_secs(2))
.with_cache_period("1s")
.with_options(|o| o.with_io_engine_devices(vec![lvol.path()]))
// .with_req_timeouts(Duration::from_secs(2), Duration::from_secs(2))
.compose_build(|b| b.with_clean(true))
.await
.unwrap();

let client = cluster.grpc_client();

let create = CreatePool {
node: cluster.node(0),
id: "bob".into(),
disks: vec![lvol.path().into()],
labels: Some(PoolLabel::from([("a".into(), "b".into())])),
};

let error = client
.pool()
.create(&create, None)
.await
.expect_err("device suspended");
// TODO: check if the errors are being mapped correctly!
assert_eq!(error.kind, ReplyErrorKind::Cancelled);

lvol.resume().unwrap();

let start = std::time::Instant::now();
let timeout = std::time::Duration::from_secs(30);
loop {
if std::time::Instant::now() > (start + timeout) {
panic!("Timeout waiting for the pool");
}
tokio::time::sleep(Duration::from_millis(100)).await;

let pools = client
.pool()
.get(Filter::Pool(create.id.clone()), None)
.await
.unwrap();

let Some(pool) = pools.0.first() else {
continue;
};
let Some(pool_spec) = pool.spec() else {
continue;
};
if !pool_spec.status.created() {
continue;
}
break;
}
let destroy = DestroyPool::from(create.clone());
client.pool().destroy(&destroy, None).await.unwrap();

// Now we try to recreate using an API call, rather than using the reconciler
lvol.suspend().unwrap();

let error = client
.pool()
.create(&create, None)
.await
.expect_err("device suspended");
// TODO: check if the errors are being mapped correctly!
assert_eq!(error.kind, ReplyErrorKind::Cancelled);

lvol.resume().unwrap();

let pool = client.pool().create(&create, None).await.unwrap();
assert!(pool.spec().unwrap().status.created());
}
}
2 changes: 1 addition & 1 deletion control-plane/agents/src/common/errors.rs
@@ -1124,7 +1124,7 @@ fn grpc_to_reply_error(error: SvcError) -> ReplyError {
} => {
let kind = match source.code() {
Code::Ok => ReplyErrorKind::Internal,
Code::Cancelled => ReplyErrorKind::Internal,
Code::Cancelled => ReplyErrorKind::Cancelled,
Code::Unknown => ReplyErrorKind::Internal,
Code::InvalidArgument => ReplyErrorKind::InvalidArgument,
Code::DeadlineExceeded => ReplyErrorKind::DeadlineExceeded,
1 change: 1 addition & 0 deletions control-plane/grpc/proto/v1/misc/common.proto
@@ -69,6 +69,7 @@ enum ReplyErrorKind {
InUse = 29;
CapacityLimitExceeded = 30;
NotAcceptable = 31;
Cancelled = 32;
}

// ResourceKind for the resource which has undergone this error
2 changes: 2 additions & 0 deletions control-plane/grpc/src/misc/traits.rs
@@ -102,6 +102,7 @@ impl From<ReplyErrorKind> for common::ReplyErrorKind {
ReplyErrorKind::InUse => Self::InUse,
ReplyErrorKind::CapacityLimitExceeded => Self::CapacityLimitExceeded,
ReplyErrorKind::NotAcceptable => Self::NotAcceptable,
ReplyErrorKind::Cancelled => Self::Cancelled,
}
}
}
@@ -141,6 +142,7 @@ impl From<common::ReplyErrorKind> for ReplyErrorKind {
common::ReplyErrorKind::InUse => Self::InUse,
common::ReplyErrorKind::CapacityLimitExceeded => Self::CapacityLimitExceeded,
common::ReplyErrorKind::NotAcceptable => Self::NotAcceptable,
common::ReplyErrorKind::Cancelled => Self::Cancelled,
}
}
}
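A quick sanity check that could sit next to these impls (crate-internal sketch; it assumes nothing beyond the two conversions above and uses `matches!` to avoid requiring `PartialEq` on the enums):

```rust
#[test]
fn cancelled_round_trips() {
    // The paired From impls must stay inverses for the new variant.
    let wire = common::ReplyErrorKind::from(ReplyErrorKind::Cancelled);
    assert!(matches!(ReplyErrorKind::from(wire), ReplyErrorKind::Cancelled));
}
```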
1 change: 1 addition & 0 deletions control-plane/grpc/src/operations/pool/traits.rs
@@ -97,6 +97,7 @@ impl TryFrom<pool::PoolDefinition> for PoolSpec {
},
sequencer: Default::default(),
operation: None,
creat_tsc: None,
})
}
}