From c0df839e978663ce52e6f7ca3d75c9fcec5ef283 Mon Sep 17 00:00:00 2001 From: Greg Johnston Date: Sat, 23 Nov 2024 09:19:14 -0500 Subject: [PATCH] fix: wait for blocking resources before sending subsequent chunks (close #3280) --- integrations/utils/src/lib.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/integrations/utils/src/lib.rs b/integrations/utils/src/lib.rs index ab0d199a35..666e3c67de 100644 --- a/integrations/utils/src/lib.rs +++ b/integrations/utils/src/lib.rs @@ -40,15 +40,28 @@ pub trait ExtendResponse: Sized { let (owner, stream) = build_response(app_fn, additional_context, stream_builder); + let sc = owner.shared_context().unwrap(); + let stream = stream.await.ready_chunks(32).map(|n| n.join("")); - let sc = owner.shared_context().unwrap(); while let Some(pending) = sc.await_deferred() { pending.await; } - let mut stream = - Box::pin(meta_context.inject_meta_context(stream).await); + let mut stream = Box::pin( + meta_context.inject_meta_context(stream).await.then({ + let sc = Arc::clone(&sc); + move |chunk| { + let sc = Arc::clone(&sc); + async move { + while let Some(pending) = sc.await_deferred() { + pending.await; + } + chunk + } + } + }), + ); // wait for the first chunk of the stream, then set the status and headers let first_chunk = stream.next().await.unwrap_or_default();